1use crate::async_database::AsyncDatabase;
8use crate::task_repository::items::delete_task_and_collect_log_paths;
9use anyhow::Result;
10use std::path::Path;
11use tracing::info;
12
13pub async fn cleanup_old_tasks(
18 db: &AsyncDatabase,
19 logs_dir: &Path,
20 retention_days: u32,
21 batch_limit: u32,
22) -> Result<u64> {
23 if retention_days == 0 {
24 return Ok(0);
25 }
26
27 let limit = if batch_limit == 0 { 50 } else { batch_limit };
28
29 let task_ids: Vec<String> = db
31 .reader()
32 .call(move |conn| {
33 let sql = format!(
34 "SELECT id FROM tasks \
35 WHERE status IN ('completed','failed','cancelled') \
36 AND updated_at < datetime('now', '-{retention_days} days') \
37 LIMIT {limit}"
38 );
39 let mut stmt = conn.prepare(&sql)?;
40 let ids: Vec<String> = stmt
41 .query_map([], |row| row.get(0))?
42 .filter_map(|r| r.ok())
43 .collect();
44 Ok(ids)
45 })
46 .await
47 .map_err(|e| anyhow::anyhow!("{e}"))?;
48
49 if task_ids.is_empty() {
50 return Ok(0);
51 }
52
53 let mut deleted = 0u64;
54 let logs_dir = logs_dir.to_path_buf();
55
56 for task_id in &task_ids {
57 let tid = task_id.clone();
58 let log_paths: Vec<String> = db
59 .writer()
60 .call(move |conn| {
61 delete_task_and_collect_log_paths(conn, &tid)
62 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
63 })
64 .await
65 .map_err(|e| anyhow::anyhow!("{e}"))?;
66
67 for path_str in &log_paths {
69 let path = Path::new(path_str);
70 if path.is_file() {
71 let _ = std::fs::remove_file(path);
72 }
73 }
74
75 let task_log_dir = logs_dir.join(task_id);
77 if task_log_dir.is_dir() {
78 let _ = std::fs::remove_dir_all(&task_log_dir);
79 }
80
81 deleted += 1;
82 }
83
84 if deleted > 0 {
85 info!(
86 tasks = deleted,
87 retention_days, "task auto-cleanup completed"
88 );
89 }
90
91 Ok(deleted)
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use crate::test_utils::TestState;
98
99 async fn insert_task(db: &AsyncDatabase, task_id: &str, status: &str) {
100 let id = task_id.to_owned();
101 let st = status.to_owned();
102 db.writer()
103 .call(move |conn| {
104 conn.execute(
105 "INSERT INTO tasks (id, name, status, goal, target_files_json, mode, \
106 project_id, workspace_id, workflow_id, workspace_root, \
107 qa_targets_json, ticket_dir, created_at, updated_at) \
108 VALUES (?1, ?1, ?2, '', '[]', 'auto', 'default', 'default', 'basic', \
109 '/tmp', '[]', '/tmp/tickets', datetime('now'), datetime('now'))",
110 rusqlite::params![id, st],
111 )?;
112 Ok(())
113 })
114 .await
115 .expect("insert_task");
116 }
117
118 async fn age_task(db: &AsyncDatabase, task_id: &str, days: u32) {
119 let id = task_id.to_owned();
120 db.writer()
121 .call(move |conn| {
122 conn.execute(
123 &format!(
124 "UPDATE tasks SET updated_at = datetime('now', '-{days} days') WHERE id = ?1"
125 ),
126 rusqlite::params![id],
127 )?;
128 Ok(())
129 })
130 .await
131 .expect("age_task");
132 }
133
134 async fn count_tasks(db: &AsyncDatabase) -> u64 {
135 db.reader()
136 .call(|conn| {
137 let c: i64 = conn.query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))?;
138 Ok(c as u64)
139 })
140 .await
141 .expect("count_tasks")
142 }
143
144 async fn task_exists(db: &AsyncDatabase, task_id: &str) -> bool {
145 let id = task_id.to_owned();
146 db.reader()
147 .call(move |conn| {
148 let c: i64 = conn.query_row(
149 "SELECT COUNT(*) FROM tasks WHERE id = ?1",
150 rusqlite::params![id],
151 |r| r.get(0),
152 )?;
153 Ok(c > 0)
154 })
155 .await
156 .expect("task_exists")
157 }
158
159 #[tokio::test]
160 async fn retention_zero_returns_zero() {
161 let mut ts = TestState::new();
162 let state = ts.build();
163 let logs_dir = tempfile::tempdir().unwrap();
164
165 insert_task(&state.async_database, "t1", "completed").await;
166 age_task(&state.async_database, "t1", 30).await;
167
168 let deleted = cleanup_old_tasks(&state.async_database, logs_dir.path(), 0, 10)
169 .await
170 .unwrap();
171 assert_eq!(deleted, 0);
172 assert!(task_exists(&state.async_database, "t1").await);
174 }
175
176 #[tokio::test]
177 async fn no_terminal_tasks_returns_zero() {
178 let mut ts = TestState::new();
179 let state = ts.build();
180 let logs_dir = tempfile::tempdir().unwrap();
181
182 insert_task(&state.async_database, "t-running", "running").await;
183 age_task(&state.async_database, "t-running", 30).await;
184
185 insert_task(&state.async_database, "t-pending", "pending").await;
186 age_task(&state.async_database, "t-pending", 30).await;
187
188 let deleted = cleanup_old_tasks(&state.async_database, logs_dir.path(), 7, 100)
189 .await
190 .unwrap();
191 assert_eq!(deleted, 0);
192 assert_eq!(count_tasks(&state.async_database).await, 2);
193 }
194
195 #[tokio::test]
196 async fn old_completed_task_deleted() {
197 let mut ts = TestState::new();
198 let state = ts.build();
199 let logs_dir = tempfile::tempdir().unwrap();
200
201 insert_task(&state.async_database, "t-old", "completed").await;
202 age_task(&state.async_database, "t-old", 30).await;
203
204 let deleted = cleanup_old_tasks(&state.async_database, logs_dir.path(), 7, 100)
205 .await
206 .unwrap();
207 assert_eq!(deleted, 1);
208 assert!(!task_exists(&state.async_database, "t-old").await);
209 }
210
211 #[tokio::test]
212 async fn recent_completed_task_not_deleted() {
213 let mut ts = TestState::new();
214 let state = ts.build();
215 let logs_dir = tempfile::tempdir().unwrap();
216
217 insert_task(&state.async_database, "t-recent", "completed").await;
219
220 let deleted = cleanup_old_tasks(&state.async_database, logs_dir.path(), 7, 100)
221 .await
222 .unwrap();
223 assert_eq!(deleted, 0);
224 assert!(task_exists(&state.async_database, "t-recent").await);
225 }
226
227 #[tokio::test]
228 async fn batch_limit_respected() {
229 let mut ts = TestState::new();
230 let state = ts.build();
231 let logs_dir = tempfile::tempdir().unwrap();
232
233 for i in 0..3 {
234 let tid = format!("t-batch-{i}");
235 insert_task(&state.async_database, &tid, "failed").await;
236 age_task(&state.async_database, &tid, 30).await;
237 }
238
239 let deleted = cleanup_old_tasks(&state.async_database, logs_dir.path(), 7, 2)
240 .await
241 .unwrap();
242 assert_eq!(deleted, 2);
243 assert_eq!(count_tasks(&state.async_database).await, 1);
245 }
246
247 #[tokio::test]
248 async fn batch_limit_zero_defaults_to_fifty() {
249 let mut ts = TestState::new();
250 let state = ts.build();
251 let logs_dir = tempfile::tempdir().unwrap();
252
253 insert_task(&state.async_database, "t-default", "cancelled").await;
254 age_task(&state.async_database, "t-default", 30).await;
255
256 let deleted = cleanup_old_tasks(&state.async_database, logs_dir.path(), 7, 0)
258 .await
259 .unwrap();
260 assert_eq!(deleted, 1);
261 assert!(!task_exists(&state.async_database, "t-default").await);
262 }
263
264 #[tokio::test]
265 async fn log_dir_cleaned_up() {
266 let mut ts = TestState::new();
267 let state = ts.build();
268 let logs_dir = tempfile::tempdir().unwrap();
269
270 let task_id = "t-logdir";
271 insert_task(&state.async_database, task_id, "completed").await;
272 age_task(&state.async_database, task_id, 30).await;
273
274 let task_log_dir = logs_dir.path().join(task_id);
276 std::fs::create_dir_all(&task_log_dir).unwrap();
277 std::fs::write(task_log_dir.join("stdout.log"), "some output").unwrap();
278 std::fs::write(task_log_dir.join("stderr.log"), "some errors").unwrap();
279 assert!(task_log_dir.exists());
280
281 let deleted = cleanup_old_tasks(&state.async_database, logs_dir.path(), 7, 100)
282 .await
283 .unwrap();
284 assert_eq!(deleted, 1);
285 assert!(
286 !task_log_dir.exists(),
287 "task log directory should be removed after cleanup"
288 );
289 }
290}