// offline_intelligence/worker_threads/llm_worker.rs
//! LLM worker thread implementation
//!
//! Handles LLM inference by proxying requests to the local llama-server process.
//! This is the 1-hop architecture: shared memory state → HTTP to localhost llama-server.

use std::sync::{Arc, RwLock};

use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

use crate::{
    memory::Message,
    model_runtime::RuntimeManager,
};

/// Chat completion request sent to llama-server (OpenAI-compatible format)
#[derive(Debug, Serialize)]
struct ChatCompletionRequest {
    // Model name field required by the OpenAI schema; this file always sends "local-llm".
    model: String,
    // Conversation history as role/content pairs.
    messages: Vec<ChatMessage>,
    // Upper bound on the number of tokens to generate.
    max_tokens: u32,
    // Sampling temperature.
    temperature: f32,
    // true => server responds with SSE chunks; false => single JSON body.
    stream: bool,
}
/// Embedding request sent to llama-server (OpenAI-compatible format)
#[derive(Debug, Serialize)]
struct EmbeddingRequest {
    // Model name field required by the OpenAI schema; this file always sends "local-llm".
    model: String,
    // Texts to embed; one embedding is expected back per input string.
    input: Vec<String>,
}
/// Embedding response from llama-server
#[derive(Debug, Deserialize)]
struct EmbeddingResponse {
    // One entry per input string (presumably in request order — standard OpenAI behavior; verify).
    data: Vec<EmbeddingData>,
}
/// A single embedding vector within an `EmbeddingResponse`.
#[derive(Debug, Deserialize)]
struct EmbeddingData {
    embedding: Vec<f32>,
}
/// A single chat turn in OpenAI role/content form.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ChatMessage {
    // Speaker role, e.g. "user" / "assistant" / "system".
    role: String,
    content: String,
}
/// Non-streaming response from llama-server
#[derive(Debug, Deserialize)]
struct ChatCompletionResponse {
    // Completion candidates; only the first is read by this worker.
    choices: Vec<ChatChoice>,
}
/// One completion candidate in a non-streaming response.
#[derive(Debug, Deserialize)]
struct ChatChoice {
    // Optional because the server may omit it; callers fall back to a default.
    message: Option<ChatMessage>,
}
/// Streaming delta chunk from llama-server
#[derive(Debug, Deserialize)]
struct StreamChunk {
    // Per-choice deltas; this worker only inspects finish_reason.
    choices: Vec<StreamChoice>,
}
/// One choice inside a streaming chunk.
#[derive(Debug, Deserialize)]
struct StreamChoice {
    // Incremental content payload (unused directly; chunks are forwarded verbatim).
    delta: Option<ChatDelta>,
    // Non-None once the model has finished generating this choice.
    finish_reason: Option<String>,
}
/// Incremental content fragment within a streaming choice.
#[derive(Debug, Deserialize, Clone)]
struct ChatDelta {
    content: Option<String>,
}
/// Proxies LLM requests to the local llama-server over HTTP.
pub struct LLMWorker {
    // Backend URL captured at construction.
    // NOTE(review): not read by the request methods in this file — they resolve
    // the URL via the runtime manager instead; confirm whether it is still needed.
    backend_url: String,
    // Shared client; constructors configure a long timeout for slow local generations.
    http_client: reqwest::Client,
    // None until set_runtime_manager() links an engine runtime.
    runtime_manager: RwLock<Option<Arc<RuntimeManager>>>,
}
84impl LLMWorker {
85    /// Create with shared state (legacy constructor)
86    pub fn new(shared_state: std::sync::Arc<crate::shared_state::SharedState>) -> Self {
87        let backend_url = shared_state.config.backend_url.clone();
88        Self {
89            backend_url,
90            http_client: reqwest::Client::builder()
91                .timeout(std::time::Duration::from_secs(600))
92                .build()
93                .unwrap_or_default(),
94            runtime_manager: RwLock::new(None),
95        }
96    }
97
98    /// Create with explicit backend URL
99    pub fn new_with_backend(backend_url: String) -> Self {
100        info!("LLM worker initialized with backend: {}", backend_url);
101        Self {
102            backend_url,
103            http_client: reqwest::Client::builder()
104                .timeout(std::time::Duration::from_secs(600))
105                .build()
106                .unwrap_or_default(),
107            runtime_manager: RwLock::new(None),
108        }
109    }
110
111    /// Set the runtime manager
112    pub fn set_runtime_manager(&self, runtime_manager: Arc<RuntimeManager>) {
113        if let Ok(mut guard) = self.runtime_manager.write() {
114            *guard = Some(runtime_manager);
115            info!("✅ Runtime manager linked to LLM worker");
116        } else {
117            warn!("⚠️  Failed to acquire lock to set runtime manager on LLM worker");
118        }
119    }
120    
121    /// Get the runtime manager if available
122    fn get_runtime_manager(&self) -> Option<Arc<RuntimeManager>> {
123        self.runtime_manager.read().ok().and_then(|guard| (*guard).clone())
124    }
125
126    /// Check if runtime is ready for inference
127    pub async fn is_runtime_ready(&self) -> bool {
128        let has_runtime_manager = self.runtime_manager.read().is_ok();
129        let result = if let Some(ref rm) = self.get_runtime_manager() {
130            let ready = rm.is_ready().await;
131            info!("LLMWorker is_ready: runtime_manager exists={}, is_ready={}", has_runtime_manager, ready);
132            ready
133        } else {
134            info!("LLMWorker is_ready: no runtime_manager set");
135            false
136        };
137        result
138    }
139
140    // Override the original methods to use runtime manager when available
141
142
143    /// Convert internal Message format to OpenAI-compatible ChatMessage
144    fn to_chat_messages(messages: &[Message]) -> Vec<ChatMessage> {
145        messages.iter().map(|m| ChatMessage {
146            role: m.role.clone(),
147            content: m.content.clone(),
148        }).collect()
149    }
150
151    /// Generate a complete (non-streaming) response from the LLM.
152    pub async fn generate_response(
153        &self,
154        _session_id: String,
155        context: Vec<Message>,
156    ) -> anyhow::Result<String> {
157        debug!("LLM worker generating response (non-streaming)");
158
159        let request = ChatCompletionRequest {
160            model: "local-llm".to_string(),
161            messages: Self::to_chat_messages(&context),
162            max_tokens: 2000,
163            temperature: 0.7,
164            stream: false,
165        };
166
167        // Determine the URL to use based on whether runtime manager is available AND ready
168        let url = if let Some(ref rm) = self.get_runtime_manager() {
169            // Check if runtime is actually initialized and ready
170            if rm.is_ready().await {
171                if let Some(base_url) = rm.get_base_url().await {
172                    format!("{}/v1/chat/completions", base_url)
173                } else {
174                    // Runtime manager exists but no base URL - engine not ready
175                    return Err(anyhow::anyhow!(
176                        "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
177                    ));
178                }
179            } else {
180                // Runtime manager exists but not ready
181                return Err(anyhow::anyhow!(
182                    "Model engine is not ready yet. Please load a model from the Models panel first."
183                ));
184            }
185        } else {
186            // No runtime manager set yet - still initializing or no engine installed
187            return Err(anyhow::anyhow!(
188                "No model loaded. Please download an engine and load a model from the Models panel."
189            ));
190        };
191        
192        let response = self.http_client
193            .post(&url)
194            .json(&request)
195            .send()
196            .await
197            .map_err(|e| {
198                if e.is_connect() {
199                    anyhow::anyhow!(
200                        "Cannot connect to local LLM server. Please download and load a model from the Models panel."
201                    )
202                } else {
203                    anyhow::anyhow!("LLM backend request failed: {}", e)
204                }
205            })?;
206
207        if !response.status().is_success() {
208            let status = response.status();
209            let body = response.text().await.unwrap_or_default();
210            return Err(anyhow::anyhow!("LLM backend returned {}: {}", status, body));
211        }
212
213        let completion: ChatCompletionResponse = response.json().await
214            .map_err(|e| anyhow::anyhow!("Failed to parse LLM response: {}", e))?;
215
216        let content = completion.choices
217            .first()
218            .and_then(|c| c.message.as_ref())
219            .map(|m| m.content.clone())
220            .unwrap_or_default();
221
222        Ok(content)
223    }
224
225    /// Stream response tokens from the LLM as Server-Sent Events.
226    /// Returns a stream of SSE-formatted strings ready to send to the client.
227    pub async fn stream_response(
228        &self,
229        messages: Vec<Message>,
230        max_tokens: u32,
231        temperature: f32,
232    ) -> anyhow::Result<impl futures_util::Stream<Item = Result<String, anyhow::Error>>> {
233        debug!("LLM worker starting streaming response");
234
235        let request = ChatCompletionRequest {
236            model: "local-llm".to_string(),
237            messages: Self::to_chat_messages(&messages),
238            max_tokens,
239            temperature,
240            stream: true,
241        };
242
243        // Determine the URL to use based on whether runtime manager is available AND ready
244        let url = if let Some(ref rm) = self.get_runtime_manager() {
245            // Check if runtime is actually initialized and ready
246            if rm.is_ready().await {
247                if let Some(base_url) = rm.get_base_url().await {
248                    format!("{}/v1/chat/completions", base_url)
249                } else {
250                    // Runtime manager exists but no base URL - engine not ready
251                    return Err(anyhow::anyhow!(
252                        "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
253                    ));
254                }
255            } else {
256                // Runtime manager exists but not ready
257                return Err(anyhow::anyhow!(
258                    "Model engine is not ready yet. Please load a model from the Models panel first."
259                ));
260            }
261        } else {
262            // No runtime manager set yet - still initializing or no engine installed
263            return Err(anyhow::anyhow!(
264                "No model loaded. Please download an engine and load a model from the Models panel."
265            ));
266        };
267        
268        let response = self.http_client
269            .post(&url)
270            .json(&request)
271            .send()
272            .await
273            .map_err(|e| {
274                if e.is_connect() {
275                    anyhow::anyhow!(
276                        "Cannot connect to local LLM server. Please download and load a model from the Models panel."
277                    )
278                } else {
279                    anyhow::anyhow!("LLM backend request failed: {}", e)
280                }
281            })?;
282
283        if !response.status().is_success() {
284            let status = response.status();
285            let body = response.text().await.unwrap_or_default();
286            return Err(anyhow::anyhow!("LLM backend returned {}: {}", status, body));
287        }
288
289        let byte_stream = response.bytes_stream();
290
291        let sse_stream = async_stream::try_stream! {
292            let mut buffer = String::new();
293
294            futures_util::pin_mut!(byte_stream);
295
296            while let Some(chunk_result) = byte_stream.next().await {
297                let chunk = chunk_result
298                    .map_err(|e| anyhow::anyhow!("Stream read error: {}", e))?;
299
300                buffer.push_str(&String::from_utf8_lossy(&chunk));
301
302                while let Some(newline_pos) = buffer.find('\n') {
303                    let line = buffer[..newline_pos].trim().to_string();
304                    buffer = buffer[newline_pos + 1..].to_string();
305
306                    if line.is_empty() {
307                        continue;
308                    }
309
310                    if line.starts_with("data: ") {
311                        let data = &line[6..];
312
313                        if data == "[DONE]" {
314                            yield "data: [DONE]\n\n".to_string();
315                            return;
316                        }
317
318                        match serde_json::from_str::<StreamChunk>(data) {
319                            Ok(chunk) => {
320                                let finished = chunk.choices.iter()
321                                    .any(|c| c.finish_reason.is_some());
322
323                                yield format!("data: {}\n\n", data);
324
325                                if finished {
326                                    yield "data: [DONE]\n\n".to_string();
327                                    return;
328                                }
329                            }
330                            Err(_) => {
331                                yield format!("data: {}\n\n", data);
332                            }
333                        }
334                    }
335                }
336            }
337
338            yield "data: [DONE]\n\n".to_string();
339        };
340
341        Ok(sse_stream)
342    }
343
344    /// Batch process multiple prompts (non-streaming)
345    pub async fn batch_process(
346        &self,
347        prompts: Vec<(String, Vec<Message>)>,
348    ) -> anyhow::Result<Vec<String>> {
349        debug!("LLM worker batch processing {} prompts", prompts.len());
350
351        let mut responses = Vec::new();
352        for (session_id, messages) in prompts {
353            match self.generate_response(session_id.clone(), messages).await {
354                Ok(response) => responses.push(response),
355                Err(e) => {
356                    warn!("Batch item {} failed: {}", session_id, e);
357                    responses.push(format!("Error: {}", e));
358                }
359            }
360        }
361
362        info!("Batch processed {} prompts", responses.len());
363        Ok(responses)
364    }
365
366    /// Initialize LLM model (no-op for HTTP proxy mode)
367    pub async fn initialize_model(&self, model_path: &str) -> anyhow::Result<()> {
368        debug!("LLM worker model init (HTTP proxy mode): {}", model_path);
369        Ok(())
370    }
371
372    /// Generate embeddings for one or more text inputs via llama-server's /v1/embeddings endpoint.
373    /// This reuses the vectors llama.cpp already computes during inference — no separate model needed.
374    /// Returns a Vec of embedding vectors (one per input string).
375    pub async fn generate_embeddings(
376        &self,
377        texts: Vec<String>,
378    ) -> anyhow::Result<Vec<Vec<f32>>> {
379        if texts.is_empty() {
380            return Ok(Vec::new());
381        }
382
383        debug!("Generating embeddings for {} text(s) via llama-server", texts.len());
384
385        let request = EmbeddingRequest {
386            model: "local-llm".to_string(),
387            input: texts,
388        };
389
390        // Determine the URL to use based on whether runtime manager is available AND ready
391        let url = if let Some(ref rm) = self.get_runtime_manager() {
392            // Check if runtime is actually initialized and ready
393            if rm.is_ready().await {
394                if let Some(base_url) = rm.get_base_url().await {
395                    format!("{}/v1/embeddings", base_url)
396                } else {
397                    // Runtime manager exists but no base URL - engine not ready
398                    return Err(anyhow::anyhow!(
399                        "Model engine is initializing. Please wait a moment and try again."
400                    ));
401                }
402            } else {
403                // Runtime manager exists but not ready
404                return Err(anyhow::anyhow!(
405                    "Model engine is not ready yet. Please load a model from the Models panel first."
406                ));
407            }
408        } else {
409            // No runtime manager set yet - still initializing or no engine installed
410            return Err(anyhow::anyhow!(
411                "No model loaded. Please download an engine and load a model from the Models panel."
412            ));
413        };
414        
415        let response = self.http_client
416            .post(&url)
417            .json(&request)
418            .send()
419            .await
420            .map_err(|e| anyhow::anyhow!("Embedding request failed: {}", e))?;
421
422        if !response.status().is_success() {
423            let status = response.status();
424            let body = response.text().await.unwrap_or_default();
425            return Err(anyhow::anyhow!("Embedding endpoint returned {}: {}", status, body));
426        }
427
428        let embedding_response: EmbeddingResponse = response.json().await
429            .map_err(|e| anyhow::anyhow!("Failed to parse embedding response: {}", e))?;
430
431        let embeddings: Vec<Vec<f32>> = embedding_response.data
432            .into_iter()
433            .map(|d| d.embedding)
434            .collect();
435
436        debug!("Generated {} embeddings (dim={})",
437            embeddings.len(),
438            embeddings.first().map(|e| e.len()).unwrap_or(0));
439
440        Ok(embeddings)
441    }
442
443    /// Generate title for a chat using the LLM
444    pub async fn generate_title(
445        &self,
446        prompt: &str,
447        max_tokens: u32,
448    ) -> anyhow::Result<String> {
449        debug!("LLM worker generating title for prompt ({} chars)", prompt.len());
450
451        let messages = vec![Message {
452            role: "user".to_string(),
453            content: prompt.to_string(),
454        }];
455
456        let request = ChatCompletionRequest {
457            model: "local-llm".to_string(),
458            messages: Self::to_chat_messages(&messages),
459            max_tokens: max_tokens.min(20),
460            temperature: 0.3,
461            stream: false,
462        };
463
464        // Determine the URL to use based on whether runtime manager is available AND ready
465        let url = if let Some(ref rm) = self.get_runtime_manager() {
466            // Check if runtime is actually initialized and ready
467            if rm.is_ready().await {
468                if let Some(base_url) = rm.get_base_url().await {
469                    format!("{}/v1/chat/completions", base_url)
470                } else {
471                    // Runtime manager exists but no base URL - engine not ready
472                    return Err(anyhow::anyhow!(
473                        "Model engine is initializing. Please wait a moment and try again, or load a model from the Models panel."
474                    ));
475                }
476            } else {
477                // Runtime manager exists but not ready
478                return Err(anyhow::anyhow!(
479                    "Model engine is not ready yet. Please load a model from the Models panel first."
480                ));
481            }
482        } else {
483            // No runtime manager set yet - still initializing or no engine installed
484            return Err(anyhow::anyhow!(
485                "No model loaded. Please download an engine and load a model from the Models panel."
486            ));
487        };
488        
489        let response = self.http_client
490            .post(&url)
491            .json(&request)
492            .send()
493            .await
494            .map_err(|e| anyhow::anyhow!("Title generation request failed: {}", e))?;
495
496        if !response.status().is_success() {
497            let status = response.status();
498            let body = response.text().await.unwrap_or_default();
499            return Err(anyhow::anyhow!("Title generation failed ({}): {}", status, body));
500        }
501
502        let completion: ChatCompletionResponse = response.json().await
503            .map_err(|e| anyhow::anyhow!("Failed to parse title response: {}", e))?;
504
505        let title = completion.choices
506            .first()
507            .and_then(|c| c.message.as_ref())
508            .map(|m| m.content.trim().to_string())
509            .unwrap_or_else(|| "New Chat".to_string());
510
511        let title = title.trim_matches('"').trim_matches('\'').to_string();
512
513        info!("Generated title: '{}'", title);
514        Ok(title)
515    }
516}