Skip to main content

oven_cli/db/
mod.rs

1pub mod agent_runs;
2pub mod graph;
3pub mod runs;
4
5use std::path::Path;
6
7use anyhow::Context;
8use rusqlite::Connection;
9use rusqlite_migration::{M, Migrations};
10
/// Ordered list of schema migrations for the SQLite database.
///
/// The list is built lazily on first access. Both `open` and `open_in_memory`
/// apply it with `to_latest` right after configuring the connection.
///
/// NOTE(review): entries are positional, so schema changes should presumably be
/// appended as new `M::up` steps rather than edited in place — confirm against
/// the `rusqlite_migration` documentation.
pub static MIGRATIONS: std::sync::LazyLock<Migrations<'static>> = std::sync::LazyLock::new(|| {
    Migrations::new(vec![
        // Step 1: initial schema — `runs`, `agent_runs`, `review_findings`, and the
        // indexes used to look them up by status, issue, parent run and severity.
        M::up(
            "CREATE TABLE runs (
    id TEXT PRIMARY KEY,
    issue_number INTEGER NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    pr_number INTEGER,
    branch TEXT,
    worktree_path TEXT,
    cost_usd REAL NOT NULL DEFAULT 0.0,
    auto_merge INTEGER NOT NULL DEFAULT 0,
    started_at TEXT NOT NULL DEFAULT (datetime('now')),
    finished_at TEXT,
    error_message TEXT
);

CREATE TABLE agent_runs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    run_id TEXT NOT NULL REFERENCES runs(id) ON DELETE CASCADE,
    agent TEXT NOT NULL,
    cycle INTEGER NOT NULL DEFAULT 1,
    status TEXT NOT NULL DEFAULT 'pending',
    cost_usd REAL NOT NULL DEFAULT 0.0,
    turns INTEGER NOT NULL DEFAULT 0,
    started_at TEXT NOT NULL DEFAULT (datetime('now')),
    finished_at TEXT,
    output_summary TEXT,
    error_message TEXT
);

CREATE TABLE review_findings (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    agent_run_id INTEGER NOT NULL REFERENCES agent_runs(id) ON DELETE CASCADE,
    severity TEXT NOT NULL CHECK (severity IN ('critical', 'warning', 'info')),
    category TEXT NOT NULL,
    file_path TEXT,
    line_number INTEGER,
    message TEXT NOT NULL,
    resolved INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_runs_status ON runs(status);
CREATE INDEX idx_runs_issue ON runs(issue_number);
CREATE INDEX idx_agent_runs_run ON agent_runs(run_id);
CREATE INDEX idx_findings_agent_run ON review_findings(agent_run_id);
CREATE INDEX idx_findings_severity ON review_findings(severity);",
        ),
        // Step 2: per-run complexity label; existing rows get 'full'.
        M::up("ALTER TABLE runs ADD COLUMN complexity TEXT NOT NULL DEFAULT 'full';"),
        // Step 3: where the issue came from; existing rows get 'github'.
        M::up("ALTER TABLE runs ADD COLUMN issue_source TEXT NOT NULL DEFAULT 'github';"),
        // Step 4: nullable raw output column on agent runs.
        M::up("ALTER TABLE agent_runs ADD COLUMN raw_output TEXT;"),
        // Step 5: dependency graph tables. Nodes are keyed by (issue_number, session_id);
        // edges reference both endpoints in the same session and cascade on node deletion.
        M::up(
            "CREATE TABLE graph_nodes (
    issue_number INTEGER NOT NULL,
    session_id TEXT NOT NULL,
    state TEXT NOT NULL DEFAULT 'pending',
    pr_number INTEGER,
    run_id TEXT,
    title TEXT NOT NULL DEFAULT '',
    area TEXT NOT NULL DEFAULT '',
    predicted_files TEXT NOT NULL DEFAULT '[]',
    has_migration INTEGER NOT NULL DEFAULT 0,
    complexity TEXT NOT NULL DEFAULT 'full',
    PRIMARY KEY (issue_number, session_id)
);

CREATE TABLE graph_edges (
    session_id TEXT NOT NULL,
    from_issue INTEGER NOT NULL,
    to_issue INTEGER NOT NULL,
    PRIMARY KEY (session_id, from_issue, to_issue),
    FOREIGN KEY (from_issue, session_id) REFERENCES graph_nodes(issue_number, session_id) ON DELETE CASCADE,
    FOREIGN KEY (to_issue, session_id) REFERENCES graph_nodes(issue_number, session_id) ON DELETE CASCADE
);

CREATE INDEX idx_graph_nodes_session ON graph_nodes(session_id);
CREATE INDEX idx_graph_nodes_state ON graph_nodes(state);
CREATE INDEX idx_graph_edges_session ON graph_edges(session_id);
CREATE INDEX idx_graph_edges_to ON graph_edges(to_issue, session_id);",
        ),
        // Step 6: nullable target repository on graph nodes.
        M::up("ALTER TABLE graph_nodes ADD COLUMN target_repo TEXT;"),
        // Step 7: nullable dispute reason on review findings.
        M::up("ALTER TABLE review_findings ADD COLUMN dispute_reason TEXT;"),
    ])
});
95
96pub fn open(path: &Path) -> anyhow::Result<Connection> {
97    let mut conn = Connection::open(path).context("opening database")?;
98    configure(&conn)?;
99    MIGRATIONS.to_latest(&mut conn).context("running database migrations")?;
100    Ok(conn)
101}
102
103pub fn open_in_memory() -> anyhow::Result<Connection> {
104    let mut conn = Connection::open_in_memory().context("opening in-memory database")?;
105    configure(&conn)?;
106    MIGRATIONS.to_latest(&mut conn).context("running database migrations")?;
107    Ok(conn)
108}
109
110fn configure(conn: &Connection) -> anyhow::Result<()> {
111    conn.pragma_update(None, "journal_mode", "WAL")?;
112    conn.pragma_update(None, "synchronous", "NORMAL")?;
113    conn.pragma_update(None, "busy_timeout", "5000")?;
114    conn.pragma_update(None, "foreign_keys", "ON")?;
115    Ok(())
116}
117
/// Run status for pipeline runs.
///
/// Every variant has a lowercase snake_case string form, produced by the
/// `Display` impl and accepted by the `FromStr` impl in this module. `pending`
/// is also the default value of the `runs.status` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RunStatus {
    /// String form: `pending`.
    Pending,
    /// String form: `implementing`.
    Implementing,
    /// String form: `reviewing`.
    Reviewing,
    /// String form: `fixing`.
    Fixing,
    /// String form: `awaiting_merge`.
    AwaitingMerge,
    /// String form: `merging`.
    Merging,
    /// String form: `complete`.
    Complete,
    /// String form: `failed`.
    Failed,
}
130
131impl std::fmt::Display for RunStatus {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        f.write_str(match self {
134            Self::Pending => "pending",
135            Self::Implementing => "implementing",
136            Self::Reviewing => "reviewing",
137            Self::Fixing => "fixing",
138            Self::AwaitingMerge => "awaiting_merge",
139            Self::Merging => "merging",
140            Self::Complete => "complete",
141            Self::Failed => "failed",
142        })
143    }
144}
145
146impl std::str::FromStr for RunStatus {
147    type Err = anyhow::Error;
148
149    fn from_str(s: &str) -> Result<Self, Self::Err> {
150        match s {
151            "pending" => Ok(Self::Pending),
152            "implementing" => Ok(Self::Implementing),
153            "reviewing" => Ok(Self::Reviewing),
154            "fixing" => Ok(Self::Fixing),
155            "awaiting_merge" => Ok(Self::AwaitingMerge),
156            "merging" => Ok(Self::Merging),
157            "complete" => Ok(Self::Complete),
158            "failed" => Ok(Self::Failed),
159            other => anyhow::bail!("unknown run status: {other}"),
160        }
161    }
162}
163
/// A pipeline run record.
///
/// Field names match the columns of the `runs` table created in `MIGRATIONS`;
/// columns declared without `NOT NULL` appear here as `Option`.
/// NOTE(review): the row-to-struct mapping is not in this file — presumably in
/// the `runs` submodule; confirm there.
#[derive(Debug, Clone)]
pub struct Run {
    /// Run identifier; `id` is the table's text primary key.
    pub id: String,
    /// Number of the issue this run belongs to.
    pub issue_number: u32,
    /// Current status; the `status` column defaults to `'pending'`.
    pub status: RunStatus,
    /// Pull request number, if one is recorded.
    pub pr_number: Option<u32>,
    /// Branch name, if one is recorded.
    pub branch: Option<String>,
    /// Worktree path, if one is recorded.
    pub worktree_path: Option<String>,
    /// Cost in USD; the column defaults to `0.0`.
    pub cost_usd: f64,
    /// Auto-merge flag; stored as an integer column defaulting to `0`.
    pub auto_merge: bool,
    /// Start timestamp as text; the column defaults to `datetime('now')`.
    pub started_at: String,
    /// Finish timestamp as text, if one is recorded.
    pub finished_at: Option<String>,
    /// Error message, if one is recorded.
    pub error_message: Option<String>,
    /// Complexity label; the column defaults to `'full'`.
    pub complexity: String,
    /// Issue source label; the column defaults to `'github'`.
    pub issue_source: String,
}
181
/// An agent execution record.
///
/// Field names match the columns of the `agent_runs` table created in
/// `MIGRATIONS`; columns declared without `NOT NULL` appear here as `Option`.
/// NOTE(review): the row-to-struct mapping is not in this file — presumably in
/// the `agent_runs` submodule; confirm there.
#[derive(Debug, Clone)]
pub struct AgentRun {
    /// Row identifier; `id` is an auto-incrementing integer primary key.
    pub id: i64,
    /// Identifier of the parent run; the column references `runs(id)` with
    /// `ON DELETE CASCADE`.
    pub run_id: String,
    /// Agent name.
    pub agent: String,
    /// Cycle number; the column defaults to `1`.
    pub cycle: u32,
    /// Status as free-form text (not the `RunStatus` enum); the column defaults
    /// to `'pending'`.
    pub status: String,
    /// Cost in USD; the column defaults to `0.0`.
    pub cost_usd: f64,
    /// Turn count; the column defaults to `0`.
    pub turns: u32,
    /// Start timestamp as text; the column defaults to `datetime('now')`.
    pub started_at: String,
    /// Finish timestamp as text, if one is recorded.
    pub finished_at: Option<String>,
    /// Output summary, if one is recorded.
    pub output_summary: Option<String>,
    /// Error message, if one is recorded.
    pub error_message: Option<String>,
    /// Raw output, if one is recorded (column added by a later migration).
    pub raw_output: Option<String>,
}
198
/// A review finding from the reviewer agent.
///
/// Field names match the columns of the `review_findings` table created in
/// `MIGRATIONS`; columns declared without `NOT NULL` appear here as `Option`.
/// NOTE(review): the row-to-struct mapping is not in this file; confirm where
/// these records are read and written.
#[derive(Debug, Clone)]
pub struct ReviewFinding {
    /// Row identifier; `id` is an auto-incrementing integer primary key.
    pub id: i64,
    /// Identifier of the owning agent run; the column references
    /// `agent_runs(id)` with `ON DELETE CASCADE`.
    pub agent_run_id: i64,
    /// Severity label; the table's CHECK constraint allows only `critical`,
    /// `warning` or `info`.
    pub severity: String,
    /// Category label.
    pub category: String,
    /// File path the finding refers to, if one is recorded.
    pub file_path: Option<String>,
    /// Line number the finding refers to, if one is recorded.
    pub line_number: Option<u32>,
    /// Finding message.
    pub message: String,
    /// Resolved flag; stored as an integer column defaulting to `0`.
    pub resolved: bool,
    /// Dispute reason, if one is recorded (column added by a later migration).
    pub dispute_reason: Option<String>,
}
212
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    /// Every `RunStatus` variant; the single source for all round-trip tests so
    /// a new variant only has to be added here.
    const ALL_STATUSES: [RunStatus; 8] = [
        RunStatus::Pending,
        RunStatus::Implementing,
        RunStatus::Reviewing,
        RunStatus::Fixing,
        RunStatus::AwaitingMerge,
        RunStatus::Merging,
        RunStatus::Complete,
        RunStatus::Failed,
    ];

    proptest! {
        // The index range is derived from the table instead of a literal `8`,
        // so it cannot drift out of sync with `ALL_STATUSES`.
        #[test]
        fn run_status_display_fromstr_roundtrip(idx in 0..ALL_STATUSES.len()) {
            let status = ALL_STATUSES[idx];
            let parsed: RunStatus = status.to_string().parse().unwrap();
            // `prop_assert_eq!` reports the failure through proptest rather
            // than panicking inside the property body.
            prop_assert_eq!(status, parsed);
        }

        #[test]
        fn arbitrary_strings_never_panic_on_parse(s in "\\PC{1,50}") {
            // Parsing arbitrary strings should never panic, only return Ok or Err
            let _ = s.parse::<RunStatus>();
        }
    }

    /// The static migration list must be internally valid.
    #[test]
    fn migrations_validate() {
        MIGRATIONS.validate().unwrap();
    }

    /// An in-memory database opens, migrates, and exposes an empty `runs` table.
    #[test]
    fn open_in_memory_succeeds() {
        let conn = open_in_memory().unwrap();
        // Verify tables exist by querying them
        let count: i64 = conn.query_row("SELECT COUNT(*) FROM runs", [], |row| row.get(0)).unwrap();
        assert_eq!(count, 0);
    }

    /// Deterministic counterpart of the property test: every variant survives
    /// a `to_string` / `parse` round trip.
    #[test]
    fn run_status_display_roundtrip() {
        for status in ALL_STATUSES {
            let parsed: RunStatus = status.to_string().parse().unwrap();
            assert_eq!(status, parsed);
        }
    }

    /// Labels that are not a known status are rejected.
    #[test]
    fn run_status_unknown_returns_error() {
        let result = "banana".parse::<RunStatus>();
        assert!(result.is_err());
    }
}