Skip to main content

codex_convert_proxy/convert/streaming/
state.rs

//! Streaming state types: StreamState and ToolCallState.
//!
//! `StreamState` was historically a single struct with ~30 fields covering
//! seven distinct concerns. It is now decomposed into single-responsibility
//! sub-structs (still `pub` for convenience at the call sites — the grouping
//! itself documents intent and lifecycle):
//!
//! - [`TextAccumulator`] — text/refusal/reasoning content + the thinking-tag
//!   buffer used by the streaming parser.
//! - [`IndexAllocator`] — `output_index` / `content_index` allocator state and
//!   the spec-required `sequence_number` counter.
//! - [`UsageMetrics`] — token counts received from upstream.
//! - [`ToolCallTracker`] — in-flight (`current`) vs finalised (`completed`)
//!   function-call state.
//! - [`EmitState`] — boolean flags that drive the streaming emit state
//!   machine + the final response status / incomplete reason.
//!
//! `request_context` is still carried on `StreamState` as a backwards-compat
//! field, and [`StreamState::build_response_object`] currently reads it from
//! there. The intended direction is for callers to pass the context as a
//! parameter instead, so that non-streaming flows don't have to construct a
//! `StreamState` just to carry context.
21
22use crate::convert::context::ResponseRequestContext;
23use crate::types::chat_api::ChatStreamChunk;
24
25use super::super::util::{extract_queries_from_arguments, map_tool_name_to_output_type};
26
/// Text-like content collected while stream chunks are consumed.
///
/// Every field starts empty / `false` via the derived `Default`.
#[derive(Clone, Debug, Default)]
pub struct TextAccumulator {
    /// Assistant-visible text gathered so far, with thinking tags stripped.
    pub full_text: String,
    /// Refusal text received through `delta.refusal`.
    pub refusal_text: String,
    /// Reasoning content, taken either from `delta.reasoning_content` or from
    /// the interior of thinking tags.
    pub reasoning_text: String,
    /// Holding area for a `<think>` / `<thought>` tag that was cut in half by
    /// a chunk boundary and is not yet terminated.
    pub thinking_buffer: String,
    /// `true` while the next text byte is inside a thinking tag.
    pub is_thinking: bool,
}
43
/// Allocator state for output indices, content indices and SSE sequence
/// numbers; one instance lives as long as the stream it belongs to.
///
/// The derived `Default` starts every counter at `0` and every optional
/// index at `None`.
#[derive(Clone, Debug, Default)]
pub struct IndexAllocator {
    pub content_index: u32,
    pub next_output_index: u32,
    pub text_output_index: Option<u32>,
    pub reasoning_output_index: Option<u32>,
    /// Value the next SSE event will receive as its spec-required
    /// `sequence_number`; only ever moves forward.
    pub next_sequence_number: u64,
}

impl IndexAllocator {
    /// Hand out the current sequence number and bump the counter by one.
    ///
    /// Returns the value `next_sequence_number` held before the call.
    pub fn take_sequence_number(&mut self) -> u64 {
        let following = self.next_sequence_number + 1;
        // `replace` stores the advanced counter and yields the previous one.
        std::mem::replace(&mut self.next_sequence_number, following)
    }
}
65
/// Token counts reported by upstream chunks.
///
/// Each count is `None` until a value has been stored for it; the derived
/// `Default` leaves all of them `None`.
#[derive(Clone, Debug, Default)]
pub struct UsageMetrics {
    pub input_tokens: Option<i64>,
    pub output_tokens: Option<i64>,
    pub total_tokens: Option<i64>,
    pub cached_tokens: Option<i64>,
    pub reasoning_tokens: Option<i64>,
}
75
76impl UsageMetrics {
77    /// Absorb token counts from a streaming chunk's usage block, if present.
78    pub fn update_from_chunk(&mut self, chunk: &ChatStreamChunk) {
79        if let Some(usage) = &chunk.usage {
80            self.input_tokens = usage.prompt_tokens.map(|v| v as i64);
81            self.output_tokens = usage.completion_tokens.map(|v| v as i64);
82            self.total_tokens = usage.total_tokens.map(|v| v as i64);
83            self.cached_tokens = usage
84                .prompt_tokens_details
85                .as_ref()
86                .and_then(|d| d.cached_tokens)
87                .map(|v| v as i64);
88            self.reasoning_tokens = usage
89                .completion_tokens_details
90                .as_ref()
91                .and_then(|d| d.reasoning_tokens)
92                .map(|v| v as i64);
93        }
94    }
95}
96
97/// In-flight + finalised function-call state.
98#[derive(Debug, Clone, Default)]
99pub struct ToolCallTracker {
100    pub current: Vec<ToolCallState>,
101    pub completed: Vec<ToolCallState>,
102}
103
/// Flags that steer the streaming emit state machine, together with the
/// final response status.
#[derive(Clone, Debug)]
pub struct EmitState {
    /// Starts out `true` (see `Default`); every other flag starts `false`.
    pub is_first_chunk: bool,
    pub is_output_item_added: bool,
    pub is_content_part_added: bool,
    pub is_reasoning_added: bool,
    pub is_function_call_item_added: bool,
    pub is_completed: bool,
    /// Final response status, derived from `finish_reason`.
    pub final_status: String,
    /// Reason to report when `final_status == "incomplete"`; otherwise unset.
    pub incomplete_reason: Option<String>,
}

impl Default for EmitState {
    /// Initial state of a stream: only `is_first_chunk` is set, the status is
    /// `"completed"`, and there is no incomplete reason.
    fn default() -> Self {
        let final_status = String::from("completed");
        Self {
            is_first_chunk: true,
            is_output_item_added: false,
            is_content_part_added: false,
            is_reasoning_added: false,
            is_function_call_item_added: false,
            is_completed: false,
            final_status,
            incomplete_reason: None,
        }
    }
}
133
134/// Streaming converter state for tracking incremental changes.
135#[derive(Debug, Clone)]
136pub struct StreamState {
137    pub response_id: String,
138    pub output_id: String,
139    pub model: String,
140
141    pub text: TextAccumulator,
142    pub indices: IndexAllocator,
143    pub usage: UsageMetrics,
144    pub tool_calls: ToolCallTracker,
145    pub emit: EmitState,
146
147    /// Backwards-compat carrier for the request context. New code paths should
148    /// pass `Option<&ResponseRequestContext>` directly to
149    /// `build_response_object`; this field will be removed once `ProxyContext`
150    /// stores the context separately.
151    pub request_context: Option<ResponseRequestContext>,
152}
153
/// State kept for a single tool / function call observed in the stream.
#[derive(Clone, Debug)]
pub struct ToolCallState {
    /// Presumably the call id as sent by the upstream API, when one was
    /// provided — confirm against the code that fills this in.
    pub upstream_id: Option<String>,
    /// Becomes the `id` of the output item built for this call.
    pub id: String,
    /// Becomes the `call_id` of the output item built for this call.
    pub call_id: String,
    pub item_type: String,
    /// Tool name; also used to decide the output item type.
    pub name: String,
    /// Argument text for the call, emitted as the output item's `arguments`.
    pub arguments: String,
    pub output_index: u32,
    /// Presumably the index of this call in the upstream chat API's tool-call
    /// list — confirm against the streaming handler.
    pub chat_api_index: u32,
}
165
166impl StreamState {
167    /// Create a new stream state.
168    pub fn new(
169        response_id: String,
170        model: String,
171        request_context: Option<ResponseRequestContext>,
172    ) -> Self {
173        Self {
174            output_id: format!("msg_{}", response_id),
175            response_id,
176            model,
177            text: TextAccumulator::default(),
178            indices: IndexAllocator::default(),
179            usage: UsageMetrics::default(),
180            tool_calls: ToolCallTracker::default(),
181            emit: EmitState::default(),
182            request_context,
183        }
184    }
185
186    /// Allocate and return the next sequence number, then advance the counter.
187    #[inline]
188    pub fn take_sequence_number(&mut self) -> u64 {
189        self.indices.take_sequence_number()
190    }
191
192    /// Update usage from a ChatStreamChunk.
193    #[inline]
194    pub fn update_usage(&mut self, chunk: &ChatStreamChunk) {
195        self.usage.update_from_chunk(chunk);
196    }
197
198    /// Build the final ResponseObject with all accumulated outputs.
199    ///
200    /// The `ctx` parameter supplies request-level fields (instructions, tools,
201    /// sampling params, etc.). It defaults to `self.request_context` for
202    /// backwards compatibility but new callers should pass it explicitly.
203    pub fn build_response_object(&self) -> Box<crate::types::response_api::ResponseObject> {
204        use crate::types::response_api::{
205            InputTokensDetails, OutputItemType, OutputTokensDetails, ResponseContentPart,
206            ResponseObject, ResponseOutputItem, Usage,
207        };
208        use chrono::Utc;
209
210        let ctx_opt = self.request_context.as_ref();
211
212        let mut output = Vec::new();
213
214        // Add reasoning output if present
215        if self.emit.is_reasoning_added && !self.text.reasoning_text.is_empty() {
216            output.push(ResponseOutputItem {
217                id: format!("reasoning_{}", self.response_id),
218                item_type: OutputItemType::Reasoning,
219                status: None,
220                content: Some(vec![]),
221                summary: Some(vec![
222                    crate::types::response_api::ReasoningSummaryPart::SummaryText {
223                        text: self.text.reasoning_text.clone(),
224                    },
225                ]),
226                role: None,
227                name: None,
228                arguments: None,
229                call_id: None,
230                queries: None,
231                results: None,
232                namespace: None,
233            });
234        }
235
236        // Add assistant message output (text and/or refusal)
237        if self.emit.is_output_item_added
238            && (!self.text.full_text.is_empty() || !self.text.refusal_text.is_empty())
239        {
240            let mut content_parts = Vec::new();
241            if !self.text.full_text.is_empty() {
242                content_parts.push(ResponseContentPart::OutputText {
243                    text: self.text.full_text.clone(),
244                    annotations: vec![],
245                    logprobs: vec![],
246                });
247            }
248            if !self.text.refusal_text.is_empty() {
249                content_parts.push(ResponseContentPart::Refusal {
250                    refusal: self.text.refusal_text.clone(),
251                });
252            }
253            output.push(ResponseOutputItem {
254                id: self.output_id.clone(),
255                item_type: OutputItemType::Message,
256                status: Some("completed".to_string()),
257                content: Some(content_parts),
258                role: Some("assistant".to_string()),
259                name: None,
260                arguments: None,
261                call_id: None,
262                queries: None,
263                results: None,
264                summary: None,
265                namespace: None,
266            });
267        }
268
269        // Add function call outputs
270        for tc in &self.tool_calls.completed {
271            let item_type = map_tool_name_to_output_type(&tc.name, ctx_opt.map(|ctx| &ctx.tools));
272            let (queries, results) = if item_type != OutputItemType::FunctionCall {
273                (
274                    extract_queries_from_arguments(&tc.arguments),
275                    Some(serde_json::Value::Null),
276                )
277            } else {
278                (None, None)
279            };
280            output.push(ResponseOutputItem {
281                id: tc.id.clone(),
282                item_type,
283                status: Some("completed".to_string()),
284                content: None,
285                role: None,
286                name: Some(tc.name.clone()),
287                arguments: Some(tc.arguments.clone()),
288                call_id: Some(tc.call_id.clone()),
289                queries,
290                results,
291                summary: None,
292                namespace: None,
293            });
294        }
295
296        // Start from the typed stub so request-level fields and spec-required
297        // defaults are populated consistently, then layer in the streamed
298        // output + final status + usage.
299        let mut response = ResponseObject::stub(
300            self.response_id.clone(),
301            self.model.clone(),
302            self.emit.final_status.clone(),
303            Utc::now().timestamp(),
304            ctx_opt,
305        );
306        response.completed_at = Some(Utc::now().timestamp());
307        response.incomplete_details = self
308            .emit
309            .incomplete_reason
310            .as_ref()
311            .map(|reason| serde_json::json!({ "reason": reason }));
312        response.output = output;
313        response.usage = if self.usage.input_tokens.is_some()
314            || self.usage.output_tokens.is_some()
315            || self.usage.total_tokens.is_some()
316        {
317            Some(Usage {
318                input_tokens: self.usage.input_tokens,
319                input_tokens_details: Some(InputTokensDetails {
320                    cached_tokens: self.usage.cached_tokens.unwrap_or(0),
321                }),
322                output_tokens: self.usage.output_tokens,
323                output_tokens_details: Some(OutputTokensDetails {
324                    reasoning_tokens: self.usage.reasoning_tokens.unwrap_or(0),
325                }),
326                total_tokens: self.usage.total_tokens,
327            })
328        } else {
329            None
330        };
331        Box::new(response)
332    }
333}