greentic_operator/services/
runner.rs1use std::env;
2use std::fs::OpenOptions;
3use std::path::{Path, PathBuf};
4use std::process::{Command, Stdio};
5
6use sysinfo::{Pid, ProcessesToUpdate, System};
7
/// Liveness of the process that a PID file refers to, as reported by
/// [`process_status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessStatus {
    /// The PID file names a process that is currently alive.
    Running,
    /// The PID file is missing or empty, or the process it names no longer exists.
    NotRunning,
}
13
/// Outcome of a [`start_process`] or [`stop_process`] request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceState {
    /// A new child process was spawned and its PID recorded.
    Started,
    /// The PID file already names a live process, which was left untouched.
    AlreadyRunning,
    /// A live process was found and asked to terminate.
    Stopped,
    /// There was nothing to stop: no PID recorded, or the process was already gone.
    NotRunning,
}
21
22pub fn start_process(
23 command: &str,
24 args: &[String],
25 envs: &[(&str, String)],
26 pid_path: &Path,
27 log_path: &Path,
28 cwd: Option<&Path>,
29) -> anyhow::Result<ServiceState> {
30 if let Some(pid) = read_pid(pid_path)? {
31 if is_process_running(pid)? {
32 if should_restart_for_command(pid, command)? {
33 kill_process(pid)?;
34 let _ = std::fs::remove_file(pid_path);
35 } else {
36 return Ok(ServiceState::AlreadyRunning);
37 }
38 } else {
39 let _ = std::fs::remove_file(pid_path);
40 }
41 }
42
43 if let Some(parent) = pid_path.parent() {
44 ensure_dir_logged(parent, "pid directory")?;
45 }
46 if let Some(parent) = log_path.parent() {
47 ensure_dir_logged(parent, "log directory")?;
48 }
49 if !log_path.exists() {
50 std::fs::File::create(log_path)?;
52 }
53
54 let log_file = OpenOptions::new()
55 .create(true)
56 .append(true)
57 .open(log_path)?;
58 let log_file_err = log_file.try_clone()?;
59
60 let mut command = Command::new(command);
61 command.args(args);
62 command.envs(envs.iter().map(|(key, value)| (*key, value)));
63 if let Some(cwd) = cwd {
64 command.current_dir(cwd);
65 }
66 #[cfg(unix)]
67 {
68 use std::os::unix::process::CommandExt;
69 unsafe {
70 command.pre_exec(|| {
71 if libc::setpgid(0, 0) != 0 {
72 let err = std::io::Error::last_os_error();
73 if err.raw_os_error() == Some(libc::EPERM) {
74 return Ok(());
75 }
76 return Err(err);
77 }
78 Ok(())
79 });
80 }
81 }
82 let child = command
83 .stdout(Stdio::from(log_file))
84 .stderr(Stdio::from(log_file_err))
85 .spawn()?;
86
87 let pid = child.id();
88 std::fs::write(pid_path, pid.to_string())?;
89
90 Ok(ServiceState::Started)
91}
92
93pub fn stop_process(pid_path: &Path) -> anyhow::Result<ServiceState> {
94 let pid = match read_pid(pid_path)? {
95 Some(pid) => pid,
96 None => return Ok(ServiceState::NotRunning),
97 };
98
99 if !is_pid_running(pid_path)? {
100 let _ = std::fs::remove_file(pid_path);
101 return Ok(ServiceState::NotRunning);
102 }
103
104 kill_process(pid)?;
105 let _ = std::fs::remove_file(pid_path);
106 Ok(ServiceState::Stopped)
107}
108
109pub fn process_status(pid_path: &Path) -> anyhow::Result<ProcessStatus> {
110 if is_pid_running(pid_path)? {
111 Ok(ProcessStatus::Running)
112 } else {
113 Ok(ProcessStatus::NotRunning)
114 }
115}
116
117pub fn tail_log(path: &Path) -> anyhow::Result<()> {
118 if !path.exists() {
119 return Err(anyhow::anyhow!(
120 "Log file does not exist: {}",
121 path.display()
122 ));
123 }
124
125 #[cfg(unix)]
126 {
127 let status = Command::new("tail")
128 .args(["-f", &path.display().to_string()])
129 .status()?;
130 if !status.success() {
131 return Err(anyhow::anyhow!("tail exited with {}", status));
132 }
133 Ok(())
134 }
135
136 #[cfg(windows)]
137 {
138 let contents = std::fs::read_to_string(path)?;
139 println!("{contents}");
140 Ok(())
141 }
142}
143
144fn ensure_dir_logged(path: &Path, description: &str) -> anyhow::Result<()> {
145 if demo_debug_enabled() {
146 println!("demo debug: ensuring {description} at {}", path.display());
147 }
148 match std::fs::create_dir_all(path) {
149 Ok(()) => {
150 if demo_debug_enabled() {
151 println!("demo debug: ensured {description}");
152 }
153 Ok(())
154 }
155 Err(err) => {
156 if demo_debug_enabled() {
157 eprintln!(
158 "demo debug: failed to create {description} at {}: {err}",
159 path.display()
160 );
161 }
162 Err(err.into())
163 }
164 }
165}
166
/// Returns `true` when the `GREENTIC_OPERATOR_DEMO_DEBUG` environment variable
/// is set to exactly `1`, `true` or `yes`.
///
/// An unset variable, a non-Unicode value, or any other value yields `false`.
fn demo_debug_enabled() -> bool {
    env::var("GREENTIC_OPERATOR_DEMO_DEBUG")
        .map(|value| ["1", "true", "yes"].contains(&value.as_str()))
        .unwrap_or(false)
}
173
174fn is_pid_running(pid_path: &Path) -> anyhow::Result<bool> {
175 let pid = match read_pid(pid_path)? {
176 Some(pid) => pid,
177 None => return Ok(false),
178 };
179 is_process_running(pid)
180}
181
182fn read_pid(pid_path: &Path) -> anyhow::Result<Option<u32>> {
183 if !pid_path.exists() {
184 return Ok(None);
185 }
186 let contents = std::fs::read_to_string(pid_path)?;
187 let trimmed = contents.trim();
188 if trimmed.is_empty() {
189 return Ok(None);
190 }
191 let pid: u32 = trimmed.parse()?;
192 Ok(Some(pid))
193}
194
195fn should_restart_for_command(pid: u32, command: &str) -> anyhow::Result<bool> {
196 let command_path = Path::new(command);
197 if !command_path.is_absolute() {
198 return Ok(false);
199 }
200 let command_path =
201 std::fs::canonicalize(command_path).unwrap_or_else(|_| command_path.to_path_buf());
202 let Some(proc_path) = process_exe(pid) else {
203 return Ok(false);
204 };
205 let proc_path = std::fs::canonicalize(&proc_path).unwrap_or(proc_path);
206 Ok(proc_path != command_path)
207}
208
209fn process_exe(pid: u32) -> Option<PathBuf> {
210 let mut system = System::new();
211 let pid = Pid::from_u32(pid);
212 system.refresh_processes(ProcessesToUpdate::Some(&[pid]), true);
213 system
214 .process(pid)
215 .and_then(|process| process.exe())
216 .map(|path| path.to_path_buf())
217}
218
219#[cfg(unix)]
220fn is_process_running(pid: u32) -> anyhow::Result<bool> {
221 let result = unsafe { libc::kill(pid as i32, 0) };
222 if result == 0 {
223 return Ok(true);
224 }
225 let err = std::io::Error::last_os_error();
226 if err.raw_os_error() == Some(libc::ESRCH) {
227 Ok(false)
228 } else {
229 Err(err.into())
230 }
231}
232
233#[cfg(unix)]
234fn kill_process(pid: u32) -> anyhow::Result<()> {
235 let pid = pid as i32;
236 let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
237 if result == 0 {
238 return Ok(());
239 }
240 let err = std::io::Error::last_os_error();
241 if err.raw_os_error() == Some(libc::ESRCH) {
242 return Ok(());
243 }
244 let fallback = unsafe { libc::kill(pid, libc::SIGTERM) };
245 if fallback == 0 {
246 Ok(())
247 } else {
248 Err(std::io::Error::last_os_error().into())
249 }
250}
251
/// Reports whether a process with the given PID exists (Windows).
///
/// Runs `tasklist` filtered on the PID and checks whether the PID's decimal
/// text appears anywhere in the command's standard output.
///
/// # Errors
///
/// Returns an error if `tasklist` cannot be launched.
#[cfg(windows)]
fn is_process_running(pid: u32) -> anyhow::Result<bool> {
    let filter = format!("PID eq {pid}");
    let output = Command::new("tasklist").arg("/FI").arg(&filter).output()?;

    // NOTE: this is a plain substring search over the whole listing.
    let listing = String::from_utf8_lossy(&output.stdout);
    let needle = pid.to_string();
    Ok(listing.contains(needle.as_str()))
}
260
/// Terminates the process `pid` by invoking `taskkill` with the `/T` and `/F`
/// flags (Windows).
///
/// # Errors
///
/// Returns an error if `taskkill` cannot be launched or exits unsuccessfully.
#[cfg(windows)]
fn kill_process(pid: u32) -> anyhow::Result<()> {
    let pid_arg = pid.to_string();
    let outcome = Command::new("taskkill")
        .arg("/PID")
        .arg(&pid_arg)
        .arg("/T")
        .arg("/F")
        .status()?;

    if !outcome.success() {
        return Err(anyhow::anyhow!("taskkill failed for pid {}", pid));
    }
    Ok(())
}
272
/// Builds the log file path for service `name`: `<root>/logs/<name>.log`.
pub fn log_path(root: &Path, name: &str) -> PathBuf {
    let mut path = root.join("logs");
    path.push(format!("{name}.log"));
    path
}
276
/// Builds the PID file path for service `name`: `<root>/state/pids/<name>.pid`.
pub fn pid_path(root: &Path, name: &str) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push("state");
    path.push("pids");
    path.push(format!("{name}.pid"));
    path
}
280
#[cfg(test)]
mod tests {
    use super::*;

    /// Spawns a short-lived `sleep` in a temporary directory, checks that it
    /// is reported as running, then stops it again.
    #[cfg(unix)]
    #[test]
    fn start_and_stop_process() {
        let workspace = tempfile::tempdir().unwrap();
        let root = workspace.path();
        let pid_file = pid_path(root, "sleep");
        let log_file = log_path(root, "sleep");
        assert_eq!(log_file, root.join("logs").join("sleep.log"));

        let sleep_args = vec![String::from("1")];
        let no_envs: Vec<(&str, String)> = Vec::new();

        let started =
            start_process("sleep", &sleep_args, &no_envs, &pid_file, &log_file, None).unwrap();
        assert!(started == ServiceState::Started || started == ServiceState::AlreadyRunning);

        // Give the child a moment to come up before probing it.
        std::thread::sleep(std::time::Duration::from_millis(50));
        let status = process_status(&pid_file).unwrap();
        assert_eq!(status, ProcessStatus::Running);

        let stopped = stop_process(&pid_file).unwrap();
        assert!(stopped == ServiceState::Stopped || stopped == ServiceState::NotRunning);
    }
}