1use rusqlite::{OptionalExtension, params};
4
5use super::{Db, StorageError};
6
7#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct EventRow {
9 pub id: i64,
10 pub task_id: String,
11 pub ts: i64,
12 pub kind: String,
13 pub payload_json: String,
14}
15
16#[derive(Debug, Clone)]
17pub struct EventInsert {
18 pub task_id: String,
19 pub kind: String,
20 pub payload_json: String,
21}
22
23pub async fn append(db: &Db, input: EventInsert) -> Result<i64, StorageError> {
24 let EventInsert {
25 task_id,
26 kind,
27 payload_json,
28 } = input;
29 let now = now_epoch_ms();
30 let id = db
31 .conn
32 .call(move |c| {
33 c.execute(
34 "INSERT INTO task_events (task_id, ts, kind, payload_json)
35 VALUES (?1, ?2, ?3, ?4)",
36 params![task_id, now, kind, payload_json],
37 )?;
38 Ok::<_, rusqlite::Error>(c.last_insert_rowid())
39 })
40 .await?;
41 Ok(id)
42}
43
44pub async fn range_since(
45 db: &Db,
46 task_id: &str,
47 after_id: i64,
48 limit: i64,
49) -> Result<Vec<EventRow>, StorageError> {
50 let task_id = task_id.to_string();
51 let rows = db
52 .conn
53 .call(move |c| {
54 let mut stmt = c.prepare(
55 "SELECT id, task_id, ts, kind, payload_json
56 FROM task_events
57 WHERE task_id = ?1 AND id > ?2
58 ORDER BY id ASC
59 LIMIT ?3",
60 )?;
61 let iter = stmt.query_map(params![task_id, after_id, limit], |r| {
62 Ok(EventRow {
63 id: r.get(0)?,
64 task_id: r.get(1)?,
65 ts: r.get(2)?,
66 kind: r.get(3)?,
67 payload_json: r.get(4)?,
68 })
69 })?;
70 let mut out = Vec::new();
71 for r in iter {
72 out.push(r?);
73 }
74 Ok::<_, rusqlite::Error>(out)
75 })
76 .await?;
77 Ok(rows)
78}
79
80pub async fn count_by_kind(db: &Db, task_id: &str) -> Result<Vec<(String, i64)>, StorageError> {
81 let task_id = task_id.to_string();
82 let counts = db
83 .conn
84 .call(move |c| {
85 let mut stmt = c.prepare(
86 "SELECT kind, COUNT(*) FROM task_events WHERE task_id = ?1 GROUP BY kind",
87 )?;
88 let iter = stmt.query_map([&task_id], |r| {
89 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?))
90 })?;
91 let mut out = Vec::new();
92 for r in iter {
93 out.push(r?);
94 }
95 Ok::<_, rusqlite::Error>(out)
96 })
97 .await?;
98 Ok(counts)
99}
100
101pub async fn last_for_task(db: &Db, task_id: &str) -> Result<Option<EventRow>, StorageError> {
102 let task_id = task_id.to_string();
103 let row = db
104 .conn
105 .call(move |c| {
106 c.query_row(
107 "SELECT id, task_id, ts, kind, payload_json
108 FROM task_events WHERE task_id = ?1
109 ORDER BY id DESC LIMIT 1",
110 [&task_id],
111 |r| {
112 Ok(EventRow {
113 id: r.get(0)?,
114 task_id: r.get(1)?,
115 ts: r.get(2)?,
116 kind: r.get(3)?,
117 payload_json: r.get(4)?,
118 })
119 },
120 )
121 .optional()
122 })
123 .await?;
124 Ok(row)
125}
126
127fn now_epoch_ms() -> i64 {
128 use std::time::{SystemTime, UNIX_EPOCH};
129 SystemTime::now()
130 .duration_since(UNIX_EPOCH)
131 .map(|d| d.as_millis() as i64)
132 .unwrap_or(0)
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138 use crate::storage::tasks::{TaskInsert, TaskKind, insert as insert_task};
139 use tempfile::tempdir;
140
141 async fn fresh_db_with_task(id: &str) -> Db {
142 let tmp = tempdir().unwrap();
143 let db = Db::open(tmp.path().join("rover.db")).await.unwrap();
144 std::mem::forget(tmp);
145 insert_task(
146 &db,
147 TaskInsert {
148 id: id.into(),
149 kind: TaskKind::BatchFetch,
150 params_json: "{}".into(),
151 owner_pid: Some(1),
152 },
153 )
154 .await
155 .unwrap();
156 db
157 }
158
159 fn ev(task_id: &str, kind: &str, payload: &str) -> EventInsert {
160 EventInsert {
161 task_id: task_id.into(),
162 kind: kind.into(),
163 payload_json: payload.into(),
164 }
165 }
166
167 #[tokio::test]
168 async fn append_returns_monotonic_ids() {
169 let db = fresh_db_with_task("t1").await;
170 let id1 = append(&db, ev("t1", "task_started", "{}")).await.unwrap();
171 let id2 = append(&db, ev("t1", "item_done", r#"{"url":"a"}"#))
172 .await
173 .unwrap();
174 assert!(id2 > id1);
175 }
176
177 #[tokio::test]
178 async fn range_since_filters_by_cursor() {
179 let db = fresh_db_with_task("t1").await;
180 let id1 = append(&db, ev("t1", "a", "{}")).await.unwrap();
181 let id2 = append(&db, ev("t1", "b", "{}")).await.unwrap();
182 let id3 = append(&db, ev("t1", "c", "{}")).await.unwrap();
183 let rows = range_since(&db, "t1", id1, 100).await.unwrap();
184 let kinds: Vec<&str> = rows.iter().map(|r| r.kind.as_str()).collect();
185 assert_eq!(kinds, vec!["b", "c"]);
186 let ids: Vec<i64> = rows.iter().map(|r| r.id).collect();
187 assert_eq!(ids, vec![id2, id3]);
188 }
189
190 #[tokio::test]
191 async fn range_since_caps_at_limit() {
192 let db = fresh_db_with_task("t1").await;
193 for i in 0..10 {
194 append(&db, ev("t1", "x", &format!(r#"{{"i":{i}}}"#)))
195 .await
196 .unwrap();
197 }
198 let rows = range_since(&db, "t1", 0, 3).await.unwrap();
199 assert_eq!(rows.len(), 3);
200 }
201
202 #[tokio::test]
203 async fn range_since_isolates_tasks() {
204 let db = fresh_db_with_task("t1").await;
205 insert_task(
206 &db,
207 TaskInsert {
208 id: "t2".into(),
209 kind: TaskKind::BatchFetch,
210 params_json: "{}".into(),
211 owner_pid: Some(1),
212 },
213 )
214 .await
215 .unwrap();
216 append(&db, ev("t1", "a", "{}")).await.unwrap();
217 append(&db, ev("t2", "b", "{}")).await.unwrap();
218 let rows = range_since(&db, "t1", 0, 100).await.unwrap();
219 assert!(rows.iter().all(|r| r.task_id == "t1"));
220 assert_eq!(rows.len(), 1);
221 }
222
223 #[tokio::test]
224 async fn count_by_kind_groups_correctly() {
225 let db = fresh_db_with_task("t1").await;
226 append(&db, ev("t1", "item_done", "{}")).await.unwrap();
227 append(&db, ev("t1", "item_done", "{}")).await.unwrap();
228 append(&db, ev("t1", "item_failed", "{}")).await.unwrap();
229 let mut counts = count_by_kind(&db, "t1").await.unwrap();
230 counts.sort_by(|a, b| a.0.cmp(&b.0));
231 assert_eq!(
232 counts,
233 vec![("item_done".into(), 2), ("item_failed".into(), 1)],
234 );
235 }
236
237 #[tokio::test]
238 async fn last_for_task_returns_highest_id() {
239 let db = fresh_db_with_task("t1").await;
240 append(&db, ev("t1", "a", "{}")).await.unwrap();
241 let mid_id = append(&db, ev("t1", "b", r#"{"k":1}"#)).await.unwrap();
242 append(&db, ev("t1", "c", "{}")).await.unwrap();
243 let last = last_for_task(&db, "t1").await.unwrap().unwrap();
244 assert_eq!(last.kind, "c");
245 assert!(last.id > mid_id);
246 }
247
248 #[tokio::test]
249 async fn cascade_delete_drops_events() {
250 let db = fresh_db_with_task("t1").await;
251 append(&db, ev("t1", "a", "{}")).await.unwrap();
252 db.conn
253 .call(|c| {
254 c.execute("DELETE FROM tasks WHERE id = 't1'", [])?;
255 Ok::<_, rusqlite::Error>(())
256 })
257 .await
258 .unwrap();
259 let rows = range_since(&db, "t1", 0, 100).await.unwrap();
260 assert!(rows.is_empty());
261 }
262}