1use axum::{
2 extract::{Query, State},
3 routing::get,
4 Json, Router,
5};
6use chrono::Utc;
7use routa_core::events::{AgentEvent, AgentEventType};
8use routa_core::kanban::set_task_column;
9use routa_core::models::artifact::{Artifact, ArtifactType};
10use routa_core::models::kanban::KanbanBoard;
11use routa_core::models::task::TaskLaneSessionStatus;
12use serde::{Deserialize, Serialize};
13use std::collections::{BTreeMap, BTreeSet};
14
15use crate::api::tasks_automation::{
16 auto_create_worktree, resolve_codebase, trigger_assigned_task_agent,
17};
18use crate::api::tasks_github::{
19 build_task_issue_body, create_github_issue, resolve_github_repo, update_github_issue,
20};
21use crate::application::tasks::{CreateTaskCommand, TaskApplicationService, UpdateTaskCommand};
22use crate::error::ServerError;
23use crate::models::task::TaskStatus;
24use crate::state::AppState;
25
26const KANBAN_HAPPY_PATH_COLUMN_ORDER: [&str; 5] = ["backlog", "todo", "dev", "review", "done"];
27
28#[derive(Debug, Serialize)]
29#[serde(rename_all = "camelCase")]
30struct TaskArtifactSummary {
31 total: usize,
32 by_type: BTreeMap<String, usize>,
33 required_satisfied: bool,
34 missing_required: Vec<String>,
35}
36
37#[derive(Debug, Serialize)]
38#[serde(rename_all = "camelCase")]
39struct TaskVerificationSummary {
40 has_verdict: bool,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 verdict: Option<String>,
43 has_report: bool,
44}
45
46#[derive(Debug, Serialize)]
47#[serde(rename_all = "camelCase")]
48struct TaskCompletionSummary {
49 has_summary: bool,
50}
51
52#[derive(Debug, Serialize)]
53#[serde(rename_all = "camelCase")]
54struct TaskRunSummary {
55 total: usize,
56 latest_status: String,
57}
58
59#[derive(Debug, Serialize)]
60#[serde(rename_all = "camelCase")]
61struct TaskEvidenceSummary {
62 artifact: TaskArtifactSummary,
63 verification: TaskVerificationSummary,
64 completion: TaskCompletionSummary,
65 runs: TaskRunSummary,
66}
67
68#[derive(Debug, Serialize)]
69#[serde(rename_all = "camelCase")]
70struct TaskRunResumeTarget {
71 r#type: String,
72 id: String,
73}
74
75#[derive(Debug, Serialize)]
76#[serde(rename_all = "camelCase")]
77struct TaskRunLedgerEntry {
78 id: String,
79 kind: String,
80 status: String,
81 #[serde(skip_serializing_if = "Option::is_none")]
82 session_id: Option<String>,
83 #[serde(skip_serializing_if = "Option::is_none")]
84 external_task_id: Option<String>,
85 #[serde(skip_serializing_if = "Option::is_none")]
86 context_id: Option<String>,
87 #[serde(skip_serializing_if = "Option::is_none")]
88 column_id: Option<String>,
89 #[serde(skip_serializing_if = "Option::is_none")]
90 step_id: Option<String>,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 step_name: Option<String>,
93 #[serde(skip_serializing_if = "Option::is_none")]
94 provider: Option<String>,
95 #[serde(skip_serializing_if = "Option::is_none")]
96 specialist_name: Option<String>,
97 started_at: String,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 completed_at: Option<String>,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 owner_instance_id: Option<String>,
102 #[serde(skip_serializing_if = "Option::is_none")]
103 resume_target: Option<TaskRunResumeTarget>,
104}
105
106pub fn router() -> Router<AppState> {
107 Router::new()
108 .route(
109 "/",
110 get(list_tasks).post(create_task).delete(delete_all_tasks),
111 )
112 .route(
113 "/{id}",
114 get(get_task).patch(update_task).delete(delete_task),
115 )
116 .route(
117 "/{id}/artifacts",
118 get(list_task_artifacts).post(create_task_artifact),
119 )
120 .route("/{id}/runs", get(list_task_runs))
121 .route("/{id}/status", axum::routing::post(update_task_status))
122 .route("/ready", get(find_ready_tasks))
123}
124
125async fn emit_kanban_workspace_event(
126 state: &AppState,
127 workspace_id: &str,
128 entity: &str,
129 action: &str,
130 resource_id: Option<&str>,
131 source: &str,
132) {
133 state
134 .event_bus
135 .emit(AgentEvent {
136 event_type: AgentEventType::WorkspaceUpdated,
137 agent_id: format!("kanban-{}", source),
138 workspace_id: workspace_id.to_string(),
139 data: serde_json::json!({
140 "scope": "kanban",
141 "entity": entity,
142 "action": action,
143 "resourceId": resource_id,
144 "source": source,
145 }),
146 timestamp: Utc::now(),
147 })
148 .await;
149}
150
151#[derive(Debug, Deserialize)]
152#[serde(rename_all = "camelCase")]
153struct CreateTaskArtifactRequest {
154 agent_id: Option<String>,
155 #[serde(rename = "type")]
156 artifact_type: Option<String>,
157 content: Option<String>,
158 context: Option<String>,
159 request_id: Option<String>,
160 metadata: Option<BTreeMap<String, String>>,
161}
162
163async fn list_task_artifacts(
164 State(state): State<AppState>,
165 axum::extract::Path(id): axum::extract::Path<String>,
166) -> Result<Json<serde_json::Value>, ServerError> {
167 let task = state
168 .task_store
169 .get(&id)
170 .await?
171 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
172
173 let artifacts = state.artifact_store.list_by_task(&task.id).await?;
174
175 Ok(Json(serde_json::json!({
176 "artifacts": artifacts,
177 })))
178}
179
180async fn list_task_runs(
181 State(state): State<AppState>,
182 axum::extract::Path(id): axum::extract::Path<String>,
183) -> Result<Json<serde_json::Value>, ServerError> {
184 let task = state
185 .task_store
186 .get(&id)
187 .await?
188 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
189
190 Ok(Json(serde_json::json!({
191 "runs": build_task_run_ledger(&state, &task).await?
192 })))
193}
194
195async fn create_task_artifact(
196 State(state): State<AppState>,
197 axum::extract::Path(id): axum::extract::Path<String>,
198 Json(body): Json<CreateTaskArtifactRequest>,
199) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
200 let task = state
201 .task_store
202 .get(&id)
203 .await?
204 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
205
206 let artifact_type = body
207 .artifact_type
208 .as_deref()
209 .map(str::trim)
210 .filter(|value| !value.is_empty())
211 .ok_or_else(|| ServerError::BadRequest("A valid artifact type is required".to_string()))?;
212 let artifact_type = ArtifactType::from_str(artifact_type)
213 .ok_or_else(|| ServerError::BadRequest("A valid artifact type is required".to_string()))?;
214
215 let agent_id = body
216 .agent_id
217 .as_deref()
218 .map(str::trim)
219 .filter(|value| !value.is_empty())
220 .ok_or_else(|| {
221 ServerError::BadRequest("agentId is required for agent artifact submission".to_string())
222 })?;
223
224 let content = body
225 .content
226 .as_deref()
227 .map(str::trim)
228 .filter(|value| !value.is_empty())
229 .ok_or_else(|| ServerError::BadRequest("Artifact content is required".to_string()))?;
230
231 let now = Utc::now();
232 let artifact = Artifact {
233 id: uuid::Uuid::new_v4().to_string(),
234 artifact_type,
235 task_id: task.id.clone(),
236 workspace_id: task.workspace_id.clone(),
237 provided_by_agent_id: Some(agent_id.to_string()),
238 requested_by_agent_id: None,
239 request_id: body
240 .request_id
241 .as_deref()
242 .map(str::trim)
243 .filter(|value| !value.is_empty())
244 .map(str::to_string),
245 content: Some(content.to_string()),
246 context: body
247 .context
248 .as_deref()
249 .map(str::trim)
250 .filter(|value| !value.is_empty())
251 .map(str::to_string),
252 status: routa_core::models::artifact::ArtifactStatus::Provided,
253 expires_at: None,
254 metadata: body.metadata,
255 created_at: now,
256 updated_at: now,
257 };
258 state.artifact_store.save(&artifact).await?;
259 emit_kanban_workspace_event(
260 &state,
261 &task.workspace_id,
262 "task",
263 "updated",
264 Some(&task.id),
265 "agent",
266 )
267 .await;
268
269 Ok((
270 axum::http::StatusCode::CREATED,
271 Json(serde_json::json!({ "artifact": artifact })),
272 ))
273}
274
275#[derive(Debug, Deserialize)]
276#[serde(rename_all = "camelCase")]
277struct ListTasksQuery {
278 workspace_id: Option<String>,
279 session_id: Option<String>,
280 status: Option<String>,
281 assigned_to: Option<String>,
282}
283
284async fn list_tasks(
285 State(state): State<AppState>,
286 Query(query): Query<ListTasksQuery>,
287) -> Result<Json<serde_json::Value>, ServerError> {
288 let workspace_id = query.workspace_id.as_deref().unwrap_or("default");
289
290 let tasks = if let Some(session_id) = &query.session_id {
291 state.task_store.list_by_session(session_id).await?
293 } else if let Some(assignee) = &query.assigned_to {
294 state.task_store.list_by_assignee(assignee).await?
295 } else if let Some(status_str) = &query.status {
296 let status = TaskStatus::from_str(status_str)
297 .ok_or_else(|| ServerError::BadRequest(format!("Invalid status: {}", status_str)))?;
298 state
299 .task_store
300 .list_by_status(workspace_id, &status)
301 .await?
302 } else {
303 state.task_store.list_by_workspace(workspace_id).await?
304 };
305
306 let mut serialized_tasks = Vec::with_capacity(tasks.len());
307 for task in &tasks {
308 serialized_tasks.push(serialize_task_with_evidence(&state, task).await?);
309 }
310
311 Ok(Json(serde_json::json!({ "tasks": serialized_tasks })))
312}
313
314async fn get_task(
315 State(state): State<AppState>,
316 axum::extract::Path(id): axum::extract::Path<String>,
317) -> Result<Json<serde_json::Value>, ServerError> {
318 let task = state
319 .task_store
320 .get(&id)
321 .await?
322 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
323
324 Ok(Json(serde_json::json!({
325 "task": serialize_task_with_evidence(&state, &task).await?
326 })))
327}
328
329#[derive(Debug, Deserialize)]
330#[serde(rename_all = "camelCase")]
331struct CreateTaskRequest {
332 title: String,
333 objective: String,
334 workspace_id: Option<String>,
335 session_id: Option<String>,
336 scope: Option<String>,
337 acceptance_criteria: Option<Vec<String>>,
338 verification_commands: Option<Vec<String>>,
339 test_cases: Option<Vec<String>>,
340 dependencies: Option<Vec<String>>,
341 parallel_group: Option<String>,
342 board_id: Option<String>,
343 column_id: Option<String>,
344 position: Option<i64>,
345 priority: Option<String>,
346 labels: Option<Vec<String>>,
347 assignee: Option<String>,
348 assigned_provider: Option<String>,
349 assigned_role: Option<String>,
350 assigned_specialist_id: Option<String>,
351 assigned_specialist_name: Option<String>,
352 create_github_issue: Option<bool>,
353 repo_path: Option<String>,
354}
355
356async fn create_task(
357 State(state): State<AppState>,
358 Json(body): Json<CreateTaskRequest>,
359) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
360 let service = TaskApplicationService::new(state.clone());
361 let plan = service.create_task(create_task_command(body)).await?;
362 let mut task = plan.task;
363 let codebase = resolve_codebase(&state, &task.workspace_id, plan.repo_path.as_deref()).await?;
364
365 if plan.create_github_issue {
366 match resolve_github_repo(codebase.as_ref().map(|item| item.repo_path.as_str())) {
367 Some(repo) => match create_github_issue(
368 &repo,
369 &task.title,
370 Some(&build_task_issue_body(
371 &task.objective,
372 task.test_cases.as_ref(),
373 )),
374 &task.labels,
375 task.assignee.as_deref(),
376 )
377 .await
378 {
379 Ok(issue) => {
380 task.github_id = Some(issue.id);
381 task.github_number = Some(issue.number);
382 task.github_url = Some(issue.url);
383 task.github_repo = Some(issue.repo);
384 task.github_state = Some(issue.state);
385 task.github_synced_at = Some(Utc::now());
386 task.last_sync_error = None;
387 }
388 Err(error) => {
389 task.last_sync_error = Some(error);
390 }
391 },
392 None => {
393 task.last_sync_error =
394 Some("Selected codebase is not linked to a GitHub repository.".to_string());
395 }
396 }
397 }
398
399 if plan.should_trigger_agent {
400 if plan.entering_dev {
401 if let (Some(ref cb), None) = (&codebase, &task.worktree_id) {
402 match auto_create_worktree(&state, &task, cb).await {
403 Ok(worktree_id) => {
404 task.worktree_id = Some(worktree_id);
405 }
406 Err(err) => {
407 set_task_column(&mut task, "blocked");
408 task.last_sync_error = Some(format!("Worktree creation failed: {}", err));
409 }
410 }
411 }
412 }
413
414 let trigger_result = trigger_assigned_task_agent(
415 &state,
416 &mut task,
417 codebase.as_ref().map(|item| item.repo_path.as_str()),
418 codebase.as_ref().and_then(|item| item.branch.as_deref()),
419 )
420 .await;
421
422 match trigger_result {
423 Ok(()) => {
424 task.last_sync_error = None;
425 }
426 Err(error) => {
427 task.last_sync_error = Some(error);
428 }
429 }
430 }
431
432 tracing::info!(
433 target: "routa_task_api",
434 task_id = %task.id,
435 column_id = ?task.column_id,
436 trigger_session_id = ?task.trigger_session_id,
437 assigned_provider = ?task.assigned_provider,
438 assigned_role = ?task.assigned_role,
439 status = %task.status.as_str(),
440 "api.tasks.update_task before save"
441 );
442 state.task_store.save(&task).await?;
443 emit_kanban_workspace_event(
444 &state,
445 &task.workspace_id,
446 "task",
447 "created",
448 Some(&task.id),
449 "user",
450 )
451 .await;
452 Ok((
453 axum::http::StatusCode::CREATED,
454 Json(serde_json::json!({
455 "task": serialize_task_with_evidence(&state, &task).await?
456 })),
457 ))
458}
459
460#[derive(Debug, Deserialize, Default)]
461#[serde(rename_all = "camelCase")]
462struct UpdateTaskRequest {
463 title: Option<String>,
464 objective: Option<String>,
465 scope: Option<String>,
466 acceptance_criteria: Option<Vec<String>>,
467 verification_commands: Option<Vec<String>>,
468 test_cases: Option<Vec<String>>,
469 assigned_to: Option<String>,
470 status: Option<String>,
471 board_id: Option<String>,
472 column_id: Option<String>,
473 position: Option<i64>,
474 priority: Option<String>,
475 labels: Option<Vec<String>>,
476 assignee: Option<String>,
477 assigned_provider: Option<String>,
478 assigned_role: Option<String>,
479 assigned_specialist_id: Option<String>,
480 assigned_specialist_name: Option<String>,
481 trigger_session_id: Option<String>,
482 github_id: Option<String>,
483 github_number: Option<i64>,
484 github_url: Option<String>,
485 github_repo: Option<String>,
486 github_state: Option<String>,
487 last_sync_error: Option<String>,
488 dependencies: Option<Vec<String>>,
489 parallel_group: Option<String>,
490 completion_summary: Option<String>,
491 verification_report: Option<String>,
492 sync_to_github: Option<bool>,
493 retry_trigger: Option<bool>,
494 repo_path: Option<String>,
495 codebase_ids: Option<Vec<String>>,
496 worktree_id: Option<serde_json::Value>,
497}
498
499fn create_task_command(body: CreateTaskRequest) -> CreateTaskCommand {
500 CreateTaskCommand {
501 title: body.title,
502 objective: body.objective,
503 workspace_id: body.workspace_id,
504 session_id: body.session_id,
505 scope: body.scope,
506 acceptance_criteria: body.acceptance_criteria,
507 verification_commands: body.verification_commands,
508 test_cases: body.test_cases,
509 dependencies: body.dependencies,
510 parallel_group: body.parallel_group,
511 board_id: body.board_id,
512 column_id: body.column_id,
513 position: body.position,
514 priority: body.priority,
515 labels: body.labels,
516 assignee: body.assignee,
517 assigned_provider: body.assigned_provider,
518 assigned_role: body.assigned_role,
519 assigned_specialist_id: body.assigned_specialist_id,
520 assigned_specialist_name: body.assigned_specialist_name,
521 create_github_issue: body.create_github_issue,
522 repo_path: body.repo_path,
523 }
524}
525
526fn update_task_command(body: UpdateTaskRequest) -> UpdateTaskCommand {
527 UpdateTaskCommand {
528 title: body.title,
529 objective: body.objective,
530 scope: body.scope,
531 acceptance_criteria: body.acceptance_criteria,
532 verification_commands: body.verification_commands,
533 test_cases: body.test_cases,
534 assigned_to: body.assigned_to,
535 status: body.status,
536 board_id: body.board_id,
537 column_id: body.column_id,
538 position: body.position,
539 priority: body.priority,
540 labels: body.labels,
541 assignee: body.assignee,
542 assigned_provider: body.assigned_provider,
543 assigned_role: body.assigned_role,
544 assigned_specialist_id: body.assigned_specialist_id,
545 assigned_specialist_name: body.assigned_specialist_name,
546 trigger_session_id: body.trigger_session_id,
547 github_id: body.github_id,
548 github_number: body.github_number,
549 github_url: body.github_url,
550 github_repo: body.github_repo,
551 github_state: body.github_state,
552 last_sync_error: body.last_sync_error,
553 dependencies: body.dependencies,
554 parallel_group: body.parallel_group,
555 completion_summary: body.completion_summary,
556 verification_report: body.verification_report,
557 sync_to_github: body.sync_to_github,
558 retry_trigger: body.retry_trigger,
559 repo_path: body.repo_path,
560 codebase_ids: body.codebase_ids,
561 worktree_id: body.worktree_id,
562 }
563}
564
565async fn update_task(
566 State(state): State<AppState>,
567 axum::extract::Path(id): axum::extract::Path<String>,
568 Json(body): Json<UpdateTaskRequest>,
569) -> Result<Json<serde_json::Value>, ServerError> {
570 ensure_transition_artifacts(&state, &id, &body).await?;
571 let service = TaskApplicationService::new(state.clone());
572 let plan = service.update_task(&id, update_task_command(body)).await?;
573 let mut task = plan.task;
574
575 if plan.should_sync_github {
576 if let (Some(repo), Some(issue_number)) = (task.github_repo.clone(), task.github_number) {
577 match update_github_issue(
578 &repo,
579 issue_number,
580 &task.title,
581 Some(&build_task_issue_body(
582 &task.objective,
583 task.test_cases.as_ref(),
584 )),
585 &task.labels,
586 if task.status == TaskStatus::Completed {
587 "closed"
588 } else {
589 "open"
590 },
591 task.assignee.as_deref(),
592 )
593 .await
594 {
595 Ok(()) => {
596 task.github_state = Some(if task.status == TaskStatus::Completed {
597 "closed".to_string()
598 } else {
599 "open".to_string()
600 });
601 task.github_synced_at = Some(Utc::now());
602 task.last_sync_error = None;
603 }
604 Err(error) => {
605 task.last_sync_error = Some(error);
606 }
607 }
608 }
609 }
610
611 if plan.should_trigger_agent {
612 let codebase = if plan.repo_path.is_some() {
613 resolve_codebase(&state, &task.workspace_id, plan.repo_path.as_deref()).await?
614 } else if let Some(first_id) = task.codebase_ids.first() {
615 state.codebase_store.get(first_id).await.ok().flatten()
616 } else {
617 resolve_codebase(&state, &task.workspace_id, None).await?
618 };
619
620 if plan.entering_dev {
622 if let (Some(ref cb), None) = (&codebase, &task.worktree_id) {
623 match auto_create_worktree(&state, &task, cb).await {
624 Ok(worktree_id) => {
625 task.worktree_id = Some(worktree_id);
626 }
627 Err(err) => {
628 set_task_column(&mut task, "blocked");
629 task.last_sync_error = Some(format!("Worktree creation failed: {}", err));
630 state.task_store.save(&task).await?;
631 emit_kanban_workspace_event(
632 &state,
633 &task.workspace_id,
634 "task",
635 "updated",
636 Some(&task.id),
637 "system",
638 )
639 .await;
640 return Ok(Json(serde_json::json!({ "task": task })));
641 }
642 }
643 }
644 }
645
646 let trigger_result = trigger_assigned_task_agent(
647 &state,
648 &mut task,
649 codebase.as_ref().map(|item| item.repo_path.as_str()),
650 codebase.as_ref().and_then(|item| item.branch.as_deref()),
651 )
652 .await;
653
654 match trigger_result {
655 Ok(()) => {
656 task.last_sync_error = None;
657 }
658 Err(error) => {
659 task.last_sync_error = Some(error);
660 }
661 }
662 }
663
664 state.task_store.save(&task).await?;
665 emit_kanban_workspace_event(
666 &state,
667 &task.workspace_id,
668 "task",
669 "updated",
670 Some(&task.id),
671 "user",
672 )
673 .await;
674 Ok(Json(serde_json::json!({
675 "task": serialize_task_with_evidence(&state, &task).await?
676 })))
677}
678
679async fn serialize_task_with_evidence(
680 state: &AppState,
681 task: &routa_core::models::task::Task,
682) -> Result<serde_json::Value, ServerError> {
683 let evidence_summary = build_task_evidence_summary(state, task).await?;
684 let mut task_value = serde_json::to_value(task)
685 .map_err(|error| ServerError::Internal(format!("Failed to serialize task: {error}")))?;
686 let task_object = task_value.as_object_mut().ok_or_else(|| {
687 ServerError::Internal("Task payload must serialize to a JSON object".to_string())
688 })?;
689 task_object.insert(
690 "artifactSummary".to_string(),
691 serde_json::to_value(&evidence_summary.artifact).map_err(|error| {
692 ServerError::Internal(format!(
693 "Failed to serialize task artifact summary: {error}"
694 ))
695 })?,
696 );
697 task_object.insert(
698 "evidenceSummary".to_string(),
699 serde_json::to_value(&evidence_summary).map_err(|error| {
700 ServerError::Internal(format!(
701 "Failed to serialize task evidence summary: {error}"
702 ))
703 })?,
704 );
705 Ok(task_value)
706}
707
708async fn build_task_run_ledger(
709 state: &AppState,
710 task: &routa_core::models::task::Task,
711) -> Result<Vec<TaskRunLedgerEntry>, ServerError> {
712 let mut lane_sessions = task.lane_sessions.clone();
713 lane_sessions.sort_by(|left, right| right.started_at.cmp(&left.started_at));
714
715 let mut runs = Vec::with_capacity(lane_sessions.len());
716 for lane_session in lane_sessions {
717 let session = state
718 .acp_session_store
719 .get(&lane_session.session_id)
720 .await?;
721 let is_a2a = lane_session.transport.as_deref() == Some("a2a");
722 let resume_target = if is_a2a {
723 lane_session
724 .external_task_id
725 .clone()
726 .map(|id| TaskRunResumeTarget {
727 r#type: "external_task".to_string(),
728 id,
729 })
730 } else {
731 Some(TaskRunResumeTarget {
732 r#type: "session".to_string(),
733 id: lane_session.session_id.clone(),
734 })
735 };
736
737 runs.push(TaskRunLedgerEntry {
738 id: lane_session.session_id.clone(),
739 kind: if is_a2a {
740 "a2a_task".to_string()
741 } else {
742 "embedded_acp".to_string()
743 },
744 status: serde_json::to_value(&lane_session.status)
745 .ok()
746 .and_then(|value| value.as_str().map(str::to_string))
747 .unwrap_or_else(|| "unknown".to_string()),
748 session_id: Some(lane_session.session_id.clone()),
749 external_task_id: lane_session.external_task_id.clone(),
750 context_id: lane_session.context_id.clone(),
751 column_id: lane_session.column_id.clone(),
752 step_id: lane_session.step_id.clone(),
753 step_name: lane_session.step_name.clone(),
754 provider: lane_session
755 .provider
756 .clone()
757 .or_else(|| session.as_ref().and_then(|row| row.provider.clone())),
758 specialist_name: lane_session.specialist_name.clone(),
759 started_at: lane_session.started_at.clone(),
760 completed_at: lane_session.completed_at.clone(),
761 owner_instance_id: None,
762 resume_target,
763 });
764 }
765
766 Ok(runs)
767}
768
769async fn build_task_evidence_summary(
770 state: &AppState,
771 task: &routa_core::models::task::Task,
772) -> Result<TaskEvidenceSummary, ServerError> {
773 let artifacts = state.artifact_store.list_by_task(&task.id).await?;
774 let mut by_type = BTreeMap::new();
775 for artifact in &artifacts {
776 let key = artifact.artifact_type.as_str().to_string();
777 *by_type.entry(key).or_insert(0) += 1;
778 }
779
780 let board = match task.board_id.as_deref() {
781 Some(board_id) => state.kanban_store.get(board_id).await?,
782 None => None,
783 };
784 let required_artifacts =
785 resolve_next_required_artifacts(board.as_ref(), task.column_id.as_deref());
786 let present_artifacts = by_type.keys().cloned().collect::<BTreeSet<_>>();
787 let missing_required = required_artifacts
788 .into_iter()
789 .filter(|artifact| !present_artifacts.contains(artifact))
790 .collect::<Vec<_>>();
791
792 let latest_status = task
793 .lane_sessions
794 .last()
795 .map(|session| task_lane_session_status_as_str(&session.status).to_string())
796 .unwrap_or_else(|| {
797 if task.session_ids.is_empty() {
798 "idle".to_string()
799 } else {
800 "unknown".to_string()
801 }
802 });
803
804 Ok(TaskEvidenceSummary {
805 artifact: TaskArtifactSummary {
806 total: artifacts.len(),
807 by_type,
808 required_satisfied: missing_required.is_empty(),
809 missing_required,
810 },
811 verification: TaskVerificationSummary {
812 has_verdict: task.verification_verdict.is_some(),
813 verdict: task
814 .verification_verdict
815 .as_ref()
816 .map(|verdict| verdict.as_str().to_string()),
817 has_report: task
818 .verification_report
819 .as_ref()
820 .is_some_and(|report| !report.trim().is_empty()),
821 },
822 completion: TaskCompletionSummary {
823 has_summary: task
824 .completion_summary
825 .as_ref()
826 .is_some_and(|summary| !summary.trim().is_empty()),
827 },
828 runs: TaskRunSummary {
829 total: task.session_ids.len(),
830 latest_status,
831 },
832 })
833}
834
835fn resolve_next_required_artifacts(
836 board: Option<&KanbanBoard>,
837 current_column_id: Option<&str>,
838) -> Vec<String> {
839 let current_column_id = current_column_id.unwrap_or("backlog").to_ascii_lowercase();
840 let next_column_id = KANBAN_HAPPY_PATH_COLUMN_ORDER
841 .iter()
842 .position(|column_id| *column_id == current_column_id)
843 .and_then(|index| KANBAN_HAPPY_PATH_COLUMN_ORDER.get(index + 1))
844 .copied();
845 let Some(next_column_id) = next_column_id else {
846 return Vec::new();
847 };
848
849 board
850 .and_then(|board| {
851 board
852 .columns
853 .iter()
854 .find(|column| column.id == next_column_id)
855 })
856 .and_then(|column| column.automation.as_ref())
857 .and_then(|automation| automation.required_artifacts.clone())
858 .unwrap_or_default()
859}
860
861fn task_lane_session_status_as_str(status: &TaskLaneSessionStatus) -> &'static str {
862 match status {
863 TaskLaneSessionStatus::Running => "running",
864 TaskLaneSessionStatus::Completed => "completed",
865 TaskLaneSessionStatus::Failed => "failed",
866 TaskLaneSessionStatus::TimedOut => "timed_out",
867 TaskLaneSessionStatus::Transitioned => "transitioned",
868 }
869}
870
871async fn ensure_transition_artifacts(
872 state: &AppState,
873 task_id: &str,
874 body: &UpdateTaskRequest,
875) -> Result<(), ServerError> {
876 let Some(target_column_id) = body.column_id.as_deref() else {
877 return Ok(());
878 };
879 let existing = state
880 .task_store
881 .get(task_id)
882 .await?
883 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", task_id)))?;
884 if existing.column_id.as_deref() == Some(target_column_id) {
885 return Ok(());
886 }
887
888 let Some(board_id) = body.board_id.as_deref().or(existing.board_id.as_deref()) else {
889 return Ok(());
890 };
891 let Some(board) = state.kanban_store.get(board_id).await? else {
892 return Ok(());
893 };
894 let Some(target_column) = board
895 .columns
896 .iter()
897 .find(|column| column.id == target_column_id)
898 else {
899 return Ok(());
900 };
901 let Some(required_artifacts) = target_column
902 .automation
903 .as_ref()
904 .and_then(|automation| automation.required_artifacts.as_ref())
905 else {
906 return Ok(());
907 };
908
909 let mut missing_artifacts = Vec::new();
910 for artifact_name in required_artifacts {
911 let artifact_type = ArtifactType::from_str(artifact_name).ok_or_else(|| {
912 ServerError::BadRequest(format!(
913 "Invalid required artifact type configured on column {}: {}",
914 target_column.id, artifact_name
915 ))
916 })?;
917 let artifacts = state
918 .artifact_store
919 .list_by_task_and_type(task_id, &artifact_type)
920 .await?;
921 if artifacts.is_empty() {
922 missing_artifacts.push(artifact_name.clone());
923 }
924 }
925
926 if missing_artifacts.is_empty() {
927 return Ok(());
928 }
929
930 Err(ServerError::BadRequest(format!(
931 "Cannot move task to \"{}\": missing required artifacts: {}. Please provide these artifacts before moving the task.",
932 target_column.name,
933 missing_artifacts.join(", ")
934 )))
935}
936
937async fn delete_task(
938 State(state): State<AppState>,
939 axum::extract::Path(id): axum::extract::Path<String>,
940) -> Result<Json<serde_json::Value>, ServerError> {
941 let task = state
942 .task_store
943 .get(&id)
944 .await?
945 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
946 state.task_store.delete(&id).await?;
947 emit_kanban_workspace_event(
948 &state,
949 &task.workspace_id,
950 "task",
951 "deleted",
952 Some(&id),
953 "user",
954 )
955 .await;
956 Ok(Json(serde_json::json!({ "deleted": true })))
957}
958
959#[derive(Debug, Deserialize)]
960struct UpdateStatusRequest {
961 status: String,
962}
963
964async fn update_task_status(
965 State(state): State<AppState>,
966 axum::extract::Path(id): axum::extract::Path<String>,
967 Json(body): Json<UpdateStatusRequest>,
968) -> Result<Json<serde_json::Value>, ServerError> {
969 let status = TaskStatus::from_str(&body.status)
970 .ok_or_else(|| ServerError::BadRequest(format!("Invalid status: {}", body.status)))?;
971 let task = state
972 .task_store
973 .get(&id)
974 .await?
975 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
976 state.task_store.update_status(&id, &status).await?;
977 emit_kanban_workspace_event(
978 &state,
979 &task.workspace_id,
980 "task",
981 "updated",
982 Some(&id),
983 "user",
984 )
985 .await;
986 Ok(Json(serde_json::json!({ "updated": true })))
987}
988
989async fn find_ready_tasks(
990 State(state): State<AppState>,
991 Query(query): Query<ListTasksQuery>,
992) -> Result<Json<serde_json::Value>, ServerError> {
993 let workspace_id = query.workspace_id.as_deref().unwrap_or("default");
994 let tasks = state.task_store.find_ready_tasks(workspace_id).await?;
995 let mut serialized_tasks = Vec::with_capacity(tasks.len());
996 for task in &tasks {
997 serialized_tasks.push(serialize_task_with_evidence(&state, task).await?);
998 }
999 Ok(Json(serde_json::json!({ "tasks": serialized_tasks })))
1000}
1001
1002async fn delete_all_tasks(
1004 State(state): State<AppState>,
1005 Query(query): Query<ListTasksQuery>,
1006) -> Result<Json<serde_json::Value>, ServerError> {
1007 let workspace_id = query.workspace_id.as_deref().unwrap_or("default");
1008 let tasks = state.task_store.list_by_workspace(workspace_id).await?;
1009 let count = tasks.len();
1010 for task in &tasks {
1011 state.task_store.delete(&task.id).await?;
1012 }
1013 if count > 0 {
1014 emit_kanban_workspace_event(&state, workspace_id, "task", "deleted", None, "user").await;
1015 }
1016 Ok(Json(serde_json::json!({ "deleted": count })))
1017}