apm-server 0.1.10

Web UI and agent dispatcher for APM, a git-native project manager for parallel AI coding agents.
use axum::{extract::State, Json};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::{AppError, AppState, TicketSource};

#[derive(serde::Serialize)]
pub struct DryRunCandidate {
    id: String,
    title: String,
    state: String,
    priority: u8,
    effort: u8,
    risk: u8,
    score: f64,
}

#[derive(serde::Serialize)]
pub struct DryRunResponse {
    candidates: Vec<DryRunCandidate>,
}

#[derive(serde::Deserialize, Default)]
pub struct StartWorkRequest {
    pub epic: Option<String>,
}

pub struct WorkEngine {
    pub cancel: Arc<AtomicBool>,
    pub handle: tokio::task::JoinHandle<()>,
    pub epic: Option<String>,
}

pub type WorkEngineState = Arc<Mutex<Option<WorkEngine>>>;

pub fn new_engine_state() -> WorkEngineState {
    Arc::new(Mutex::new(None))
}

fn engine_is_alive(engine: &WorkEngine) -> bool {
    !engine.handle.is_finished()
}

fn check_workers_alive(root: &Path) -> bool {
    let output = std::process::Command::new("git")
        .args(["worktree", "list", "--porcelain"])
        .current_dir(root)
        .output();
    let Ok(out) = output else { return false };
    let stdout = String::from_utf8_lossy(&out.stdout);
    stdout
        .lines()
        .filter_map(|l| l.strip_prefix("worktree "))
        .any(|wt| {
            let pid_path = PathBuf::from(wt).join(".apm-worker.pid");
            if !pid_path.exists() {
                return false;
            }
            match apm_core::worker::read_pid_file(&pid_path) {
                Ok((pid, _)) => apm_core::worker::is_alive(pid),
                Err(_) => false,
            }
        })
}

pub async fn get_work_status(
    State(state): State<Arc<AppState>>,
) -> Result<Json<serde_json::Value>, AppError> {
    let (alive, epic) = {
        let guard = state.work_engine.lock().await;
        match guard.as_ref() {
            Some(e) => (engine_is_alive(e), e.epic.clone()),
            None => (false, None),
        }
    };

    if !alive {
        return Ok(Json(serde_json::json!({"status": "stopped"})));
    }

    let root = match state.git_root() {
        Some(r) => r.clone(),
        None => return Ok(Json(serde_json::json!({"status": "idle", "epic": epic}))),
    };

    let has_alive_worker =
        tokio::task::spawn_blocking(move || check_workers_alive(&root)).await?;

    let status = if has_alive_worker { "running" } else { "idle" };
    Ok(Json(serde_json::json!({"status": status, "epic": epic})))
}

pub async fn post_work_start(
    State(state): State<Arc<AppState>>,
    body: Option<Json<StartWorkRequest>>,
) -> Result<Json<serde_json::Value>, AppError> {
    {
        let guard = state.work_engine.lock().await;
        let already_running = guard.as_ref().map(engine_is_alive).unwrap_or(false);
        drop(guard);
        if already_running {
            return get_work_status(State(state)).await;
        }
    }

    let epic = body.and_then(|b| b.0.epic);

    let root = match state.git_root() {
        Some(r) => r.clone(),
        None => return Ok(Json(serde_json::json!({"status": "stopped"}))),
    };

    let config = crate::util::load_config(root.clone()).await?;

    let max_concurrent = {
        let ov = state.max_concurrent_override.lock().await;
        ov.unwrap_or_else(|| config.agents.max_concurrent.max(1))
    };
    let skip_permissions = config.agents.skip_permissions;

    let cancel = Arc::new(AtomicBool::new(false));
    let cancel_clone = cancel.clone();
    let epic_clone = epic.clone();
    let handle = tokio::task::spawn_blocking(move || {
        let _ = apm_core::work::run_engine_loop(
            &root,
            cancel_clone,
            30,
            max_concurrent,
            skip_permissions,
            epic_clone,
        );
    });

    {
        let mut guard = state.work_engine.lock().await;
        *guard = Some(WorkEngine { cancel, handle, epic });
    }

    Ok(Json(serde_json::json!({"status": "idle"})))
}

pub async fn post_work_stop(
    State(state): State<Arc<AppState>>,
) -> Result<Json<serde_json::Value>, AppError> {
    let engine_opt = {
        let mut guard = state.work_engine.lock().await;
        guard.take()
    };

    let engine = match engine_opt {
        None => return Ok(Json(serde_json::json!({"status": "stopped"}))),
        Some(e) => e,
    };

    if !engine_is_alive(&engine) {
        return Ok(Json(serde_json::json!({"status": "stopped"})));
    }

    engine.cancel.store(true, Ordering::Relaxed);
    let _ =
        tokio::time::timeout(std::time::Duration::from_secs(10), engine.handle).await;

    Ok(Json(serde_json::json!({"status": "stopped"})))
}

pub async fn get_work_dry_run(
    State(state): State<Arc<AppState>>,
) -> Result<Json<DryRunResponse>, AppError> {
    let (root, tickets_dir) = match &state.source {
        TicketSource::Git(root, tickets_dir) => (root.clone(), tickets_dir.clone()),
        TicketSource::InMemory(_) => {
            return Ok(Json(DryRunResponse { candidates: vec![] }))
        }
    };
    let override_val = *state.max_concurrent_override.lock().await;
    let candidates = crate::util::blocking(move || {
        let config = apm_core::config::Config::load(&root)?;
        let pw = config.workflow.prioritization.priority_weight;
        let ew = config.workflow.prioritization.effort_weight;
        let rw = config.workflow.prioritization.risk_weight;
        let max_concurrent = override_val.unwrap_or_else(|| config.agents.max_concurrent.max(1));

        let startable: Vec<String> = config
            .workflow
            .states
            .iter()
            .filter(|s| s.transitions.iter().any(|tr| tr.trigger == "command:start"))
            .map(|s| s.id.clone())
            .collect();
        let actionable_owned = config.actionable_states_for("agent");

        let current_user = apm_core::config::resolve_identity(&root);
        let tickets = apm_core::ticket::load_all_from_git(&root, &tickets_dir)?;
        let mut filtered: Vec<&apm_core::ticket::Ticket> = tickets
            .iter()
            .filter(|t| {
                let st = t.frontmatter.state.as_str();
                actionable_owned.iter().any(|a| a == st)
                    && (startable.is_empty() || startable.iter().any(|s| s == st))
            })
            .collect();
        filtered.retain(|t| t.frontmatter.owner.as_deref() == Some(current_user.as_str()));
        filtered.sort_by(|a, b| {
            b.score(pw, ew, rw)
                .partial_cmp(&a.score(pw, ew, rw))
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        let result: Vec<DryRunCandidate> = filtered
            .into_iter()
            .take(max_concurrent)
            .map(|t| {
                let fm = &t.frontmatter;
                let raw_score = t.score(pw, ew, rw);
                let score = (raw_score * 100.0).round() / 100.0;
                DryRunCandidate {
                    id: fm.id.clone(),
                    title: fm.title.clone(),
                    state: fm.state.clone(),
                    priority: fm.priority,
                    effort: fm.effort,
                    risk: fm.risk,
                    score,
                }
            })
            .collect();
        Ok::<_, anyhow::Error>(result)
    }).await?;
    Ok(Json(DryRunResponse { candidates }))
}

#[cfg(test)]
mod tests {
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    #[tokio::test]
    async fn work_status_stopped_when_no_engine() {
        let app = crate::build_app_in_memory_for_work();
        let response = app
            .oneshot(
                Request::builder()
                    .uri("/api/work/status")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
        let bytes = response.into_body().collect().await.unwrap().to_bytes();
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(json["status"], "stopped");
    }

    #[tokio::test]
    async fn work_stop_when_already_stopped_returns_stopped() {
        let app = crate::build_app_in_memory_for_work();
        let response = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/api/work/stop")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
        let bytes = response.into_body().collect().await.unwrap().to_bytes();
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(json["status"], "stopped");
    }

    #[tokio::test]
    async fn work_start_without_git_root_returns_stopped() {
        let app = crate::build_app_in_memory_for_work();
        let response = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/api/work/start")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
        let bytes = response.into_body().collect().await.unwrap().to_bytes();
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(json["status"], "stopped");
    }

    #[tokio::test]
    async fn work_start_with_epic_field_accepted() {
        let app = crate::build_app_in_memory_for_work();
        let response = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/api/work/start")
                    .header("content-type", "application/json")
                    .body(Body::from(r#"{"epic":"abc123"}"#))
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn work_status_has_no_epic_key_when_stopped() {
        let app = crate::build_app_in_memory_for_work();
        let response = app
            .oneshot(
                Request::builder()
                    .uri("/api/work/status")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
        let bytes = response.into_body().collect().await.unwrap().to_bytes();
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(json["status"], "stopped");
        assert!(json.get("epic").is_none());
    }

    #[tokio::test]
    async fn dry_run_returns_empty_candidates_for_in_memory() {
        let app = crate::build_app_in_memory_for_work();
        let response = app
            .oneshot(
                Request::builder()
                    .uri("/api/work/dry-run")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
        let bytes = response.into_body().collect().await.unwrap().to_bytes();
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert!(json["candidates"].is_array());
        assert_eq!(json["candidates"].as_array().unwrap().len(), 0);
    }
}