// rover/storage/events.rs

1//! `task_events` append-only log.
2
3use rusqlite::{OptionalExtension, params};
4
5use super::{Db, StorageError};
6
/// One row read back from the `task_events` table.
///
/// The fields correspond one-to-one to the columns selected by the read
/// queries in this module (`id, task_id, ts, kind, payload_json`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventRow {
    /// Row id of the event; the read queries order and page by this value.
    pub id: i64,
    /// Id of the task the event belongs to.
    pub task_id: String,
    /// Timestamp stored with the event; `append` writes milliseconds since
    /// the Unix epoch.
    pub ts: i64,
    /// Event kind label, e.g. `item_done`.
    pub kind: String,
    /// Payload text exactly as stored. Presumably JSON; it is not parsed or
    /// validated in this module.
    pub payload_json: String,
}
15
/// Input for `append`: the caller-supplied columns of a new event.
///
/// The row id and timestamp are not part of the input; they are produced when
/// the event is inserted.
#[derive(Debug, Clone)]
pub struct EventInsert {
    /// Id of the task the event is recorded against.
    pub task_id: String,
    /// Event kind label, e.g. `item_done`.
    pub kind: String,
    /// Payload text stored verbatim. Presumably JSON; it is not validated
    /// here.
    pub payload_json: String,
}
22
23pub async fn append(db: &Db, input: EventInsert) -> Result<i64, StorageError> {
24    let EventInsert {
25        task_id,
26        kind,
27        payload_json,
28    } = input;
29    let now = now_epoch_ms();
30    let id = db
31        .conn
32        .call(move |c| {
33            c.execute(
34                "INSERT INTO task_events (task_id, ts, kind, payload_json)
35                 VALUES (?1, ?2, ?3, ?4)",
36                params![task_id, now, kind, payload_json],
37            )?;
38            Ok::<_, rusqlite::Error>(c.last_insert_rowid())
39        })
40        .await?;
41    Ok(id)
42}
43
44pub async fn range_since(
45    db: &Db,
46    task_id: &str,
47    after_id: i64,
48    limit: i64,
49) -> Result<Vec<EventRow>, StorageError> {
50    let task_id = task_id.to_string();
51    let rows = db
52        .conn
53        .call(move |c| {
54            let mut stmt = c.prepare(
55                "SELECT id, task_id, ts, kind, payload_json
56                 FROM task_events
57                 WHERE task_id = ?1 AND id > ?2
58                 ORDER BY id ASC
59                 LIMIT ?3",
60            )?;
61            let iter = stmt.query_map(params![task_id, after_id, limit], |r| {
62                Ok(EventRow {
63                    id: r.get(0)?,
64                    task_id: r.get(1)?,
65                    ts: r.get(2)?,
66                    kind: r.get(3)?,
67                    payload_json: r.get(4)?,
68                })
69            })?;
70            let mut out = Vec::new();
71            for r in iter {
72                out.push(r?);
73            }
74            Ok::<_, rusqlite::Error>(out)
75        })
76        .await?;
77    Ok(rows)
78}
79
80pub async fn count_by_kind(db: &Db, task_id: &str) -> Result<Vec<(String, i64)>, StorageError> {
81    let task_id = task_id.to_string();
82    let counts = db
83        .conn
84        .call(move |c| {
85            let mut stmt = c.prepare(
86                "SELECT kind, COUNT(*) FROM task_events WHERE task_id = ?1 GROUP BY kind",
87            )?;
88            let iter = stmt.query_map([&task_id], |r| {
89                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?))
90            })?;
91            let mut out = Vec::new();
92            for r in iter {
93                out.push(r?);
94            }
95            Ok::<_, rusqlite::Error>(out)
96        })
97        .await?;
98    Ok(counts)
99}
100
101pub async fn last_for_task(db: &Db, task_id: &str) -> Result<Option<EventRow>, StorageError> {
102    let task_id = task_id.to_string();
103    let row = db
104        .conn
105        .call(move |c| {
106            c.query_row(
107                "SELECT id, task_id, ts, kind, payload_json
108                 FROM task_events WHERE task_id = ?1
109                 ORDER BY id DESC LIMIT 1",
110                [&task_id],
111                |r| {
112                    Ok(EventRow {
113                        id: r.get(0)?,
114                        task_id: r.get(1)?,
115                        ts: r.get(2)?,
116                        kind: r.get(3)?,
117                        payload_json: r.get(4)?,
118                    })
119                },
120            )
121            .optional()
122        })
123        .await?;
124    Ok(row)
125}
126
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
/// The millisecond count is a `u128`; it is converted with a checked
/// conversion and saturates at `i64::MAX` instead of silently wrapping.
fn now_epoch_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
134
/// Integration-style tests that run the event helpers against a real database
/// file in a temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::tasks::{TaskInsert, TaskKind, insert as insert_task};
    use tempfile::{TempDir, tempdir};

    /// Builds the minimal `BatchFetch` task used as the parent row for events.
    fn batch_task(id: &str) -> TaskInsert {
        TaskInsert {
            id: id.into(),
            kind: TaskKind::BatchFetch,
            params_json: "{}".into(),
            owner_pid: Some(1),
        }
    }

    /// Opens a database in a fresh temp directory and inserts one task `id`.
    ///
    /// The `TempDir` guard is returned alongside the database so the caller
    /// keeps the directory alive for the duration of the test; the directory
    /// is removed when the guard is dropped instead of being leaked on disk.
    async fn fresh_db_with_task(id: &str) -> (TempDir, Db) {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path().join("rover.db")).await.unwrap();
        insert_task(&db, batch_task(id)).await.unwrap();
        (tmp, db)
    }

    /// Shorthand for building an `EventInsert` from string slices.
    fn ev(task_id: &str, kind: &str, payload: &str) -> EventInsert {
        EventInsert {
            task_id: task_id.into(),
            kind: kind.into(),
            payload_json: payload.into(),
        }
    }

    #[tokio::test]
    async fn append_returns_monotonic_ids() {
        let (_tmp, db) = fresh_db_with_task("t1").await;
        let id1 = append(&db, ev("t1", "task_started", "{}")).await.unwrap();
        let id2 = append(&db, ev("t1", "item_done", r#"{"url":"a"}"#))
            .await
            .unwrap();
        assert!(id2 > id1);
    }

    #[tokio::test]
    async fn range_since_filters_by_cursor() {
        let (_tmp, db) = fresh_db_with_task("t1").await;
        let id1 = append(&db, ev("t1", "a", "{}")).await.unwrap();
        let id2 = append(&db, ev("t1", "b", "{}")).await.unwrap();
        let id3 = append(&db, ev("t1", "c", "{}")).await.unwrap();
        let rows = range_since(&db, "t1", id1, 100).await.unwrap();
        let kinds: Vec<&str> = rows.iter().map(|r| r.kind.as_str()).collect();
        assert_eq!(kinds, vec!["b", "c"]);
        let ids: Vec<i64> = rows.iter().map(|r| r.id).collect();
        assert_eq!(ids, vec![id2, id3]);
    }

    #[tokio::test]
    async fn range_since_caps_at_limit() {
        let (_tmp, db) = fresh_db_with_task("t1").await;
        for i in 0..10 {
            append(&db, ev("t1", "x", &format!(r#"{{"i":{i}}}"#)))
                .await
                .unwrap();
        }
        let rows = range_since(&db, "t1", 0, 3).await.unwrap();
        assert_eq!(rows.len(), 3);
    }

    #[tokio::test]
    async fn range_since_isolates_tasks() {
        let (_tmp, db) = fresh_db_with_task("t1").await;
        insert_task(&db, batch_task("t2")).await.unwrap();
        append(&db, ev("t1", "a", "{}")).await.unwrap();
        append(&db, ev("t2", "b", "{}")).await.unwrap();
        let rows = range_since(&db, "t1", 0, 100).await.unwrap();
        assert!(rows.iter().all(|r| r.task_id == "t1"));
        assert_eq!(rows.len(), 1);
    }

    #[tokio::test]
    async fn count_by_kind_groups_correctly() {
        let (_tmp, db) = fresh_db_with_task("t1").await;
        append(&db, ev("t1", "item_done", "{}")).await.unwrap();
        append(&db, ev("t1", "item_done", "{}")).await.unwrap();
        append(&db, ev("t1", "item_failed", "{}")).await.unwrap();
        let mut counts = count_by_kind(&db, "t1").await.unwrap();
        // The query has no ORDER BY, so sort before comparing.
        counts.sort_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(
            counts,
            vec![("item_done".into(), 2), ("item_failed".into(), 1)],
        );
    }

    #[tokio::test]
    async fn last_for_task_returns_highest_id() {
        let (_tmp, db) = fresh_db_with_task("t1").await;
        append(&db, ev("t1", "a", "{}")).await.unwrap();
        let mid_id = append(&db, ev("t1", "b", r#"{"k":1}"#)).await.unwrap();
        append(&db, ev("t1", "c", "{}")).await.unwrap();
        let last = last_for_task(&db, "t1").await.unwrap().unwrap();
        assert_eq!(last.kind, "c");
        assert!(last.id > mid_id);
    }

    #[tokio::test]
    async fn cascade_delete_drops_events() {
        let (_tmp, db) = fresh_db_with_task("t1").await;
        append(&db, ev("t1", "a", "{}")).await.unwrap();
        // Delete the parent task directly; its events are expected to go with it.
        db.conn
            .call(|c| {
                c.execute("DELETE FROM tasks WHERE id = 't1'", [])?;
                Ok::<_, rusqlite::Error>(())
            })
            .await
            .unwrap();
        let rows = range_since(&db, "t1", 0, 100).await.unwrap();
        assert!(rows.is_empty());
    }
}