use std::{path::PathBuf, sync::Arc};

use anyhow;
use axum::{
    Extension, Json, Router,
    extract::{
        Query, State,
        ws::{WebSocket, WebSocketUpgrade},
    },
    http::StatusCode,
    middleware::from_fn_with_state,
    response::{IntoResponse, Json as ResponseJson},
    routing::{get, post},
};
use forge_core_db::models::{
    image::TaskImage,
    project::Project,
    task::{CreateTask, Task, TaskStatus, TaskWithAttemptStatus, UpdateTask},
    task_attempt::{CreateTaskAttempt, TaskAttempt},
};
use forge_core_deployment::Deployment;
use forge_core_executors::profile::ExecutorProfileId;
use forge_core_services::services::container::{
    ContainerService, WorktreeCleanupData, cleanup_worktrees_direct,
};
use forge_core_utils::response::ApiResponse;
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use sqlx::Error as SqlxError;
use tracing::Instrument;
use ts_rs_forge::TS;
use uuid::Uuid;

use crate::{DeploymentImpl, error::ApiError, middleware::load_task_middleware};

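/// Query parameters identifying the project whose tasks are requested.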
#[derive(Debug, Serialize, Deserialize)]
pub struct TaskQuery {
    pub project_id: Uuid,
}

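/// `GET /tasks` — list a project's kanban tasks with attempt-status summaries.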
pub async fn get_tasks(
    State(deployment): State<DeploymentImpl>,
    Query(query): Query<TaskQuery>,
) -> Result<ResponseJson<ApiResponse<Vec<TaskWithAttemptStatus>>>, ApiError> {
    let tasks = get_kanban_tasks(&deployment.db().pool, query.project_id).await?;
    Ok(ResponseJson(ApiResponse::success(tasks)))
}

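/// Loads all non-agent tasks for a project, annotating each with attempt
/// status derived from `task_attempts` and `execution_processes`. Note that
/// `has_merged_attempt` is not computed here and is always `false`.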
async fn get_kanban_tasks(
    pool: &sqlx::SqlitePool,
    project_id: Uuid,
) -> Result<Vec<TaskWithAttemptStatus>, sqlx::Error> {
    let query_str = r#"SELECT
    t.id AS "id",
    t.project_id AS "project_id",
    t.title,
    t.description,
    t.status AS "status",
    t.parent_task_attempt AS "parent_task_attempt",
    t.dev_server_id AS "dev_server_id",
    t.created_at AS "created_at",
    t.updated_at AS "updated_at",

    CASE WHEN EXISTS (
        SELECT 1
        FROM task_attempts ta
        JOIN execution_processes ep
          ON ep.task_attempt_id = ta.id
        WHERE ta.task_id = t.id
          AND ep.status = 'running'
          AND ep.run_reason IN ('setupscript','cleanupscript','codingagent')
        LIMIT 1
    ) THEN 1 ELSE 0 END AS has_in_progress_attempt,

    CASE WHEN (
        SELECT ep.status
        FROM task_attempts ta
        JOIN execution_processes ep
          ON ep.task_attempt_id = ta.id
        WHERE ta.task_id = t.id
          AND ep.run_reason IN ('setupscript','cleanupscript','codingagent')
        ORDER BY ep.created_at DESC
        LIMIT 1
    ) IN ('failed','killed') THEN 1 ELSE 0 END
      AS last_attempt_failed,

    ( SELECT ta.executor
      FROM task_attempts ta
      WHERE ta.task_id = t.id
      ORDER BY ta.created_at DESC
      LIMIT 1
    ) AS executor,

    ( SELECT COUNT(*)
      FROM task_attempts ta
      WHERE ta.task_id = t.id
    ) AS attempt_count

FROM tasks t
WHERE t.project_id = ?
  AND t.id NOT IN (SELECT task_id FROM forge_agents)
ORDER BY t.created_at DESC"#;

    let rows = sqlx::query(query_str)
        .bind(project_id)
        .fetch_all(pool)
        .await?;

    let mut items: Vec<TaskWithAttemptStatus> = Vec::with_capacity(rows.len());
    for row in rows {
        use sqlx::Row;

        let status_str: String = row.try_get("status")?;
        let task = Task {
            id: row.try_get("id")?,
            project_id: row.try_get("project_id")?,
            title: row.try_get("title")?,
            description: row.try_get("description")?,
            status: status_str.parse().unwrap_or(TaskStatus::Todo),
            parent_task_attempt: row.try_get("parent_task_attempt").ok().flatten(),
            dev_server_id: row.try_get("dev_server_id").ok().flatten(),
            created_at: row.try_get("created_at")?,
            updated_at: row.try_get("updated_at")?,
        };

        let has_in_progress_attempt = row
            .try_get::<i64, _>("has_in_progress_attempt")
            .map(|v| v != 0)
            .unwrap_or(false);
        let last_attempt_failed = row
            .try_get::<i64, _>("last_attempt_failed")
            .map(|v| v != 0)
            .unwrap_or(false);
        let executor: String = row.try_get("executor").unwrap_or_else(|_| String::new());
        let attempt_count: i64 = row.try_get::<i64, _>("attempt_count").unwrap_or(0);

        items.push(TaskWithAttemptStatus {
            task,
            has_in_progress_attempt,
            has_merged_attempt: false,
            last_attempt_failed,
            executor,
            attempt_count,
        });
    }

    Ok(items)
}

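/// `GET /tasks/stream/ws` — verify the project exists, then upgrade to a
/// WebSocket that streams task updates for it.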
pub async fn stream_tasks_ws(
    ws: WebSocketUpgrade,
    State(deployment): State<DeploymentImpl>,
    Query(query): Query<TaskQuery>,
) -> Result<impl IntoResponse, ApiError> {
    let _project = Project::find_by_id(&deployment.db().pool, query.project_id)
        .await?
        .ok_or(ApiError::Database(SqlxError::RowNotFound))?;

    Ok(ws.on_upgrade(move |socket| async move {
        let result = handle_kanban_tasks_ws(socket, deployment, query.project_id).await;
        if let Err(e) = result {
            tracing::warn!("kanban tasks WS closed: {}", e);
        }
    }))
}

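/// Drives one kanban-tasks WebSocket session: forwards JSON-patch task events
/// to the client while filtering out agent tasks. A cached set of agent task
/// ids, refreshed every 30 seconds by a background task, keeps database
/// lookups off the hot path.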
async fn handle_kanban_tasks_ws(
    socket: WebSocket,
    deployment: DeploymentImpl,
    project_id: Uuid,
) -> anyhow::Result<()> {
    use std::{collections::HashSet, sync::Arc, time::Duration};

    use forge_core_utils::log_msg::LogMsg;
    use serde_json::json;
    use tokio::sync::RwLock;

    let pool = deployment.db().pool.clone();

    // Seed the agent-task cache for this project.
    let agent_task_ids: Arc<RwLock<HashSet<Uuid>>> = {
        let agent_tasks: Vec<Uuid> = sqlx::query_scalar(
            "SELECT task_id FROM forge_agents fa
             INNER JOIN tasks t ON fa.task_id = t.id
             WHERE t.project_id = ?",
        )
        .bind(project_id)
        .fetch_all(&pool)
        .await
        .map_err(|e| {
            tracing::error!(
                "Critical: Failed to init agent task cache for project {}: {}",
                project_id,
                e
            );
            anyhow::anyhow!("Failed to initialize task filter: {}", e)
        })?;

        Arc::new(RwLock::new(agent_tasks.into_iter().collect()))
    };

    // Periodically rebuild the cache so long-lived sockets pick up tasks that
    // became (or stopped being) agent tasks after connect.
    let refresh_cache = agent_task_ids.clone();
    let refresh_pool = pool.clone();
    let refresh_project_id = project_id;
    let refresh_task_handle = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            interval.tick().await;

            match sqlx::query_scalar::<_, Uuid>(
                "SELECT task_id FROM forge_agents fa
                 INNER JOIN tasks t ON fa.task_id = t.id
                 WHERE t.project_id = ?",
            )
            .bind(refresh_project_id)
            .fetch_all(&refresh_pool)
            .await
            {
                Ok(tasks) => {
                    let mut cache = refresh_cache.write().await;
                    cache.clear();
                    cache.extend(tasks);
                    tracing::trace!(
                        "Refreshed agent task cache for project {}: {} tasks",
                        refresh_project_id,
                        cache.len()
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        "Failed to refresh agent task cache for project {}: {}",
                        refresh_project_id,
                        e
                    );
                }
            }
        }
    });

    let stream = deployment
        .events()
        .stream_tasks_raw(project_id)
        .await?
        .filter_map(move |msg_result| {
            let agent_task_ids = agent_task_ids.clone();
            let pool = pool.clone();
            async move {
                match msg_result {
                    Ok(LogMsg::JsonPatch(patch)) => {
                        // Only the first operation of each patch is inspected.
                        if let Some(patch_op) = patch.0.first() {
                            if patch_op.path().starts_with("/tasks/") {
                                match patch_op {
                                    json_patch::PatchOperation::Add(op) => {
                                        if let Ok(task_with_status) =
                                            serde_json::from_value::<TaskWithAttemptStatus>(
                                                op.value.clone(),
                                            )
                                        {
                                            let task_id = task_with_status.task.id;
                                            if is_agent_task(&agent_task_ids, &pool, task_id)
                                                .await
                                                || task_with_status.task.status
                                                    == TaskStatus::Agent
                                            {
                                                return None;
                                            }
                                            return Some(Ok(LogMsg::JsonPatch(patch)));
                                        }
                                    }
                                    json_patch::PatchOperation::Replace(op) => {
                                        if let Ok(task_with_status) =
                                            serde_json::from_value::<TaskWithAttemptStatus>(
                                                op.value.clone(),
                                            )
                                        {
                                            let task_id = task_with_status.task.id;
                                            if is_agent_task(&agent_task_ids, &pool, task_id)
                                                .await
                                                || task_with_status.task.status
                                                    == TaskStatus::Agent
                                            {
                                                return None;
                                            }
                                            return Some(Ok(LogMsg::JsonPatch(patch)));
                                        }
                                    }
                                    json_patch::PatchOperation::Remove(_) => {
                                        return Some(Ok(LogMsg::JsonPatch(patch)));
                                    }
                                    _ => {}
                                }
                            } else if patch_op.path() == "/tasks"
                                && let json_patch::PatchOperation::Replace(op) = patch_op
                                && let Some(tasks_obj) = op.value.as_object()
                            {
                                // Full snapshot: rebuild the task map without agent tasks.
                                let mut filtered_tasks = serde_json::Map::new();
                                for (task_id_str, task_value) in tasks_obj {
                                    if let Ok(task_with_status) =
                                        serde_json::from_value::<TaskWithAttemptStatus>(
                                            task_value.clone(),
                                        )
                                    {
                                        let task_id = task_with_status.task.id;
                                        let is_agent =
                                            is_agent_task(&agent_task_ids, &pool, task_id).await
                                                || task_with_status.task.status
                                                    == TaskStatus::Agent;
                                        if !is_agent {
                                            filtered_tasks.insert(
                                                task_id_str.to_string(),
                                                task_value.clone(),
                                            );
                                        }
                                    }
                                }

                                let filtered_patch = json!([{
                                    "op": "replace",
                                    "path": "/tasks",
                                    "value": filtered_tasks
                                }]);
                                return match serde_json::from_value(filtered_patch) {
                                    Ok(patch) => Some(Ok(LogMsg::JsonPatch(patch))),
                                    Err(e) => {
                                        tracing::error!(
                                            "Failed to deserialize filtered patch: {}",
                                            e
                                        );
                                        Some(Err(std::io::Error::new(
                                            std::io::ErrorKind::InvalidData,
                                            "Patch deserialization failed",
                                        )))
                                    }
                                };
                            }
                        }
                        Some(Ok(LogMsg::JsonPatch(patch)))
                    }
                    Ok(other) => Some(Ok(other)),
                    Err(e) => Some(Err(e)),
                }
            }
        })
        .map_ok(|msg| msg.to_ws_message_unchecked());

    futures_util::pin_mut!(stream);

    let (mut sender, mut receiver) = socket.split();

    // Drain and discard incoming client messages; this session is write-only.
    tokio::spawn(async move { while let Some(Ok(_)) = receiver.next().await {} });

    while let Some(item) = stream.next().await {
        match item {
            Ok(msg) => {
                if sender.send(msg).await.is_err() {
                    break;
                }
            }
            Err(e) => {
                tracing::error!("stream error: {}", e);
                break;
            }
        }
    }

    refresh_task_handle.abort();

    Ok(())
}

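/// Returns whether a task belongs to an agent, consulting the in-memory cache
/// first and falling back to the database on a miss; positive results are
/// written back into the cache.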
async fn is_agent_task(
    agent_task_ids: &Arc<tokio::sync::RwLock<std::collections::HashSet<Uuid>>>,
    pool: &sqlx::SqlitePool,
    task_id: Uuid,
) -> bool {
    {
        let cache = agent_task_ids.read().await;
        if cache.contains(&task_id) {
            return true;
        }
    }

    let is_agent_db: bool =
        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM forge_agents WHERE task_id = ?)")
            .bind(task_id)
            .fetch_one(pool)
            .await
            .map_err(|e| {
                tracing::warn!("Agent task fallback query failed for {}: {}", task_id, e);
                e
            })
            .unwrap_or(false);

    if is_agent_db {
        let mut cache = agent_task_ids.write().await;
        cache.insert(task_id);
    }

    is_agent_db
}

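/// `GET /tasks/{task_id}` — return the task loaded by `load_task_middleware`.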
pub async fn get_task(
    Extension(task): Extension<Task>,
    State(_deployment): State<DeploymentImpl>,
) -> Result<ResponseJson<ApiResponse<Task>>, ApiError> {
    Ok(ResponseJson(ApiResponse::success(task)))
}

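/// `POST /tasks` — create a task, associate any referenced images, and record
/// an analytics event.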
pub async fn create_task(
    State(deployment): State<DeploymentImpl>,
    Json(payload): Json<CreateTask>,
) -> Result<ResponseJson<ApiResponse<Task>>, ApiError> {
    let id = Uuid::new_v4();

    tracing::debug!(
        "Creating task '{}' in project {}",
        payload.title,
        payload.project_id
    );

    let task = Task::create(&deployment.db().pool, &payload, id).await?;

    if let Some(image_ids) = &payload.image_ids {
        TaskImage::associate_many_dedup(&deployment.db().pool, task.id, image_ids).await?;
    }

    deployment
        .track_if_analytics_allowed(
            "task_created",
            serde_json::json!({
                "task_id": task.id.to_string(),
                "project_id": payload.project_id,
                "has_description": task.description.is_some(),
                "has_images": payload.image_ids.is_some(),
            }),
        )
        .await;

    Ok(ResponseJson(ApiResponse::success(task)))
}

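/// Request body for `POST /tasks/create-and-start`. `use_worktree: Some(false)`
/// creates the task as an agent task rather than a regular kanban task.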
#[derive(Debug, Deserialize, TS)]
pub struct CreateAndStartTaskRequest {
    pub task: CreateTask,
    pub executor_profile_id: ExecutorProfileId,
    pub base_branch: String,
    pub use_worktree: Option<bool>,
}

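/// `POST /tasks/create-and-start` — create a task and immediately start its
/// first attempt: registers agent tasks in `forge_agents`, warms the `.genie`
/// profile cache, records the executor variant and worktree preference on the
/// attempt, and launches it in a container.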
pub async fn create_task_and_start(
    State(deployment): State<DeploymentImpl>,
    Json(payload): Json<CreateAndStartTaskRequest>,
) -> Result<ResponseJson<ApiResponse<TaskWithAttemptStatus>>, ApiError> {
    let task_id = Uuid::new_v4();
    let use_worktree = payload.use_worktree.unwrap_or(true);

    let initial_status = if use_worktree {
        TaskStatus::Todo
    } else {
        TaskStatus::Agent
    };
    let task = Task::create_with_status(
        &deployment.db().pool,
        &payload.task,
        task_id,
        initial_status,
    )
    .await?;

    if let Some(image_ids) = &payload.task.image_ids {
        TaskImage::associate_many(&deployment.db().pool, task.id, image_ids).await?;
    }

    // Non-worktree tasks are registered as agents so the kanban query and the
    // WS filter exclude them.
    if !use_worktree {
        sqlx::query(
            r#"INSERT INTO forge_agents (id, project_id, agent_type, task_id, created_at, updated_at)
               VALUES (?, ?, 'genie_chat', ?, datetime('now'), datetime('now'))"#,
        )
        .bind(Uuid::new_v4().to_string())
        .bind(task.project_id.to_string())
        .bind(task.id.to_string())
        .execute(&deployment.db().pool)
        .await?;
    }

    deployment
        .track_if_analytics_allowed(
            "task_created",
            serde_json::json!({
                "task_id": task.id.to_string(),
                "project_id": task.project_id,
                "has_description": task.description.is_some(),
                "has_images": payload.task.image_ids.is_some(),
            }),
        )
        .await;

    let project = task
        .parent_project(&deployment.db().pool)
        .await?
        .ok_or(SqlxError::RowNotFound)?;
    if let Ok(_cache) = deployment
        .profile_cache()
        .get_or_create(project.git_repo_path.clone())
        .await
    {
        tracing::debug!(
            "Cached .genie profiles for workspace: {}",
            project.git_repo_path.display()
        );
        deployment
            .profile_cache()
            .register_project(project.id, project.git_repo_path.clone())
            .await;
    }

    let attempt_id = Uuid::new_v4();
    let git_branch_name = deployment
        .container()
        .git_branch_from_task_attempt(&attempt_id, &task.title)
        .await;

    let mut task_attempt = TaskAttempt::create(
        &deployment.db().pool,
        &CreateTaskAttempt {
            executor: payload.executor_profile_id.executor,
            base_branch: payload.base_branch,
            branch: git_branch_name,
        },
        attempt_id,
        task.id,
    )
    .await?;

    // Persist the executor variant (if any) as "executor:variant".
    if let Some(variant) = &payload.executor_profile_id.variant {
        let executor_with_variant = format!("{}:{}", payload.executor_profile_id.executor, variant);
        sqlx::query(
            "UPDATE task_attempts SET executor = ?, updated_at = datetime('now') WHERE id = ?",
        )
        .bind(&executor_with_variant)
        .bind(attempt_id.to_string())
        .execute(&deployment.db().pool)
        .await?;
        task_attempt.executor = executor_with_variant;
    }

    if let Some(use_worktree) = payload.use_worktree {
        sqlx::query(
            "INSERT INTO forge_task_attempt_config (task_attempt_id, use_worktree) VALUES (?, ?)",
        )
        .bind(attempt_id.to_string())
        .bind(use_worktree)
        .execute(&deployment.db().pool)
        .await?;
    }

    let is_attempt_running = deployment
        .container()
        .start_attempt(&task_attempt, payload.executor_profile_id.clone())
        .await
        .inspect_err(|err| tracing::error!("Failed to start task attempt: {}", err))
        .is_ok();
    deployment
        .track_if_analytics_allowed(
            "task_attempt_started",
            serde_json::json!({
                "task_id": task.id.to_string(),
                "executor": &payload.executor_profile_id.executor,
                "variant": &payload.executor_profile_id.variant,
                "attempt_id": task_attempt.id.to_string(),
            }),
        )
        .await;

    let task = Task::find_by_id(&deployment.db().pool, task.id)
        .await?
        .ok_or(ApiError::Database(SqlxError::RowNotFound))?;

    tracing::info!("Started attempt for task {}", task.id);
    Ok(ResponseJson(ApiResponse::success(TaskWithAttemptStatus {
        task,
        has_in_progress_attempt: is_attempt_running,
        has_merged_attempt: false,
        last_attempt_failed: false,
        executor: task_attempt.executor,
        attempt_count: 1,
    })))
}

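/// `PUT /tasks/{task_id}` — partial update: absent fields keep their current
/// values, a blank description clears it, and a transition into `Archived`
/// triggers background worktree cleanup.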
pub async fn update_task(
    Extension(existing_task): Extension<Task>,
    State(deployment): State<DeploymentImpl>,
    Json(payload): Json<UpdateTask>,
) -> Result<ResponseJson<ApiResponse<Task>>, ApiError> {
    let title = payload.title.unwrap_or(existing_task.title);
    let description = match payload.description {
        Some(s) if s.trim().is_empty() => None,
        Some(s) => Some(s),
        None => existing_task.description,
    };
    let status = payload.status.unwrap_or(existing_task.status);
    let parent_task_attempt = payload
        .parent_task_attempt
        .or(existing_task.parent_task_attempt);

    let task = Task::update(
        &deployment.db().pool,
        existing_task.id,
        existing_task.project_id,
        title,
        description,
        status,
        parent_task_attempt,
    )
    .await?;

    if let Some(image_ids) = &payload.image_ids {
        TaskImage::delete_by_task_id(&deployment.db().pool, task.id).await?;
        TaskImage::associate_many_dedup(&deployment.db().pool, task.id, image_ids).await?;
    }

    if status == TaskStatus::Archived && existing_task.status != TaskStatus::Archived {
        handle_task_archive(&deployment, existing_task.id);
    }

    Ok(ResponseJson(ApiResponse::success(task)))
}

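/// `DELETE /tasks/{task_id}` — rejects the request while execution processes
/// are running; otherwise nullifies child-task references and deletes the
/// task in one transaction, then cleans up worktrees in a background job.
/// Responds with 202 Accepted since cleanup outlives the response.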
pub async fn delete_task(
    Extension(task): Extension<Task>,
    State(deployment): State<DeploymentImpl>,
) -> Result<(StatusCode, ResponseJson<ApiResponse<()>>), ApiError> {
    if deployment
        .container()
        .has_running_processes(task.id)
        .await?
    {
        return Err(ApiError::Conflict(
            "Task has running execution processes. Please wait for them to complete or stop them first."
                .to_string(),
        ));
    }

    let attempts = TaskAttempt::fetch_all(&deployment.db().pool, Some(task.id))
        .await
        .map_err(|e| {
            tracing::error!("Failed to fetch task attempts for task {}: {}", task.id, e);
            ApiError::TaskAttempt(e)
        })?;

    let project = task
        .parent_project(&deployment.db().pool)
        .await?
        .ok_or_else(|| ApiError::Database(SqlxError::RowNotFound))?;

    // Capture worktree paths before the rows are deleted so cleanup can run
    // after the transaction commits.
    let cleanup_data: Vec<WorktreeCleanupData> = attempts
        .iter()
        .filter_map(|attempt| {
            attempt
                .container_ref
                .as_ref()
                .map(|worktree_path| WorktreeCleanupData {
                    attempt_id: attempt.id,
                    worktree_path: PathBuf::from(worktree_path),
                    git_repo_path: Some(project.git_repo_path.clone()),
                })
        })
        .collect();

    let mut tx = deployment.db().pool.begin().await?;

    let mut total_children_affected = 0u64;
    for attempt in &attempts {
        let children_affected = Task::nullify_children_by_attempt_id(&mut *tx, attempt.id).await?;
        total_children_affected += children_affected;
    }

    let rows_affected = Task::delete(&mut *tx, task.id).await?;

    if rows_affected == 0 {
        return Err(ApiError::Database(SqlxError::RowNotFound));
    }

    tx.commit().await?;

    if total_children_affected > 0 {
        tracing::info!(
            "Nullified {} child task references before deleting task {}",
            total_children_affected,
            task.id
        );
    }

    deployment
        .track_if_analytics_allowed(
            "task_deleted",
            serde_json::json!({
                "task_id": task.id.to_string(),
                "project_id": task.project_id.to_string(),
                "attempt_count": attempts.len(),
            }),
        )
        .await;

    let task_id = task.id;
    let span = tracing::info_span!("background_worktree_cleanup", task_id = %task_id);
    // Attach the span with `.instrument()`; holding an `enter()` guard across
    // `.await` would make the spawned future non-`Send`.
    tokio::spawn(
        async move {
            tracing::info!(
                "Starting background cleanup for task {} ({} worktrees)",
                task_id,
                cleanup_data.len()
            );

            if let Err(e) = cleanup_worktrees_direct(&cleanup_data).await {
                tracing::error!(
                    "Background worktree cleanup failed for task {}: {}",
                    task_id,
                    e
                );
            } else {
                tracing::info!("Background cleanup completed for task {}", task_id);
            }
        }
        .instrument(span),
    );

    Ok((StatusCode::ACCEPTED, ResponseJson(ApiResponse::success(()))))
}

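/// Spawns a background job that removes an archived task's worktrees and, on
/// success, marks each attempt's `worktree_deleted` flag. Failures are only
/// logged, since the archive itself has already been persisted.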
fn handle_task_archive(deployment: &DeploymentImpl, task_id: Uuid) {
    let deployment = deployment.clone();
    let span = tracing::info_span!("archive_task_worktree_cleanup", task_id = %task_id);
    // As in delete_task, attach the span with `.instrument()` rather than
    // holding an `enter()` guard across await points.
    tokio::spawn(
        async move {
            let task = match Task::find_by_id(&deployment.db().pool, task_id).await {
                Ok(Some(t)) => t,
                _ => {
                    tracing::error!("Failed to find task {} for archive cleanup", task_id);
                    return;
                }
            };

            let attempts =
                match TaskAttempt::fetch_all(&deployment.db().pool, Some(task_id)).await {
                    Ok(a) => a,
                    Err(e) => {
                        tracing::error!("Failed to fetch attempts for task {}: {}", task_id, e);
                        return;
                    }
                };

            let project = match task.parent_project(&deployment.db().pool).await {
                Ok(Some(p)) => p,
                _ => {
                    tracing::error!("Failed to find project for task {}", task_id);
                    return;
                }
            };

            let cleanup_data: Vec<WorktreeCleanupData> = attempts
                .iter()
                .filter_map(|attempt| {
                    attempt
                        .container_ref
                        .as_ref()
                        .map(|worktree_path| WorktreeCleanupData {
                            attempt_id: attempt.id,
                            worktree_path: PathBuf::from(worktree_path),
                            git_repo_path: Some(project.git_repo_path.clone()),
                        })
                })
                .collect();

            if cleanup_data.is_empty() {
                tracing::debug!("No worktrees to cleanup for archived task {}", task_id);
                return;
            }

            tracing::info!(
                "Starting worktree cleanup for archived task {} ({} worktrees)",
                task_id,
                cleanup_data.len()
            );

            match cleanup_worktrees_direct(&cleanup_data).await {
                Ok(_) => {
                    for attempt in &attempts {
                        if let Err(e) = sqlx::query(
                            "UPDATE task_attempts SET worktree_deleted = TRUE, updated_at = datetime('now') WHERE id = ?",
                        )
                        .bind(attempt.id)
                        .execute(&deployment.db().pool)
                        .await
                        {
                            tracing::error!(
                                "Failed to mark worktree_deleted for attempt {}: {}",
                                attempt.id,
                                e
                            );
                        }
                    }
                    tracing::info!("Completed worktree cleanup for archived task {}", task_id);
                }
                Err(e) => {
                    tracing::error!(
                        "Failed to cleanup worktrees for archived task {}: {}",
                        task_id,
                        e
                    );
                }
            }
        }
        .instrument(span),
    );
}

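/// Builds the `/tasks` router: collection routes, the WebSocket stream, the
/// create-and-start endpoint, and per-task routes guarded by
/// `load_task_middleware`.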
pub fn router(deployment: &DeploymentImpl) -> Router<DeploymentImpl> {
    let task_id_router = Router::new()
        .route("/", get(get_task).put(update_task).delete(delete_task))
        .layer(from_fn_with_state(deployment.clone(), load_task_middleware));

    let inner = Router::new()
        .route("/", get(get_tasks).post(create_task))
        .route("/stream/ws", get(stream_tasks_ws))
        .route("/create-and-start", post(create_task_and_start))
        .nest("/{task_id}", task_id_router);

    Router::new().nest("/tasks", inner)
}