// agent_orchestrator/event_cleanup.rs

//! TTL-based event cleanup, optional archival, and statistics.
//!
//! Provides functions to purge old events for completed/failed/cancelled tasks,
//! optionally archiving them to JSONL before deletion.

use crate::async_database::AsyncDatabase;
use crate::dto::EventDto;
use anyhow::Result;
use serde_json::Value;
use std::path::Path;
use tracing::info;

/// Aggregate statistics about the events table.
#[derive(Debug, Clone)]
pub struct EventStats {
    /// Total number of rows in the events table.
    pub total_rows: u64,
    /// Earliest `created_at` timestamp, if any events exist.
    pub earliest: Option<String>,
    /// Latest `created_at` timestamp, if any events exist.
    pub latest: Option<String>,
    /// Event counts grouped by the owning task's status, largest group first
    /// ('unknown' for events whose task row no longer exists).
    pub by_task_status: Vec<(String, u64)>,
}

/// Terminal task statuses whose events are eligible for cleanup.
/// Interpolated verbatim into SQL `IN (...)` clauses — safe because the
/// values are trusted compile-time constants, never user input.
const TERMINAL_STATUSES: &str = "'completed','failed','cancelled'";
28
29/// Delete events older than `retention_days` whose owning task is in a terminal
30/// status. At most `batch_limit` rows are deleted per invocation to avoid long
31/// write-lock durations.
32///
33/// Returns the number of rows deleted.
34pub async fn cleanup_old_events(
35    db: &AsyncDatabase,
36    retention_days: u32,
37    batch_limit: u32,
38) -> Result<u64> {
39    let days = retention_days;
40    let limit = batch_limit;
41    let deleted: u64 = db
42        .writer()
43        .call(move |conn| {
44            let sql = format!(
45                "DELETE FROM events WHERE rowid IN (\
46                   SELECT events.rowid FROM events \
47                   INNER JOIN tasks ON events.task_id = tasks.id \
48                   WHERE events.created_at < datetime('now', '-{days} days') \
49                     AND tasks.status IN ({TERMINAL_STATUSES}) \
50                   LIMIT {limit}\
51                 )"
52            );
53            let count = conn.execute(&sql, [])?;
54            Ok(count as u64)
55        })
56        .await
57        .map_err(|e| anyhow::anyhow!("{e}"))?;
58    if deleted > 0 {
59        info!(deleted, retention_days, "event cleanup: deleted old events");
60    }
61    Ok(deleted)
62}
63
64/// Count events that would be deleted by `cleanup_old_events` without actually
65/// deleting them (dry-run).
66pub async fn count_pending_cleanup(db: &AsyncDatabase, retention_days: u32) -> Result<u64> {
67    let days = retention_days;
68    let count: u64 = db
69        .reader()
70        .call(move |conn| {
71            let sql = format!(
72                "SELECT COUNT(*) FROM events \
73                 INNER JOIN tasks ON events.task_id = tasks.id \
74                 WHERE events.created_at < datetime('now', '-{days} days') \
75                   AND tasks.status IN ({TERMINAL_STATUSES})"
76            );
77            let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
78            Ok(count as u64)
79        })
80        .await
81        .map_err(|e| anyhow::anyhow!("{e}"))?;
82    Ok(count)
83}
84
85/// Gather statistics about the events table.
86/// List events for a specific task, optionally filtered by event type prefix.
87pub async fn list_task_events(
88    db: &AsyncDatabase,
89    task_id: &str,
90    event_type_filter: Option<&str>,
91    limit: u32,
92) -> Result<Vec<EventDto>> {
93    let task_id = task_id.to_string();
94    let type_filter = event_type_filter.map(|s| s.to_string());
95    let limit = if limit == 0 { 50 } else { limit };
96    let events = db
97        .reader()
98        .call(move |conn| {
99            let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(
100                ref prefix,
101            ) = type_filter
102            {
103                (
104                    format!(
105                        "SELECT id, task_id, task_item_id, event_type, payload_json, created_at \
106                             FROM events WHERE task_id = ?1 AND event_type LIKE ?2 \
107                             ORDER BY id DESC LIMIT {limit}"
108                    ),
109                    vec![Box::new(task_id.clone()), Box::new(format!("{prefix}%"))],
110                )
111            } else {
112                (
113                    format!(
114                        "SELECT id, task_id, task_item_id, event_type, payload_json, created_at \
115                             FROM events WHERE task_id = ?1 \
116                             ORDER BY id DESC LIMIT {limit}"
117                    ),
118                    vec![Box::new(task_id.clone())],
119                )
120            };
121            let mut stmt = conn.prepare(&sql)?;
122            let rows = stmt
123                .query_map(rusqlite::params_from_iter(params.iter()), |row| {
124                    let payload_str: String = row.get(4)?;
125                    let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
126                    Ok(EventDto {
127                        id: row.get(0)?,
128                        task_id: row.get(1)?,
129                        task_item_id: row.get(2)?,
130                        event_type: row.get(3)?,
131                        payload,
132                        created_at: row.get(5)?,
133                    })
134                })?
135                .filter_map(|r| r.ok())
136                .collect();
137            Ok(rows)
138        })
139        .await
140        .map_err(|e| anyhow::anyhow!("{e}"))?;
141    Ok(events)
142}
143
144/// Compute aggregate statistics for the events table.
145pub async fn event_stats(db: &AsyncDatabase) -> Result<EventStats> {
146    let stats = db
147        .reader()
148        .call(|conn| {
149            let total_rows: i64 =
150                conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
151            let earliest: Option<String> = conn
152                .query_row("SELECT MIN(created_at) FROM events", [], |row| row.get(0))
153                .unwrap_or(None);
154            let latest: Option<String> = conn
155                .query_row("SELECT MAX(created_at) FROM events", [], |row| row.get(0))
156                .unwrap_or(None);
157
158            let mut stmt = conn.prepare(
159                "SELECT COALESCE(t.status, 'unknown'), COUNT(*) \
160                 FROM events e \
161                 LEFT JOIN tasks t ON e.task_id = t.id \
162                 GROUP BY t.status \
163                 ORDER BY COUNT(*) DESC",
164            )?;
165            let by_task_status: Vec<(String, u64)> = stmt
166                .query_map([], |row| {
167                    let status: String = row.get(0)?;
168                    let count: i64 = row.get(1)?;
169                    Ok((status, count as u64))
170                })?
171                .filter_map(|r| r.ok())
172                .collect();
173
174            Ok(EventStats {
175                total_rows: total_rows as u64,
176                earliest,
177                latest,
178                by_task_status,
179            })
180        })
181        .await
182        .map_err(|e| anyhow::anyhow!("{e}"))?;
183    Ok(stats)
184}
185
186/// Archive events eligible for cleanup to JSONL files, then delete them.
187///
188/// Events are written to `{archive_dir}/{task_id}/{date}.jsonl` with one JSON
189/// object per line. Returns the number of events archived and deleted.
190pub async fn archive_events(
191    db: &AsyncDatabase,
192    archive_dir: &Path,
193    retention_days: u32,
194    batch_limit: u32,
195) -> Result<u64> {
196    let dir = archive_dir.to_path_buf();
197    let days = retention_days;
198    let limit = batch_limit;
199    let archived: u64 = db
200        .writer()
201        .call(move |conn| {
202            // Select events to archive
203            let sql = format!(
204                "SELECT events.rowid, events.task_id, events.task_item_id, \
205                        events.event_type, events.payload_json, events.created_at, \
206                        events.step, events.step_scope, events.cycle \
207                 FROM events \
208                 INNER JOIN tasks ON events.task_id = tasks.id \
209                 WHERE events.created_at < datetime('now', '-{days} days') \
210                   AND tasks.status IN ({TERMINAL_STATUSES}) \
211                 LIMIT {limit}"
212            );
213            let mut stmt = conn.prepare(&sql)?;
214
215            struct ArchiveRow {
216                rowid: i64,
217                task_id: String,
218                task_item_id: Option<String>,
219                event_type: String,
220                payload_json: String,
221                created_at: String,
222                step: Option<String>,
223                step_scope: Option<String>,
224                cycle: Option<i64>,
225            }
226
227            let rows: Vec<ArchiveRow> = stmt
228                .query_map([], |row| {
229                    Ok(ArchiveRow {
230                        rowid: row.get(0)?,
231                        task_id: row.get(1)?,
232                        task_item_id: row.get(2)?,
233                        event_type: row.get(3)?,
234                        payload_json: row.get(4)?,
235                        created_at: row.get(5)?,
236                        step: row.get(6)?,
237                        step_scope: row.get(7)?,
238                        cycle: row.get(8)?,
239                    })
240                })?
241                .filter_map(|r| r.ok())
242                .collect();
243
244            if rows.is_empty() {
245                return Ok(0u64);
246            }
247
248            // Group by task_id and write JSONL
249            use std::collections::HashMap;
250            use std::io::Write;
251            let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
252            let mut rowids = Vec::with_capacity(rows.len());
253            for row in &rows {
254                let (
255                    rowid,
256                    task_id,
257                    task_item_id,
258                    event_type,
259                    payload_json,
260                    created_at,
261                    step,
262                    step_scope,
263                    cycle,
264                ) = (
265                    &row.rowid,
266                    &row.task_id,
267                    &row.task_item_id,
268                    &row.event_type,
269                    &row.payload_json,
270                    &row.created_at,
271                    &row.step,
272                    &row.step_scope,
273                    &row.cycle,
274                );
275                rowids.push(*rowid);
276                // Extract date from created_at (first 10 chars: YYYY-MM-DD)
277                let date = if created_at.len() >= 10 {
278                    &created_at[..10]
279                } else {
280                    created_at.as_str()
281                };
282                let line = serde_json::json!({
283                    "task_id": task_id,
284                    "task_item_id": task_item_id,
285                    "event_type": event_type,
286                    "payload_json": payload_json,
287                    "created_at": created_at,
288                    "step": step,
289                    "step_scope": step_scope,
290                    "cycle": cycle,
291                });
292                let key = format!("{task_id}/{date}");
293                grouped.entry(key).or_default().push(line.to_string());
294            }
295            for (key, lines) in &grouped {
296                let path = dir.join(format!("{key}.jsonl"));
297                if let Some(parent) = path.parent() {
298                    std::fs::create_dir_all(parent)
299                        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
300                }
301                let mut f = std::fs::OpenOptions::new()
302                    .create(true)
303                    .append(true)
304                    .open(&path)
305                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
306                for line in lines {
307                    writeln!(f, "{line}")
308                        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
309                }
310            }
311
312            // Delete archived events by rowid
313            let placeholders: Vec<String> = rowids.iter().map(|id| id.to_string()).collect();
314            let delete_sql = format!(
315                "DELETE FROM events WHERE rowid IN ({})",
316                placeholders.join(",")
317            );
318            conn.execute(&delete_sql, [])?;
319
320            Ok(rows.len() as u64)
321        })
322        .await
323        .map_err(|e| anyhow::anyhow!("{e}"))?;
324    if archived > 0 {
325        info!(
326            archived,
327            retention_days, "event cleanup: archived and deleted events"
328        );
329    }
330    Ok(archived)
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::TestState;

    /// Helper: insert a task row directly (bypasses any task-service layer).
    async fn insert_task(db: &AsyncDatabase, task_id: &str, status: &str) {
        let id = task_id.to_owned();
        let st = status.to_owned();
        db.writer()
            .call(move |conn| {
                conn.execute(
                    "INSERT INTO tasks (id, name, status, goal, target_files_json, mode, \
                     project_id, workspace_id, workflow_id, workspace_root, \
                     qa_targets_json, ticket_dir, created_at, updated_at) \
                     VALUES (?1, ?1, ?2, '', '[]', 'auto', 'default', 'default', 'basic', \
                     '/tmp', '[]', '/tmp/tickets', datetime('now'), datetime('now'))",
                    rusqlite::params![id, st],
                )?;
                Ok(())
            })
            .await
            .expect("insert_task");
    }

    /// Helper: insert an event with a specific created_at timestamp.
    async fn insert_event(db: &AsyncDatabase, task_id: &str, event_type: &str, created_at: &str) {
        let tid = task_id.to_owned();
        let et = event_type.to_owned();
        let ca = created_at.to_owned();
        db.writer()
            .call(move |conn| {
                conn.execute(
                    "INSERT INTO events (task_id, event_type, payload_json, created_at) \
                     VALUES (?1, ?2, '{}', ?3)",
                    rusqlite::params![tid, et, ca],
                )?;
                Ok(())
            })
            .await
            .expect("insert_event");
    }

    /// Helper: count all events.
    async fn count_events(db: &AsyncDatabase) -> u64 {
        db.reader()
            .call(|conn| {
                let c: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |r| r.get(0))?;
                Ok(c as u64)
            })
            .await
            .expect("count_events")
    }

    /// Only old events belonging to terminal-status tasks are removed.
    #[tokio::test]
    async fn cleanup_deletes_only_terminal_old_events() {
        let mut ts = TestState::new();
        let state = ts.build();

        // completed task with old event — should be cleaned
        insert_task(&state.async_database, "t-done", "completed").await;
        insert_event(
            &state.async_database,
            "t-done",
            "step_start",
            "2020-01-01T00:00:00",
        )
        .await;

        // running task with old event — should NOT be cleaned
        insert_task(&state.async_database, "t-running", "running").await;
        insert_event(
            &state.async_database,
            "t-running",
            "step_start",
            "2020-01-01T00:00:00",
        )
        .await;

        // completed task with recent event — should NOT be cleaned (within retention)
        insert_task(&state.async_database, "t-recent", "completed").await;
        insert_event(
            &state.async_database,
            "t-recent",
            "step_start",
            "2099-01-01T00:00:00",
        )
        .await;

        assert_eq!(count_events(&state.async_database).await, 3);

        let deleted = cleanup_old_events(&state.async_database, 1, 1000)
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(count_events(&state.async_database).await, 2);
    }

    /// `batch_limit` caps the number of rows removed per invocation.
    #[tokio::test]
    async fn cleanup_respects_batch_limit() {
        let mut ts = TestState::new();
        let state = ts.build();

        insert_task(&state.async_database, "t-done", "completed").await;
        for i in 0..5 {
            insert_event(
                &state.async_database,
                "t-done",
                &format!("ev_{i}"),
                "2020-01-01T00:00:00",
            )
            .await;
        }
        assert_eq!(count_events(&state.async_database).await, 5);

        let deleted = cleanup_old_events(&state.async_database, 1, 2)
            .await
            .unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(count_events(&state.async_database).await, 3);
    }

    /// Dry-run count matches what cleanup would delete.
    #[tokio::test]
    async fn count_pending_cleanup_returns_correct_count() {
        let mut ts = TestState::new();
        let state = ts.build();

        insert_task(&state.async_database, "t-fail", "failed").await;
        insert_event(&state.async_database, "t-fail", "e1", "2020-01-01T00:00:00").await;
        insert_event(&state.async_database, "t-fail", "e2", "2020-01-02T00:00:00").await;

        insert_task(&state.async_database, "t-run", "running").await;
        insert_event(&state.async_database, "t-run", "e3", "2020-01-01T00:00:00").await;

        let count = count_pending_cleanup(&state.async_database, 1)
            .await
            .unwrap();
        assert_eq!(count, 2);
    }

    /// Totals and min/max timestamps reflect the inserted events.
    #[tokio::test]
    async fn event_stats_returns_expected_values() {
        let mut ts = TestState::new();
        let state = ts.build();

        insert_task(&state.async_database, "t1", "completed").await;
        insert_event(&state.async_database, "t1", "a", "2024-01-01T00:00:00").await;
        insert_event(&state.async_database, "t1", "b", "2024-06-01T00:00:00").await;

        insert_task(&state.async_database, "t2", "running").await;
        insert_event(&state.async_database, "t2", "c", "2024-03-01T00:00:00").await;

        let stats = event_stats(&state.async_database).await.unwrap();
        assert_eq!(stats.total_rows, 3);
        assert_eq!(stats.earliest.as_deref(), Some("2024-01-01T00:00:00"));
        assert_eq!(stats.latest.as_deref(), Some("2024-06-01T00:00:00"));
        assert!(stats.by_task_status.len() >= 2);
    }

    /// Archival writes one JSON object per line, then removes the rows.
    #[tokio::test]
    async fn archive_events_writes_jsonl_and_deletes() {
        let mut ts = TestState::new();
        let state = ts.build();
        let archive_dir =
            std::env::temp_dir().join(format!("archive-test-{}", uuid::Uuid::new_v4()));

        insert_task(&state.async_database, "t-arch", "cancelled").await;
        insert_event(&state.async_database, "t-arch", "e1", "2020-06-15T10:00:00").await;
        insert_event(&state.async_database, "t-arch", "e2", "2020-06-15T11:00:00").await;

        assert_eq!(count_events(&state.async_database).await, 2);

        let archived = archive_events(&state.async_database, &archive_dir, 1, 1000)
            .await
            .unwrap();
        assert_eq!(archived, 2);
        assert_eq!(count_events(&state.async_database).await, 0);

        // Verify JSONL file exists and has 2 lines
        let jsonl_path = archive_dir.join("t-arch/2020-06-15.jsonl");
        assert!(jsonl_path.exists(), "JSONL file should exist");
        let content = std::fs::read_to_string(&jsonl_path).unwrap();
        let lines: Vec<&str> = content.trim().lines().collect();
        assert_eq!(lines.len(), 2);

        // Each line should be valid JSON
        for line in &lines {
            let _: serde_json::Value = serde_json::from_str(line).expect("valid JSON line");
        }

        // Cleanup
        let _ = std::fs::remove_dir_all(&archive_dir);
    }

    /// With retention 0, any past event of a terminal task is eligible.
    #[tokio::test]
    async fn cleanup_with_zero_retention_deletes_recent_terminal_events() {
        let mut ts = TestState::new();
        let state = ts.build();

        insert_task(&state.async_database, "t-done", "completed").await;
        // An event from just a second ago — retention_days=0 means "older than now"
        // so any past event qualifies
        insert_event(
            &state.async_database,
            "t-done",
            "step_start",
            "2025-01-01T00:00:00",
        )
        .await;

        insert_task(&state.async_database, "t-running", "running").await;
        insert_event(
            &state.async_database,
            "t-running",
            "step_start",
            "2025-01-01T00:00:00",
        )
        .await;

        assert_eq!(count_events(&state.async_database).await, 2);

        // retention_days=0 means "older than now minus 0 days" — any past event qualifies
        let deleted = cleanup_old_events(&state.async_database, 0, 1000)
            .await
            .unwrap();
        // The completed task's event should be deleted; running task's should remain
        assert_eq!(deleted, 1);
        assert_eq!(count_events(&state.async_database).await, 1);
    }

    /// An empty events table yields zero rows and no min/max timestamps.
    #[tokio::test]
    async fn event_stats_on_empty_database() {
        let mut ts = TestState::new();
        let state = ts.build();

        let stats = event_stats(&state.async_database).await.unwrap();
        assert_eq!(stats.total_rows, 0);
        assert_eq!(stats.earliest, None);
        assert_eq!(stats.latest, None);
        assert!(stats.by_task_status.is_empty());
    }

    /// No eligible events: nothing archived and no directory is created.
    #[tokio::test]
    async fn archive_events_with_no_eligible_events() {
        let mut ts = TestState::new();
        let state = ts.build();
        let archive_dir =
            std::env::temp_dir().join(format!("archive-empty-{}", uuid::Uuid::new_v4()));

        // Running task — not terminal, so nothing to archive
        insert_task(&state.async_database, "t-run", "running").await;
        insert_event(&state.async_database, "t-run", "e1", "2020-01-01T00:00:00").await;

        let archived = archive_events(&state.async_database, &archive_dir, 1, 1000)
            .await
            .unwrap();
        assert_eq!(archived, 0);
        assert_eq!(count_events(&state.async_database).await, 1);

        // Archive dir should not have been created since no events were archived
        assert!(!archive_dir.exists());
    }

    /// Events are bucketed into per-date JSONL files within the task directory.
    #[tokio::test]
    async fn archive_events_groups_by_date() {
        let mut ts = TestState::new();
        let state = ts.build();
        let archive_dir =
            std::env::temp_dir().join(format!("archive-dates-{}", uuid::Uuid::new_v4()));

        insert_task(&state.async_database, "t-multi", "completed").await;
        // Events on two different dates
        insert_event(
            &state.async_database,
            "t-multi",
            "e1",
            "2020-06-15T10:00:00",
        )
        .await;
        insert_event(
            &state.async_database,
            "t-multi",
            "e2",
            "2020-06-16T12:00:00",
        )
        .await;
        insert_event(
            &state.async_database,
            "t-multi",
            "e3",
            "2020-06-15T14:00:00",
        )
        .await;

        let archived = archive_events(&state.async_database, &archive_dir, 1, 1000)
            .await
            .unwrap();
        assert_eq!(archived, 3);
        assert_eq!(count_events(&state.async_database).await, 0);

        // Two separate date files
        let path_15 = archive_dir.join("t-multi/2020-06-15.jsonl");
        let path_16 = archive_dir.join("t-multi/2020-06-16.jsonl");
        assert!(path_15.exists(), "JSONL for 2020-06-15 should exist");
        assert!(path_16.exists(), "JSONL for 2020-06-16 should exist");

        let content_15 = std::fs::read_to_string(&path_15).unwrap();
        let lines_15: Vec<&str> = content_15.trim().lines().collect();
        assert_eq!(lines_15.len(), 2, "Two events on 2020-06-15");

        let content_16 = std::fs::read_to_string(&path_16).unwrap();
        let lines_16: Vec<&str> = content_16.trim().lines().collect();
        assert_eq!(lines_16.len(), 1, "One event on 2020-06-16");

        let _ = std::fs::remove_dir_all(&archive_dir);
    }

    /// Dry-run count is zero for empty DBs and for non-terminal tasks.
    #[tokio::test]
    async fn count_pending_cleanup_with_zero_results() {
        let mut ts = TestState::new();
        let state = ts.build();

        // No tasks or events at all
        let count = count_pending_cleanup(&state.async_database, 1)
            .await
            .unwrap();
        assert_eq!(count, 0);

        // Add a running task with old events — still zero eligible
        insert_task(&state.async_database, "t-run", "running").await;
        insert_event(&state.async_database, "t-run", "e1", "2020-01-01T00:00:00").await;
        let count = count_pending_cleanup(&state.async_database, 1)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    /// 'failed' and 'cancelled' are terminal; 'pending' is not.
    #[tokio::test]
    async fn cleanup_deletes_failed_and_cancelled_tasks() {
        let mut ts = TestState::new();
        let state = ts.build();

        insert_task(&state.async_database, "t-fail", "failed").await;
        insert_event(
            &state.async_database,
            "t-fail",
            "err",
            "2020-01-01T00:00:00",
        )
        .await;

        insert_task(&state.async_database, "t-cancel", "cancelled").await;
        insert_event(
            &state.async_database,
            "t-cancel",
            "cancel_ev",
            "2020-01-01T00:00:00",
        )
        .await;

        insert_task(&state.async_database, "t-pending", "pending").await;
        insert_event(
            &state.async_database,
            "t-pending",
            "pending_ev",
            "2020-01-01T00:00:00",
        )
        .await;

        assert_eq!(count_events(&state.async_database).await, 3);

        let deleted = cleanup_old_events(&state.async_database, 1, 1000)
            .await
            .unwrap();
        // failed + cancelled = 2 deleted; pending remains
        assert_eq!(deleted, 2);
        assert_eq!(count_events(&state.async_database).await, 1);
    }

    /// Cleanup on an empty database reports zero deletions.
    #[tokio::test]
    async fn cleanup_with_no_events_returns_zero() {
        let mut ts = TestState::new();
        let state = ts.build();

        let deleted = cleanup_old_events(&state.async_database, 1, 1000)
            .await
            .unwrap();
        assert_eq!(deleted, 0);
    }

    /// Without a filter, all of a task's events come back newest-first.
    #[tokio::test]
    async fn list_task_events_without_filter() {
        let mut ts = TestState::new();
        let state = ts.build();

        insert_task(&state.async_database, "t-list", "running").await;
        insert_event(
            &state.async_database,
            "t-list",
            "step_start",
            "2024-01-01T00:00:00",
        )
        .await;
        insert_event(
            &state.async_database,
            "t-list",
            "step_end",
            "2024-01-02T00:00:00",
        )
        .await;

        let events = list_task_events(&state.async_database, "t-list", None, 50)
            .await
            .unwrap();
        assert_eq!(events.len(), 2);
        // Results are ordered by id DESC, so most recent first
        assert_eq!(events[0].event_type, "step_end");
        assert_eq!(events[1].event_type, "step_start");
    }

    /// The type filter matches on event_type prefix.
    #[tokio::test]
    async fn list_task_events_with_type_filter() {
        let mut ts = TestState::new();
        let state = ts.build();

        insert_task(&state.async_database, "t-filter", "running").await;
        insert_event(
            &state.async_database,
            "t-filter",
            "step_start",
            "2024-01-01T00:00:00",
        )
        .await;
        insert_event(
            &state.async_database,
            "t-filter",
            "step_end",
            "2024-01-02T00:00:00",
        )
        .await;
        insert_event(
            &state.async_database,
            "t-filter",
            "error_occurred",
            "2024-01-03T00:00:00",
        )
        .await;

        let events = list_task_events(&state.async_database, "t-filter", Some("step"), 50)
            .await
            .unwrap();
        assert_eq!(events.len(), 2);
        assert!(events.iter().all(|e| e.event_type.starts_with("step")));
    }

    /// A limit of 0 is replaced by the internal default of 50.
    #[tokio::test]
    async fn list_task_events_with_zero_limit_defaults_to_50() {
        let mut ts = TestState::new();
        let state = ts.build();

        insert_task(&state.async_database, "t-zero", "running").await;
        insert_event(
            &state.async_database,
            "t-zero",
            "ev1",
            "2024-01-01T00:00:00",
        )
        .await;

        // limit=0 should default to 50 internally and still return the event
        let events = list_task_events(&state.async_database, "t-zero", None, 0)
            .await
            .unwrap();
        assert_eq!(events.len(), 1);
    }

    /// Querying an unknown task id yields an empty list, not an error.
    #[tokio::test]
    async fn list_task_events_for_nonexistent_task() {
        let mut ts = TestState::new();
        let state = ts.build();

        let events = list_task_events(&state.async_database, "no-such-task", None, 50)
            .await
            .unwrap();
        assert!(events.is_empty());
    }

    /// Each task's archived events land in that task's own subdirectory.
    #[tokio::test]
    async fn archive_events_across_multiple_tasks() {
        let mut ts = TestState::new();
        let state = ts.build();
        let archive_dir =
            std::env::temp_dir().join(format!("archive-multi-{}", uuid::Uuid::new_v4()));

        insert_task(&state.async_database, "t-a", "completed").await;
        insert_task(&state.async_database, "t-b", "failed").await;

        insert_event(&state.async_database, "t-a", "e1", "2020-03-10T08:00:00").await;
        insert_event(&state.async_database, "t-b", "e2", "2020-03-10T09:00:00").await;

        let archived = archive_events(&state.async_database, &archive_dir, 1, 1000)
            .await
            .unwrap();
        assert_eq!(archived, 2);
        assert_eq!(count_events(&state.async_database).await, 0);

        // Each task gets its own subdirectory
        assert!(archive_dir.join("t-a/2020-03-10.jsonl").exists());
        assert!(archive_dir.join("t-b/2020-03-10.jsonl").exists());

        let _ = std::fs::remove_dir_all(&archive_dir);
    }
}