stackpatrol 0.2.5

Single-binary Rust CLI that monitors a server and reports to the StackPatrol control plane.
use std::collections::HashMap;
use std::path::{Path, PathBuf};

use anyhow::{Context, Result, bail};
use serde::Deserialize;
use stackpatrol_core::event::Event;
use tokio::process::Command;

#[derive(Debug, Deserialize)]
struct ComposeService {
    #[serde(rename = "Service")]
    service: String,
    #[serde(rename = "State")]
    state: String,
}

pub struct DockerProbe {
    project_paths: Vec<PathBuf>,
    /// service_name -> was_up_last_tick
    last_state: HashMap<String, bool>,
    initialized: bool,
}

impl DockerProbe {
    pub fn new(project_paths: Vec<PathBuf>) -> Self {
        Self {
            project_paths,
            last_state: HashMap::new(),
            initialized: false,
        }
    }

    pub fn enabled(&self) -> bool {
        !self.project_paths.is_empty()
    }

    /// Scan all configured compose projects and return events for state transitions
    /// since the previous tick. The first tick records baseline state and emits nothing,
    /// so daemon startup never triggers an alert storm.
    pub async fn tick(&mut self) -> Vec<Event> {
        let mut current = HashMap::new();
        for path in &self.project_paths {
            match scan_project(path).await {
                Ok(services) => {
                    for svc in services {
                        let key = service_key(path, &svc.service);
                        current.insert(key, is_up(&svc.state));
                    }
                }
                Err(err) => {
                    eprintln!("docker probe: scan failed for {}: {err:#}", path.display());
                    // Skip this project on failure — keep prior state so a transient
                    // error doesn't spuriously emit ServiceDown for everything we forgot.
                    for (name, was_up) in &self.last_state {
                        if name.starts_with(&project_prefix(path)) {
                            current.insert(name.clone(), *was_up);
                        }
                    }
                }
            }
        }

        if !self.initialized {
            self.last_state = current;
            self.initialized = true;
            return Vec::new();
        }

        let mut events = Vec::new();
        for (name, up_now) in &current {
            match self.last_state.get(name) {
                Some(up_before) if up_before == up_now => {}
                Some(_) => {
                    events.push(if *up_now {
                        Event::ServiceUp { name: name.clone() }
                    } else {
                        Event::ServiceDown { name: name.clone() }
                    });
                }
                None if !up_now => {
                    // New service appeared in a not-running state — alert.
                    events.push(Event::ServiceDown { name: name.clone() });
                }
                None => {}
            }
        }

        self.last_state = current;
        events
    }
}

async fn scan_project(path: &Path) -> Result<Vec<ComposeService>> {
    let output = Command::new("docker")
        .args(["compose", "ps", "--format", "json", "--all"])
        .current_dir(path)
        .output()
        .await
        .context("running `docker compose ps`")?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        bail!("docker compose ps exited {}: {stderr}", output.status);
    }

    let stdout =
        String::from_utf8(output.stdout).context("docker compose ps stdout was not valid utf-8")?;
    parse_compose_ps(&stdout)
}

/// `docker compose ps --format json` emits NDJSON in compose v2.x and a JSON array in
/// older / some newer builds. Handle both.
fn parse_compose_ps(s: &str) -> Result<Vec<ComposeService>> {
    let trimmed = s.trim();
    if trimmed.is_empty() {
        return Ok(Vec::new());
    }
    if trimmed.starts_with('[') {
        serde_json::from_str(trimmed).context("parsing docker compose ps JSON array")
    } else {
        trimmed
            .lines()
            .filter(|l| !l.trim().is_empty())
            .map(|l| {
                serde_json::from_str::<ComposeService>(l)
                    .with_context(|| format!("parsing docker compose ps line: {l}"))
            })
            .collect()
    }
}

fn is_up(state: &str) -> bool {
    state.eq_ignore_ascii_case("running")
}

fn project_prefix(path: &Path) -> String {
    let label = path
        .file_name()
        .and_then(|s| s.to_str())
        .unwrap_or("compose");
    format!("{label}/")
}

fn service_key(path: &Path, service: &str) -> String {
    format!("{}{service}", project_prefix(path))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_ndjson() {
        let s = r#"{"Service":"web","State":"running"}
{"Service":"db","State":"exited"}"#;
        let svcs = parse_compose_ps(s).unwrap();
        assert_eq!(svcs.len(), 2);
        assert_eq!(svcs[0].service, "web");
        assert_eq!(svcs[1].state, "exited");
    }

    #[test]
    fn parses_json_array() {
        let s = r#"[{"Service":"web","State":"running"},{"Service":"db","State":"running"}]"#;
        let svcs = parse_compose_ps(s).unwrap();
        assert_eq!(svcs.len(), 2);
    }

    #[test]
    fn parses_empty() {
        assert!(parse_compose_ps("").unwrap().is_empty());
        assert!(parse_compose_ps("\n").unwrap().is_empty());
    }

    #[tokio::test]
    async fn first_tick_emits_nothing() {
        let mut probe = DockerProbe::new(vec![]);
        // No projects configured, so scan returns empty — first tick still seeds.
        let events = probe.tick().await;
        assert!(events.is_empty());
        assert!(probe.initialized);
    }

    #[tokio::test]
    async fn diff_emits_transitions() {
        let mut probe = DockerProbe::new(vec![]);
        probe.last_state.insert("proj/web".into(), true);
        probe.last_state.insert("proj/db".into(), true);
        probe.initialized = true;

        // Manually build a "current" by short-circuiting tick logic for the test.
        let mut current = HashMap::new();
        current.insert("proj/web".to_string(), false); // went down
        current.insert("proj/db".to_string(), true); // unchanged

        let mut events = Vec::new();
        for (name, up_now) in &current {
            match probe.last_state.get(name) {
                Some(up_before) if up_before == up_now => {}
                Some(_) => events.push(if *up_now {
                    Event::ServiceUp { name: name.clone() }
                } else {
                    Event::ServiceDown { name: name.clone() }
                }),
                None if !up_now => events.push(Event::ServiceDown { name: name.clone() }),
                None => {}
            }
        }

        assert_eq!(events.len(), 1);
        assert!(matches!(&events[0], Event::ServiceDown { name } if name == "proj/web"));
    }
}