forge_guardrails/server/lifecycle.rs

1use std::net::{SocketAddr, TcpListener, TcpStream};
2#[cfg(unix)]
3use std::os::unix::process::CommandExt;
4use std::path::Path;
5use std::process::{Child, ExitStatus};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use crate::error::BackendError;
10
11#[cfg(unix)]
12use nix::sys::signal::{kill, killpg, Signal};
13#[cfg(unix)]
14use nix::unistd::Pid;
15
16use super::args::{build_backend_args, validate_backend_extra_flags};
17use super::manager::{RunConfig, ServerManager};
18use super::runtime::validate_llamafile_runtime_path;
19
/// Upper bound on how long a freshly spawned backend may take to report ready.
const BACKEND_READY_TIMEOUT: Duration = Duration::from_secs(3 * 60);
/// Pause between successive readiness probes.
const BACKEND_READY_POLL_INTERVAL: Duration = Duration::from_millis(2_000);
/// Per-request timeout for a single `/props` readiness probe.
const BACKEND_PROPS_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Grace period given to a terminating backend before it is force-killed.
const BACKEND_STOP_TIMEOUT: Duration = Duration::from_millis(10_000);
/// Settle time after stopping a backend, intended to let VRAM be released.
const BACKEND_VRAM_CLEAR_DELAY: Duration = Duration::from_millis(3_000);
25
26#[derive(Debug, Clone, Copy)]
27pub(super) struct LifecycleOptions {
28    pub(super) ready_timeout: Duration,
29    pub(super) ready_poll_interval: Duration,
30    pub(super) stop_timeout: Duration,
31    pub(super) vram_clear_delay: Duration,
32}
33
34impl Default for LifecycleOptions {
35    fn default() -> Self {
36        Self {
37            ready_timeout: BACKEND_READY_TIMEOUT,
38            ready_poll_interval: BACKEND_READY_POLL_INTERVAL,
39            stop_timeout: BACKEND_STOP_TIMEOUT,
40            vram_clear_delay: BACKEND_VRAM_CLEAR_DELAY,
41        }
42    }
43}
44
/// Snapshot of the managed backend subprocess, as observed by a non-blocking poll.
#[derive(Debug)]
enum ManagedChildState {
    /// No child process is currently tracked.
    Missing,
    /// The child is tracked and has not exited yet.
    Running,
    /// The child has exited (and been reaped) with this status.
    Exited(ExitStatus),
}
51
52impl ServerManager {
53    /// Start the backend process. No-op for ollama.
54    /// For llamaserver/llamafile, spawns a subprocess with the given config.
55    /// Returns true if a new process was spawned, false if reused.
56    #[allow(clippy::too_many_arguments)]
57    pub fn start(
58        &self,
59        model: &str,
60        gguf_path: &Path,
61        mode: &str,
62        extra_flags: &[String],
63        ctx_override: Option<i64>,
64        cache_type_k: Option<&str>,
65        cache_type_v: Option<&str>,
66        n_slots: Option<i64>,
67        kv_unified: bool,
68    ) -> Result<bool, BackendError> {
69        self.start_with_options(
70            model,
71            gguf_path,
72            mode,
73            extra_flags,
74            ctx_override,
75            cache_type_k,
76            cache_type_v,
77            n_slots,
78            kv_unified,
79            LifecycleOptions::default(),
80        )
81    }
82
83    #[allow(clippy::too_many_arguments)]
84    pub(super) fn start_with_options(
85        &self,
86        model: &str,
87        gguf_path: &Path,
88        mode: &str,
89        extra_flags: &[String],
90        ctx_override: Option<i64>,
91        cache_type_k: Option<&str>,
92        cache_type_v: Option<&str>,
93        n_slots: Option<i64>,
94        kv_unified: bool,
95        options: LifecycleOptions,
96    ) -> Result<bool, BackendError> {
97        let extra_flags =
98            validate_backend_extra_flags(extra_flags).map_err(|e| BackendError::new(0, e))?;
99        let new_config = RunConfig {
100            model: model.to_string(),
101            mode: mode.to_string(),
102            ctx_override,
103            extra_flags: extra_flags.clone(),
104            cache_type_k: cache_type_k.map(|s| s.to_string()),
105            cache_type_v: cache_type_v.map(|s| s.to_string()),
106            n_slots,
107            kv_unified,
108        };
109
110        // Check if we can reuse the running process.
111        let can_reuse = {
112            let guard = self
113                .current_config
114                .lock()
115                .map_err(|e| BackendError::new(0, e.to_string()))?;
116            guard.as_ref() == Some(&new_config)
117        };
118        if can_reuse
119            && (self.backend == "ollama"
120                || matches!(self.child_state()?, ManagedChildState::Running))
121        {
122            return Ok(false);
123        }
124
125        // Stop existing if running.
126        self.stop_with_options(options.stop_timeout, options.vram_clear_delay)?;
127
128        if self.backend == "ollama" {
129            let mut guard = self
130                .current_config
131                .lock()
132                .map_err(|e| BackendError::new(0, e.to_string()))?;
133            *guard = Some(new_config);
134            return Ok(false);
135        }
136
137        let binary = if self.backend == "llamafile" {
138            validate_llamafile_runtime_path(self.llamafile_runtime.as_deref().ok_or_else(
139                || BackendError::new(0, "llamafile backend requires llamafile_runtime"),
140            )?)?
141        } else {
142            Path::new("llama-server").to_path_buf()
143        };
144
145        self.ensure_backend_port_available()?;
146
147        let args = build_backend_args(
148            &self.backend,
149            self.port,
150            gguf_path,
151            mode,
152            &extra_flags,
153            ctx_override,
154            cache_type_k,
155            cache_type_v,
156            n_slots,
157            kv_unified,
158        )
159        .map_err(|e| BackendError::new(0, e))?;
160        let mut cmd = std::process::Command::new(&binary);
161        cmd.args(&args);
162        #[cfg(unix)]
163        cmd.process_group(0);
164
165        cmd.stdout(std::process::Stdio::null());
166        cmd.stderr(std::process::Stdio::null());
167
168        let child = cmd
169            .spawn()
170            .map_err(|e| BackendError::new(0, format!("Failed to start {:?}: {}", binary, e)))?;
171
172        {
173            let mut guard = self
174                .process
175                .lock()
176                .map_err(|e| BackendError::new(0, e.to_string()))?;
177            *guard = Some(child);
178        }
179
180        if let Err(err) = self.wait_until_ready(options.ready_timeout, options.ready_poll_interval)
181        {
182            let _ = self.stop_with_options(options.stop_timeout, Duration::ZERO);
183            return Err(err);
184        }
185
186        {
187            let mut guard = self
188                .current_config
189                .lock()
190                .map_err(|e| BackendError::new(0, e.to_string()))?;
191            *guard = Some(new_config);
192        }
193
194        Ok(true)
195    }
196
197    fn child_state(&self) -> Result<ManagedChildState, BackendError> {
198        let mut guard = self
199            .process
200            .lock()
201            .map_err(|e| BackendError::new(0, e.to_string()))?;
202        match guard.as_mut() {
203            Some(child) => match child
204                .try_wait()
205                .map_err(|e| BackendError::new(0, format!("Failed to poll backend: {}", e)))?
206            {
207                Some(status) => Ok(ManagedChildState::Exited(status)),
208                None => Ok(ManagedChildState::Running),
209            },
210            None => Ok(ManagedChildState::Missing),
211        }
212    }
213
214    fn ensure_backend_port_available(&self) -> Result<(), BackendError> {
215        let port: u16 = self
216            .port
217            .try_into()
218            .map_err(|_| BackendError::new(0, format!("Invalid backend port: {}", self.port)))?;
219        let addr = SocketAddr::from(([127, 0, 0, 1], port));
220
221        let mut last_err = None;
222        for i in 0..10 {
223            if i > 0 {
224                thread::sleep(Duration::from_millis(50));
225            }
226            if TcpStream::connect_timeout(&addr, Duration::from_millis(50)).is_ok() {
227                last_err = Some(BackendError::new(
228                    0,
229                    format!("Backend port {} is already accepting connections", port),
230                ));
231                continue;
232            }
233            match TcpListener::bind(("127.0.0.1", port)) {
234                Ok(listener) => {
235                    drop(listener);
236                    return Ok(());
237                }
238                Err(e) => {
239                    last_err = Some(BackendError::new(
240                        0,
241                        format!("Backend port {} is not available on 127.0.0.1: {}", port, e),
242                    ));
243                }
244            }
245        }
246        Err(last_err.unwrap_or_else(|| {
247            BackendError::new(0, format!("Backend port {} is not available", port))
248        }))
249    }
250
251    pub(super) fn wait_until_ready(
252        &self,
253        timeout: Duration,
254        poll_interval: Duration,
255    ) -> Result<(), BackendError> {
256        let url = format!("http://127.0.0.1:{}/props", self.port);
257        let rt = tokio::runtime::Builder::new_current_thread()
258            .enable_io()
259            .enable_time()
260            .build()
261            .map_err(|e| BackendError::new(0, e.to_string()))?;
262        let client = reqwest::Client::new();
263        let deadline = Instant::now() + timeout;
264
265        loop {
266            match self.child_state()? {
267                ManagedChildState::Running => {}
268                ManagedChildState::Exited(status) => {
269                    return Err(BackendError::new(
270                        0,
271                        format!("Backend exited before readiness: {}", status),
272                    ));
273                }
274                ManagedChildState::Missing => {
275                    return Err(BackendError::new(
276                        0,
277                        "Backend process disappeared before readiness",
278                    ));
279                }
280            }
281
282            let ready = rt.block_on(async {
283                match client.get(&url).timeout(BACKEND_PROPS_TIMEOUT).send().await {
284                    Ok(resp) if resp.status().is_success() => resp
285                        .json::<serde_json::Value>()
286                        .await
287                        .ok()
288                        .and_then(|json| {
289                            json.get("default_generation_settings")
290                                .and_then(|settings| settings.as_object())
291                                .map(|_| true)
292                        })
293                        .unwrap_or(false),
294                    _ => false,
295                }
296            });
297            if ready {
298                return Ok(());
299            }
300
301            if Instant::now() >= deadline {
302                return Err(BackendError::new(
303                    0,
304                    format!(
305                        "Backend failed to become ready within {}s",
306                        timeout.as_secs()
307                    ),
308                ));
309            }
310            thread::sleep(poll_interval);
311        }
312    }
313
314    /// Stop the backend process. For ollama, runs 'ollama stop'.
315    /// Waits up to 10s for termination, then forces kill.
316    /// Sleeps 3s after termination for VRAM clearing.
317    pub fn stop(&self) -> Result<(), BackendError> {
318        self.stop_with_options(BACKEND_STOP_TIMEOUT, BACKEND_VRAM_CLEAR_DELAY)
319    }
320
321    pub(super) fn stop_with_options(
322        &self,
323        stop_timeout: Duration,
324        vram_clear_delay: Duration,
325    ) -> Result<(), BackendError> {
326        if self.backend == "ollama" {
327            if let Ok(guard) = self.current_config.lock() {
328                if let Some(ref cfg) = *guard {
329                    let _ = std::process::Command::new("ollama")
330                        .arg("stop")
331                        .arg(&cfg.model)
332                        .output();
333                }
334            }
335            if let Ok(mut g) = self.current_config.lock() {
336                *g = None;
337            }
338            return Ok(());
339        }
340
341        let mut process_guard = self
342            .process
343            .lock()
344            .map_err(|e| BackendError::new(0, e.to_string()))?;
345
346        if let Some(ref mut child) = *process_guard {
347            let _ = terminate_child(child);
348            match wait_for_child_exit(child, stop_timeout) {
349                Ok(Some(_)) => {}
350                Ok(None) => {
351                    let _ = kill_child_now(child);
352                    let _ = child.wait();
353                }
354                Err(_) => {
355                    let _ = kill_child_now(child);
356                    let _ = child.wait();
357                }
358            }
359        }
360        *process_guard = None;
361
362        if let Ok(mut g) = self.current_config.lock() {
363            *g = None;
364        }
365        if let Ok(mut g) = self.last_context.lock() {
366            *g = None;
367        }
368
369        if !vram_clear_delay.is_zero() {
370            thread::sleep(vram_clear_delay);
371        }
372
373        Ok(())
374    }
375}
376
/// Poll `child` every 50ms until it exits or `timeout` elapses.
///
/// Returns `Ok(Some(status))` once the child has exited, and `Ok(None)` if it is still
/// running at the deadline. Errors from [`Child::try_wait`] are propagated. The child
/// is polled at least once, even when `timeout` is zero.
fn wait_for_child_exit(
    child: &mut Child,
    timeout: Duration,
) -> std::io::Result<Option<ExitStatus>> {
    let give_up_at = Instant::now() + timeout;
    loop {
        match child.try_wait()? {
            Some(status) => break Ok(Some(status)),
            None if Instant::now() >= give_up_at => break Ok(None),
            None => thread::sleep(Duration::from_millis(50)),
        }
    }
}
392
393fn terminate_child(child: &mut Child) -> std::io::Result<()> {
394    #[cfg(unix)]
395    {
396        let pid = Pid::from_raw(child.id() as i32);
397        let direct = kill(pid, Signal::SIGTERM);
398        let group = killpg(pid, Signal::SIGTERM);
399        if direct.is_err() && group.is_err() {
400            return child.kill();
401        }
402        Ok(())
403    }
404    #[cfg(not(unix))]
405    {
406        child.kill()
407    }
408}
409
410fn kill_child_now(child: &mut Child) -> std::io::Result<()> {
411    #[cfg(unix)]
412    {
413        if killpg(Pid::from_raw(child.id() as i32), Signal::SIGKILL).is_err() {
414            return child.kill();
415        }
416        Ok(())
417    }
418    #[cfg(not(unix))]
419    {
420        child.kill()
421    }
422}
423
424impl Drop for ServerManager {
425    fn drop(&mut self) {
426        if let Ok(mut guard) = self.process.lock() {
427            if let Some(ref mut child) = *guard {
428                let _ = terminate_child(child);
429                let _ = wait_for_child_exit(child, Duration::from_millis(200));
430                let _ = kill_child_now(child);
431                let _ = child.wait();
432            }
433        }
434    }
435}