1use anyhow::{Context, Result, bail};
2use std::env;
3use std::ffi::OsString;
4use std::fs::{self, OpenOptions};
5use std::io;
6use std::net::TcpStream;
7use std::path::{Path, PathBuf};
8use std::process::{Command, Stdio};
9use std::thread;
10use std::time::{Duration, Instant};
11
12const PROC_CORE_DAEMON: ManagedProcess = ManagedProcess {
13 name: "core_daemon",
14 binary: "core_daemon",
15 binary_env: "CONTRAIL_CORE_DAEMON_BIN",
16 pid_file: "core_daemon.pid",
17 log_file: "core_daemon.log",
18 health_addr: None,
19};
20
21const PROC_DASHBOARD: ManagedProcess = ManagedProcess {
22 name: "dashboard",
23 binary: "dashboard",
24 binary_env: "CONTRAIL_DASHBOARD_BIN",
25 pid_file: "dashboard.pid",
26 log_file: "dashboard.log",
27 health_addr: Some("127.0.0.1:3000"),
28};
29
30const PROC_ANALYSIS: ManagedProcess = ManagedProcess {
31 name: "analysis",
32 binary: "analysis",
33 binary_env: "CONTRAIL_ANALYSIS_BIN",
34 pid_file: "analysis.pid",
35 log_file: "analysis.log",
36 health_addr: Some("127.0.0.1:3210"),
37};
38
39const PROCS_START_ORDER: [ManagedProcess; 3] = [PROC_CORE_DAEMON, PROC_DASHBOARD, PROC_ANALYSIS];
40const PROCS_STOP_ORDER: [ManagedProcess; 3] = [PROC_ANALYSIS, PROC_DASHBOARD, PROC_CORE_DAEMON];
41
42pub fn run() -> Result<()> {
43 let args: Vec<OsString> = env::args_os().collect();
44 if let Some(cmd) = parse_lifecycle_command(&args) {
45 return run_lifecycle_command(cmd);
46 }
47
48 importer::run()
49}
50
/// Lifecycle subcommands handled directly by this binary (see [`parse_lifecycle_command`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LifecycleCommand {
    /// `up`: start every managed process.
    Up,
    /// `down`: stop every managed process.
    Down,
    /// `status`: print whether each managed process is running.
    Status,
}

/// Static description of one background process controlled by the lifecycle commands.
#[derive(Debug, Clone, Copy)]
struct ManagedProcess {
    /// Name used in console output.
    name: &'static str,
    /// Executable name. It is looked up next to the current executable first, then on `PATH`.
    binary: &'static str,
    /// Environment variable that, when set and non-empty, overrides the executable path.
    binary_env: &'static str,
    /// Pid file name, relative to the run directory.
    pid_file: &'static str,
    /// Log file name, relative to the run directory; stdout and stderr are appended to it.
    log_file: &'static str,
    /// TCP address that must accept a connection before the process counts as healthy.
    health_addr: Option<&'static str>,
}

/// Recognise a lifecycle subcommand in `args[1]`; `args[0]` is the program name.
///
/// Matching is exact and case-sensitive. Returns `None` when the subcommand is missing, is not
/// valid UTF-8, or is not one of `up` / `down` / `status`. In those cases the caller hands the
/// arguments to the importer CLI instead.
fn parse_lifecycle_command(args: &[OsString]) -> Option<LifecycleCommand> {
    let command = args.get(1)?.to_str()?;
    match command {
        "up" => Some(LifecycleCommand::Up),
        "down" => Some(LifecycleCommand::Down),
        "status" => Some(LifecycleCommand::Status),
        _ => None,
    }
}
77
78fn run_lifecycle_command(command: LifecycleCommand) -> Result<()> {
79 let run_dir = contrail_root_dir()?.join("run");
80 fs::create_dir_all(&run_dir)
81 .with_context(|| format!("failed to create run directory at {}", run_dir.display()))?;
82
83 match command {
84 LifecycleCommand::Up => {
85 let mut started: Vec<ManagedProcess> = Vec::new();
86 for process in PROCS_START_ORDER {
87 if let Err(err) = start_process(&run_dir, process) {
88 for started_process in started.iter().rev() {
89 let _ = stop_process(&run_dir, *started_process);
90 }
91 return Err(err);
92 }
93 started.push(process);
94 }
95 }
96 LifecycleCommand::Down => {
97 for process in PROCS_STOP_ORDER {
98 stop_process(&run_dir, process)?;
99 }
100 }
101 LifecycleCommand::Status => {
102 for process in PROCS_START_ORDER {
103 print_process_status(&run_dir, process);
104 }
105 }
106 }
107
108 Ok(())
109}
110
111fn contrail_root_dir() -> Result<PathBuf> {
112 if let Some(root) = env::var_os("CONTRAIL_HOME") {
113 return Ok(PathBuf::from(root));
114 }
115
116 let home = env::var_os("HOME")
117 .map(PathBuf::from)
118 .context("HOME is not set and CONTRAIL_HOME was not provided")?;
119 Ok(home.join(".contrail"))
120}
121
122fn start_process(run_dir: &Path, process: ManagedProcess) -> Result<()> {
123 let pid_path = run_dir.join(process.pid_file);
124 let log_path = run_dir.join(process.log_file);
125
126 if let Some(pid) = read_pid(&pid_path) {
127 if is_pid_running(pid) {
128 println!("{} already running (pid {})", process.name, pid);
129 return Ok(());
130 }
131 fs::remove_file(&pid_path).ok();
132 }
133
134 let stdout_log = OpenOptions::new()
135 .create(true)
136 .append(true)
137 .open(&log_path)
138 .with_context(|| format!("failed to open log file {}", log_path.display()))?;
139 let stderr_log = stdout_log
140 .try_clone()
141 .with_context(|| format!("failed to clone log file handle {}", log_path.display()))?;
142
143 let binary = resolve_binary_path(process)?;
144 let mut command = Command::new(&binary);
145 command
146 .stdin(Stdio::null())
147 .stdout(Stdio::from(stdout_log))
148 .stderr(Stdio::from(stderr_log));
149
150 let child = match command.spawn() {
151 Ok(child) => child,
152 Err(err) if err.kind() == io::ErrorKind::NotFound => {
153 bail!(
154 "{} binary not found in PATH. Install it, then retry.",
155 process.binary
156 )
157 }
158 Err(err) => {
159 return Err(err).with_context(|| format!("failed to start {}", process.binary));
160 }
161 };
162
163 let pid = child.id();
164 fs::write(&pid_path, format!("{pid}\n"))
165 .with_context(|| format!("failed to write pid file {}", pid_path.display()))?;
166 println!(
167 "started {} (pid {}, binary {}, log {})",
168 process.name,
169 pid,
170 binary.display(),
171 log_path.display()
172 );
173
174 let became_healthy = if let Some(addr) = process.health_addr {
175 wait_for_health(process.name, addr)
176 } else {
177 true
178 };
179
180 if !became_healthy {
181 if !is_pid_running(pid) {
182 bail!(
183 "{} exited before becoming healthy. Check {}. If a different `{}` binary is installed, set {} to the intended binary path.",
184 process.name,
185 log_path.display(),
186 process.binary,
187 process.binary_env
188 );
189 }
190 bail!(
191 "{} did not become healthy within timeout. Check {}. If a different `{}` binary is installed, set {} to the intended binary path.",
192 process.name,
193 log_path.display(),
194 process.binary,
195 process.binary_env
196 );
197 } else if !is_pid_running(pid) {
198 bail!(
199 "{} exited shortly after start. Check {}",
200 process.name,
201 log_path.display()
202 );
203 }
204
205 Ok(())
206}
207
208fn stop_process(run_dir: &Path, process: ManagedProcess) -> Result<()> {
209 let pid_path = run_dir.join(process.pid_file);
210
211 let Some(pid) = read_pid(&pid_path) else {
212 println!("{} not running", process.name);
213 return Ok(());
214 };
215
216 if !is_pid_running(pid) {
217 fs::remove_file(&pid_path).ok();
218 println!("{} not running", process.name);
219 return Ok(());
220 }
221
222 let _ = send_signal(pid, None)?;
223 let deadline = Instant::now() + Duration::from_secs(5);
224 while Instant::now() < deadline {
225 if !is_pid_running(pid) {
226 break;
227 }
228 thread::sleep(Duration::from_millis(100));
229 }
230
231 if is_pid_running(pid) {
232 let killed = send_signal(pid, Some("-9"))?;
233 if !killed && is_pid_running(pid) {
234 bail!("failed to stop {} (pid {})", process.name, pid);
235 }
236 }
237
238 fs::remove_file(&pid_path).ok();
239 println!("stopped {} (pid {})", process.name, pid);
240 Ok(())
241}
242
243fn print_process_status(run_dir: &Path, process: ManagedProcess) {
244 let pid_path = run_dir.join(process.pid_file);
245 match read_pid(&pid_path) {
246 Some(pid) if is_pid_running(pid) => {
247 println!("{}: running (pid {})", process.name, pid);
248 }
249 Some(_) => {
250 fs::remove_file(&pid_path).ok();
251 println!("{}: stopped", process.name);
252 }
253 None => {
254 println!("{}: stopped", process.name);
255 }
256 }
257}
258
/// Read a pid from `pid_path`, ignoring surrounding whitespace.
///
/// Returns `None` in any of these cases:
/// * the file cannot be read;
/// * the contents are not an unsigned integer;
/// * the value is outside `1..=i32::MAX`.
///
/// The range check matters because these pids are passed straight to `kill`. There, `0` means
/// "my whole process group", and values that wrap to a negative `pid_t` (e.g. `-1`, "every
/// process I may signal") are special. A corrupted pid file must never turn `down` into a
/// mass kill.
fn read_pid(pid_path: &Path) -> Option<u32> {
    let raw = fs::read_to_string(pid_path).ok()?;
    let pid = raw.trim().parse::<u32>().ok()?;
    (1..=i32::MAX as u32).contains(&pid).then_some(pid)
}
263
/// Return whether `kill -0 <pid>` exits successfully, i.e. whether `pid` can be signalled.
///
/// Caveats of this probe:
/// * it also succeeds for zombie processes that have not been reaped yet;
/// * it fails for live processes this user may not signal;
/// * failing to run `kill` at all is reported as "not running".
fn is_pid_running(pid: u32) -> bool {
    let pid_arg = pid.to_string();
    let probe = Command::new("kill")
        .args(["-0", pid_arg.as_str()])
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
274
275fn send_signal(pid: u32, signal: Option<&str>) -> Result<bool> {
276 let mut command = Command::new("kill");
277 if let Some(signal) = signal {
278 command.arg(signal);
279 }
280 let status = command
281 .arg(pid.to_string())
282 .stdout(Stdio::null())
283 .stderr(Stdio::null())
284 .status()
285 .with_context(|| format!("failed to send signal to pid {}", pid))?;
286 Ok(status.success())
287}
288
/// Poll `addr` with TCP connects every 500 ms for up to 15 seconds.
///
/// On the first successful connection, prints `<name> healthy at http://<addr>` and returns
/// `true`. If the deadline passes, prints a warning to stderr and returns `false`. Only TCP
/// reachability is checked; no HTTP request is sent.
fn wait_for_health(name: &str, addr: &str) -> bool {
    let deadline = Instant::now() + Duration::from_secs(15);
    let poll_interval = Duration::from_millis(500);

    while deadline > Instant::now() {
        let reachable = TcpStream::connect(addr).is_ok();
        if reachable {
            println!("{} healthy at http://{}", name, addr);
            return true;
        }
        thread::sleep(poll_interval);
    }

    eprintln!(
        "warning: {} did not become healthy at http://{}",
        name, addr
    );
    false
}
304
305fn resolve_binary_path(process: ManagedProcess) -> Result<PathBuf> {
306 if let Some(path) = env::var_os(process.binary_env)
307 && !path.is_empty()
308 {
309 return Ok(PathBuf::from(path));
310 }
311
312 if let Ok(current_exe) = env::current_exe()
313 && let Some(bin_dir) = current_exe.parent()
314 {
315 let sibling = bin_dir.join(process.binary);
316 if sibling.is_file() {
317 return Ok(sibling);
318 }
319 }
320
321 Ok(PathBuf::from(process.binary))
322}
323
#[cfg(test)]
mod tests {
    use super::{LifecycleCommand, parse_lifecycle_command};
    use std::ffi::OsString;

    /// Build the argv for `contrail <rest...>`.
    fn argv(rest: &[&str]) -> Vec<OsString> {
        std::iter::once("contrail")
            .chain(rest.iter().copied())
            .map(OsString::from)
            .collect()
    }

    #[test]
    fn parses_lifecycle_commands() {
        assert!(matches!(
            parse_lifecycle_command(&argv(&["up"])),
            Some(LifecycleCommand::Up)
        ));
        assert!(matches!(
            parse_lifecycle_command(&argv(&["down"])),
            Some(LifecycleCommand::Down)
        ));
        assert!(matches!(
            parse_lifecycle_command(&argv(&["status"])),
            Some(LifecycleCommand::Status)
        ));
    }

    #[test]
    fn leaves_other_commands_for_importer_cli() {
        assert!(parse_lifecycle_command(&argv(&["import-history"])).is_none());
        // Matching is exact and case-sensitive.
        assert!(parse_lifecycle_command(&argv(&["UP"])).is_none());
        assert!(parse_lifecycle_command(&argv(&["up-all"])).is_none());
    }

    #[test]
    fn missing_subcommand_is_not_a_lifecycle_command() {
        assert!(parse_lifecycle_command(&argv(&[])).is_none());
        assert!(parse_lifecycle_command(&[]).is_none());
    }

    #[test]
    fn arguments_after_subcommand_are_ignored() {
        assert!(matches!(
            parse_lifecycle_command(&argv(&["status", "--verbose"])),
            Some(LifecycleCommand::Status)
        ));
    }

    #[cfg(unix)]
    #[test]
    fn non_utf8_subcommand_is_left_for_importer_cli() {
        use std::os::unix::ffi::OsStringExt;
        let args = vec![
            OsString::from("contrail"),
            OsString::from_vec(vec![0xff, b'u', b'p']),
        ];
        assert!(parse_lifecycle_command(&args).is_none());
    }
}