Skip to main content

sh_layer1/streaming/
providers.rs

1//! LLM 提供商流式格式
2//!
3//! 定义各 LLM 提供商的流式响应格式和统一的事件抽象。
4
5use serde::{Deserialize, Serialize};
6
/// Provider-agnostic streaming event.
///
/// The variant names and payloads mirror the Anthropic streaming protocol
/// (message start, content block start/delta/stop, message delta, message
/// stop). `StreamState` translates every supported provider's payloads into
/// this shape.
#[derive(Debug, Clone)]
pub enum StreamEvent {
    /// The message has started; carries the message id and the model name.
    MessageStart { id: String, model: String },
    /// A content block has been opened at `index`.
    ContentBlockStart {
        index: u32,
        block_type: ContentBlockType,
    },
    /// Incremental content for the block at `index`.
    ContentBlockDelta { index: u32, delta: ContentDelta },
    /// The content block at `index` is complete.
    ContentBlockStop { index: u32 },
    /// Message-level update: the stop reason (if known) and token usage.
    MessageDelta {
        stop_reason: Option<String>,
        usage: StreamUsage,
    },
    /// The message is complete.
    MessageStop,
}
29
/// Kind of content block announced by a `ContentBlockStart` event.
///
/// Derives `PartialEq`/`Eq` so block types can be compared directly, for
/// example in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContentBlockType {
    /// Plain assistant text.
    Text,
    /// Model reasoning ("thinking") content.
    Thinking,
    /// A tool invocation, identified by the call `id` and the tool `name`.
    ToolUse { id: String, name: String },
}
37
/// Incremental payload carried by a `ContentBlockDelta` event.
///
/// Derives `PartialEq`/`Eq` so deltas can be compared directly, for example
/// in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContentDelta {
    /// A fragment of assistant text.
    Text(String),
    /// A fragment of reasoning ("thinking") text.
    Thinking(String),
    /// A fragment of the tool call's input, passed through as a raw string.
    ToolInput(String),
}
45
/// Token usage reported for a streamed message.
///
/// `Default` yields zero for both counters. Derives `PartialEq`/`Eq` so usage
/// values can be compared directly, for example in assertions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamUsage {
    /// Number of input (prompt) tokens.
    pub input_tokens: u32,
    /// Number of output (completion) tokens.
    pub output_tokens: u32,
}
52
/// Identifies which provider's streaming format a stream uses.
///
/// Derives `PartialEq`/`Eq`/`Hash` in addition to `Copy`, so a provider can be
/// compared with `==` and used as a key in hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamProvider {
    Anthropic,
    OpenAI,
    Gemini,
    AzureOpenAI,
    Bedrock,
    Ollama,
}
63
64/// 流式响应状态
65#[derive(Debug)]
66pub struct StreamState {
67    model: String,
68    message_started: bool,
69    text_started: bool,
70    text_finished: bool,
71    thinking_started: bool,
72    thinking_finished: bool,
73    finished: bool,
74    stop_reason: Option<String>,
75    usage: Option<StreamUsage>,
76    #[allow(dead_code)]
77    tool_index_offset: u32,
78    #[allow(dead_code)]
79    tool_calls_count: u32,
80}
81
82impl StreamState {
83    /// 创建新的流状态
84    pub fn new(model: String) -> Self {
85        Self {
86            model,
87            message_started: false,
88            text_started: false,
89            text_finished: false,
90            thinking_started: false,
91            thinking_finished: false,
92            finished: false,
93            stop_reason: None,
94            usage: None,
95            tool_index_offset: 0,
96            tool_calls_count: 0,
97        }
98    }
99
100    /// 处理 Anthropic 事件
101    pub fn ingest_anthropic(&mut self, event: AnthropicStreamEvent) -> Vec<StreamEvent> {
102        let mut events = Vec::new();
103
104        match event {
105            AnthropicStreamEvent::MessageStart { message } => {
106                if !self.message_started {
107                    self.message_started = true;
108                    events.push(StreamEvent::MessageStart {
109                        id: message.id,
110                        model: message.model,
111                    });
112                }
113            }
114            AnthropicStreamEvent::ContentBlockStart {
115                index,
116                content_block,
117            } => {
118                let block_type = match content_block {
119                    AnthropicContentBlock::Text { .. } => ContentBlockType::Text,
120                    AnthropicContentBlock::Thinking { .. } => ContentBlockType::Thinking,
121                    AnthropicContentBlock::ToolUse { id, name, .. } => {
122                        ContentBlockType::ToolUse { id, name }
123                    }
124                };
125                events.push(StreamEvent::ContentBlockStart { index, block_type });
126            }
127            AnthropicStreamEvent::ContentBlockDelta { index, delta } => {
128                let content_delta = match delta {
129                    AnthropicContentDelta::Text { text } => ContentDelta::Text(text),
130                    AnthropicContentDelta::Thinking { thinking } => {
131                        ContentDelta::Thinking(thinking)
132                    }
133                    AnthropicContentDelta::InputJson { partial_json } => {
134                        ContentDelta::ToolInput(partial_json)
135                    }
136                };
137                events.push(StreamEvent::ContentBlockDelta {
138                    index,
139                    delta: content_delta,
140                });
141            }
142            AnthropicStreamEvent::ContentBlockStop { index } => {
143                events.push(StreamEvent::ContentBlockStop { index });
144            }
145            AnthropicStreamEvent::MessageDelta { delta, usage } => {
146                self.stop_reason = delta.stop_reason;
147                self.usage = Some(StreamUsage {
148                    input_tokens: usage.input_tokens,
149                    output_tokens: usage.output_tokens,
150                });
151                events.push(StreamEvent::MessageDelta {
152                    stop_reason: self.stop_reason.clone(),
153                    usage: self.usage.clone().unwrap_or_default(),
154                });
155            }
156            AnthropicStreamEvent::MessageStop { .. } => {
157                events.push(StreamEvent::MessageStop);
158            }
159        }
160
161        events
162    }
163
164    /// 处理 OpenAI 事件
165    pub fn ingest_openai(&mut self, chunk: OpenAiStreamChunk) -> Vec<StreamEvent> {
166        let mut events = Vec::new();
167
168        if !self.message_started {
169            self.message_started = true;
170            events.push(StreamEvent::MessageStart {
171                id: chunk.id.clone(),
172                model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
173            });
174        }
175
176        if let Some(usage) = chunk.usage {
177            self.usage = Some(StreamUsage {
178                input_tokens: usage.prompt_tokens,
179                output_tokens: usage.completion_tokens,
180            });
181        }
182
183        for choice in chunk.choices {
184            // 处理 reasoning_content(思考内容)
185            if let Some(reasoning) = choice.delta.reasoning_content.filter(|v| !v.is_empty()) {
186                if !self.thinking_started {
187                    self.thinking_started = true;
188                    events.push(StreamEvent::ContentBlockStart {
189                        index: 0,
190                        block_type: ContentBlockType::Thinking,
191                    });
192                }
193                events.push(StreamEvent::ContentBlockDelta {
194                    index: 0,
195                    delta: ContentDelta::Thinking(reasoning),
196                });
197            }
198
199            // 处理常规内容
200            if let Some(content) = choice.delta.content.filter(|v| !v.is_empty()) {
201                // 如果之前有思考块,先关闭它
202                if self.thinking_started && !self.thinking_finished {
203                    self.thinking_finished = true;
204                    events.push(StreamEvent::ContentBlockStop { index: 0 });
205                }
206
207                let text_index = if self.thinking_started { 1 } else { 0 };
208                if !self.text_started {
209                    self.text_started = true;
210                    events.push(StreamEvent::ContentBlockStart {
211                        index: text_index,
212                        block_type: ContentBlockType::Text,
213                    });
214                }
215                events.push(StreamEvent::ContentBlockDelta {
216                    index: text_index,
217                    delta: ContentDelta::Text(content),
218                });
219            }
220
221            // 处理工具调用
222            for (i, tool_call) in choice.delta.tool_calls.into_iter().enumerate() {
223                let tool_index = (if self.thinking_started { 2 } else { 1 }) + i as u32;
224                if let Some(name) = tool_call.function.name {
225                    events.push(StreamEvent::ContentBlockStart {
226                        index: tool_index,
227                        block_type: ContentBlockType::ToolUse {
228                            id: tool_call.id.unwrap_or_default(),
229                            name,
230                        },
231                    });
232                }
233                if let Some(args) = tool_call.function.arguments {
234                    events.push(StreamEvent::ContentBlockDelta {
235                        index: tool_index,
236                        delta: ContentDelta::ToolInput(args),
237                    });
238                }
239            }
240
241            // 处理结束原因
242            if let Some(finish_reason) = choice.finish_reason {
243                self.stop_reason = Some(normalize_openai_finish_reason(&finish_reason));
244            }
245        }
246
247        events
248    }
249
250    /// 处理 Ollama 流式事件
251    pub fn ingest_ollama(&mut self, chunk: OllamaStreamChunk) -> Vec<StreamEvent> {
252        let mut events = Vec::new();
253
254        // 消息开始
255        if !self.message_started {
256            self.message_started = true;
257            events.push(StreamEvent::MessageStart {
258                id: "".to_string(),
259                model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
260            });
261        }
262
263        // 内容增量
264        if let Some(message) = &chunk.message {
265            if let Some(content) = &message.content {
266                if !content.is_empty() {
267                    if !self.text_started {
268                        self.text_started = true;
269                        events.push(StreamEvent::ContentBlockStart {
270                            index: 0,
271                            block_type: ContentBlockType::Text,
272                        });
273                    }
274                    events.push(StreamEvent::ContentBlockDelta {
275                        index: 0,
276                        delta: ContentDelta::Text(content.clone()),
277                    });
278                }
279            }
280        }
281
282        // 消息结束
283        if chunk.done {
284            // 更新用量
285            if chunk.prompt_eval_count.is_some() || chunk.eval_count.is_some() {
286                self.usage = Some(StreamUsage {
287                    input_tokens: chunk.prompt_eval_count.unwrap_or(0),
288                    output_tokens: chunk.eval_count.unwrap_or(0),
289                });
290            }
291
292            // 关闭文本块
293            if self.text_started && !self.text_finished {
294                self.text_finished = true;
295                events.push(StreamEvent::ContentBlockStop { index: 0 });
296            }
297
298            events.push(StreamEvent::MessageDelta {
299                stop_reason: Some("stop".to_string()),
300                usage: self.usage.clone().unwrap_or_default(),
301            });
302            events.push(StreamEvent::MessageStop);
303        }
304
305        events
306    }
307
308    /// 完成流处理
309    pub fn finish(&mut self) -> Vec<StreamEvent> {
310        if self.finished {
311            return Vec::new();
312        }
313        self.finished = true;
314
315        let mut events = Vec::new();
316
317        // 关闭思考块
318        if self.thinking_started && !self.thinking_finished {
319            self.thinking_finished = true;
320            events.push(StreamEvent::ContentBlockStop { index: 0 });
321        }
322
323        // 关闭文本块
324        if self.text_started && !self.text_finished {
325            self.text_finished = true;
326            let text_index = if self.thinking_started { 1 } else { 0 };
327            events.push(StreamEvent::ContentBlockStop { index: text_index });
328        }
329
330        // 发送消息增量
331        if self.message_started {
332            events.push(StreamEvent::MessageDelta {
333                stop_reason: self
334                    .stop_reason
335                    .clone()
336                    .or_else(|| Some("end_turn".to_string())),
337                usage: self.usage.clone().unwrap_or_default(),
338            });
339            events.push(StreamEvent::MessageStop);
340        }
341
342        events
343    }
344}
345
/// Maps an OpenAI `finish_reason` onto the unified stop-reason vocabulary.
///
/// * `"stop"` becomes `"end_turn"`.
/// * `"tool_calls"` and the legacy `"function_call"` become `"tool_use"`.
/// * `"length"` becomes `"max_tokens"`.
///
/// Any other value (for example `"content_filter"`) is returned unchanged.
fn normalize_openai_finish_reason(reason: &str) -> String {
    let unified = match reason {
        "stop" => "end_turn",
        "tool_calls" | "function_call" => "tool_use",
        "length" => "max_tokens",
        other => other,
    };
    unified.to_string()
}
353
354// ============================================================================
355// Anthropic 流式格式
356// ============================================================================
357
/// Anthropic streaming event.
///
/// (De)serialized as an internally tagged enum: the `type` field of the
/// payload selects the variant, with variant names in `snake_case`
/// (e.g. `message_start`, `content_block_delta`).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnthropicStreamEvent {
    /// Start of a message; carries the initial message object.
    MessageStart { message: AnthropicMessageStart },
    /// A content block has been opened at `index`.
    ContentBlockStart {
        index: u32,
        content_block: AnthropicContentBlock,
    },
    /// Incremental content for the block at `index`.
    ContentBlockDelta {
        index: u32,
        delta: AnthropicContentDelta,
    },
    /// The content block at `index` is complete.
    ContentBlockStop { index: u32 },
    /// Message-level update (stop reason and usage).
    MessageDelta {
        delta: AnthropicMessageDelta,
        // Falls back to the all-zero default when the payload has no `usage`.
        #[serde(default)]
        usage: AnthropicStreamUsage,
    },
    /// End of the message; carries no data.
    MessageStop {},
}
385
/// Message object carried by an Anthropic `message_start` event.
///
/// Fields marked `#[serde(default)]` may be absent from the payload and then
/// take their `Default` value.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AnthropicMessageStart {
    /// Message identifier.
    pub id: String,
    /// Value of the payload's `type` field (`type` is a Rust keyword, hence
    /// the rename).
    #[serde(rename = "type")]
    pub kind: String,
    /// Role of the message author.
    pub role: String,
    /// Model name reported by the provider.
    pub model: String,
    /// Content blocks already present when the message starts.
    #[serde(default)]
    pub content: Vec<AnthropicContentBlock>,
    /// Stop reason, if already known.
    #[serde(default)]
    pub stop_reason: Option<String>,
    /// Stop sequence, if any.
    #[serde(default)]
    pub stop_sequence: Option<String>,
    /// Usage reported at message start.
    #[serde(default)]
    pub usage: AnthropicStreamUsage,
}
403
/// Anthropic content block.
///
/// Internally tagged on the `type` field with `snake_case` variant names
/// (`text`, `thinking`, `tool_use`).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnthropicContentBlock {
    /// Text block.
    Text {
        text: String,
    },
    /// Reasoning ("thinking") block.
    Thinking {
        thinking: String,
    },
    /// Tool invocation: call `id`, tool `name`, and its JSON `input`.
    ToolUse {
        id: String,
        name: String,
        input: serde_json::Value,
    },
}
420
/// Incremental content carried by an Anthropic `content_block_delta` event.
///
/// Internally tagged on the `type` field; each variant carries an explicit
/// rename (`text_delta`, `thinking_delta`, `input_json_delta`).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnthropicContentDelta {
    /// A fragment of text.
    #[serde(rename = "text_delta")]
    Text { text: String },
    /// A fragment of reasoning ("thinking") text.
    #[serde(rename = "thinking_delta")]
    Thinking { thinking: String },
    /// A fragment of the tool input, as a partial JSON string.
    #[serde(rename = "input_json_delta")]
    InputJson { partial_json: String },
}
432
/// `delta` object of an Anthropic `message_delta` event.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AnthropicMessageDelta {
    /// Why generation stopped, if reported.
    pub stop_reason: Option<String>,
    /// The stop sequence reported by the provider, if any.
    pub stop_sequence: Option<String>,
}
439
/// Token usage as reported in Anthropic stream events.
///
/// Both counters default to 0 when missing from the payload, so a value of 0
/// cannot be distinguished from "not reported".
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct AnthropicStreamUsage {
    /// Number of input tokens.
    #[serde(default)]
    pub input_tokens: u32,
    /// Number of output tokens.
    #[serde(default)]
    pub output_tokens: u32,
}
448
449// ============================================================================
450// OpenAI 流式格式
451// ============================================================================
452
/// One chunk of an OpenAI-style streaming response.
///
/// Only `id` is required; the other fields take their `Default` value when
/// absent from the payload.
#[derive(Debug, Clone, Deserialize)]
pub struct OpenAiStreamChunk {
    /// Identifier of the completion this chunk belongs to.
    pub id: String,
    /// Model name, if reported.
    #[serde(default)]
    pub model: Option<String>,
    /// Choices carried by this chunk; empty when the field is missing.
    #[serde(default)]
    pub choices: Vec<OpenAiStreamChoice>,
    /// Token usage, if this chunk reports it.
    #[serde(default)]
    pub usage: Option<OpenAiStreamUsage>,
}
464
/// One choice inside an OpenAI-style stream chunk.
#[derive(Debug, Clone, Deserialize)]
pub struct OpenAiStreamChoice {
    /// Incremental content for this choice.
    pub delta: OpenAiStreamDelta,
    /// Why generation stopped; `None` when the field is absent or null.
    #[serde(default)]
    pub finish_reason: Option<String>,
}
472
/// Incremental payload of an OpenAI-style stream choice.
///
/// Every field is optional in the payload and defaults to empty.
#[derive(Debug, Default, Clone, Deserialize)]
pub struct OpenAiStreamDelta {
    /// Fragment of regular assistant text.
    #[serde(default)]
    pub content: Option<String>,
    /// Fragment of reasoning ("thinking") text.
    #[serde(default)]
    pub reasoning_content: Option<String>,
    /// Tool-call fragments carried by this delta; empty when the field is
    /// missing.
    #[serde(default)]
    pub tool_calls: Vec<OpenAiStreamToolCall>,
}
483
/// One tool-call fragment inside an OpenAI-style stream delta.
#[derive(Debug, Clone, Deserialize)]
pub struct OpenAiStreamToolCall {
    /// Value of the payload's `index` field; 0 when absent.
    #[serde(default)]
    pub index: u32,
    /// Tool-call identifier, if this fragment carries it.
    #[serde(default)]
    pub id: Option<String>,
    /// Function name and/or argument fragment; empty when absent.
    #[serde(default)]
    pub function: OpenAiStreamFunction,
}
494
/// Function part of an OpenAI-style streamed tool call.
#[derive(Debug, Default, Clone, Deserialize)]
pub struct OpenAiStreamFunction {
    /// Function name, if this fragment carries it.
    #[serde(default)]
    pub name: Option<String>,
    /// Fragment of the function arguments, as a raw string.
    #[serde(default)]
    pub arguments: Option<String>,
}
503
/// Token usage as reported in an OpenAI-style stream chunk.
///
/// Both counters default to 0 when missing from the payload.
#[derive(Debug, Clone, Deserialize)]
pub struct OpenAiStreamUsage {
    /// Number of prompt (input) tokens.
    #[serde(default)]
    pub prompt_tokens: u32,
    /// Number of completion (output) tokens.
    #[serde(default)]
    pub completion_tokens: u32,
}
512
513// ============================================================================
514// Ollama 流式格式
515// ============================================================================
516
/// One chunk of an Ollama streaming response.
///
/// Every field is optional in the payload and takes its `Default` value when
/// absent.
#[derive(Debug, Clone, Deserialize)]
pub struct OllamaStreamChunk {
    /// Model name, if reported.
    #[serde(default)]
    pub model: Option<String>,
    /// Message fragment carried by this chunk.
    #[serde(default)]
    pub message: Option<OllamaStreamMessage>,
    /// `true` on the chunk that ends the stream; `false` when absent.
    #[serde(default)]
    pub done: bool,
    /// Prompt evaluation count; `StreamState` reports it as input tokens.
    #[serde(default)]
    pub prompt_eval_count: Option<u32>,
    /// Evaluation count; `StreamState` reports it as output tokens.
    #[serde(default)]
    pub eval_count: Option<u32>,
}
531
/// Message fragment inside an Ollama stream chunk.
#[derive(Debug, Clone, Deserialize)]
pub struct OllamaStreamMessage {
    /// Role of the message author, if reported.
    #[serde(default)]
    pub role: Option<String>,
    /// Text fragment, if any.
    #[serde(default)]
    pub content: Option<String>,
}
540
#[cfg(test)]
mod tests {
    use super::*;

    /// An Anthropic `message_start` event must surface as `MessageStart`.
    #[test]
    fn stream_state_handles_anthropic_events() {
        let message = AnthropicMessageStart {
            id: String::from("msg_123"),
            kind: String::from("message"),
            role: String::from("assistant"),
            model: String::from("claude-sonnet-4-6"),
            content: Vec::new(),
            stop_reason: None,
            stop_sequence: None,
            usage: AnthropicStreamUsage::default(),
        };

        let mut state = StreamState::new(String::from("claude-sonnet-4-6"));
        let emitted = state.ingest_anthropic(AnthropicStreamEvent::MessageStart { message });

        assert!(matches!(
            emitted.first(),
            Some(StreamEvent::MessageStart { .. })
        ));
    }

    /// The first OpenAI chunk with text must open the message and then a
    /// content block.
    #[test]
    fn stream_state_handles_openai_events() {
        let delta = OpenAiStreamDelta {
            content: Some(String::from("Hello")),
            ..Default::default()
        };
        let first_chunk = OpenAiStreamChunk {
            id: String::from("chatcmpl_123"),
            model: Some(String::from("gpt-4o")),
            choices: vec![OpenAiStreamChoice {
                delta,
                finish_reason: None,
            }],
            usage: None,
        };

        let mut state = StreamState::new(String::from("gpt-4o"));
        let emitted = state.ingest_openai(first_chunk);

        assert!(matches!(
            emitted.first(),
            Some(StreamEvent::MessageStart { .. })
        ));
        assert!(matches!(
            emitted.get(1),
            Some(StreamEvent::ContentBlockStart { .. })
        ));
    }
}
587}