praxis_llm/
streaming.rs

1use anyhow::Result;
2use futures::Stream;
3use reqwest::Response;
4use serde::{Deserialize, Serialize};
5use std::pin::Pin;
6
7use crate::buffer_utils::{SseLineParser, parse_sse_stream};
8
9pub use crate::buffer_utils::{CircularLineBuffer, EventBatcher};
10
11use crate::openai::ResponseStreamChunk;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(tag = "type", rename_all = "snake_case")]
15pub enum StreamEvent {
16    Reasoning {
17        content: String,
18    },
19    
20    Message {
21        content: String,
22    },
23    
24    ToolCall {
25        index: u32,
26        #[serde(skip_serializing_if = "Option::is_none")]
27        id: Option<String>,
28        #[serde(skip_serializing_if = "Option::is_none")]
29        name: Option<String>,
30        #[serde(skip_serializing_if = "Option::is_none")]
31        arguments: Option<String>,
32    },
33    
34    Done {
35        #[serde(skip_serializing_if = "Option::is_none")]
36        finish_reason: Option<String>,
37    },
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ChatStreamChunk {
42    pub id: String,
43    pub object: String,
44    pub created: i64,
45    pub model: String,
46    pub choices: Vec<StreamChoice>,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct StreamChoice {
51    pub index: u32,
52    pub delta: Delta,
53    pub finish_reason: Option<String>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct Delta {
58    pub role: Option<String>,
59    pub content: Option<String>,
60    pub tool_calls: Option<Vec<ToolCallDelta>>,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ToolCallDelta {
65    pub index: u32,
66    pub id: Option<String>,
67    #[serde(rename = "type")]
68    pub tool_type: Option<String>,
69    pub function: Option<FunctionDelta>,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct FunctionDelta {
74    pub name: Option<String>,
75    pub arguments: Option<String>,
76}
77
78impl ChatStreamChunk {
79    pub fn content(&self) -> Option<&str> {
80        self.choices
81            .first()
82            .and_then(|c| c.delta.content.as_deref())
83    }
84    
85    pub fn is_done(&self) -> bool {
86        self.choices
87            .first()
88            .and_then(|c| c.finish_reason.as_ref())
89            .is_some()
90    }
91    
92    fn to_stream_events(&self) -> Vec<StreamEvent> {
93        let mut events = Vec::new();
94        
95        if let Some(choice) = self.choices.first() {
96            if let Some(content) = &choice.delta.content {
97                if !content.is_empty() {
98                    events.push(StreamEvent::Message {
99                        content: content.clone(),
100                    });
101                }
102            }
103            
104            if let Some(tool_calls) = &choice.delta.tool_calls {
105                for tc in tool_calls {
106                    events.push(StreamEvent::ToolCall {
107                        index: tc.index,
108                        id: tc.id.clone(),
109                        name: tc.function.as_ref().and_then(|f| f.name.clone()),
110                        arguments: tc.function.as_ref().and_then(|f| f.arguments.clone()),
111                    });
112                }
113            }
114            
115            if let Some(finish_reason) = &choice.finish_reason {
116                events.push(StreamEvent::Done {
117                    finish_reason: Some(finish_reason.clone()),
118                });
119            }
120        }
121        
122        events
123    }
124}
125
126/// Chat SSE parser (Strategy Pattern)
127struct ChatSseParser;
128
129impl SseLineParser for ChatSseParser {
130    fn parse_data_line(&self, data: &str) -> Result<Vec<StreamEvent>> {
131        let chunk: ChatStreamChunk = serde_json::from_str(data)
132            .map_err(|e| anyhow::anyhow!("Failed to parse chat chunk: {}", e))?;
133        
134        Ok(chunk.to_stream_events())
135    }
136}
137
138/// Response SSE parser (Strategy Pattern)
139struct ResponseSseParser;
140
141impl SseLineParser for ResponseSseParser {
142    fn parse_data_line(&self, data: &str) -> Result<Vec<StreamEvent>> {
143        let chunk: ResponseStreamChunk = serde_json::from_str(data)
144            .map_err(|e| anyhow::anyhow!("Failed to parse response chunk: {}", e))?;
145        
146        let mut events = Vec::new();
147        
148        if chunk.is_done() {
149            events.push(StreamEvent::Done {
150                finish_reason: chunk.status.clone(),
151            });
152            return Ok(events);
153        }
154        
155        let is_reasoning = chunk.output_index.map(|idx| idx == 0).unwrap_or(false);
156        
157        // Debug: log what we're receiving
158        tracing::debug!(
159            "ResponseStreamChunk - output_index: {:?}, is_reasoning: {}, delta: {:?}",
160            chunk.output_index,
161            is_reasoning,
162            chunk.delta
163        );
164        
165        if is_reasoning {
166            if let Some(text) = chunk.reasoning_text() {
167                if !text.is_empty() {
168                    tracing::debug!("Emitting Reasoning event with {} chars", text.len());
169                    events.push(StreamEvent::Reasoning { content: text });
170                }
171            }
172        } else {
173            if let Some(text) = chunk.message_text() {
174                if !text.is_empty() {
175                    tracing::debug!("Emitting Message event with {} chars", text.len());
176                    events.push(StreamEvent::Message { content: text });
177                }
178            }
179        }
180        
181        Ok(events)
182    }
183}
184
185pub fn parse_chat_sse_stream(
186    response: Response,
187) -> Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>> {
188    parse_sse_stream(response, ChatSseParser)
189}
190
191pub fn parse_response_sse_stream(
192    response: Response,
193) -> Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>> {
194    parse_sse_stream(response, ResponseSseParser)
195}
196
197pub use ChatStreamChunk as StreamChunk;
198
199/// Default SSE parser (uses chat parser for backwards compatibility)
200pub fn parse_sse_stream_legacy(response: Response) -> Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>> {
201    parse_chat_sse_stream(response)
202}
203