1use anyhow::{Context, Result};
2use rusqlite::{Connection, params};
3
4use super::{Run, RunStatus};
5
6pub fn insert_run(conn: &Connection, run: &Run) -> Result<()> {
7 conn.execute(
8 "INSERT INTO runs (id, issue_number, status, pr_number, branch, worktree_path, \
9 cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
10 issue_source) \
11 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
12 params![
13 run.id,
14 run.issue_number,
15 run.status.to_string(),
16 run.pr_number,
17 run.branch,
18 run.worktree_path,
19 run.cost_usd,
20 run.auto_merge,
21 run.started_at,
22 run.finished_at,
23 run.error_message,
24 run.complexity,
25 run.issue_source,
26 ],
27 )
28 .context("inserting run")?;
29 Ok(())
30}
31
32pub fn get_run(conn: &Connection, id: &str) -> Result<Option<Run>> {
33 let mut stmt = conn
34 .prepare(
35 "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
36 cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
37 issue_source \
38 FROM runs WHERE id = ?1",
39 )
40 .context("preparing get_run")?;
41
42 let mut rows = stmt.query_map(params![id], row_to_run).context("querying run")?;
43 match rows.next() {
44 Some(row) => Ok(Some(row.context("reading run row")?)),
45 None => Ok(None),
46 }
47}
48
49pub fn get_runs_by_status(conn: &Connection, status: RunStatus) -> Result<Vec<Run>> {
50 let mut stmt = conn
51 .prepare(
52 "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
53 cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
54 issue_source \
55 FROM runs WHERE status = ?1 ORDER BY started_at",
56 )
57 .context("preparing get_runs_by_status")?;
58
59 let rows = stmt
60 .query_map(params![status.to_string()], row_to_run)
61 .context("querying runs by status")?;
62 rows.collect::<std::result::Result<Vec<_>, _>>().context("collecting runs")
63}
64
65pub fn get_latest_run(conn: &Connection) -> Result<Option<Run>> {
66 let mut stmt = conn
67 .prepare(
68 "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
69 cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
70 issue_source \
71 FROM runs ORDER BY started_at DESC LIMIT 1",
72 )
73 .context("preparing get_latest_run")?;
74
75 let mut rows = stmt.query_map([], row_to_run).context("querying latest run")?;
76 match rows.next() {
77 Some(row) => Ok(Some(row.context("reading latest run row")?)),
78 None => Ok(None),
79 }
80}
81
82pub fn get_active_runs(conn: &Connection) -> Result<Vec<Run>> {
83 let mut stmt = conn
84 .prepare(
85 "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
86 cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
87 issue_source \
88 FROM runs WHERE status NOT IN ('complete', 'failed') ORDER BY started_at",
89 )
90 .context("preparing get_active_runs")?;
91
92 let rows = stmt.query_map([], row_to_run).context("querying active runs")?;
93 rows.collect::<std::result::Result<Vec<_>, _>>().context("collecting active runs")
94}
95
96pub fn get_all_runs(conn: &Connection) -> Result<Vec<Run>> {
97 let mut stmt = conn
98 .prepare(
99 "SELECT id, issue_number, status, pr_number, branch, worktree_path, \
100 cost_usd, auto_merge, started_at, finished_at, error_message, complexity, \
101 issue_source \
102 FROM runs ORDER BY started_at DESC",
103 )
104 .context("preparing get_all_runs")?;
105
106 let rows = stmt.query_map([], row_to_run).context("querying all runs")?;
107 rows.collect::<std::result::Result<Vec<_>, _>>().context("collecting all runs")
108}
109
110pub fn update_run_status(conn: &Connection, id: &str, status: RunStatus) -> Result<()> {
111 conn.execute("UPDATE runs SET status = ?1 WHERE id = ?2", params![status.to_string(), id])
112 .context("updating run status")?;
113 Ok(())
114}
115
116pub fn update_run_pr(conn: &Connection, id: &str, pr_number: u32) -> Result<()> {
117 conn.execute("UPDATE runs SET pr_number = ?1 WHERE id = ?2", params![pr_number, id])
118 .context("updating run PR number")?;
119 Ok(())
120}
121
122pub fn update_run_cost(conn: &Connection, id: &str, cost_usd: f64) -> Result<()> {
123 conn.execute("UPDATE runs SET cost_usd = ?1 WHERE id = ?2", params![cost_usd, id])
124 .context("updating run cost")?;
125 Ok(())
126}
127
128pub fn increment_run_cost(conn: &Connection, id: &str, delta: f64) -> Result<f64> {
133 let tx = conn.unchecked_transaction().context("starting cost transaction")?;
134 tx.execute("UPDATE runs SET cost_usd = cost_usd + ?1 WHERE id = ?2", params![delta, id])
135 .context("incrementing run cost")?;
136 let new_cost: f64 = tx
137 .query_row("SELECT cost_usd FROM runs WHERE id = ?1", params![id], |row| row.get(0))
138 .context("reading updated cost")?;
139 tx.commit().context("committing cost transaction")?;
140 Ok(new_cost)
141}
142
143pub fn finish_run(
144 conn: &Connection,
145 id: &str,
146 status: RunStatus,
147 error_message: Option<&str>,
148) -> Result<()> {
149 conn.execute(
150 "UPDATE runs SET status = ?1, finished_at = datetime('now'), error_message = ?2 \
151 WHERE id = ?3",
152 params![status.to_string(), error_message, id],
153 )
154 .context("finishing run")?;
155 Ok(())
156}
157
158pub fn update_run_worktree(conn: &Connection, id: &str, branch: &str, path: &str) -> Result<()> {
159 conn.execute(
160 "UPDATE runs SET branch = ?1, worktree_path = ?2 WHERE id = ?3",
161 params![branch, path, id],
162 )
163 .context("updating run worktree")?;
164 Ok(())
165}
166
167pub fn update_run_complexity(conn: &Connection, id: &str, complexity: &str) -> Result<()> {
168 conn.execute("UPDATE runs SET complexity = ?1 WHERE id = ?2", params![complexity, id])
169 .context("updating run complexity")?;
170 Ok(())
171}
172
173fn row_to_run(row: &rusqlite::Row<'_>) -> rusqlite::Result<Run> {
174 let status_str: String = row.get(2)?;
175 let status: RunStatus = status_str.parse().map_err(|_| {
176 rusqlite::Error::InvalidColumnType(2, "status".to_string(), rusqlite::types::Type::Text)
177 })?;
178 Ok(Run {
179 id: row.get(0)?,
180 issue_number: row.get(1)?,
181 status,
182 pr_number: row.get(3)?,
183 branch: row.get(4)?,
184 worktree_path: row.get(5)?,
185 cost_usd: row.get(6)?,
186 auto_merge: row.get(7)?,
187 started_at: row.get(8)?,
188 finished_at: row.get(9)?,
189 error_message: row.get(10)?,
190 complexity: row.get(11)?,
191 issue_source: row.get(12)?,
192 })
193}
194
195#[cfg(test)]
196mod tests {
197 use super::*;
198 use crate::db;
199
200 fn test_db() -> Connection {
201 db::open_in_memory().unwrap()
202 }
203
204 fn sample_run(id: &str, issue: u32) -> Run {
205 Run {
206 id: id.to_string(),
207 issue_number: issue,
208 status: RunStatus::Pending,
209 pr_number: None,
210 branch: None,
211 worktree_path: None,
212 cost_usd: 0.0,
213 auto_merge: false,
214 started_at: "2026-03-12T00:00:00".to_string(),
215 finished_at: None,
216 error_message: None,
217 complexity: "full".to_string(),
218 issue_source: "github".to_string(),
219 }
220 }
221
222 #[test]
223 fn insert_and_get_run() {
224 let conn = test_db();
225 let run = sample_run("abcd1234", 42);
226 insert_run(&conn, &run).unwrap();
227
228 let retrieved = get_run(&conn, "abcd1234").unwrap().unwrap();
229 assert_eq!(retrieved.id, "abcd1234");
230 assert_eq!(retrieved.issue_number, 42);
231 assert_eq!(retrieved.status, RunStatus::Pending);
232 }
233
234 #[test]
235 fn get_nonexistent_run_returns_none() {
236 let conn = test_db();
237 assert!(get_run(&conn, "nope").unwrap().is_none());
238 }
239
240 #[test]
241 fn update_status() {
242 let conn = test_db();
243 insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
244
245 update_run_status(&conn, "abcd1234", RunStatus::Implementing).unwrap();
246 let run = get_run(&conn, "abcd1234").unwrap().unwrap();
247 assert_eq!(run.status, RunStatus::Implementing);
248 }
249
250 #[test]
251 fn update_pr_number() {
252 let conn = test_db();
253 insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
254
255 update_run_pr(&conn, "abcd1234", 99).unwrap();
256 let run = get_run(&conn, "abcd1234").unwrap().unwrap();
257 assert_eq!(run.pr_number, Some(99));
258 }
259
260 #[test]
261 fn update_cost() {
262 let conn = test_db();
263 insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
264
265 update_run_cost(&conn, "abcd1234", 3.50).unwrap();
266 let run = get_run(&conn, "abcd1234").unwrap().unwrap();
267 assert!((run.cost_usd - 3.50).abs() < f64::EPSILON);
268 }
269
270 #[test]
271 fn finish_run_sets_status_and_timestamp() {
272 let conn = test_db();
273 insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
274
275 finish_run(&conn, "abcd1234", RunStatus::Complete, None).unwrap();
276 let run = get_run(&conn, "abcd1234").unwrap().unwrap();
277 assert_eq!(run.status, RunStatus::Complete);
278 assert!(run.finished_at.is_some());
279 }
280
281 #[test]
282 fn finish_run_with_error() {
283 let conn = test_db();
284 insert_run(&conn, &sample_run("abcd1234", 42)).unwrap();
285
286 finish_run(&conn, "abcd1234", RunStatus::Failed, Some("boom")).unwrap();
287 let run = get_run(&conn, "abcd1234").unwrap().unwrap();
288 assert_eq!(run.status, RunStatus::Failed);
289 assert_eq!(run.error_message.as_deref(), Some("boom"));
290 }
291
292 #[test]
293 fn get_runs_by_status_filters() {
294 let conn = test_db();
295 insert_run(&conn, &sample_run("aaa11111", 1)).unwrap();
296 insert_run(&conn, &sample_run("bbb22222", 2)).unwrap();
297
298 update_run_status(&conn, "aaa11111", RunStatus::Complete).unwrap();
299
300 let pending = get_runs_by_status(&conn, RunStatus::Pending).unwrap();
301 assert_eq!(pending.len(), 1);
302 assert_eq!(pending[0].id, "bbb22222");
303
304 let complete = get_runs_by_status(&conn, RunStatus::Complete).unwrap();
305 assert_eq!(complete.len(), 1);
306 assert_eq!(complete[0].id, "aaa11111");
307 }
308
309 #[test]
310 fn get_latest_run_returns_most_recent() {
311 let conn = test_db();
312 let mut run1 = sample_run("aaa11111", 1);
313 run1.started_at = "2026-03-01T00:00:00".to_string();
314 let mut run2 = sample_run("bbb22222", 2);
315 run2.started_at = "2026-03-02T00:00:00".to_string();
316
317 insert_run(&conn, &run1).unwrap();
318 insert_run(&conn, &run2).unwrap();
319
320 let latest = get_latest_run(&conn).unwrap().unwrap();
321 assert_eq!(latest.id, "bbb22222");
322 }
323
324 #[test]
325 fn get_all_runs_returns_ordered() {
326 let conn = test_db();
327 let mut run1 = sample_run("aaa11111", 1);
328 run1.started_at = "2026-03-01T00:00:00".to_string();
329 let mut run2 = sample_run("bbb22222", 2);
330 run2.started_at = "2026-03-02T00:00:00".to_string();
331
332 insert_run(&conn, &run1).unwrap();
333 insert_run(&conn, &run2).unwrap();
334
335 let all = get_all_runs(&conn).unwrap();
336 assert_eq!(all.len(), 2);
337 assert_eq!(all[0].id, "bbb22222"); }
339
340 #[test]
341 fn issue_source_persists() {
342 let conn = test_db();
343 let mut run = sample_run("local001", 1);
344 run.issue_source = "local".to_string();
345 insert_run(&conn, &run).unwrap();
346
347 let retrieved = get_run(&conn, "local001").unwrap().unwrap();
348 assert_eq!(retrieved.issue_source, "local");
349 }
350
351 #[test]
352 fn issue_source_defaults_to_github() {
353 let conn = test_db();
354 insert_run(&conn, &sample_run("gh000001", 1)).unwrap();
355
356 let retrieved = get_run(&conn, "gh000001").unwrap().unwrap();
357 assert_eq!(retrieved.issue_source, "github");
358 }
359}