1use crate::async_database::AsyncDatabase;
7use crate::dto::EventDto;
8use anyhow::Result;
9use serde_json::Value;
10use std::path::Path;
11use tracing::info;
12
13#[derive(Debug, Clone)]
15pub struct EventStats {
16 pub total_rows: u64,
18 pub earliest: Option<String>,
20 pub latest: Option<String>,
22 pub by_task_status: Vec<(String, u64)>,
24}
25
26const TERMINAL_STATUSES: &str = "'completed','failed','cancelled'";
28
29pub async fn cleanup_old_events(
35 db: &AsyncDatabase,
36 retention_days: u32,
37 batch_limit: u32,
38) -> Result<u64> {
39 let days = retention_days;
40 let limit = batch_limit;
41 let deleted: u64 = db
42 .writer()
43 .call(move |conn| {
44 let sql = format!(
45 "DELETE FROM events WHERE rowid IN (\
46 SELECT events.rowid FROM events \
47 INNER JOIN tasks ON events.task_id = tasks.id \
48 WHERE events.created_at < datetime('now', '-{days} days') \
49 AND tasks.status IN ({TERMINAL_STATUSES}) \
50 LIMIT {limit}\
51 )"
52 );
53 let count = conn.execute(&sql, [])?;
54 Ok(count as u64)
55 })
56 .await
57 .map_err(|e| anyhow::anyhow!("{e}"))?;
58 if deleted > 0 {
59 info!(deleted, retention_days, "event cleanup: deleted old events");
60 }
61 Ok(deleted)
62}
63
64pub async fn count_pending_cleanup(db: &AsyncDatabase, retention_days: u32) -> Result<u64> {
67 let days = retention_days;
68 let count: u64 = db
69 .reader()
70 .call(move |conn| {
71 let sql = format!(
72 "SELECT COUNT(*) FROM events \
73 INNER JOIN tasks ON events.task_id = tasks.id \
74 WHERE events.created_at < datetime('now', '-{days} days') \
75 AND tasks.status IN ({TERMINAL_STATUSES})"
76 );
77 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
78 Ok(count as u64)
79 })
80 .await
81 .map_err(|e| anyhow::anyhow!("{e}"))?;
82 Ok(count)
83}
84
85pub async fn list_task_events(
88 db: &AsyncDatabase,
89 task_id: &str,
90 event_type_filter: Option<&str>,
91 limit: u32,
92) -> Result<Vec<EventDto>> {
93 let task_id = task_id.to_string();
94 let type_filter = event_type_filter.map(|s| s.to_string());
95 let limit = if limit == 0 { 50 } else { limit };
96 let events = db
97 .reader()
98 .call(move |conn| {
99 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(
100 ref prefix,
101 ) = type_filter
102 {
103 (
104 format!(
105 "SELECT id, task_id, task_item_id, event_type, payload_json, created_at \
106 FROM events WHERE task_id = ?1 AND event_type LIKE ?2 \
107 ORDER BY id DESC LIMIT {limit}"
108 ),
109 vec![Box::new(task_id.clone()), Box::new(format!("{prefix}%"))],
110 )
111 } else {
112 (
113 format!(
114 "SELECT id, task_id, task_item_id, event_type, payload_json, created_at \
115 FROM events WHERE task_id = ?1 \
116 ORDER BY id DESC LIMIT {limit}"
117 ),
118 vec![Box::new(task_id.clone())],
119 )
120 };
121 let mut stmt = conn.prepare(&sql)?;
122 let rows = stmt
123 .query_map(rusqlite::params_from_iter(params.iter()), |row| {
124 let payload_str: String = row.get(4)?;
125 let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
126 Ok(EventDto {
127 id: row.get(0)?,
128 task_id: row.get(1)?,
129 task_item_id: row.get(2)?,
130 event_type: row.get(3)?,
131 payload,
132 created_at: row.get(5)?,
133 })
134 })?
135 .filter_map(|r| r.ok())
136 .collect();
137 Ok(rows)
138 })
139 .await
140 .map_err(|e| anyhow::anyhow!("{e}"))?;
141 Ok(events)
142}
143
144pub async fn event_stats(db: &AsyncDatabase) -> Result<EventStats> {
146 let stats = db
147 .reader()
148 .call(|conn| {
149 let total_rows: i64 =
150 conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
151 let earliest: Option<String> = conn
152 .query_row("SELECT MIN(created_at) FROM events", [], |row| row.get(0))
153 .unwrap_or(None);
154 let latest: Option<String> = conn
155 .query_row("SELECT MAX(created_at) FROM events", [], |row| row.get(0))
156 .unwrap_or(None);
157
158 let mut stmt = conn.prepare(
159 "SELECT COALESCE(t.status, 'unknown'), COUNT(*) \
160 FROM events e \
161 LEFT JOIN tasks t ON e.task_id = t.id \
162 GROUP BY t.status \
163 ORDER BY COUNT(*) DESC",
164 )?;
165 let by_task_status: Vec<(String, u64)> = stmt
166 .query_map([], |row| {
167 let status: String = row.get(0)?;
168 let count: i64 = row.get(1)?;
169 Ok((status, count as u64))
170 })?
171 .filter_map(|r| r.ok())
172 .collect();
173
174 Ok(EventStats {
175 total_rows: total_rows as u64,
176 earliest,
177 latest,
178 by_task_status,
179 })
180 })
181 .await
182 .map_err(|e| anyhow::anyhow!("{e}"))?;
183 Ok(stats)
184}
185
186pub async fn archive_events(
191 db: &AsyncDatabase,
192 archive_dir: &Path,
193 retention_days: u32,
194 batch_limit: u32,
195) -> Result<u64> {
196 let dir = archive_dir.to_path_buf();
197 let days = retention_days;
198 let limit = batch_limit;
199 let archived: u64 = db
200 .writer()
201 .call(move |conn| {
202 let sql = format!(
204 "SELECT events.rowid, events.task_id, events.task_item_id, \
205 events.event_type, events.payload_json, events.created_at, \
206 events.step, events.step_scope, events.cycle \
207 FROM events \
208 INNER JOIN tasks ON events.task_id = tasks.id \
209 WHERE events.created_at < datetime('now', '-{days} days') \
210 AND tasks.status IN ({TERMINAL_STATUSES}) \
211 LIMIT {limit}"
212 );
213 let mut stmt = conn.prepare(&sql)?;
214
215 struct ArchiveRow {
216 rowid: i64,
217 task_id: String,
218 task_item_id: Option<String>,
219 event_type: String,
220 payload_json: String,
221 created_at: String,
222 step: Option<String>,
223 step_scope: Option<String>,
224 cycle: Option<i64>,
225 }
226
227 let rows: Vec<ArchiveRow> = stmt
228 .query_map([], |row| {
229 Ok(ArchiveRow {
230 rowid: row.get(0)?,
231 task_id: row.get(1)?,
232 task_item_id: row.get(2)?,
233 event_type: row.get(3)?,
234 payload_json: row.get(4)?,
235 created_at: row.get(5)?,
236 step: row.get(6)?,
237 step_scope: row.get(7)?,
238 cycle: row.get(8)?,
239 })
240 })?
241 .filter_map(|r| r.ok())
242 .collect();
243
244 if rows.is_empty() {
245 return Ok(0u64);
246 }
247
248 use std::collections::HashMap;
250 use std::io::Write;
251 let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
252 let mut rowids = Vec::with_capacity(rows.len());
253 for row in &rows {
254 let (
255 rowid,
256 task_id,
257 task_item_id,
258 event_type,
259 payload_json,
260 created_at,
261 step,
262 step_scope,
263 cycle,
264 ) = (
265 &row.rowid,
266 &row.task_id,
267 &row.task_item_id,
268 &row.event_type,
269 &row.payload_json,
270 &row.created_at,
271 &row.step,
272 &row.step_scope,
273 &row.cycle,
274 );
275 rowids.push(*rowid);
276 let date = if created_at.len() >= 10 {
278 &created_at[..10]
279 } else {
280 created_at.as_str()
281 };
282 let line = serde_json::json!({
283 "task_id": task_id,
284 "task_item_id": task_item_id,
285 "event_type": event_type,
286 "payload_json": payload_json,
287 "created_at": created_at,
288 "step": step,
289 "step_scope": step_scope,
290 "cycle": cycle,
291 });
292 let key = format!("{task_id}/{date}");
293 grouped.entry(key).or_default().push(line.to_string());
294 }
295 for (key, lines) in &grouped {
296 let path = dir.join(format!("{key}.jsonl"));
297 if let Some(parent) = path.parent() {
298 std::fs::create_dir_all(parent)
299 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
300 }
301 let mut f = std::fs::OpenOptions::new()
302 .create(true)
303 .append(true)
304 .open(&path)
305 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
306 for line in lines {
307 writeln!(f, "{line}")
308 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
309 }
310 }
311
312 let placeholders: Vec<String> = rowids.iter().map(|id| id.to_string()).collect();
314 let delete_sql = format!(
315 "DELETE FROM events WHERE rowid IN ({})",
316 placeholders.join(",")
317 );
318 conn.execute(&delete_sql, [])?;
319
320 Ok(rows.len() as u64)
321 })
322 .await
323 .map_err(|e| anyhow::anyhow!("{e}"))?;
324 if archived > 0 {
325 info!(
326 archived,
327 retention_days, "event cleanup: archived and deleted events"
328 );
329 }
330 Ok(archived)
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336 use crate::test_utils::TestState;
337
338 async fn insert_task(db: &AsyncDatabase, task_id: &str, status: &str) {
340 let id = task_id.to_owned();
341 let st = status.to_owned();
342 db.writer()
343 .call(move |conn| {
344 conn.execute(
345 "INSERT INTO tasks (id, name, status, goal, target_files_json, mode, \
346 project_id, workspace_id, workflow_id, workspace_root, \
347 qa_targets_json, ticket_dir, created_at, updated_at) \
348 VALUES (?1, ?1, ?2, '', '[]', 'auto', 'default', 'default', 'basic', \
349 '/tmp', '[]', '/tmp/tickets', datetime('now'), datetime('now'))",
350 rusqlite::params![id, st],
351 )?;
352 Ok(())
353 })
354 .await
355 .expect("insert_task");
356 }
357
358 async fn insert_event(db: &AsyncDatabase, task_id: &str, event_type: &str, created_at: &str) {
360 let tid = task_id.to_owned();
361 let et = event_type.to_owned();
362 let ca = created_at.to_owned();
363 db.writer()
364 .call(move |conn| {
365 conn.execute(
366 "INSERT INTO events (task_id, event_type, payload_json, created_at) \
367 VALUES (?1, ?2, '{}', ?3)",
368 rusqlite::params![tid, et, ca],
369 )?;
370 Ok(())
371 })
372 .await
373 .expect("insert_event");
374 }
375
376 async fn count_events(db: &AsyncDatabase) -> u64 {
378 db.reader()
379 .call(|conn| {
380 let c: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |r| r.get(0))?;
381 Ok(c as u64)
382 })
383 .await
384 .expect("count_events")
385 }
386
387 #[tokio::test]
388 async fn cleanup_deletes_only_terminal_old_events() {
389 let mut ts = TestState::new();
390 let state = ts.build();
391
392 insert_task(&state.async_database, "t-done", "completed").await;
394 insert_event(
395 &state.async_database,
396 "t-done",
397 "step_start",
398 "2020-01-01T00:00:00",
399 )
400 .await;
401
402 insert_task(&state.async_database, "t-running", "running").await;
404 insert_event(
405 &state.async_database,
406 "t-running",
407 "step_start",
408 "2020-01-01T00:00:00",
409 )
410 .await;
411
412 insert_task(&state.async_database, "t-recent", "completed").await;
414 insert_event(
415 &state.async_database,
416 "t-recent",
417 "step_start",
418 "2099-01-01T00:00:00",
419 )
420 .await;
421
422 assert_eq!(count_events(&state.async_database).await, 3);
423
424 let deleted = cleanup_old_events(&state.async_database, 1, 1000)
425 .await
426 .unwrap();
427 assert_eq!(deleted, 1);
428 assert_eq!(count_events(&state.async_database).await, 2);
429 }
430
431 #[tokio::test]
432 async fn cleanup_respects_batch_limit() {
433 let mut ts = TestState::new();
434 let state = ts.build();
435
436 insert_task(&state.async_database, "t-done", "completed").await;
437 for i in 0..5 {
438 insert_event(
439 &state.async_database,
440 "t-done",
441 &format!("ev_{i}"),
442 "2020-01-01T00:00:00",
443 )
444 .await;
445 }
446 assert_eq!(count_events(&state.async_database).await, 5);
447
448 let deleted = cleanup_old_events(&state.async_database, 1, 2)
449 .await
450 .unwrap();
451 assert_eq!(deleted, 2);
452 assert_eq!(count_events(&state.async_database).await, 3);
453 }
454
455 #[tokio::test]
456 async fn count_pending_cleanup_returns_correct_count() {
457 let mut ts = TestState::new();
458 let state = ts.build();
459
460 insert_task(&state.async_database, "t-fail", "failed").await;
461 insert_event(&state.async_database, "t-fail", "e1", "2020-01-01T00:00:00").await;
462 insert_event(&state.async_database, "t-fail", "e2", "2020-01-02T00:00:00").await;
463
464 insert_task(&state.async_database, "t-run", "running").await;
465 insert_event(&state.async_database, "t-run", "e3", "2020-01-01T00:00:00").await;
466
467 let count = count_pending_cleanup(&state.async_database, 1)
468 .await
469 .unwrap();
470 assert_eq!(count, 2);
471 }
472
473 #[tokio::test]
474 async fn event_stats_returns_expected_values() {
475 let mut ts = TestState::new();
476 let state = ts.build();
477
478 insert_task(&state.async_database, "t1", "completed").await;
479 insert_event(&state.async_database, "t1", "a", "2024-01-01T00:00:00").await;
480 insert_event(&state.async_database, "t1", "b", "2024-06-01T00:00:00").await;
481
482 insert_task(&state.async_database, "t2", "running").await;
483 insert_event(&state.async_database, "t2", "c", "2024-03-01T00:00:00").await;
484
485 let stats = event_stats(&state.async_database).await.unwrap();
486 assert_eq!(stats.total_rows, 3);
487 assert_eq!(stats.earliest.as_deref(), Some("2024-01-01T00:00:00"));
488 assert_eq!(stats.latest.as_deref(), Some("2024-06-01T00:00:00"));
489 assert!(stats.by_task_status.len() >= 2);
490 }
491
492 #[tokio::test]
493 async fn archive_events_writes_jsonl_and_deletes() {
494 let mut ts = TestState::new();
495 let state = ts.build();
496 let archive_dir =
497 std::env::temp_dir().join(format!("archive-test-{}", uuid::Uuid::new_v4()));
498
499 insert_task(&state.async_database, "t-arch", "cancelled").await;
500 insert_event(&state.async_database, "t-arch", "e1", "2020-06-15T10:00:00").await;
501 insert_event(&state.async_database, "t-arch", "e2", "2020-06-15T11:00:00").await;
502
503 assert_eq!(count_events(&state.async_database).await, 2);
504
505 let archived = archive_events(&state.async_database, &archive_dir, 1, 1000)
506 .await
507 .unwrap();
508 assert_eq!(archived, 2);
509 assert_eq!(count_events(&state.async_database).await, 0);
510
511 let jsonl_path = archive_dir.join("t-arch/2020-06-15.jsonl");
513 assert!(jsonl_path.exists(), "JSONL file should exist");
514 let content = std::fs::read_to_string(&jsonl_path).unwrap();
515 let lines: Vec<&str> = content.trim().lines().collect();
516 assert_eq!(lines.len(), 2);
517
518 for line in &lines {
520 let _: serde_json::Value = serde_json::from_str(line).expect("valid JSON line");
521 }
522
523 let _ = std::fs::remove_dir_all(&archive_dir);
525 }
526
527 #[tokio::test]
528 async fn cleanup_with_zero_retention_deletes_recent_terminal_events() {
529 let mut ts = TestState::new();
530 let state = ts.build();
531
532 insert_task(&state.async_database, "t-done", "completed").await;
533 insert_event(
536 &state.async_database,
537 "t-done",
538 "step_start",
539 "2025-01-01T00:00:00",
540 )
541 .await;
542
543 insert_task(&state.async_database, "t-running", "running").await;
544 insert_event(
545 &state.async_database,
546 "t-running",
547 "step_start",
548 "2025-01-01T00:00:00",
549 )
550 .await;
551
552 assert_eq!(count_events(&state.async_database).await, 2);
553
554 let deleted = cleanup_old_events(&state.async_database, 0, 1000)
556 .await
557 .unwrap();
558 assert_eq!(deleted, 1);
560 assert_eq!(count_events(&state.async_database).await, 1);
561 }
562
563 #[tokio::test]
564 async fn event_stats_on_empty_database() {
565 let mut ts = TestState::new();
566 let state = ts.build();
567
568 let stats = event_stats(&state.async_database).await.unwrap();
569 assert_eq!(stats.total_rows, 0);
570 assert_eq!(stats.earliest, None);
571 assert_eq!(stats.latest, None);
572 assert!(stats.by_task_status.is_empty());
573 }
574
575 #[tokio::test]
576 async fn archive_events_with_no_eligible_events() {
577 let mut ts = TestState::new();
578 let state = ts.build();
579 let archive_dir =
580 std::env::temp_dir().join(format!("archive-empty-{}", uuid::Uuid::new_v4()));
581
582 insert_task(&state.async_database, "t-run", "running").await;
584 insert_event(&state.async_database, "t-run", "e1", "2020-01-01T00:00:00").await;
585
586 let archived = archive_events(&state.async_database, &archive_dir, 1, 1000)
587 .await
588 .unwrap();
589 assert_eq!(archived, 0);
590 assert_eq!(count_events(&state.async_database).await, 1);
591
592 assert!(!archive_dir.exists());
594 }
595
596 #[tokio::test]
597 async fn archive_events_groups_by_date() {
598 let mut ts = TestState::new();
599 let state = ts.build();
600 let archive_dir =
601 std::env::temp_dir().join(format!("archive-dates-{}", uuid::Uuid::new_v4()));
602
603 insert_task(&state.async_database, "t-multi", "completed").await;
604 insert_event(
606 &state.async_database,
607 "t-multi",
608 "e1",
609 "2020-06-15T10:00:00",
610 )
611 .await;
612 insert_event(
613 &state.async_database,
614 "t-multi",
615 "e2",
616 "2020-06-16T12:00:00",
617 )
618 .await;
619 insert_event(
620 &state.async_database,
621 "t-multi",
622 "e3",
623 "2020-06-15T14:00:00",
624 )
625 .await;
626
627 let archived = archive_events(&state.async_database, &archive_dir, 1, 1000)
628 .await
629 .unwrap();
630 assert_eq!(archived, 3);
631 assert_eq!(count_events(&state.async_database).await, 0);
632
633 let path_15 = archive_dir.join("t-multi/2020-06-15.jsonl");
635 let path_16 = archive_dir.join("t-multi/2020-06-16.jsonl");
636 assert!(path_15.exists(), "JSONL for 2020-06-15 should exist");
637 assert!(path_16.exists(), "JSONL for 2020-06-16 should exist");
638
639 let content_15 = std::fs::read_to_string(&path_15).unwrap();
640 let lines_15: Vec<&str> = content_15.trim().lines().collect();
641 assert_eq!(lines_15.len(), 2, "Two events on 2020-06-15");
642
643 let content_16 = std::fs::read_to_string(&path_16).unwrap();
644 let lines_16: Vec<&str> = content_16.trim().lines().collect();
645 assert_eq!(lines_16.len(), 1, "One event on 2020-06-16");
646
647 let _ = std::fs::remove_dir_all(&archive_dir);
648 }
649
650 #[tokio::test]
651 async fn count_pending_cleanup_with_zero_results() {
652 let mut ts = TestState::new();
653 let state = ts.build();
654
655 let count = count_pending_cleanup(&state.async_database, 1)
657 .await
658 .unwrap();
659 assert_eq!(count, 0);
660
661 insert_task(&state.async_database, "t-run", "running").await;
663 insert_event(&state.async_database, "t-run", "e1", "2020-01-01T00:00:00").await;
664 let count = count_pending_cleanup(&state.async_database, 1)
665 .await
666 .unwrap();
667 assert_eq!(count, 0);
668 }
669
670 #[tokio::test]
671 async fn cleanup_deletes_failed_and_cancelled_tasks() {
672 let mut ts = TestState::new();
673 let state = ts.build();
674
675 insert_task(&state.async_database, "t-fail", "failed").await;
676 insert_event(
677 &state.async_database,
678 "t-fail",
679 "err",
680 "2020-01-01T00:00:00",
681 )
682 .await;
683
684 insert_task(&state.async_database, "t-cancel", "cancelled").await;
685 insert_event(
686 &state.async_database,
687 "t-cancel",
688 "cancel_ev",
689 "2020-01-01T00:00:00",
690 )
691 .await;
692
693 insert_task(&state.async_database, "t-pending", "pending").await;
694 insert_event(
695 &state.async_database,
696 "t-pending",
697 "pending_ev",
698 "2020-01-01T00:00:00",
699 )
700 .await;
701
702 assert_eq!(count_events(&state.async_database).await, 3);
703
704 let deleted = cleanup_old_events(&state.async_database, 1, 1000)
705 .await
706 .unwrap();
707 assert_eq!(deleted, 2);
709 assert_eq!(count_events(&state.async_database).await, 1);
710 }
711
712 #[tokio::test]
713 async fn cleanup_with_no_events_returns_zero() {
714 let mut ts = TestState::new();
715 let state = ts.build();
716
717 let deleted = cleanup_old_events(&state.async_database, 1, 1000)
718 .await
719 .unwrap();
720 assert_eq!(deleted, 0);
721 }
722
723 #[tokio::test]
724 async fn list_task_events_without_filter() {
725 let mut ts = TestState::new();
726 let state = ts.build();
727
728 insert_task(&state.async_database, "t-list", "running").await;
729 insert_event(
730 &state.async_database,
731 "t-list",
732 "step_start",
733 "2024-01-01T00:00:00",
734 )
735 .await;
736 insert_event(
737 &state.async_database,
738 "t-list",
739 "step_end",
740 "2024-01-02T00:00:00",
741 )
742 .await;
743
744 let events = list_task_events(&state.async_database, "t-list", None, 50)
745 .await
746 .unwrap();
747 assert_eq!(events.len(), 2);
748 assert_eq!(events[0].event_type, "step_end");
750 assert_eq!(events[1].event_type, "step_start");
751 }
752
753 #[tokio::test]
754 async fn list_task_events_with_type_filter() {
755 let mut ts = TestState::new();
756 let state = ts.build();
757
758 insert_task(&state.async_database, "t-filter", "running").await;
759 insert_event(
760 &state.async_database,
761 "t-filter",
762 "step_start",
763 "2024-01-01T00:00:00",
764 )
765 .await;
766 insert_event(
767 &state.async_database,
768 "t-filter",
769 "step_end",
770 "2024-01-02T00:00:00",
771 )
772 .await;
773 insert_event(
774 &state.async_database,
775 "t-filter",
776 "error_occurred",
777 "2024-01-03T00:00:00",
778 )
779 .await;
780
781 let events = list_task_events(&state.async_database, "t-filter", Some("step"), 50)
782 .await
783 .unwrap();
784 assert_eq!(events.len(), 2);
785 assert!(events.iter().all(|e| e.event_type.starts_with("step")));
786 }
787
788 #[tokio::test]
789 async fn list_task_events_with_zero_limit_defaults_to_50() {
790 let mut ts = TestState::new();
791 let state = ts.build();
792
793 insert_task(&state.async_database, "t-zero", "running").await;
794 insert_event(
795 &state.async_database,
796 "t-zero",
797 "ev1",
798 "2024-01-01T00:00:00",
799 )
800 .await;
801
802 let events = list_task_events(&state.async_database, "t-zero", None, 0)
804 .await
805 .unwrap();
806 assert_eq!(events.len(), 1);
807 }
808
809 #[tokio::test]
810 async fn list_task_events_for_nonexistent_task() {
811 let mut ts = TestState::new();
812 let state = ts.build();
813
814 let events = list_task_events(&state.async_database, "no-such-task", None, 50)
815 .await
816 .unwrap();
817 assert!(events.is_empty());
818 }
819
820 #[tokio::test]
821 async fn archive_events_across_multiple_tasks() {
822 let mut ts = TestState::new();
823 let state = ts.build();
824 let archive_dir =
825 std::env::temp_dir().join(format!("archive-multi-{}", uuid::Uuid::new_v4()));
826
827 insert_task(&state.async_database, "t-a", "completed").await;
828 insert_task(&state.async_database, "t-b", "failed").await;
829
830 insert_event(&state.async_database, "t-a", "e1", "2020-03-10T08:00:00").await;
831 insert_event(&state.async_database, "t-b", "e2", "2020-03-10T09:00:00").await;
832
833 let archived = archive_events(&state.async_database, &archive_dir, 1, 1000)
834 .await
835 .unwrap();
836 assert_eq!(archived, 2);
837 assert_eq!(count_events(&state.async_database).await, 0);
838
839 assert!(archive_dir.join("t-a/2020-03-10.jsonl").exists());
841 assert!(archive_dir.join("t-b/2020-03-10.jsonl").exists());
842
843 let _ = std::fs::remove_dir_all(&archive_dir);
844 }
845}