Skip to main content

routa_server/api/
tasks.rs

1use axum::{
2    extract::{Query, State},
3    routing::get,
4    Json, Router,
5};
6use chrono::Utc;
7use routa_core::events::{AgentEvent, AgentEventType};
8use routa_core::kanban::set_task_column;
9use routa_core::models::artifact::{Artifact, ArtifactType};
10use serde::Deserialize;
11use std::collections::BTreeMap;
12
13use crate::api::tasks_automation::{
14    auto_create_worktree, resolve_codebase, trigger_assigned_task_agent,
15};
16use crate::api::tasks_github::{
17    build_task_issue_body, create_github_issue, resolve_github_repo, update_github_issue,
18};
19use crate::application::tasks::{CreateTaskCommand, TaskApplicationService, UpdateTaskCommand};
20use crate::error::ServerError;
21use crate::models::task::TaskStatus;
22use crate::state::AppState;
23
24pub fn router() -> Router<AppState> {
25    Router::new()
26        .route(
27            "/",
28            get(list_tasks).post(create_task).delete(delete_all_tasks),
29        )
30        .route(
31            "/{id}",
32            get(get_task).patch(update_task).delete(delete_task),
33        )
34        .route(
35            "/{id}/artifacts",
36            get(list_task_artifacts).post(create_task_artifact),
37        )
38        .route("/{id}/status", axum::routing::post(update_task_status))
39        .route("/ready", get(find_ready_tasks))
40}
41
42async fn emit_kanban_workspace_event(
43    state: &AppState,
44    workspace_id: &str,
45    entity: &str,
46    action: &str,
47    resource_id: Option<&str>,
48    source: &str,
49) {
50    state
51        .event_bus
52        .emit(AgentEvent {
53            event_type: AgentEventType::WorkspaceUpdated,
54            agent_id: format!("kanban-{}", source),
55            workspace_id: workspace_id.to_string(),
56            data: serde_json::json!({
57                "scope": "kanban",
58                "entity": entity,
59                "action": action,
60                "resourceId": resource_id,
61                "source": source,
62            }),
63            timestamp: Utc::now(),
64        })
65        .await;
66}
67
68#[derive(Debug, Deserialize)]
69#[serde(rename_all = "camelCase")]
70struct CreateTaskArtifactRequest {
71    agent_id: Option<String>,
72    #[serde(rename = "type")]
73    artifact_type: Option<String>,
74    content: Option<String>,
75    context: Option<String>,
76    request_id: Option<String>,
77    metadata: Option<BTreeMap<String, String>>,
78}
79
80async fn list_task_artifacts(
81    State(state): State<AppState>,
82    axum::extract::Path(id): axum::extract::Path<String>,
83) -> Result<Json<serde_json::Value>, ServerError> {
84    let task = state
85        .task_store
86        .get(&id)
87        .await?
88        .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
89
90    let artifacts = state.artifact_store.list_by_task(&task.id).await?;
91
92    Ok(Json(serde_json::json!({
93        "artifacts": artifacts,
94    })))
95}
96
97async fn create_task_artifact(
98    State(state): State<AppState>,
99    axum::extract::Path(id): axum::extract::Path<String>,
100    Json(body): Json<CreateTaskArtifactRequest>,
101) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
102    let task = state
103        .task_store
104        .get(&id)
105        .await?
106        .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
107
108    let artifact_type = body
109        .artifact_type
110        .as_deref()
111        .map(str::trim)
112        .filter(|value| !value.is_empty())
113        .ok_or_else(|| ServerError::BadRequest("A valid artifact type is required".to_string()))?;
114    let artifact_type = ArtifactType::from_str(artifact_type)
115        .ok_or_else(|| ServerError::BadRequest("A valid artifact type is required".to_string()))?;
116
117    let agent_id = body
118        .agent_id
119        .as_deref()
120        .map(str::trim)
121        .filter(|value| !value.is_empty())
122        .ok_or_else(|| {
123            ServerError::BadRequest("agentId is required for agent artifact submission".to_string())
124        })?;
125
126    let content = body
127        .content
128        .as_deref()
129        .map(str::trim)
130        .filter(|value| !value.is_empty())
131        .ok_or_else(|| ServerError::BadRequest("Artifact content is required".to_string()))?;
132
133    let now = Utc::now();
134    let artifact = Artifact {
135        id: uuid::Uuid::new_v4().to_string(),
136        artifact_type,
137        task_id: task.id.clone(),
138        workspace_id: task.workspace_id.clone(),
139        provided_by_agent_id: Some(agent_id.to_string()),
140        requested_by_agent_id: None,
141        request_id: body
142            .request_id
143            .as_deref()
144            .map(str::trim)
145            .filter(|value| !value.is_empty())
146            .map(str::to_string),
147        content: Some(content.to_string()),
148        context: body
149            .context
150            .as_deref()
151            .map(str::trim)
152            .filter(|value| !value.is_empty())
153            .map(str::to_string),
154        status: routa_core::models::artifact::ArtifactStatus::Provided,
155        expires_at: None,
156        metadata: body.metadata,
157        created_at: now,
158        updated_at: now,
159    };
160    state.artifact_store.save(&artifact).await?;
161    emit_kanban_workspace_event(
162        &state,
163        &task.workspace_id,
164        "task",
165        "updated",
166        Some(&task.id),
167        "agent",
168    )
169    .await;
170
171    Ok((
172        axum::http::StatusCode::CREATED,
173        Json(serde_json::json!({ "artifact": artifact })),
174    ))
175}
176
177#[derive(Debug, Deserialize)]
178#[serde(rename_all = "camelCase")]
179struct ListTasksQuery {
180    workspace_id: Option<String>,
181    session_id: Option<String>,
182    status: Option<String>,
183    assigned_to: Option<String>,
184}
185
186async fn list_tasks(
187    State(state): State<AppState>,
188    Query(query): Query<ListTasksQuery>,
189) -> Result<Json<serde_json::Value>, ServerError> {
190    let workspace_id = query.workspace_id.as_deref().unwrap_or("default");
191
192    let tasks = if let Some(session_id) = &query.session_id {
193        // Filter by session_id takes priority
194        state.task_store.list_by_session(session_id).await?
195    } else if let Some(assignee) = &query.assigned_to {
196        state.task_store.list_by_assignee(assignee).await?
197    } else if let Some(status_str) = &query.status {
198        let status = TaskStatus::from_str(status_str)
199            .ok_or_else(|| ServerError::BadRequest(format!("Invalid status: {}", status_str)))?;
200        state
201            .task_store
202            .list_by_status(workspace_id, &status)
203            .await?
204    } else {
205        state.task_store.list_by_workspace(workspace_id).await?
206    };
207
208    Ok(Json(serde_json::json!({ "tasks": tasks })))
209}
210
211async fn get_task(
212    State(state): State<AppState>,
213    axum::extract::Path(id): axum::extract::Path<String>,
214) -> Result<Json<serde_json::Value>, ServerError> {
215    let task = state
216        .task_store
217        .get(&id)
218        .await?
219        .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
220
221    Ok(Json(serde_json::json!({ "task": task })))
222}
223
224#[derive(Debug, Deserialize)]
225#[serde(rename_all = "camelCase")]
226struct CreateTaskRequest {
227    title: String,
228    objective: String,
229    workspace_id: Option<String>,
230    session_id: Option<String>,
231    scope: Option<String>,
232    acceptance_criteria: Option<Vec<String>>,
233    verification_commands: Option<Vec<String>>,
234    test_cases: Option<Vec<String>>,
235    dependencies: Option<Vec<String>>,
236    parallel_group: Option<String>,
237    board_id: Option<String>,
238    column_id: Option<String>,
239    position: Option<i64>,
240    priority: Option<String>,
241    labels: Option<Vec<String>>,
242    assignee: Option<String>,
243    assigned_provider: Option<String>,
244    assigned_role: Option<String>,
245    assigned_specialist_id: Option<String>,
246    assigned_specialist_name: Option<String>,
247    create_github_issue: Option<bool>,
248    repo_path: Option<String>,
249}
250
251async fn create_task(
252    State(state): State<AppState>,
253    Json(body): Json<CreateTaskRequest>,
254) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
255    let service = TaskApplicationService::new(state.clone());
256    let plan = service.create_task(create_task_command(body)).await?;
257    let mut task = plan.task;
258    let codebase = resolve_codebase(&state, &task.workspace_id, plan.repo_path.as_deref()).await?;
259
260    if plan.create_github_issue {
261        match resolve_github_repo(codebase.as_ref().map(|item| item.repo_path.as_str())) {
262            Some(repo) => match create_github_issue(
263                &repo,
264                &task.title,
265                Some(&build_task_issue_body(
266                    &task.objective,
267                    task.test_cases.as_ref(),
268                )),
269                &task.labels,
270                task.assignee.as_deref(),
271            )
272            .await
273            {
274                Ok(issue) => {
275                    task.github_id = Some(issue.id);
276                    task.github_number = Some(issue.number);
277                    task.github_url = Some(issue.url);
278                    task.github_repo = Some(issue.repo);
279                    task.github_state = Some(issue.state);
280                    task.github_synced_at = Some(Utc::now());
281                    task.last_sync_error = None;
282                }
283                Err(error) => {
284                    task.last_sync_error = Some(error);
285                }
286            },
287            None => {
288                task.last_sync_error =
289                    Some("Selected codebase is not linked to a GitHub repository.".to_string());
290            }
291        }
292    }
293
294    if plan.should_trigger_agent {
295        if plan.entering_dev {
296            if let (Some(ref cb), None) = (&codebase, &task.worktree_id) {
297                match auto_create_worktree(&state, &task, cb).await {
298                    Ok(worktree_id) => {
299                        task.worktree_id = Some(worktree_id);
300                    }
301                    Err(err) => {
302                        set_task_column(&mut task, "blocked");
303                        task.last_sync_error = Some(format!("Worktree creation failed: {}", err));
304                    }
305                }
306            }
307        }
308
309        let trigger_result = trigger_assigned_task_agent(
310            &state,
311            &mut task,
312            codebase.as_ref().map(|item| item.repo_path.as_str()),
313            codebase.as_ref().and_then(|item| item.branch.as_deref()),
314        )
315        .await;
316
317        match trigger_result {
318            Ok(()) => {
319                task.last_sync_error = None;
320            }
321            Err(error) => {
322                task.last_sync_error = Some(error);
323            }
324        }
325    }
326
327    tracing::info!(
328        target: "routa_task_api",
329        task_id = %task.id,
330        column_id = ?task.column_id,
331        trigger_session_id = ?task.trigger_session_id,
332        assigned_provider = ?task.assigned_provider,
333        assigned_role = ?task.assigned_role,
334        status = %task.status.as_str(),
335        "api.tasks.update_task before save"
336    );
337    state.task_store.save(&task).await?;
338    emit_kanban_workspace_event(
339        &state,
340        &task.workspace_id,
341        "task",
342        "created",
343        Some(&task.id),
344        "user",
345    )
346    .await;
347    Ok((
348        axum::http::StatusCode::CREATED,
349        Json(serde_json::json!({ "task": task })),
350    ))
351}
352
353#[derive(Debug, Deserialize, Default)]
354#[serde(rename_all = "camelCase")]
355struct UpdateTaskRequest {
356    title: Option<String>,
357    objective: Option<String>,
358    scope: Option<String>,
359    acceptance_criteria: Option<Vec<String>>,
360    verification_commands: Option<Vec<String>>,
361    test_cases: Option<Vec<String>>,
362    assigned_to: Option<String>,
363    status: Option<String>,
364    board_id: Option<String>,
365    column_id: Option<String>,
366    position: Option<i64>,
367    priority: Option<String>,
368    labels: Option<Vec<String>>,
369    assignee: Option<String>,
370    assigned_provider: Option<String>,
371    assigned_role: Option<String>,
372    assigned_specialist_id: Option<String>,
373    assigned_specialist_name: Option<String>,
374    trigger_session_id: Option<String>,
375    github_id: Option<String>,
376    github_number: Option<i64>,
377    github_url: Option<String>,
378    github_repo: Option<String>,
379    github_state: Option<String>,
380    last_sync_error: Option<String>,
381    dependencies: Option<Vec<String>>,
382    parallel_group: Option<String>,
383    completion_summary: Option<String>,
384    verification_report: Option<String>,
385    sync_to_github: Option<bool>,
386    retry_trigger: Option<bool>,
387    repo_path: Option<String>,
388    codebase_ids: Option<Vec<String>>,
389    worktree_id: Option<serde_json::Value>,
390}
391
392fn create_task_command(body: CreateTaskRequest) -> CreateTaskCommand {
393    CreateTaskCommand {
394        title: body.title,
395        objective: body.objective,
396        workspace_id: body.workspace_id,
397        session_id: body.session_id,
398        scope: body.scope,
399        acceptance_criteria: body.acceptance_criteria,
400        verification_commands: body.verification_commands,
401        test_cases: body.test_cases,
402        dependencies: body.dependencies,
403        parallel_group: body.parallel_group,
404        board_id: body.board_id,
405        column_id: body.column_id,
406        position: body.position,
407        priority: body.priority,
408        labels: body.labels,
409        assignee: body.assignee,
410        assigned_provider: body.assigned_provider,
411        assigned_role: body.assigned_role,
412        assigned_specialist_id: body.assigned_specialist_id,
413        assigned_specialist_name: body.assigned_specialist_name,
414        create_github_issue: body.create_github_issue,
415        repo_path: body.repo_path,
416    }
417}
418
419fn update_task_command(body: UpdateTaskRequest) -> UpdateTaskCommand {
420    UpdateTaskCommand {
421        title: body.title,
422        objective: body.objective,
423        scope: body.scope,
424        acceptance_criteria: body.acceptance_criteria,
425        verification_commands: body.verification_commands,
426        test_cases: body.test_cases,
427        assigned_to: body.assigned_to,
428        status: body.status,
429        board_id: body.board_id,
430        column_id: body.column_id,
431        position: body.position,
432        priority: body.priority,
433        labels: body.labels,
434        assignee: body.assignee,
435        assigned_provider: body.assigned_provider,
436        assigned_role: body.assigned_role,
437        assigned_specialist_id: body.assigned_specialist_id,
438        assigned_specialist_name: body.assigned_specialist_name,
439        trigger_session_id: body.trigger_session_id,
440        github_id: body.github_id,
441        github_number: body.github_number,
442        github_url: body.github_url,
443        github_repo: body.github_repo,
444        github_state: body.github_state,
445        last_sync_error: body.last_sync_error,
446        dependencies: body.dependencies,
447        parallel_group: body.parallel_group,
448        completion_summary: body.completion_summary,
449        verification_report: body.verification_report,
450        sync_to_github: body.sync_to_github,
451        retry_trigger: body.retry_trigger,
452        repo_path: body.repo_path,
453        codebase_ids: body.codebase_ids,
454        worktree_id: body.worktree_id,
455    }
456}
457
458async fn update_task(
459    State(state): State<AppState>,
460    axum::extract::Path(id): axum::extract::Path<String>,
461    Json(body): Json<UpdateTaskRequest>,
462) -> Result<Json<serde_json::Value>, ServerError> {
463    ensure_transition_artifacts(&state, &id, &body).await?;
464    let service = TaskApplicationService::new(state.clone());
465    let plan = service.update_task(&id, update_task_command(body)).await?;
466    let mut task = plan.task;
467
468    if plan.should_sync_github {
469        if let (Some(repo), Some(issue_number)) = (task.github_repo.clone(), task.github_number) {
470            match update_github_issue(
471                &repo,
472                issue_number,
473                &task.title,
474                Some(&build_task_issue_body(
475                    &task.objective,
476                    task.test_cases.as_ref(),
477                )),
478                &task.labels,
479                if task.status == TaskStatus::Completed {
480                    "closed"
481                } else {
482                    "open"
483                },
484                task.assignee.as_deref(),
485            )
486            .await
487            {
488                Ok(()) => {
489                    task.github_state = Some(if task.status == TaskStatus::Completed {
490                        "closed".to_string()
491                    } else {
492                        "open".to_string()
493                    });
494                    task.github_synced_at = Some(Utc::now());
495                    task.last_sync_error = None;
496                }
497                Err(error) => {
498                    task.last_sync_error = Some(error);
499                }
500            }
501        }
502    }
503
504    if plan.should_trigger_agent {
505        let codebase = if plan.repo_path.is_some() {
506            resolve_codebase(&state, &task.workspace_id, plan.repo_path.as_deref()).await?
507        } else if let Some(first_id) = task.codebase_ids.first() {
508            state.codebase_store.get(first_id).await.ok().flatten()
509        } else {
510            resolve_codebase(&state, &task.workspace_id, None).await?
511        };
512
513        // Auto-create worktree when entering dev column (mirrors Next.js behavior)
514        if plan.entering_dev {
515            if let (Some(ref cb), None) = (&codebase, &task.worktree_id) {
516                match auto_create_worktree(&state, &task, cb).await {
517                    Ok(worktree_id) => {
518                        task.worktree_id = Some(worktree_id);
519                    }
520                    Err(err) => {
521                        set_task_column(&mut task, "blocked");
522                        task.last_sync_error = Some(format!("Worktree creation failed: {}", err));
523                        state.task_store.save(&task).await?;
524                        emit_kanban_workspace_event(
525                            &state,
526                            &task.workspace_id,
527                            "task",
528                            "updated",
529                            Some(&task.id),
530                            "system",
531                        )
532                        .await;
533                        return Ok(Json(serde_json::json!({ "task": task })));
534                    }
535                }
536            }
537        }
538
539        let trigger_result = trigger_assigned_task_agent(
540            &state,
541            &mut task,
542            codebase.as_ref().map(|item| item.repo_path.as_str()),
543            codebase.as_ref().and_then(|item| item.branch.as_deref()),
544        )
545        .await;
546
547        match trigger_result {
548            Ok(()) => {
549                task.last_sync_error = None;
550            }
551            Err(error) => {
552                task.last_sync_error = Some(error);
553            }
554        }
555    }
556
557    state.task_store.save(&task).await?;
558    emit_kanban_workspace_event(
559        &state,
560        &task.workspace_id,
561        "task",
562        "updated",
563        Some(&task.id),
564        "user",
565    )
566    .await;
567    Ok(Json(serde_json::json!({ "task": task })))
568}
569
570async fn ensure_transition_artifacts(
571    state: &AppState,
572    task_id: &str,
573    body: &UpdateTaskRequest,
574) -> Result<(), ServerError> {
575    let Some(target_column_id) = body.column_id.as_deref() else {
576        return Ok(());
577    };
578    let existing = state
579        .task_store
580        .get(task_id)
581        .await?
582        .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", task_id)))?;
583    if existing.column_id.as_deref() == Some(target_column_id) {
584        return Ok(());
585    }
586
587    let Some(board_id) = body.board_id.as_deref().or(existing.board_id.as_deref()) else {
588        return Ok(());
589    };
590    let Some(board) = state.kanban_store.get(board_id).await? else {
591        return Ok(());
592    };
593    let Some(target_column) = board
594        .columns
595        .iter()
596        .find(|column| column.id == target_column_id)
597    else {
598        return Ok(());
599    };
600    let Some(required_artifacts) = target_column
601        .automation
602        .as_ref()
603        .and_then(|automation| automation.required_artifacts.as_ref())
604    else {
605        return Ok(());
606    };
607
608    let mut missing_artifacts = Vec::new();
609    for artifact_name in required_artifacts {
610        let artifact_type = ArtifactType::from_str(artifact_name).ok_or_else(|| {
611            ServerError::BadRequest(format!(
612                "Invalid required artifact type configured on column {}: {}",
613                target_column.id, artifact_name
614            ))
615        })?;
616        let artifacts = state
617            .artifact_store
618            .list_by_task_and_type(task_id, &artifact_type)
619            .await?;
620        if artifacts.is_empty() {
621            missing_artifacts.push(artifact_name.clone());
622        }
623    }
624
625    if missing_artifacts.is_empty() {
626        return Ok(());
627    }
628
629    Err(ServerError::BadRequest(format!(
630        "Cannot move task to \"{}\": missing required artifacts: {}. Please provide these artifacts before moving the task.",
631        target_column.name,
632        missing_artifacts.join(", ")
633    )))
634}
635
636async fn delete_task(
637    State(state): State<AppState>,
638    axum::extract::Path(id): axum::extract::Path<String>,
639) -> Result<Json<serde_json::Value>, ServerError> {
640    let task = state
641        .task_store
642        .get(&id)
643        .await?
644        .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
645    state.task_store.delete(&id).await?;
646    emit_kanban_workspace_event(
647        &state,
648        &task.workspace_id,
649        "task",
650        "deleted",
651        Some(&id),
652        "user",
653    )
654    .await;
655    Ok(Json(serde_json::json!({ "deleted": true })))
656}
657
658#[derive(Debug, Deserialize)]
659struct UpdateStatusRequest {
660    status: String,
661}
662
663async fn update_task_status(
664    State(state): State<AppState>,
665    axum::extract::Path(id): axum::extract::Path<String>,
666    Json(body): Json<UpdateStatusRequest>,
667) -> Result<Json<serde_json::Value>, ServerError> {
668    let status = TaskStatus::from_str(&body.status)
669        .ok_or_else(|| ServerError::BadRequest(format!("Invalid status: {}", body.status)))?;
670    let task = state
671        .task_store
672        .get(&id)
673        .await?
674        .ok_or_else(|| ServerError::NotFound(format!("Task {} not found", id)))?;
675    state.task_store.update_status(&id, &status).await?;
676    emit_kanban_workspace_event(
677        &state,
678        &task.workspace_id,
679        "task",
680        "updated",
681        Some(&id),
682        "user",
683    )
684    .await;
685    Ok(Json(serde_json::json!({ "updated": true })))
686}
687
688async fn find_ready_tasks(
689    State(state): State<AppState>,
690    Query(query): Query<ListTasksQuery>,
691) -> Result<Json<serde_json::Value>, ServerError> {
692    let workspace_id = query.workspace_id.as_deref().unwrap_or("default");
693    let tasks = state.task_store.find_ready_tasks(workspace_id).await?;
694    Ok(Json(serde_json::json!({ "tasks": tasks })))
695}
696
697/// DELETE /api/tasks — Bulk delete all tasks for a workspace
698async fn delete_all_tasks(
699    State(state): State<AppState>,
700    Query(query): Query<ListTasksQuery>,
701) -> Result<Json<serde_json::Value>, ServerError> {
702    let workspace_id = query.workspace_id.as_deref().unwrap_or("default");
703    let tasks = state.task_store.list_by_workspace(workspace_id).await?;
704    let count = tasks.len();
705    for task in &tasks {
706        state.task_store.delete(&task.id).await?;
707    }
708    if count > 0 {
709        emit_kanban_workspace_event(&state, workspace_id, "task", "deleted", None, "user").await;
710    }
711    Ok(Json(serde_json::json!({ "deleted": count })))
712}