offline_intelligence/model_runtime/
gguf_runtime.rs1use async_trait::async_trait;
7use super::runtime_trait::*;
8use std::process::{Child, Command, Stdio};
9use std::time::Duration;
10use tracing::{info, warn};
11use tokio::time::sleep;
12
/// Runtime backend for GGUF models: spawns an external `llama-server`
/// (llama.cpp) process and communicates with it over local HTTP.
pub struct GGUFRuntime {
    // Configuration captured at initialize(); None until initialized.
    config: Option<RuntimeConfig>,
    // Handle to the spawned llama-server child process, if one is running.
    server_process: Option<Child>,
    // Shared HTTP client used for health checks and inference requests.
    http_client: reqwest::Client,
    // Server base URL, e.g. "http://127.0.0.1:8080"; empty until started.
    base_url: String,
}
19
20impl GGUFRuntime {
21 pub fn new() -> Self {
22 Self {
23 config: None,
24 server_process: None,
25 http_client: reqwest::Client::builder()
26 .timeout(Duration::from_secs(600))
27 .build()
28 .unwrap_or_default(),
29 base_url: String::new(),
30 }
31 }
32
33 async fn start_server(&mut self, config: &RuntimeConfig) -> anyhow::Result<()> {
35 let binary_path = config.runtime_binary.as_ref()
36 .ok_or_else(|| anyhow::anyhow!("GGUF runtime requires runtime_binary path"))?;
37
38 if !binary_path.exists() {
39 return Err(anyhow::anyhow!(
40 "llama-server binary not found at: {}",
41 binary_path.display()
42 ));
43 }
44
45 info!("Starting llama-server for GGUF model: {}", config.model_path.display());
46 info!(" Binary: {}", binary_path.display());
47 info!(" Port: {}", config.port);
48 info!(" Context Size: {}", config.context_size);
49 info!(" GPU Layers: {}", config.gpu_layers);
50
51 if !config.model_path.exists() {
53 return Err(anyhow::anyhow!(
54 "Model file not found at: {}",
55 config.model_path.display()
56 ));
57 }
58
59 let mut cmd = Command::new(binary_path);
61 cmd.arg("--model").arg(&config.model_path)
62 .arg("--host").arg(&config.host)
63 .arg("--port").arg(config.port.to_string())
64 .arg("--ctx-size").arg(config.context_size.to_string())
65 .arg("--batch-size").arg(config.batch_size.to_string())
66 .arg("--threads").arg(config.threads.to_string())
67 .arg("--n-gpu-layers").arg(config.gpu_layers.to_string());
68
69 info!("📋 Full command: {:?} {}", binary_path,
71 format!("--model {} --host {} --port {} --ctx-size {} --batch-size {} --threads {} --n-gpu-layers {}",
72 config.model_path.display(), config.host, config.port,
73 config.context_size, config.batch_size, config.threads, config.gpu_layers));
74
75 #[cfg(target_os = "macos")]
81 {
82 if let Some(binary_dir) = binary_path.parent() {
83 let lib_path = binary_dir.to_string_lossy().to_string();
84 info!("macOS: setting DYLD_LIBRARY_PATH={}", lib_path);
85 let existing = std::env::var("DYLD_LIBRARY_PATH").unwrap_or_default();
87 let new_val = if existing.is_empty() {
88 lib_path
89 } else {
90 format!("{}:{}", lib_path, existing)
91 };
92 cmd.env("DYLD_LIBRARY_PATH", new_val);
93 }
94 }
95
96 #[cfg(target_os = "windows")]
98 {
99 use std::os::windows::process::CommandExt;
100 const CREATE_NO_WINDOW: u32 = 0x08000000;
101 cmd.creation_flags(CREATE_NO_WINDOW);
102 }
103
104 cmd.stdout(Stdio::piped())
105 .stderr(Stdio::piped());
106
107 let child = cmd.spawn()
109 .map_err(|e| anyhow::anyhow!("Failed to spawn llama-server: {}", e))?;
110
111 self.server_process = Some(child);
112 self.base_url = format!("http://{}:{}", config.host, config.port);
113
114 info!("llama-server process started, waiting for health check...");
115
116 let _start = std::time::Instant::now();
120 let mut delay_ms: u64 = 100;
121 let mut last_log_secs: u64 = 0;
122 loop {
123 sleep(Duration::from_millis(delay_ms)).await;
124
125 if self.is_ready().await {
126 info!("✅ GGUF runtime ready after {:.1}s", _start.elapsed().as_secs_f64());
127 return Ok(());
128 }
129 let elapsed_secs = _start.elapsed().as_secs();
130 if elapsed_secs >= 120 {
131 break;
132 }
133 if elapsed_secs >= last_log_secs + 10 {
134 info!("Still waiting for llama-server... ({}/120s)", elapsed_secs);
135 last_log_secs = elapsed_secs;
136 }
137 delay_ms = (delay_ms * 2).min(2_000);
138 }
139
140 Err(anyhow::anyhow!("llama-server failed to become ready within 120 seconds"))
141 }
142
143 #[cfg(unix)]
147 fn send_sigterm_and_wait(child: &mut Child, grace_secs: u64) -> bool {
148 if let Some(pid) = child.id() {
149 let _ = std::process::Command::new("kill")
151 .args(["-TERM", &pid.to_string()])
152 .output();
153
154 let deadline = std::time::Instant::now() + Duration::from_secs(grace_secs);
155 while std::time::Instant::now() < deadline {
156 if let Ok(Some(_)) = child.try_wait() {
157 return true; }
159 std::thread::sleep(Duration::from_millis(100));
160 }
161 }
162 false }
164}
165
166impl Default for GGUFRuntime {
167 fn default() -> Self {
168 Self::new()
169 }
170}
171
172#[async_trait]
173impl ModelRuntime for GGUFRuntime {
174 fn supported_format(&self) -> ModelFormat {
175 ModelFormat::GGUF
176 }
177
178 async fn initialize(&mut self, config: RuntimeConfig) -> anyhow::Result<()> {
179 info!("Initializing GGUF runtime");
180
181 if config.format != ModelFormat::GGUF {
183 return Err(anyhow::anyhow!(
184 "GGUF runtime received wrong format: {:?}",
185 config.format
186 ));
187 }
188
189 if !config.model_path.exists() {
190 return Err(anyhow::anyhow!(
191 "Model file not found: {}",
192 config.model_path.display()
193 ));
194 }
195
196 self.config = Some(config.clone());
197 self.start_server(&config).await?;
198
199 Ok(())
200 }
201
202 async fn is_ready(&self) -> bool {
203 if self.base_url.is_empty() {
204 return false;
205 }
206
207 let health_url = format!("{}/health", self.base_url);
208 match self.http_client
212 .get(&health_url)
213 .timeout(Duration::from_secs(3))
214 .send()
215 .await
216 {
217 Ok(resp) => resp.status().is_success(),
218 Err(_) => false,
219 }
220 }
221
222 async fn health_check(&self) -> anyhow::Result<String> {
223 if self.base_url.is_empty() {
224 return Err(anyhow::anyhow!("Runtime not initialized"));
225 }
226
227 let health_url = format!("{}/health", self.base_url);
228 let resp = self.http_client.get(&health_url)
229 .send()
230 .await
231 .map_err(|e| anyhow::anyhow!("Health check failed: {}", e))?;
232
233 if resp.status().is_success() {
234 Ok("healthy".to_string())
235 } else {
236 Err(anyhow::anyhow!("Health check returned: {}", resp.status()))
237 }
238 }
239
240 fn base_url(&self) -> String {
241 self.base_url.clone()
242 }
243
244 async fn generate(
245 &self,
246 request: InferenceRequest,
247 ) -> anyhow::Result<InferenceResponse> {
248 let url = self.completions_url();
249
250 let payload = serde_json::json!({
251 "model": "local-llm",
252 "messages": request.messages,
253 "max_tokens": request.max_tokens,
254 "temperature": request.temperature,
255 "stream": false,
256 });
257
258 let resp = self.http_client.post(&url)
259 .json(&payload)
260 .send()
261 .await
262 .map_err(|e| anyhow::anyhow!("Inference request failed: {}", e))?;
263
264 if !resp.status().is_success() {
265 let status = resp.status();
266 let body = resp.text().await.unwrap_or_default();
267 return Err(anyhow::anyhow!("Inference failed ({}): {}", status, body));
268 }
269
270 let response: serde_json::Value = resp.json().await
271 .map_err(|e| anyhow::anyhow!("Failed to parse response: {}", e))?;
272
273 let content = response["choices"][0]["message"]["content"]
274 .as_str()
275 .unwrap_or("")
276 .to_string();
277
278 let finish_reason = response["choices"][0]["finish_reason"]
279 .as_str()
280 .map(|s| s.to_string());
281
282 Ok(InferenceResponse {
283 content,
284 finish_reason,
285 })
286 }
287
288 async fn generate_stream(
289 &self,
290 request: InferenceRequest,
291 ) -> anyhow::Result<Box<dyn futures_util::Stream<Item = Result<String, anyhow::Error>> + Send + Unpin>> {
292 use futures_util::StreamExt;
293
294 let url = self.completions_url();
295
296 let payload = serde_json::json!({
297 "model": "local-llm",
298 "messages": request.messages,
299 "max_tokens": request.max_tokens,
300 "temperature": request.temperature,
301 "stream": true,
302 });
303
304 let resp = self.http_client.post(&url)
305 .json(&payload)
306 .send()
307 .await
308 .map_err(|e| anyhow::anyhow!("Stream request failed: {}", e))?;
309
310 if !resp.status().is_success() {
311 let status = resp.status();
312 let body = resp.text().await.unwrap_or_default();
313 return Err(anyhow::anyhow!("Stream failed ({}): {}", status, body));
314 }
315
316 let byte_stream = resp.bytes_stream();
317
318 let sse_stream = async_stream::try_stream! {
319 let mut buffer = String::new();
320 futures_util::pin_mut!(byte_stream);
321
322 while let Some(chunk_result) = byte_stream.next().await {
323 let chunk = chunk_result.map_err(|e| anyhow::anyhow!("Stream read error: {}", e))?;
324 buffer.push_str(&String::from_utf8_lossy(&chunk));
325
326 while let Some(newline_pos) = buffer.find('\n') {
327 let line = buffer[..newline_pos].trim().to_string();
328 buffer = buffer[newline_pos + 1..].to_string();
329
330 if line.is_empty() || !line.starts_with("data: ") {
331 continue;
332 }
333
334 let data = &line[6..];
335 if data == "[DONE]" {
336 return;
337 }
338
339 yield format!("data: {}\n\n", data);
340 }
341 }
342 };
343
344 Ok(Box::new(Box::pin(sse_stream)))
345 }
346
347 async fn shutdown(&mut self) -> anyhow::Result<()> {
348 info!("Shutting down GGUF runtime");
349
350 if let Some(mut child) = self.server_process.take() {
351 #[cfg(unix)]
355 {
356 let exited_gracefully = Self::send_sigterm_and_wait(&mut child, 1);
359 if exited_gracefully {
360 info!("llama-server shut down gracefully after SIGTERM");
361 return Ok(());
362 }
363 info!("llama-server did not exit after SIGTERM — sending SIGKILL");
364 }
365
366 match child.kill() {
368 Ok(_) => {
369 info!("llama-server process killed");
370 let _ = child.wait();
374 }
375 Err(e) => {
376 warn!("Failed to kill llama-server (may have already exited): {}", e);
378 }
379 }
380 }
381
382 self.config = None;
383 self.base_url.clear();
384 Ok(())
385 }
386
387 fn metadata(&self) -> RuntimeMetadata {
388 RuntimeMetadata {
389 format: ModelFormat::GGUF,
390 runtime_name: "llama.cpp (llama-server)".to_string(),
391 version: "latest".to_string(),
392 supports_gpu: true,
393 supports_streaming: true,
394 }
395 }
396}
397
398impl Drop for GGUFRuntime {
399 fn drop(&mut self) {
400 if let Some(mut child) = self.server_process.take() {
401 let _ = child.kill();
406 }
407 }
408}