// offline_intelligence/worker_threads/llm_worker.rs
use std::sync::{Arc, RwLock};
7use futures_util::StreamExt;
8use tracing::{info, debug, warn};
9use serde::{Deserialize, Serialize};
10
11use crate::{
12 memory::Message,
13 model_runtime::RuntimeManager,
14};
15
/// Request body for the OpenAI-compatible `/v1/chat/completions` endpoint
/// served by the local llama-server backend.
#[derive(Debug, Serialize)]
struct ChatCompletionRequest {
    /// Model identifier sent on the wire; this worker always sends "local-llm".
    model: String,
    /// Conversation history in role/content form.
    messages: Vec<ChatMessage>,
    /// Upper bound on the number of tokens generated for the completion.
    max_tokens: u32,
    /// Sampling temperature.
    temperature: f32,
    /// When true, the backend streams SSE chunks instead of returning one JSON body.
    stream: bool,
}
25
/// Request body for the OpenAI-compatible `/v1/embeddings` endpoint.
#[derive(Debug, Serialize)]
struct EmbeddingRequest {
    /// Model identifier sent on the wire; this worker always sends "local-llm".
    model: String,
    /// Texts to embed; the backend is expected to return one embedding per input.
    input: Vec<String>,
}
32
/// Response body of `/v1/embeddings`; only the `data` array is consumed here,
/// any other fields the backend sends are ignored by serde.
#[derive(Debug, Deserialize)]
struct EmbeddingResponse {
    data: Vec<EmbeddingData>,
}
38
/// One entry of the embeddings response: the embedding vector itself.
#[derive(Debug, Deserialize)]
struct EmbeddingData {
    embedding: Vec<f32>,
}
43
/// A single chat turn in OpenAI wire format, used both in requests and in
/// parsed responses.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ChatMessage {
    /// Speaker role, e.g. "user" / "assistant" / "system".
    role: String,
    /// Message text.
    content: String,
}
49
/// Non-streaming completion response; only `choices` is read.
#[derive(Debug, Deserialize)]
struct ChatCompletionResponse {
    choices: Vec<ChatChoice>,
}
55
/// One completion choice. `message` is optional because some backends omit it
/// (callers fall back to an empty/default string when it is absent).
#[derive(Debug, Deserialize)]
struct ChatChoice {
    message: Option<ChatMessage>,
}
60
/// One parsed SSE `data:` payload of a streaming completion.
#[derive(Debug, Deserialize)]
struct StreamChunk {
    choices: Vec<StreamChoice>,
}
66
/// One choice inside a streaming chunk. Only `finish_reason` is inspected by
/// the forwarding loop (to know when to emit the final `[DONE]`); `delta` is
/// deserialized but currently unused.
#[derive(Debug, Deserialize)]
struct StreamChoice {
    delta: Option<ChatDelta>,
    finish_reason: Option<String>,
}
72
/// Incremental content fragment of a streaming choice (OpenAI delta format).
#[derive(Debug, Deserialize, Clone)]
struct ChatDelta {
    content: Option<String>,
}
77
/// Worker that proxies chat, embedding, and title-generation requests to a
/// locally running OpenAI-compatible LLM server managed by `RuntimeManager`.
pub struct LLMWorker {
    /// Backend URL captured at construction time.
    /// NOTE(review): never read by any request path in this file — all URLs
    /// are resolved from the runtime manager; confirm whether it is still needed.
    backend_url: String,
    /// Shared reqwest client (600 s timeout per the constructors).
    http_client: reqwest::Client,
    /// Set after construction via `set_runtime_manager`; `None` until an
    /// engine is linked.
    runtime_manager: RwLock<Option<Arc<RuntimeManager>>>,
}
83
84impl LLMWorker {
85 pub fn new(shared_state: std::sync::Arc<crate::shared_state::SharedState>) -> Self {
87 let backend_url = shared_state.config.backend_url.clone();
88 Self {
89 backend_url,
90 http_client: reqwest::Client::builder()
91 .timeout(std::time::Duration::from_secs(600))
92 .build()
93 .unwrap_or_default(),
94 runtime_manager: RwLock::new(None),
95 }
96 }
97
98 pub fn new_with_backend(backend_url: String) -> Self {
100 info!("LLM worker initialized with backend: {}", backend_url);
101 Self {
102 backend_url,
103 http_client: reqwest::Client::builder()
104 .timeout(std::time::Duration::from_secs(600))
105 .build()
106 .unwrap_or_default(),
107 runtime_manager: RwLock::new(None),
108 }
109 }
110
111 pub fn set_runtime_manager(&self, runtime_manager: Arc<RuntimeManager>) {
113 if let Ok(mut guard) = self.runtime_manager.write() {
114 *guard = Some(runtime_manager);
115 info!("✅ Runtime manager linked to LLM worker");
116 } else {
117 warn!("⚠️ Failed to acquire lock to set runtime manager on LLM worker");
118 }
119 }
120
121 fn get_runtime_manager(&self) -> Option<Arc<RuntimeManager>> {
123 self.runtime_manager.read().ok().and_then(|guard| (*guard).clone())
124 }
125
126 pub async fn is_runtime_ready(&self) -> bool {
128 let has_runtime_manager = self.runtime_manager.read().is_ok();
129 let result = if let Some(ref rm) = self.get_runtime_manager() {
130 let ready = rm.is_ready().await;
131 info!("LLMWorker is_ready: runtime_manager exists={}, is_ready={}", has_runtime_manager, ready);
132 ready
133 } else {
134 info!("LLMWorker is_ready: no runtime_manager set");
135 false
136 };
137 result
138 }
139
140 fn to_chat_messages(messages: &[Message]) -> Vec<ChatMessage> {
145 messages.iter().map(|m| ChatMessage {
146 role: m.role.clone(),
147 content: m.content.clone(),
148 }).collect()
149 }
150
151 pub async fn generate_response(
153 &self,
154 _session_id: String,
155 context: Vec<Message>,
156 ) -> anyhow::Result<String> {
157 debug!("LLM worker generating response (non-streaming)");
158
159 let request = ChatCompletionRequest {
160 model: "local-llm".to_string(),
161 messages: Self::to_chat_messages(&context),
162 max_tokens: 2000,
163 temperature: 0.7,
164 stream: false,
165 };
166
167 let url = if let Some(ref rm) = self.get_runtime_manager() {
169 if rm.is_ready().await {
171 if let Some(base_url) = rm.get_base_url().await {
172 format!("{}/v1/chat/completions", base_url)
173 } else {
174 return Err(anyhow::anyhow!(
176 "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
177 ));
178 }
179 } else {
180 return Err(anyhow::anyhow!(
182 "Model engine is not ready yet. Please load a model from the Models panel first."
183 ));
184 }
185 } else {
186 return Err(anyhow::anyhow!(
188 "No model loaded. Please download an engine and load a model from the Models panel."
189 ));
190 };
191
192 let response = self.http_client
193 .post(&url)
194 .json(&request)
195 .send()
196 .await
197 .map_err(|e| {
198 if e.is_connect() {
199 anyhow::anyhow!(
200 "Cannot connect to local LLM server. Please download and load a model from the Models panel."
201 )
202 } else {
203 anyhow::anyhow!("LLM backend request failed: {}", e)
204 }
205 })?;
206
207 if !response.status().is_success() {
208 let status = response.status();
209 let body = response.text().await.unwrap_or_default();
210 return Err(anyhow::anyhow!("LLM backend returned {}: {}", status, body));
211 }
212
213 let completion: ChatCompletionResponse = response.json().await
214 .map_err(|e| anyhow::anyhow!("Failed to parse LLM response: {}", e))?;
215
216 let content = completion.choices
217 .first()
218 .and_then(|c| c.message.as_ref())
219 .map(|m| m.content.clone())
220 .unwrap_or_default();
221
222 Ok(content)
223 }
224
225 pub async fn stream_response(
228 &self,
229 messages: Vec<Message>,
230 max_tokens: u32,
231 temperature: f32,
232 ) -> anyhow::Result<impl futures_util::Stream<Item = Result<String, anyhow::Error>>> {
233 debug!("LLM worker starting streaming response");
234
235 let request = ChatCompletionRequest {
236 model: "local-llm".to_string(),
237 messages: Self::to_chat_messages(&messages),
238 max_tokens,
239 temperature,
240 stream: true,
241 };
242
243 let url = if let Some(ref rm) = self.get_runtime_manager() {
245 if rm.is_ready().await {
247 if let Some(base_url) = rm.get_base_url().await {
248 format!("{}/v1/chat/completions", base_url)
249 } else {
250 return Err(anyhow::anyhow!(
252 "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
253 ));
254 }
255 } else {
256 return Err(anyhow::anyhow!(
258 "Model engine is not ready yet. Please load a model from the Models panel first."
259 ));
260 }
261 } else {
262 return Err(anyhow::anyhow!(
264 "No model loaded. Please download an engine and load a model from the Models panel."
265 ));
266 };
267
268 let response = self.http_client
269 .post(&url)
270 .json(&request)
271 .send()
272 .await
273 .map_err(|e| {
274 if e.is_connect() {
275 anyhow::anyhow!(
276 "Cannot connect to local LLM server. Please download and load a model from the Models panel."
277 )
278 } else {
279 anyhow::anyhow!("LLM backend request failed: {}", e)
280 }
281 })?;
282
283 if !response.status().is_success() {
284 let status = response.status();
285 let body = response.text().await.unwrap_or_default();
286 return Err(anyhow::anyhow!("LLM backend returned {}: {}", status, body));
287 }
288
289 let byte_stream = response.bytes_stream();
290
291 let sse_stream = async_stream::try_stream! {
292 let mut buffer = String::new();
293
294 futures_util::pin_mut!(byte_stream);
295
296 while let Some(chunk_result) = byte_stream.next().await {
297 let chunk = chunk_result
298 .map_err(|e| anyhow::anyhow!("Stream read error: {}", e))?;
299
300 buffer.push_str(&String::from_utf8_lossy(&chunk));
301
302 while let Some(newline_pos) = buffer.find('\n') {
303 let line = buffer[..newline_pos].trim().to_string();
304 buffer = buffer[newline_pos + 1..].to_string();
305
306 if line.is_empty() {
307 continue;
308 }
309
310 if line.starts_with("data: ") {
311 let data = &line[6..];
312
313 if data == "[DONE]" {
314 yield "data: [DONE]\n\n".to_string();
315 return;
316 }
317
318 match serde_json::from_str::<StreamChunk>(data) {
319 Ok(chunk) => {
320 let finished = chunk.choices.iter()
321 .any(|c| c.finish_reason.is_some());
322
323 yield format!("data: {}\n\n", data);
324
325 if finished {
326 yield "data: [DONE]\n\n".to_string();
327 return;
328 }
329 }
330 Err(_) => {
331 yield format!("data: {}\n\n", data);
332 }
333 }
334 }
335 }
336 }
337
338 yield "data: [DONE]\n\n".to_string();
339 };
340
341 Ok(sse_stream)
342 }
343
344 pub async fn batch_process(
346 &self,
347 prompts: Vec<(String, Vec<Message>)>,
348 ) -> anyhow::Result<Vec<String>> {
349 debug!("LLM worker batch processing {} prompts", prompts.len());
350
351 let mut responses = Vec::new();
352 for (session_id, messages) in prompts {
353 match self.generate_response(session_id.clone(), messages).await {
354 Ok(response) => responses.push(response),
355 Err(e) => {
356 warn!("Batch item {} failed: {}", session_id, e);
357 responses.push(format!("Error: {}", e));
358 }
359 }
360 }
361
362 info!("Batch processed {} prompts", responses.len());
363 Ok(responses)
364 }
365
366 pub async fn initialize_model(&self, model_path: &str) -> anyhow::Result<()> {
368 debug!("LLM worker model init (HTTP proxy mode): {}", model_path);
369 Ok(())
370 }
371
372 pub async fn generate_embeddings(
376 &self,
377 texts: Vec<String>,
378 ) -> anyhow::Result<Vec<Vec<f32>>> {
379 if texts.is_empty() {
380 return Ok(Vec::new());
381 }
382
383 debug!("Generating embeddings for {} text(s) via llama-server", texts.len());
384
385 let request = EmbeddingRequest {
386 model: "local-llm".to_string(),
387 input: texts,
388 };
389
390 let url = if let Some(ref rm) = self.get_runtime_manager() {
392 if rm.is_ready().await {
394 if let Some(base_url) = rm.get_base_url().await {
395 format!("{}/v1/embeddings", base_url)
396 } else {
397 return Err(anyhow::anyhow!(
399 "Model engine is initializing. Please wait a moment and try again."
400 ));
401 }
402 } else {
403 return Err(anyhow::anyhow!(
405 "Model engine is not ready yet. Please load a model from the Models panel first."
406 ));
407 }
408 } else {
409 return Err(anyhow::anyhow!(
411 "No model loaded. Please download an engine and load a model from the Models panel."
412 ));
413 };
414
415 let response = self.http_client
416 .post(&url)
417 .json(&request)
418 .send()
419 .await
420 .map_err(|e| anyhow::anyhow!("Embedding request failed: {}", e))?;
421
422 if !response.status().is_success() {
423 let status = response.status();
424 let body = response.text().await.unwrap_or_default();
425 return Err(anyhow::anyhow!("Embedding endpoint returned {}: {}", status, body));
426 }
427
428 let embedding_response: EmbeddingResponse = response.json().await
429 .map_err(|e| anyhow::anyhow!("Failed to parse embedding response: {}", e))?;
430
431 let embeddings: Vec<Vec<f32>> = embedding_response.data
432 .into_iter()
433 .map(|d| d.embedding)
434 .collect();
435
436 debug!("Generated {} embeddings (dim={})",
437 embeddings.len(),
438 embeddings.first().map(|e| e.len()).unwrap_or(0));
439
440 Ok(embeddings)
441 }
442
443 pub async fn generate_title(
445 &self,
446 prompt: &str,
447 max_tokens: u32,
448 ) -> anyhow::Result<String> {
449 debug!("LLM worker generating title for prompt ({} chars)", prompt.len());
450
451 let messages = vec![Message {
452 role: "user".to_string(),
453 content: prompt.to_string(),
454 }];
455
456 let request = ChatCompletionRequest {
457 model: "local-llm".to_string(),
458 messages: Self::to_chat_messages(&messages),
459 max_tokens: max_tokens.min(20),
460 temperature: 0.3,
461 stream: false,
462 };
463
464 let url = if let Some(ref rm) = self.get_runtime_manager() {
466 if rm.is_ready().await {
468 if let Some(base_url) = rm.get_base_url().await {
469 format!("{}/v1/chat/completions", base_url)
470 } else {
471 return Err(anyhow::anyhow!(
473 "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
474 ));
475 }
476 } else {
477 return Err(anyhow::anyhow!(
479 "Model engine is not ready yet. Please load a model from the Models panel first."
480 ));
481 }
482 } else {
483 return Err(anyhow::anyhow!(
485 "No model loaded. Please download an engine and load a model from the Models panel."
486 ));
487 };
488
489 let response = self.http_client
490 .post(&url)
491 .json(&request)
492 .send()
493 .await
494 .map_err(|e| anyhow::anyhow!("Title generation request failed: {}", e))?;
495
496 if !response.status().is_success() {
497 let status = response.status();
498 let body = response.text().await.unwrap_or_default();
499 return Err(anyhow::anyhow!("Title generation failed ({}): {}", status, body));
500 }
501
502 let completion: ChatCompletionResponse = response.json().await
503 .map_err(|e| anyhow::anyhow!("Failed to parse title response: {}", e))?;
504
505 let title = completion.choices
506 .first()
507 .and_then(|c| c.message.as_ref())
508 .map(|m| m.content.trim().to_string())
509 .unwrap_or_else(|| "New Chat".to_string());
510
511 let title = title.trim_matches('"').trim_matches('\'').to_string();
512
513 info!("Generated title: '{}'", title);
514 Ok(title)
515 }
516}