Skip to main content

contrail_cli/
lib.rs

1use anyhow::{Context, Result, bail};
2use std::env;
3use std::ffi::OsString;
4use std::fs::{self, OpenOptions};
5use std::io;
6use std::net::TcpStream;
7use std::path::{Path, PathBuf};
8use std::process::{Command, Stdio};
9use std::thread;
10use std::time::{Duration, Instant};
11
12const PROC_CORE_DAEMON: ManagedProcess = ManagedProcess {
13    name: "core_daemon",
14    binary: "core_daemon",
15    binary_env: "CONTRAIL_CORE_DAEMON_BIN",
16    pid_file: "core_daemon.pid",
17    log_file: "core_daemon.log",
18    health_addr: None,
19};
20
21const PROC_DASHBOARD: ManagedProcess = ManagedProcess {
22    name: "dashboard",
23    binary: "dashboard",
24    binary_env: "CONTRAIL_DASHBOARD_BIN",
25    pid_file: "dashboard.pid",
26    log_file: "dashboard.log",
27    health_addr: Some("127.0.0.1:3000"),
28};
29
30const PROC_ANALYSIS: ManagedProcess = ManagedProcess {
31    name: "analysis",
32    binary: "analysis",
33    binary_env: "CONTRAIL_ANALYSIS_BIN",
34    pid_file: "analysis.pid",
35    log_file: "analysis.log",
36    health_addr: Some("127.0.0.1:3210"),
37};
38
39const PROCS_START_ORDER: [ManagedProcess; 3] = [PROC_CORE_DAEMON, PROC_DASHBOARD, PROC_ANALYSIS];
40const PROCS_STOP_ORDER: [ManagedProcess; 3] = [PROC_ANALYSIS, PROC_DASHBOARD, PROC_CORE_DAEMON];
41
42pub fn run() -> Result<()> {
43    let args: Vec<OsString> = env::args_os().collect();
44    if let Some(cmd) = parse_lifecycle_command(&args) {
45        return run_lifecycle_command(cmd);
46    }
47
48    importer::run()
49}
50
/// Service-management subcommands handled by this crate itself rather than by
/// the importer CLI.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LifecycleCommand {
    /// Start every managed process, rolling back on failure.
    Up,
    /// Stop every managed process.
    Down,
    /// Print whether each managed process is running.
    Status,
}
57
/// Static description of one background service supervised by the CLI.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ManagedProcess {
    /// Human-readable name used in status and error messages.
    name: &'static str,
    /// Executable name, looked up next to the current executable and then
    /// handed to the OS as-is.
    binary: &'static str,
    /// Environment variable that overrides the executable path when set to a
    /// non-empty value.
    binary_env: &'static str,
    /// File name (inside the run directory) holding the process id.
    pid_file: &'static str,
    /// File name (inside the run directory) receiving stdout and stderr.
    log_file: &'static str,
    /// `host:port` that must accept a TCP connection for the service to count
    /// as healthy; `None` skips the health wait.
    health_addr: Option<&'static str>,
}
67
68fn parse_lifecycle_command(args: &[OsString]) -> Option<LifecycleCommand> {
69    let command = args.get(1)?.to_str()?;
70    match command {
71        "up" => Some(LifecycleCommand::Up),
72        "down" => Some(LifecycleCommand::Down),
73        "status" => Some(LifecycleCommand::Status),
74        _ => None,
75    }
76}
77
78fn run_lifecycle_command(command: LifecycleCommand) -> Result<()> {
79    let run_dir = contrail_root_dir()?.join("run");
80    fs::create_dir_all(&run_dir)
81        .with_context(|| format!("failed to create run directory at {}", run_dir.display()))?;
82
83    match command {
84        LifecycleCommand::Up => {
85            let mut started: Vec<ManagedProcess> = Vec::new();
86            for process in PROCS_START_ORDER {
87                if let Err(err) = start_process(&run_dir, process) {
88                    for started_process in started.iter().rev() {
89                        let _ = stop_process(&run_dir, *started_process);
90                    }
91                    return Err(err);
92                }
93                started.push(process);
94            }
95        }
96        LifecycleCommand::Down => {
97            for process in PROCS_STOP_ORDER {
98                stop_process(&run_dir, process)?;
99            }
100        }
101        LifecycleCommand::Status => {
102            for process in PROCS_START_ORDER {
103                print_process_status(&run_dir, process);
104            }
105        }
106    }
107
108    Ok(())
109}
110
111fn contrail_root_dir() -> Result<PathBuf> {
112    if let Some(root) = env::var_os("CONTRAIL_HOME") {
113        return Ok(PathBuf::from(root));
114    }
115
116    let home = env::var_os("HOME")
117        .map(PathBuf::from)
118        .context("HOME is not set and CONTRAIL_HOME was not provided")?;
119    Ok(home.join(".contrail"))
120}
121
122fn start_process(run_dir: &Path, process: ManagedProcess) -> Result<()> {
123    let pid_path = run_dir.join(process.pid_file);
124    let log_path = run_dir.join(process.log_file);
125
126    if let Some(pid) = read_pid(&pid_path) {
127        if is_pid_running(pid) {
128            println!("{} already running (pid {})", process.name, pid);
129            return Ok(());
130        }
131        fs::remove_file(&pid_path).ok();
132    }
133
134    let stdout_log = OpenOptions::new()
135        .create(true)
136        .append(true)
137        .open(&log_path)
138        .with_context(|| format!("failed to open log file {}", log_path.display()))?;
139    let stderr_log = stdout_log
140        .try_clone()
141        .with_context(|| format!("failed to clone log file handle {}", log_path.display()))?;
142
143    let binary = resolve_binary_path(process)?;
144    let mut command = Command::new(&binary);
145    command
146        .stdin(Stdio::null())
147        .stdout(Stdio::from(stdout_log))
148        .stderr(Stdio::from(stderr_log));
149
150    let child = match command.spawn() {
151        Ok(child) => child,
152        Err(err) if err.kind() == io::ErrorKind::NotFound => {
153            bail!(
154                "{} binary not found in PATH. Install it, then retry.",
155                process.binary
156            )
157        }
158        Err(err) => {
159            return Err(err).with_context(|| format!("failed to start {}", process.binary));
160        }
161    };
162
163    let pid = child.id();
164    fs::write(&pid_path, format!("{pid}\n"))
165        .with_context(|| format!("failed to write pid file {}", pid_path.display()))?;
166    println!(
167        "started {} (pid {}, binary {}, log {})",
168        process.name,
169        pid,
170        binary.display(),
171        log_path.display()
172    );
173
174    let became_healthy = if let Some(addr) = process.health_addr {
175        wait_for_health(process.name, addr)
176    } else {
177        true
178    };
179
180    if !became_healthy {
181        if !is_pid_running(pid) {
182            bail!(
183                "{} exited before becoming healthy. Check {}. If a different `{}` binary is installed, set {} to the intended binary path.",
184                process.name,
185                log_path.display(),
186                process.binary,
187                process.binary_env
188            );
189        }
190        bail!(
191            "{} did not become healthy within timeout. Check {}. If a different `{}` binary is installed, set {} to the intended binary path.",
192            process.name,
193            log_path.display(),
194            process.binary,
195            process.binary_env
196        );
197    } else if !is_pid_running(pid) {
198        bail!(
199            "{} exited shortly after start. Check {}",
200            process.name,
201            log_path.display()
202        );
203    }
204
205    Ok(())
206}
207
208fn stop_process(run_dir: &Path, process: ManagedProcess) -> Result<()> {
209    let pid_path = run_dir.join(process.pid_file);
210
211    let Some(pid) = read_pid(&pid_path) else {
212        println!("{} not running", process.name);
213        return Ok(());
214    };
215
216    if !is_pid_running(pid) {
217        fs::remove_file(&pid_path).ok();
218        println!("{} not running", process.name);
219        return Ok(());
220    }
221
222    let _ = send_signal(pid, None)?;
223    let deadline = Instant::now() + Duration::from_secs(5);
224    while Instant::now() < deadline {
225        if !is_pid_running(pid) {
226            break;
227        }
228        thread::sleep(Duration::from_millis(100));
229    }
230
231    if is_pid_running(pid) {
232        let killed = send_signal(pid, Some("-9"))?;
233        if !killed && is_pid_running(pid) {
234            bail!("failed to stop {} (pid {})", process.name, pid);
235        }
236    }
237
238    fs::remove_file(&pid_path).ok();
239    println!("stopped {} (pid {})", process.name, pid);
240    Ok(())
241}
242
243fn print_process_status(run_dir: &Path, process: ManagedProcess) {
244    let pid_path = run_dir.join(process.pid_file);
245    match read_pid(&pid_path) {
246        Some(pid) if is_pid_running(pid) => {
247            println!("{}: running (pid {})", process.name, pid);
248        }
249        Some(_) => {
250            fs::remove_file(&pid_path).ok();
251            println!("{}: stopped", process.name);
252        }
253        None => {
254            println!("{}: stopped", process.name);
255        }
256    }
257}
258
/// Reads a process id from `pid_path`.
///
/// Returns `None` when the file is missing or unreadable, when its trimmed
/// content is not a decimal number, or when the number cannot be a real
/// process id. Pid `0` and values above `i32::MAX` are rejected because the
/// value is later passed to `kill`, where `0` addresses the caller's own
/// process group and out-of-range values may wrap to negative (group or
/// broadcast) targets.
fn read_pid(pid_path: &Path) -> Option<u32> {
    let raw = fs::read_to_string(pid_path).ok()?;
    raw.trim()
        .parse::<u32>()
        .ok()
        .filter(|&pid| pid != 0 && pid <= i32::MAX as u32)
}
263
/// Reports whether `kill -0 <pid>` succeeds, i.e. whether the process exists
/// and may be signalled by the current user.
///
/// Any failure to run `kill` itself is treated as "not running".
fn is_pid_running(pid: u32) -> bool {
    let pid_arg = pid.to_string();

    let mut probe = Command::new("kill");
    probe
        .args(["-0", pid_arg.as_str()])
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    match probe.status() {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
274
275fn send_signal(pid: u32, signal: Option<&str>) -> Result<bool> {
276    let mut command = Command::new("kill");
277    if let Some(signal) = signal {
278        command.arg(signal);
279    }
280    let status = command
281        .arg(pid.to_string())
282        .stdout(Stdio::null())
283        .stderr(Stdio::null())
284        .status()
285        .with_context(|| format!("failed to send signal to pid {}", pid))?;
286    Ok(status.success())
287}
288
/// Polls `addr` until a TCP connection succeeds or fifteen seconds have
/// passed, retrying every 500 ms.
///
/// Prints a confirmation and returns `true` on the first successful
/// connection; otherwise prints a warning to stderr and returns `false`.
fn wait_for_health(name: &str, addr: &str) -> bool {
    let timeout = Duration::from_secs(15);
    let retry_delay = Duration::from_millis(500);
    let began = Instant::now();

    while began.elapsed() < timeout {
        match TcpStream::connect(addr) {
            Ok(_) => {
                println!("{} healthy at http://{}", name, addr);
                return true;
            }
            Err(_) => thread::sleep(retry_delay),
        }
    }

    eprintln!(
        "warning: {} did not become healthy at http://{}",
        name, addr
    );
    false
}
304
305fn resolve_binary_path(process: ManagedProcess) -> Result<PathBuf> {
306    if let Some(path) = env::var_os(process.binary_env)
307        && !path.is_empty()
308    {
309        return Ok(PathBuf::from(path));
310    }
311
312    if let Ok(current_exe) = env::current_exe()
313        && let Some(bin_dir) = current_exe.parent()
314    {
315        let sibling = bin_dir.join(process.binary);
316        if sibling.is_file() {
317            return Ok(sibling);
318        }
319    }
320
321    Ok(PathBuf::from(process.binary))
322}
323
#[cfg(test)]
mod tests {
    use super::{LifecycleCommand, parse_lifecycle_command};
    use std::ffi::OsString;

    /// Builds an argument vector of the form `contrail <subcommand>`.
    fn argv(subcommand: &str) -> Vec<OsString> {
        vec![OsString::from("contrail"), OsString::from(subcommand)]
    }

    #[test]
    fn parses_lifecycle_commands() {
        let parsed_up = parse_lifecycle_command(&argv("up"));
        assert!(matches!(parsed_up, Some(LifecycleCommand::Up)));

        let parsed_down = parse_lifecycle_command(&argv("down"));
        assert!(matches!(parsed_down, Some(LifecycleCommand::Down)));

        let parsed_status = parse_lifecycle_command(&argv("status"));
        assert!(matches!(parsed_status, Some(LifecycleCommand::Status)));
    }

    #[test]
    fn leaves_other_commands_for_importer_cli() {
        // Anything that is not a lifecycle subcommand must fall through.
        let parsed = parse_lifecycle_command(&argv("import-history"));
        assert!(parsed.is_none());
    }
}
355}