llm_link/llm/

use super::Client;
use anyhow::{anyhow, Result};
use llm_connector::{types::ChatRequest, StreamFormat};
use tokio_stream::wrappers::UnboundedReceiverStream;

impl Client {
    /// Send a streaming chat request with the specified stream format (Ollama-style response).
    ///
    /// This method returns streaming responses in the Ollama API format, which is used by
    /// Ollama-compatible clients such as Zed.dev.
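    ///
    /// # Example
    ///
    /// A minimal usage sketch (ignored by doctests; `client` and `messages` are
    /// assumed to exist and are hypothetical here):
    ///
    /// ```ignore
    /// use futures_util::StreamExt;
    /// use llm_connector::StreamFormat;
    ///
    /// let mut stream = client
    ///     .chat_stream_with_format("llama3", messages, StreamFormat::NDJSON)
    ///     .await?;
    /// while let Some(line) = stream.next().await {
    ///     // Each item is a pre-formatted NDJSON line, ready to write to the response body.
    ///     print!("{line}");
    /// }
    /// ```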
    #[allow(dead_code)]
    pub async fn chat_stream_with_format(
        &self,
        model: &str,
        messages: Vec<llm_connector::types::Message>,
        format: StreamFormat,
    ) -> Result<UnboundedReceiverStream<String>> {
        use futures_util::StreamExt;

        // Messages are already in llm-connector format
        let request = ChatRequest {
            model: model.to_string(),
            messages,
            stream: Some(true),
            ..Default::default()
        };

        tracing::info!("🔄 Requesting streaming from LLM connector...");

        // Use real streaming API
        let mut stream = self.llm_client.chat_stream(&request).await
            .map_err(|e| anyhow!("LLM connector streaming error: {}", e))?;

        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let model_name = model.to_string();

        tokio::spawn(async move {
            tracing::info!("🔄 Starting to process stream chunks (Ollama format)...");
            let mut chunk_count = 0;

            while let Some(chunk) = stream.next().await {
                tracing::debug!("📥 Received raw chunk from stream");

                match chunk {
                    Ok(stream_chunk) => {
                        tracing::debug!("✅ Chunk OK, checking for content...");

                        // Check for content
                        if let Some(content) = stream_chunk.get_content() {
                            if !content.is_empty() {
                                chunk_count += 1;
                                tracing::info!("📦 Received chunk #{}: '{}' ({} chars)", chunk_count, content, content.len());

                                // Build Ollama-format streaming response
                                let response_chunk = serde_json::json!({
                                    "model": &model_name,
                                    "created_at": chrono::Utc::now().to_rfc3339(),
                                    "message": {
                                        "role": "assistant",
                                        "content": content,
                                        "images": null
                                    },
                                    "done": false
                                });
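                                // A serialized chunk then looks roughly like this
                                // (illustrative values; emitted as one line):
                                // {"model":"llama3","created_at":"2024-01-01T00:00:00Z",
                                //  "message":{"role":"assistant","content":"Hel","images":null},
                                //  "done":false}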

                                let formatted_data = match format {
                                    StreamFormat::SSE => format!("data: {}\n\n", response_chunk),
                                    StreamFormat::NDJSON => format!("{}\n", response_chunk),
                                    StreamFormat::Json => response_chunk.to_string(),
                                };
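                                // Framing recap: SSE wraps the object in a "data: ..."
                                // event terminated by a blank line, NDJSON appends a
                                // single newline, and Json emits the bare object.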

                                if tx.send(formatted_data).is_err() {
                                    tracing::warn!("⚠️ Failed to send chunk to receiver (client disconnected?)");
                                    break;
                                }
                                tracing::debug!("✅ Sent chunk #{} to client", chunk_count);
                            }
                        } else {
                            tracing::debug!("⚠️ Chunk has no content (likely metadata or finish chunk)");
                        }
                    }
                    Err(e) => {
                        tracing::error!("❌ Stream error: {:?}", e);
                        break;
                    }
                }
            }

            tracing::info!("✅ Stream processing completed. Total chunks: {}", chunk_count);

            // Send the final message; "done": true signals end of stream to the client
            let final_chunk = serde_json::json!({
                "model": model_name,
                "created_at": chrono::Utc::now().to_rfc3339(),
                "message": {
                    "role": "assistant",
                    "content": ""
                },
                "done": true
            });
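            // Note: Ollama's native final object also carries timing stats
            // (e.g. total_duration, eval_count); they are omitted here.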

            let formatted_final = match format {
                StreamFormat::SSE => format!("data: {}\n\n", final_chunk),
                StreamFormat::NDJSON => format!("{}\n", final_chunk),
                StreamFormat::Json => final_chunk.to_string(),
            };
            let _ = tx.send(formatted_final);
            tracing::info!("🏁 Sent final chunk");
        });

        Ok(UnboundedReceiverStream::new(rx))
    }

    /// Send a streaming chat request for the OpenAI API (OpenAI-style response).
    ///
    /// This method returns streaming responses in the OpenAI API format, which is used by
    /// OpenAI-compatible clients such as Codex CLI.
    ///
    /// Key feature: automatically corrects `finish_reason` from "stop" to "tool_calls"
    /// when tool calls are detected in the stream.
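    ///
    /// # Example
    ///
    /// A minimal usage sketch (ignored by doctests; `client`, `messages`, and
    /// `tools` are assumed to exist and are hypothetical here):
    ///
    /// ```ignore
    /// use futures_util::StreamExt;
    /// use llm_connector::StreamFormat;
    ///
    /// let mut stream = client
    ///     .chat_stream_openai("gpt-4o", messages, Some(tools), StreamFormat::SSE)
    ///     .await?;
    /// while let Some(event) = stream.next().await {
    ///     // Each item is a complete "data: ..." SSE event; the final item also
    ///     // carries the "data: [DONE]" terminator.
    ///     print!("{event}");
    /// }
    /// ```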
    #[allow(dead_code)]
    pub async fn chat_stream_openai(
        &self,
        model: &str,
        messages: Vec<llm_connector::types::Message>,
        tools: Option<Vec<llm_connector::types::Tool>>,
        format: StreamFormat,
    ) -> Result<UnboundedReceiverStream<String>> {
        use futures_util::StreamExt;

        // Messages are already in llm-connector format
        let request = ChatRequest {
            model: model.to_string(),
            messages,
            stream: Some(true),
            tools,
            ..Default::default()
        };

        tracing::info!("🔄 Requesting streaming from LLM connector...");

        // Use real streaming API
        let mut stream = self.llm_client.chat_stream(&request).await
            .map_err(|e| anyhow!("LLM connector streaming error: {}", e))?;

        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let model_name = model.to_string();

        tokio::spawn(async move {
            tracing::info!("🔄 Starting to process stream chunks (OpenAI format)...");
            let mut chunk_count = 0;
            let mut has_tool_calls = false;  // Track if tool_calls detected

            while let Some(chunk) = stream.next().await {
                tracing::debug!("📥 Received raw chunk from stream");

                match chunk {
                    Ok(stream_chunk) => {
                        tracing::debug!("✅ Chunk OK, checking for content or tool_calls...");

                        // Build delta object
                        let mut delta = serde_json::json!({});
                        let mut has_data = false;

                        // Check for content
                        if let Some(content) = stream_chunk.get_content() {
                            if !content.is_empty() {
                                delta["content"] = serde_json::json!(content);
                                has_data = true;
                                chunk_count += 1;
                                tracing::info!("📦 Received chunk #{}: '{}' ({} chars)", chunk_count, content, content.len());
                            }
                        }

                        // Check for tool_calls (extract from choices[0].delta.tool_calls)
                        if let Some(first_choice) = stream_chunk.choices.first() {
                            if let Some(tool_calls) = &first_choice.delta.tool_calls {
                                if let Ok(tool_calls_value) = serde_json::to_value(tool_calls) {
                                    delta["tool_calls"] = tool_calls_value;
                                    has_data = true;
                                    has_tool_calls = true;  // Mark tool_calls detected
                                    chunk_count += 1;
                                    tracing::info!("🔧 Received chunk #{} with tool_calls: {} calls", chunk_count, tool_calls.len());
                                }
                            }
                        }
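                        // A delta carrying a streamed tool-call fragment looks
                        // roughly like this (illustrative; "arguments" arrives
                        // as partial JSON strings across chunks):
                        // {"tool_calls":[{"index":0,"id":"call_abc","type":"function",
                        //   "function":{"name":"get_weather","arguments":"{\"ci"}}]}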

                        if has_data {
                            // Build an OpenAI-style chat.completion.chunk
                            // (the id is a fixed placeholder)
                            let openai_chunk = serde_json::json!({
                                "id": "chatcmpl-123",
                                "object": "chat.completion.chunk",
                                "created": chrono::Utc::now().timestamp(),
                                "model": &model_name,
                                "choices": [{
                                    "index": 0,
                                    "delta": delta,
                                    "finish_reason": null
                                }]
                            });

                            let formatted_data = match format {
                                StreamFormat::SSE => format!("data: {}\n\n", openai_chunk),
                                StreamFormat::NDJSON => format!("{}\n", openai_chunk),
                                StreamFormat::Json => openai_chunk.to_string(),
                            };

                            // Send all chunks immediately (preserve streaming experience)
                            if tx.send(formatted_data).is_err() {
                                tracing::warn!("⚠️ Failed to send chunk to receiver (client disconnected?)");
                                break;
                            }
                            tracing::debug!("✅ Sent chunk #{} to client", chunk_count);
                        } else {
                            tracing::debug!("⚠️ Chunk has no content or tool_calls (likely metadata or finish chunk)");
                        }
                    }
                    Err(e) => {
                        tracing::error!("❌ Stream error: {:?}", e);
                        break;
                    }
                }
            }

            tracing::info!("✅ Stream processing completed. Total chunks: {}", chunk_count);

            // Send the final message at stream end.
            // 🎯 Key fix: if tool_calls were detected, finish_reason must be "tool_calls", not "stop"
            let finish_reason = if has_tool_calls {
                tracing::info!("🎯 Setting finish_reason to 'tool_calls' (detected tool_calls in stream)");
                "tool_calls"
            } else {
                "stop"
            };
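            // Per the OpenAI chat completions spec, a turn that ends with the
            // model requesting tools must report finish_reason "tool_calls";
            // agent clients typically use this to decide whether to execute
            // tools or treat the turn as final.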

            let final_chunk = serde_json::json!({
                "id": "chatcmpl-123",
                "object": "chat.completion.chunk",
                "created": chrono::Utc::now().timestamp(),
                "model": model_name,
                "choices": [{
                    "index": 0,
                    "delta": {},
                    "finish_reason": finish_reason
                }]
            });

            let formatted_final = match format {
                StreamFormat::SSE => format!("data: {}\n\ndata: [DONE]\n\n", final_chunk),
                StreamFormat::NDJSON => format!("{}\n", final_chunk),
                StreamFormat::Json => final_chunk.to_string(),
            };
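            // On the wire, the SSE variant ends with two events (illustrative):
            // data: {"id":"chatcmpl-123","object":"chat.completion.chunk",...,"finish_reason":"stop"}
            //
            // data: [DONE]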
            let _ = tx.send(formatted_final);
            tracing::info!("🏁 Sent final chunk and [DONE] marker");
        });

        Ok(UnboundedReceiverStream::new(rx))
    }
}