1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use crate::api::models::ChatChunk;
use anyhow::Result;
use futures::StreamExt;
use tokio::sync::mpsc;
/// Events emitted during streaming.
///
/// Produced by [`process_stream`] and delivered over an unbounded mpsc channel.
#[derive(Debug, Clone)]
pub enum StreamEvent {
    /// A text (content) token was received.
    Token(String),
    /// A reasoning/thinking token (preserved for thought continuity).
    Reasoning(String),
    /// A tool-call delta carrying an `id` was received, i.e. a new tool call
    /// is being accumulated. `name` is empty if that delta had no function name.
    ToolCallStart {
        index: u32,
        id: String,
        name: String,
    },
    /// A fragment of a tool call's arguments, tagged with the `index` of the
    /// tool call it belongs to.
    ToolCallArgs { index: u32, args_chunk: String },
    /// The stream completed.
    ///
    /// Token counts are taken from the most recent chunk that carried usage
    /// information; they are `0` if no such chunk was seen. This event may be
    /// emitted more than once per stream (on a `finish_reason` and again on
    /// the `[DONE]` sentinel or end of body). The later one reflects any usage
    /// sent after the finish.
    Done {
        prompt_tokens: u32,
        completion_tokens: u32,
        cached_tokens: u32,
    },
    /// An SSE `data` payload could not be parsed; streaming continues after it.
    ///
    /// Network errors are NOT reported through this variant: `process_stream`
    /// returns them as `Err`. NOTE(review): the original author noted this
    /// variant is not yet matched by callers.
    Error(String),
}
/// Process an SSE streaming response and emit events.
///
/// Reads the body of `response` as a server-sent-events stream. Each `data:`
/// payload is parsed as a [`ChatChunk`] and its contents are forwarded to `tx`
/// as [`StreamEvent`]s. Only the primary choice (index 0) is forwarded.
///
/// A [`StreamEvent::Done`] carrying the latest token usage seen is sent:
/// - when the primary choice reports a `finish_reason`;
/// - when the `[DONE]` sentinel arrives, after which the function returns;
/// - when the body ends without a sentinel.
///
/// A normal stream therefore produces more than one `Done`.
///
/// If the receiving half of `tx` has been dropped, reading stops early and
/// `Ok(())` is returned, which drops the response body.
///
/// # Errors
///
/// Returns an error if reading the response body fails. Payloads that fail to
/// parse are not errors: they are logged, reported as [`StreamEvent::Error`],
/// and processing continues.
pub async fn process_stream(
    response: reqwest::Response,
    tx: mpsc::UnboundedSender<StreamEvent>,
) -> Result<()> {
    let mut stream = response.bytes_stream();
    // Buffer raw bytes rather than text: a multi-byte UTF-8 character can be
    // split across two network chunks, and decoding each chunk on its own would
    // turn both halves into U+FFFD. Lines are decoded only once complete.
    // `\n` never occurs inside a multi-byte UTF-8 sequence, so splitting on it
    // is always safe. Pre-allocated to reduce realloc churn while streaming.
    let mut buffer: Vec<u8> = Vec::with_capacity(4096);
    let mut usage = StreamUsage::default();
    while let Some(chunk) = stream.next().await {
        // Nobody is listening any more; stop pulling the body.
        if tx.is_closed() {
            return Ok(());
        }
        buffer.extend_from_slice(&chunk?);
        // Walk every complete line, tracking how far we got, so the consumed
        // prefix is removed with a single drain (no per-line shifting).
        let mut consumed = 0;
        while let Some(offset) = buffer[consumed..].iter().position(|&b| b == b'\n') {
            let line_end = consumed + offset;
            let line = String::from_utf8_lossy(&buffer[consumed..line_end]);
            consumed = line_end + 1;
            // Handle both `\n` and `\r\n` line terminators per the SSE spec.
            let Some(data) = sse_data_field(line.trim_end_matches('\r')) else {
                continue;
            };
            if data.is_empty() {
                // An empty `data:` line (e.g. a keep-alive) carries no JSON;
                // skip it rather than reporting a parse error.
                continue;
            }
            if data == "[DONE]" {
                let _ = tx.send(usage.done_event());
                return Ok(());
            }
            handle_sse_payload(data, &mut usage, &tx);
        }
        // Keep only the trailing partial line (if any) for the next chunk.
        buffer.drain(..consumed);
    }
    let _ = tx.send(usage.done_event());
    Ok(())
}

/// Token usage seen so far in a stream; all counts are zero until a chunk
/// carries `usage`.
#[derive(Debug, Clone, Copy, Default)]
struct StreamUsage {
    prompt_tokens: u32,
    completion_tokens: u32,
    cached_tokens: u32,
}

impl StreamUsage {
    /// Build a [`StreamEvent::Done`] carrying these counts.
    fn done_event(self) -> StreamEvent {
        StreamEvent::Done {
            prompt_tokens: self.prompt_tokens,
            completion_tokens: self.completion_tokens,
            cached_tokens: self.cached_tokens,
        }
    }
}

/// Return the value of an SSE `data` field, or `None` for blank lines,
/// comments (`:` prefix) and other fields such as `event:` or `id:`.
///
/// The SSE spec makes the single space after the colon optional, and strips it
/// when present. Both `data: {...}` and `data:{...}` are therefore accepted.
fn sse_data_field(line: &str) -> Option<&str> {
    let value = line.strip_prefix("data:")?;
    Some(value.strip_prefix(' ').unwrap_or(value))
}

/// Parse one SSE `data` payload as a [`ChatChunk`], record any usage it
/// carries in `usage`, and forward the primary choice's deltas to `tx`.
///
/// A payload that fails to parse is logged and reported as
/// [`StreamEvent::Error`]; it is never fatal.
fn handle_sse_payload(
    data: &str,
    usage: &mut StreamUsage,
    tx: &mpsc::UnboundedSender<StreamEvent>,
) {
    let chunk = match serde_json::from_str::<ChatChunk>(data) {
        Ok(chunk) => chunk,
        Err(e) => {
            tracing::warn!("Failed to parse SSE chunk: {e}");
            let _ = tx.send(StreamEvent::Error(format!("Parse error: {e}")));
            return;
        }
    };
    tracing::trace!(chunk_id = %chunk.id, "Streaming chunk received");
    // Capture usage from final chunk (Z.ai / OpenAI send it here). Usage is
    // recorded before the choices are handled, so a `Done` triggered by this
    // same chunk's `finish_reason` already reflects it.
    if let Some(ref reported) = chunk.usage {
        tracing::trace!(total_tokens = reported.total_tokens, "Streaming usage");
        usage.prompt_tokens = reported.prompt_tokens;
        usage.completion_tokens = reported.completion_tokens;
        // cached_tokens may appear nested in prompt_tokens_details or as a
        // top-level field depending on provider; prefer the nested one.
        usage.cached_tokens = reported
            .prompt_tokens_details
            .as_ref()
            .map(|d| d.cached_tokens)
            .unwrap_or(reported.cached_tokens);
    }
    for choice in &chunk.choices {
        // Skip non-primary choices (index > 0) for single-output models.
        if choice.index > 0 {
            tracing::trace!(
                index = choice.index,
                "Skipping non-primary streaming choice"
            );
            continue;
        }
        let delta = &choice.delta;
        if let Some(ref role) = delta.role {
            tracing::trace!(role = %role, "Streaming delta role");
        }
        // Reasoning content (preserved thinking).
        if let Some(ref reasoning) = delta.reasoning_content {
            let _ = tx.send(StreamEvent::Reasoning(reasoning.clone()));
        }
        // Text content.
        if let Some(ref content) = delta.content {
            let _ = tx.send(StreamEvent::Token(content.text_content()));
        }
        // Tool calls: a delta with an `id` starts a call; `arguments`
        // fragments are forwarded tagged with the call's index.
        if let Some(ref tool_calls) = delta.tool_calls {
            for tc in tool_calls {
                if let Some(ref id) = tc.id {
                    let name = tc
                        .function
                        .as_ref()
                        .and_then(|f| f.name.clone())
                        .unwrap_or_default();
                    let _ = tx.send(StreamEvent::ToolCallStart {
                        index: tc.index,
                        id: id.clone(),
                        name,
                    });
                }
                if let Some(ref func) = tc.function
                    && let Some(ref args) = func.arguments
                {
                    let _ = tx.send(StreamEvent::ToolCallArgs {
                        index: tc.index,
                        args_chunk: args.clone(),
                    });
                }
            }
        }
        // NOTE(review): the `[DONE]` sentinel or end of body emits another
        // `Done`. Providers that send usage in a separate trailing chunk only
        // have it reflected in that later event.
        if choice.finish_reason.is_some() {
            let _ = tx.send(usage.done_event());
        }
    }
}