Skip to main content

sh_layer1/streaming/
providers.rs

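//! LLM 提供商流式格式 — LLM provider streaming formats.
//!
//! 定义各 LLM 提供商的流式响应格式和统一的事件抽象。
//! Defines each LLM provider's streaming response format and a unified event abstraction.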
4
5use serde::{Deserialize, Serialize};
6
7/// 统一的流式事件
8#[derive(Debug, Clone)]
9pub enum StreamEvent {
10    /// 消息开始
11    MessageStart { id: String, model: String },
12    /// 内容块开始
13    ContentBlockStart {
14        index: u32,
15        block_type: ContentBlockType,
16    },
17    /// 内容块增量
18    ContentBlockDelta { index: u32, delta: ContentDelta },
19    /// 内容块结束
20    ContentBlockStop { index: u32 },
21    /// 消息增量
22    MessageDelta {
23        stop_reason: Option<String>,
24        usage: StreamUsage,
25    },
26    /// 消息结束
27    MessageStop,
28}
29
/// Kind of content carried by a block announced in `StreamEvent::ContentBlockStart`.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare block kinds directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContentBlockType {
    /// Plain answer text.
    Text,
    /// Model reasoning ("thinking") text.
    Thinking,
    /// A tool invocation with its provider-assigned call `id` and tool `name`.
    ToolUse { id: String, name: String },
}
37
/// Incremental payload carried by `StreamEvent::ContentBlockDelta`.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare deltas directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContentDelta {
    /// A fragment of answer text.
    Text(String),
    /// A fragment of reasoning text.
    Thinking(String),
    /// A fragment of a tool call's JSON input (possibly incomplete on its own).
    ToolInput(String),
}
45
/// Token usage reported for a streamed message.
///
/// Derives `PartialEq`/`Eq` so usage snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamUsage {
    /// Prompt / input token count.
    pub input_tokens: u32,
    /// Completion / output token count.
    pub output_tokens: u32,
}
52
/// Identifies which provider wire format a stream uses.
///
/// A plain `Copy` selector, so it also derives `PartialEq`/`Eq`/`Hash` for comparisons
/// and use as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamProvider {
    Anthropic,
    /// Anthropic-compatible provider (e.g. tencent-coding)
    AnthropicCompatible,
    OpenAI,
    /// OpenAI-compatible provider (e.g. deepseek, glm, qwen)
    OpenAICompatible,
    Gemini,
    AzureOpenAI,
    Bedrock,
    Ollama,
}
67
68/// 流式响应状态
69#[derive(Debug)]
70pub struct StreamState {
71    model: String,
72    message_started: bool,
73    text_started: bool,
74    text_finished: bool,
75    thinking_started: bool,
76    thinking_finished: bool,
77    finished: bool,
78    stop_reason: Option<String>,
79    usage: Option<StreamUsage>,
80    #[allow(dead_code)]
81    tool_index_offset: u32,
82    #[allow(dead_code)]
83    tool_calls_count: u32,
84}
85
86impl StreamState {
87    /// 创建新的流状态
88    pub fn new(model: String) -> Self {
89        Self {
90            model,
91            message_started: false,
92            text_started: false,
93            text_finished: false,
94            thinking_started: false,
95            thinking_finished: false,
96            finished: false,
97            stop_reason: None,
98            usage: None,
99            tool_index_offset: 0,
100            tool_calls_count: 0,
101        }
102    }
103
104    /// 处理 Anthropic 事件
105    pub fn ingest_anthropic(&mut self, event: AnthropicStreamEvent) -> Vec<StreamEvent> {
106        let mut events = Vec::new();
107
108        match event {
109            AnthropicStreamEvent::MessageStart { message } => {
110                if !self.message_started {
111                    self.message_started = true;
112                    events.push(StreamEvent::MessageStart {
113                        id: message.id,
114                        model: message.model,
115                    });
116                }
117            }
118            AnthropicStreamEvent::ContentBlockStart {
119                index,
120                content_block,
121            } => {
122                let block_type = match content_block {
123                    AnthropicContentBlock::Text { .. } => ContentBlockType::Text,
124                    AnthropicContentBlock::Thinking { .. } => ContentBlockType::Thinking,
125                    AnthropicContentBlock::ToolUse { id, name, .. } => {
126                        ContentBlockType::ToolUse { id, name }
127                    }
128                };
129                events.push(StreamEvent::ContentBlockStart { index, block_type });
130            }
131            AnthropicStreamEvent::ContentBlockDelta { index, delta } => {
132                let content_delta = match delta {
133                    AnthropicContentDelta::Text { text } => ContentDelta::Text(text),
134                    AnthropicContentDelta::Thinking { thinking } => {
135                        ContentDelta::Thinking(thinking)
136                    }
137                    AnthropicContentDelta::InputJson { partial_json } => {
138                        ContentDelta::ToolInput(partial_json)
139                    }
140                };
141                events.push(StreamEvent::ContentBlockDelta {
142                    index,
143                    delta: content_delta,
144                });
145            }
146            AnthropicStreamEvent::ContentBlockStop { index } => {
147                events.push(StreamEvent::ContentBlockStop { index });
148            }
149            AnthropicStreamEvent::MessageDelta { delta, usage } => {
150                self.stop_reason = delta.stop_reason;
151                self.usage = Some(StreamUsage {
152                    input_tokens: usage.input_tokens,
153                    output_tokens: usage.output_tokens,
154                });
155                events.push(StreamEvent::MessageDelta {
156                    stop_reason: self.stop_reason.clone(),
157                    usage: self.usage.clone().unwrap_or_default(),
158                });
159            }
160            AnthropicStreamEvent::MessageStop { .. } => {
161                events.push(StreamEvent::MessageStop);
162            }
163        }
164
165        events
166    }
167
168    /// 处理 OpenAI 事件
169    pub fn ingest_openai(&mut self, chunk: OpenAiStreamChunk) -> Vec<StreamEvent> {
170        let mut events = Vec::new();
171
172        if !self.message_started {
173            self.message_started = true;
174            events.push(StreamEvent::MessageStart {
175                id: chunk.id.clone(),
176                model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
177            });
178        }
179
180        if let Some(usage) = chunk.usage {
181            self.usage = Some(StreamUsage {
182                input_tokens: usage.prompt_tokens,
183                output_tokens: usage.completion_tokens,
184            });
185        }
186
187        for choice in chunk.choices {
188            // 处理 reasoning_content(思考内容)
189            if let Some(reasoning) = choice.delta.reasoning_content.filter(|v| !v.is_empty()) {
190                if !self.thinking_started {
191                    self.thinking_started = true;
192                    events.push(StreamEvent::ContentBlockStart {
193                        index: 0,
194                        block_type: ContentBlockType::Thinking,
195                    });
196                }
197                events.push(StreamEvent::ContentBlockDelta {
198                    index: 0,
199                    delta: ContentDelta::Thinking(reasoning),
200                });
201            }
202
203            // 处理常规内容
204            if let Some(content) = choice.delta.content.filter(|v| !v.is_empty()) {
205                // 如果之前有思考块,先关闭它
206                if self.thinking_started && !self.thinking_finished {
207                    self.thinking_finished = true;
208                    events.push(StreamEvent::ContentBlockStop { index: 0 });
209                }
210
211                let text_index = if self.thinking_started { 1 } else { 0 };
212                if !self.text_started {
213                    self.text_started = true;
214                    events.push(StreamEvent::ContentBlockStart {
215                        index: text_index,
216                        block_type: ContentBlockType::Text,
217                    });
218                }
219                events.push(StreamEvent::ContentBlockDelta {
220                    index: text_index,
221                    delta: ContentDelta::Text(content),
222                });
223            }
224
225            // 处理工具调用
226            for (i, tool_call) in choice.delta.tool_calls.into_iter().enumerate() {
227                let tool_index = (if self.thinking_started { 2 } else { 1 }) + i as u32;
228                if let Some(name) = tool_call.function.name {
229                    events.push(StreamEvent::ContentBlockStart {
230                        index: tool_index,
231                        block_type: ContentBlockType::ToolUse {
232                            id: tool_call.id.unwrap_or_default(),
233                            name,
234                        },
235                    });
236                }
237                if let Some(args) = tool_call.function.arguments {
238                    events.push(StreamEvent::ContentBlockDelta {
239                        index: tool_index,
240                        delta: ContentDelta::ToolInput(args),
241                    });
242                }
243            }
244
245            // 处理结束原因
246            if let Some(finish_reason) = choice.finish_reason {
247                self.stop_reason = Some(normalize_openai_finish_reason(&finish_reason));
248            }
249        }
250
251        events
252    }
253
254    /// 处理 Ollama 流式事件
255    pub fn ingest_ollama(&mut self, chunk: OllamaStreamChunk) -> Vec<StreamEvent> {
256        let mut events = Vec::new();
257
258        // 消息开始
259        if !self.message_started {
260            self.message_started = true;
261            events.push(StreamEvent::MessageStart {
262                id: "".to_string(),
263                model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
264            });
265        }
266
267        // 内容增量
268        if let Some(message) = &chunk.message {
269            if let Some(content) = &message.content {
270                if !content.is_empty() {
271                    if !self.text_started {
272                        self.text_started = true;
273                        events.push(StreamEvent::ContentBlockStart {
274                            index: 0,
275                            block_type: ContentBlockType::Text,
276                        });
277                    }
278                    events.push(StreamEvent::ContentBlockDelta {
279                        index: 0,
280                        delta: ContentDelta::Text(content.clone()),
281                    });
282                }
283            }
284        }
285
286        // 消息结束
287        if chunk.done {
288            // 更新用量
289            if chunk.prompt_eval_count.is_some() || chunk.eval_count.is_some() {
290                self.usage = Some(StreamUsage {
291                    input_tokens: chunk.prompt_eval_count.unwrap_or(0),
292                    output_tokens: chunk.eval_count.unwrap_or(0),
293                });
294            }
295
296            // 关闭文本块
297            if self.text_started && !self.text_finished {
298                self.text_finished = true;
299                events.push(StreamEvent::ContentBlockStop { index: 0 });
300            }
301
302            events.push(StreamEvent::MessageDelta {
303                stop_reason: Some("stop".to_string()),
304                usage: self.usage.clone().unwrap_or_default(),
305            });
306            events.push(StreamEvent::MessageStop);
307        }
308
309        events
310    }
311
312    /// 完成流处理
313    pub fn finish(&mut self) -> Vec<StreamEvent> {
314        if self.finished {
315            return Vec::new();
316        }
317        self.finished = true;
318
319        let mut events = Vec::new();
320
321        // 关闭思考块
322        if self.thinking_started && !self.thinking_finished {
323            self.thinking_finished = true;
324            events.push(StreamEvent::ContentBlockStop { index: 0 });
325        }
326
327        // 关闭文本块
328        if self.text_started && !self.text_finished {
329            self.text_finished = true;
330            let text_index = if self.thinking_started { 1 } else { 0 };
331            events.push(StreamEvent::ContentBlockStop { index: text_index });
332        }
333
334        // 发送消息增量
335        if self.message_started {
336            events.push(StreamEvent::MessageDelta {
337                stop_reason: self
338                    .stop_reason
339                    .clone()
340                    .or_else(|| Some("end_turn".to_string())),
341                usage: self.usage.clone().unwrap_or_default(),
342            });
343            events.push(StreamEvent::MessageStop);
344        }
345
346        events
347    }
348}
349
/// Maps an OpenAI `finish_reason` onto the Anthropic-style stop-reason vocabulary reported in
/// `StreamEvent::MessageDelta`.
///
/// - `"stop"` → `"end_turn"`
/// - `"tool_calls"` and the legacy `"function_call"` → `"tool_use"`
/// - `"length"` → `"max_tokens"`
///
/// Any other value (e.g. `"content_filter"`) is passed through unchanged.
fn normalize_openai_finish_reason(reason: &str) -> String {
    let normalized = match reason {
        "stop" => "end_turn",
        "tool_calls" | "function_call" => "tool_use",
        "length" => "max_tokens",
        other => other,
    };
    normalized.to_string()
}
357
// ============================================================================
// Anthropic 流式格式 (Anthropic streaming format)
// ============================================================================
361
362/// Anthropic 流式事件
363#[derive(Debug, Clone, Deserialize, Serialize)]
364#[serde(tag = "type", rename_all = "snake_case")]
365pub enum AnthropicStreamEvent {
366    /// 消息开始
367    MessageStart { message: AnthropicMessageStart },
368    /// 内容块开始
369    ContentBlockStart {
370        index: u32,
371        content_block: AnthropicContentBlock,
372    },
373    /// 内容块增量
374    ContentBlockDelta {
375        index: u32,
376        delta: AnthropicContentDelta,
377    },
378    /// 内容块结束
379    ContentBlockStop { index: u32 },
380    /// 消息增量
381    MessageDelta {
382        delta: AnthropicMessageDelta,
383        #[serde(default)]
384        usage: AnthropicStreamUsage,
385    },
386    /// 消息结束
387    MessageStop {},
388}
389
390/// Anthropic 消息开始事件
391#[derive(Debug, Clone, Deserialize, Serialize)]
392pub struct AnthropicMessageStart {
393    pub id: String,
394    #[serde(rename = "type")]
395    pub kind: String,
396    pub role: String,
397    pub model: String,
398    #[serde(default)]
399    pub content: Vec<AnthropicContentBlock>,
400    #[serde(default)]
401    pub stop_reason: Option<String>,
402    #[serde(default)]
403    pub stop_sequence: Option<String>,
404    #[serde(default)]
405    pub usage: AnthropicStreamUsage,
406}
407
408/// Anthropic 内容块
409#[derive(Debug, Clone, Deserialize, Serialize)]
410#[serde(tag = "type", rename_all = "snake_case")]
411pub enum AnthropicContentBlock {
412    Text {
413        text: String,
414    },
415    Thinking {
416        thinking: String,
417    },
418    ToolUse {
419        id: String,
420        name: String,
421        input: serde_json::Value,
422    },
423}
424
425/// Anthropic 内容增量
426#[derive(Debug, Clone, Deserialize, Serialize)]
427#[serde(tag = "type", rename_all = "snake_case")]
428pub enum AnthropicContentDelta {
429    #[serde(rename = "text_delta")]
430    Text { text: String },
431    #[serde(rename = "thinking_delta")]
432    Thinking { thinking: String },
433    #[serde(rename = "input_json_delta")]
434    InputJson { partial_json: String },
435}
436
437/// Anthropic 消息增量
438#[derive(Debug, Clone, Deserialize, Serialize)]
439pub struct AnthropicMessageDelta {
440    pub stop_reason: Option<String>,
441    pub stop_sequence: Option<String>,
442}
443
444/// Anthropic 流式用量
445#[derive(Debug, Clone, Default, Deserialize, Serialize)]
446pub struct AnthropicStreamUsage {
447    #[serde(default)]
448    pub input_tokens: u32,
449    #[serde(default)]
450    pub output_tokens: u32,
451}
452
// ============================================================================
// OpenAI 流式格式 (OpenAI streaming format)
// ============================================================================
456
457/// OpenAI 流式事件
458#[derive(Debug, Clone, Deserialize)]
459pub struct OpenAiStreamChunk {
460    pub id: String,
461    #[serde(default)]
462    pub model: Option<String>,
463    #[serde(default)]
464    pub choices: Vec<OpenAiStreamChoice>,
465    #[serde(default)]
466    pub usage: Option<OpenAiStreamUsage>,
467}
468
469/// OpenAI 流式选择
470#[derive(Debug, Clone, Deserialize)]
471pub struct OpenAiStreamChoice {
472    pub delta: OpenAiStreamDelta,
473    #[serde(default)]
474    pub finish_reason: Option<String>,
475}
476
477/// OpenAI 流式增量
478#[derive(Debug, Default, Clone, Deserialize)]
479pub struct OpenAiStreamDelta {
480    #[serde(default)]
481    pub content: Option<String>,
482    #[serde(default)]
483    pub reasoning_content: Option<String>,
484    #[serde(default)]
485    pub tool_calls: Vec<OpenAiStreamToolCall>,
486}
487
488/// OpenAI 流式工具调用
489#[derive(Debug, Clone, Deserialize)]
490pub struct OpenAiStreamToolCall {
491    #[serde(default)]
492    pub index: u32,
493    #[serde(default)]
494    pub id: Option<String>,
495    #[serde(default)]
496    pub function: OpenAiStreamFunction,
497}
498
499/// OpenAI 流式函数
500#[derive(Debug, Default, Clone, Deserialize)]
501pub struct OpenAiStreamFunction {
502    #[serde(default)]
503    pub name: Option<String>,
504    #[serde(default)]
505    pub arguments: Option<String>,
506}
507
508/// OpenAI 流式用量
509#[derive(Debug, Clone, Deserialize)]
510pub struct OpenAiStreamUsage {
511    #[serde(default)]
512    pub prompt_tokens: u32,
513    #[serde(default)]
514    pub completion_tokens: u32,
515}
516
// ============================================================================
// Ollama 流式格式 (Ollama streaming format)
// ============================================================================
520
521/// Ollama 流式响应块
522#[derive(Debug, Clone, Deserialize)]
523pub struct OllamaStreamChunk {
524    #[serde(default)]
525    pub model: Option<String>,
526    #[serde(default)]
527    pub message: Option<OllamaStreamMessage>,
528    #[serde(default)]
529    pub done: bool,
530    #[serde(default)]
531    pub prompt_eval_count: Option<u32>,
532    #[serde(default)]
533    pub eval_count: Option<u32>,
534}
535
536/// Ollama 流式消息
537#[derive(Debug, Clone, Deserialize)]
538pub struct OllamaStreamMessage {
539    #[serde(default)]
540    pub role: Option<String>,
541    #[serde(default)]
542    pub content: Option<String>,
543}
544
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an Anthropic `message_start` event for the given id and model.
    fn anthropic_start(id: &str, model: &str) -> AnthropicStreamEvent {
        AnthropicStreamEvent::MessageStart {
            message: AnthropicMessageStart {
                id: id.to_string(),
                kind: "message".to_string(),
                role: "assistant".to_string(),
                model: model.to_string(),
                content: vec![],
                stop_reason: None,
                stop_sequence: None,
                usage: AnthropicStreamUsage::default(),
            },
        }
    }

    /// Builds an OpenAI chunk carrying a single text fragment.
    fn openai_text_chunk(text: &str) -> OpenAiStreamChunk {
        OpenAiStreamChunk {
            id: "chatcmpl_123".to_string(),
            model: Some("gpt-4o".to_string()),
            choices: vec![OpenAiStreamChoice {
                delta: OpenAiStreamDelta {
                    content: Some(text.to_string()),
                    ..Default::default()
                },
                finish_reason: None,
            }],
            usage: None,
        }
    }

    #[test]
    fn stream_state_handles_anthropic_events() {
        let mut state = StreamState::new("claude-sonnet-4-6".to_string());

        let events = state.ingest_anthropic(anthropic_start("msg_123", "claude-sonnet-4-6"));
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            StreamEvent::MessageStart { id, model }
                if id == "msg_123" && model == "claude-sonnet-4-6"
        ));

        // A repeated message_start is ignored.
        let again = state.ingest_anthropic(anthropic_start("msg_456", "claude-sonnet-4-6"));
        assert!(again.is_empty());
    }

    #[test]
    fn stream_state_handles_openai_events() {
        let mut state = StreamState::new("gpt-4o".to_string());

        let events = state.ingest_openai(openai_text_chunk("Hello"));
        assert_eq!(events.len(), 3);
        assert!(matches!(
            &events[0],
            StreamEvent::MessageStart { id, model } if id == "chatcmpl_123" && model == "gpt-4o"
        ));
        assert!(matches!(
            &events[1],
            StreamEvent::ContentBlockStart {
                index: 0,
                block_type: ContentBlockType::Text
            }
        ));
        assert!(matches!(
            &events[2],
            StreamEvent::ContentBlockDelta { index: 0, delta: ContentDelta::Text(t) }
                if t == "Hello"
        ));
    }

    #[test]
    fn finish_closes_open_text_block_exactly_once() {
        let mut state = StreamState::new("gpt-4o".to_string());
        state.ingest_openai(openai_text_chunk("Hello"));

        let events = state.finish();
        assert_eq!(events.len(), 3);
        assert!(matches!(events[0], StreamEvent::ContentBlockStop { index: 0 }));
        assert!(matches!(
            &events[1],
            StreamEvent::MessageDelta { stop_reason: Some(reason), .. } if reason == "end_turn"
        ));
        assert!(matches!(events[2], StreamEvent::MessageStop));

        assert!(state.finish().is_empty());
    }

    #[test]
    fn normalizes_common_openai_finish_reasons() {
        assert_eq!(normalize_openai_finish_reason("stop"), "end_turn");
        assert_eq!(normalize_openai_finish_reason("tool_calls"), "tool_use");
    }
}