Skip to main content

offline_intelligence/model_runtime/
gguf_runtime.rs

1//! GGUF Runtime Adapter
2//!
3//! Wraps the existing llama-server (llama.cpp) for GGUF models.
4//! This adapter spawns the llama-server process and proxies requests via HTTP.
5
6use async_trait::async_trait;
7use super::runtime_trait::*;
8use std::process::{Child, Command, Stdio};
9use std::time::Duration;
10use tracing::{info, warn};
11use tokio::time::sleep;
12
13pub struct GGUFRuntime {
14    config: Option<RuntimeConfig>,
15    server_process: Option<Child>,
16    http_client: reqwest::Client,
17    base_url: String,
18}
19
20impl GGUFRuntime {
21    pub fn new() -> Self {
22        Self {
23            config: None,
24            server_process: None,
25            http_client: reqwest::Client::builder()
26                .timeout(Duration::from_secs(600))
27                .build()
28                .unwrap_or_default(),
29            base_url: String::new(),
30        }
31    }
32
33    /// Start llama-server process
34    async fn start_server(&mut self, config: &RuntimeConfig) -> anyhow::Result<()> {
35        let binary_path = config.runtime_binary.as_ref()
36            .ok_or_else(|| anyhow::anyhow!("GGUF runtime requires runtime_binary path"))?;
37
38        if !binary_path.exists() {
39            return Err(anyhow::anyhow!(
40                "llama-server binary not found at: {}",
41                binary_path.display()
42            ));
43        }
44
45        info!("Starting llama-server for GGUF model: {}", config.model_path.display());
46        info!("  Binary: {}", binary_path.display());
47        info!("  Port: {}", config.port);
48        info!("  Context Size: {}", config.context_size);
49        info!("  GPU Layers: {}", config.gpu_layers);
50
51        // Verify model file exists before starting
52        if !config.model_path.exists() {
53            return Err(anyhow::anyhow!(
54                "Model file not found at: {}",
55                config.model_path.display()
56            ));
57        }
58
59        // Build command arguments
60        let mut cmd = Command::new(binary_path);
61        cmd.arg("--model").arg(&config.model_path)
62            .arg("--host").arg(&config.host)
63            .arg("--port").arg(config.port.to_string())
64            .arg("--ctx-size").arg(config.context_size.to_string())
65            .arg("--batch-size").arg(config.batch_size.to_string())
66            .arg("--threads").arg(config.threads.to_string())
67            .arg("--n-gpu-layers").arg(config.gpu_layers.to_string());
68
69        // Log the full command for debugging
70        info!("📋 Full command: {:?} {}", binary_path,
71            format!("--model {} --host {} --port {} --ctx-size {} --batch-size {} --threads {} --n-gpu-layers {}",
72                config.model_path.display(), config.host, config.port,
73                config.context_size, config.batch_size, config.threads, config.gpu_layers));
74
75        // On macOS: set DYLD_LIBRARY_PATH to the directory that contains
76        // llama-server so that co-located dylibs (libllama.dylib,
77        // libggml.dylib, libggml-metal.dylib, libggml-cpu.dylib …) are
78        // found by dyld at process start.  Without this, the child process
79        // will exit immediately with a "dyld: Library not loaded" error.
80        #[cfg(target_os = "macos")]
81        {
82            if let Some(binary_dir) = binary_path.parent() {
83                let lib_path = binary_dir.to_string_lossy().to_string();
84                info!("macOS: setting DYLD_LIBRARY_PATH={}", lib_path);
85                // Prepend to any existing value so system dylibs are still found.
86                let existing = std::env::var("DYLD_LIBRARY_PATH").unwrap_or_default();
87                let new_val = if existing.is_empty() {
88                    lib_path
89                } else {
90                    format!("{}:{}", lib_path, existing)
91                };
92                cmd.env("DYLD_LIBRARY_PATH", new_val);
93            }
94        }
95
96        // On Windows, hide the console window
97        #[cfg(target_os = "windows")]
98        {
99            use std::os::windows::process::CommandExt;
100            const CREATE_NO_WINDOW: u32 = 0x08000000;
101            cmd.creation_flags(CREATE_NO_WINDOW);
102        }
103
104        cmd.stdout(Stdio::piped())
105            .stderr(Stdio::piped());
106
107        // Spawn the process
108        let child = cmd.spawn()
109            .map_err(|e| anyhow::anyhow!("Failed to spawn llama-server: {}", e))?;
110
111        self.server_process = Some(child);
112        self.base_url = format!("http://{}:{}", config.host, config.port);
113
114        info!("llama-server process started, waiting for health check...");
115
116        // Wait for server to be ready (up to 120 seconds) with exponential backoff.
117        // Checks at 100 ms → 200 ms → 400 ms → … → 2 s (cap) so a fast start is
118        // detected in < 200 ms instead of the old fixed 2 s minimum.
119        let _start = std::time::Instant::now();
120        let mut delay_ms: u64 = 100;
121        let mut last_log_secs: u64 = 0;
122        loop {
123            sleep(Duration::from_millis(delay_ms)).await;
124
125            if self.is_ready().await {
126                info!("✅ GGUF runtime ready after {:.1}s", _start.elapsed().as_secs_f64());
127                return Ok(());
128            }
129            let elapsed_secs = _start.elapsed().as_secs();
130            if elapsed_secs >= 120 {
131                break;
132            }
133            if elapsed_secs >= last_log_secs + 10 {
134                info!("Still waiting for llama-server... ({}/120s)", elapsed_secs);
135                last_log_secs = elapsed_secs;
136            }
137            delay_ms = (delay_ms * 2).min(2_000);
138        }
139
140        Err(anyhow::anyhow!("llama-server failed to become ready within 120 seconds"))
141    }
142
143    /// Send SIGTERM to the child process (Unix only) and wait up to
144    /// `grace_secs` seconds for it to exit before returning.
145    /// Returns true if the process exited gracefully, false on timeout.
146    #[cfg(unix)]
147    fn send_sigterm_and_wait(child: &mut Child, grace_secs: u64) -> bool {
148        if let Some(pid) = child.id() {
149            // `kill -TERM <pid>` — portable across macOS and Linux
150            let _ = std::process::Command::new("kill")
151                .args(["-TERM", &pid.to_string()])
152                .output();
153
154            let deadline = std::time::Instant::now() + Duration::from_secs(grace_secs);
155            while std::time::Instant::now() < deadline {
156                if let Ok(Some(_)) = child.try_wait() {
157                    return true; // exited gracefully
158                }
159                std::thread::sleep(Duration::from_millis(100));
160            }
161        }
162        false // timed out
163    }
164}
165
166impl Default for GGUFRuntime {
167    fn default() -> Self {
168        Self::new()
169    }
170}
171
172#[async_trait]
173impl ModelRuntime for GGUFRuntime {
174    fn supported_format(&self) -> ModelFormat {
175        ModelFormat::GGUF
176    }
177
178    async fn initialize(&mut self, config: RuntimeConfig) -> anyhow::Result<()> {
179        info!("Initializing GGUF runtime");
180
181        // Validate config
182        if config.format != ModelFormat::GGUF {
183            return Err(anyhow::anyhow!(
184                "GGUF runtime received wrong format: {:?}",
185                config.format
186            ));
187        }
188
189        if !config.model_path.exists() {
190            return Err(anyhow::anyhow!(
191                "Model file not found: {}",
192                config.model_path.display()
193            ));
194        }
195
196        self.config = Some(config.clone());
197        self.start_server(&config).await?;
198
199        Ok(())
200    }
201
202    async fn is_ready(&self) -> bool {
203        if self.base_url.is_empty() {
204            return false;
205        }
206
207        let health_url = format!("{}/health", self.base_url);
208        // Use a short per-request timeout for health probes so that the
209        // /healthz handler never blocks longer than 3 s even if llama-server
210        // is in a degraded/hung state (e.g. orphan process from a previous run).
211        match self.http_client
212            .get(&health_url)
213            .timeout(Duration::from_secs(3))
214            .send()
215            .await
216        {
217            Ok(resp) => resp.status().is_success(),
218            Err(_) => false,
219        }
220    }
221
222    async fn health_check(&self) -> anyhow::Result<String> {
223        if self.base_url.is_empty() {
224            return Err(anyhow::anyhow!("Runtime not initialized"));
225        }
226
227        let health_url = format!("{}/health", self.base_url);
228        let resp = self.http_client.get(&health_url)
229            .send()
230            .await
231            .map_err(|e| anyhow::anyhow!("Health check failed: {}", e))?;
232
233        if resp.status().is_success() {
234            Ok("healthy".to_string())
235        } else {
236            Err(anyhow::anyhow!("Health check returned: {}", resp.status()))
237        }
238    }
239
240    fn base_url(&self) -> String {
241        self.base_url.clone()
242    }
243
244    async fn generate(
245        &self,
246        request: InferenceRequest,
247    ) -> anyhow::Result<InferenceResponse> {
248        let url = self.completions_url();
249
250        let payload = serde_json::json!({
251            "model": "local-llm",
252            "messages": request.messages,
253            "max_tokens": request.max_tokens,
254            "temperature": request.temperature,
255            "stream": false,
256        });
257
258        let resp = self.http_client.post(&url)
259            .json(&payload)
260            .send()
261            .await
262            .map_err(|e| anyhow::anyhow!("Inference request failed: {}", e))?;
263
264        if !resp.status().is_success() {
265            let status = resp.status();
266            let body = resp.text().await.unwrap_or_default();
267            return Err(anyhow::anyhow!("Inference failed ({}): {}", status, body));
268        }
269
270        let response: serde_json::Value = resp.json().await
271            .map_err(|e| anyhow::anyhow!("Failed to parse response: {}", e))?;
272
273        let content = response["choices"][0]["message"]["content"]
274            .as_str()
275            .unwrap_or("")
276            .to_string();
277
278        let finish_reason = response["choices"][0]["finish_reason"]
279            .as_str()
280            .map(|s| s.to_string());
281
282        Ok(InferenceResponse {
283            content,
284            finish_reason,
285        })
286    }
287
288    async fn generate_stream(
289        &self,
290        request: InferenceRequest,
291    ) -> anyhow::Result<Box<dyn futures_util::Stream<Item = Result<String, anyhow::Error>> + Send + Unpin>> {
292        use futures_util::StreamExt;
293
294        let url = self.completions_url();
295
296        let payload = serde_json::json!({
297            "model": "local-llm",
298            "messages": request.messages,
299            "max_tokens": request.max_tokens,
300            "temperature": request.temperature,
301            "stream": true,
302        });
303
304        let resp = self.http_client.post(&url)
305            .json(&payload)
306            .send()
307            .await
308            .map_err(|e| anyhow::anyhow!("Stream request failed: {}", e))?;
309
310        if !resp.status().is_success() {
311            let status = resp.status();
312            let body = resp.text().await.unwrap_or_default();
313            return Err(anyhow::anyhow!("Stream failed ({}): {}", status, body));
314        }
315
316        let byte_stream = resp.bytes_stream();
317
318        let sse_stream = async_stream::try_stream! {
319            let mut buffer = String::new();
320            futures_util::pin_mut!(byte_stream);
321
322            while let Some(chunk_result) = byte_stream.next().await {
323                let chunk = chunk_result.map_err(|e| anyhow::anyhow!("Stream read error: {}", e))?;
324                buffer.push_str(&String::from_utf8_lossy(&chunk));
325
326                while let Some(newline_pos) = buffer.find('\n') {
327                    let line = buffer[..newline_pos].trim().to_string();
328                    buffer = buffer[newline_pos + 1..].to_string();
329
330                    if line.is_empty() || !line.starts_with("data: ") {
331                        continue;
332                    }
333
334                    let data = &line[6..];
335                    if data == "[DONE]" {
336                        return;
337                    }
338
339                    yield format!("data: {}\n\n", data);
340                }
341            }
342        };
343
344        Ok(Box::new(Box::pin(sse_stream)))
345    }
346
347    async fn shutdown(&mut self) -> anyhow::Result<()> {
348        info!("Shutting down GGUF runtime");
349
350        if let Some(mut child) = self.server_process.take() {
351            // On Unix (macOS + Linux): send SIGTERM first so llama-server can
352            // release Metal command queues / CUDA contexts gracefully.
353            // Give it up to 3 s before escalating to SIGKILL.
354            #[cfg(unix)]
355            {
356                // 1 s grace (was 3 s) — enough for llama-server to flush its Metal/CUDA
357                // contexts; any longer only adds latency to model switching.
358                let exited_gracefully = Self::send_sigterm_and_wait(&mut child, 1);
359                if exited_gracefully {
360                    info!("llama-server shut down gracefully after SIGTERM");
361                    return Ok(());
362                }
363                info!("llama-server did not exit after SIGTERM — sending SIGKILL");
364            }
365
366            // SIGKILL (or TerminateProcess on Windows)
367            match child.kill() {
368                Ok(_) => {
369                    info!("llama-server process killed");
370                    // wait() is safe here: we are in an async fn but this is
371                    // a blocking call on an already-dead process, so it returns
372                    // immediately.
373                    let _ = child.wait();
374                }
375                Err(e) => {
376                    // Process may have already exited on its own
377                    warn!("Failed to kill llama-server (may have already exited): {}", e);
378                }
379            }
380        }
381
382        self.config = None;
383        self.base_url.clear();
384        Ok(())
385    }
386
387    fn metadata(&self) -> RuntimeMetadata {
388        RuntimeMetadata {
389            format: ModelFormat::GGUF,
390            runtime_name: "llama.cpp (llama-server)".to_string(),
391            version: "latest".to_string(),
392            supports_gpu: true,
393            supports_streaming: true,
394        }
395    }
396}
397
398impl Drop for GGUFRuntime {
399    fn drop(&mut self) {
400        if let Some(mut child) = self.server_process.take() {
401            // Best-effort kill — we intentionally do NOT call child.wait() here
402            // because Drop can be invoked from an async Tokio context and a
403            // blocking wait would stall the thread-pool worker.
404            // The OS reclaims the zombie when the Tokio runtime itself exits.
405            let _ = child.kill();
406        }
407    }
408}