1use std::net::{SocketAddr, TcpListener, TcpStream};
2#[cfg(unix)]
3use std::os::unix::process::CommandExt;
4use std::path::Path;
5use std::process::{Child, ExitStatus};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use crate::error::BackendError;
10
11#[cfg(unix)]
12use nix::sys::signal::{kill, killpg, Signal};
13#[cfg(unix)]
14use nix::unistd::Pid;
15
16use super::args::{build_backend_args, validate_backend_extra_flags};
17use super::manager::{RunConfig, ServerManager};
18use super::runtime::validate_llamafile_runtime_path;
19
/// Default upper bound on how long a freshly spawned backend may take to answer the readiness
/// probe (used as `LifecycleOptions::ready_timeout`).
const BACKEND_READY_TIMEOUT: Duration = Duration::from_secs(180);
/// Default pause between two consecutive readiness probes.
const BACKEND_READY_POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Per-request timeout applied to every HTTP `/props` readiness request.
const BACKEND_PROPS_TIMEOUT: Duration = Duration::from_secs(5);
/// Default grace period a backend gets after the termination request before it is killed.
const BACKEND_STOP_TIMEOUT: Duration = Duration::from_secs(10);
/// Default pause inserted after a backend has been stopped.
/// NOTE(review): the name suggests this lets the GPU driver release VRAM — confirm.
const BACKEND_VRAM_CLEAR_DELAY: Duration = Duration::from_secs(3);
25
/// Timing knobs for starting and stopping a backend process.
///
/// `Default` yields the module-level `BACKEND_*` constants; callers inside the parent module
/// can supply other values through `start_with_options`.
#[derive(Debug, Clone, Copy)]
pub(super) struct LifecycleOptions {
    /// Maximum time to wait for the backend to report ready after it was spawned.
    pub(super) ready_timeout: Duration,
    /// Pause between two readiness probes.
    pub(super) ready_poll_interval: Duration,
    /// Grace period after the termination request before the process is killed.
    pub(super) stop_timeout: Duration,
    /// Pause after a stop; a zero duration skips the pause.
    pub(super) vram_clear_delay: Duration,
}
33
34impl Default for LifecycleOptions {
35 fn default() -> Self {
36 Self {
37 ready_timeout: BACKEND_READY_TIMEOUT,
38 ready_poll_interval: BACKEND_READY_POLL_INTERVAL,
39 stop_timeout: BACKEND_STOP_TIMEOUT,
40 vram_clear_delay: BACKEND_VRAM_CLEAR_DELAY,
41 }
42 }
43}
44
/// Snapshot of the tracked backend child process as reported by `child_state`.
#[derive(Debug)]
enum ManagedChildState {
    /// No child process handle is currently stored.
    Missing,
    /// A child handle is stored and a non-blocking poll found it still running.
    Running,
    /// A child handle is stored and the process has exited with this status.
    Exited(ExitStatus),
}
51
52impl ServerManager {
53 #[allow(clippy::too_many_arguments)]
57 pub fn start(
58 &self,
59 model: &str,
60 gguf_path: &Path,
61 mode: &str,
62 extra_flags: &[String],
63 ctx_override: Option<i64>,
64 cache_type_k: Option<&str>,
65 cache_type_v: Option<&str>,
66 n_slots: Option<i64>,
67 kv_unified: bool,
68 ) -> Result<bool, BackendError> {
69 self.start_with_options(
70 model,
71 gguf_path,
72 mode,
73 extra_flags,
74 ctx_override,
75 cache_type_k,
76 cache_type_v,
77 n_slots,
78 kv_unified,
79 LifecycleOptions::default(),
80 )
81 }
82
83 #[allow(clippy::too_many_arguments)]
84 pub(super) fn start_with_options(
85 &self,
86 model: &str,
87 gguf_path: &Path,
88 mode: &str,
89 extra_flags: &[String],
90 ctx_override: Option<i64>,
91 cache_type_k: Option<&str>,
92 cache_type_v: Option<&str>,
93 n_slots: Option<i64>,
94 kv_unified: bool,
95 options: LifecycleOptions,
96 ) -> Result<bool, BackendError> {
97 let extra_flags =
98 validate_backend_extra_flags(extra_flags).map_err(|e| BackendError::new(0, e))?;
99 let new_config = RunConfig {
100 model: model.to_string(),
101 mode: mode.to_string(),
102 ctx_override,
103 extra_flags: extra_flags.clone(),
104 cache_type_k: cache_type_k.map(|s| s.to_string()),
105 cache_type_v: cache_type_v.map(|s| s.to_string()),
106 n_slots,
107 kv_unified,
108 };
109
110 let can_reuse = {
112 let guard = self
113 .current_config
114 .lock()
115 .map_err(|e| BackendError::new(0, e.to_string()))?;
116 guard.as_ref() == Some(&new_config)
117 };
118 if can_reuse
119 && (self.backend == "ollama"
120 || matches!(self.child_state()?, ManagedChildState::Running))
121 {
122 return Ok(false);
123 }
124
125 self.stop_with_options(options.stop_timeout, options.vram_clear_delay)?;
127
128 if self.backend == "ollama" {
129 let mut guard = self
130 .current_config
131 .lock()
132 .map_err(|e| BackendError::new(0, e.to_string()))?;
133 *guard = Some(new_config);
134 return Ok(false);
135 }
136
137 let binary = if self.backend == "llamafile" {
138 validate_llamafile_runtime_path(self.llamafile_runtime.as_deref().ok_or_else(
139 || BackendError::new(0, "llamafile backend requires llamafile_runtime"),
140 )?)?
141 } else {
142 Path::new("llama-server").to_path_buf()
143 };
144
145 self.ensure_backend_port_available()?;
146
147 let args = build_backend_args(
148 &self.backend,
149 self.port,
150 gguf_path,
151 mode,
152 &extra_flags,
153 ctx_override,
154 cache_type_k,
155 cache_type_v,
156 n_slots,
157 kv_unified,
158 )
159 .map_err(|e| BackendError::new(0, e))?;
160 let mut cmd = std::process::Command::new(&binary);
161 cmd.args(&args);
162 #[cfg(unix)]
163 cmd.process_group(0);
164
165 cmd.stdout(std::process::Stdio::null());
166 cmd.stderr(std::process::Stdio::null());
167
168 let child = cmd
169 .spawn()
170 .map_err(|e| BackendError::new(0, format!("Failed to start {:?}: {}", binary, e)))?;
171
172 {
173 let mut guard = self
174 .process
175 .lock()
176 .map_err(|e| BackendError::new(0, e.to_string()))?;
177 *guard = Some(child);
178 }
179
180 if let Err(err) = self.wait_until_ready(options.ready_timeout, options.ready_poll_interval)
181 {
182 let _ = self.stop_with_options(options.stop_timeout, Duration::ZERO);
183 return Err(err);
184 }
185
186 {
187 let mut guard = self
188 .current_config
189 .lock()
190 .map_err(|e| BackendError::new(0, e.to_string()))?;
191 *guard = Some(new_config);
192 }
193
194 Ok(true)
195 }
196
197 fn child_state(&self) -> Result<ManagedChildState, BackendError> {
198 let mut guard = self
199 .process
200 .lock()
201 .map_err(|e| BackendError::new(0, e.to_string()))?;
202 match guard.as_mut() {
203 Some(child) => match child
204 .try_wait()
205 .map_err(|e| BackendError::new(0, format!("Failed to poll backend: {}", e)))?
206 {
207 Some(status) => Ok(ManagedChildState::Exited(status)),
208 None => Ok(ManagedChildState::Running),
209 },
210 None => Ok(ManagedChildState::Missing),
211 }
212 }
213
214 fn ensure_backend_port_available(&self) -> Result<(), BackendError> {
215 let port: u16 = self
216 .port
217 .try_into()
218 .map_err(|_| BackendError::new(0, format!("Invalid backend port: {}", self.port)))?;
219 let addr = SocketAddr::from(([127, 0, 0, 1], port));
220
221 let mut last_err = None;
222 for i in 0..10 {
223 if i > 0 {
224 thread::sleep(Duration::from_millis(50));
225 }
226 if TcpStream::connect_timeout(&addr, Duration::from_millis(50)).is_ok() {
227 last_err = Some(BackendError::new(
228 0,
229 format!("Backend port {} is already accepting connections", port),
230 ));
231 continue;
232 }
233 match TcpListener::bind(("127.0.0.1", port)) {
234 Ok(listener) => {
235 drop(listener);
236 return Ok(());
237 }
238 Err(e) => {
239 last_err = Some(BackendError::new(
240 0,
241 format!("Backend port {} is not available on 127.0.0.1: {}", port, e),
242 ));
243 }
244 }
245 }
246 Err(last_err.unwrap_or_else(|| {
247 BackendError::new(0, format!("Backend port {} is not available", port))
248 }))
249 }
250
251 pub(super) fn wait_until_ready(
252 &self,
253 timeout: Duration,
254 poll_interval: Duration,
255 ) -> Result<(), BackendError> {
256 let url = format!("http://127.0.0.1:{}/props", self.port);
257 let rt = tokio::runtime::Builder::new_current_thread()
258 .enable_io()
259 .enable_time()
260 .build()
261 .map_err(|e| BackendError::new(0, e.to_string()))?;
262 let client = reqwest::Client::new();
263 let deadline = Instant::now() + timeout;
264
265 loop {
266 match self.child_state()? {
267 ManagedChildState::Running => {}
268 ManagedChildState::Exited(status) => {
269 return Err(BackendError::new(
270 0,
271 format!("Backend exited before readiness: {}", status),
272 ));
273 }
274 ManagedChildState::Missing => {
275 return Err(BackendError::new(
276 0,
277 "Backend process disappeared before readiness",
278 ));
279 }
280 }
281
282 let ready = rt.block_on(async {
283 match client.get(&url).timeout(BACKEND_PROPS_TIMEOUT).send().await {
284 Ok(resp) if resp.status().is_success() => resp
285 .json::<serde_json::Value>()
286 .await
287 .ok()
288 .and_then(|json| {
289 json.get("default_generation_settings")
290 .and_then(|settings| settings.as_object())
291 .map(|_| true)
292 })
293 .unwrap_or(false),
294 _ => false,
295 }
296 });
297 if ready {
298 return Ok(());
299 }
300
301 if Instant::now() >= deadline {
302 return Err(BackendError::new(
303 0,
304 format!(
305 "Backend failed to become ready within {}s",
306 timeout.as_secs()
307 ),
308 ));
309 }
310 thread::sleep(poll_interval);
311 }
312 }
313
    /// Stops the backend using the default stop timeout (`BACKEND_STOP_TIMEOUT`) and the
    /// default post-stop delay (`BACKEND_VRAM_CLEAR_DELAY`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `stop_with_options`.
    pub fn stop(&self) -> Result<(), BackendError> {
        self.stop_with_options(BACKEND_STOP_TIMEOUT, BACKEND_VRAM_CLEAR_DELAY)
    }
320
321 pub(super) fn stop_with_options(
322 &self,
323 stop_timeout: Duration,
324 vram_clear_delay: Duration,
325 ) -> Result<(), BackendError> {
326 if self.backend == "ollama" {
327 if let Ok(guard) = self.current_config.lock() {
328 if let Some(ref cfg) = *guard {
329 let _ = std::process::Command::new("ollama")
330 .arg("stop")
331 .arg(&cfg.model)
332 .output();
333 }
334 }
335 if let Ok(mut g) = self.current_config.lock() {
336 *g = None;
337 }
338 return Ok(());
339 }
340
341 let mut process_guard = self
342 .process
343 .lock()
344 .map_err(|e| BackendError::new(0, e.to_string()))?;
345
346 if let Some(ref mut child) = *process_guard {
347 let _ = terminate_child(child);
348 match wait_for_child_exit(child, stop_timeout) {
349 Ok(Some(_)) => {}
350 Ok(None) => {
351 let _ = kill_child_now(child);
352 let _ = child.wait();
353 }
354 Err(_) => {
355 let _ = kill_child_now(child);
356 let _ = child.wait();
357 }
358 }
359 }
360 *process_guard = None;
361
362 if let Ok(mut g) = self.current_config.lock() {
363 *g = None;
364 }
365 if let Ok(mut g) = self.last_context.lock() {
366 *g = None;
367 }
368
369 if !vram_clear_delay.is_zero() {
370 thread::sleep(vram_clear_delay);
371 }
372
373 Ok(())
374 }
375}
376
/// Polls `child` until it exits or `timeout` has elapsed.
///
/// Returns `Ok(Some(status))` as soon as the child is found to have exited and `Ok(None)`
/// when the timeout elapsed first. The child is always polled at least once, so an already
/// exited child is reported even with a zero timeout.
///
/// Elapsed time is measured from the start instead of computing `now + timeout`, so very
/// large timeouts such as `Duration::MAX` do not panic. The pause between polls is 50 ms,
/// shortened to the time remaining so the call does not overshoot `timeout`.
///
/// # Errors
///
/// Propagates the I/O error of a failed `try_wait`.
fn wait_for_child_exit(
    child: &mut Child,
    timeout: Duration,
) -> std::io::Result<Option<ExitStatus>> {
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    let started = Instant::now();
    loop {
        if let Some(status) = child.try_wait()? {
            return Ok(Some(status));
        }
        let remaining = timeout.saturating_sub(started.elapsed());
        if remaining.is_zero() {
            return Ok(None);
        }
        thread::sleep(POLL_INTERVAL.min(remaining));
    }
}
392
393fn terminate_child(child: &mut Child) -> std::io::Result<()> {
394 #[cfg(unix)]
395 {
396 let pid = Pid::from_raw(child.id() as i32);
397 let direct = kill(pid, Signal::SIGTERM);
398 let group = killpg(pid, Signal::SIGTERM);
399 if direct.is_err() && group.is_err() {
400 return child.kill();
401 }
402 Ok(())
403 }
404 #[cfg(not(unix))]
405 {
406 child.kill()
407 }
408}
409
410fn kill_child_now(child: &mut Child) -> std::io::Result<()> {
411 #[cfg(unix)]
412 {
413 if killpg(Pid::from_raw(child.id() as i32), Signal::SIGKILL).is_err() {
414 return child.kill();
415 }
416 Ok(())
417 }
418 #[cfg(not(unix))]
419 {
420 child.kill()
421 }
422}
423
424impl Drop for ServerManager {
425 fn drop(&mut self) {
426 if let Ok(mut guard) = self.process.lock() {
427 if let Some(ref mut child) = *guard {
428 let _ = terminate_child(child);
429 let _ = wait_for_child_exit(child, Duration::from_millis(200));
430 let _ = kill_child_now(child);
431 let _ = child.wait();
432 }
433 }
434 }
435}