Skip to main content

envoy/http/handlers/
tasks.rs

1use axum::extract::{Path, Query, State};
2use axum::response::IntoResponse;
3use axum::Json;
4use serde::Deserialize;
5
6use crate::error::{EnvoyError, Result};
7use crate::http::state::SharedState;
8use crate::http::ws::broadcast_to_project;
9use crate::task::{self, TaskState};
10// ── Task handlers ──
11
12pub(crate) async fn propose_task(
13    State(state): State<SharedState>,
14    Json(req): Json<task::ProposeTaskRequest>,
15) -> Result<impl IntoResponse> {
16    let state_fb = state.clone();
17    let task = tokio::task::spawn_blocking(move || {
18        let engine = state_fb.engine.lock();
19        state_fb.task_store.propose(
20            engine.graph(),
21            req.project.clone(),
22            req.description,
23            req.blocked_by,
24        )
25    })
26    .await
27    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
28    broadcast_to_project(
29        &state,
30        &task.project,
31        "task_proposed",
32        &serde_json::to_value(&task).unwrap_or_default(),
33    )
34    .await;
35    Ok((axum::http::StatusCode::CREATED, Json(task)))
36}
37
38pub(crate) async fn claim_task(
39    State(state): State<SharedState>,
40    Path(task_id): Path<String>,
41    Json(req): Json<task::ClaimTaskRequest>,
42) -> Result<impl IntoResponse> {
43    let state_fb = state.clone();
44    let tid = task_id.clone();
45    let task = tokio::task::spawn_blocking(move || {
46        let engine = state_fb.engine.lock();
47        let task = state_fb
48            .task_store
49            .claim(engine.graph(), &tid, req.agent_id.clone())?;
50        let _ = state_fb
51            .audit_store
52            .log_task_claimed(engine.graph(), &tid, &req.agent_id);
53        Ok::<_, crate::error::EnvoyError>(task)
54    })
55    .await
56    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
57    broadcast_to_project(
58        &state,
59        &task.project,
60        "task_claimed",
61        &serde_json::to_value(&task).unwrap_or_default(),
62    )
63    .await;
64    Ok(Json(task))
65}
66
67pub(crate) async fn claim_next_task(
68    State(state): State<SharedState>,
69    Json(req): Json<task::ClaimNextRequest>,
70) -> Result<impl IntoResponse> {
71    let state_fb = state.clone();
72    let task = tokio::task::spawn_blocking(move || {
73        let engine = state_fb.engine.lock();
74        state_fb
75            .task_store
76            .claim_next(engine.graph(), &req.project, req.agent_id)
77    })
78    .await
79    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
80    broadcast_to_project(
81        &state,
82        &task.project,
83        "task_claimed",
84        &serde_json::to_value(&task).unwrap_or_default(),
85    )
86    .await;
87    Ok((axum::http::StatusCode::CREATED, Json(task)))
88}
89
90pub(crate) async fn update_task_state(
91    State(state): State<SharedState>,
92    Path(task_id): Path<String>,
93    Json(req): Json<task::UpdateTaskStateRequest>,
94) -> Result<impl IntoResponse> {
95    let new_state: TaskState = req.state.parse()?;
96    let is_done = new_state == TaskState::Done;
97    let state_fb = state.clone();
98    let tid = task_id.clone();
99    let (task, blocked) = tokio::task::spawn_blocking(move || {
100        let engine = state_fb.engine.lock();
101        let task = state_fb.task_store.update_state(
102            engine.graph(),
103            &tid,
104            new_state,
105            req.checkpoint,
106            None,
107        )?;
108        let blocked = if is_done {
109            state_fb.task_store.find_blocked_by(engine.graph(), &tid)?
110        } else {
111            Vec::new()
112        };
113        Ok::<_, EnvoyError>((task, blocked))
114    })
115    .await
116    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
117
118    // WS notifications are in-memory
119    for bt in &blocked {
120        let notify = serde_json::json!({
121            "resolved_dependency": task_id,
122            "task_id": bt.id,
123            "message": format!("Dependency {} resolved — can proceed", task_id),
124        });
125        if let Some(ref claimant) = bt.claimed_by {
126            state
127                .ws_registry
128                .send_json(claimant, "dependency_resolved", &notify);
129        }
130    }
131    broadcast_to_project(
132        &state,
133        &task.project,
134        "task_state_changed",
135        &serde_json::to_value(&task).unwrap_or_default(),
136    )
137    .await;
138    Ok(Json(task))
139}
140
141#[derive(Debug, Deserialize)]
142pub struct ListTasksQuery {
143    project: String,
144    #[serde(default)]
145    state: Option<String>,
146}
147
148pub(crate) async fn list_tasks(
149    State(state): State<SharedState>,
150    Query(params): Query<ListTasksQuery>,
151) -> Result<impl IntoResponse> {
152    let filter = params.state.as_deref().and_then(|s| s.parse().ok());
153    let state_fb = state.clone();
154    let project = params.project.clone();
155    let tasks = tokio::task::spawn_blocking(move || {
156        let engine = state_fb.engine.lock();
157        state_fb
158            .task_store
159            .list(engine.graph(), &project, filter.as_ref())
160    })
161    .await
162    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
163    Ok(Json(serde_json::json!({
164        "tasks": tasks,
165        "count": tasks.len(),
166    })))
167}
168
169pub(crate) async fn get_task(
170    State(state): State<SharedState>,
171    Path(task_id): Path<String>,
172) -> Result<impl IntoResponse> {
173    let state_fb = state.clone();
174    let task = tokio::task::spawn_blocking(move || {
175        let engine = state_fb.engine.lock();
176        state_fb.task_store.get(engine.graph(), &task_id)
177    })
178    .await
179    .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
180    Ok(Json(task))
181}