codex_convert_proxy/convert/streaming/
state.rs1use crate::convert::context::ResponseRequestContext;
23use crate::types::chat_api::ChatStreamChunk;
24
25use super::super::util::{extract_queries_from_arguments, map_tool_name_to_output_type};
26
#[derive(Debug, Clone, Default)]
/// Text gathered from the stream so far, one buffer per kind of content.
///
/// The derived `Default` starts every buffer empty and `is_thinking` as `false`.
pub struct TextAccumulator {
    /// Assistant answer text; `StreamState::build_response_object` copies it into the
    /// `OutputText` content part of the message item when it is non-empty.
    pub full_text: String,
    /// Refusal text; copied into the `Refusal` content part of the message item when
    /// it is non-empty.
    pub refusal_text: String,
    /// Reasoning text; copied into the summary of the reasoning output item when it is
    /// non-empty.
    pub reasoning_text: String,
    /// NOTE(review): not read in the part of the file visible here; presumably a scratch
    /// buffer for in-progress "thinking" content — confirm against the chunk handler.
    pub thinking_buffer: String,
    /// NOTE(review): not read in the part of the file visible here; presumably marks that
    /// the stream is currently inside a thinking section — confirm.
    pub is_thinking: bool,
}
43
#[derive(Debug, Clone, Default)]
/// Counters and slots used to number the pieces of a streamed response.
///
/// The derived `Default` starts every counter at `0` and every slot at `None`.
///
/// NOTE(review): only `next_sequence_number` is touched by code visible in this part of
/// the file; the remarks on the other fields are inferred from their names — confirm.
pub struct IndexAllocator {
    /// Presumably the index of the content part currently being written.
    pub content_index: u32,
    /// Presumably the output index that the next new output item will receive.
    pub next_output_index: u32,
    /// Presumably the output index assigned to the text/message item, once it has one.
    pub text_output_index: Option<u32>,
    /// Presumably the output index assigned to the reasoning item, once it has one.
    pub reasoning_output_index: Option<u32>,
    /// The value the next call to [`IndexAllocator::take_sequence_number`] will return.
    pub next_sequence_number: u64,
}

impl IndexAllocator {
    /// Hands out the current sequence number and advances the counter by one.
    ///
    /// Successive calls on a default-constructed allocator yield `0, 1, 2, ...`.
    pub fn take_sequence_number(&mut self) -> u64 {
        let issued = self.next_sequence_number;
        self.next_sequence_number = issued + 1;
        issued
    }
}
65
#[derive(Debug, Clone, Default)]
/// Token counters captured from the upstream stream.
///
/// Every counter is `None` until a chunk carrying usage data has been applied through
/// `update_from_chunk`.
pub struct UsageMetrics {
    /// Copied from the upstream `prompt_tokens` counter.
    pub input_tokens: Option<i64>,
    /// Copied from the upstream `completion_tokens` counter.
    pub output_tokens: Option<i64>,
    /// Copied from the upstream `total_tokens` counter.
    pub total_tokens: Option<i64>,
    /// Copied from `prompt_tokens_details.cached_tokens` when the details are present.
    pub cached_tokens: Option<i64>,
    /// Copied from `completion_tokens_details.reasoning_tokens` when the details are present.
    pub reasoning_tokens: Option<i64>,
}
75
76impl UsageMetrics {
77 pub fn update_from_chunk(&mut self, chunk: &ChatStreamChunk) {
79 if let Some(usage) = &chunk.usage {
80 self.input_tokens = usage.prompt_tokens.map(|v| v as i64);
81 self.output_tokens = usage.completion_tokens.map(|v| v as i64);
82 self.total_tokens = usage.total_tokens.map(|v| v as i64);
83 self.cached_tokens = usage
84 .prompt_tokens_details
85 .as_ref()
86 .and_then(|d| d.cached_tokens)
87 .map(|v| v as i64);
88 self.reasoning_tokens = usage
89 .completion_tokens_details
90 .as_ref()
91 .and_then(|d| d.reasoning_tokens)
92 .map(|v| v as i64);
93 }
94 }
95}
96
97#[derive(Debug, Clone, Default)]
99pub struct ToolCallTracker {
100 pub current: Vec<ToolCallState>,
101 pub completed: Vec<ToolCallState>,
102}
103
#[derive(Debug, Clone)]
/// Flags recording which events have been emitted so far, plus the final outcome.
///
/// `Default` is implemented by hand because two fields do not start at their
/// type-level defaults: `is_first_chunk` starts `true` and `final_status` starts as
/// `"completed"`.
pub struct EmitState {
    /// Starts `true`. NOTE(review): presumably cleared once the first chunk has been
    /// handled — the code doing so is not visible here.
    pub is_first_chunk: bool,
    /// Gate for including the message item in the final response object.
    pub is_output_item_added: bool,
    /// NOTE(review): not read in the part of the file visible here; presumably set when
    /// the content-part-added event has been sent — confirm.
    pub is_content_part_added: bool,
    /// Gate for including the reasoning item in the final response object.
    pub is_reasoning_added: bool,
    /// NOTE(review): not read in the part of the file visible here; presumably set when
    /// a function-call item has been announced — confirm.
    pub is_function_call_item_added: bool,
    /// NOTE(review): not read in the part of the file visible here; presumably set when
    /// the terminal event has been sent — confirm.
    pub is_completed: bool,
    /// Status string handed to `ResponseObject::stub` when the final object is built.
    pub final_status: String,
    /// When set, reported as `{"reason": <value>}` in the final object's
    /// `incomplete_details`.
    pub incomplete_reason: Option<String>,
}

impl Default for EmitState {
    /// State of a stream on which nothing has been emitted yet.
    fn default() -> Self {
        // Unless something changes it later, the response is reported as completed.
        let final_status = String::from("completed");

        Self {
            is_first_chunk: true,
            is_output_item_added: false,
            is_content_part_added: false,
            is_reasoning_added: false,
            is_function_call_item_added: false,
            is_completed: false,
            final_status,
            incomplete_reason: None,
        }
    }
}
133
134#[derive(Debug, Clone)]
136pub struct StreamState {
137 pub response_id: String,
138 pub output_id: String,
139 pub model: String,
140
141 pub text: TextAccumulator,
142 pub indices: IndexAllocator,
143 pub usage: UsageMetrics,
144 pub tool_calls: ToolCallTracker,
145 pub emit: EmitState,
146
147 pub request_context: Option<ResponseRequestContext>,
152}
153
#[derive(Debug, Clone)]
/// State of one tool call observed on the stream.
pub struct ToolCallState {
    /// NOTE(review): not read in the part of the file visible here; presumably the call
    /// id as sent by the upstream API, when it provided one — confirm.
    pub upstream_id: Option<String>,
    /// Id of the output item generated for this call.
    pub id: String,
    /// Call id reported on the output item.
    pub call_id: String,
    /// NOTE(review): not read in the part of the file visible here — the final response
    /// object recomputes the item type from `name` instead. Confirm whether the two are
    /// expected to agree.
    pub item_type: String,
    /// Tool name; reported on the output item and used to classify its type.
    pub name: String,
    /// Argument text accumulated for the call; reported on the output item.
    pub arguments: String,
    /// NOTE(review): not read in the part of the file visible here; presumably the
    /// output index assigned to this call's item — confirm.
    pub output_index: u32,
    /// NOTE(review): not read in the part of the file visible here; presumably the tool
    /// call index used by the upstream chat stream — confirm.
    pub chat_api_index: u32,
}
165
166impl StreamState {
167 pub fn new(
169 response_id: String,
170 model: String,
171 request_context: Option<ResponseRequestContext>,
172 ) -> Self {
173 Self {
174 output_id: format!("msg_{}", response_id),
175 response_id,
176 model,
177 text: TextAccumulator::default(),
178 indices: IndexAllocator::default(),
179 usage: UsageMetrics::default(),
180 tool_calls: ToolCallTracker::default(),
181 emit: EmitState::default(),
182 request_context,
183 }
184 }
185
186 #[inline]
188 pub fn take_sequence_number(&mut self) -> u64 {
189 self.indices.take_sequence_number()
190 }
191
192 #[inline]
194 pub fn update_usage(&mut self, chunk: &ChatStreamChunk) {
195 self.usage.update_from_chunk(chunk);
196 }
197
198 pub fn build_response_object(&self) -> Box<crate::types::response_api::ResponseObject> {
204 use crate::types::response_api::{
205 InputTokensDetails, OutputItemType, OutputTokensDetails, ResponseContentPart,
206 ResponseObject, ResponseOutputItem, Usage,
207 };
208 use chrono::Utc;
209
210 let ctx_opt = self.request_context.as_ref();
211
212 let mut output = Vec::new();
213
214 if self.emit.is_reasoning_added && !self.text.reasoning_text.is_empty() {
216 output.push(ResponseOutputItem {
217 id: format!("reasoning_{}", self.response_id),
218 item_type: OutputItemType::Reasoning,
219 status: None,
220 content: Some(vec![]),
221 summary: Some(vec![
222 crate::types::response_api::ReasoningSummaryPart::SummaryText {
223 text: self.text.reasoning_text.clone(),
224 },
225 ]),
226 role: None,
227 name: None,
228 arguments: None,
229 call_id: None,
230 queries: None,
231 results: None,
232 namespace: None,
233 });
234 }
235
236 if self.emit.is_output_item_added
238 && (!self.text.full_text.is_empty() || !self.text.refusal_text.is_empty())
239 {
240 let mut content_parts = Vec::new();
241 if !self.text.full_text.is_empty() {
242 content_parts.push(ResponseContentPart::OutputText {
243 text: self.text.full_text.clone(),
244 annotations: vec![],
245 logprobs: vec![],
246 });
247 }
248 if !self.text.refusal_text.is_empty() {
249 content_parts.push(ResponseContentPart::Refusal {
250 refusal: self.text.refusal_text.clone(),
251 });
252 }
253 output.push(ResponseOutputItem {
254 id: self.output_id.clone(),
255 item_type: OutputItemType::Message,
256 status: Some("completed".to_string()),
257 content: Some(content_parts),
258 role: Some("assistant".to_string()),
259 name: None,
260 arguments: None,
261 call_id: None,
262 queries: None,
263 results: None,
264 summary: None,
265 namespace: None,
266 });
267 }
268
269 for tc in &self.tool_calls.completed {
271 let item_type = map_tool_name_to_output_type(&tc.name, ctx_opt.map(|ctx| &ctx.tools));
272 let (queries, results) = if item_type != OutputItemType::FunctionCall {
273 (
274 extract_queries_from_arguments(&tc.arguments),
275 Some(serde_json::Value::Null),
276 )
277 } else {
278 (None, None)
279 };
280 output.push(ResponseOutputItem {
281 id: tc.id.clone(),
282 item_type,
283 status: Some("completed".to_string()),
284 content: None,
285 role: None,
286 name: Some(tc.name.clone()),
287 arguments: Some(tc.arguments.clone()),
288 call_id: Some(tc.call_id.clone()),
289 queries,
290 results,
291 summary: None,
292 namespace: None,
293 });
294 }
295
296 let mut response = ResponseObject::stub(
300 self.response_id.clone(),
301 self.model.clone(),
302 self.emit.final_status.clone(),
303 Utc::now().timestamp(),
304 ctx_opt,
305 );
306 response.completed_at = Some(Utc::now().timestamp());
307 response.incomplete_details = self
308 .emit
309 .incomplete_reason
310 .as_ref()
311 .map(|reason| serde_json::json!({ "reason": reason }));
312 response.output = output;
313 response.usage = if self.usage.input_tokens.is_some()
314 || self.usage.output_tokens.is_some()
315 || self.usage.total_tokens.is_some()
316 {
317 Some(Usage {
318 input_tokens: self.usage.input_tokens,
319 input_tokens_details: Some(InputTokensDetails {
320 cached_tokens: self.usage.cached_tokens.unwrap_or(0),
321 }),
322 output_tokens: self.usage.output_tokens,
323 output_tokens_details: Some(OutputTokensDetails {
324 reasoning_tokens: self.usage.reasoning_tokens.unwrap_or(0),
325 }),
326 total_tokens: self.usage.total_tokens,
327 })
328 } else {
329 None
330 };
331 Box::new(response)
332 }
333}