offline_intelligence/worker_threads/llm_worker.rs

use std::sync::{Arc, RwLock};

use futures_util::StreamExt;
use tracing::{info, debug, warn};
use serde::{Deserialize, Serialize};

use crate::{
    memory::Message,
    model_runtime::RuntimeManager,
};

#[derive(Debug, Serialize)]
struct ChatCompletionRequest {
    model: String,
    messages: Vec<ChatMessage>,
    max_tokens: u32,
    temperature: f32,
    stream: bool,
    cache_prompt: bool,
}

#[derive(Debug, Serialize)]
struct EmbeddingRequest {
    model: String,
    input: Vec<String>,
}

#[derive(Debug, Deserialize)]
struct EmbeddingResponse {
    data: Vec<EmbeddingData>,
}

#[derive(Debug, Deserialize)]
struct EmbeddingData {
    embedding: Vec<f32>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct ChatMessage {
    role: String,
    content: String,
}

#[derive(Debug, Deserialize)]
struct ChatCompletionResponse {
    choices: Vec<ChatChoice>,
}

#[derive(Debug, Deserialize)]
struct ChatChoice {
    message: Option<ChatMessage>,
}

#[derive(Debug, Deserialize)]
struct StreamChunk {
    choices: Vec<StreamChoice>,
}

#[derive(Debug, Deserialize)]
struct StreamChoice {
    delta: Option<ChatDelta>,
    finish_reason: Option<String>,
}

#[derive(Debug, Deserialize, Clone)]
struct ChatDelta {
    content: Option<String>,
}

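/// Worker that proxies chat completions, token streaming, embeddings, and
/// title generation to a local OpenAI-compatible LLM server (llama-server),
/// resolving the live base URL through a linked `RuntimeManager`.
///
/// Minimal usage sketch (illustrative only; `runtime_manager` and `context`
/// are assumed to be constructed elsewhere in the crate):
///
/// ```ignore
/// let worker = LLMWorker::new_with_backend("http://127.0.0.1:8080".to_string());
/// worker.set_runtime_manager(runtime_manager);
/// let reply = worker.generate_response("session-1".to_string(), context).await?;
/// ```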
pub struct LLMWorker {
    backend_url: String,
    http_client: reqwest::Client,
    runtime_manager: RwLock<Option<Arc<RuntimeManager>>>,
}

impl LLMWorker {
    pub fn new(shared_state: std::sync::Arc<crate::shared_state::SharedState>) -> Self {
        let backend_url = shared_state.config.backend_url.clone();
        Self {
            backend_url,
            http_client: reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(600))
                .build()
                .unwrap_or_default(),
            runtime_manager: RwLock::new(None),
        }
    }

    pub fn new_with_backend(backend_url: String) -> Self {
        info!("LLM worker initialized with backend: {}", backend_url);
        Self {
            backend_url,
            http_client: reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(600))
                .build()
                .unwrap_or_default(),
            runtime_manager: RwLock::new(None),
        }
    }

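    /// Links the shared `RuntimeManager` so subsequent requests can resolve the
    /// live server's base URL. Logs a warning if the lock cannot be acquired.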
    pub fn set_runtime_manager(&self, runtime_manager: Arc<RuntimeManager>) {
        if let Ok(mut guard) = self.runtime_manager.write() {
            *guard = Some(runtime_manager);
            info!("✅ Runtime manager linked to LLM worker");
        } else {
            warn!("⚠️ Failed to acquire lock to set runtime manager on LLM worker");
        }
    }

    fn get_runtime_manager(&self) -> Option<Arc<RuntimeManager>> {
        self.runtime_manager.read().ok().and_then(|guard| (*guard).clone())
    }

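    /// Returns `true` only when a runtime manager has been linked and it
    /// reports that the model server is ready to accept requests.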
    pub async fn is_runtime_ready(&self) -> bool {
        if let Some(rm) = self.get_runtime_manager() {
            let ready = rm.is_ready().await;
            info!("LLMWorker is_ready: runtime_manager exists=true, is_ready={}", ready);
            ready
        } else {
            info!("LLMWorker is_ready: no runtime_manager set");
            false
        }
    }

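    /// Converts stored conversation `Message`s into the wire format expected by
    /// the OpenAI-style `/v1/chat/completions` endpoint.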
    fn to_chat_messages(messages: &[Message]) -> Vec<ChatMessage> {
        messages
            .iter()
            .map(|m| ChatMessage {
                role: m.role.clone(),
                content: m.content.clone(),
            })
            .collect()
    }

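    /// Sends a non-streaming chat completion request and returns the first
    /// choice's content (an empty string if the backend returns no choices).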
    pub async fn generate_response(
        &self,
        _session_id: String,
        context: Vec<Message>,
    ) -> anyhow::Result<String> {
        debug!("LLM worker generating response (non-streaming)");

        let request = ChatCompletionRequest {
            model: "local-llm".to_string(),
            messages: Self::to_chat_messages(&context),
            max_tokens: 2000,
            temperature: 0.7,
            stream: false,
            cache_prompt: true,
        };

        let url = if let Some(ref rm) = self.get_runtime_manager() {
            if rm.is_ready().await {
                if let Some(base_url) = rm.get_base_url().await {
                    format!("{}/v1/chat/completions", base_url)
                } else {
                    return Err(anyhow::anyhow!(
                        "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
                    ));
                }
            } else {
                return Err(anyhow::anyhow!(
                    "Model engine is not ready yet. Please load a model from the Models panel first."
                ));
            }
        } else {
            return Err(anyhow::anyhow!(
                "No model loaded. Please download an engine and load a model from the Models panel."
            ));
        };

        let response = self.http_client
            .post(&url)
            .json(&request)
            .send()
            .await
            .map_err(|e| {
                if e.is_connect() {
                    anyhow::anyhow!(
                        "Cannot connect to local LLM server. Please download and load a model from the Models panel."
                    )
                } else {
                    anyhow::anyhow!("LLM backend request failed: {}", e)
                }
            })?;

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(anyhow::anyhow!("LLM backend returned {}: {}", status, body));
        }

        let completion: ChatCompletionResponse = response.json().await
            .map_err(|e| anyhow::anyhow!("Failed to parse LLM response: {}", e))?;

        let content = completion.choices
            .first()
            .and_then(|c| c.message.as_ref())
            .map(|m| m.content.clone())
            .unwrap_or_default();

        Ok(content)
    }

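    /// Sends a streaming chat completion request and re-emits the backend's
    /// Server-Sent Events: each `data:` line is forwarded as-is, and a final
    /// `data: [DONE]` is appended when the backend signals a finish reason or
    /// the byte stream ends.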
    pub async fn stream_response(
        &self,
        messages: Vec<Message>,
        max_tokens: u32,
        temperature: f32,
    ) -> anyhow::Result<impl futures_util::Stream<Item = Result<String, anyhow::Error>>> {
        debug!("LLM worker starting streaming response");

        let request = ChatCompletionRequest {
            model: "local-llm".to_string(),
            messages: Self::to_chat_messages(&messages),
            max_tokens,
            temperature,
            stream: true,
            cache_prompt: true,
        };

        let url = if let Some(ref rm) = self.get_runtime_manager() {
            if rm.is_ready().await {
                if let Some(base_url) = rm.get_base_url().await {
                    format!("{}/v1/chat/completions", base_url)
                } else {
                    return Err(anyhow::anyhow!(
                        "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
                    ));
                }
            } else {
                return Err(anyhow::anyhow!(
                    "Model engine is not ready yet. Please load a model from the Models panel first."
                ));
            }
        } else {
            return Err(anyhow::anyhow!(
                "No model loaded. Please download an engine and load a model from the Models panel."
            ));
        };

        let response = self.http_client
            .post(&url)
            .json(&request)
            .send()
            .await
            .map_err(|e| {
                if e.is_connect() {
                    anyhow::anyhow!(
                        "Cannot connect to local LLM server. Please download and load a model from the Models panel."
                    )
                } else {
                    anyhow::anyhow!("LLM backend request failed: {}", e)
                }
            })?;

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(anyhow::anyhow!("LLM backend returned {}: {}", status, body));
        }

        let byte_stream = response.bytes_stream();

        let sse_stream = async_stream::try_stream! {
            let mut buffer = String::new();

            futures_util::pin_mut!(byte_stream);

            while let Some(chunk_result) = byte_stream.next().await {
                let chunk = chunk_result
                    .map_err(|e| anyhow::anyhow!("Stream read error: {}", e))?;

                buffer.push_str(&String::from_utf8_lossy(&chunk));

                while let Some(newline_pos) = buffer.find('\n') {
                    let line = buffer[..newline_pos].trim().to_string();
                    buffer = buffer[newline_pos + 1..].to_string();

                    if line.is_empty() {
                        continue;
                    }

                    if line.starts_with("data: ") {
                        let data = &line[6..];

                        if data == "[DONE]" {
                            yield "data: [DONE]\n\n".to_string();
                            return;
                        }

                        match serde_json::from_str::<StreamChunk>(data) {
                            Ok(chunk) => {
                                let finished = chunk.choices.iter()
                                    .any(|c| c.finish_reason.is_some());

                                yield format!("data: {}\n\n", data);

                                if finished {
                                    yield "data: [DONE]\n\n".to_string();
                                    return;
                                }
                            }
                            Err(_) => {
                                yield format!("data: {}\n\n", data);
                            }
                        }
                    }
                }
            }

            yield "data: [DONE]\n\n".to_string();
        };

        Ok(sse_stream)
    }

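    /// Generates responses for a batch of prompts sequentially; a failed item
    /// is recorded as an `"Error: <message>"` string so the output stays
    /// aligned with the input order.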
    pub async fn batch_process(
        &self,
        prompts: Vec<(String, Vec<Message>)>,
    ) -> anyhow::Result<Vec<String>> {
        debug!("LLM worker batch processing {} prompts", prompts.len());

        let mut responses = Vec::new();
        for (session_id, messages) in prompts {
            match self.generate_response(session_id.clone(), messages).await {
                Ok(response) => responses.push(response),
                Err(e) => {
                    warn!("Batch item {} failed: {}", session_id, e);
                    responses.push(format!("Error: {}", e));
                }
            }
        }

        info!("Batch processed {} prompts", responses.len());
        Ok(responses)
    }

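    /// No-op in HTTP proxy mode: model loading is handled outside this worker,
    /// so this only logs the requested model path.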
    pub async fn initialize_model(&self, model_path: &str) -> anyhow::Result<()> {
        debug!("LLM worker model init (HTTP proxy mode): {}", model_path);
        Ok(())
    }

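    /// Requests embeddings for a batch of texts from the `/v1/embeddings`
    /// endpoint and returns one embedding vector per input text.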
    pub async fn generate_embeddings(
        &self,
        texts: Vec<String>,
    ) -> anyhow::Result<Vec<Vec<f32>>> {
        if texts.is_empty() {
            return Ok(Vec::new());
        }

        debug!("Generating embeddings for {} text(s) via llama-server", texts.len());

        let request = EmbeddingRequest {
            model: "local-llm".to_string(),
            input: texts,
        };

        let url = if let Some(ref rm) = self.get_runtime_manager() {
            if rm.is_ready().await {
                if let Some(base_url) = rm.get_base_url().await {
                    format!("{}/v1/embeddings", base_url)
                } else {
                    return Err(anyhow::anyhow!(
                        "Model engine is initializing. Please wait a moment and try again."
                    ));
                }
            } else {
                return Err(anyhow::anyhow!(
                    "Model engine is not ready yet. Please load a model from the Models panel first."
                ));
            }
        } else {
            return Err(anyhow::anyhow!(
                "No model loaded. Please download an engine and load a model from the Models panel."
            ));
        };

        let response = self.http_client
            .post(&url)
            .json(&request)
            .send()
            .await
            .map_err(|e| anyhow::anyhow!("Embedding request failed: {}", e))?;

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(anyhow::anyhow!("Embedding endpoint returned {}: {}", status, body));
        }

        let embedding_response: EmbeddingResponse = response.json().await
            .map_err(|e| anyhow::anyhow!("Failed to parse embedding response: {}", e))?;

        let embeddings: Vec<Vec<f32>> = embedding_response.data
            .into_iter()
            .map(|d| d.embedding)
            .collect();

        debug!("Generated {} embeddings (dim={})",
            embeddings.len(),
            embeddings.first().map(|e| e.len()).unwrap_or(0));

        Ok(embeddings)
    }

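    /// Asks the model for a short conversation title using a low temperature
    /// and a hard cap of 20 tokens, then strips surrounding quotes; falls back
    /// to "New Chat" when the backend returns no content.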
    pub async fn generate_title(
        &self,
        prompt: &str,
        max_tokens: u32,
    ) -> anyhow::Result<String> {
        debug!("LLM worker generating title for prompt ({} chars)", prompt.len());

        let messages = vec![Message {
            role: "user".to_string(),
            content: prompt.to_string(),
        }];

        let request = ChatCompletionRequest {
            model: "local-llm".to_string(),
            messages: Self::to_chat_messages(&messages),
            max_tokens: max_tokens.min(20),
            temperature: 0.3,
            stream: false,
            cache_prompt: true,
        };

        let url = if let Some(ref rm) = self.get_runtime_manager() {
            if rm.is_ready().await {
                if let Some(base_url) = rm.get_base_url().await {
                    format!("{}/v1/chat/completions", base_url)
                } else {
                    return Err(anyhow::anyhow!(
                        "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
                    ));
                }
            } else {
                return Err(anyhow::anyhow!(
                    "Model engine is not ready yet. Please load a model from the Models panel first."
                ));
            }
        } else {
            return Err(anyhow::anyhow!(
                "No model loaded. Please download an engine and load a model from the Models panel."
            ));
        };

        let response = self.http_client
            .post(&url)
            .json(&request)
            .send()
            .await
            .map_err(|e| anyhow::anyhow!("Title generation request failed: {}", e))?;

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(anyhow::anyhow!("Title generation failed ({}): {}", status, body));
        }

        let completion: ChatCompletionResponse = response.json().await
            .map_err(|e| anyhow::anyhow!("Failed to parse title response: {}", e))?;

        let title = completion.choices
            .first()
            .and_then(|c| c.message.as_ref())
            .map(|m| m.content.trim().to_string())
            .unwrap_or_else(|| "New Chat".to_string());

        let title = title.trim_matches('"').trim_matches('\'').to_string();

        info!("Generated title: '{}'", title);
        Ok(title)
    }
}