use super::Client;
use anyhow::{anyhow, Result};
use llm_connector::{types::ChatRequest, StreamFormat};
use tokio_stream::wrappers::UnboundedReceiverStream;

impl Client {
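    /// Streams a chat completion and yields Ollama-style JSON messages,
    /// encoded per `format` (SSE, NDJSON, or plain JSON).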
    #[allow(dead_code)]
    pub async fn chat_stream_with_format(
        &self,
        model: &str,
        messages: Vec<llm_connector::types::Message>,
        format: StreamFormat,
    ) -> Result<UnboundedReceiverStream<String>> {
        use futures_util::StreamExt;

        let request = ChatRequest {
            model: model.to_string(),
            messages,
            stream: Some(true),
            ..Default::default()
        };

        tracing::info!("🔄 Requesting streaming from LLM connector...");

        let mut stream = self.llm_client.chat_stream(&request).await
            .map_err(|e| anyhow!("LLM connector streaming error: {}", e))?;

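        // Bridge the connector stream into an unbounded channel so the
        // receiver half can be handed back to the caller as a `Stream`.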
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let model_name = model.to_string();

        tokio::spawn(async move {
            tracing::info!("🔄 Starting to process stream chunks (Ollama format)...");
            let mut chunk_count = 0;

            while let Some(chunk) = stream.next().await {
                tracing::debug!("📥 Received raw chunk from stream");

                match chunk {
                    Ok(stream_chunk) => {
                        tracing::debug!("✅ Chunk OK, checking for content...");

                        if let Some(content) = stream_chunk.get_content() {
                            if !content.is_empty() {
                                chunk_count += 1;
                                tracing::info!("📦 Received chunk #{}: '{}' ({} chars)", chunk_count, content, content.len());

                                let response_chunk = serde_json::json!({
                                    "model": &model_name,
                                    "created_at": chrono::Utc::now().to_rfc3339(),
                                    "message": {
                                        "role": "assistant",
                                        "content": content,
                                        "images": null
                                    },
                                    "done": false
                                });

                                let formatted_data = match format {
                                    StreamFormat::SSE => format!("data: {}\n\n", response_chunk),
                                    StreamFormat::NDJSON => format!("{}\n", response_chunk),
                                    StreamFormat::Json => response_chunk.to_string(),
                                };

                                if tx.send(formatted_data).is_err() {
                                    tracing::warn!("⚠️ Failed to send chunk to receiver (client disconnected?)");
                                    break;
                                }
                                tracing::debug!("✅ Sent chunk #{} to client", chunk_count);
                            }
                        } else {
                            tracing::debug!("⚠️ Chunk has no content (likely metadata or finish chunk)");
                        }
                    }
                    Err(e) => {
                        tracing::error!("❌ Stream error: {:?}", e);
                        break;
                    }
                }
            }

            tracing::info!("✅ Stream processing completed. Total chunks: {}", chunk_count);

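            // Terminal Ollama-style message: empty content with `done: true`.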
            let final_chunk = serde_json::json!({
                "model": model_name,
                "created_at": chrono::Utc::now().to_rfc3339(),
                "message": {
                    "role": "assistant",
                    "content": ""
                },
                "done": true
            });

            let formatted_final = match format {
                StreamFormat::SSE => format!("data: {}\n\n", final_chunk),
                StreamFormat::NDJSON => format!("{}\n", final_chunk),
                StreamFormat::Json => final_chunk.to_string(),
            };
            let _ = tx.send(formatted_final);
            tracing::info!("🏁 Sent final chunk");
        });

        Ok(UnboundedReceiverStream::new(rx))
    }

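    /// Streams a chat completion and yields OpenAI-style `chat.completion.chunk`
    /// objects, forwarding both content deltas and tool-call deltas, encoded per `format`.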
    #[allow(dead_code)]
    pub async fn chat_stream_openai(
        &self,
        model: &str,
        messages: Vec<llm_connector::types::Message>,
        tools: Option<Vec<llm_connector::types::Tool>>,
        format: StreamFormat,
    ) -> Result<UnboundedReceiverStream<String>> {
        use futures_util::StreamExt;

        let request = ChatRequest {
            model: model.to_string(),
            messages,
            stream: Some(true),
            tools,
            ..Default::default()
        };

        tracing::info!("🔄 Requesting streaming from LLM connector...");

        let mut stream = self.llm_client.chat_stream(&request).await
            .map_err(|e| anyhow!("LLM connector streaming error: {}", e))?;

        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let model_name = model.to_string();

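        // Forward chunks on a background task, remembering whether any
        // tool_calls appeared so the final finish_reason can reflect it.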
        tokio::spawn(async move {
            tracing::info!("🔄 Starting to process stream chunks (OpenAI format)...");
            let mut chunk_count = 0;
            let mut has_tool_calls = false;

            while let Some(chunk) = stream.next().await {
                tracing::debug!("📥 Received raw chunk from stream");

                match chunk {
                    Ok(stream_chunk) => {
                        tracing::debug!("✅ Chunk OK, checking for content or tool_calls...");

                        let mut delta = serde_json::json!({});
                        let mut has_data = false;

                        if let Some(content) = stream_chunk.get_content() {
                            if !content.is_empty() {
                                delta["content"] = serde_json::json!(content);
                                has_data = true;
                                chunk_count += 1;
                                tracing::info!("📦 Received chunk #{}: '{}' ({} chars)", chunk_count, content, content.len());
                            }
                        }

                        // Tool-call deltas arrive on the first choice; forward them verbatim.
                        if let Some(first_choice) = stream_chunk.choices.first() {
                            if let Some(tool_calls) = &first_choice.delta.tool_calls {
                                if let Ok(tool_calls_value) = serde_json::to_value(tool_calls) {
                                    delta["tool_calls"] = tool_calls_value;
                                    has_data = true;
                                    has_tool_calls = true;
                                    chunk_count += 1;
                                    tracing::info!("🔧 Received chunk #{} with tool_calls: {} calls", chunk_count, tool_calls.len());
                                }
                            }
                        }

                        if has_data {
                            // "chatcmpl-123" is a static placeholder id for the synthesized chunk.
                            let openai_chunk = serde_json::json!({
                                "id": "chatcmpl-123",
                                "object": "chat.completion.chunk",
                                "created": chrono::Utc::now().timestamp(),
                                "model": &model_name,
                                "choices": [{
                                    "index": 0,
                                    "delta": delta,
                                    "finish_reason": null
                                }]
                            });

                            let formatted_data = match format {
                                StreamFormat::SSE => format!("data: {}\n\n", openai_chunk),
                                StreamFormat::NDJSON => format!("{}\n", openai_chunk),
                                StreamFormat::Json => openai_chunk.to_string(),
                            };

                            if tx.send(formatted_data).is_err() {
                                tracing::warn!("⚠️ Failed to send chunk to receiver (client disconnected?)");
                                break;
                            }
                            tracing::debug!("✅ Sent chunk #{} to client", chunk_count);
                        } else {
                            tracing::debug!("⚠️ Chunk has no content or tool_calls (likely metadata or finish chunk)");
                        }
                    }
                    Err(e) => {
                        tracing::error!("❌ Stream error: {:?}", e);
                        break;
                    }
                }
            }

            tracing::info!("✅ Stream processing completed. Total chunks: {}", chunk_count);

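            // Per the OpenAI streaming contract, the last chunk reports
            // "tool_calls" when the model requested tools, otherwise "stop".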
            let finish_reason = if has_tool_calls {
                tracing::info!("🎯 Setting finish_reason to 'tool_calls' (detected tool_calls in stream)");
                "tool_calls"
            } else {
                "stop"
            };

            let final_chunk = serde_json::json!({
                "id": "chatcmpl-123",
                "object": "chat.completion.chunk",
                "created": chrono::Utc::now().timestamp(),
                "model": model_name,
                "choices": [{
                    "index": 0,
                    "delta": {},
                    "finish_reason": finish_reason
                }]
            });

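            // SSE clients expect a literal `data: [DONE]` sentinel after the final chunk.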
            let formatted_final = match format {
                StreamFormat::SSE => format!("data: {}\n\ndata: [DONE]\n\n", final_chunk),
                StreamFormat::NDJSON => format!("{}\n", final_chunk),
                StreamFormat::Json => final_chunk.to_string(),
            };
            let _ = tx.send(formatted_final);
            tracing::info!("🏁 Sent final chunk and [DONE] marker");
        });

        Ok(UnboundedReceiverStream::new(rx))
    }
}