1use axum::{
2 extract::{Query, State},
3 routing::get,
4 Json, Router,
5};
6use chrono::Utc;
7use routa_core::events::{AgentEvent, AgentEventType};
8use routa_core::kanban::set_task_column;
9use routa_core::models::artifact::{Artifact, ArtifactType};
10use serde::Deserialize;
11use std::collections::BTreeMap;
12
13use crate::api::tasks_automation::{
14 auto_create_worktree, resolve_codebase, trigger_assigned_task_agent,
15};
16use crate::api::tasks_github::{
17 build_task_issue_body, create_github_issue, resolve_github_repo, update_github_issue,
18};
19use crate::application::tasks::{CreateTaskCommand, TaskApplicationService, UpdateTaskCommand};
20use crate::error::ServerError;
21use crate::models::task::TaskStatus;
22use crate::state::AppState;
23
24pub fn router() -> Router<AppState> {
25 Router::new()
26 .route(
27 "/",
28 get(list_tasks).post(create_task).delete(delete_all_tasks),
29 )
30 .route(
31 "/{id}",
32 get(get_task).patch(update_task).delete(delete_task),
33 )
34 .route(
35 "/{id}/artifacts",
36 get(list_task_artifacts).post(create_task_artifact),
37 )
38 .route("/{id}/status", axum::routing::post(update_task_status))
39 .route("/ready", get(find_ready_tasks))
40}
41
42async fn emit_kanban_workspace_event(
43 state: &AppState,
44 workspace_id: &str,
45 entity: &str,
46 action: &str,
47 resource_id: Option<&str>,
48 source: &str,
49) {
50 state
51 .event_bus
52 .emit(AgentEvent {
53 event_type: AgentEventType::WorkspaceUpdated,
54 agent_id: format!("kanban-{}", source),
55 workspace_id: workspace_id.to_string(),
56 data: serde_json::json!({
57 "scope": "kanban",
58 "entity": entity,
59 "action": action,
60 "resourceId": resource_id,
61 "source": source,
62 }),
63 timestamp: Utc::now(),
64 })
65 .await;
66}
67
68#[derive(Debug, Deserialize)]
69#[serde(rename_all = "camelCase")]
70struct CreateTaskArtifactRequest {
71 agent_id: Option<String>,
72 #[serde(rename = "type")]
73 artifact_type: Option<String>,
74 content: Option<String>,
75 context: Option<String>,
76 request_id: Option<String>,
77 metadata: Option<BTreeMap<String, String>>,
78}
79
80async fn list_task_artifacts(
81 State(state): State<AppState>,
82 axum::extract::Path(id): axum::extract::Path<String>,
83) -> Result<Json<serde_json::Value>, ServerError> {
84 let task = state
85 .task_store
86 .get(&id)
87 .await?
88 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
89
90 let artifacts = state.artifact_store.list_by_task(&task.id).await?;
91
92 Ok(Json(serde_json::json!({
93 "artifacts": artifacts,
94 })))
95}
96
97async fn create_task_artifact(
98 State(state): State<AppState>,
99 axum::extract::Path(id): axum::extract::Path<String>,
100 Json(body): Json<CreateTaskArtifactRequest>,
101) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
102 let task = state
103 .task_store
104 .get(&id)
105 .await?
106 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
107
108 let artifact_type = body
109 .artifact_type
110 .as_deref()
111 .map(str::trim)
112 .filter(|value| !value.is_empty())
113 .ok_or_else(|| ServerError::BadRequest("A valid artifact type is required".to_string()))?;
114 let artifact_type = ArtifactType::from_str(artifact_type)
115 .ok_or_else(|| ServerError::BadRequest("A valid artifact type is required".to_string()))?;
116
117 let agent_id = body
118 .agent_id
119 .as_deref()
120 .map(str::trim)
121 .filter(|value| !value.is_empty())
122 .ok_or_else(|| {
123 ServerError::BadRequest("agentId is required for agent artifact submission".to_string())
124 })?;
125
126 let content = body
127 .content
128 .as_deref()
129 .map(str::trim)
130 .filter(|value| !value.is_empty())
131 .ok_or_else(|| ServerError::BadRequest("Artifact content is required".to_string()))?;
132
133 let now = Utc::now();
134 let artifact = Artifact {
135 id: uuid::Uuid::new_v4().to_string(),
136 artifact_type,
137 task_id: task.id.clone(),
138 workspace_id: task.workspace_id.clone(),
139 provided_by_agent_id: Some(agent_id.to_string()),
140 requested_by_agent_id: None,
141 request_id: body
142 .request_id
143 .as_deref()
144 .map(str::trim)
145 .filter(|value| !value.is_empty())
146 .map(str::to_string),
147 content: Some(content.to_string()),
148 context: body
149 .context
150 .as_deref()
151 .map(str::trim)
152 .filter(|value| !value.is_empty())
153 .map(str::to_string),
154 status: routa_core::models::artifact::ArtifactStatus::Provided,
155 expires_at: None,
156 metadata: body.metadata,
157 created_at: now,
158 updated_at: now,
159 };
160 state.artifact_store.save(&artifact).await?;
161 emit_kanban_workspace_event(
162 &state,
163 &task.workspace_id,
164 "task",
165 "updated",
166 Some(&task.id),
167 "agent",
168 )
169 .await;
170
171 Ok((
172 axum::http::StatusCode::CREATED,
173 Json(serde_json::json!({ "artifact": artifact })),
174 ))
175}
176
177#[derive(Debug, Deserialize)]
178#[serde(rename_all = "camelCase")]
179struct ListTasksQuery {
180 workspace_id: Option<String>,
181 session_id: Option<String>,
182 status: Option<String>,
183 assigned_to: Option<String>,
184}
185
186async fn list_tasks(
187 State(state): State<AppState>,
188 Query(query): Query<ListTasksQuery>,
189) -> Result<Json<serde_json::Value>, ServerError> {
190 let workspace_id = query.workspace_id.as_deref().unwrap_or("default");
191
192 let tasks = if let Some(session_id) = &query.session_id {
193 state.task_store.list_by_session(session_id).await?
195 } else if let Some(assignee) = &query.assigned_to {
196 state.task_store.list_by_assignee(assignee).await?
197 } else if let Some(status_str) = &query.status {
198 let status = TaskStatus::from_str(status_str)
199 .ok_or_else(|| ServerError::BadRequest(format!("Invalid status: {}", status_str)))?;
200 state
201 .task_store
202 .list_by_status(workspace_id, &status)
203 .await?
204 } else {
205 state.task_store.list_by_workspace(workspace_id).await?
206 };
207
208 Ok(Json(serde_json::json!({ "tasks": tasks })))
209}
210
211async fn get_task(
212 State(state): State<AppState>,
213 axum::extract::Path(id): axum::extract::Path<String>,
214) -> Result<Json<serde_json::Value>, ServerError> {
215 let task = state
216 .task_store
217 .get(&id)
218 .await?
219 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
220
221 Ok(Json(serde_json::json!({ "task": task })))
222}
223
224#[derive(Debug, Deserialize)]
225#[serde(rename_all = "camelCase")]
226struct CreateTaskRequest {
227 title: String,
228 objective: String,
229 workspace_id: Option<String>,
230 session_id: Option<String>,
231 scope: Option<String>,
232 acceptance_criteria: Option<Vec<String>>,
233 verification_commands: Option<Vec<String>>,
234 test_cases: Option<Vec<String>>,
235 dependencies: Option<Vec<String>>,
236 parallel_group: Option<String>,
237 board_id: Option<String>,
238 column_id: Option<String>,
239 position: Option<i64>,
240 priority: Option<String>,
241 labels: Option<Vec<String>>,
242 assignee: Option<String>,
243 assigned_provider: Option<String>,
244 assigned_role: Option<String>,
245 assigned_specialist_id: Option<String>,
246 assigned_specialist_name: Option<String>,
247 create_github_issue: Option<bool>,
248 repo_path: Option<String>,
249}
250
251async fn create_task(
252 State(state): State<AppState>,
253 Json(body): Json<CreateTaskRequest>,
254) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
255 let service = TaskApplicationService::new(state.clone());
256 let plan = service.create_task(create_task_command(body)).await?;
257 let mut task = plan.task;
258 let codebase = resolve_codebase(&state, &task.workspace_id, plan.repo_path.as_deref()).await?;
259
260 if plan.create_github_issue {
261 match resolve_github_repo(codebase.as_ref().map(|item| item.repo_path.as_str())) {
262 Some(repo) => match create_github_issue(
263 &repo,
264 &task.title,
265 Some(&build_task_issue_body(
266 &task.objective,
267 task.test_cases.as_ref(),
268 )),
269 &task.labels,
270 task.assignee.as_deref(),
271 )
272 .await
273 {
274 Ok(issue) => {
275 task.github_id = Some(issue.id);
276 task.github_number = Some(issue.number);
277 task.github_url = Some(issue.url);
278 task.github_repo = Some(issue.repo);
279 task.github_state = Some(issue.state);
280 task.github_synced_at = Some(Utc::now());
281 task.last_sync_error = None;
282 }
283 Err(error) => {
284 task.last_sync_error = Some(error);
285 }
286 },
287 None => {
288 task.last_sync_error =
289 Some("Selected codebase is not linked to a GitHub repository.".to_string());
290 }
291 }
292 }
293
294 if plan.should_trigger_agent {
295 if plan.entering_dev {
296 if let (Some(ref cb), None) = (&codebase, &task.worktree_id) {
297 match auto_create_worktree(&state, &task, cb).await {
298 Ok(worktree_id) => {
299 task.worktree_id = Some(worktree_id);
300 }
301 Err(err) => {
302 set_task_column(&mut task, "blocked");
303 task.last_sync_error = Some(format!("Worktree creation failed: {}", err));
304 }
305 }
306 }
307 }
308
309 let trigger_result = trigger_assigned_task_agent(
310 &state,
311 &mut task,
312 codebase.as_ref().map(|item| item.repo_path.as_str()),
313 codebase.as_ref().and_then(|item| item.branch.as_deref()),
314 )
315 .await;
316
317 match trigger_result {
318 Ok(()) => {
319 task.last_sync_error = None;
320 }
321 Err(error) => {
322 task.last_sync_error = Some(error);
323 }
324 }
325 }
326
327 tracing::info!(
328 target: "routa_task_api",
329 task_id = %task.id,
330 column_id = ?task.column_id,
331 trigger_session_id = ?task.trigger_session_id,
332 assigned_provider = ?task.assigned_provider,
333 assigned_role = ?task.assigned_role,
334 status = %task.status.as_str(),
335 "api.tasks.update_task before save"
336 );
337 state.task_store.save(&task).await?;
338 emit_kanban_workspace_event(
339 &state,
340 &task.workspace_id,
341 "task",
342 "created",
343 Some(&task.id),
344 "user",
345 )
346 .await;
347 Ok((
348 axum::http::StatusCode::CREATED,
349 Json(serde_json::json!({ "task": task })),
350 ))
351}
352
353#[derive(Debug, Deserialize, Default)]
354#[serde(rename_all = "camelCase")]
355struct UpdateTaskRequest {
356 title: Option<String>,
357 objective: Option<String>,
358 scope: Option<String>,
359 acceptance_criteria: Option<Vec<String>>,
360 verification_commands: Option<Vec<String>>,
361 test_cases: Option<Vec<String>>,
362 assigned_to: Option<String>,
363 status: Option<String>,
364 board_id: Option<String>,
365 column_id: Option<String>,
366 position: Option<i64>,
367 priority: Option<String>,
368 labels: Option<Vec<String>>,
369 assignee: Option<String>,
370 assigned_provider: Option<String>,
371 assigned_role: Option<String>,
372 assigned_specialist_id: Option<String>,
373 assigned_specialist_name: Option<String>,
374 trigger_session_id: Option<String>,
375 github_id: Option<String>,
376 github_number: Option<i64>,
377 github_url: Option<String>,
378 github_repo: Option<String>,
379 github_state: Option<String>,
380 last_sync_error: Option<String>,
381 dependencies: Option<Vec<String>>,
382 parallel_group: Option<String>,
383 completion_summary: Option<String>,
384 verification_report: Option<String>,
385 sync_to_github: Option<bool>,
386 retry_trigger: Option<bool>,
387 repo_path: Option<String>,
388 codebase_ids: Option<Vec<String>>,
389 worktree_id: Option<serde_json::Value>,
390}
391
392fn create_task_command(body: CreateTaskRequest) -> CreateTaskCommand {
393 CreateTaskCommand {
394 title: body.title,
395 objective: body.objective,
396 workspace_id: body.workspace_id,
397 session_id: body.session_id,
398 scope: body.scope,
399 acceptance_criteria: body.acceptance_criteria,
400 verification_commands: body.verification_commands,
401 test_cases: body.test_cases,
402 dependencies: body.dependencies,
403 parallel_group: body.parallel_group,
404 board_id: body.board_id,
405 column_id: body.column_id,
406 position: body.position,
407 priority: body.priority,
408 labels: body.labels,
409 assignee: body.assignee,
410 assigned_provider: body.assigned_provider,
411 assigned_role: body.assigned_role,
412 assigned_specialist_id: body.assigned_specialist_id,
413 assigned_specialist_name: body.assigned_specialist_name,
414 create_github_issue: body.create_github_issue,
415 repo_path: body.repo_path,
416 }
417}
418
419fn update_task_command(body: UpdateTaskRequest) -> UpdateTaskCommand {
420 UpdateTaskCommand {
421 title: body.title,
422 objective: body.objective,
423 scope: body.scope,
424 acceptance_criteria: body.acceptance_criteria,
425 verification_commands: body.verification_commands,
426 test_cases: body.test_cases,
427 assigned_to: body.assigned_to,
428 status: body.status,
429 board_id: body.board_id,
430 column_id: body.column_id,
431 position: body.position,
432 priority: body.priority,
433 labels: body.labels,
434 assignee: body.assignee,
435 assigned_provider: body.assigned_provider,
436 assigned_role: body.assigned_role,
437 assigned_specialist_id: body.assigned_specialist_id,
438 assigned_specialist_name: body.assigned_specialist_name,
439 trigger_session_id: body.trigger_session_id,
440 github_id: body.github_id,
441 github_number: body.github_number,
442 github_url: body.github_url,
443 github_repo: body.github_repo,
444 github_state: body.github_state,
445 last_sync_error: body.last_sync_error,
446 dependencies: body.dependencies,
447 parallel_group: body.parallel_group,
448 completion_summary: body.completion_summary,
449 verification_report: body.verification_report,
450 sync_to_github: body.sync_to_github,
451 retry_trigger: body.retry_trigger,
452 repo_path: body.repo_path,
453 codebase_ids: body.codebase_ids,
454 worktree_id: body.worktree_id,
455 }
456}
457
458async fn update_task(
459 State(state): State<AppState>,
460 axum::extract::Path(id): axum::extract::Path<String>,
461 Json(body): Json<UpdateTaskRequest>,
462) -> Result<Json<serde_json::Value>, ServerError> {
463 ensure_transition_artifacts(&state, &id, &body).await?;
464 let service = TaskApplicationService::new(state.clone());
465 let plan = service.update_task(&id, update_task_command(body)).await?;
466 let mut task = plan.task;
467
468 if plan.should_sync_github {
469 if let (Some(repo), Some(issue_number)) = (task.github_repo.clone(), task.github_number) {
470 match update_github_issue(
471 &repo,
472 issue_number,
473 &task.title,
474 Some(&build_task_issue_body(
475 &task.objective,
476 task.test_cases.as_ref(),
477 )),
478 &task.labels,
479 if task.status == TaskStatus::Completed {
480 "closed"
481 } else {
482 "open"
483 },
484 task.assignee.as_deref(),
485 )
486 .await
487 {
488 Ok(()) => {
489 task.github_state = Some(if task.status == TaskStatus::Completed {
490 "closed".to_string()
491 } else {
492 "open".to_string()
493 });
494 task.github_synced_at = Some(Utc::now());
495 task.last_sync_error = None;
496 }
497 Err(error) => {
498 task.last_sync_error = Some(error);
499 }
500 }
501 }
502 }
503
504 if plan.should_trigger_agent {
505 let codebase = if plan.repo_path.is_some() {
506 resolve_codebase(&state, &task.workspace_id, plan.repo_path.as_deref()).await?
507 } else if let Some(first_id) = task.codebase_ids.first() {
508 state.codebase_store.get(first_id).await.ok().flatten()
509 } else {
510 resolve_codebase(&state, &task.workspace_id, None).await?
511 };
512
513 if plan.entering_dev {
515 if let (Some(ref cb), None) = (&codebase, &task.worktree_id) {
516 match auto_create_worktree(&state, &task, cb).await {
517 Ok(worktree_id) => {
518 task.worktree_id = Some(worktree_id);
519 }
520 Err(err) => {
521 set_task_column(&mut task, "blocked");
522 task.last_sync_error = Some(format!("Worktree creation failed: {}", err));
523 state.task_store.save(&task).await?;
524 emit_kanban_workspace_event(
525 &state,
526 &task.workspace_id,
527 "task",
528 "updated",
529 Some(&task.id),
530 "system",
531 )
532 .await;
533 return Ok(Json(serde_json::json!({ "task": task })));
534 }
535 }
536 }
537 }
538
539 let trigger_result = trigger_assigned_task_agent(
540 &state,
541 &mut task,
542 codebase.as_ref().map(|item| item.repo_path.as_str()),
543 codebase.as_ref().and_then(|item| item.branch.as_deref()),
544 )
545 .await;
546
547 match trigger_result {
548 Ok(()) => {
549 task.last_sync_error = None;
550 }
551 Err(error) => {
552 task.last_sync_error = Some(error);
553 }
554 }
555 }
556
557 state.task_store.save(&task).await?;
558 emit_kanban_workspace_event(
559 &state,
560 &task.workspace_id,
561 "task",
562 "updated",
563 Some(&task.id),
564 "user",
565 )
566 .await;
567 Ok(Json(serde_json::json!({ "task": task })))
568}
569
570async fn ensure_transition_artifacts(
571 state: &AppState,
572 task_id: &str,
573 body: &UpdateTaskRequest,
574) -> Result<(), ServerError> {
575 let Some(target_column_id) = body.column_id.as_deref() else {
576 return Ok(());
577 };
578 let existing = state
579 .task_store
580 .get(task_id)
581 .await?
582 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", task_id)))?;
583 if existing.column_id.as_deref() == Some(target_column_id) {
584 return Ok(());
585 }
586
587 let Some(board_id) = body.board_id.as_deref().or(existing.board_id.as_deref()) else {
588 return Ok(());
589 };
590 let Some(board) = state.kanban_store.get(board_id).await? else {
591 return Ok(());
592 };
593 let Some(target_column) = board
594 .columns
595 .iter()
596 .find(|column| column.id == target_column_id)
597 else {
598 return Ok(());
599 };
600 let Some(required_artifacts) = target_column
601 .automation
602 .as_ref()
603 .and_then(|automation| automation.required_artifacts.as_ref())
604 else {
605 return Ok(());
606 };
607
608 let mut missing_artifacts = Vec::new();
609 for artifact_name in required_artifacts {
610 let artifact_type = ArtifactType::from_str(artifact_name).ok_or_else(|| {
611 ServerError::BadRequest(format!(
612 "Invalid required artifact type configured on column {}: {}",
613 target_column.id, artifact_name
614 ))
615 })?;
616 let artifacts = state
617 .artifact_store
618 .list_by_task_and_type(task_id, &artifact_type)
619 .await?;
620 if artifacts.is_empty() {
621 missing_artifacts.push(artifact_name.clone());
622 }
623 }
624
625 if missing_artifacts.is_empty() {
626 return Ok(());
627 }
628
629 Err(ServerError::BadRequest(format!(
630 "Cannot move task to \"{}\": missing required artifacts: {}. Please provide these artifacts before moving the task.",
631 target_column.name,
632 missing_artifacts.join(", ")
633 )))
634}
635
636async fn delete_task(
637 State(state): State<AppState>,
638 axum::extract::Path(id): axum::extract::Path<String>,
639) -> Result<Json<serde_json::Value>, ServerError> {
640 let task = state
641 .task_store
642 .get(&id)
643 .await?
644 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
645 state.task_store.delete(&id).await?;
646 emit_kanban_workspace_event(
647 &state,
648 &task.workspace_id,
649 "task",
650 "deleted",
651 Some(&id),
652 "user",
653 )
654 .await;
655 Ok(Json(serde_json::json!({ "deleted": true })))
656}
657
658#[derive(Debug, Deserialize)]
659struct UpdateStatusRequest {
660 status: String,
661}
662
663async fn update_task_status(
664 State(state): State<AppState>,
665 axum::extract::Path(id): axum::extract::Path<String>,
666 Json(body): Json<UpdateStatusRequest>,
667) -> Result<Json<serde_json::Value>, ServerError> {
668 let status = TaskStatus::from_str(&body.status)
669 .ok_or_else(|| ServerError::BadRequest(format!("Invalid status: {}", body.status)))?;
670 let task = state
671 .task_store
672 .get(&id)
673 .await?
674 .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
675 state.task_store.update_status(&id, &status).await?;
676 emit_kanban_workspace_event(
677 &state,
678 &task.workspace_id,
679 "task",
680 "updated",
681 Some(&id),
682 "user",
683 )
684 .await;
685 Ok(Json(serde_json::json!({ "updated": true })))
686}
687
688async fn find_ready_tasks(
689 State(state): State<AppState>,
690 Query(query): Query<ListTasksQuery>,
691) -> Result<Json<serde_json::Value>, ServerError> {
692 let workspace_id = query.workspace_id.as_deref().unwrap_or("default");
693 let tasks = state.task_store.find_ready_tasks(workspace_id).await?;
694 Ok(Json(serde_json::json!({ "tasks": tasks })))
695}
696
697async fn delete_all_tasks(
699 State(state): State<AppState>,
700 Query(query): Query<ListTasksQuery>,
701) -> Result<Json<serde_json::Value>, ServerError> {
702 let workspace_id = query.workspace_id.as_deref().unwrap_or("default");
703 let tasks = state.task_store.list_by_workspace(workspace_id).await?;
704 let count = tasks.len();
705 for task in &tasks {
706 state.task_store.delete(&task.id).await?;
707 }
708 if count > 0 {
709 emit_kanban_workspace_event(&state, workspace_id, "task", "deleted", None, "user").await;
710 }
711 Ok(Json(serde_json::json!({ "deleted": count })))
712}