Skip to main content

agent_orchestrator/
task_cleanup.rs

1//! Auto-cleanup of terminated tasks and their associated data.
2//!
3//! Batch-deletes tasks in terminal state (completed/failed/cancelled) that
4//! are older than a configurable retention period. Cascade-deletes all
5//! related items, runs, events, and log files.
6
7use crate::async_database::AsyncDatabase;
8use crate::task_repository::items::delete_task_and_collect_log_paths;
9use anyhow::Result;
10use std::path::Path;
11use tracing::info;
12
13/// Clean up terminated tasks older than `retention_days`.
14///
15/// Cascade-deletes task_items, command_runs, events, and physically removes
16/// log files. Returns the number of tasks deleted.
17pub async fn cleanup_old_tasks(
18    db: &AsyncDatabase,
19    logs_dir: &Path,
20    retention_days: u32,
21    batch_limit: u32,
22) -> Result<u64> {
23    if retention_days == 0 {
24        return Ok(0);
25    }
26
27    let limit = if batch_limit == 0 { 50 } else { batch_limit };
28
29    // Find candidate task IDs.
30    let task_ids: Vec<String> = db
31        .reader()
32        .call(move |conn| {
33            let sql = format!(
34                "SELECT id FROM tasks \
35                 WHERE status IN ('completed','failed','cancelled') \
36                   AND updated_at < datetime('now', '-{retention_days} days') \
37                 LIMIT {limit}"
38            );
39            let mut stmt = conn.prepare(&sql)?;
40            let ids: Vec<String> = stmt
41                .query_map([], |row| row.get(0))?
42                .filter_map(|r| r.ok())
43                .collect();
44            Ok(ids)
45        })
46        .await
47        .map_err(|e| anyhow::anyhow!("{e}"))?;
48
49    if task_ids.is_empty() {
50        return Ok(0);
51    }
52
53    let mut deleted = 0u64;
54    let logs_dir = logs_dir.to_path_buf();
55
56    for task_id in &task_ids {
57        let tid = task_id.clone();
58        let log_paths: Vec<String> = db
59            .writer()
60            .call(move |conn| {
61                delete_task_and_collect_log_paths(conn, &tid)
62                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
63            })
64            .await
65            .map_err(|e| anyhow::anyhow!("{e}"))?;
66
67        // Physically remove log files.
68        for path_str in &log_paths {
69            let path = Path::new(path_str);
70            if path.is_file() {
71                let _ = std::fs::remove_file(path);
72            }
73        }
74
75        // Remove the task log directory if it exists.
76        let task_log_dir = logs_dir.join(task_id);
77        if task_log_dir.is_dir() {
78            let _ = std::fs::remove_dir_all(&task_log_dir);
79        }
80
81        deleted += 1;
82    }
83
84    if deleted > 0 {
85        info!(
86            tasks = deleted,
87            retention_days, "task auto-cleanup completed"
88        );
89    }
90
91    Ok(deleted)
92}
93
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::TestState;

    /// Inserts a `tasks` row whose id and name are both `task_id`, with the
    /// given `status` and `created_at`/`updated_at` set to the current time.
    async fn insert_task(db: &AsyncDatabase, task_id: &str, status: &str) {
        let (row_id, row_status) = (task_id.to_string(), status.to_string());
        let outcome = db
            .writer()
            .call(move |conn| {
                conn.execute(
                    "INSERT INTO tasks (id, name, status, goal, target_files_json, mode, \
                     project_id, workspace_id, workflow_id, workspace_root, \
                     qa_targets_json, ticket_dir, created_at, updated_at) \
                     VALUES (?1, ?1, ?2, '', '[]', 'auto', 'default', 'default', 'basic', \
                     '/tmp', '[]', '/tmp/tickets', datetime('now'), datetime('now'))",
                    rusqlite::params![row_id, row_status],
                )?;
                Ok(())
            })
            .await;
        outcome.expect("insert_task");
    }

    /// Moves the task's `updated_at` timestamp `days` days into the past.
    async fn age_task(db: &AsyncDatabase, task_id: &str, days: u32) {
        let sql = format!(
            "UPDATE tasks SET updated_at = datetime('now', '-{days} days') WHERE id = ?1"
        );
        let target = task_id.to_string();
        db.writer()
            .call(move |conn| {
                conn.execute(&sql, rusqlite::params![target])?;
                Ok(())
            })
            .await
            .expect("age_task");
    }

    /// Inserts a task with `status` and immediately ages it by `days` days.
    async fn seed_aged_task(db: &AsyncDatabase, task_id: &str, status: &str, days: u32) {
        insert_task(db, task_id, status).await;
        age_task(db, task_id, days).await;
    }

    /// Total number of rows currently in `tasks`.
    async fn count_tasks(db: &AsyncDatabase) -> u64 {
        let total: i64 = db
            .reader()
            .call(|conn| {
                let n = conn.query_row("SELECT COUNT(*) FROM tasks", [], |row| {
                    row.get::<_, i64>(0)
                })?;
                Ok(n)
            })
            .await
            .expect("count_tasks");
        total as u64
    }

    /// Whether a `tasks` row with the given id is present.
    async fn task_exists(db: &AsyncDatabase, task_id: &str) -> bool {
        let wanted = task_id.to_string();
        let hits: i64 = db
            .reader()
            .call(move |conn| {
                let n = conn.query_row(
                    "SELECT COUNT(*) FROM tasks WHERE id = ?1",
                    rusqlite::params![wanted],
                    |row| row.get::<_, i64>(0),
                )?;
                Ok(n)
            })
            .await
            .expect("task_exists");
        hits > 0
    }

    #[tokio::test]
    async fn retention_zero_returns_zero() {
        let mut ts = TestState::new();
        let state = ts.build();
        let db: &AsyncDatabase = &state.async_database;
        let logs = tempfile::tempdir().unwrap();

        seed_aged_task(db, "t1", "completed", 30).await;

        // A retention of zero days disables cleanup altogether.
        let removed = cleanup_old_tasks(db, logs.path(), 0, 10).await.unwrap();

        assert_eq!(removed, 0);
        assert!(task_exists(db, "t1").await);
    }

    #[tokio::test]
    async fn no_terminal_tasks_returns_zero() {
        let mut ts = TestState::new();
        let state = ts.build();
        let db: &AsyncDatabase = &state.async_database;
        let logs = tempfile::tempdir().unwrap();

        // Old, but neither task is in a terminal state.
        seed_aged_task(db, "t-running", "running", 30).await;
        seed_aged_task(db, "t-pending", "pending", 30).await;

        let removed = cleanup_old_tasks(db, logs.path(), 7, 100).await.unwrap();

        assert_eq!(removed, 0);
        assert_eq!(count_tasks(db).await, 2);
    }

    #[tokio::test]
    async fn old_completed_task_deleted() {
        let mut ts = TestState::new();
        let state = ts.build();
        let db: &AsyncDatabase = &state.async_database;
        let logs = tempfile::tempdir().unwrap();

        seed_aged_task(db, "t-old", "completed", 30).await;

        let removed = cleanup_old_tasks(db, logs.path(), 7, 100).await.unwrap();

        assert_eq!(removed, 1);
        assert!(!task_exists(db, "t-old").await);
    }

    #[tokio::test]
    async fn recent_completed_task_not_deleted() {
        let mut ts = TestState::new();
        let state = ts.build();
        let db: &AsyncDatabase = &state.async_database;
        let logs = tempfile::tempdir().unwrap();

        // Completed, but never aged: still inside the retention window.
        insert_task(db, "t-recent", "completed").await;

        let removed = cleanup_old_tasks(db, logs.path(), 7, 100).await.unwrap();

        assert_eq!(removed, 0);
        assert!(task_exists(db, "t-recent").await);
    }

    #[tokio::test]
    async fn batch_limit_respected() {
        let mut ts = TestState::new();
        let state = ts.build();
        let db: &AsyncDatabase = &state.async_database;
        let logs = tempfile::tempdir().unwrap();

        for tid in (0..3).map(|i| format!("t-batch-{i}")) {
            seed_aged_task(db, &tid, "failed", 30).await;
        }

        let removed = cleanup_old_tasks(db, logs.path(), 7, 2).await.unwrap();

        // Three candidates, limit of two: exactly one survives.
        assert_eq!(removed, 2);
        assert_eq!(count_tasks(db).await, 1);
    }

    #[tokio::test]
    async fn batch_limit_zero_defaults_to_fifty() {
        let mut ts = TestState::new();
        let state = ts.build();
        let db: &AsyncDatabase = &state.async_database;
        let logs = tempfile::tempdir().unwrap();

        seed_aged_task(db, "t-default", "cancelled", 30).await;

        // A zero batch limit must not fail; the function substitutes 50.
        let removed = cleanup_old_tasks(db, logs.path(), 7, 0).await.unwrap();

        assert_eq!(removed, 1);
        assert!(!task_exists(db, "t-default").await);
    }

    #[tokio::test]
    async fn log_dir_cleaned_up() {
        let mut ts = TestState::new();
        let state = ts.build();
        let db: &AsyncDatabase = &state.async_database;
        let logs = tempfile::tempdir().unwrap();

        let task_id = "t-logdir";
        seed_aged_task(db, task_id, "completed", 30).await;

        // Populate the per-task log directory that cleanup is expected to remove.
        let task_log_dir = logs.path().join(task_id);
        std::fs::create_dir_all(&task_log_dir).unwrap();
        for (file_name, contents) in [("stdout.log", "some output"), ("stderr.log", "some errors")]
        {
            std::fs::write(task_log_dir.join(file_name), contents).unwrap();
        }
        assert!(task_log_dir.exists());

        let removed = cleanup_old_tasks(db, logs.path(), 7, 100).await.unwrap();

        assert_eq!(removed, 1);
        assert!(
            !task_log_dir.exists(),
            "task log directory should be removed after cleanup"
        );
    }
}