envoy/http/handlers/
tasks.rs1use axum::extract::{Path, Query, State};
2use axum::response::IntoResponse;
3use axum::Json;
4use serde::Deserialize;
5
6use crate::error::{EnvoyError, Result};
7use crate::http::state::SharedState;
8use crate::http::ws::broadcast_to_project;
9use crate::task::{self, TaskState};
10pub(crate) async fn propose_task(
13 State(state): State<SharedState>,
14 Json(req): Json<task::ProposeTaskRequest>,
15) -> Result<impl IntoResponse> {
16 let state_fb = state.clone();
17 let task = tokio::task::spawn_blocking(move || {
18 let engine = state_fb.engine.lock();
19 state_fb.task_store.propose(
20 engine.graph(),
21 req.project.clone(),
22 req.description,
23 req.blocked_by,
24 )
25 })
26 .await
27 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
28 broadcast_to_project(
29 &state,
30 &task.project,
31 "task_proposed",
32 &serde_json::to_value(&task).unwrap_or_default(),
33 )
34 .await;
35 Ok((axum::http::StatusCode::CREATED, Json(task)))
36}
37
38pub(crate) async fn claim_task(
39 State(state): State<SharedState>,
40 Path(task_id): Path<String>,
41 Json(req): Json<task::ClaimTaskRequest>,
42) -> Result<impl IntoResponse> {
43 let state_fb = state.clone();
44 let tid = task_id.clone();
45 let task = tokio::task::spawn_blocking(move || {
46 let engine = state_fb.engine.lock();
47 let task = state_fb
48 .task_store
49 .claim(engine.graph(), &tid, req.agent_id.clone())?;
50 let _ = state_fb
51 .audit_store
52 .log_task_claimed(engine.graph(), &tid, &req.agent_id);
53 Ok::<_, crate::error::EnvoyError>(task)
54 })
55 .await
56 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
57 broadcast_to_project(
58 &state,
59 &task.project,
60 "task_claimed",
61 &serde_json::to_value(&task).unwrap_or_default(),
62 )
63 .await;
64 Ok(Json(task))
65}
66
67pub(crate) async fn claim_next_task(
68 State(state): State<SharedState>,
69 Json(req): Json<task::ClaimNextRequest>,
70) -> Result<impl IntoResponse> {
71 let state_fb = state.clone();
72 let task = tokio::task::spawn_blocking(move || {
73 let engine = state_fb.engine.lock();
74 state_fb
75 .task_store
76 .claim_next(engine.graph(), &req.project, req.agent_id)
77 })
78 .await
79 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
80 broadcast_to_project(
81 &state,
82 &task.project,
83 "task_claimed",
84 &serde_json::to_value(&task).unwrap_or_default(),
85 )
86 .await;
87 Ok((axum::http::StatusCode::CREATED, Json(task)))
88}
89
90pub(crate) async fn update_task_state(
91 State(state): State<SharedState>,
92 Path(task_id): Path<String>,
93 Json(req): Json<task::UpdateTaskStateRequest>,
94) -> Result<impl IntoResponse> {
95 let new_state: TaskState = req.state.parse()?;
96 let is_done = new_state == TaskState::Done;
97 let state_fb = state.clone();
98 let tid = task_id.clone();
99 let (task, blocked) = tokio::task::spawn_blocking(move || {
100 let engine = state_fb.engine.lock();
101 let task = state_fb.task_store.update_state(
102 engine.graph(),
103 &tid,
104 new_state,
105 req.checkpoint,
106 None,
107 )?;
108 let blocked = if is_done {
109 state_fb.task_store.find_blocked_by(engine.graph(), &tid)?
110 } else {
111 Vec::new()
112 };
113 Ok::<_, EnvoyError>((task, blocked))
114 })
115 .await
116 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
117
118 for bt in &blocked {
120 let notify = serde_json::json!({
121 "resolved_dependency": task_id,
122 "task_id": bt.id,
123 "message": format!("Dependency {} resolved — can proceed", task_id),
124 });
125 if let Some(ref claimant) = bt.claimed_by {
126 state
127 .ws_registry
128 .send_json(claimant, "dependency_resolved", ¬ify);
129 }
130 }
131 broadcast_to_project(
132 &state,
133 &task.project,
134 "task_state_changed",
135 &serde_json::to_value(&task).unwrap_or_default(),
136 )
137 .await;
138 Ok(Json(task))
139}
140
141#[derive(Debug, Deserialize)]
142pub struct ListTasksQuery {
143 project: String,
144 #[serde(default)]
145 state: Option<String>,
146}
147
148pub(crate) async fn list_tasks(
149 State(state): State<SharedState>,
150 Query(params): Query<ListTasksQuery>,
151) -> Result<impl IntoResponse> {
152 let filter = params.state.as_deref().and_then(|s| s.parse().ok());
153 let state_fb = state.clone();
154 let project = params.project.clone();
155 let tasks = tokio::task::spawn_blocking(move || {
156 let engine = state_fb.engine.lock();
157 state_fb
158 .task_store
159 .list(engine.graph(), &project, filter.as_ref())
160 })
161 .await
162 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
163 Ok(Json(serde_json::json!({
164 "tasks": tasks,
165 "count": tasks.len(),
166 })))
167}
168
169pub(crate) async fn get_task(
170 State(state): State<SharedState>,
171 Path(task_id): Path<String>,
172) -> Result<impl IntoResponse> {
173 let state_fb = state.clone();
174 let task = tokio::task::spawn_blocking(move || {
175 let engine = state_fb.engine.lock();
176 state_fb.task_store.get(engine.graph(), &task_id)
177 })
178 .await
179 .map_err(|_| EnvoyError::InvalidEntity("blocking task join error".into()))??;
180 Ok(Json(task))
181}