1use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone)]
9pub enum StreamEvent {
10 MessageStart { id: String, model: String },
12 ContentBlockStart {
14 index: u32,
15 block_type: ContentBlockType,
16 },
17 ContentBlockDelta { index: u32, delta: ContentDelta },
19 ContentBlockStop { index: u32 },
21 MessageDelta {
23 stop_reason: Option<String>,
24 usage: StreamUsage,
25 },
26 MessageStop,
28}
29
/// Kind of content block announced by [`StreamEvent::ContentBlockStart`].
#[derive(Debug, Clone)]
pub enum ContentBlockType {
    /// Plain assistant text.
    Text,
    /// Model reasoning ("thinking") text.
    Thinking,
    /// A tool invocation, identified by the provider-assigned call `id` and the tool `name`.
    ToolUse {
        id: String,
        name: String,
    },
}
37
/// Incremental payload carried by [`StreamEvent::ContentBlockDelta`].
#[derive(Debug, Clone)]
pub enum ContentDelta {
    /// A fragment of assistant text.
    Text(String),
    /// A fragment of reasoning ("thinking") text.
    Thinking(String),
    /// A fragment of a tool call's argument string (partial JSON on the Anthropic path).
    ToolInput(String),
}
45
/// Token counts reported with [`StreamEvent::MessageDelta`].
///
/// `Default` yields zero for both counters.
#[derive(Debug, Clone, Default)]
pub struct StreamUsage {
    /// Tokens consumed by the prompt / input.
    pub input_tokens: u32,
    /// Tokens produced in the completion / output.
    pub output_tokens: u32,
}
52
/// Upstream provider a stream originates from.
///
/// NOTE(review): nothing in this file reads this enum; the `*Compatible` variants
/// presumably denote third-party endpoints speaking the same wire format — confirm
/// against the callers.
#[derive(Debug, Clone, Copy)]
pub enum StreamProvider {
    // Anthropic wire format.
    Anthropic,
    AnthropicCompatible,

    // OpenAI wire format.
    OpenAI,
    OpenAICompatible,

    // Other providers.
    Gemini,
    AzureOpenAI,
    Bedrock,
    Ollama,
}
67
68#[derive(Debug)]
70pub struct StreamState {
71 model: String,
72 message_started: bool,
73 text_started: bool,
74 text_finished: bool,
75 thinking_started: bool,
76 thinking_finished: bool,
77 finished: bool,
78 stop_reason: Option<String>,
79 usage: Option<StreamUsage>,
80 #[allow(dead_code)]
81 tool_index_offset: u32,
82 #[allow(dead_code)]
83 tool_calls_count: u32,
84}
85
86impl StreamState {
87 pub fn new(model: String) -> Self {
89 Self {
90 model,
91 message_started: false,
92 text_started: false,
93 text_finished: false,
94 thinking_started: false,
95 thinking_finished: false,
96 finished: false,
97 stop_reason: None,
98 usage: None,
99 tool_index_offset: 0,
100 tool_calls_count: 0,
101 }
102 }
103
104 pub fn ingest_anthropic(&mut self, event: AnthropicStreamEvent) -> Vec<StreamEvent> {
106 let mut events = Vec::new();
107
108 match event {
109 AnthropicStreamEvent::MessageStart { message } => {
110 if !self.message_started {
111 self.message_started = true;
112 events.push(StreamEvent::MessageStart {
113 id: message.id,
114 model: message.model,
115 });
116 }
117 }
118 AnthropicStreamEvent::ContentBlockStart {
119 index,
120 content_block,
121 } => {
122 let block_type = match content_block {
123 AnthropicContentBlock::Text { .. } => ContentBlockType::Text,
124 AnthropicContentBlock::Thinking { .. } => ContentBlockType::Thinking,
125 AnthropicContentBlock::ToolUse { id, name, .. } => {
126 ContentBlockType::ToolUse { id, name }
127 }
128 };
129 events.push(StreamEvent::ContentBlockStart { index, block_type });
130 }
131 AnthropicStreamEvent::ContentBlockDelta { index, delta } => {
132 let content_delta = match delta {
133 AnthropicContentDelta::Text { text } => ContentDelta::Text(text),
134 AnthropicContentDelta::Thinking { thinking } => {
135 ContentDelta::Thinking(thinking)
136 }
137 AnthropicContentDelta::InputJson { partial_json } => {
138 ContentDelta::ToolInput(partial_json)
139 }
140 };
141 events.push(StreamEvent::ContentBlockDelta {
142 index,
143 delta: content_delta,
144 });
145 }
146 AnthropicStreamEvent::ContentBlockStop { index } => {
147 events.push(StreamEvent::ContentBlockStop { index });
148 }
149 AnthropicStreamEvent::MessageDelta { delta, usage } => {
150 self.stop_reason = delta.stop_reason;
151 self.usage = Some(StreamUsage {
152 input_tokens: usage.input_tokens,
153 output_tokens: usage.output_tokens,
154 });
155 events.push(StreamEvent::MessageDelta {
156 stop_reason: self.stop_reason.clone(),
157 usage: self.usage.clone().unwrap_or_default(),
158 });
159 }
160 AnthropicStreamEvent::MessageStop { .. } => {
161 events.push(StreamEvent::MessageStop);
162 }
163 }
164
165 events
166 }
167
168 pub fn ingest_openai(&mut self, chunk: OpenAiStreamChunk) -> Vec<StreamEvent> {
170 let mut events = Vec::new();
171
172 if !self.message_started {
173 self.message_started = true;
174 events.push(StreamEvent::MessageStart {
175 id: chunk.id.clone(),
176 model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
177 });
178 }
179
180 if let Some(usage) = chunk.usage {
181 self.usage = Some(StreamUsage {
182 input_tokens: usage.prompt_tokens,
183 output_tokens: usage.completion_tokens,
184 });
185 }
186
187 for choice in chunk.choices {
188 if let Some(reasoning) = choice.delta.reasoning_content.filter(|v| !v.is_empty()) {
190 if !self.thinking_started {
191 self.thinking_started = true;
192 events.push(StreamEvent::ContentBlockStart {
193 index: 0,
194 block_type: ContentBlockType::Thinking,
195 });
196 }
197 events.push(StreamEvent::ContentBlockDelta {
198 index: 0,
199 delta: ContentDelta::Thinking(reasoning),
200 });
201 }
202
203 if let Some(content) = choice.delta.content.filter(|v| !v.is_empty()) {
205 if self.thinking_started && !self.thinking_finished {
207 self.thinking_finished = true;
208 events.push(StreamEvent::ContentBlockStop { index: 0 });
209 }
210
211 let text_index = if self.thinking_started { 1 } else { 0 };
212 if !self.text_started {
213 self.text_started = true;
214 events.push(StreamEvent::ContentBlockStart {
215 index: text_index,
216 block_type: ContentBlockType::Text,
217 });
218 }
219 events.push(StreamEvent::ContentBlockDelta {
220 index: text_index,
221 delta: ContentDelta::Text(content),
222 });
223 }
224
225 for (i, tool_call) in choice.delta.tool_calls.into_iter().enumerate() {
227 let tool_index = (if self.thinking_started { 2 } else { 1 }) + i as u32;
228 if let Some(name) = tool_call.function.name {
229 events.push(StreamEvent::ContentBlockStart {
230 index: tool_index,
231 block_type: ContentBlockType::ToolUse {
232 id: tool_call.id.unwrap_or_default(),
233 name,
234 },
235 });
236 }
237 if let Some(args) = tool_call.function.arguments {
238 events.push(StreamEvent::ContentBlockDelta {
239 index: tool_index,
240 delta: ContentDelta::ToolInput(args),
241 });
242 }
243 }
244
245 if let Some(finish_reason) = choice.finish_reason {
247 self.stop_reason = Some(normalize_openai_finish_reason(&finish_reason));
248 }
249 }
250
251 events
252 }
253
254 pub fn ingest_ollama(&mut self, chunk: OllamaStreamChunk) -> Vec<StreamEvent> {
256 let mut events = Vec::new();
257
258 if !self.message_started {
260 self.message_started = true;
261 events.push(StreamEvent::MessageStart {
262 id: "".to_string(),
263 model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
264 });
265 }
266
267 if let Some(message) = &chunk.message {
269 if let Some(content) = &message.content {
270 if !content.is_empty() {
271 if !self.text_started {
272 self.text_started = true;
273 events.push(StreamEvent::ContentBlockStart {
274 index: 0,
275 block_type: ContentBlockType::Text,
276 });
277 }
278 events.push(StreamEvent::ContentBlockDelta {
279 index: 0,
280 delta: ContentDelta::Text(content.clone()),
281 });
282 }
283 }
284 }
285
286 if chunk.done {
288 if chunk.prompt_eval_count.is_some() || chunk.eval_count.is_some() {
290 self.usage = Some(StreamUsage {
291 input_tokens: chunk.prompt_eval_count.unwrap_or(0),
292 output_tokens: chunk.eval_count.unwrap_or(0),
293 });
294 }
295
296 if self.text_started && !self.text_finished {
298 self.text_finished = true;
299 events.push(StreamEvent::ContentBlockStop { index: 0 });
300 }
301
302 events.push(StreamEvent::MessageDelta {
303 stop_reason: Some("stop".to_string()),
304 usage: self.usage.clone().unwrap_or_default(),
305 });
306 events.push(StreamEvent::MessageStop);
307 }
308
309 events
310 }
311
312 pub fn finish(&mut self) -> Vec<StreamEvent> {
314 if self.finished {
315 return Vec::new();
316 }
317 self.finished = true;
318
319 let mut events = Vec::new();
320
321 if self.thinking_started && !self.thinking_finished {
323 self.thinking_finished = true;
324 events.push(StreamEvent::ContentBlockStop { index: 0 });
325 }
326
327 if self.text_started && !self.text_finished {
329 self.text_finished = true;
330 let text_index = if self.thinking_started { 1 } else { 0 };
331 events.push(StreamEvent::ContentBlockStop { index: text_index });
332 }
333
334 if self.message_started {
336 events.push(StreamEvent::MessageDelta {
337 stop_reason: self
338 .stop_reason
339 .clone()
340 .or_else(|| Some("end_turn".to_string())),
341 usage: self.usage.clone().unwrap_or_default(),
342 });
343 events.push(StreamEvent::MessageStop);
344 }
345
346 events
347 }
348}
349
/// Maps an OpenAI `finish_reason` onto the stop-reason vocabulary used by
/// [`StreamEvent::MessageDelta`].
///
/// * `"stop"` becomes `"end_turn"`
/// * `"tool_calls"` and the legacy `"function_call"` become `"tool_use"`
/// * `"length"` becomes `"max_tokens"`
///
/// Any other value (for example `"content_filter"`) is passed through unchanged.
fn normalize_openai_finish_reason(reason: &str) -> String {
    let normalized = match reason {
        "stop" => "end_turn",
        "tool_calls" | "function_call" => "tool_use",
        "length" => "max_tokens",
        other => other,
    };
    normalized.to_string()
}
357
358#[derive(Debug, Clone, Deserialize, Serialize)]
364#[serde(tag = "type", rename_all = "snake_case")]
365pub enum AnthropicStreamEvent {
366 MessageStart { message: AnthropicMessageStart },
368 ContentBlockStart {
370 index: u32,
371 content_block: AnthropicContentBlock,
372 },
373 ContentBlockDelta {
375 index: u32,
376 delta: AnthropicContentDelta,
377 },
378 ContentBlockStop { index: u32 },
380 MessageDelta {
382 delta: AnthropicMessageDelta,
383 #[serde(default)]
384 usage: AnthropicStreamUsage,
385 },
386 MessageStop {},
388}
389
390#[derive(Debug, Clone, Deserialize, Serialize)]
392pub struct AnthropicMessageStart {
393 pub id: String,
394 #[serde(rename = "type")]
395 pub kind: String,
396 pub role: String,
397 pub model: String,
398 #[serde(default)]
399 pub content: Vec<AnthropicContentBlock>,
400 #[serde(default)]
401 pub stop_reason: Option<String>,
402 #[serde(default)]
403 pub stop_sequence: Option<String>,
404 #[serde(default)]
405 pub usage: AnthropicStreamUsage,
406}
407
408#[derive(Debug, Clone, Deserialize, Serialize)]
410#[serde(tag = "type", rename_all = "snake_case")]
411pub enum AnthropicContentBlock {
412 Text {
413 text: String,
414 },
415 Thinking {
416 thinking: String,
417 },
418 ToolUse {
419 id: String,
420 name: String,
421 input: serde_json::Value,
422 },
423}
424
425#[derive(Debug, Clone, Deserialize, Serialize)]
427#[serde(tag = "type", rename_all = "snake_case")]
428pub enum AnthropicContentDelta {
429 #[serde(rename = "text_delta")]
430 Text { text: String },
431 #[serde(rename = "thinking_delta")]
432 Thinking { thinking: String },
433 #[serde(rename = "input_json_delta")]
434 InputJson { partial_json: String },
435}
436
437#[derive(Debug, Clone, Deserialize, Serialize)]
439pub struct AnthropicMessageDelta {
440 pub stop_reason: Option<String>,
441 pub stop_sequence: Option<String>,
442}
443
444#[derive(Debug, Clone, Default, Deserialize, Serialize)]
446pub struct AnthropicStreamUsage {
447 #[serde(default)]
448 pub input_tokens: u32,
449 #[serde(default)]
450 pub output_tokens: u32,
451}
452
453#[derive(Debug, Clone, Deserialize)]
459pub struct OpenAiStreamChunk {
460 pub id: String,
461 #[serde(default)]
462 pub model: Option<String>,
463 #[serde(default)]
464 pub choices: Vec<OpenAiStreamChoice>,
465 #[serde(default)]
466 pub usage: Option<OpenAiStreamUsage>,
467}
468
469#[derive(Debug, Clone, Deserialize)]
471pub struct OpenAiStreamChoice {
472 pub delta: OpenAiStreamDelta,
473 #[serde(default)]
474 pub finish_reason: Option<String>,
475}
476
477#[derive(Debug, Default, Clone, Deserialize)]
479pub struct OpenAiStreamDelta {
480 #[serde(default)]
481 pub content: Option<String>,
482 #[serde(default)]
483 pub reasoning_content: Option<String>,
484 #[serde(default)]
485 pub tool_calls: Vec<OpenAiStreamToolCall>,
486}
487
488#[derive(Debug, Clone, Deserialize)]
490pub struct OpenAiStreamToolCall {
491 #[serde(default)]
492 pub index: u32,
493 #[serde(default)]
494 pub id: Option<String>,
495 #[serde(default)]
496 pub function: OpenAiStreamFunction,
497}
498
499#[derive(Debug, Default, Clone, Deserialize)]
501pub struct OpenAiStreamFunction {
502 #[serde(default)]
503 pub name: Option<String>,
504 #[serde(default)]
505 pub arguments: Option<String>,
506}
507
508#[derive(Debug, Clone, Deserialize)]
510pub struct OpenAiStreamUsage {
511 #[serde(default)]
512 pub prompt_tokens: u32,
513 #[serde(default)]
514 pub completion_tokens: u32,
515}
516
517#[derive(Debug, Clone, Deserialize)]
523pub struct OllamaStreamChunk {
524 #[serde(default)]
525 pub model: Option<String>,
526 #[serde(default)]
527 pub message: Option<OllamaStreamMessage>,
528 #[serde(default)]
529 pub done: bool,
530 #[serde(default)]
531 pub prompt_eval_count: Option<u32>,
532 #[serde(default)]
533 pub eval_count: Option<u32>,
534}
535
536#[derive(Debug, Clone, Deserialize)]
538pub struct OllamaStreamMessage {
539 #[serde(default)]
540 pub role: Option<String>,
541 #[serde(default)]
542 pub content: Option<String>,
543}
544
#[cfg(test)]
mod tests {
    use super::*;

    /// An Anthropic `message_start` must surface as the first unified event.
    #[test]
    fn stream_state_handles_anthropic_events() {
        let message = AnthropicMessageStart {
            id: String::from("msg_123"),
            kind: String::from("message"),
            role: String::from("assistant"),
            model: String::from("claude-sonnet-4-6"),
            content: Vec::new(),
            stop_reason: None,
            stop_sequence: None,
            usage: AnthropicStreamUsage::default(),
        };

        let mut state = StreamState::new(String::from("claude-sonnet-4-6"));
        let emitted = state.ingest_anthropic(AnthropicStreamEvent::MessageStart { message });

        assert!(matches!(emitted[0], StreamEvent::MessageStart { .. }));
    }

    /// The first OpenAI chunk with text opens the message and then a content block.
    #[test]
    fn stream_state_handles_openai_events() {
        let delta = OpenAiStreamDelta {
            content: Some(String::from("Hello")),
            ..Default::default()
        };
        let choice = OpenAiStreamChoice {
            delta,
            finish_reason: None,
        };
        let chunk = OpenAiStreamChunk {
            id: String::from("chatcmpl_123"),
            model: Some(String::from("gpt-4o")),
            choices: vec![choice],
            usage: None,
        };

        let mut state = StreamState::new(String::from("gpt-4o"));
        let emitted = state.ingest_openai(chunk);

        assert!(matches!(emitted[0], StreamEvent::MessageStart { .. }));
        assert!(matches!(emitted[1], StreamEvent::ContentBlockStart { .. }));
    }
}