greentic_operator/
supervisor.rs1use std::collections::BTreeMap;
2use std::path::{Path, PathBuf};
3use std::process::{Command, Stdio};
4use std::time::{Duration, Instant};
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use sysinfo::{Pid, ProcessesToUpdate, System};
9
10use crate::runtime_state::{RuntimePaths, read_json, write_json};
11
12#[derive(Clone, Debug, Eq, PartialEq, Hash)]
13pub struct ServiceId(String);
14
15impl ServiceId {
16 pub fn new(value: impl Into<String>) -> anyhow::Result<Self> {
17 let value = value.into();
18 if value.is_empty() {
19 return Err(anyhow::anyhow!("service id cannot be empty"));
20 }
21 if !value
22 .chars()
23 .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_')
24 {
25 return Err(anyhow::anyhow!(
26 "invalid service id '{}'; use alphanumeric, '-' or '_'",
27 value
28 ));
29 }
30 Ok(Self(value))
31 }
32
33 pub fn as_str(&self) -> &str {
34 &self.0
35 }
36}
37
38#[derive(Clone, Debug)]
39pub struct ServiceSpec {
40 pub id: ServiceId,
41 pub argv: Vec<String>,
42 pub cwd: Option<PathBuf>,
43 pub env: BTreeMap<String, String>,
44}
45
46#[derive(Clone, Debug)]
47pub struct ServiceHandle {
48 pub id: ServiceId,
49 pub pid: u32,
50 pub started_at: DateTime<Utc>,
51 pub log_path: PathBuf,
52}
53
54#[derive(Clone, Debug)]
55pub struct ServiceStatus {
56 pub id: ServiceId,
57 pub running: bool,
58 pub pid: Option<u32>,
59 pub log_path: PathBuf,
60 pub last_error: Option<String>,
61}
62
63#[derive(Clone, Debug, Serialize, Deserialize)]
64pub struct ResolvedService {
65 pub argv: Vec<String>,
66 pub cwd: Option<PathBuf>,
67 pub env: BTreeMap<String, String>,
68 #[serde(default)]
69 pub log_path: Option<PathBuf>,
70}
71
72pub fn spawn_service(
73 paths: &RuntimePaths,
74 spec: ServiceSpec,
75 log_path_override: Option<PathBuf>,
76) -> anyhow::Result<ServiceHandle> {
77 if spec.argv.is_empty() {
78 return Err(anyhow::anyhow!("service argv cannot be empty"));
79 }
80 let pid_path = paths.pid_path(spec.id.as_str());
81 if let Some(pid) = read_pid(&pid_path)?
82 && is_running(pid)
83 {
84 return Err(anyhow::anyhow!(
85 "service {} already running (pid {})",
86 spec.id.as_str(),
87 pid
88 ));
89 }
90
91 let log_path = log_path_override.unwrap_or_else(|| paths.log_path(spec.id.as_str()));
92 if let Some(parent) = log_path.parent() {
93 std::fs::create_dir_all(parent)?;
94 }
95 let log_file = std::fs::OpenOptions::new()
96 .create(true)
97 .append(true)
98 .open(&log_path)?;
99 let log_err = log_file.try_clone()?;
100
101 let mut command = Command::new(&spec.argv[0]);
102 if spec.argv.len() > 1 {
103 command.args(&spec.argv[1..]);
104 }
105 if let Some(cwd) = &spec.cwd {
106 command.current_dir(cwd);
107 }
108 command.envs(spec.env.iter());
109 let child = command
110 .stdout(Stdio::from(log_file))
111 .stderr(Stdio::from(log_err))
112 .spawn()?;
113
114 let pid = child.id();
115 std::fs::create_dir_all(paths.pids_dir())?;
116 std::fs::write(&pid_path, pid.to_string())?;
117
118 let resolved = ResolvedService {
119 argv: spec.argv.clone(),
120 cwd: spec.cwd.clone(),
121 env: spec.env.clone(),
122 log_path: Some(log_path.clone()),
123 };
124 write_json(&paths.resolved_path(spec.id.as_str()), &resolved)?;
125
126 Ok(ServiceHandle {
127 id: spec.id,
128 pid,
129 started_at: Utc::now(),
130 log_path,
131 })
132}
133
134pub fn stop_service(
135 paths: &RuntimePaths,
136 id: &ServiceId,
137 graceful_timeout_ms: u64,
138) -> anyhow::Result<()> {
139 let pid_path = paths.pid_path(id.as_str());
140 stop_pidfile(&pid_path, graceful_timeout_ms)
141}
142
143pub fn stop_pidfile(pid_path: &Path, graceful_timeout_ms: u64) -> anyhow::Result<()> {
144 let pid = match read_pid(pid_path)? {
145 Some(pid) => pid,
146 None => return Ok(()),
147 };
148
149 if !is_running(pid) {
150 let _ = std::fs::remove_file(pid_path);
151 return Ok(());
152 }
153
154 terminate_process(pid, graceful_timeout_ms)?;
155 let _ = std::fs::remove_file(pid_path);
156 Ok(())
157}
158
159pub fn read_status(paths: &RuntimePaths) -> anyhow::Result<Vec<ServiceStatus>> {
160 let mut statuses = Vec::new();
161 let pids_dir = paths.pids_dir();
162 if !pids_dir.exists() {
163 return Ok(statuses);
164 }
165 for entry in std::fs::read_dir(&pids_dir)? {
166 let entry = entry?;
167 if !entry.file_type()?.is_file() {
168 continue;
169 }
170 let path = entry.path();
171 if path.extension().and_then(|ext| ext.to_str()) != Some("pid") {
172 continue;
173 }
174 let file_name = entry.file_name();
175 let Some(stem) = Path::new(&file_name).file_stem().and_then(|s| s.to_str()) else {
176 continue;
177 };
178 let id = ServiceId::new(stem.to_string())?;
179 let pid = read_pid(&path)?;
180 let running = pid.map(is_running).unwrap_or(false);
181 let log_path = if let Some(resolved) = read_resolved(paths, &id)? {
182 resolved
183 .log_path
184 .or_else(|| Some(paths.log_path(stem)))
185 .unwrap()
186 } else {
187 paths.log_path(stem)
188 };
189 statuses.push(ServiceStatus {
190 id,
191 running,
192 pid,
193 log_path,
194 last_error: None,
195 });
196 }
197 Ok(statuses)
198}
199
200pub fn read_resolved(
201 paths: &RuntimePaths,
202 id: &ServiceId,
203) -> anyhow::Result<Option<ResolvedService>> {
204 read_json(&paths.resolved_path(id.as_str()))
205}
206
207pub fn is_running(pid: u32) -> bool {
208 let mut system = System::new();
209 let pid = Pid::from_u32(pid);
210 system.refresh_processes(ProcessesToUpdate::Some(&[pid]), true);
211 system.process(pid).is_some()
212}
213
214fn read_pid(pid_path: &Path) -> anyhow::Result<Option<u32>> {
215 if !pid_path.exists() {
216 return Ok(None);
217 }
218 let contents = std::fs::read_to_string(pid_path)?;
219 let trimmed = contents.trim();
220 if trimmed.is_empty() {
221 return Ok(None);
222 }
223 Ok(Some(trimmed.parse()?))
224}
225
226fn terminate_process(pid: u32, graceful_timeout_ms: u64) -> anyhow::Result<()> {
227 #[cfg(unix)]
228 {
229 let _ = unsafe { libc::kill(pid as i32, libc::SIGTERM) };
230 let deadline = Instant::now() + Duration::from_millis(graceful_timeout_ms);
231 while Instant::now() < deadline {
232 if !is_running(pid) {
233 return Ok(());
234 }
235 std::thread::sleep(Duration::from_millis(50));
236 }
237 let _ = unsafe { libc::kill(pid as i32, libc::SIGKILL) };
238 Ok(())
239 }
240
241 #[cfg(windows)]
242 {
243 let _ = graceful_timeout_ms;
244 let _ = Command::new("taskkill")
245 .args(["/PID", &pid.to_string(), "/T", "/F"])
246 .status();
247 Ok(())
248 }
249}