1use anyhow::Result;
2use futures::Stream;
3use reqwest::Response;
4use serde::{Deserialize, Serialize};
5use std::pin::Pin;
6
7use crate::buffer_utils::{SseLineParser, parse_sse_stream};
8
9pub use crate::buffer_utils::{CircularLineBuffer, EventBatcher};
10
11use crate::openai::ResponseStreamChunk;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(tag = "type", rename_all = "snake_case")]
15pub enum StreamEvent {
16 Reasoning {
17 content: String,
18 },
19
20 Message {
21 content: String,
22 },
23
24 ToolCall {
25 index: u32,
26 #[serde(skip_serializing_if = "Option::is_none")]
27 id: Option<String>,
28 #[serde(skip_serializing_if = "Option::is_none")]
29 name: Option<String>,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 arguments: Option<String>,
32 },
33
34 Done {
35 #[serde(skip_serializing_if = "Option::is_none")]
36 finish_reason: Option<String>,
37 },
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ChatStreamChunk {
42 pub id: String,
43 pub object: String,
44 pub created: i64,
45 pub model: String,
46 pub choices: Vec<StreamChoice>,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct StreamChoice {
51 pub index: u32,
52 pub delta: Delta,
53 pub finish_reason: Option<String>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct Delta {
58 pub role: Option<String>,
59 pub content: Option<String>,
60 pub tool_calls: Option<Vec<ToolCallDelta>>,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ToolCallDelta {
65 pub index: u32,
66 pub id: Option<String>,
67 #[serde(rename = "type")]
68 pub tool_type: Option<String>,
69 pub function: Option<FunctionDelta>,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct FunctionDelta {
74 pub name: Option<String>,
75 pub arguments: Option<String>,
76}
77
78impl ChatStreamChunk {
79 pub fn content(&self) -> Option<&str> {
80 self.choices
81 .first()
82 .and_then(|c| c.delta.content.as_deref())
83 }
84
85 pub fn is_done(&self) -> bool {
86 self.choices
87 .first()
88 .and_then(|c| c.finish_reason.as_ref())
89 .is_some()
90 }
91
92 fn to_stream_events(&self) -> Vec<StreamEvent> {
93 let mut events = Vec::new();
94
95 if let Some(choice) = self.choices.first() {
96 if let Some(content) = &choice.delta.content {
97 if !content.is_empty() {
98 events.push(StreamEvent::Message {
99 content: content.clone(),
100 });
101 }
102 }
103
104 if let Some(tool_calls) = &choice.delta.tool_calls {
105 for tc in tool_calls {
106 events.push(StreamEvent::ToolCall {
107 index: tc.index,
108 id: tc.id.clone(),
109 name: tc.function.as_ref().and_then(|f| f.name.clone()),
110 arguments: tc.function.as_ref().and_then(|f| f.arguments.clone()),
111 });
112 }
113 }
114
115 if let Some(finish_reason) = &choice.finish_reason {
116 events.push(StreamEvent::Done {
117 finish_reason: Some(finish_reason.clone()),
118 });
119 }
120 }
121
122 events
123 }
124}
125
126struct ChatSseParser;
128
129impl SseLineParser for ChatSseParser {
130 fn parse_data_line(&self, data: &str) -> Result<Vec<StreamEvent>> {
131 let chunk: ChatStreamChunk = serde_json::from_str(data)
132 .map_err(|e| anyhow::anyhow!("Failed to parse chat chunk: {}", e))?;
133
134 Ok(chunk.to_stream_events())
135 }
136}
137
138struct ResponseSseParser;
140
141impl SseLineParser for ResponseSseParser {
142 fn parse_data_line(&self, data: &str) -> Result<Vec<StreamEvent>> {
143 let chunk: ResponseStreamChunk = serde_json::from_str(data)
144 .map_err(|e| anyhow::anyhow!("Failed to parse response chunk: {}", e))?;
145
146 let mut events = Vec::new();
147
148 if chunk.is_done() {
149 events.push(StreamEvent::Done {
150 finish_reason: chunk.status.clone(),
151 });
152 return Ok(events);
153 }
154
155 let is_reasoning = chunk.output_index.map(|idx| idx == 0).unwrap_or(false);
156
157 tracing::debug!(
159 "ResponseStreamChunk - output_index: {:?}, is_reasoning: {}, delta: {:?}",
160 chunk.output_index,
161 is_reasoning,
162 chunk.delta
163 );
164
165 if is_reasoning {
166 if let Some(text) = chunk.reasoning_text() {
167 if !text.is_empty() {
168 tracing::debug!("Emitting Reasoning event with {} chars", text.len());
169 events.push(StreamEvent::Reasoning { content: text });
170 }
171 }
172 } else {
173 if let Some(text) = chunk.message_text() {
174 if !text.is_empty() {
175 tracing::debug!("Emitting Message event with {} chars", text.len());
176 events.push(StreamEvent::Message { content: text });
177 }
178 }
179 }
180
181 Ok(events)
182 }
183}
184
185pub fn parse_chat_sse_stream(
186 response: Response,
187) -> Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>> {
188 parse_sse_stream(response, ChatSseParser)
189}
190
191pub fn parse_response_sse_stream(
192 response: Response,
193) -> Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>> {
194 parse_sse_stream(response, ResponseSseParser)
195}
196
197pub use ChatStreamChunk as StreamChunk;
198
199pub fn parse_sse_stream_legacy(response: Response) -> Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>> {
201 parse_chat_sse_stream(response)
202}
203