Skip to main content

oven_cli/db/
runs.rs

1use anyhow::{Context, Result};
2use rusqlite::{Connection, params};
3
4use super::{Run, RunStatus};
5
6pub fn insert_run(conn: &Connection, run: &Run) -> Result<()> {
7    conn.execute(
8        "INSERT INTO runs (id, issue_number, status, pr_number, branch, worktree_path, \
9         cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
10         issue_source) \
11         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
12        params![
13            run.id,
14            run.issue_number,
15            run.status.to_string(),
16            run.pr_number,
17            run.branch,
18            run.worktree_path,
19            run.cost_usd,
20            run.auto_merge,
21            run.started_at,
22            run.finished_at,
23            run.error_message,
24            run.complexity,
25            run.issue_source,
26        ],
27    )
28    .context("inserting run")?;
29    Ok(())
30}
31
32pub fn get_run(conn: &Connection, id: &str) -> Result<Option<Run>> {
33    let mut stmt = conn
34        .prepare(
35            "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
36             cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
37             issue_source \
38             FROM runs WHERE id = ?1",
39        )
40        .context("preparing get_run")?;
41
42    let mut rows = stmt.query_map(params![id], row_to_run).context("querying run")?;
43    match rows.next() {
44        Some(row) => Ok(Some(row.context("reading run row")?)),
45        None => Ok(None),
46    }
47}
48
49pub fn get_runs_by_status(conn: &Connection, status: RunStatus) -> Result<Vec<Run>> {
50    let mut stmt = conn
51        .prepare(
52            "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
53             cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
54             issue_source \
55             FROM runs WHERE status = ?1 ORDER BY started_at",
56        )
57        .context("preparing get_runs_by_status")?;
58
59    let rows = stmt
60        .query_map(params![status.to_string()], row_to_run)
61        .context("querying runs by status")?;
62    rows.collect::<std::result::Result<Vec<_>, _>>().context("collecting runs")
63}
64
65pub fn get_latest_run(conn: &Connection) -> Result<Option<Run>> {
66    let mut stmt = conn
67        .prepare(
68            "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
69             cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
70             issue_source \
71             FROM runs ORDER BY started_at DESC LIMIT 1",
72        )
73        .context("preparing get_latest_run")?;
74
75    let mut rows = stmt.query_map([], row_to_run).context("querying latest run")?;
76    match rows.next() {
77        Some(row) => Ok(Some(row.context("reading latest run row")?)),
78        None => Ok(None),
79    }
80}
81
82pub fn get_active_runs(conn: &Connection) -> Result<Vec<Run>> {
83    let mut stmt = conn
84        .prepare(
85            "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
86             cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
87             issue_source \
88             FROM runs WHERE status NOT IN ('complete', 'failed') ORDER BY started_at",
89        )
90        .context("preparing get_active_runs")?;
91
92    let rows = stmt.query_map([], row_to_run).context("querying active runs")?;
93    rows.collect::<std::result::Result<Vec<_>, _>>().context("collecting active runs")
94}
95
96pub fn get_all_runs(conn: &Connection) -> Result<Vec<Run>> {
97    let mut stmt = conn
98        .prepare(
99            "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
100             cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
101             issue_source \
102             FROM runs ORDER BY started_at DESC",
103        )
104        .context("preparing get_all_runs")?;
105
106    let rows = stmt.query_map([], row_to_run).context("querying all runs")?;
107    rows.collect::<std::result::Result<Vec<_>, _>>().context("collecting all runs")
108}
109
110pub fn update_run_status(conn: &Connection, id: &str, status: RunStatus) -> Result<()> {
111    conn.execute("UPDATE runs SET status = ?1 WHERE id = ?2", params![status.to_string(), id])
112        .context("updating run status")?;
113    Ok(())
114}
115
116pub fn update_run_pr(conn: &Connection, id: &str, pr_number: u32) -> Result<()> {
117    conn.execute("UPDATE runs SET pr_number = ?1 WHERE id = ?2", params![pr_number, id])
118        .context("updating run PR number")?;
119    Ok(())
120}
121
122pub fn update_run_cost(conn: &Connection, id: &str, cost_usd: f64) -> Result<()> {
123    conn.execute("UPDATE runs SET cost_usd = ?1 WHERE id = ?2", params![cost_usd, id])
124        .context("updating run cost")?;
125    Ok(())
126}
127
128/// Atomically add `delta` to the run's cost and return the new total.
129///
130/// Uses an explicit transaction to ensure the UPDATE and SELECT are atomic,
131/// preventing incorrect cost readings under concurrent access.
132pub fn increment_run_cost(conn: &Connection, id: &str, delta: f64) -> Result<f64> {
133    let tx = conn.unchecked_transaction().context("starting cost transaction")?;
134    tx.execute("UPDATE runs SET cost_usd = cost_usd + ?1 WHERE id = ?2", params![delta, id])
135        .context("incrementing run cost")?;
136    let new_cost: f64 = tx
137        .query_row("SELECT cost_usd FROM runs WHERE id = ?1", params![id], |row| row.get(0))
138        .context("reading updated cost")?;
139    tx.commit().context("committing cost transaction")?;
140    Ok(new_cost)
141}
142
143pub fn finish_run(
144    conn: &Connection,
145    id: &str,
146    status: RunStatus,
147    error_message: Option<&str>,
148) -> Result<()> {
149    conn.execute(
150        "UPDATE runs SET status = ?1, finished_at = datetime('now'), error_message = ?2 \
151         WHERE id = ?3",
152        params![status.to_string(), error_message, id],
153    )
154    .context("finishing run")?;
155    Ok(())
156}
157
158pub fn update_run_worktree(conn: &Connection, id: &str, branch: &str, path: &str) -> Result<()> {
159    conn.execute(
160        "UPDATE runs SET branch = ?1, worktree_path = ?2 WHERE id = ?3",
161        params![branch, path, id],
162    )
163    .context("updating run worktree")?;
164    Ok(())
165}
166
167pub fn update_run_complexity(conn: &Connection, id: &str, complexity: &str) -> Result<()> {
168    conn.execute("UPDATE runs SET complexity = ?1 WHERE id = ?2", params![complexity, id])
169        .context("updating run complexity")?;
170    Ok(())
171}
172
173fn row_to_run(row: &rusqlite::Row<'_>) -> rusqlite::Result<Run> {
174    let status_str: String = row.get(2)?;
175    let status: RunStatus = status_str.parse().map_err(|_| {
176        rusqlite::Error::InvalidColumnType(2, "status".to_string(), rusqlite::types::Type::Text)
177    })?;
178    Ok(Run {
179        id: row.get(0)?,
180        issue_number: row.get(1)?,
181        status,
182        pr_number: row.get(3)?,
183        branch: row.get(4)?,
184        worktree_path: row.get(5)?,
185        cost_usd: row.get(6)?,
186        auto_merge: row.get(7)?,
187        started_at: row.get(8)?,
188        finished_at: row.get(9)?,
189        error_message: row.get(10)?,
190        complexity: row.get(11)?,
191        issue_source: row.get(12)?,
192    })
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db;

    /// Open a fresh in-memory database for a single test.
    fn test_db() -> Connection {
        db::open_in_memory().unwrap()
    }

    /// Build a pending run with fixed defaults; only `id` and `issue` vary.
    fn sample_run(id: &str, issue: u32) -> Run {
        Run {
            id: id.to_string(),
            issue_number: issue,
            status: RunStatus::Pending,
            pr_number: None,
            branch: None,
            worktree_path: None,
            cost_usd: 0.0,
            auto_merge: false,
            started_at: "2026-03-12T00:00:00".to_string(),
            finished_at: None,
            error_message: None,
            complexity: "full".to_string(),
            issue_source: "github".to_string(),
        }
    }

    #[test]
    fn insert_and_get_run() {
        let conn = test_db();
        let run = sample_run("abcd1234", 42);
        insert_run(&conn, &run).unwrap();

        let retrieved = get_run(&conn, "abcd1234").unwrap().unwrap();
        assert_eq!(retrieved.id, "abcd1234");
        assert_eq!(retrieved.issue_number, 42);
        assert_eq!(retrieved.status, RunStatus::Pending);
    }

    #[test]
    fn get_nonexistent_run_returns_none() {
        let conn = test_db();
        assert!(get_run(&conn, "nope").unwrap().is_none());
    }

    #[test]
    fn update_status() {
        let conn = test_db();
        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();

        update_run_status(&conn, "abcd1234", RunStatus::Implementing).unwrap();
        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
        assert_eq!(run.status, RunStatus::Implementing);
    }

    #[test]
    fn update_pr_number() {
        let conn = test_db();
        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();

        update_run_pr(&conn, "abcd1234", 99).unwrap();
        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
        assert_eq!(run.pr_number, Some(99));
    }

    #[test]
    fn update_cost() {
        let conn = test_db();
        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();

        update_run_cost(&conn, "abcd1234", 3.50).unwrap();
        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
        assert!((run.cost_usd - 3.50).abs() < f64::EPSILON);
    }

    #[test]
    fn increment_cost_accumulates_and_returns_total() {
        let conn = test_db();
        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();

        // Values are exactly representable, so the sums are exact.
        let first = increment_run_cost(&conn, "abcd1234", 1.5).unwrap();
        assert!((first - 1.5).abs() < f64::EPSILON);
        let second = increment_run_cost(&conn, "abcd1234", 2.0).unwrap();
        assert!((second - 3.5).abs() < f64::EPSILON);

        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
        assert!((run.cost_usd - 3.5).abs() < f64::EPSILON);
    }

    #[test]
    fn increment_cost_for_missing_run_is_error() {
        let conn = test_db();
        // No row to read the new total from, so the call must fail.
        assert!(increment_run_cost(&conn, "nope", 1.0).is_err());
    }

    #[test]
    fn finish_run_sets_status_and_timestamp() {
        let conn = test_db();
        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();

        finish_run(&conn, "abcd1234", RunStatus::Complete, None).unwrap();
        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
        assert_eq!(run.status, RunStatus::Complete);
        assert!(run.finished_at.is_some());
    }

    #[test]
    fn finish_run_with_error() {
        let conn = test_db();
        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();

        finish_run(&conn, "abcd1234", RunStatus::Failed, Some("boom")).unwrap();
        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
        assert_eq!(run.status, RunStatus::Failed);
        assert_eq!(run.error_message.as_deref(), Some("boom"));
    }

    #[test]
    fn update_worktree_sets_branch_and_path() {
        let conn = test_db();
        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();

        update_run_worktree(&conn, "abcd1234", "oven/issue-42", "/tmp/wt/issue-42").unwrap();
        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
        assert_eq!(run.branch.as_deref(), Some("oven/issue-42"));
        assert_eq!(run.worktree_path.as_deref(), Some("/tmp/wt/issue-42"));
    }

    #[test]
    fn update_complexity_overwrites_value() {
        let conn = test_db();
        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();

        update_run_complexity(&conn, "abcd1234", "simple").unwrap();
        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
        assert_eq!(run.complexity, "simple");
    }

    #[test]
    fn get_runs_by_status_filters() {
        let conn = test_db();
        insert_run(&conn, &sample_run("aaa11111", 1)).unwrap();
        insert_run(&conn, &sample_run("bbb22222", 2)).unwrap();

        update_run_status(&conn, "aaa11111", RunStatus::Complete).unwrap();

        let pending = get_runs_by_status(&conn, RunStatus::Pending).unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].id, "bbb22222");

        let complete = get_runs_by_status(&conn, RunStatus::Complete).unwrap();
        assert_eq!(complete.len(), 1);
        assert_eq!(complete[0].id, "aaa11111");
    }

    #[test]
    fn get_active_runs_excludes_finished() {
        let conn = test_db();
        insert_run(&conn, &sample_run("aaa11111", 1)).unwrap();
        insert_run(&conn, &sample_run("bbb22222", 2)).unwrap();
        insert_run(&conn, &sample_run("ccc33333", 3)).unwrap();

        finish_run(&conn, "aaa11111", RunStatus::Complete, None).unwrap();
        finish_run(&conn, "bbb22222", RunStatus::Failed, Some("boom")).unwrap();

        // Also guards the coupling between the SQL literals in
        // `get_active_runs` and the text form written for these statuses.
        let active = get_active_runs(&conn).unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, "ccc33333");
    }

    #[test]
    fn get_latest_run_returns_most_recent() {
        let conn = test_db();
        let mut run1 = sample_run("aaa11111", 1);
        run1.started_at = "2026-03-01T00:00:00".to_string();
        let mut run2 = sample_run("bbb22222", 2);
        run2.started_at = "2026-03-02T00:00:00".to_string();

        insert_run(&conn, &run1).unwrap();
        insert_run(&conn, &run2).unwrap();

        let latest = get_latest_run(&conn).unwrap().unwrap();
        assert_eq!(latest.id, "bbb22222");
    }

    #[test]
    fn get_all_runs_returns_ordered() {
        let conn = test_db();
        let mut run1 = sample_run("aaa11111", 1);
        run1.started_at = "2026-03-01T00:00:00".to_string();
        let mut run2 = sample_run("bbb22222", 2);
        run2.started_at = "2026-03-02T00:00:00".to_string();

        insert_run(&conn, &run1).unwrap();
        insert_run(&conn, &run2).unwrap();

        let all = get_all_runs(&conn).unwrap();
        assert_eq!(all.len(), 2);
        assert_eq!(all[0].id, "bbb22222"); // most recent first
    }

    #[test]
    fn issue_source_persists() {
        let conn = test_db();
        let mut run = sample_run("local001", 1);
        run.issue_source = "local".to_string();
        insert_run(&conn, &run).unwrap();

        let retrieved = get_run(&conn, "local001").unwrap().unwrap();
        assert_eq!(retrieved.issue_source, "local");
    }

    #[test]
    fn issue_source_defaults_to_github() {
        // NOTE: `sample_run` sets `issue_source` to "github" explicitly, so
        // this checks the fixture's value round-trips; it does not exercise
        // a column default in the schema.
        let conn = test_db();
        insert_run(&conn, &sample_run("gh000001", 1)).unwrap();

        let retrieved = get_run(&conn, "gh000001").unwrap().unwrap();
        assert_eq!(retrieved.issue_source, "github");
    }
}