Skip to main content

offline_intelligence/worker_threads/
llm_worker.rs

1//! LLM worker thread implementation
2//!
3//! Handles LLM inference by proxying requests to the local llama-server process.
4//! This is the 1-hop architecture: shared memory state → HTTP to localhost llama-server.
5
6use std::sync::{Arc, RwLock};
7use futures_util::StreamExt;
8use tracing::{info, debug, warn};
9use serde::{Deserialize, Serialize};
10
11use crate::{
12    memory::Message,
13    model_runtime::RuntimeManager,
14};
15
16/// Chat completion request sent to llama-server (OpenAI-compatible format)
17#[derive(Debug, Serialize)]
18struct ChatCompletionRequest {
19    model: String,
20    messages: Vec<ChatMessage>,
21    max_tokens: u32,
22    temperature: f32,
23    stream: bool,
24    /// Cache the KV state of this prompt so repeated system-prompt prefixes
25    /// reuse the cached state. Drops warm TTFT from ~50ms to ~5ms.
26    cache_prompt: bool,
27}
28
29/// Embedding request sent to llama-server (OpenAI-compatible format)
30#[derive(Debug, Serialize)]
31struct EmbeddingRequest {
32    model: String,
33    input: Vec<String>,
34}
35
36/// Embedding response from llama-server
37#[derive(Debug, Deserialize)]
38struct EmbeddingResponse {
39    data: Vec<EmbeddingData>,
40}
41
42#[derive(Debug, Deserialize)]
43struct EmbeddingData {
44    embedding: Vec<f32>,
45}
46
47#[derive(Debug, Serialize, Deserialize, Clone)]
48struct ChatMessage {
49    role: String,
50    content: String,
51}
52
53/// Non-streaming response from llama-server
54#[derive(Debug, Deserialize)]
55struct ChatCompletionResponse {
56    choices: Vec<ChatChoice>,
57}
58
59#[derive(Debug, Deserialize)]
60struct ChatChoice {
61    message: Option<ChatMessage>,
62}
63
64/// Streaming delta chunk from llama-server
65#[derive(Debug, Deserialize)]
66struct StreamChunk {
67    choices: Vec<StreamChoice>,
68}
69
70#[derive(Debug, Deserialize)]
71struct StreamChoice {
72    delta: Option<ChatDelta>,
73    finish_reason: Option<String>,
74}
75
76#[derive(Debug, Deserialize, Clone)]
77struct ChatDelta {
78    content: Option<String>,
79}
80
81pub struct LLMWorker {
82    backend_url: String,
83    http_client: reqwest::Client,
84    runtime_manager: RwLock<Option<Arc<RuntimeManager>>>,
85}
86
87impl LLMWorker {
88    /// Create with shared state (legacy constructor)
89    pub fn new(shared_state: std::sync::Arc<crate::shared_state::SharedState>) -> Self {
90        let backend_url = shared_state.config.backend_url.clone();
91        Self {
92            backend_url,
93            http_client: reqwest::Client::builder()
94                .timeout(std::time::Duration::from_secs(600))
95                .build()
96                .unwrap_or_default(),
97            runtime_manager: RwLock::new(None),
98        }
99    }
100
101    /// Create with explicit backend URL
102    pub fn new_with_backend(backend_url: String) -> Self {
103        info!("LLM worker initialized with backend: {}", backend_url);
104        Self {
105            backend_url,
106            http_client: reqwest::Client::builder()
107                .timeout(std::time::Duration::from_secs(600))
108                .build()
109                .unwrap_or_default(),
110            runtime_manager: RwLock::new(None),
111        }
112    }
113
114    /// Set the runtime manager
115    pub fn set_runtime_manager(&self, runtime_manager: Arc<RuntimeManager>) {
116        if let Ok(mut guard) = self.runtime_manager.write() {
117            *guard = Some(runtime_manager);
118            info!("✅ Runtime manager linked to LLM worker");
119        } else {
120            warn!("⚠️  Failed to acquire lock to set runtime manager on LLM worker");
121        }
122    }
123    
124    /// Get the runtime manager if available
125    fn get_runtime_manager(&self) -> Option<Arc<RuntimeManager>> {
126        self.runtime_manager.read().ok().and_then(|guard| (*guard).clone())
127    }
128
129    /// Check if runtime is ready for inference
130    pub async fn is_runtime_ready(&self) -> bool {
131        let has_runtime_manager = self.runtime_manager.read().is_ok();
132        let result = if let Some(ref rm) = self.get_runtime_manager() {
133            let ready = rm.is_ready().await;
134            info!("LLMWorker is_ready: runtime_manager exists={}, is_ready={}", has_runtime_manager, ready);
135            ready
136        } else {
137            info!("LLMWorker is_ready: no runtime_manager set");
138            false
139        };
140        result
141    }
142
143    // Override the original methods to use runtime manager when available
144
145
146    /// Convert internal Message format to OpenAI-compatible ChatMessage
147    fn to_chat_messages(messages: &[Message]) -> Vec<ChatMessage> {
148        messages.iter().map(|m| ChatMessage {
149            role: m.role.clone(),
150            content: m.content.clone(),
151        }).collect()
152    }
153
154    /// Generate a complete (non-streaming) response from the LLM.
155    pub async fn generate_response(
156        &self,
157        _session_id: String,
158        context: Vec<Message>,
159    ) -> anyhow::Result<String> {
160        debug!("LLM worker generating response (non-streaming)");
161
162        let request = ChatCompletionRequest {
163            model: "local-llm".to_string(),
164            messages: Self::to_chat_messages(&context),
165            max_tokens: 2000,
166            temperature: 0.7,
167            stream: false,
168            cache_prompt: true,
169        };
170
171        // Determine the URL to use based on whether runtime manager is available AND ready
172        let url = if let Some(ref rm) = self.get_runtime_manager() {
173            // Check if runtime is actually initialized and ready
174            if rm.is_ready().await {
175                if let Some(base_url) = rm.get_base_url().await {
176                    format!("{}/v1/chat/completions", base_url)
177                } else {
178                    // Runtime manager exists but no base URL - engine not ready
179                    return Err(anyhow::anyhow!(
180                        "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
181                    ));
182                }
183            } else {
184                // Runtime manager exists but not ready
185                return Err(anyhow::anyhow!(
186                    "Model engine is not ready yet. Please load a model from the Models panel first."
187                ));
188            }
189        } else {
190            // No runtime manager set yet - still initializing or no engine installed
191            return Err(anyhow::anyhow!(
192                "No model loaded. Please download an engine and load a model from the Models panel."
193            ));
194        };
195        
196        let response = self.http_client
197            .post(&url)
198            .json(&request)
199            .send()
200            .await
201            .map_err(|e| {
202                if e.is_connect() {
203                    anyhow::anyhow!(
204                        "Cannot connect to local LLM server. Please download and load a model from the Models panel."
205                    )
206                } else {
207                    anyhow::anyhow!("LLM backend request failed: {}", e)
208                }
209            })?;
210
211        if !response.status().is_success() {
212            let status = response.status();
213            let body = response.text().await.unwrap_or_default();
214            return Err(anyhow::anyhow!("LLM backend returned {}: {}", status, body));
215        }
216
217        let completion: ChatCompletionResponse = response.json().await
218            .map_err(|e| anyhow::anyhow!("Failed to parse LLM response: {}", e))?;
219
220        let content = completion.choices
221            .first()
222            .and_then(|c| c.message.as_ref())
223            .map(|m| m.content.clone())
224            .unwrap_or_default();
225
226        Ok(content)
227    }
228
229    /// Stream response tokens from the LLM as Server-Sent Events.
230    /// Returns a stream of SSE-formatted strings ready to send to the client.
231    pub async fn stream_response(
232        &self,
233        messages: Vec<Message>,
234        max_tokens: u32,
235        temperature: f32,
236    ) -> anyhow::Result<impl futures_util::Stream<Item = Result<String, anyhow::Error>>> {
237        debug!("LLM worker starting streaming response");
238
239        let request = ChatCompletionRequest {
240            model: "local-llm".to_string(),
241            messages: Self::to_chat_messages(&messages),
242            max_tokens,
243            temperature,
244            stream: true,
245            cache_prompt: true,
246        };
247
248        // Determine the URL to use based on whether runtime manager is available AND ready
249        let url = if let Some(ref rm) = self.get_runtime_manager() {
250            // Check if runtime is actually initialized and ready
251            if rm.is_ready().await {
252                if let Some(base_url) = rm.get_base_url().await {
253                    format!("{}/v1/chat/completions", base_url)
254                } else {
255                    // Runtime manager exists but no base URL - engine not ready
256                    return Err(anyhow::anyhow!(
257                        "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
258                    ));
259                }
260            } else {
261                // Runtime manager exists but not ready
262                return Err(anyhow::anyhow!(
263                    "Model engine is not ready yet. Please load a model from the Models panel first."
264                ));
265            }
266        } else {
267            // No runtime manager set yet - still initializing or no engine installed
268            return Err(anyhow::anyhow!(
269                "No model loaded. Please download an engine and load a model from the Models panel."
270            ));
271        };
272        
273        let response = self.http_client
274            .post(&url)
275            .json(&request)
276            .send()
277            .await
278            .map_err(|e| {
279                if e.is_connect() {
280                    anyhow::anyhow!(
281                        "Cannot connect to local LLM server. Please download and load a model from the Models panel."
282                    )
283                } else {
284                    anyhow::anyhow!("LLM backend request failed: {}", e)
285                }
286            })?;
287
288        if !response.status().is_success() {
289            let status = response.status();
290            let body = response.text().await.unwrap_or_default();
291            return Err(anyhow::anyhow!("LLM backend returned {}: {}", status, body));
292        }
293
294        let byte_stream = response.bytes_stream();
295
296        let sse_stream = async_stream::try_stream! {
297            let mut buffer = String::new();
298
299            futures_util::pin_mut!(byte_stream);
300
301            while let Some(chunk_result) = byte_stream.next().await {
302                let chunk = chunk_result
303                    .map_err(|e| anyhow::anyhow!("Stream read error: {}", e))?;
304
305                buffer.push_str(&String::from_utf8_lossy(&chunk));
306
307                while let Some(newline_pos) = buffer.find('\n') {
308                    let line = buffer[..newline_pos].trim().to_string();
309                    buffer = buffer[newline_pos + 1..].to_string();
310
311                    if line.is_empty() {
312                        continue;
313                    }
314
315                    if line.starts_with("data: ") {
316                        let data = &line[6..];
317
318                        if data == "[DONE]" {
319                            yield "data: [DONE]\n\n".to_string();
320                            return;
321                        }
322
323                        match serde_json::from_str::<StreamChunk>(data) {
324                            Ok(chunk) => {
325                                let finished = chunk.choices.iter()
326                                    .any(|c| c.finish_reason.is_some());
327
328                                yield format!("data: {}\n\n", data);
329
330                                if finished {
331                                    yield "data: [DONE]\n\n".to_string();
332                                    return;
333                                }
334                            }
335                            Err(_) => {
336                                yield format!("data: {}\n\n", data);
337                            }
338                        }
339                    }
340                }
341            }
342
343            yield "data: [DONE]\n\n".to_string();
344        };
345
346        Ok(sse_stream)
347    }
348
349    /// Batch process multiple prompts (non-streaming)
350    pub async fn batch_process(
351        &self,
352        prompts: Vec<(String, Vec<Message>)>,
353    ) -> anyhow::Result<Vec<String>> {
354        debug!("LLM worker batch processing {} prompts", prompts.len());
355
356        let mut responses = Vec::new();
357        for (session_id, messages) in prompts {
358            match self.generate_response(session_id.clone(), messages).await {
359                Ok(response) => responses.push(response),
360                Err(e) => {
361                    warn!("Batch item {} failed: {}", session_id, e);
362                    responses.push(format!("Error: {}", e));
363                }
364            }
365        }
366
367        info!("Batch processed {} prompts", responses.len());
368        Ok(responses)
369    }
370
371    /// Initialize LLM model (no-op for HTTP proxy mode)
372    pub async fn initialize_model(&self, model_path: &str) -> anyhow::Result<()> {
373        debug!("LLM worker model init (HTTP proxy mode): {}", model_path);
374        Ok(())
375    }
376
377    /// Generate embeddings for one or more text inputs via llama-server's /v1/embeddings endpoint.
378    /// This reuses the vectors llama.cpp already computes during inference — no separate model needed.
379    /// Returns a Vec of embedding vectors (one per input string).
380    pub async fn generate_embeddings(
381        &self,
382        texts: Vec<String>,
383    ) -> anyhow::Result<Vec<Vec<f32>>> {
384        if texts.is_empty() {
385            return Ok(Vec::new());
386        }
387
388        debug!("Generating embeddings for {} text(s) via llama-server", texts.len());
389
390        let request = EmbeddingRequest {
391            model: "local-llm".to_string(),
392            input: texts,
393        };
394
395        // Determine the URL to use based on whether runtime manager is available AND ready
396        let url = if let Some(ref rm) = self.get_runtime_manager() {
397            // Check if runtime is actually initialized and ready
398            if rm.is_ready().await {
399                if let Some(base_url) = rm.get_base_url().await {
400                    format!("{}/v1/embeddings", base_url)
401                } else {
402                    // Runtime manager exists but no base URL - engine not ready
403                    return Err(anyhow::anyhow!(
404                        "Model engine is initializing. Please wait a moment and try again."
405                    ));
406                }
407            } else {
408                // Runtime manager exists but not ready
409                return Err(anyhow::anyhow!(
410                    "Model engine is not ready yet. Please load a model from the Models panel first."
411                ));
412            }
413        } else {
414            // No runtime manager set yet - still initializing or no engine installed
415            return Err(anyhow::anyhow!(
416                "No model loaded. Please download an engine and load a model from the Models panel."
417            ));
418        };
419        
420        let response = self.http_client
421            .post(&url)
422            .json(&request)
423            .send()
424            .await
425            .map_err(|e| anyhow::anyhow!("Embedding request failed: {}", e))?;
426
427        if !response.status().is_success() {
428            let status = response.status();
429            let body = response.text().await.unwrap_or_default();
430            return Err(anyhow::anyhow!("Embedding endpoint returned {}: {}", status, body));
431        }
432
433        let embedding_response: EmbeddingResponse = response.json().await
434            .map_err(|e| anyhow::anyhow!("Failed to parse embedding response: {}", e))?;
435
436        let embeddings: Vec<Vec<f32>> = embedding_response.data
437            .into_iter()
438            .map(|d| d.embedding)
439            .collect();
440
441        debug!("Generated {} embeddings (dim={})",
442            embeddings.len(),
443            embeddings.first().map(|e| e.len()).unwrap_or(0));
444
445        Ok(embeddings)
446    }
447
448    /// Generate title for a chat using the LLM
449    pub async fn generate_title(
450        &self,
451        prompt: &str,
452        max_tokens: u32,
453    ) -> anyhow::Result<String> {
454        debug!("LLM worker generating title for prompt ({} chars)", prompt.len());
455
456        let messages = vec![Message {
457            role: "user".to_string(),
458            content: prompt.to_string(),
459        }];
460
461        let request = ChatCompletionRequest {
462            model: "local-llm".to_string(),
463            messages: Self::to_chat_messages(&messages),
464            max_tokens: max_tokens.min(20),
465            temperature: 0.3,
466            stream: false,
467            cache_prompt: true,
468        };
469
470        // Determine the URL to use based on whether runtime manager is available AND ready
471        let url = if let Some(ref rm) = self.get_runtime_manager() {
472            // Check if runtime is actually initialized and ready
473            if rm.is_ready().await {
474                if let Some(base_url) = rm.get_base_url().await {
475                    format!("{}/v1/chat/completions", base_url)
476                } else {
477                    // Runtime manager exists but no base URL - engine not ready
478                    return Err(anyhow::anyhow!(
479                        "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
480                    ));
481                }
482            } else {
483                // Runtime manager exists but not ready
484                return Err(anyhow::anyhow!(
485                    "Model engine is not ready yet. Please load a model from the Models panel first."
486                ));
487            }
488        } else {
489            // No runtime manager set yet - still initializing or no engine installed
490            return Err(anyhow::anyhow!(
491                "No model loaded. Please download an engine and load a model from the Models panel."
492            ));
493        };
494        
495        let response = self.http_client
496            .post(&url)
497            .json(&request)
498            .send()
499            .await
500            .map_err(|e| anyhow::anyhow!("Title generation request failed: {}", e))?;
501
502        if !response.status().is_success() {
503            let status = response.status();
504            let body = response.text().await.unwrap_or_default();
505            return Err(anyhow::anyhow!("Title generation failed ({}): {}", status, body));
506        }
507
508        let completion: ChatCompletionResponse = response.json().await
509            .map_err(|e| anyhow::anyhow!("Failed to parse title response: {}", e))?;
510
511        let title = completion.choices
512            .first()
513            .and_then(|c| c.message.as_ref())
514            .map(|m| m.content.trim().to_string())
515            .unwrap_or_else(|| "New Chat".to_string());
516
517        let title = title.trim_matches('"').trim_matches('\'').to_string();
518
519        info!("Generated title: '{}'", title);
520        Ok(title)
521    }
522}