1use std::collections::{BTreeMap, HashMap};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use rand::{distributions::Alphanumeric, Rng};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use tokio::sync::mpsc;
12
13use super::{tool_choice_to_string, Client, ClientError, Result};
14use crate::formatter::multimodal::{build_multimodal_layout, build_multimodal_messages};
15use crate::ipc::client::{ResponseDelta, ResponseStateEvent};
16use crate::ipc::serialization::PromptPayload;
17
/// Prefix of identifiers produced by `generate_response_id`.
const RESPONSE_ID_PREFIX: &str = "resp_";
/// Prefix of identifiers produced by `generate_message_id`.
const MESSAGE_ID_PREFIX: &str = "msg_";
/// Prefix of identifiers produced by `generate_function_call_id`.
const FUNCTION_CALL_ID_PREFIX: &str = "fc_";
/// Prefix of identifiers produced by `generate_tool_call_id`.
const TOOL_CALL_ID_PREFIX: &str = "call_";
22
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch.
fn current_timestamp() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
29
30fn generate_id(prefix: &str) -> String {
31 let random: String = rand::thread_rng()
32 .sample_iter(&Alphanumeric)
33 .take(22)
34 .map(char::from)
35 .collect();
36 format!("{}{}", prefix, random)
37}
38
39fn generate_response_id() -> String {
40 generate_id(RESPONSE_ID_PREFIX)
41}
42
43fn generate_message_id() -> String {
44 generate_id(MESSAGE_ID_PREFIX)
45}
46
47fn generate_function_call_id() -> String {
48 generate_id(FUNCTION_CALL_ID_PREFIX)
49}
50
51fn generate_tool_call_id() -> String {
52 generate_id(TOOL_CALL_ID_PREFIX)
53}
54
55fn finish_reason_to_incomplete(reason: Option<&str>) -> Option<IncompleteDetails> {
56 let normalized = reason.unwrap_or_default().to_lowercase();
57 if matches!(
58 normalized.as_str(),
59 "length" | "max_tokens" | "max_output_tokens"
60 ) {
61 Some(IncompleteDetails {
62 reason: "max_output_tokens".to_string(),
63 })
64 } else if normalized == "content_filter" {
65 Some(IncompleteDetails {
66 reason: "content_filter".to_string(),
67 })
68 } else {
69 None
70 }
71}
72
73fn value_to_string(value: &Value) -> String {
74 match value {
75 Value::String(text) => text.clone(),
76 _ => value.to_string(),
77 }
78}
79
80#[derive(Debug, Deserialize)]
81struct CompletedToolCallValue {
82 name: String,
83 #[serde(default = "default_tool_call_arguments")]
84 arguments: Value,
85}
86
87fn default_tool_call_arguments() -> Value {
88 Value::Object(Default::default())
89}
90
91fn parse_tool_call_completion_value(value: &Value) -> Option<(String, String)> {
92 let structured_value = match value {
93 Value::String(text) => serde_json::from_str::<Value>(text).ok()?,
94 Value::Object(_) => value.clone(),
95 _ => return None,
96 };
97
98 let tool_call: CompletedToolCallValue = serde_json::from_value(structured_value).ok()?;
99 Some((tool_call.name, tool_call.arguments.to_string()))
100}
101
102fn normalize_response_tool_schema(tool: &Value) -> Value {
103 let Some(obj) = tool.as_object() else {
104 return tool.clone();
105 };
106
107 let type_name = obj.get("type").and_then(Value::as_str);
108 let name = obj.get("name").and_then(Value::as_str);
109 let parameters = obj.get("parameters");
110
111 if type_name != Some("function") || name.is_none() || parameters.is_none() {
112 return tool.clone();
113 }
114
115 let name = name.unwrap_or_default();
116 let description = obj
117 .get("description")
118 .and_then(Value::as_str)
119 .unwrap_or(name);
120 let strict = obj.get("strict").and_then(Value::as_bool).unwrap_or(true);
121 let parameters = parameters
122 .cloned()
123 .unwrap_or(Value::Object(Default::default()));
124
125 serde_json::json!({
126 "name": name,
127 "type": "object",
128 "description": description,
129 "properties": {
130 "name": {"const": name},
131 "arguments": parameters,
132 },
133 "strict": strict,
134 "required": ["name", "arguments"],
135 })
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[serde(untagged)]
140pub enum ResponsesInput {
141 Text(String),
142 Items(Vec<ResponseInputItem>),
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
146#[serde(tag = "type", rename_all = "snake_case")]
147pub enum ResponseInputItem {
148 Message {
149 role: String,
150 content: Value,
151 #[serde(default, skip_serializing_if = "Option::is_none")]
152 tool_calls: Option<Vec<Value>>,
153 #[serde(default, skip_serializing_if = "Option::is_none")]
154 tool_call_id: Option<String>,
155 },
156 FunctionCall {
157 call_id: String,
158 name: String,
159 arguments: String,
160 },
161 FunctionCallOutput {
162 call_id: String,
163 output: String,
164 },
165 Reasoning {
166 #[serde(default, skip_serializing_if = "Option::is_none")]
167 summary: Option<Vec<Value>>,
168 #[serde(default, skip_serializing_if = "Option::is_none")]
169 encrypted_content: Option<String>,
170 },
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct ResponsesRequest {
175 pub input: ResponsesInput,
176 #[serde(default)]
177 pub stream: bool,
178 #[serde(default)]
179 pub instructions: Option<String>,
180 #[serde(default)]
181 pub temperature: Option<f64>,
182 #[serde(default)]
183 pub top_p: Option<f64>,
184 #[serde(default)]
185 pub top_k: Option<i32>,
186 #[serde(default)]
187 pub min_p: Option<f64>,
188 #[serde(default)]
189 pub frequency_penalty: Option<f64>,
190 #[serde(default)]
191 pub presence_penalty: Option<f64>,
192 #[serde(default)]
193 pub max_output_tokens: Option<i32>,
194 #[serde(default)]
195 pub top_logprobs: Option<i32>,
196 #[serde(default)]
197 pub tools: Vec<Value>,
198 #[serde(default)]
199 pub tool_choice: Option<Value>,
200 #[serde(default)]
201 pub max_tool_calls: Option<i32>,
202 #[serde(default)]
203 pub text: Option<Value>,
204 #[serde(default)]
205 pub reasoning_effort: Option<String>,
206 #[serde(default)]
207 pub metadata: Option<HashMap<String, String>>,
208 #[serde(default)]
209 pub parallel_tool_calls: bool,
210}
211
212impl ResponsesRequest {
213 pub fn from_text(text: impl Into<String>) -> Self {
214 Self {
215 input: ResponsesInput::Text(text.into()),
216 stream: false,
217 instructions: None,
218 temperature: None,
219 top_p: None,
220 top_k: None,
221 min_p: None,
222 frequency_penalty: None,
223 presence_penalty: None,
224 max_output_tokens: None,
225 top_logprobs: None,
226 tools: Vec::new(),
227 tool_choice: None,
228 max_tool_calls: None,
229 text: None,
230 reasoning_effort: None,
231 metadata: None,
232 parallel_tool_calls: false,
233 }
234 }
235
236 fn to_messages(&self) -> Vec<HashMap<String, Value>> {
237 match &self.input {
238 ResponsesInput::Text(text) => {
239 let mut message = HashMap::new();
240 message.insert("role".to_string(), Value::String("user".to_string()));
241 message.insert("content".to_string(), Value::String(text.clone()));
242 vec![message]
243 }
244 ResponsesInput::Items(items) => {
245 let mut messages = Vec::new();
246 for item in items {
247 match item {
248 ResponseInputItem::Message {
249 role,
250 content,
251 tool_calls,
252 tool_call_id,
253 } => {
254 let mut message = HashMap::new();
255 message.insert("role".to_string(), Value::String(role.clone()));
256 message.insert("content".to_string(), content.clone());
257 if let Some(calls) = tool_calls {
258 message
259 .insert("tool_calls".to_string(), Value::Array(calls.clone()));
260 }
261 if let Some(call_id) = tool_call_id {
262 message.insert(
263 "tool_call_id".to_string(),
264 Value::String(call_id.clone()),
265 );
266 }
267 messages.push(message);
268 }
269 ResponseInputItem::FunctionCall {
270 call_id,
271 name,
272 arguments,
273 } => {
274 let mut message = HashMap::new();
275 message
276 .insert("role".to_string(), Value::String("assistant".to_string()));
277 message.insert("content".to_string(), Value::String(String::new()));
278 message.insert(
279 "tool_calls".to_string(),
280 Value::Array(vec![serde_json::json!({
281 "id": call_id,
282 "type": "function",
283 "function": {
284 "name": name,
285 "arguments": arguments,
286 }
287 })]),
288 );
289 messages.push(message);
290 }
291 ResponseInputItem::FunctionCallOutput { call_id, output } => {
292 let mut message = HashMap::new();
293 message.insert("role".to_string(), Value::String("tool".to_string()));
294 message.insert("content".to_string(), Value::String(output.clone()));
295 message
296 .insert("tool_call_id".to_string(), Value::String(call_id.clone()));
297 messages.push(message);
298 }
299 ResponseInputItem::Reasoning { .. } => {
300 }
302 }
303 }
304 messages
305 }
306 }
307 }
308}
309
310#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
311#[serde(rename_all = "snake_case")]
312#[derive(Default)]
313pub enum OutputStatus {
314 #[default]
315 Completed,
316 Incomplete,
317 InProgress,
318 Failed,
319}
320
321#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
322pub struct OutputTextContent {
323 #[serde(rename = "type")]
324 pub content_type: String,
325 pub text: String,
326 #[serde(default)]
327 pub annotations: Vec<Value>,
328 #[serde(default)]
329 pub logprobs: Vec<Value>,
330}
331
332impl OutputTextContent {
333 pub fn new(text: impl Into<String>) -> Self {
334 Self {
335 content_type: "output_text".to_string(),
336 text: text.into(),
337 annotations: Vec::new(),
338 logprobs: Vec::new(),
339 }
340 }
341}
342
343#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
344pub struct ReasoningContent {
345 #[serde(rename = "type")]
346 pub content_type: String,
347 pub text: String,
348}
349
350impl ReasoningContent {
351 pub fn new(text: impl Into<String>) -> Self {
352 Self {
353 content_type: "reasoning_text".to_string(),
354 text: text.into(),
355 }
356 }
357}
358
359#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
360pub struct ReasoningSummaryTextContent {
361 #[serde(rename = "type")]
362 pub content_type: String,
363 pub text: String,
364}
365
366#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
367pub struct OutputMessage {
368 #[serde(rename = "type")]
369 pub output_type: String,
370 pub id: String,
371 pub status: OutputStatus,
372 pub role: String,
373 pub content: Vec<OutputTextContent>,
374}
375
376#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
377pub struct OutputFunctionCall {
378 #[serde(rename = "type")]
379 pub output_type: String,
380 pub id: String,
381 pub call_id: String,
382 pub name: String,
383 pub arguments: String,
384 #[serde(default, skip_serializing_if = "Option::is_none")]
385 pub metadata: Option<Value>,
386 pub status: OutputStatus,
387}
388
389#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
390pub struct OutputReasoning {
391 #[serde(rename = "type")]
392 pub output_type: String,
393 pub id: String,
394 #[serde(default)]
395 pub status: OutputStatus,
396 #[serde(default)]
397 pub summary: Vec<ReasoningSummaryTextContent>,
398 #[serde(default)]
399 pub content: Vec<ReasoningContent>,
400 #[serde(default, skip_serializing_if = "Option::is_none")]
401 pub encrypted_content: Option<String>,
402}
403
404#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
405#[serde(untagged)]
406pub enum ResponseOutputItem {
407 Message(OutputMessage),
408 FunctionCall(OutputFunctionCall),
409 Reasoning(OutputReasoning),
410}
411
412impl ResponseOutputItem {
413 pub fn item_type(&self) -> &'static str {
414 match self {
415 Self::Message(_) => "message",
416 Self::FunctionCall(_) => "function_call",
417 Self::Reasoning(_) => "reasoning",
418 }
419 }
420}
421
422#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
423pub struct IncompleteDetails {
424 pub reason: String,
425}
426
427#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
428pub struct ResponseError {
429 pub code: String,
430 pub message: String,
431}
432
433#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
434pub struct InputTokensDetails {
435 pub cached_tokens: u32,
436}
437
438#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
439pub struct OutputTokensDetails {
440 pub reasoning_tokens: u32,
441}
442
443#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
444pub struct ResponseUsage {
445 pub input_tokens: u32,
446 pub output_tokens: u32,
447 pub total_tokens: u32,
448 #[serde(default, skip_serializing_if = "Option::is_none")]
449 pub input_tokens_details: Option<InputTokensDetails>,
450 #[serde(default, skip_serializing_if = "Option::is_none")]
451 pub output_tokens_details: Option<OutputTokensDetails>,
452}
453
454#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
455pub struct ResponseObject {
456 pub id: String,
457 pub object: String,
458 pub created_at: i64,
459 #[serde(default, skip_serializing_if = "Option::is_none")]
460 pub completed_at: Option<i64>,
461 pub status: OutputStatus,
462 #[serde(default, skip_serializing_if = "Option::is_none")]
463 pub incomplete_details: Option<IncompleteDetails>,
464 #[serde(default, skip_serializing_if = "Option::is_none")]
465 pub error: Option<ResponseError>,
466 pub model: String,
467 pub output: Vec<ResponseOutputItem>,
468 #[serde(default, skip_serializing_if = "Option::is_none")]
469 pub usage: Option<ResponseUsage>,
470 #[serde(default, skip_serializing_if = "Option::is_none")]
471 pub metadata: Option<HashMap<String, String>>,
472 pub parallel_tool_calls: bool,
473 #[serde(default, skip_serializing_if = "Option::is_none")]
474 pub temperature: Option<f64>,
475 #[serde(default, skip_serializing_if = "Option::is_none")]
476 pub top_p: Option<f64>,
477 #[serde(default, skip_serializing_if = "Option::is_none")]
478 pub presence_penalty: Option<f64>,
479 #[serde(default, skip_serializing_if = "Option::is_none")]
480 pub frequency_penalty: Option<f64>,
481 #[serde(default, skip_serializing_if = "Option::is_none")]
482 pub top_k: Option<i32>,
483 #[serde(default, skip_serializing_if = "Option::is_none")]
484 pub min_p: Option<f64>,
485 #[serde(default, skip_serializing_if = "Option::is_none")]
486 pub instructions: Option<String>,
487 #[serde(default, skip_serializing_if = "Option::is_none")]
488 pub max_output_tokens: Option<i32>,
489 #[serde(default, skip_serializing_if = "Option::is_none")]
490 pub top_logprobs: Option<i32>,
491 #[serde(default, skip_serializing_if = "Option::is_none")]
492 pub tool_choice: Option<Value>,
493 #[serde(default)]
494 pub tools: Vec<Value>,
495 #[serde(default, skip_serializing_if = "Option::is_none")]
496 pub max_tool_calls: Option<i32>,
497 #[serde(default, skip_serializing_if = "Option::is_none")]
498 pub text: Option<Value>,
499}
500
501#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
502pub struct ResponseSnapshot {
503 pub id: String,
504 pub object: String,
505 pub created_at: i64,
506 #[serde(default, skip_serializing_if = "Option::is_none")]
507 pub completed_at: Option<i64>,
508 pub status: OutputStatus,
509 #[serde(default, skip_serializing_if = "Option::is_none")]
510 pub incomplete_details: Option<IncompleteDetails>,
511 pub model: String,
512 pub output: Vec<ResponseOutputItem>,
513 #[serde(default, skip_serializing_if = "Option::is_none")]
514 pub usage: Option<ResponseUsage>,
515}
516
517#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
518pub enum ResponseContentPart {
519 OutputText(OutputTextContent),
520 Reasoning(ReasoningContent),
521}
522
523#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
524pub struct ResponseCreatedEvent {
525 pub sequence_number: u64,
526 pub response: ResponseSnapshot,
527}
528
529#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
530pub struct ResponseInProgressEvent {
531 pub sequence_number: u64,
532 pub response: ResponseSnapshot,
533}
534
535#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
536pub struct ResponseCompletedEvent {
537 pub sequence_number: u64,
538 pub response: ResponseSnapshot,
539}
540
541#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
542pub struct ResponseFailedEvent {
543 pub sequence_number: u64,
544 pub response: ResponseSnapshot,
545}
546
547#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
548pub struct ResponseIncompleteEvent {
549 pub sequence_number: u64,
550 pub response: ResponseSnapshot,
551}
552
553#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
554pub struct OutputItemAddedEvent {
555 pub sequence_number: u64,
556 pub output_index: u32,
557 pub item: ResponseOutputItem,
558}
559
560#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
561pub struct OutputItemDoneEvent {
562 pub sequence_number: u64,
563 pub output_index: u32,
564 pub item: ResponseOutputItem,
565}
566
567#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
568pub struct ContentPartAddedEvent {
569 pub sequence_number: u64,
570 pub item_id: String,
571 pub output_index: u32,
572 pub content_index: u32,
573 pub part: ResponseContentPart,
574}
575
576#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
577pub struct ContentPartDoneEvent {
578 pub sequence_number: u64,
579 pub item_id: String,
580 pub output_index: u32,
581 pub content_index: u32,
582 pub part: ResponseContentPart,
583}
584
585#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
586pub struct OutputTextDeltaEvent {
587 pub sequence_number: u64,
588 pub item_id: String,
589 pub output_index: u32,
590 pub content_index: u32,
591 pub delta: String,
592 #[serde(default)]
593 pub logprobs: Vec<Value>,
594}
595
596#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
597pub struct OutputTextDoneEvent {
598 pub sequence_number: u64,
599 pub item_id: String,
600 pub output_index: u32,
601 pub content_index: u32,
602 pub text: String,
603 #[serde(default)]
604 pub logprobs: Vec<Value>,
605}
606
607#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
608pub struct FunctionCallArgumentsDeltaEvent {
609 pub sequence_number: u64,
610 pub item_id: String,
611 pub output_index: u32,
612 pub delta: String,
613}
614
615#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
616pub struct FunctionCallArgumentsDoneEvent {
617 pub sequence_number: u64,
618 pub item_id: String,
619 pub output_index: u32,
620 pub arguments: String,
621}
622
623#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
624pub struct ReasoningDeltaEvent {
625 pub sequence_number: u64,
626 pub item_id: String,
627 pub output_index: u32,
628 pub content_index: u32,
629 pub delta: String,
630}
631
632#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
633pub struct ReasoningDoneEvent {
634 pub sequence_number: u64,
635 pub item_id: String,
636 pub output_index: u32,
637 pub content_index: u32,
638 pub text: String,
639}
640
641#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
642pub struct ReasoningSummaryTextDeltaEvent {
643 pub sequence_number: u64,
644 pub item_id: String,
645 pub output_index: u32,
646 pub summary_index: u32,
647 pub delta: String,
648}
649
650#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
651pub struct ReasoningSummaryTextDoneEvent {
652 pub sequence_number: u64,
653 pub item_id: String,
654 pub output_index: u32,
655 pub summary_index: u32,
656 pub text: String,
657}
658
659#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
660pub struct StreamErrorDetail {
661 pub message: String,
662 #[serde(rename = "type")]
663 pub error_type: String,
664}
665
666#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
667pub struct StreamErrorEvent {
668 pub sequence_number: u64,
669 pub error: StreamErrorDetail,
670}
671
672#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
673pub enum ResponseEvent {
674 ResponseCreated(ResponseCreatedEvent),
675 ResponseInProgress(ResponseInProgressEvent),
676 ResponseCompleted(ResponseCompletedEvent),
677 ResponseFailed(ResponseFailedEvent),
678 ResponseIncomplete(ResponseIncompleteEvent),
679 OutputItemAdded(OutputItemAddedEvent),
680 OutputItemDone(OutputItemDoneEvent),
681 ContentPartAdded(ContentPartAddedEvent),
682 ContentPartDone(ContentPartDoneEvent),
683 OutputTextDelta(OutputTextDeltaEvent),
684 OutputTextDone(OutputTextDoneEvent),
685 FunctionCallArgumentsDelta(FunctionCallArgumentsDeltaEvent),
686 FunctionCallArgumentsDone(FunctionCallArgumentsDoneEvent),
687 ReasoningDelta(ReasoningDeltaEvent),
688 ReasoningDone(ReasoningDoneEvent),
689 ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaEvent),
690 ReasoningSummaryTextDone(ReasoningSummaryTextDoneEvent),
691 Error(StreamErrorEvent),
692 Done,
693}
694
695impl ResponseEvent {
696 pub fn event_type(&self) -> &'static str {
697 match self {
698 Self::ResponseCreated(_) => "response.created",
699 Self::ResponseInProgress(_) => "response.in_progress",
700 Self::ResponseCompleted(_) => "response.completed",
701 Self::ResponseFailed(_) => "response.failed",
702 Self::ResponseIncomplete(_) => "response.incomplete",
703 Self::OutputItemAdded(_) => "response.output_item.added",
704 Self::OutputItemDone(_) => "response.output_item.done",
705 Self::ContentPartAdded(_) => "response.content_part.added",
706 Self::ContentPartDone(_) => "response.content_part.done",
707 Self::OutputTextDelta(_) => "response.output_text.delta",
708 Self::OutputTextDone(_) => "response.output_text.done",
709 Self::FunctionCallArgumentsDelta(_) => "response.function_call_arguments.delta",
710 Self::FunctionCallArgumentsDone(_) => "response.function_call_arguments.done",
711 Self::ReasoningDelta(_) => "response.reasoning.delta",
712 Self::ReasoningDone(_) => "response.reasoning.done",
713 Self::ReasoningSummaryTextDelta(_) => "response.reasoning_summary_text.delta",
714 Self::ReasoningSummaryTextDone(_) => "response.reasoning_summary_text.done",
715 Self::Error(_) => "error",
716 Self::Done => "done",
717 }
718 }
719}
720
721pub enum ResponsesResult {
722 Complete(Box<ResponseObject>),
723 Stream(mpsc::Receiver<ResponseEvent>),
724}
725
726#[derive(Debug, Clone)]
727struct StreamingOutputItem {
728 item_id: String,
729 item_type: String,
730 call_id: Option<String>,
731 function_name: Option<String>,
732 accumulated_content: String,
733 accumulated_arguments: String,
734 status: OutputStatus,
735}
736
737impl StreamingOutputItem {
738 fn new(item_type: &str) -> Self {
739 let item_id = match item_type {
740 "tool_call" => generate_function_call_id(),
741 "reasoning" => generate_id("reasoning_"),
742 _ => generate_message_id(),
743 };
744
745 Self {
746 item_id,
747 item_type: item_type.to_string(),
748 call_id: None,
749 function_name: None,
750 accumulated_content: String::new(),
751 accumulated_arguments: String::new(),
752 status: OutputStatus::InProgress,
753 }
754 }
755
756 fn to_skeleton(&self) -> ResponseOutputItem {
757 match self.item_type.as_str() {
758 "tool_call" => ResponseOutputItem::FunctionCall(OutputFunctionCall {
759 output_type: "function_call".to_string(),
760 id: self.item_id.clone(),
761 call_id: self.call_id.clone().unwrap_or_else(generate_tool_call_id),
762 name: self.function_name.clone().unwrap_or_default(),
763 arguments: String::new(),
764 metadata: None,
765 status: OutputStatus::InProgress,
766 }),
767 "reasoning" => ResponseOutputItem::Reasoning(OutputReasoning {
768 output_type: "reasoning".to_string(),
769 id: self.item_id.clone(),
770 status: OutputStatus::InProgress,
771 summary: Vec::new(),
772 content: Vec::new(),
773 encrypted_content: None,
774 }),
775 _ => ResponseOutputItem::Message(OutputMessage {
776 output_type: "message".to_string(),
777 id: self.item_id.clone(),
778 status: OutputStatus::InProgress,
779 role: "assistant".to_string(),
780 content: Vec::new(),
781 }),
782 }
783 }
784
785 fn to_completed(&self) -> ResponseOutputItem {
786 match self.item_type.as_str() {
787 "tool_call" => ResponseOutputItem::FunctionCall(OutputFunctionCall {
788 output_type: "function_call".to_string(),
789 id: self.item_id.clone(),
790 call_id: self.call_id.clone().unwrap_or_else(generate_tool_call_id),
791 name: self.function_name.clone().unwrap_or_default(),
792 arguments: self.accumulated_arguments.clone(),
793 metadata: None,
794 status: OutputStatus::Completed,
795 }),
796 "reasoning" => ResponseOutputItem::Reasoning(OutputReasoning {
797 output_type: "reasoning".to_string(),
798 id: self.item_id.clone(),
799 status: OutputStatus::Completed,
800 summary: Vec::new(),
801 content: if self.accumulated_content.is_empty() {
802 Vec::new()
803 } else {
804 vec![ReasoningContent::new(self.accumulated_content.clone())]
805 },
806 encrypted_content: None,
807 }),
808 _ => ResponseOutputItem::Message(OutputMessage {
809 output_type: "message".to_string(),
810 id: self.item_id.clone(),
811 status: OutputStatus::Completed,
812 role: "assistant".to_string(),
813 content: if self.accumulated_content.is_empty() {
814 Vec::new()
815 } else {
816 vec![OutputTextContent::new(self.accumulated_content.clone())]
817 },
818 }),
819 }
820 }
821}
822
823#[derive(Debug, Clone)]
824struct ResponseStreamState {
825 response_id: String,
826 model: String,
827 created_at: i64,
828 completed_at: Option<i64>,
829 items: BTreeMap<u32, StreamingOutputItem>,
830 sequence_number: u64,
831 status: OutputStatus,
832 incomplete_details: Option<IncompleteDetails>,
833 usage: Option<ResponseUsage>,
834}
835
836impl ResponseStreamState {
837 fn new(response_id: String, model: String) -> Self {
838 Self {
839 response_id,
840 model,
841 created_at: current_timestamp(),
842 completed_at: None,
843 items: BTreeMap::new(),
844 sequence_number: 0,
845 status: OutputStatus::InProgress,
846 incomplete_details: None,
847 usage: None,
848 }
849 }
850
851 fn next_sequence_number(&mut self) -> u64 {
852 let current = self.sequence_number;
853 self.sequence_number = self.sequence_number.saturating_add(1);
854 current
855 }
856
857 fn get_or_create_item(
858 &mut self,
859 output_index: u32,
860 item_type: &str,
861 identifier: &str,
862 ) -> &mut StreamingOutputItem {
863 self.items.entry(output_index).or_insert_with(|| {
864 let mut item = StreamingOutputItem::new(item_type);
865 if item_type == "tool_call" {
866 item.call_id = Some(generate_tool_call_id());
867 if !identifier.is_empty() {
868 item.function_name =
869 Some(identifier.trim_start_matches("tool_call:").to_string());
870 }
871 }
872 item
873 })
874 }
875
876 fn snapshot(&self) -> ResponseSnapshot {
877 let output = self
878 .items
879 .values()
880 .map(|item| {
881 if item.status == OutputStatus::Completed {
882 item.to_completed()
883 } else {
884 item.to_skeleton()
885 }
886 })
887 .collect::<Vec<_>>();
888
889 ResponseSnapshot {
890 id: self.response_id.clone(),
891 object: "response".to_string(),
892 created_at: self.created_at,
893 completed_at: self.completed_at,
894 status: self.status,
895 incomplete_details: self.incomplete_details.clone(),
896 model: self.model.clone(),
897 output,
898 usage: self.usage.clone(),
899 }
900 }
901}
902
/// One output item assembled from state events for a non-streaming response.
#[derive(Debug, Clone)]
struct AggregatedOutputItem {
    /// Internal item kind: `"tool_call"`, `"reasoning"`, or anything else for
    /// a message.
    item_type: String,
    /// Accumulated text content.
    content: String,
    /// Accumulated tool-call arguments text.
    arguments: String,
    /// Identifier copied from the state events.
    identifier: String,
    /// Tool name; empty for items that are not tool calls.
    function_name: String,
}
911
912fn process_state_event_for_output(
913 event: &ResponseStateEvent,
914 output_items: &mut BTreeMap<u32, AggregatedOutputItem>,
915) {
916 let output_index = event.output_index;
917 let item_type = if event.item_type.is_empty() {
918 "message"
919 } else {
920 event.item_type.as_str()
921 };
922
923 let item = output_items
924 .entry(output_index)
925 .or_insert_with(|| AggregatedOutputItem {
926 item_type: item_type.to_string(),
927 content: String::new(),
928 arguments: String::new(),
929 identifier: event.identifier.clone(),
930 function_name: if item_type == "tool_call" {
931 event
932 .identifier
933 .trim_start_matches("tool_call:")
934 .to_string()
935 } else {
936 String::new()
937 },
938 });
939
940 if item.item_type == "tool_call" && event.identifier == "arguments" {
941 if event.event_type == "content_delta" {
942 item.arguments.push_str(&event.delta);
943 } else if event.event_type == "item_completed" {
944 if let Some(value) = &event.value {
945 item.arguments = value_to_string(value);
946 }
947 }
948 return;
949 }
950
951 if event.event_type == "content_delta" {
952 item.content.push_str(&event.delta);
953 } else if event.event_type == "item_completed" {
954 if item.item_type == "tool_call" {
955 item.identifier = event.identifier.clone();
956 if let Some(value) = &event.value {
957 if let Some((function_name, arguments)) = parse_tool_call_completion_value(value) {
958 item.function_name = function_name;
959 item.arguments = arguments;
960 }
961 }
962 if item.function_name.is_empty() {
963 item.function_name = event
964 .identifier
965 .trim_start_matches("tool_call:")
966 .to_string();
967 }
968 } else if let Some(value) = &event.value {
969 item.content = value_to_string(value);
970 }
971 }
972}
973
974fn build_output_items(
975 output_items: &BTreeMap<u32, AggregatedOutputItem>,
976) -> Vec<ResponseOutputItem> {
977 let mut output = Vec::new();
978 for item in output_items.values() {
979 match item.item_type.as_str() {
980 "tool_call" => {
981 output.push(ResponseOutputItem::FunctionCall(OutputFunctionCall {
982 output_type: "function_call".to_string(),
983 id: generate_function_call_id(),
984 call_id: generate_tool_call_id(),
985 name: item.function_name.clone(),
986 arguments: item.arguments.clone(),
987 metadata: None,
988 status: OutputStatus::Completed,
989 }));
990 }
991 "reasoning" => {
992 output.push(ResponseOutputItem::Reasoning(OutputReasoning {
993 output_type: "reasoning".to_string(),
994 id: generate_id("reasoning_"),
995 status: OutputStatus::Completed,
996 summary: Vec::new(),
997 content: if item.content.is_empty() {
998 Vec::new()
999 } else {
1000 vec![ReasoningContent::new(item.content.clone())]
1001 },
1002 encrypted_content: None,
1003 }));
1004 }
1005 _ => {
1006 output.push(ResponseOutputItem::Message(OutputMessage {
1007 output_type: "message".to_string(),
1008 id: generate_message_id(),
1009 status: OutputStatus::Completed,
1010 role: "assistant".to_string(),
1011 content: if item.content.is_empty() {
1012 Vec::new()
1013 } else {
1014 vec![OutputTextContent::new(item.content.clone())]
1015 },
1016 }));
1017 }
1018 }
1019 }
1020
1021 if output.is_empty() {
1022 output.push(ResponseOutputItem::Message(OutputMessage {
1023 output_type: "message".to_string(),
1024 id: generate_message_id(),
1025 status: OutputStatus::Completed,
1026 role: "assistant".to_string(),
1027 content: Vec::new(),
1028 }));
1029 }
1030
1031 output
1032}
1033
1034fn update_usage_from_delta(delta: &ResponseDelta, usage: &mut ResponseUsage) {
1035 if let Some(prompt_tokens) = delta.prompt_token_count {
1036 usage.input_tokens = usage.input_tokens.max(prompt_tokens);
1037 }
1038 if let Some(generation_len) = delta.generation_len {
1039 usage.output_tokens = usage.output_tokens.max(generation_len);
1040 }
1041 if let Some(cached_tokens) = delta.cached_token_count {
1042 usage.input_tokens_details = Some(InputTokensDetails { cached_tokens });
1043 }
1044 if let Some(reasoning_tokens) = delta.reasoning_tokens {
1045 usage.output_tokens_details = Some(OutputTokensDetails { reasoning_tokens });
1046 }
1047 usage.total_tokens = usage.input_tokens + usage.output_tokens;
1048}
1049
1050fn process_state_event_for_streaming(
1051 event: &ResponseStateEvent,
1052 stream_state: &mut ResponseStreamState,
1053 events: &mut Vec<ResponseEvent>,
1054) {
1055 let item_type = if event.item_type.is_empty() {
1056 "message"
1057 } else {
1058 event.item_type.as_str()
1059 };
1060
1061 let output_index = event.output_index;
1062 let identifier = event.identifier.clone();
1063
1064 if item_type == "tool_call" && identifier == "arguments" {
1065 if event.event_type == "content_delta" {
1066 let item_id = {
1067 let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1068 item.accumulated_arguments.push_str(&event.delta);
1069 item.item_id.clone()
1070 };
1071 let sequence_number = stream_state.next_sequence_number();
1072 events.push(ResponseEvent::FunctionCallArgumentsDelta(
1073 FunctionCallArgumentsDeltaEvent {
1074 sequence_number,
1075 item_id,
1076 output_index,
1077 delta: event.delta.clone(),
1078 },
1079 ));
1080 } else if event.event_type == "item_completed" {
1081 if let Some(value) = &event.value {
1082 let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1083 item.accumulated_arguments = value_to_string(value);
1084 }
1085 }
1086 return;
1087 }
1088
1089 if event.event_type == "item_started" {
1090 let (item_id, skeleton_item) = {
1091 let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1092 (item.item_id.clone(), item.to_skeleton())
1093 };
1094 let sequence_number = stream_state.next_sequence_number();
1095 events.push(ResponseEvent::OutputItemAdded(OutputItemAddedEvent {
1096 sequence_number,
1097 output_index,
1098 item: skeleton_item,
1099 }));
1100
1101 if item_type == "message" {
1102 let sequence_number = stream_state.next_sequence_number();
1103 events.push(ResponseEvent::ContentPartAdded(ContentPartAddedEvent {
1104 sequence_number,
1105 item_id,
1106 output_index,
1107 content_index: 0,
1108 part: ResponseContentPart::OutputText(OutputTextContent::new("")),
1109 }));
1110 } else if item_type == "reasoning" {
1111 let sequence_number = stream_state.next_sequence_number();
1112 events.push(ResponseEvent::ContentPartAdded(ContentPartAddedEvent {
1113 sequence_number,
1114 item_id,
1115 output_index,
1116 content_index: 0,
1117 part: ResponseContentPart::Reasoning(ReasoningContent::new("")),
1118 }));
1119 }
1120 return;
1121 }
1122
1123 if event.event_type == "content_delta" {
1124 let item_id = {
1125 let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1126 item.accumulated_content.push_str(&event.delta);
1127 item.item_id.clone()
1128 };
1129 if item_type == "message" {
1130 let sequence_number = stream_state.next_sequence_number();
1131 events.push(ResponseEvent::OutputTextDelta(OutputTextDeltaEvent {
1132 sequence_number,
1133 item_id,
1134 output_index,
1135 content_index: 0,
1136 delta: event.delta.clone(),
1137 logprobs: Vec::new(),
1138 }));
1139 } else if item_type == "reasoning" {
1140 let sequence_number = stream_state.next_sequence_number();
1141 events.push(ResponseEvent::ReasoningDelta(ReasoningDeltaEvent {
1142 sequence_number,
1143 item_id,
1144 output_index,
1145 content_index: 0,
1146 delta: event.delta.clone(),
1147 }));
1148 }
1149 return;
1150 }
1151
1152 if event.event_type == "item_completed" {
1153 let (item_id, accumulated_content, completed_item) = {
1154 let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1155 if let Some(value) = &event.value {
1156 if item_type == "tool_call" {
1157 if let Some((function_name, arguments)) =
1158 parse_tool_call_completion_value(value)
1159 {
1160 item.function_name = Some(function_name);
1161 item.accumulated_arguments = arguments;
1162 }
1163 } else {
1164 item.accumulated_content = value_to_string(value);
1165 }
1166 }
1167 item.status = OutputStatus::Completed;
1168 if item_type == "tool_call" && item.function_name.is_none() {
1169 item.function_name = Some(identifier.trim_start_matches("tool_call:").to_string());
1170 }
1171 (
1172 item.item_id.clone(),
1173 item.accumulated_content.clone(),
1174 item.to_completed(),
1175 )
1176 };
1177
1178 if item_type == "message" {
1179 let sequence_number = stream_state.next_sequence_number();
1180 events.push(ResponseEvent::OutputTextDone(OutputTextDoneEvent {
1181 sequence_number,
1182 item_id: item_id.clone(),
1183 output_index,
1184 content_index: 0,
1185 text: accumulated_content.clone(),
1186 logprobs: Vec::new(),
1187 }));
1188
1189 let sequence_number = stream_state.next_sequence_number();
1190 events.push(ResponseEvent::ContentPartDone(ContentPartDoneEvent {
1191 sequence_number,
1192 item_id: item_id.clone(),
1193 output_index,
1194 content_index: 0,
1195 part: ResponseContentPart::OutputText(OutputTextContent::new(accumulated_content)),
1196 }));
1197 } else if item_type == "reasoning" {
1198 let sequence_number = stream_state.next_sequence_number();
1199 events.push(ResponseEvent::ReasoningDone(ReasoningDoneEvent {
1200 sequence_number,
1201 item_id,
1202 output_index,
1203 content_index: 0,
1204 text: accumulated_content,
1205 }));
1206 } else if item_type == "tool_call" {
1207 let arguments = {
1208 let item = stream_state.get_or_create_item(output_index, item_type, &identifier);
1209 item.accumulated_arguments.clone()
1210 };
1211 let sequence_number = stream_state.next_sequence_number();
1212 events.push(ResponseEvent::FunctionCallArgumentsDone(
1213 FunctionCallArgumentsDoneEvent {
1214 sequence_number,
1215 item_id: item_id.clone(),
1216 output_index,
1217 arguments,
1218 },
1219 ));
1220 }
1221
1222 let sequence_number = stream_state.next_sequence_number();
1223 events.push(ResponseEvent::OutputItemDone(OutputItemDoneEvent {
1224 sequence_number,
1225 output_index,
1226 item: completed_item,
1227 }));
1228 }
1229}
1230
1231fn emit_stream_fallback_item_done(
1232 stream_state: &mut ResponseStreamState,
1233 events: &mut Vec<ResponseEvent>,
1234) {
1235 let indexes = stream_state
1236 .items
1237 .iter()
1238 .filter_map(|(index, item)| {
1239 if item.status != OutputStatus::Completed {
1240 Some(*index)
1241 } else {
1242 None
1243 }
1244 })
1245 .collect::<Vec<_>>();
1246
1247 for output_index in indexes {
1248 if let Some(item) = stream_state.items.get_mut(&output_index) {
1249 item.status = OutputStatus::Completed;
1250 let item_type = item.item_type.clone();
1251 let item_id = item.item_id.clone();
1252 let accumulated_content = item.accumulated_content.clone();
1253 let accumulated_arguments = item.accumulated_arguments.clone();
1254 let completed_item = item.to_completed();
1255
1256 if item_type == "message" {
1257 let sequence_number = stream_state.next_sequence_number();
1258 events.push(ResponseEvent::OutputTextDone(OutputTextDoneEvent {
1259 sequence_number,
1260 item_id: item_id.clone(),
1261 output_index,
1262 content_index: 0,
1263 text: accumulated_content.clone(),
1264 logprobs: Vec::new(),
1265 }));
1266 let sequence_number = stream_state.next_sequence_number();
1267 events.push(ResponseEvent::ContentPartDone(ContentPartDoneEvent {
1268 sequence_number,
1269 item_id: item_id.clone(),
1270 output_index,
1271 content_index: 0,
1272 part: ResponseContentPart::OutputText(OutputTextContent::new(
1273 accumulated_content,
1274 )),
1275 }));
1276 } else if item_type == "tool_call" {
1277 let sequence_number = stream_state.next_sequence_number();
1278 events.push(ResponseEvent::FunctionCallArgumentsDone(
1279 FunctionCallArgumentsDoneEvent {
1280 sequence_number,
1281 item_id: item_id.clone(),
1282 output_index,
1283 arguments: accumulated_arguments,
1284 },
1285 ));
1286 } else if item_type == "reasoning" {
1287 let sequence_number = stream_state.next_sequence_number();
1288 events.push(ResponseEvent::ReasoningDone(ReasoningDoneEvent {
1289 sequence_number,
1290 item_id: item_id.clone(),
1291 output_index,
1292 content_index: 0,
1293 text: accumulated_content,
1294 }));
1295 }
1296
1297 let sequence_number = stream_state.next_sequence_number();
1298 events.push(ResponseEvent::OutputItemDone(OutputItemDoneEvent {
1299 sequence_number,
1300 output_index,
1301 item: completed_item,
1302 }));
1303 }
1304 }
1305}
1306
1307async fn stream_response_events(
1308 mut delta_rx: mpsc::UnboundedReceiver<ResponseDelta>,
1309 event_tx: mpsc::Sender<ResponseEvent>,
1310 response_id: String,
1311 model: String,
1312) {
1313 let mut stream_state = ResponseStreamState::new(response_id, model);
1314
1315 if event_tx
1316 .send(ResponseEvent::ResponseCreated(ResponseCreatedEvent {
1317 sequence_number: stream_state.next_sequence_number(),
1318 response: stream_state.snapshot(),
1319 }))
1320 .await
1321 .is_err()
1322 {
1323 return;
1324 }
1325
1326 if event_tx
1327 .send(ResponseEvent::ResponseInProgress(ResponseInProgressEvent {
1328 sequence_number: stream_state.next_sequence_number(),
1329 response: stream_state.snapshot(),
1330 }))
1331 .await
1332 .is_err()
1333 {
1334 return;
1335 }
1336
1337 let mut error_detail: Option<String> = None;
1338 let mut finish_reason: Option<String> = None;
1339 let mut usage = ResponseUsage::default();
1340
1341 while let Some(delta) = delta_rx.recv().await {
1342 if let Some(error) = &delta.error {
1343 error_detail = Some(error.clone());
1344 break;
1345 }
1346
1347 let mut mapped_events = Vec::new();
1348 for event in &delta.state_events {
1349 process_state_event_for_streaming(event, &mut stream_state, &mut mapped_events);
1350 }
1351
1352 for event in mapped_events {
1353 if event_tx.send(event).await.is_err() {
1354 return;
1355 }
1356 }
1357
1358 update_usage_from_delta(&delta, &mut usage);
1359 if usage.total_tokens > 0 {
1360 stream_state.usage = Some(usage.clone());
1361 }
1362
1363 if let Some(reason) = &delta.finish_reason {
1364 finish_reason = Some(reason.to_lowercase());
1365 }
1366
1367 if delta.is_final_delta {
1368 break;
1369 }
1370 }
1371
1372 let mut completion_events = Vec::new();
1373 emit_stream_fallback_item_done(&mut stream_state, &mut completion_events);
1374 for event in completion_events {
1375 if event_tx.send(event).await.is_err() {
1376 return;
1377 }
1378 }
1379
1380 if let Some(detail) = error_detail {
1381 stream_state.status = OutputStatus::Failed;
1382
1383 if event_tx
1384 .send(ResponseEvent::ResponseFailed(ResponseFailedEvent {
1385 sequence_number: stream_state.next_sequence_number(),
1386 response: stream_state.snapshot(),
1387 }))
1388 .await
1389 .is_err()
1390 {
1391 return;
1392 }
1393
1394 if event_tx
1395 .send(ResponseEvent::Error(StreamErrorEvent {
1396 sequence_number: stream_state.next_sequence_number(),
1397 error: StreamErrorDetail {
1398 message: detail,
1399 error_type: "inference_error".to_string(),
1400 },
1401 }))
1402 .await
1403 .is_err()
1404 {
1405 return;
1406 }
1407 } else {
1408 stream_state.completed_at = Some(current_timestamp());
1409
1410 if let Some(incomplete_details) = finish_reason_to_incomplete(finish_reason.as_deref()) {
1411 stream_state.status = OutputStatus::Incomplete;
1412 stream_state.incomplete_details = Some(incomplete_details);
1413 if event_tx
1414 .send(ResponseEvent::ResponseIncomplete(ResponseIncompleteEvent {
1415 sequence_number: stream_state.next_sequence_number(),
1416 response: stream_state.snapshot(),
1417 }))
1418 .await
1419 .is_err()
1420 {
1421 return;
1422 }
1423 } else {
1424 stream_state.status = OutputStatus::Completed;
1425 if event_tx
1426 .send(ResponseEvent::ResponseCompleted(ResponseCompletedEvent {
1427 sequence_number: stream_state.next_sequence_number(),
1428 response: stream_state.snapshot(),
1429 }))
1430 .await
1431 .is_err()
1432 {
1433 return;
1434 }
1435 }
1436 }
1437
1438 let _ = event_tx.send(ResponseEvent::Done).await;
1439}
1440
1441async fn gather_non_streaming_response(
1442 mut delta_rx: mpsc::UnboundedReceiver<ResponseDelta>,
1443 model: &str,
1444 request: &ResponsesRequest,
1445) -> Result<ResponseObject> {
1446 let created_at = current_timestamp();
1447 let mut completed_at: Option<i64> = None;
1448 let mut output_items: BTreeMap<u32, AggregatedOutputItem> = BTreeMap::new();
1449 let mut fallback_content = String::new();
1450 let mut usage = ResponseUsage::default();
1451 let mut error_detail: Option<String> = None;
1452 let mut finish_reason: Option<String> = None;
1453
1454 while let Some(delta) = delta_rx.recv().await {
1455 if let Some(error) = &delta.error {
1456 error_detail = Some(error.clone());
1457 }
1458
1459 if let Some(content) = &delta.content {
1460 fallback_content.push_str(content);
1461 }
1462
1463 for event in &delta.state_events {
1464 process_state_event_for_output(event, &mut output_items);
1465 }
1466
1467 update_usage_from_delta(&delta, &mut usage);
1468
1469 if let Some(reason) = &delta.finish_reason {
1470 finish_reason = Some(reason.to_lowercase());
1471 }
1472
1473 if delta.is_final_delta {
1474 completed_at = Some(current_timestamp());
1475 break;
1476 }
1477 }
1478
1479 if let Some(error) = error_detail {
1480 return Err(ClientError::RequestFailed(error));
1481 }
1482
1483 let incomplete_details = finish_reason_to_incomplete(finish_reason.as_deref());
1484 let output = if output_items.is_empty() && !fallback_content.is_empty() {
1485 vec![ResponseOutputItem::Message(OutputMessage {
1486 output_type: "message".to_string(),
1487 id: generate_message_id(),
1488 status: OutputStatus::Completed,
1489 role: "assistant".to_string(),
1490 content: vec![OutputTextContent::new(fallback_content)],
1491 })]
1492 } else {
1493 build_output_items(&output_items)
1494 };
1495
1496 Ok(ResponseObject {
1497 id: generate_response_id(),
1498 object: "response".to_string(),
1499 created_at,
1500 completed_at,
1501 status: if incomplete_details.is_some() {
1502 OutputStatus::Incomplete
1503 } else {
1504 OutputStatus::Completed
1505 },
1506 incomplete_details,
1507 error: None,
1508 model: model.to_string(),
1509 output,
1510 usage: Some(usage),
1511 metadata: request.metadata.clone(),
1512 parallel_tool_calls: request.parallel_tool_calls,
1513 temperature: request.temperature,
1514 top_p: request.top_p,
1515 presence_penalty: request.presence_penalty,
1516 frequency_penalty: request.frequency_penalty,
1517 top_k: request.top_k,
1518 min_p: request.min_p,
1519 instructions: request.instructions.clone(),
1520 max_output_tokens: request.max_output_tokens,
1521 top_logprobs: request.top_logprobs,
1522 tool_choice: request.tool_choice.clone(),
1523 tools: request.tools.clone(),
1524 max_tool_calls: request.max_tool_calls,
1525 text: request.text.clone(),
1526 })
1527}
1528
impl Client {
    /// Runs a Responses-style request against `model_id`.
    ///
    /// Steps, in order: ensure the model is loaded and has a formatter, convert
    /// the request into chat messages, expand multimodal content, render the
    /// prompt template (with tool schemas when present), build the multimodal
    /// layout, assemble a `PromptPayload`, and dispatch it as a single-prompt
    /// batch over IPC.
    ///
    /// # Returns
    /// * `ResponsesResult::Stream` with an event receiver when `request.stream`
    ///   is set; events are produced by a spawned task.
    /// * `ResponsesResult::Complete` with the fully gathered response otherwise.
    ///
    /// # Errors
    /// * Whatever `ensure_loaded`, `require_formatter` or `send_batch_request`
    ///   return is propagated with `?`.
    /// * `ClientError::Multimodal` when multimodal message or layout building fails.
    /// * `ClientError::Formatter` when the prompt template cannot be applied.
    /// * `ClientError::RequestFailed` when the request yields no template
    ///   messages, or when gathering the non-streaming response fails.
    pub async fn aresponses(
        &self,
        model_id: &str,
        request: ResponsesRequest,
    ) -> Result<ResponsesResult> {
        let info = self.registry.ensure_loaded(model_id).await?;
        let formatter = info.require_formatter()?;
        let request_id = self.ipc.next_request_id();
        tracing::debug!(
            request_id,
            model_id = %model_id,
            stream = request.stream,
            "Building responses request"
        );
        let messages = request.to_messages();
        tracing::trace!(
            request_id,
            model_id = %model_id,
            messages = ?messages,
            "Responses messages before multimodal expansion"
        );
        // Only the presence of a reasoning effort is passed to the template.
        let reasoning_flag = request.reasoning_effort.is_some();

        let (messages_for_template, image_buffers, capabilities, content_order) =
            build_multimodal_messages(formatter, &messages, request.instructions.as_deref())
                .map_err(|e| ClientError::Multimodal(e.to_string()))?;

        if messages_for_template.is_empty() {
            return Err(ClientError::RequestFailed(
                "Response request must include at least one content segment.".into(),
            ));
        }
        tracing::trace!(
            request_id,
            model_id = %model_id,
            messages_for_template = ?messages_for_template,
            "Responses messages after multimodal expansion"
        );

        let tool_schemas = request
            .tools
            .iter()
            .map(normalize_response_tool_schema)
            .collect::<Vec<_>>();
        // An empty string stands for "no tools"; a serialization failure also
        // collapses to an empty string via `unwrap_or_default`.
        let tool_schemas_json = if tool_schemas.is_empty() {
            String::new()
        } else {
            serde_json::to_string(&tool_schemas).unwrap_or_default()
        };
        // The template receives `None` rather than an empty slice when there are no tools.
        let template_tools = (!tool_schemas.is_empty()).then_some(tool_schemas.as_slice());

        let prompt_text = formatter
            .apply_template_with_tools(
                &messages_for_template,
                true,
                reasoning_flag,
                None,
                template_tools,
            )
            .map_err(|e| ClientError::Formatter(e.to_string()))?;

        let capability_placeholder = formatter.capability_placeholder_token();

        // The layout is computed from the prompt text before placeholders are stripped.
        let layout_segments = build_multimodal_layout(
            &prompt_text,
            &image_buffers,
            &capabilities,
            &content_order,
            formatter.image_placeholder_token(),
            formatter.should_clip_image_placeholder(),
            capability_placeholder,
        )
        .map_err(|e| ClientError::Multimodal(e.to_string()))?;

        let final_prompt = formatter.strip_template_placeholders(&prompt_text);
        tracing::debug!(
            request_id,
            model_id = %model_id,
            prompt_chars = final_prompt.chars().count(),
            image_count = image_buffers.len(),
            capability_count = capabilities.len(),
            layout_segment_count = layout_segments.len(),
            "Prepared responses prompt payload"
        );
        tracing::trace!(
            request_id,
            model_id = %model_id,
            prompt = %format!("\n{}", final_prompt),
            "Responses prompt sent to PIE"
        );

        // Use the nested "format" entry of `text` when present, otherwise the
        // whole `text` value; no `text` at all yields an empty string.
        let response_format_json = request
            .text
            .as_ref()
            .map(|text| text.get("format").cloned().unwrap_or_else(|| text.clone()))
            .map(|response_format| serde_json::to_string(&response_format).unwrap_or_default())
            .unwrap_or_default();

        // A fresh random seed is drawn for every request.
        let rng_seed = rand::thread_rng().gen::<u64>();
        let prompt_payload = PromptPayload {
            prompt: final_prompt,
            image_buffers,
            capabilities: super::convert_capabilities(&capabilities),
            layout: super::convert_layout(&layout_segments),
            // NOTE(review): 0 presumably means "no explicit limit" to the backend — confirm.
            max_generated_tokens: request.max_output_tokens.unwrap_or(0),
            temperature: request.temperature.unwrap_or(1.0),
            top_p: request.top_p.unwrap_or(1.0),
            // NOTE(review): -1 presumably disables top-k filtering — confirm.
            top_k: request.top_k.unwrap_or(-1),
            min_p: request.min_p.unwrap_or(0.0),
            rng_seed,
            stop_sequences: Vec::new(),
            num_candidates: 1,
            best_of: Some(1),
            final_candidates: Some(1),
            frequency_penalty: request.frequency_penalty.unwrap_or(0.0),
            presence_penalty: request.presence_penalty.unwrap_or(0.0),
            repetition_penalty: 1.0,
            repetition_context_size: 60,
            top_logprobs: request.top_logprobs.unwrap_or(0),
            logit_bias: HashMap::new(),
            tool_schemas_json,
            tool_calling_tokens: formatter.get_tool_calling_tokens().clone(),
            tool_choice: tool_choice_to_string(request.tool_choice.as_ref()),
            // Negative values are clamped to 0.
            max_tool_calls: request.max_tool_calls.unwrap_or(0).max(0),
            response_format_json,
            task_name: None,
            reasoning_effort: request.reasoning_effort.clone(),
        };
        tracing::debug!(
            request_id,
            model_id = %model_id,
            stream = request.stream,
            "Dispatching responses request to PIE"
        );
        let (_batch_size, stream) = self.ipc.send_batch_request(
            request_id,
            model_id,
            &info.model_path,
            &[prompt_payload],
        )?;

        if request.stream {
            // Events are produced by a detached task; the caller only gets the receiver.
            let (event_tx, event_rx) = mpsc::channel(256);
            tokio::spawn(stream_response_events(
                stream,
                event_tx,
                generate_response_id(),
                model_id.to_string(),
            ));
            Ok(ResponsesResult::Stream(event_rx))
        } else {
            let response = gather_non_streaming_response(stream, model_id, &request).await?;
            Ok(ResponsesResult::Complete(Box::new(response)))
        }
    }
}
1687
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `tool_call` state event at output index 0.
    fn tool_call_event(
        event_type: &str,
        identifier: &str,
        delta: &str,
        value: Option<Value>,
    ) -> ResponseStateEvent {
        ResponseStateEvent {
            event_type: event_type.to_string(),
            item_type: "tool_call".to_string(),
            output_index: 0,
            identifier: identifier.to_string(),
            delta: delta.to_string(),
            value,
        }
    }

    /// Event variants report their wire-level type names.
    #[test]
    fn test_response_event_type_names() {
        let created = ResponseEvent::ResponseCreated(ResponseCreatedEvent {
            sequence_number: 0,
            response: ResponseSnapshot {
                id: "resp_1".to_string(),
                object: "response".to_string(),
                created_at: 1,
                completed_at: None,
                status: OutputStatus::InProgress,
                incomplete_details: None,
                model: "model".to_string(),
                output: Vec::new(),
                usage: None,
            },
        });
        assert_eq!(created.event_type(), "response.created");

        let done = ResponseEvent::Done;
        assert_eq!(done.event_type(), "done");
    }

    /// Truncation and filtering finish reasons map to incomplete details; `stop` does not.
    #[test]
    fn test_finish_reason_to_incomplete() {
        let details = |reason: &str| {
            Some(IncompleteDetails {
                reason: reason.to_string(),
            })
        };

        assert_eq!(
            finish_reason_to_incomplete(Some("length")),
            details("max_output_tokens")
        );
        assert_eq!(
            finish_reason_to_incomplete(Some("content_filter")),
            details("content_filter")
        );
        assert_eq!(finish_reason_to_incomplete(Some("stop")), None);
    }

    /// A plain-text request becomes a single user message.
    #[test]
    fn test_request_input_conversion_string() {
        let messages = ResponsesRequest::from_text("hello").to_messages();
        assert_eq!(messages.len(), 1);

        let role = messages[0].get("role").and_then(Value::as_str);
        assert_eq!(role, Some("user"));
    }

    /// Function-call items become an assistant message followed by a tool message.
    #[test]
    fn test_request_input_conversion_tool_items() {
        let call = ResponseInputItem::FunctionCall {
            call_id: "call_1".to_string(),
            name: "get_weather".to_string(),
            arguments: "{\"location\":\"SF\"}".to_string(),
        };
        let call_output = ResponseInputItem::FunctionCallOutput {
            call_id: "call_1".to_string(),
            output: "{\"temperature\":65}".to_string(),
        };
        let request = ResponsesRequest {
            input: ResponsesInput::Items(vec![call, call_output]),
            stream: false,
            instructions: None,
            temperature: None,
            top_p: None,
            top_k: None,
            min_p: None,
            frequency_penalty: None,
            presence_penalty: None,
            max_output_tokens: None,
            top_logprobs: None,
            tools: Vec::new(),
            tool_choice: None,
            max_tool_calls: None,
            text: None,
            reasoning_effort: None,
            metadata: None,
            parallel_tool_calls: false,
        };

        let messages = request.to_messages();
        assert_eq!(messages.len(), 2);

        let first_role = messages[0].get("role").and_then(Value::as_str);
        let second_role = messages[1].get("role").and_then(Value::as_str);
        assert_eq!(first_role, Some("assistant"));
        assert_eq!(second_role, Some("tool"));
    }

    /// A delta carrying one state event deserializes with that event intact.
    #[test]
    fn test_response_delta_state_event_deserialization_shape() {
        let started = serde_json::json!({
            "event_type": "item_started",
            "item_type": "message",
            "output_index": 0,
            "identifier": "",
            "delta": ""
        });
        let payload = serde_json::json!({
            "request_id": 1,
            "is_final_delta": false,
            "state_events": [started]
        });

        let parsed: ResponseDelta = serde_json::from_value(payload).expect("deserialize failed");
        assert_eq!(parsed.state_events.len(), 1);
        assert_eq!(parsed.state_events[0].event_type, "item_started");
    }

    /// The structured completion value wins over the raw argument deltas
    /// when building non-streaming output.
    #[test]
    fn test_build_output_items_uses_structured_tool_call_completion_value() {
        let structured = serde_json::json!({
            "name": "get_weather",
            "arguments": {
                "location": "Tokyo",
                "verbose": true,
                "limit": null,
            },
        })
        .to_string();
        let inputs = [
            tool_call_event(
                "content_delta",
                "arguments",
                r#"location="Tokyo", verbose=True"#,
                None,
            ),
            tool_call_event(
                "item_completed",
                "tool_call:get_weather",
                "",
                Some(Value::String(structured)),
            ),
        ];

        let mut output_items = BTreeMap::new();
        for input in &inputs {
            process_state_event_for_output(input, &mut output_items);
        }

        let output = build_output_items(&output_items);
        let call = match &output[0] {
            ResponseOutputItem::FunctionCall(call) => call,
            _ => panic!("expected function call output"),
        };

        assert_eq!(call.name, "get_weather");
        let parsed_arguments =
            serde_json::from_str::<Value>(&call.arguments).expect("valid JSON arguments");
        assert_eq!(
            parsed_arguments,
            serde_json::json!({
                "location": "Tokyo",
                "verbose": true,
                "limit": null,
            })
        );
    }

    /// In streaming mode the "arguments done" event and the completed item both
    /// carry the structured completion value rather than the raw deltas.
    #[test]
    fn test_streaming_tool_call_done_uses_structured_completion_value() {
        let raw_arguments = r#"location="Tokyo", verbose=True"#;
        let structured = serde_json::json!({
            "name": "get_weather",
            "arguments": {
                "location": "Tokyo",
                "verbose": true,
            },
        })
        .to_string();
        let inputs = [
            tool_call_event("item_started", "tool_call:get_weather", "", None),
            tool_call_event("content_delta", "arguments", raw_arguments, None),
            tool_call_event(
                "item_completed",
                "arguments",
                "",
                Some(Value::String(raw_arguments.to_string())),
            ),
            tool_call_event(
                "item_completed",
                "tool_call:get_weather",
                "",
                Some(Value::String(structured)),
            ),
        ];

        let mut stream_state = ResponseStreamState::new("resp_1".to_string(), "model".to_string());
        let mut events = Vec::new();
        for input in &inputs {
            process_state_event_for_streaming(input, &mut stream_state, &mut events);
        }

        // Sort the emitted events into the two kinds the assertions care about.
        let mut argument_done_events = Vec::new();
        let mut completed_calls = Vec::new();
        for event in &events {
            match event {
                ResponseEvent::FunctionCallArgumentsDone(done) => argument_done_events.push(done),
                ResponseEvent::OutputItemDone(done) => {
                    if let ResponseOutputItem::FunctionCall(call) = &done.item {
                        completed_calls.push(call);
                    }
                }
                _ => {}
            }
        }

        let expected_arguments = serde_json::json!({
            "location": "Tokyo",
            "verbose": true,
        });

        assert_eq!(argument_done_events.len(), 1);
        assert_eq!(
            serde_json::from_str::<Value>(&argument_done_events[0].arguments)
                .expect("valid JSON arguments"),
            expected_arguments
        );

        assert_eq!(completed_calls.len(), 1);
        assert_eq!(completed_calls[0].name, "get_weather");
        assert_eq!(
            serde_json::from_str::<Value>(&completed_calls[0].arguments)
                .expect("valid JSON arguments"),
            expected_arguments
        );
    }
}