Skip to main content

ninox_core/
store.rs

1use crate::types::*;
2use anyhow::Result;
3use rusqlite::{params, Connection};
4use std::{path::Path, sync::Mutex};
5
/// SQLite-backed persistence handle.
///
/// Holds a single `rusqlite::Connection` behind a `Mutex`; the methods in this
/// file lock it for the duration of their database work.
pub struct Store {
    // Locked by each method before it touches the database.
    conn: Mutex<Connection>,
}
9
10
11impl Store {
12    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
13        let conn = Connection::open(path)?;
14        conn.execute_batch("
15            PRAGMA journal_mode=WAL;
16            CREATE TABLE IF NOT EXISTS sessions (
17                id TEXT PRIMARY KEY, orchestrator_id TEXT,
18                name TEXT NOT NULL, repo TEXT NOT NULL,
19                status TEXT NOT NULL, agent_type TEXT NOT NULL,
20                cost_usd REAL NOT NULL DEFAULT 0, started_at INTEGER NOT NULL,
21                pr_number INTEGER, pr_id INTEGER,
22                workspace_path TEXT, pid INTEGER
23            );
24            CREATE TABLE IF NOT EXISTS orchestrators (
25                id TEXT PRIMARY KEY, name TEXT NOT NULL, created_at INTEGER NOT NULL
26            );
27            CREATE TABLE IF NOT EXISTS prs (
28                id INTEGER PRIMARY KEY, number INTEGER NOT NULL,
29                title TEXT NOT NULL, url TEXT NOT NULL,
30                body TEXT NOT NULL, session_id TEXT NOT NULL
31            );
32            CREATE TABLE IF NOT EXISTS ci_status (
33                pr_id INTEGER PRIMARY KEY, total INTEGER NOT NULL,
34                passing INTEGER NOT NULL, failing INTEGER NOT NULL,
35                pending INTEGER NOT NULL
36            );
37            CREATE TABLE IF NOT EXISTS review_comments (
38                id INTEGER PRIMARY KEY, pr_id INTEGER NOT NULL,
39                author TEXT NOT NULL, body TEXT NOT NULL,
40                path TEXT, line INTEGER, created_at INTEGER NOT NULL
41            );
42        ")?;
43        Ok(Self { conn: Mutex::new(conn) })
44    }
45
46    pub fn upsert_session(&self, s: &Session) -> Result<()> {
47        let status = serde_json::to_string(&s.status)?.replace('"', "");
48        let conn = self.conn.lock().unwrap();
49        conn.execute(
50            "INSERT INTO sessions (id,orchestrator_id,name,repo,status,agent_type,
51             cost_usd,started_at,pr_number,pr_id,workspace_path,pid)
52             VALUES(?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12)
53             ON CONFLICT(id) DO UPDATE SET
54             status=excluded.status,cost_usd=excluded.cost_usd,
55             pr_number=excluded.pr_number,pr_id=excluded.pr_id,
56             workspace_path=excluded.workspace_path,pid=excluded.pid",
57            params![
58                s.id, s.orchestrator_id, s.name, s.repo, status, s.agent_type,
59                s.cost_usd, s.started_at, s.pr_number, s.pr_id,
60                s.workspace_path, s.pid
61            ],
62        )?;
63        Ok(())
64    }
65
66    pub fn list_sessions(&self) -> Result<Vec<Session>> {
67        let conn = self.conn.lock().unwrap();
68        let mut stmt = conn.prepare(
69            "SELECT id,orchestrator_id,name,repo,status,agent_type,cost_usd,
70             started_at,pr_number,pr_id,workspace_path,pid
71             FROM sessions ORDER BY started_at DESC",
72        )?;
73        let rows = stmt.query_map([], |r| {
74            Ok((
75                r.get::<_, String>(0)?,
76                r.get::<_, Option<String>>(1)?,
77                r.get::<_, String>(2)?,
78                r.get::<_, String>(3)?,
79                r.get::<_, String>(4)?,
80                r.get::<_, String>(5)?,
81                r.get::<_, f64>(6)?,
82                r.get::<_, i64>(7)?,
83                r.get::<_, Option<u64>>(8)?,
84                r.get::<_, Option<i64>>(9)?,
85                r.get::<_, Option<String>>(10)?,
86                r.get::<_, Option<u32>>(11)?,
87            ))
88        })?;
89        rows.map(|r| {
90            let (id, orchestrator_id, name, repo, status_str, agent_type,
91                 cost_usd, started_at, pr_number, pr_id, workspace_path, pid) = r?;
92            let status = serde_json::from_str(&format!("\"{status_str}\""))
93                .unwrap_or(SessionStatus::Working);
94            Ok(Session {
95                id, orchestrator_id, name, repo, status, agent_type,
96                cost_usd, started_at, pr_number, pr_id, workspace_path, pid,
97            })
98        })
99        .collect()
100    }
101
102    pub fn get_session(&self, id: &str) -> Result<Option<Session>> {
103        let conn = self.conn.lock().unwrap();
104        let mut stmt = conn.prepare(
105            "SELECT id,orchestrator_id,name,repo,status,agent_type,cost_usd,
106             started_at,pr_number,pr_id,workspace_path,pid
107             FROM sessions WHERE id = ?1",
108        )?;
109        let mut rows = stmt.query_map([id], |r| {
110            Ok((
111                r.get::<_, String>(0)?,
112                r.get::<_, Option<String>>(1)?,
113                r.get::<_, String>(2)?,
114                r.get::<_, String>(3)?,
115                r.get::<_, String>(4)?,
116                r.get::<_, String>(5)?,
117                r.get::<_, f64>(6)?,
118                r.get::<_, i64>(7)?,
119                r.get::<_, Option<u64>>(8)?,
120                r.get::<_, Option<i64>>(9)?,
121                r.get::<_, Option<String>>(10)?,
122                r.get::<_, Option<u32>>(11)?,
123            ))
124        })?;
125        match rows.next() {
126            None => Ok(None),
127            Some(r) => {
128                let (id, orchestrator_id, name, repo, status_str, agent_type,
129                     cost_usd, started_at, pr_number, pr_id, workspace_path, pid) = r?;
130                let status = serde_json::from_str(&format!("\"{status_str}\""))
131                    .unwrap_or(SessionStatus::Working);
132                Ok(Some(Session {
133                    id, orchestrator_id, name, repo, status, agent_type,
134                    cost_usd, started_at, pr_number, pr_id, workspace_path, pid,
135                }))
136            }
137        }
138    }
139
140    pub fn upsert_orchestrator(&self, o: &Orchestrator) -> Result<()> {
141        let conn = self.conn.lock().unwrap();
142        conn.execute(
143            "INSERT INTO orchestrators(id,name,created_at) VALUES(?1,?2,?3)
144             ON CONFLICT(id) DO UPDATE SET name=excluded.name",
145            params![o.id, o.name, o.created_at],
146        )?;
147        Ok(())
148    }
149
150    pub fn sessions_by_orchestrator(&self, orchestrator_id: &str) -> Result<Vec<Session>> {
151        let sessions = self.list_sessions()?;
152        Ok(sessions.into_iter().filter(|s| s.orchestrator_id.as_deref() == Some(orchestrator_id)).collect())
153    }
154
155    pub fn delete_session(&self, id: &str) -> Result<()> {
156        let conn = self.conn.lock().unwrap();
157        conn.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
158        Ok(())
159    }
160
161    pub fn delete_orchestrator(&self, id: &str) -> Result<()> {
162        let conn = self.conn.lock().unwrap();
163        conn.execute("DELETE FROM sessions WHERE orchestrator_id = ?1", [id])?;
164        conn.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
165        conn.execute("DELETE FROM orchestrators WHERE id = ?1", [id])?;
166        Ok(())
167    }
168
169    pub fn upsert_pr(&self, pr: &PR) -> Result<()> {
170        let conn = self.conn.lock().unwrap();
171        conn.execute(
172            "INSERT OR REPLACE INTO prs
173             (id, number, title, url, body, session_id)
174             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
175            params![pr.id, pr.number, pr.title, pr.url, pr.body, pr.session_id],
176        )?;
177        Ok(())
178    }
179
180    pub fn upsert_ci_status(&self, ci: &CIStatus) -> Result<()> {
181        let conn = self.conn.lock().unwrap();
182        conn.execute(
183            "INSERT OR REPLACE INTO ci_status
184             (pr_id, total, passing, failing, pending)
185             VALUES (?1, ?2, ?3, ?4, ?5)",
186            params![ci.pr_id, ci.total, ci.passing, ci.failing, ci.pending],
187        )?;
188        Ok(())
189    }
190
191    pub fn upsert_comment(&self, c: &Comment) -> Result<()> {
192        let conn = self.conn.lock().unwrap();
193        conn.execute(
194            "INSERT OR REPLACE INTO review_comments
195             (id, pr_id, author, body, path, line, created_at)
196             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
197            params![c.id, c.pr_id, c.author, c.body, c.path, c.line, c.created_at],
198        )?;
199        Ok(())
200    }
201
202    pub fn list_orchestrators(&self) -> Result<Vec<Orchestrator>> {
203        let conn = self.conn.lock().unwrap();
204        let mut stmt = conn.prepare(
205            "SELECT id,name,created_at FROM orchestrators ORDER BY created_at DESC",
206        )?;
207        let rows = stmt.query_map([], |r| {
208            Ok(Orchestrator {
209                id: r.get(0)?,
210                name: r.get(1)?,
211                created_at: r.get(2)?,
212            })
213        })?;
214        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Opens a fresh store backed by `t.db` inside a new temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store so the directory
    /// stays alive for the duration of the test and is removed afterwards,
    /// instead of being leaked on disk. Bind it as `(_dir, store)` so the
    /// store (declared last) is dropped before the directory.
    fn test_store() -> (TempDir, Store) {
        let dir = tempdir().unwrap();
        let store = Store::open(dir.path().join("t.db")).unwrap();
        (dir, store)
    }

    /// A newly upserted session shows up in the listing.
    #[test]
    fn upsert_and_list_session() {
        let (_dir, store) = test_store();
        let session = Session {
            id: "s1".into(), orchestrator_id: None, name: "worker-1".into(),
            repo: "slievr/Athene".into(), status: SessionStatus::Working,
            agent_type: "claude-code".into(), cost_usd: 0.0, started_at: 0,
            pr_number: None, pr_id: None, workspace_path: None, pid: None,
        };
        store.upsert_session(&session).unwrap();
        let list = store.list_sessions().unwrap();
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].id, "s1");
    }

    /// Upserting the same id twice updates the status instead of adding a row.
    #[test]
    fn upsert_updates_status() {
        let (_dir, store) = test_store();
        let mut s = Session {
            id: "s1".into(), orchestrator_id: None, name: "w".into(),
            repo: "r".into(), status: SessionStatus::Working,
            agent_type: "c".into(), cost_usd: 0.0, started_at: 0,
            pr_number: None, pr_id: None, workspace_path: None, pid: None,
        };
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Done;
        store.upsert_session(&s).unwrap();
        let list = store.list_sessions().unwrap();
        assert_eq!(list.len(), 1);
        assert!(matches!(list[0].status, SessionStatus::Done));
    }

    /// Lookup by id returns the stored session, and `None` for an unknown id.
    #[test]
    fn get_session_by_id() {
        let (_dir, store) = test_store();
        let s = Session {
            id: "s1".into(), orchestrator_id: None, name: "w".into(),
            repo: "r".into(), status: SessionStatus::Working,
            agent_type: "c".into(), cost_usd: 0.0, started_at: 0,
            pr_number: None, pr_id: None, workspace_path: None, pid: None,
        };
        store.upsert_session(&s).unwrap();
        let found = store.get_session("s1").unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "w");
        assert!(store.get_session("missing").unwrap().is_none());
    }
}