forge_core_server/routes/
tasks.rs

1use std::{path::PathBuf, sync::Arc};
2
3use anyhow;
4use axum::{
5    Extension, Json, Router,
6    extract::{
7        Query, State,
8        ws::{WebSocket, WebSocketUpgrade},
9    },
10    http::StatusCode,
11    middleware::from_fn_with_state,
12    response::{IntoResponse, Json as ResponseJson},
13    routing::{get, post},
14};
15use forge_core_db::models::{
16    image::TaskImage,
17    project::Project,
18    task::{CreateTask, Task, TaskStatus, TaskWithAttemptStatus, UpdateTask},
19    task_attempt::{CreateTaskAttempt, TaskAttempt},
20};
21use forge_core_deployment::Deployment;
22use forge_core_executors::profile::ExecutorProfileId;
23use forge_core_services::services::container::{
24    ContainerService, WorktreeCleanupData, cleanup_worktrees_direct,
25};
26use forge_core_utils::response::ApiResponse;
27use futures_util::{SinkExt, StreamExt, TryStreamExt};
28use serde::{Deserialize, Serialize};
29use sqlx::Error as SqlxError;
30use ts_rs_forge::TS;
31use uuid::Uuid;
32
33use crate::{DeploymentImpl, error::ApiError, middleware::load_task_middleware};
34
35#[derive(Debug, Serialize, Deserialize)]
36pub struct TaskQuery {
37    pub project_id: Uuid,
38}
39
40/// Get kanban tasks (excludes agent tasks)
41/// Agent tasks are in their own endpoint: /projects/{id}/agents/tasks
42pub async fn get_tasks(
43    State(deployment): State<DeploymentImpl>,
44    Query(query): Query<TaskQuery>,
45) -> Result<ResponseJson<ApiResponse<Vec<TaskWithAttemptStatus>>>, ApiError> {
46    // Kanban endpoint always excludes agent tasks
47    // Agent tasks have their own dedicated endpoint
48    let tasks = get_kanban_tasks(&deployment.db().pool, query.project_id).await?;
49    Ok(ResponseJson(ApiResponse::success(tasks)))
50}
51
52/// Get kanban tasks (excludes agent tasks in forge_agents table)
53async fn get_kanban_tasks(
54    pool: &sqlx::SqlitePool,
55    project_id: Uuid,
56) -> Result<Vec<TaskWithAttemptStatus>, sqlx::Error> {
57    let query_str = r#"SELECT
58  t.id                            AS "id",
59  t.project_id                    AS "project_id",
60  t.title,
61  t.description,
62  t.status                        AS "status",
63  t.parent_task_attempt           AS "parent_task_attempt",
64  t.dev_server_id                 AS "dev_server_id",
65  t.created_at                    AS "created_at",
66  t.updated_at                    AS "updated_at",
67
68  CASE WHEN EXISTS (
69    SELECT 1
70      FROM task_attempts ta
71      JOIN execution_processes ep
72        ON ep.task_attempt_id = ta.id
73     WHERE ta.task_id       = t.id
74       AND ep.status        = 'running'
75       AND ep.run_reason IN ('setupscript','cleanupscript','codingagent')
76     LIMIT 1
77  ) THEN 1 ELSE 0 END            AS has_in_progress_attempt,
78
79  CASE WHEN (
80    SELECT ep.status
81      FROM task_attempts ta
82      JOIN execution_processes ep
83        ON ep.task_attempt_id = ta.id
84     WHERE ta.task_id       = t.id
85     AND ep.run_reason IN ('setupscript','cleanupscript','codingagent')
86     ORDER BY ep.created_at DESC
87     LIMIT 1
88  ) IN ('failed','killed') THEN 1 ELSE 0 END
89                                 AS last_attempt_failed,
90
91  ( SELECT ta.executor
92      FROM task_attempts ta
93      WHERE ta.task_id = t.id
94     ORDER BY ta.created_at DESC
95      LIMIT 1
96    )                               AS executor,
97
98  ( SELECT COUNT(*)
99      FROM task_attempts ta
100      WHERE ta.task_id = t.id
101    )                               AS attempt_count
102
103FROM tasks t
104WHERE t.project_id = ?
105  AND t.id NOT IN (SELECT task_id FROM forge_agents)
106ORDER BY t.created_at DESC"#;
107
108    let rows = sqlx::query(query_str)
109        .bind(project_id)
110        .fetch_all(pool)
111        .await?;
112
113    let mut items: Vec<TaskWithAttemptStatus> = Vec::with_capacity(rows.len());
114    for row in rows {
115        use sqlx::Row;
116
117        // Build Task directly from row (eliminates N+1 query)
118        let status_str: String = row.try_get("status")?;
119        let task = Task {
120            id: row.try_get("id")?,
121            project_id: row.try_get("project_id")?,
122            title: row.try_get("title")?,
123            description: row.try_get("description")?,
124            status: status_str.parse().unwrap_or(TaskStatus::Todo),
125            parent_task_attempt: row.try_get("parent_task_attempt").ok().flatten(),
126            dev_server_id: row.try_get("dev_server_id").ok().flatten(),
127            created_at: row.try_get("created_at")?,
128            updated_at: row.try_get("updated_at")?,
129        };
130
131        let has_in_progress_attempt = row
132            .try_get::<i64, _>("has_in_progress_attempt")
133            .map(|v| v != 0)
134            .unwrap_or(false);
135        let last_attempt_failed = row
136            .try_get::<i64, _>("last_attempt_failed")
137            .map(|v| v != 0)
138            .unwrap_or(false);
139        let executor: String = row.try_get("executor").unwrap_or_else(|_| String::new());
140        let attempt_count: i64 = row.try_get::<i64, _>("attempt_count").unwrap_or(0);
141
142        items.push(TaskWithAttemptStatus {
143            task,
144            has_in_progress_attempt,
145            has_merged_attempt: false,
146            last_attempt_failed,
147            executor,
148            attempt_count,
149        });
150    }
151
152    Ok(items)
153}
154
155/// WebSocket for kanban tasks (excludes agent tasks)
156/// Agent tasks have their own dedicated WebSocket endpoint
157pub async fn stream_tasks_ws(
158    ws: WebSocketUpgrade,
159    State(deployment): State<DeploymentImpl>,
160    Query(query): Query<TaskQuery>,
161) -> Result<impl IntoResponse, ApiError> {
162    // Verify project exists (authorization check)
163    let _project = Project::find_by_id(&deployment.db().pool, query.project_id)
164        .await?
165        .ok_or(ApiError::Database(SqlxError::RowNotFound))?;
166
167    Ok(ws.on_upgrade(move |socket| async move {
168        // Kanban WebSocket always filters out agent tasks
169        let result = handle_kanban_tasks_ws(socket, deployment, query.project_id).await;
170        if let Err(e) = result {
171            tracing::warn!("kanban tasks WS closed: {}", e);
172        }
173    }))
174}
175
176/// Handle kanban WebSocket (excludes agent tasks)
177/// Uses a cache with periodic refresh to minimize DB queries
178async fn handle_kanban_tasks_ws(
179    socket: WebSocket,
180    deployment: DeploymentImpl,
181    project_id: Uuid,
182) -> anyhow::Result<()> {
183    use std::{collections::HashSet, sync::Arc, time::Duration};
184
185    use forge_core_utils::log_msg::LogMsg;
186    use serde_json::json;
187    use tokio::sync::RwLock;
188
189    let pool = deployment.db().pool.clone();
190
191    // Batch query for all agent task IDs at initialization
192    // CRITICAL: Fail early if DB query fails - empty cache would leak agent tasks to kanban
193    let agent_task_ids: Arc<RwLock<HashSet<Uuid>>> = {
194        let agent_tasks: Vec<Uuid> = sqlx::query_scalar(
195            "SELECT task_id FROM forge_agents fa
196             INNER JOIN tasks t ON fa.task_id = t.id
197             WHERE t.project_id = ?",
198        )
199        .bind(project_id)
200        .fetch_all(&pool)
201        .await
202        .map_err(|e| {
203            tracing::error!(
204                "Critical: Failed to init agent task cache for project {}: {}",
205                project_id,
206                e
207            );
208            anyhow::anyhow!("Failed to initialize task filter: {}", e)
209        })?;
210
211        Arc::new(RwLock::new(agent_tasks.into_iter().collect()))
212    };
213
214    // Spawn background task to refresh agent task IDs periodically
215    let refresh_cache = agent_task_ids.clone();
216    let refresh_pool = pool.clone();
217    let refresh_project_id = project_id;
218    let refresh_task_handle = tokio::spawn(async move {
219        let mut interval = tokio::time::interval(Duration::from_secs(30));
220        loop {
221            interval.tick().await;
222
223            match sqlx::query_scalar::<_, Uuid>(
224                "SELECT task_id FROM forge_agents fa
225                 INNER JOIN tasks t ON fa.task_id = t.id
226                 WHERE t.project_id = ?",
227            )
228            .bind(refresh_project_id)
229            .fetch_all(&refresh_pool)
230            .await
231            {
232                Ok(tasks) => {
233                    let mut cache = refresh_cache.write().await;
234                    cache.clear();
235                    cache.extend(tasks);
236                    tracing::trace!(
237                        "Refreshed agent task cache for project {}: {} tasks",
238                        refresh_project_id,
239                        cache.len()
240                    );
241                }
242                Err(e) => {
243                    tracing::warn!(
244                        "Failed to refresh agent task cache for project {}: {}",
245                        refresh_project_id,
246                        e
247                    );
248                }
249            }
250        }
251    });
252
253    // Get the raw stream and filter out agent tasks
254    let stream = deployment
255        .events()
256        .stream_tasks_raw(project_id)
257        .await?
258        .filter_map(move |msg_result| {
259            let agent_task_ids = agent_task_ids.clone();
260            let pool = pool.clone();
261            async move {
262                match msg_result {
263                    Ok(LogMsg::JsonPatch(patch)) => {
264                        if let Some(patch_op) = patch.0.first() {
265                            // Handle direct task patches
266                            if patch_op.path().starts_with("/tasks/") {
267                                match patch_op {
268                                    json_patch::PatchOperation::Add(op) => {
269                                        if let Ok(task_with_status) =
270                                            serde_json::from_value::<TaskWithAttemptStatus>(
271                                                op.value.clone(),
272                                            )
273                                        {
274                                            let task_id = task_with_status.task.id;
275                                            // Filter by forge_agents cache OR by task status
276                                            // The status check is a backup for race conditions
277                                            if is_agent_task(&agent_task_ids, &pool, task_id).await
278                                                || task_with_status.task.status == TaskStatus::Agent
279                                            {
280                                                return None;
281                                            }
282                                            return Some(Ok(LogMsg::JsonPatch(patch)));
283                                        }
284                                    }
285                                    json_patch::PatchOperation::Replace(op) => {
286                                        if let Ok(task_with_status) =
287                                            serde_json::from_value::<TaskWithAttemptStatus>(
288                                                op.value.clone(),
289                                            )
290                                        {
291                                            let task_id = task_with_status.task.id;
292                                            // Filter by forge_agents cache OR by task status
293                                            // The status check is a backup for race conditions
294                                            if is_agent_task(&agent_task_ids, &pool, task_id).await
295                                                || task_with_status.task.status == TaskStatus::Agent
296                                            {
297                                                return None;
298                                            }
299                                            return Some(Ok(LogMsg::JsonPatch(patch)));
300                                        }
301                                    }
302                                    json_patch::PatchOperation::Remove(_) => {
303                                        return Some(Ok(LogMsg::JsonPatch(patch)));
304                                    }
305                                    _ => {}
306                                }
307                            }
308                            // Handle initial snapshot
309                            else if patch_op.path() == "/tasks"
310                                && let json_patch::PatchOperation::Replace(op) = patch_op
311                                && let Some(tasks_obj) = op.value.as_object()
312                            {
313                                let mut filtered_tasks = serde_json::Map::new();
314                                for (task_id_str, task_value) in tasks_obj {
315                                    if let Ok(task_with_status) =
316                                        serde_json::from_value::<TaskWithAttemptStatus>(
317                                            task_value.clone(),
318                                        )
319                                    {
320                                        let task_id = task_with_status.task.id;
321                                        // Filter by forge_agents cache OR by task status
322                                        // The status check is a backup for race conditions
323                                        let is_agent =
324                                            is_agent_task(&agent_task_ids, &pool, task_id).await
325                                                || task_with_status.task.status
326                                                    == TaskStatus::Agent;
327                                        if !is_agent {
328                                            filtered_tasks.insert(
329                                                task_id_str.to_string(),
330                                                task_value.clone(),
331                                            );
332                                        }
333                                    }
334                                }
335
336                                let filtered_patch = json!([{
337                                    "op": "replace",
338                                    "path": "/tasks",
339                                    "value": filtered_tasks
340                                }]);
341                                return match serde_json::from_value(filtered_patch) {
342                                    Ok(patch) => Some(Ok(LogMsg::JsonPatch(patch))),
343                                    Err(e) => {
344                                        tracing::error!(
345                                            "Failed to deserialize filtered patch: {}",
346                                            e
347                                        );
348                                        Some(Err(std::io::Error::new(
349                                            std::io::ErrorKind::InvalidData,
350                                            "Patch deserialization failed",
351                                        )))
352                                    }
353                                };
354                            }
355                        }
356                        Some(Ok(LogMsg::JsonPatch(patch)))
357                    }
358                    Ok(other) => Some(Ok(other)),
359                    Err(e) => Some(Err(e)),
360                }
361            }
362        })
363        .map_ok(|msg| msg.to_ws_message_unchecked());
364
365    futures_util::pin_mut!(stream);
366
367    let (mut sender, mut receiver) = socket.split();
368
369    tokio::spawn(async move { while let Some(Ok(_)) = receiver.next().await {} });
370
371    while let Some(item) = stream.next().await {
372        match item {
373            Ok(msg) => {
374                if sender.send(msg).await.is_err() {
375                    break;
376                }
377            }
378            Err(e) => {
379                tracing::error!("stream error: {}", e);
380                break;
381            }
382        }
383    }
384
385    refresh_task_handle.abort();
386
387    Ok(())
388}
389
390/// Check if a task is an agent task using cache with DB fallback
391async fn is_agent_task(
392    agent_task_ids: &Arc<tokio::sync::RwLock<std::collections::HashSet<Uuid>>>,
393    pool: &sqlx::SqlitePool,
394    task_id: Uuid,
395) -> bool {
396    // Check cache first
397    {
398        let cache = agent_task_ids.read().await;
399        if cache.contains(&task_id) {
400            return true;
401        }
402    }
403
404    // Fallback to DB query for tasks not in cache
405    let is_agent_db: bool =
406        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM forge_agents WHERE task_id = ?)")
407            .bind(task_id)
408            .fetch_one(pool)
409            .await
410            .map_err(|e| {
411                tracing::warn!("Agent task fallback query failed for {}: {}", task_id, e);
412                e
413            })
414            .unwrap_or(false);
415
416    // If it's an agent, update cache
417    if is_agent_db {
418        let mut cache = agent_task_ids.write().await;
419        cache.insert(task_id);
420    }
421
422    is_agent_db
423}
424
425pub async fn get_task(
426    Extension(task): Extension<Task>,
427    State(_deployment): State<DeploymentImpl>,
428) -> Result<ResponseJson<ApiResponse<Task>>, ApiError> {
429    Ok(ResponseJson(ApiResponse::success(task)))
430}
431
432pub async fn create_task(
433    State(deployment): State<DeploymentImpl>,
434    Json(payload): Json<CreateTask>,
435) -> Result<ResponseJson<ApiResponse<Task>>, ApiError> {
436    let id = Uuid::new_v4();
437
438    tracing::debug!(
439        "Creating task '{}' in project {}",
440        payload.title,
441        payload.project_id
442    );
443
444    let task = Task::create(&deployment.db().pool, &payload, id).await?;
445
446    if let Some(image_ids) = &payload.image_ids {
447        TaskImage::associate_many_dedup(&deployment.db().pool, task.id, image_ids).await?;
448    }
449
450    deployment
451        .track_if_analytics_allowed(
452            "task_created",
453            serde_json::json!({
454            "task_id": task.id.to_string(),
455            "project_id": payload.project_id,
456            "has_description": task.description.is_some(),
457            "has_images": payload.image_ids.is_some(),
458            }),
459        )
460        .await;
461
462    Ok(ResponseJson(ApiResponse::success(task)))
463}
464
465#[derive(Debug, Deserialize, TS)]
466pub struct CreateAndStartTaskRequest {
467    pub task: CreateTask,
468    pub executor_profile_id: ExecutorProfileId,
469    pub base_branch: String,
470    /// Whether to use a git worktree for isolation (default: true)
471    pub use_worktree: Option<bool>,
472}
473
474pub async fn create_task_and_start(
475    State(deployment): State<DeploymentImpl>,
476    Json(payload): Json<CreateAndStartTaskRequest>,
477) -> Result<ResponseJson<ApiResponse<TaskWithAttemptStatus>>, ApiError> {
478    let task_id = Uuid::new_v4();
479    let use_worktree = payload.use_worktree.unwrap_or(true);
480
481    // Set initial status based on use_worktree to avoid race condition with WebSocket broadcasts.
482    // Agent tasks (use_worktree: false) must be created with status 'agent' from the start,
483    // so the first WebSocket broadcast already has the correct status for filtering.
484    let initial_status = if use_worktree {
485        TaskStatus::Todo
486    } else {
487        TaskStatus::Agent
488    };
489    let task = Task::create_with_status(
490        &deployment.db().pool,
491        &payload.task,
492        task_id,
493        initial_status,
494    )
495    .await?;
496
497    if let Some(image_ids) = &payload.task.image_ids {
498        TaskImage::associate_many(&deployment.db().pool, task.id, image_ids).await?;
499    }
500
501    // If non-worktree task (e.g., agent chat), register in forge_agents to hide from kanban
502    if !use_worktree {
503        sqlx::query(
504            r#"INSERT INTO forge_agents (id, project_id, agent_type, task_id, created_at, updated_at)
505               VALUES (?, ?, 'genie_chat', ?, datetime('now'), datetime('now'))"#,
506        )
507        .bind(Uuid::new_v4().to_string())
508        .bind(task.project_id.to_string())
509        .bind(task.id.to_string())
510        .execute(&deployment.db().pool)
511        .await?;
512        // Note: Status is already set to 'agent' at task creation time above
513    }
514
515    deployment
516        .track_if_analytics_allowed(
517            "task_created",
518            serde_json::json!({
519                "task_id": task.id.to_string(),
520                "project_id": task.project_id,
521                "has_description": task.description.is_some(),
522                "has_images": payload.task.image_ids.is_some(),
523            }),
524        )
525        .await;
526
527    // Load and cache workspace-specific .genie profiles (per-workspace, thread-safe)
528    // Note: Profiles are cached in ProfileCacheManager per-workspace, NOT in global static cache
529    // This avoids race conditions when multiple projects are accessed concurrently
530    let project = task
531        .parent_project(&deployment.db().pool)
532        .await?
533        .ok_or(SqlxError::RowNotFound)?;
534    if let Ok(_cache) = deployment
535        .profile_cache()
536        .get_or_create(project.git_repo_path.clone())
537        .await
538    {
539        tracing::debug!(
540            "Cached .genie profiles for workspace: {}",
541            project.git_repo_path.display()
542        );
543        deployment
544            .profile_cache()
545            .register_project(project.id, project.git_repo_path.clone())
546            .await;
547    }
548
549    let attempt_id = Uuid::new_v4();
550    let git_branch_name = deployment
551        .container()
552        .git_branch_from_task_attempt(&attempt_id, &task.title)
553        .await;
554
555    let mut task_attempt = TaskAttempt::create(
556        &deployment.db().pool,
557        &CreateTaskAttempt {
558            executor: payload.executor_profile_id.executor,
559            base_branch: payload.base_branch,
560            branch: git_branch_name,
561        },
562        attempt_id,
563        task.id,
564    )
565    .await?;
566
567    // Store executor with variant for filtering (executor:variant format)
568    if let Some(variant) = &payload.executor_profile_id.variant {
569        let executor_with_variant = format!("{}:{}", payload.executor_profile_id.executor, variant);
570        sqlx::query(
571            "UPDATE task_attempts SET executor = ?, updated_at = datetime('now') WHERE id = ?",
572        )
573        .bind(&executor_with_variant)
574        .bind(attempt_id.to_string())
575        .execute(&deployment.db().pool)
576        .await?;
577        task_attempt.executor = executor_with_variant;
578    }
579
580    // Insert worktree config if explicitly specified (defaults to true when not present)
581    if let Some(use_worktree) = payload.use_worktree {
582        sqlx::query(
583            "INSERT INTO forge_task_attempt_config (task_attempt_id, use_worktree) VALUES (?, ?)",
584        )
585        .bind(attempt_id.to_string())
586        .bind(use_worktree)
587        .execute(&deployment.db().pool)
588        .await?;
589    }
590
591    let is_attempt_running = deployment
592        .container()
593        .start_attempt(&task_attempt, payload.executor_profile_id.clone())
594        .await
595        .inspect_err(|err| tracing::error!("Failed to start task attempt: {}", err))
596        .is_ok();
597    deployment
598        .track_if_analytics_allowed(
599            "task_attempt_started",
600            serde_json::json!({
601                "task_id": task.id.to_string(),
602                "executor": &payload.executor_profile_id.executor,
603                "variant": &payload.executor_profile_id.variant,
604                "attempt_id": task_attempt.id.to_string(),
605            }),
606        )
607        .await;
608
609    let task = Task::find_by_id(&deployment.db().pool, task.id)
610        .await?
611        .ok_or(ApiError::Database(SqlxError::RowNotFound))?;
612
613    tracing::info!("Started attempt for task {}", task.id);
614    Ok(ResponseJson(ApiResponse::success(TaskWithAttemptStatus {
615        task,
616        has_in_progress_attempt: is_attempt_running,
617        has_merged_attempt: false,
618        last_attempt_failed: false,
619        executor: task_attempt.executor,
620        attempt_count: 1, // First attempt for a newly created task
621    })))
622}
623
624pub async fn update_task(
625    Extension(existing_task): Extension<Task>,
626    State(deployment): State<DeploymentImpl>,
627    Json(payload): Json<UpdateTask>,
628) -> Result<ResponseJson<ApiResponse<Task>>, ApiError> {
629    // Use existing values if not provided in update
630    let title = payload.title.unwrap_or(existing_task.title);
631    let description = match payload.description {
632        Some(s) if s.trim().is_empty() => None, // Empty string = clear description
633        Some(s) => Some(s),                     // Non-empty string = update description
634        None => existing_task.description,      // Field omitted = keep existing
635    };
636    let status = payload.status.unwrap_or(existing_task.status);
637    let parent_task_attempt = payload
638        .parent_task_attempt
639        .or(existing_task.parent_task_attempt);
640
641    let task = Task::update(
642        &deployment.db().pool,
643        existing_task.id,
644        existing_task.project_id,
645        title,
646        description,
647        status,
648        parent_task_attempt,
649    )
650    .await?;
651
652    if let Some(image_ids) = &payload.image_ids {
653        TaskImage::delete_by_task_id(&deployment.db().pool, task.id).await?;
654        TaskImage::associate_many_dedup(&deployment.db().pool, task.id, image_ids).await?;
655    }
656
657    // Handle archive status transition
658    if status == TaskStatus::Archived && existing_task.status != TaskStatus::Archived {
659        // Task is being archived for the first time - spawn background cleanup
660        handle_task_archive(&deployment, existing_task.id);
661    }
662
663    Ok(ResponseJson(ApiResponse::success(task)))
664}
665
666pub async fn delete_task(
667    Extension(task): Extension<Task>,
668    State(deployment): State<DeploymentImpl>,
669) -> Result<(StatusCode, ResponseJson<ApiResponse<()>>), ApiError> {
670    // Validate no running execution processes
671    if deployment
672        .container()
673        .has_running_processes(task.id)
674        .await?
675    {
676        return Err(ApiError::Conflict("Task has running execution processes. Please wait for them to complete or stop them first.".to_string()));
677    }
678
679    // Gather task attempts data needed for background cleanup
680    let attempts = TaskAttempt::fetch_all(&deployment.db().pool, Some(task.id))
681        .await
682        .map_err(|e| {
683            tracing::error!("Failed to fetch task attempts for task {}: {}", task.id, e);
684            ApiError::TaskAttempt(e)
685        })?;
686
687    // Gather cleanup data before deletion
688    let project = task
689        .parent_project(&deployment.db().pool)
690        .await?
691        .ok_or_else(|| ApiError::Database(SqlxError::RowNotFound))?;
692
693    let cleanup_data: Vec<WorktreeCleanupData> = attempts
694        .iter()
695        .filter_map(|attempt| {
696            attempt
697                .container_ref
698                .as_ref()
699                .map(|worktree_path| WorktreeCleanupData {
700                    attempt_id: attempt.id,
701                    worktree_path: PathBuf::from(worktree_path),
702                    git_repo_path: Some(project.git_repo_path.clone()),
703                })
704        })
705        .collect();
706
707    // Use a transaction to ensure atomicity: either all operations succeed or all are rolled back
708    let mut tx = deployment.db().pool.begin().await?;
709
710    // Nullify parent_task_attempt for all child tasks before deletion
711    // This breaks parent-child relationships to avoid foreign key constraint violations
712    let mut total_children_affected = 0u64;
713    for attempt in &attempts {
714        let children_affected = Task::nullify_children_by_attempt_id(&mut *tx, attempt.id).await?;
715        total_children_affected += children_affected;
716    }
717
718    // Delete task from database (FK CASCADE will handle task_attempts)
719    let rows_affected = Task::delete(&mut *tx, task.id).await?;
720
721    if rows_affected == 0 {
722        return Err(ApiError::Database(SqlxError::RowNotFound));
723    }
724
725    // Commit the transaction - if this fails, all changes are rolled back
726    tx.commit().await?;
727
728    if total_children_affected > 0 {
729        tracing::info!(
730            "Nullified {} child task references before deleting task {}",
731            total_children_affected,
732            task.id
733        );
734    }
735
736    deployment
737        .track_if_analytics_allowed(
738            "task_deleted",
739            serde_json::json!({
740                "task_id": task.id.to_string(),
741                "project_id": task.project_id.to_string(),
742                "attempt_count": attempts.len(),
743            }),
744        )
745        .await;
746
747    // Spawn background worktree cleanup task
748    let task_id = task.id;
749    tokio::spawn(async move {
750        let span = tracing::info_span!("background_worktree_cleanup", task_id = %task_id);
751        let _enter = span.enter();
752
753        tracing::info!(
754            "Starting background cleanup for task {} ({} worktrees)",
755            task_id,
756            cleanup_data.len()
757        );
758
759        if let Err(e) = cleanup_worktrees_direct(&cleanup_data).await {
760            tracing::error!(
761                "Background worktree cleanup failed for task {}: {}",
762                task_id,
763                e
764            );
765        } else {
766            tracing::info!("Background cleanup completed for task {}", task_id);
767        }
768    });
769
770    // Return 202 Accepted to indicate deletion was scheduled
771    Ok((StatusCode::ACCEPTED, ResponseJson(ApiResponse::success(()))))
772}
773
774/// Handle worktree cleanup when task is archived
775fn handle_task_archive(deployment: &DeploymentImpl, task_id: Uuid) {
776    let deployment = deployment.clone();
777    tokio::spawn(async move {
778        let span = tracing::info_span!("archive_task_worktree_cleanup", task_id = %task_id);
779        let _enter = span.enter();
780
781        // Fetch task
782        let task = match Task::find_by_id(&deployment.db().pool, task_id).await {
783            Ok(Some(t)) => t,
784            _ => {
785                tracing::error!("Failed to find task {} for archive cleanup", task_id);
786                return;
787            }
788        };
789
790        // Fetch all attempts
791        let attempts = match TaskAttempt::fetch_all(&deployment.db().pool, Some(task_id)).await {
792            Ok(a) => a,
793            Err(e) => {
794                tracing::error!("Failed to fetch attempts for task {}: {}", task_id, e);
795                return;
796            }
797        };
798
799        // Fetch project for git repo path
800        let project = match task.parent_project(&deployment.db().pool).await {
801            Ok(Some(p)) => p,
802            _ => {
803                tracing::error!("Failed to find project for task {}", task_id);
804                return;
805            }
806        };
807
808        // Build cleanup data from attempts
809        let cleanup_data: Vec<WorktreeCleanupData> = attempts
810            .iter()
811            .filter_map(|attempt| {
812                attempt
813                    .container_ref
814                    .as_ref()
815                    .map(|worktree_path| WorktreeCleanupData {
816                        attempt_id: attempt.id,
817                        worktree_path: PathBuf::from(worktree_path),
818                        git_repo_path: Some(project.git_repo_path.clone()),
819                    })
820            })
821            .collect();
822
823        if cleanup_data.is_empty() {
824            tracing::debug!("No worktrees to cleanup for archived task {}", task_id);
825            return;
826        }
827
828        tracing::info!(
829            "Starting worktree cleanup for archived task {} ({} worktrees)",
830            task_id,
831            cleanup_data.len()
832        );
833
834        // Perform cleanup
835        match cleanup_worktrees_direct(&cleanup_data).await {
836            Ok(_) => {
837                // Mark worktrees as deleted in database
838                for attempt in &attempts {
839                    if let Err(e) = sqlx::query(
840                        "UPDATE task_attempts SET worktree_deleted = TRUE, updated_at = datetime('now') WHERE id = ?"
841                    )
842                    .bind(attempt.id)
843                    .execute(&deployment.db().pool)
844                    .await
845                    {
846                        tracing::error!("Failed to mark worktree_deleted for attempt {}: {}", attempt.id, e);
847                    }
848                }
849                tracing::info!("Completed worktree cleanup for archived task {}", task_id);
850            }
851            Err(e) => {
852                tracing::error!(
853                    "Failed to cleanup worktrees for archived task {}: {}",
854                    task_id,
855                    e
856                );
857            }
858        }
859    });
860}
861
862pub fn router(deployment: &DeploymentImpl) -> Router<DeploymentImpl> {
863    let task_id_router = Router::new()
864        .route("/", get(get_task).put(update_task).delete(delete_task))
865        .layer(from_fn_with_state(deployment.clone(), load_task_middleware));
866
867    let inner = Router::new()
868        .route("/", get(get_tasks).post(create_task))
869        .route("/stream/ws", get(stream_tasks_ws))
870        .route("/create-and-start", post(create_task_and_start))
871        .nest("/{task_id}", task_id_router);
872
873    // mount under /projects/:project_id/tasks
874    Router::new().nest("/tasks", inner)
875}