Skip to main content

oven_cli/db/
runs.rs

1use anyhow::{Context, Result};
2use rusqlite::{Connection, params};
3
4use super::{Run, RunStatus};
5
6pub fn insert_run(conn: &Connection, run: &Run) -> Result<()> {
7    conn.execute(
8        "INSERT INTO runs (id, issue_number, status, pr_number, branch, worktree_path, \
9         cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
10         issue_source) \
11         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
12        params![
13            run.id,
14            run.issue_number,
15            run.status.to_string(),
16            run.pr_number,
17            run.branch,
18            run.worktree_path,
19            run.cost_usd,
20            run.auto_merge,
21            run.started_at,
22            run.finished_at,
23            run.error_message,
24            run.complexity,
25            run.issue_source,
26        ],
27    )
28    .context("inserting run")?;
29    Ok(())
30}
31
32pub fn get_run(conn: &Connection, id: &str) -> Result<Option<Run>> {
33    let mut stmt = conn
34        .prepare(
35            "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
36             cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
37             issue_source \
38             FROM runs WHERE id = ?1",
39        )
40        .context("preparing get_run")?;
41
42    let mut rows = stmt.query_map(params![id], row_to_run).context("querying run")?;
43    match rows.next() {
44        Some(row) => Ok(Some(row.context("reading run row")?)),
45        None => Ok(None),
46    }
47}
48
49pub fn get_runs_by_status(conn: &Connection, status: RunStatus) -> Result<Vec<Run>> {
50    let mut stmt = conn
51        .prepare(
52            "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
53             cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
54             issue_source \
55             FROM runs WHERE status = ?1 ORDER BY started_at",
56        )
57        .context("preparing get_runs_by_status")?;
58
59    let rows = stmt
60        .query_map(params![status.to_string()], row_to_run)
61        .context("querying runs by status")?;
62    rows.collect::<std::result::Result<Vec<_>, _>>().context("collecting runs")
63}
64
65pub fn get_latest_run(conn: &Connection) -> Result<Option<Run>> {
66    let mut stmt = conn
67        .prepare(
68            "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
69             cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
70             issue_source \
71             FROM runs ORDER BY started_at DESC LIMIT 1",
72        )
73        .context("preparing get_latest_run")?;
74
75    let mut rows = stmt.query_map([], row_to_run).context("querying latest run")?;
76    match rows.next() {
77        Some(row) => Ok(Some(row.context("reading latest run row")?)),
78        None => Ok(None),
79    }
80}
81
82pub fn get_all_runs(conn: &Connection) -> Result<Vec<Run>> {
83    let mut stmt = conn
84        .prepare(
85            "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
86             cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
87             issue_source \
88             FROM runs ORDER BY started_at DESC",
89        )
90        .context("preparing get_all_runs")?;
91
92    let rows = stmt.query_map([], row_to_run).context("querying all runs")?;
93    rows.collect::<std::result::Result<Vec<_>, _>>().context("collecting all runs")
94}
95
96pub fn update_run_status(conn: &Connection, id: &str, status: RunStatus) -> Result<()> {
97    conn.execute("UPDATE runs SET status = ?1 WHERE id = ?2", params![status.to_string(), id])
98        .context("updating run status")?;
99    Ok(())
100}
101
102pub fn update_run_pr(conn: &Connection, id: &str, pr_number: u32) -> Result<()> {
103    conn.execute("UPDATE runs SET pr_number = ?1 WHERE id = ?2", params![pr_number, id])
104        .context("updating run PR number")?;
105    Ok(())
106}
107
108pub fn update_run_cost(conn: &Connection, id: &str, cost_usd: f64) -> Result<()> {
109    conn.execute("UPDATE runs SET cost_usd = ?1 WHERE id = ?2", params![cost_usd, id])
110        .context("updating run cost")?;
111    Ok(())
112}
113
114/// Atomically add `delta` to the run's cost and return the new total.
115///
116/// Uses an explicit transaction to ensure the UPDATE and SELECT are atomic,
117/// preventing incorrect cost readings under concurrent access.
118pub fn increment_run_cost(conn: &Connection, id: &str, delta: f64) -> Result<f64> {
119    let tx = conn.unchecked_transaction().context("starting cost transaction")?;
120    tx.execute("UPDATE runs SET cost_usd = cost_usd + ?1 WHERE id = ?2", params![delta, id])
121        .context("incrementing run cost")?;
122    let new_cost: f64 = tx
123        .query_row("SELECT cost_usd FROM runs WHERE id = ?1", params![id], |row| row.get(0))
124        .context("reading updated cost")?;
125    tx.commit().context("committing cost transaction")?;
126    Ok(new_cost)
127}
128
129pub fn finish_run(
130    conn: &Connection,
131    id: &str,
132    status: RunStatus,
133    error_message: Option<&str>,
134) -> Result<()> {
135    conn.execute(
136        "UPDATE runs SET status = ?1, finished_at = datetime('now'), error_message = ?2 \
137         WHERE id = ?3",
138        params![status.to_string(), error_message, id],
139    )
140    .context("finishing run")?;
141    Ok(())
142}
143
144pub fn update_run_worktree(conn: &Connection, id: &str, branch: &str, path: &str) -> Result<()> {
145    conn.execute(
146        "UPDATE runs SET branch = ?1, worktree_path = ?2 WHERE id = ?3",
147        params![branch, path, id],
148    )
149    .context("updating run worktree")?;
150    Ok(())
151}
152
153pub fn update_run_complexity(conn: &Connection, id: &str, complexity: &str) -> Result<()> {
154    conn.execute("UPDATE runs SET complexity = ?1 WHERE id = ?2", params![complexity, id])
155        .context("updating run complexity")?;
156    Ok(())
157}
158
159fn row_to_run(row: &rusqlite::Row<'_>) -> rusqlite::Result<Run> {
160    let status_str: String = row.get(2)?;
161    let status: RunStatus = status_str.parse().map_err(|_| {
162        rusqlite::Error::InvalidColumnType(2, "status".to_string(), rusqlite::types::Type::Text)
163    })?;
164    Ok(Run {
165        id: row.get(0)?,
166        issue_number: row.get(1)?,
167        status,
168        pr_number: row.get(3)?,
169        branch: row.get(4)?,
170        worktree_path: row.get(5)?,
171        cost_usd: row.get(6)?,
172        auto_merge: row.get(7)?,
173        started_at: row.get(8)?,
174        finished_at: row.get(9)?,
175        error_message: row.get(10)?,
176        complexity: row.get(11)?,
177        issue_source: row.get(12)?,
178    })
179}
180
181#[cfg(test)]
182mod tests {
183    use super::*;
184    use crate::db;
185
186    fn test_db() -> Connection {
187        db::open_in_memory().unwrap()
188    }
189
190    fn sample_run(id: &str, issue: u32) -> Run {
191        Run {
192            id: id.to_string(),
193            issue_number: issue,
194            status: RunStatus::Pending,
195            pr_number: None,
196            branch: None,
197            worktree_path: None,
198            cost_usd: 0.0,
199            auto_merge: false,
200            started_at: "2026-03-12T00:00:00".to_string(),
201            finished_at: None,
202            error_message: None,
203            complexity: "full".to_string(),
204            issue_source: "github".to_string(),
205        }
206    }
207
208    #[test]
209    fn insert_and_get_run() {
210        let conn = test_db();
211        let run = sample_run("abcd1234", 42);
212        insert_run(&conn, &run).unwrap();
213
214        let retrieved = get_run(&conn, "abcd1234").unwrap().unwrap();
215        assert_eq!(retrieved.id, "abcd1234");
216        assert_eq!(retrieved.issue_number, 42);
217        assert_eq!(retrieved.status, RunStatus::Pending);
218    }
219
220    #[test]
221    fn get_nonexistent_run_returns_none() {
222        let conn = test_db();
223        assert!(get_run(&conn, "nope").unwrap().is_none());
224    }
225
226    #[test]
227    fn update_status() {
228        let conn = test_db();
229        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
230
231        update_run_status(&conn, "abcd1234", RunStatus::Implementing).unwrap();
232        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
233        assert_eq!(run.status, RunStatus::Implementing);
234    }
235
236    #[test]
237    fn update_pr_number() {
238        let conn = test_db();
239        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
240
241        update_run_pr(&conn, "abcd1234", 99).unwrap();
242        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
243        assert_eq!(run.pr_number, Some(99));
244    }
245
246    #[test]
247    fn update_cost() {
248        let conn = test_db();
249        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
250
251        update_run_cost(&conn, "abcd1234", 3.50).unwrap();
252        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
253        assert!((run.cost_usd - 3.50).abs() < f64::EPSILON);
254    }
255
256    #[test]
257    fn finish_run_sets_status_and_timestamp() {
258        let conn = test_db();
259        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
260
261        finish_run(&conn, "abcd1234", RunStatus::Complete, None).unwrap();
262        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
263        assert_eq!(run.status, RunStatus::Complete);
264        assert!(run.finished_at.is_some());
265    }
266
267    #[test]
268    fn finish_run_with_error() {
269        let conn = test_db();
270        insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
271
272        finish_run(&conn, "abcd1234", RunStatus::Failed, Some("boom")).unwrap();
273        let run = get_run(&conn, "abcd1234").unwrap().unwrap();
274        assert_eq!(run.status, RunStatus::Failed);
275        assert_eq!(run.error_message.as_deref(), Some("boom"));
276    }
277
278    #[test]
279    fn get_runs_by_status_filters() {
280        let conn = test_db();
281        insert_run(&conn, &sample_run("aaa11111", 1)).unwrap();
282        insert_run(&conn, &sample_run("bbb22222", 2)).unwrap();
283
284        update_run_status(&conn, "aaa11111", RunStatus::Complete).unwrap();
285
286        let pending = get_runs_by_status(&conn, RunStatus::Pending).unwrap();
287        assert_eq!(pending.len(), 1);
288        assert_eq!(pending[0].id, "bbb22222");
289
290        let complete = get_runs_by_status(&conn, RunStatus::Complete).unwrap();
291        assert_eq!(complete.len(), 1);
292        assert_eq!(complete[0].id, "aaa11111");
293    }
294
295    #[test]
296    fn get_latest_run_returns_most_recent() {
297        let conn = test_db();
298        let mut run1 = sample_run("aaa11111", 1);
299        run1.started_at = "2026-03-01T00:00:00".to_string();
300        let mut run2 = sample_run("bbb22222", 2);
301        run2.started_at = "2026-03-02T00:00:00".to_string();
302
303        insert_run(&conn, &run1).unwrap();
304        insert_run(&conn, &run2).unwrap();
305
306        let latest = get_latest_run(&conn).unwrap().unwrap();
307        assert_eq!(latest.id, "bbb22222");
308    }
309
310    #[test]
311    fn get_all_runs_returns_ordered() {
312        let conn = test_db();
313        let mut run1 = sample_run("aaa11111", 1);
314        run1.started_at = "2026-03-01T00:00:00".to_string();
315        let mut run2 = sample_run("bbb22222", 2);
316        run2.started_at = "2026-03-02T00:00:00".to_string();
317
318        insert_run(&conn, &run1).unwrap();
319        insert_run(&conn, &run2).unwrap();
320
321        let all = get_all_runs(&conn).unwrap();
322        assert_eq!(all.len(), 2);
323        assert_eq!(all[0].id, "bbb22222"); // most recent first
324    }
325
326    #[test]
327    fn issue_source_persists() {
328        let conn = test_db();
329        let mut run = sample_run("local001", 1);
330        run.issue_source = "local".to_string();
331        insert_run(&conn, &run).unwrap();
332
333        let retrieved = get_run(&conn, "local001").unwrap().unwrap();
334        assert_eq!(retrieved.issue_source, "local");
335    }
336
337    #[test]
338    fn issue_source_defaults_to_github() {
339        let conn = test_db();
340        insert_run(&conn, &sample_run("gh000001", 1)).unwrap();
341
342        let retrieved = get_run(&conn, "gh000001").unwrap().unwrap();
343        assert_eq!(retrieved.issue_source, "github");
344    }
345}