Skip to main content

greentic_operator/services/
runner.rs

1use std::env;
2use std::fs::OpenOptions;
3use std::path::{Path, PathBuf};
4use std::process::{Command, Stdio};
5
6use sysinfo::{Pid, ProcessesToUpdate, System};
7
/// Liveness of the process recorded in a pid file, as reported by `process_status`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessStatus {
    /// The pid file names a process that the liveness probe found.
    Running,
    /// No pid could be read from the pid file, or the process it names was not found.
    NotRunning,
}
13
/// Outcome of a `start_process` or `stop_process` request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServiceState {
    /// A new process was spawned and its pid was written to the pid file.
    Started,
    /// A start was requested but the recorded process is still alive.
    AlreadyRunning,
    /// A stop was requested and the recorded process was asked to terminate.
    Stopped,
    /// A stop was requested but there was no live process to stop.
    NotRunning,
}
21
22pub fn start_process(
23    command: &str,
24    args: &[String],
25    envs: &[(&str, String)],
26    pid_path: &Path,
27    log_path: &Path,
28    cwd: Option<&Path>,
29) -> anyhow::Result<ServiceState> {
30    if let Some(pid) = read_pid(pid_path)? {
31        if is_process_running(pid)? {
32            if should_restart_for_command(pid, command)? {
33                kill_process(pid)?;
34                let _ = std::fs::remove_file(pid_path);
35            } else {
36                return Ok(ServiceState::AlreadyRunning);
37            }
38        } else {
39            let _ = std::fs::remove_file(pid_path);
40        }
41    }
42
43    if let Some(parent) = pid_path.parent() {
44        ensure_dir_logged(parent, "pid directory")?;
45    }
46    if let Some(parent) = log_path.parent() {
47        ensure_dir_logged(parent, "log directory")?;
48    }
49    if !log_path.exists() {
50        // Ensure a log file exists before the child starts so early failures are captured.
51        std::fs::File::create(log_path)?;
52    }
53
54    let log_file = OpenOptions::new()
55        .create(true)
56        .append(true)
57        .open(log_path)?;
58    let log_file_err = log_file.try_clone()?;
59
60    let mut command = Command::new(command);
61    command.args(args);
62    command.envs(envs.iter().map(|(key, value)| (*key, value)));
63    if let Some(cwd) = cwd {
64        command.current_dir(cwd);
65    }
66    #[cfg(unix)]
67    {
68        use std::os::unix::process::CommandExt;
69        unsafe {
70            command.pre_exec(|| {
71                if libc::setpgid(0, 0) != 0 {
72                    let err = std::io::Error::last_os_error();
73                    if err.raw_os_error() == Some(libc::EPERM) {
74                        return Ok(());
75                    }
76                    return Err(err);
77                }
78                Ok(())
79            });
80        }
81    }
82    let child = command
83        .stdout(Stdio::from(log_file))
84        .stderr(Stdio::from(log_file_err))
85        .spawn()?;
86
87    let pid = child.id();
88    std::fs::write(pid_path, pid.to_string())?;
89
90    Ok(ServiceState::Started)
91}
92
93pub fn stop_process(pid_path: &Path) -> anyhow::Result<ServiceState> {
94    let pid = match read_pid(pid_path)? {
95        Some(pid) => pid,
96        None => return Ok(ServiceState::NotRunning),
97    };
98
99    if !is_pid_running(pid_path)? {
100        let _ = std::fs::remove_file(pid_path);
101        return Ok(ServiceState::NotRunning);
102    }
103
104    kill_process(pid)?;
105    let _ = std::fs::remove_file(pid_path);
106    Ok(ServiceState::Stopped)
107}
108
109pub fn process_status(pid_path: &Path) -> anyhow::Result<ProcessStatus> {
110    if is_pid_running(pid_path)? {
111        Ok(ProcessStatus::Running)
112    } else {
113        Ok(ProcessStatus::NotRunning)
114    }
115}
116
117pub fn tail_log(path: &Path) -> anyhow::Result<()> {
118    if !path.exists() {
119        return Err(anyhow::anyhow!(
120            "Log file does not exist: {}",
121            path.display()
122        ));
123    }
124
125    #[cfg(unix)]
126    {
127        let status = Command::new("tail")
128            .args(["-f", &path.display().to_string()])
129            .status()?;
130        if !status.success() {
131            return Err(anyhow::anyhow!("tail exited with {}", status));
132        }
133        Ok(())
134    }
135
136    #[cfg(windows)]
137    {
138        let contents = std::fs::read_to_string(path)?;
139        println!("{contents}");
140        Ok(())
141    }
142}
143
144fn ensure_dir_logged(path: &Path, description: &str) -> anyhow::Result<()> {
145    if demo_debug_enabled() {
146        println!("demo debug: ensuring {description} at {}", path.display());
147    }
148    match std::fs::create_dir_all(path) {
149        Ok(()) => {
150            if demo_debug_enabled() {
151                println!("demo debug: ensured {description}");
152            }
153            Ok(())
154        }
155        Err(err) => {
156            if demo_debug_enabled() {
157                eprintln!(
158                    "demo debug: failed to create {description} at {}: {err}",
159                    path.display()
160                );
161            }
162            Err(err.into())
163        }
164    }
165}
166
/// Returns `true` when `GREENTIC_OPERATOR_DEMO_DEBUG` is set to exactly `1`,
/// `true`, or `yes`.
///
/// An unset variable, a non-Unicode value, or any other value yields `false`.
fn demo_debug_enabled() -> bool {
    env::var("GREENTIC_OPERATOR_DEMO_DEBUG")
        .map_or(false, |value| matches!(value.as_str(), "1" | "true" | "yes"))
}
173
174fn is_pid_running(pid_path: &Path) -> anyhow::Result<bool> {
175    let pid = match read_pid(pid_path)? {
176        Some(pid) => pid,
177        None => return Ok(false),
178    };
179    is_process_running(pid)
180}
181
182fn read_pid(pid_path: &Path) -> anyhow::Result<Option<u32>> {
183    if !pid_path.exists() {
184        return Ok(None);
185    }
186    let contents = std::fs::read_to_string(pid_path)?;
187    let trimmed = contents.trim();
188    if trimmed.is_empty() {
189        return Ok(None);
190    }
191    let pid: u32 = trimmed.parse()?;
192    Ok(Some(pid))
193}
194
195fn should_restart_for_command(pid: u32, command: &str) -> anyhow::Result<bool> {
196    let command_path = Path::new(command);
197    if !command_path.is_absolute() {
198        return Ok(false);
199    }
200    let command_path =
201        std::fs::canonicalize(command_path).unwrap_or_else(|_| command_path.to_path_buf());
202    let Some(proc_path) = process_exe(pid) else {
203        return Ok(false);
204    };
205    let proc_path = std::fs::canonicalize(&proc_path).unwrap_or(proc_path);
206    Ok(proc_path != command_path)
207}
208
209fn process_exe(pid: u32) -> Option<PathBuf> {
210    let mut system = System::new();
211    let pid = Pid::from_u32(pid);
212    system.refresh_processes(ProcessesToUpdate::Some(&[pid]), true);
213    system
214        .process(pid)
215        .and_then(|process| process.exe())
216        .map(|path| path.to_path_buf())
217}
218
219#[cfg(unix)]
220fn is_process_running(pid: u32) -> anyhow::Result<bool> {
221    let result = unsafe { libc::kill(pid as i32, 0) };
222    if result == 0 {
223        return Ok(true);
224    }
225    let err = std::io::Error::last_os_error();
226    if err.raw_os_error() == Some(libc::ESRCH) {
227        Ok(false)
228    } else {
229        Err(err.into())
230    }
231}
232
233#[cfg(unix)]
234fn kill_process(pid: u32) -> anyhow::Result<()> {
235    let pid = pid as i32;
236    let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
237    if result == 0 {
238        return Ok(());
239    }
240    let err = std::io::Error::last_os_error();
241    if err.raw_os_error() == Some(libc::ESRCH) {
242        return Ok(());
243    }
244    let fallback = unsafe { libc::kill(pid, libc::SIGTERM) };
245    if fallback == 0 {
246        Ok(())
247    } else {
248        Err(std::io::Error::last_os_error().into())
249    }
250}
251
/// Probes whether a process with the given pid exists by asking `tasklist`
/// for entries matching that pid.
///
/// The process counts as running when the pid's decimal text appears anywhere
/// in `tasklist`'s standard output.
///
/// # Errors
///
/// Returns an error if `tasklist` cannot be launched.
#[cfg(windows)]
fn is_process_running(pid: u32) -> anyhow::Result<bool> {
    let filter = format!("PID eq {pid}");
    let listing = Command::new("tasklist").arg("/FI").arg(filter).output()?;
    let needle = pid.to_string();
    let text = String::from_utf8_lossy(&listing.stdout);
    Ok(text.contains(needle.as_str()))
}
260
/// Forcefully terminates the process `pid` and its child processes via
/// `taskkill /PID <pid> /T /F`.
///
/// # Errors
///
/// Returns an error if `taskkill` cannot be launched or exits unsuccessfully.
#[cfg(windows)]
fn kill_process(pid: u32) -> anyhow::Result<()> {
    let target = pid.to_string();
    let killed = Command::new("taskkill")
        .args(["/PID", target.as_str(), "/T", "/F"])
        .status()?
        .success();
    if !killed {
        return Err(anyhow::anyhow!("taskkill failed for pid {}", pid));
    }
    Ok(())
}
272
/// Builds the log file location for service `name`: `<root>/logs/<name>.log`.
pub fn log_path(root: &Path, name: &str) -> PathBuf {
    let mut path = root.join("logs");
    path.push(format!("{name}.log"));
    path
}
276
/// Builds the pid file location for service `name`:
/// `<root>/state/pids/<name>.pid`.
pub fn pid_path(root: &Path, name: &str) -> PathBuf {
    let mut path = root.to_path_buf();
    path.extend(["state", "pids"]);
    path.push(format!("{name}.pid"));
    path
}
280
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts a real `sleep` process in a fresh temp dir, checks that it is
    /// tracked as running, then stops it and checks that the pid file is gone.
    #[cfg(unix)]
    #[test]
    fn start_and_stop_process() {
        let temp = tempfile::tempdir().unwrap();
        let pid = pid_path(temp.path(), "sleep");
        let log = log_path(temp.path(), "sleep");
        assert_eq!(log, temp.path().join("logs").join("sleep.log"));
        assert_eq!(
            pid,
            temp.path().join("state").join("pids").join("sleep.pid")
        );
        // Long enough that the process cannot exit on its own before the
        // assertions run on a slow machine; it is terminated explicitly below.
        let args = vec!["30".to_string()];
        let envs: Vec<(&str, String)> = Vec::new();

        // The temp dir is fresh, so there is no earlier pid file and the only
        // valid outcome is a new process.
        let state = start_process("sleep", &args, &envs, &pid, &log, None).unwrap();
        assert_eq!(state, ServiceState::Started);
        assert!(pid.exists());
        assert!(log.exists());

        std::thread::sleep(std::time::Duration::from_millis(50));
        assert_eq!(process_status(&pid).unwrap(), ProcessStatus::Running);

        let stop = stop_process(&pid).unwrap();
        assert_eq!(stop, ServiceState::Stopped);
        // Stopping removes the pid file, after which nothing is tracked.
        assert!(!pid.exists());
        assert_eq!(process_status(&pid).unwrap(), ProcessStatus::NotRunning);
    }
}