apm-server 0.1.24

Web UI and agent dispatcher for APM, a git-native project manager for parallel AI coding agents.
use axum::{
    extract::{Path, State},
    http::StatusCode,
    response::IntoResponse,
    Json,
};
use std::path::{Path as FsPath, PathBuf};
use std::sync::Arc;

use crate::{AppError, AppState, TicketSource};

#[derive(serde::Serialize)]
pub struct WorkerInfo {
    pid: u32,
    ticket_id: String,
    ticket_title: String,
    branch: String,
    state: String,
    elapsed: String,
    status: String,
}

pub async fn workers_handler(
    State(state): State<Arc<AppState>>,
) -> Result<Json<Vec<WorkerInfo>>, AppError> {
    let (root, tickets_dir) = match &state.source {
        TicketSource::Git(root, tickets_dir) => (root.clone(), tickets_dir.clone()),
        TicketSource::InMemory(_) => return Ok(Json(vec![])),
    };
    let results = crate::util::blocking(move || collect_workers(&root, &tickets_dir)).await?;
    Ok(Json(results))
}

fn determine_status(alive: bool, state: &str, ended_states: &std::collections::HashSet<&str>) -> &'static str {
    if alive {
        "running"
    } else if ended_states.contains(state) {
        "ended"
    } else {
        "crashed"
    }
}

fn collect_workers(root: &FsPath, tickets_dir: &FsPath) -> anyhow::Result<Vec<WorkerInfo>> {
    let output = std::process::Command::new("git")
        .args(["worktree", "list", "--porcelain"])
        .current_dir(root)
        .output()?;
    let stdout = String::from_utf8_lossy(&output.stdout);
    let worktree_paths: Vec<PathBuf> = stdout
        .lines()
        .filter_map(|line| line.strip_prefix("worktree "))
        .map(PathBuf::from)
        .collect();

    let tickets = apm_core::ticket::load_all_from_git(root, tickets_dir).unwrap_or_default();
    let config = apm_core::config::Config::load(root)?;
    let ended_states: std::collections::HashSet<&str> = config
        .workflow
        .states
        .iter()
        .filter(|s| s.terminal || s.worker_end)
        .map(|s| s.id.as_str())
        .collect();
    let worker_states: std::collections::HashSet<&str> = config
        .workflow
        .states
        .iter()
        .filter(|s| !s.terminal && !s.worker_end && s.actionable.is_empty())
        .map(|s| s.id.as_str())
        .collect();

    let mut results = Vec::new();
    for wt_path in worktree_paths {
        let pid_path = wt_path.join(".apm-worker.pid");
        if !pid_path.exists() {
            continue;
        }
        let (pid, pf) = match apm_core::worker::read_pid_file(&pid_path) {
            Ok(v) => v,
            Err(_) => continue,
        };
        let alive = apm_core::worker::is_alive(pid);
        let elapsed = apm_core::worker::elapsed_since(&pf.started_at);
        let ticket = tickets.iter().find(|t| t.frontmatter.id == pf.ticket_id);
        let (ticket_title, branch, state) = match ticket {
            Some(t) => (
                t.frontmatter.title.clone(),
                t.frontmatter.branch.clone().unwrap_or_default(),
                t.frontmatter.state.clone(),
            ),
            None => (String::new(), String::new(), String::new()),
        };
        if !worker_states.contains(state.as_str()) && !ended_states.contains(state.as_str()) {
            continue;
        }
        let status = determine_status(alive, &state, &ended_states);
        results.push(WorkerInfo {
            pid,
            ticket_id: pf.ticket_id,
            ticket_title,
            branch,
            state,
            elapsed,
            status: status.to_string(),
        });
    }
    Ok(results)
}

enum StopError {
    NotFound,
    NotAlive,
    Other(String),
}

fn stop_worker_by_pid(root: &FsPath, target_pid: u32) -> Result<(), StopError> {
    let worktrees = apm_core::worktree::list_ticket_worktrees(root)
        .map_err(|e| StopError::Other(e.to_string()))?;
    for (wt_path, _branch) in &worktrees {
        let pid_path = wt_path.join(".apm-worker.pid");
        if !pid_path.exists() {
            continue;
        }
        let Ok((pid, _)) = apm_core::worker::read_pid_file(&pid_path) else {
            continue;
        };
        if pid != target_pid {
            continue;
        }
        if !apm_core::worker::is_alive(pid) {
            let _ = std::fs::remove_file(&pid_path);
            return Err(StopError::NotAlive);
        }
        std::process::Command::new("kill")
            .args(["-TERM", &pid.to_string()])
            .status()
            .map_err(|e| StopError::Other(e.to_string()))?;
        return Ok(());
    }
    Err(StopError::NotFound)
}

pub async fn delete_worker(
    State(state): State<Arc<AppState>>,
    Path(pid_str): Path<String>,
) -> impl IntoResponse {
    let pid: u32 = match pid_str.parse() {
        Ok(p) => p,
        Err(_) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({"error": "invalid pid"})),
            )
                .into_response()
        }
    };
    let root = match &state.source {
        TicketSource::Git(root, _) => root.clone(),
        TicketSource::InMemory(_) => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({"error": "no git root"})),
            )
                .into_response()
        }
    };
    let result = tokio::task::spawn_blocking(move || stop_worker_by_pid(&root, pid))
        .await
        .unwrap();
    match result {
        Ok(()) => StatusCode::NO_CONTENT.into_response(),
        Err(StopError::NotFound) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({"error": "pid not found"})),
        )
            .into_response(),
        Err(StopError::NotAlive) => (
            StatusCode::CONFLICT,
            Json(serde_json::json!({"error": "process not alive (stale pid file)"})),
        )
            .into_response(),
        Err(StopError::Other(e)) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({"error": e})),
        )
            .into_response(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    #[test]
    fn determine_status_dead_terminal_shows_ended() {
        let mut ended: std::collections::HashSet<&str> = std::collections::HashSet::new();
        ended.insert("closed");

        assert_eq!(determine_status(false, "closed", &ended), "ended");
        assert_eq!(determine_status(false, "in_progress", &ended), "crashed");
        assert_eq!(determine_status(true, "closed", &ended), "running");
        assert_eq!(determine_status(true, "in_progress", &ended), "running");
    }

    #[test]
    fn determine_status_dead_worker_end_shows_ended() {
        let mut ended: std::collections::HashSet<&str> = std::collections::HashSet::new();
        ended.insert("specd");
        ended.insert("implemented");

        assert_eq!(determine_status(false, "specd", &ended), "ended");
        assert_eq!(determine_status(false, "implemented", &ended), "ended");
        assert_eq!(determine_status(false, "in_progress", &ended), "crashed");
        assert_eq!(determine_status(true, "specd", &ended), "running");
    }

    /// States with no worker association (ready, groomed, etc.) should be
    /// excluded from the workers response. Only worker_states and ended_states
    /// should appear.
    #[test]
    fn non_worker_states_excluded_by_filter() {
        let worker_states: std::collections::HashSet<&str> =
            ["in_design", "in_progress"].iter().copied().collect();
        let ended_states: std::collections::HashSet<&str> =
            ["specd", "implemented", "closed"].iter().copied().collect();

        // States that should pass the filter
        assert!(
            worker_states.contains("in_progress") || ended_states.contains("in_progress"),
            "in_progress should be included"
        );
        assert!(
            worker_states.contains("in_design") || ended_states.contains("in_design"),
            "in_design should be included"
        );
        assert!(
            worker_states.contains("implemented") || ended_states.contains("implemented"),
            "implemented should be included"
        );
        assert!(
            worker_states.contains("closed") || ended_states.contains("closed"),
            "closed should be included"
        );

        // States that should be excluded
        let excluded = ["ready", "groomed", "new", "question", "ammend", "blocked"];
        for state in excluded {
            assert!(
                !worker_states.contains(state) && !ended_states.contains(state),
                "state '{state}' should be excluded from workers panel"
            );
        }
    }

    #[tokio::test]
    async fn workers_empty_when_no_pid_files() {
        let app = crate::build_app_in_memory_with_workers(vec![]);
        let response = app
            .oneshot(
                Request::builder()
                    .uri("/api/workers")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
        assert!(response
            .headers()
            .get("content-type")
            .unwrap()
            .to_str()
            .unwrap()
            .contains("application/json"));
        let bytes = response.into_body().collect().await.unwrap().to_bytes();
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert!(json.is_array());
        assert_eq!(json.as_array().unwrap().len(), 0);
    }
}