1use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone)]
9pub enum StreamEvent {
10 MessageStart { id: String, model: String },
12 ContentBlockStart {
14 index: u32,
15 block_type: ContentBlockType,
16 },
17 ContentBlockDelta { index: u32, delta: ContentDelta },
19 ContentBlockStop { index: u32 },
21 MessageDelta {
23 stop_reason: Option<String>,
24 usage: StreamUsage,
25 },
26 MessageStop,
28}
29
/// Kind of content carried by a streamed content block.
///
/// Implements `PartialEq`/`Eq` so block kinds can be compared directly
/// (for example with `assert_eq!`) instead of only through pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContentBlockType {
    /// Plain assistant text.
    Text,
    /// Model reasoning ("thinking") text.
    Thinking,
    /// A tool invocation requested by the model.
    ToolUse {
        /// Provider-assigned identifier of the tool call.
        id: String,
        /// Name of the tool being called.
        name: String,
    },
}
37
/// Incremental fragment appended to an open content block.
///
/// Implements `PartialEq`/`Eq` so fragments can be compared directly
/// (for example with `assert_eq!`) instead of only through pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContentDelta {
    /// Fragment of assistant text.
    Text(String),
    /// Fragment of reasoning ("thinking") text.
    Thinking(String),
    /// Fragment of the tool-call input, forwarded exactly as received.
    ToolInput(String),
}
45
/// Token counts attached to a streamed message.
///
/// `Default` yields zero for both counters. Implements `PartialEq`/`Eq` so
/// usage values can be compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamUsage {
    /// Tokens consumed by the prompt / input.
    pub input_tokens: u32,
    /// Tokens produced in the response.
    pub output_tokens: u32,
}
52
/// Upstream providers, by name.
///
/// Implements `PartialEq`, `Eq` and `Hash` so a provider can be compared and
/// used as a map or set key.
///
/// NOTE(review): nothing in this file consumes the enum; presumably callers
/// use it to pick the matching `StreamState::ingest_*` method — confirm at the
/// call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamProvider {
    Anthropic,
    OpenAI,
    Gemini,
    AzureOpenAI,
    Bedrock,
    Ollama,
}
63
64#[derive(Debug)]
66pub struct StreamState {
67 model: String,
68 message_started: bool,
69 text_started: bool,
70 text_finished: bool,
71 thinking_started: bool,
72 thinking_finished: bool,
73 finished: bool,
74 stop_reason: Option<String>,
75 usage: Option<StreamUsage>,
76 #[allow(dead_code)]
77 tool_index_offset: u32,
78 #[allow(dead_code)]
79 tool_calls_count: u32,
80}
81
82impl StreamState {
83 pub fn new(model: String) -> Self {
85 Self {
86 model,
87 message_started: false,
88 text_started: false,
89 text_finished: false,
90 thinking_started: false,
91 thinking_finished: false,
92 finished: false,
93 stop_reason: None,
94 usage: None,
95 tool_index_offset: 0,
96 tool_calls_count: 0,
97 }
98 }
99
100 pub fn ingest_anthropic(&mut self, event: AnthropicStreamEvent) -> Vec<StreamEvent> {
102 let mut events = Vec::new();
103
104 match event {
105 AnthropicStreamEvent::MessageStart { message } => {
106 if !self.message_started {
107 self.message_started = true;
108 events.push(StreamEvent::MessageStart {
109 id: message.id,
110 model: message.model,
111 });
112 }
113 }
114 AnthropicStreamEvent::ContentBlockStart {
115 index,
116 content_block,
117 } => {
118 let block_type = match content_block {
119 AnthropicContentBlock::Text { .. } => ContentBlockType::Text,
120 AnthropicContentBlock::Thinking { .. } => ContentBlockType::Thinking,
121 AnthropicContentBlock::ToolUse { id, name, .. } => {
122 ContentBlockType::ToolUse { id, name }
123 }
124 };
125 events.push(StreamEvent::ContentBlockStart { index, block_type });
126 }
127 AnthropicStreamEvent::ContentBlockDelta { index, delta } => {
128 let content_delta = match delta {
129 AnthropicContentDelta::Text { text } => ContentDelta::Text(text),
130 AnthropicContentDelta::Thinking { thinking } => {
131 ContentDelta::Thinking(thinking)
132 }
133 AnthropicContentDelta::InputJson { partial_json } => {
134 ContentDelta::ToolInput(partial_json)
135 }
136 };
137 events.push(StreamEvent::ContentBlockDelta {
138 index,
139 delta: content_delta,
140 });
141 }
142 AnthropicStreamEvent::ContentBlockStop { index } => {
143 events.push(StreamEvent::ContentBlockStop { index });
144 }
145 AnthropicStreamEvent::MessageDelta { delta, usage } => {
146 self.stop_reason = delta.stop_reason;
147 self.usage = Some(StreamUsage {
148 input_tokens: usage.input_tokens,
149 output_tokens: usage.output_tokens,
150 });
151 events.push(StreamEvent::MessageDelta {
152 stop_reason: self.stop_reason.clone(),
153 usage: self.usage.clone().unwrap_or_default(),
154 });
155 }
156 AnthropicStreamEvent::MessageStop { .. } => {
157 events.push(StreamEvent::MessageStop);
158 }
159 }
160
161 events
162 }
163
164 pub fn ingest_openai(&mut self, chunk: OpenAiStreamChunk) -> Vec<StreamEvent> {
166 let mut events = Vec::new();
167
168 if !self.message_started {
169 self.message_started = true;
170 events.push(StreamEvent::MessageStart {
171 id: chunk.id.clone(),
172 model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
173 });
174 }
175
176 if let Some(usage) = chunk.usage {
177 self.usage = Some(StreamUsage {
178 input_tokens: usage.prompt_tokens,
179 output_tokens: usage.completion_tokens,
180 });
181 }
182
183 for choice in chunk.choices {
184 if let Some(reasoning) = choice.delta.reasoning_content.filter(|v| !v.is_empty()) {
186 if !self.thinking_started {
187 self.thinking_started = true;
188 events.push(StreamEvent::ContentBlockStart {
189 index: 0,
190 block_type: ContentBlockType::Thinking,
191 });
192 }
193 events.push(StreamEvent::ContentBlockDelta {
194 index: 0,
195 delta: ContentDelta::Thinking(reasoning),
196 });
197 }
198
199 if let Some(content) = choice.delta.content.filter(|v| !v.is_empty()) {
201 if self.thinking_started && !self.thinking_finished {
203 self.thinking_finished = true;
204 events.push(StreamEvent::ContentBlockStop { index: 0 });
205 }
206
207 let text_index = if self.thinking_started { 1 } else { 0 };
208 if !self.text_started {
209 self.text_started = true;
210 events.push(StreamEvent::ContentBlockStart {
211 index: text_index,
212 block_type: ContentBlockType::Text,
213 });
214 }
215 events.push(StreamEvent::ContentBlockDelta {
216 index: text_index,
217 delta: ContentDelta::Text(content),
218 });
219 }
220
221 for (i, tool_call) in choice.delta.tool_calls.into_iter().enumerate() {
223 let tool_index = (if self.thinking_started { 2 } else { 1 }) + i as u32;
224 if let Some(name) = tool_call.function.name {
225 events.push(StreamEvent::ContentBlockStart {
226 index: tool_index,
227 block_type: ContentBlockType::ToolUse {
228 id: tool_call.id.unwrap_or_default(),
229 name,
230 },
231 });
232 }
233 if let Some(args) = tool_call.function.arguments {
234 events.push(StreamEvent::ContentBlockDelta {
235 index: tool_index,
236 delta: ContentDelta::ToolInput(args),
237 });
238 }
239 }
240
241 if let Some(finish_reason) = choice.finish_reason {
243 self.stop_reason = Some(normalize_openai_finish_reason(&finish_reason));
244 }
245 }
246
247 events
248 }
249
250 pub fn ingest_ollama(&mut self, chunk: OllamaStreamChunk) -> Vec<StreamEvent> {
252 let mut events = Vec::new();
253
254 if !self.message_started {
256 self.message_started = true;
257 events.push(StreamEvent::MessageStart {
258 id: "".to_string(),
259 model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
260 });
261 }
262
263 if let Some(message) = &chunk.message {
265 if let Some(content) = &message.content {
266 if !content.is_empty() {
267 if !self.text_started {
268 self.text_started = true;
269 events.push(StreamEvent::ContentBlockStart {
270 index: 0,
271 block_type: ContentBlockType::Text,
272 });
273 }
274 events.push(StreamEvent::ContentBlockDelta {
275 index: 0,
276 delta: ContentDelta::Text(content.clone()),
277 });
278 }
279 }
280 }
281
282 if chunk.done {
284 if chunk.prompt_eval_count.is_some() || chunk.eval_count.is_some() {
286 self.usage = Some(StreamUsage {
287 input_tokens: chunk.prompt_eval_count.unwrap_or(0),
288 output_tokens: chunk.eval_count.unwrap_or(0),
289 });
290 }
291
292 if self.text_started && !self.text_finished {
294 self.text_finished = true;
295 events.push(StreamEvent::ContentBlockStop { index: 0 });
296 }
297
298 events.push(StreamEvent::MessageDelta {
299 stop_reason: Some("stop".to_string()),
300 usage: self.usage.clone().unwrap_or_default(),
301 });
302 events.push(StreamEvent::MessageStop);
303 }
304
305 events
306 }
307
308 pub fn finish(&mut self) -> Vec<StreamEvent> {
310 if self.finished {
311 return Vec::new();
312 }
313 self.finished = true;
314
315 let mut events = Vec::new();
316
317 if self.thinking_started && !self.thinking_finished {
319 self.thinking_finished = true;
320 events.push(StreamEvent::ContentBlockStop { index: 0 });
321 }
322
323 if self.text_started && !self.text_finished {
325 self.text_finished = true;
326 let text_index = if self.thinking_started { 1 } else { 0 };
327 events.push(StreamEvent::ContentBlockStop { index: text_index });
328 }
329
330 if self.message_started {
332 events.push(StreamEvent::MessageDelta {
333 stop_reason: self
334 .stop_reason
335 .clone()
336 .or_else(|| Some("end_turn".to_string())),
337 usage: self.usage.clone().unwrap_or_default(),
338 });
339 events.push(StreamEvent::MessageStop);
340 }
341
342 events
343 }
344}
345
/// Maps an OpenAI `finish_reason` onto the Anthropic-style stop-reason
/// vocabulary used by the emitted stream events.
///
/// * `"stop"` becomes `"end_turn"`
/// * `"tool_calls"` becomes `"tool_use"`
/// * `"length"` becomes `"max_tokens"`
///
/// Any other value (including the empty string) is returned unchanged.
fn normalize_openai_finish_reason(reason: &str) -> String {
    let normalized = match reason {
        "stop" => "end_turn",
        "tool_calls" => "tool_use",
        "length" => "max_tokens",
        other => other,
    };
    normalized.to_string()
}
353
354#[derive(Debug, Clone, Deserialize, Serialize)]
360#[serde(tag = "type", rename_all = "snake_case")]
361pub enum AnthropicStreamEvent {
362 MessageStart { message: AnthropicMessageStart },
364 ContentBlockStart {
366 index: u32,
367 content_block: AnthropicContentBlock,
368 },
369 ContentBlockDelta {
371 index: u32,
372 delta: AnthropicContentDelta,
373 },
374 ContentBlockStop { index: u32 },
376 MessageDelta {
378 delta: AnthropicMessageDelta,
379 #[serde(default)]
380 usage: AnthropicStreamUsage,
381 },
382 MessageStop {},
384}
385
386#[derive(Debug, Clone, Deserialize, Serialize)]
388pub struct AnthropicMessageStart {
389 pub id: String,
390 #[serde(rename = "type")]
391 pub kind: String,
392 pub role: String,
393 pub model: String,
394 #[serde(default)]
395 pub content: Vec<AnthropicContentBlock>,
396 #[serde(default)]
397 pub stop_reason: Option<String>,
398 #[serde(default)]
399 pub stop_sequence: Option<String>,
400 #[serde(default)]
401 pub usage: AnthropicStreamUsage,
402}
403
404#[derive(Debug, Clone, Deserialize, Serialize)]
406#[serde(tag = "type", rename_all = "snake_case")]
407pub enum AnthropicContentBlock {
408 Text {
409 text: String,
410 },
411 Thinking {
412 thinking: String,
413 },
414 ToolUse {
415 id: String,
416 name: String,
417 input: serde_json::Value,
418 },
419}
420
421#[derive(Debug, Clone, Deserialize, Serialize)]
423#[serde(tag = "type", rename_all = "snake_case")]
424pub enum AnthropicContentDelta {
425 #[serde(rename = "text_delta")]
426 Text { text: String },
427 #[serde(rename = "thinking_delta")]
428 Thinking { thinking: String },
429 #[serde(rename = "input_json_delta")]
430 InputJson { partial_json: String },
431}
432
433#[derive(Debug, Clone, Deserialize, Serialize)]
435pub struct AnthropicMessageDelta {
436 pub stop_reason: Option<String>,
437 pub stop_sequence: Option<String>,
438}
439
440#[derive(Debug, Clone, Default, Deserialize, Serialize)]
442pub struct AnthropicStreamUsage {
443 #[serde(default)]
444 pub input_tokens: u32,
445 #[serde(default)]
446 pub output_tokens: u32,
447}
448
449#[derive(Debug, Clone, Deserialize)]
455pub struct OpenAiStreamChunk {
456 pub id: String,
457 #[serde(default)]
458 pub model: Option<String>,
459 #[serde(default)]
460 pub choices: Vec<OpenAiStreamChoice>,
461 #[serde(default)]
462 pub usage: Option<OpenAiStreamUsage>,
463}
464
465#[derive(Debug, Clone, Deserialize)]
467pub struct OpenAiStreamChoice {
468 pub delta: OpenAiStreamDelta,
469 #[serde(default)]
470 pub finish_reason: Option<String>,
471}
472
473#[derive(Debug, Default, Clone, Deserialize)]
475pub struct OpenAiStreamDelta {
476 #[serde(default)]
477 pub content: Option<String>,
478 #[serde(default)]
479 pub reasoning_content: Option<String>,
480 #[serde(default)]
481 pub tool_calls: Vec<OpenAiStreamToolCall>,
482}
483
484#[derive(Debug, Clone, Deserialize)]
486pub struct OpenAiStreamToolCall {
487 #[serde(default)]
488 pub index: u32,
489 #[serde(default)]
490 pub id: Option<String>,
491 #[serde(default)]
492 pub function: OpenAiStreamFunction,
493}
494
495#[derive(Debug, Default, Clone, Deserialize)]
497pub struct OpenAiStreamFunction {
498 #[serde(default)]
499 pub name: Option<String>,
500 #[serde(default)]
501 pub arguments: Option<String>,
502}
503
504#[derive(Debug, Clone, Deserialize)]
506pub struct OpenAiStreamUsage {
507 #[serde(default)]
508 pub prompt_tokens: u32,
509 #[serde(default)]
510 pub completion_tokens: u32,
511}
512
513#[derive(Debug, Clone, Deserialize)]
519pub struct OllamaStreamChunk {
520 #[serde(default)]
521 pub model: Option<String>,
522 #[serde(default)]
523 pub message: Option<OllamaStreamMessage>,
524 #[serde(default)]
525 pub done: bool,
526 #[serde(default)]
527 pub prompt_eval_count: Option<u32>,
528 #[serde(default)]
529 pub eval_count: Option<u32>,
530}
531
532#[derive(Debug, Clone, Deserialize)]
534pub struct OllamaStreamMessage {
535 #[serde(default)]
536 pub role: Option<String>,
537 #[serde(default)]
538 pub content: Option<String>,
539}
540
#[cfg(test)]
mod tests {
    use super::*;

    /// An Anthropic `message_start` event surfaces as `MessageStart` first.
    #[test]
    fn stream_state_handles_anthropic_events() {
        let message = AnthropicMessageStart {
            id: String::from("msg_123"),
            kind: String::from("message"),
            role: String::from("assistant"),
            model: String::from("claude-sonnet-4-6"),
            content: Vec::new(),
            stop_reason: None,
            stop_sequence: None,
            usage: AnthropicStreamUsage::default(),
        };

        let mut state = StreamState::new(String::from("claude-sonnet-4-6"));
        let emitted = state.ingest_anthropic(AnthropicStreamEvent::MessageStart { message });

        assert!(matches!(emitted[0], StreamEvent::MessageStart { .. }));
    }

    /// The first OpenAI chunk with text opens the message, then a block.
    #[test]
    fn stream_state_handles_openai_events() {
        let delta = OpenAiStreamDelta {
            content: Some(String::from("Hello")),
            ..Default::default()
        };
        let first_chunk = OpenAiStreamChunk {
            id: String::from("chatcmpl_123"),
            model: Some(String::from("gpt-4o")),
            choices: vec![OpenAiStreamChoice {
                delta,
                finish_reason: None,
            }],
            usage: None,
        };

        let mut state = StreamState::new(String::from("gpt-4o"));
        let emitted = state.ingest_openai(first_chunk);

        assert!(matches!(emitted[0], StreamEvent::MessageStart { .. }));
        assert!(matches!(emitted[1], StreamEvent::ContentBlockStart { .. }));
    }
}