1use crate::types::*;
2use anyhow::Result;
3use rusqlite::{params, Connection};
4use std::{path::Path, sync::Mutex};
5
6pub struct Store {
7 conn: Mutex<Connection>,
8}
9
10
11impl Store {
12 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
13 let conn = Connection::open(path)?;
14 conn.execute_batch("
15 PRAGMA journal_mode=WAL;
16 CREATE TABLE IF NOT EXISTS sessions (
17 id TEXT PRIMARY KEY, orchestrator_id TEXT,
18 name TEXT NOT NULL, repo TEXT NOT NULL,
19 status TEXT NOT NULL, agent_type TEXT NOT NULL,
20 cost_usd REAL NOT NULL DEFAULT 0, started_at INTEGER NOT NULL,
21 pr_number INTEGER, pr_id INTEGER,
22 workspace_path TEXT, pid INTEGER
23 );
24 CREATE TABLE IF NOT EXISTS orchestrators (
25 id TEXT PRIMARY KEY, name TEXT NOT NULL, created_at INTEGER NOT NULL
26 );
27 CREATE TABLE IF NOT EXISTS prs (
28 id INTEGER PRIMARY KEY, number INTEGER NOT NULL,
29 title TEXT NOT NULL, url TEXT NOT NULL,
30 body TEXT NOT NULL, session_id TEXT NOT NULL
31 );
32 CREATE TABLE IF NOT EXISTS ci_status (
33 pr_id INTEGER PRIMARY KEY, total INTEGER NOT NULL,
34 passing INTEGER NOT NULL, failing INTEGER NOT NULL,
35 pending INTEGER NOT NULL
36 );
37 CREATE TABLE IF NOT EXISTS review_comments (
38 id INTEGER PRIMARY KEY, pr_id INTEGER NOT NULL,
39 author TEXT NOT NULL, body TEXT NOT NULL,
40 path TEXT, line INTEGER, created_at INTEGER NOT NULL
41 );
42 ")?;
43 Ok(Self { conn: Mutex::new(conn) })
44 }
45
46 pub fn upsert_session(&self, s: &Session) -> Result<()> {
47 let status = serde_json::to_string(&s.status)?.replace('"', "");
48 let conn = self.conn.lock().unwrap();
49 conn.execute(
50 "INSERT INTO sessions (id,orchestrator_id,name,repo,status,agent_type,
51 cost_usd,started_at,pr_number,pr_id,workspace_path,pid)
52 VALUES(?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12)
53 ON CONFLICT(id) DO UPDATE SET
54 status=excluded.status,cost_usd=excluded.cost_usd,
55 pr_number=excluded.pr_number,pr_id=excluded.pr_id,
56 workspace_path=excluded.workspace_path,pid=excluded.pid",
57 params![
58 s.id, s.orchestrator_id, s.name, s.repo, status, s.agent_type,
59 s.cost_usd, s.started_at, s.pr_number, s.pr_id,
60 s.workspace_path, s.pid
61 ],
62 )?;
63 Ok(())
64 }
65
66 pub fn list_sessions(&self) -> Result<Vec<Session>> {
67 let conn = self.conn.lock().unwrap();
68 let mut stmt = conn.prepare(
69 "SELECT id,orchestrator_id,name,repo,status,agent_type,cost_usd,
70 started_at,pr_number,pr_id,workspace_path,pid
71 FROM sessions ORDER BY started_at DESC",
72 )?;
73 let rows = stmt.query_map([], |r| {
74 Ok((
75 r.get::<_, String>(0)?,
76 r.get::<_, Option<String>>(1)?,
77 r.get::<_, String>(2)?,
78 r.get::<_, String>(3)?,
79 r.get::<_, String>(4)?,
80 r.get::<_, String>(5)?,
81 r.get::<_, f64>(6)?,
82 r.get::<_, i64>(7)?,
83 r.get::<_, Option<u64>>(8)?,
84 r.get::<_, Option<i64>>(9)?,
85 r.get::<_, Option<String>>(10)?,
86 r.get::<_, Option<u32>>(11)?,
87 ))
88 })?;
89 rows.map(|r| {
90 let (id, orchestrator_id, name, repo, status_str, agent_type,
91 cost_usd, started_at, pr_number, pr_id, workspace_path, pid) = r?;
92 let status = serde_json::from_str(&format!("\"{status_str}\""))
93 .unwrap_or(SessionStatus::Working);
94 Ok(Session {
95 id, orchestrator_id, name, repo, status, agent_type,
96 cost_usd, started_at, pr_number, pr_id, workspace_path, pid,
97 })
98 })
99 .collect()
100 }
101
102 pub fn get_session(&self, id: &str) -> Result<Option<Session>> {
103 let conn = self.conn.lock().unwrap();
104 let mut stmt = conn.prepare(
105 "SELECT id,orchestrator_id,name,repo,status,agent_type,cost_usd,
106 started_at,pr_number,pr_id,workspace_path,pid
107 FROM sessions WHERE id = ?1",
108 )?;
109 let mut rows = stmt.query_map([id], |r| {
110 Ok((
111 r.get::<_, String>(0)?,
112 r.get::<_, Option<String>>(1)?,
113 r.get::<_, String>(2)?,
114 r.get::<_, String>(3)?,
115 r.get::<_, String>(4)?,
116 r.get::<_, String>(5)?,
117 r.get::<_, f64>(6)?,
118 r.get::<_, i64>(7)?,
119 r.get::<_, Option<u64>>(8)?,
120 r.get::<_, Option<i64>>(9)?,
121 r.get::<_, Option<String>>(10)?,
122 r.get::<_, Option<u32>>(11)?,
123 ))
124 })?;
125 match rows.next() {
126 None => Ok(None),
127 Some(r) => {
128 let (id, orchestrator_id, name, repo, status_str, agent_type,
129 cost_usd, started_at, pr_number, pr_id, workspace_path, pid) = r?;
130 let status = serde_json::from_str(&format!("\"{status_str}\""))
131 .unwrap_or(SessionStatus::Working);
132 Ok(Some(Session {
133 id, orchestrator_id, name, repo, status, agent_type,
134 cost_usd, started_at, pr_number, pr_id, workspace_path, pid,
135 }))
136 }
137 }
138 }
139
140 pub fn upsert_orchestrator(&self, o: &Orchestrator) -> Result<()> {
141 let conn = self.conn.lock().unwrap();
142 conn.execute(
143 "INSERT INTO orchestrators(id,name,created_at) VALUES(?1,?2,?3)
144 ON CONFLICT(id) DO UPDATE SET name=excluded.name",
145 params![o.id, o.name, o.created_at],
146 )?;
147 Ok(())
148 }
149
150 pub fn sessions_by_orchestrator(&self, orchestrator_id: &str) -> Result<Vec<Session>> {
151 let sessions = self.list_sessions()?;
152 Ok(sessions.into_iter().filter(|s| s.orchestrator_id.as_deref() == Some(orchestrator_id)).collect())
153 }
154
155 pub fn delete_session(&self, id: &str) -> Result<()> {
156 let conn = self.conn.lock().unwrap();
157 conn.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
158 Ok(())
159 }
160
161 pub fn delete_orchestrator(&self, id: &str) -> Result<()> {
162 let conn = self.conn.lock().unwrap();
163 conn.execute("DELETE FROM sessions WHERE orchestrator_id = ?1", [id])?;
164 conn.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
165 conn.execute("DELETE FROM orchestrators WHERE id = ?1", [id])?;
166 Ok(())
167 }
168
169 pub fn upsert_pr(&self, pr: &PR) -> Result<()> {
170 let conn = self.conn.lock().unwrap();
171 conn.execute(
172 "INSERT OR REPLACE INTO prs
173 (id, number, title, url, body, session_id)
174 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
175 params![pr.id, pr.number, pr.title, pr.url, pr.body, pr.session_id],
176 )?;
177 Ok(())
178 }
179
180 pub fn upsert_ci_status(&self, ci: &CIStatus) -> Result<()> {
181 let conn = self.conn.lock().unwrap();
182 conn.execute(
183 "INSERT OR REPLACE INTO ci_status
184 (pr_id, total, passing, failing, pending)
185 VALUES (?1, ?2, ?3, ?4, ?5)",
186 params![ci.pr_id, ci.total, ci.passing, ci.failing, ci.pending],
187 )?;
188 Ok(())
189 }
190
191 pub fn upsert_comment(&self, c: &Comment) -> Result<()> {
192 let conn = self.conn.lock().unwrap();
193 conn.execute(
194 "INSERT OR REPLACE INTO review_comments
195 (id, pr_id, author, body, path, line, created_at)
196 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
197 params![c.id, c.pr_id, c.author, c.body, c.path, c.line, c.created_at],
198 )?;
199 Ok(())
200 }
201
202 pub fn list_orchestrators(&self) -> Result<Vec<Orchestrator>> {
203 let conn = self.conn.lock().unwrap();
204 let mut stmt = conn.prepare(
205 "SELECT id,name,created_at FROM orchestrators ORDER BY created_at DESC",
206 )?;
207 let rows = stmt.query_map([], |r| {
208 Ok(Orchestrator {
209 id: r.get(0)?,
210 name: r.get(1)?,
211 created_at: r.get(2)?,
212 })
213 })?;
214 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
215 }
216}
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221 use tempfile::tempdir;
222
223 fn test_store() -> Store {
224 let dir = tempdir().unwrap();
225 let path = dir.path().join("t.db");
226 std::mem::forget(dir);
228 Store::open(path).unwrap()
229 }
230
231 #[test]
232 fn upsert_and_list_session() {
233 let store = test_store();
234 let session = Session {
235 id: "s1".into(), orchestrator_id: None, name: "worker-1".into(),
236 repo: "slievr/Athene".into(), status: SessionStatus::Working,
237 agent_type: "claude-code".into(), cost_usd: 0.0, started_at: 0,
238 pr_number: None, pr_id: None, workspace_path: None, pid: None,
239 };
240 store.upsert_session(&session).unwrap();
241 let list = store.list_sessions().unwrap();
242 assert_eq!(list.len(), 1);
243 assert_eq!(list[0].id, "s1");
244 }
245
246 #[test]
247 fn upsert_updates_status() {
248 let store = test_store();
249 let mut s = Session {
250 id: "s1".into(), orchestrator_id: None, name: "w".into(),
251 repo: "r".into(), status: SessionStatus::Working,
252 agent_type: "c".into(), cost_usd: 0.0, started_at: 0,
253 pr_number: None, pr_id: None, workspace_path: None, pid: None,
254 };
255 store.upsert_session(&s).unwrap();
256 s.status = SessionStatus::Done;
257 store.upsert_session(&s).unwrap();
258 let list = store.list_sessions().unwrap();
259 assert_eq!(list.len(), 1);
260 assert!(matches!(list[0].status, SessionStatus::Done));
261 }
262
263 #[test]
264 fn get_session_by_id() {
265 let store = test_store();
266 let s = Session {
267 id: "s1".into(), orchestrator_id: None, name: "w".into(),
268 repo: "r".into(), status: SessionStatus::Working,
269 agent_type: "c".into(), cost_usd: 0.0, started_at: 0,
270 pr_number: None, pr_id: None, workspace_path: None, pid: None,
271 };
272 store.upsert_session(&s).unwrap();
273 let found = store.get_session("s1").unwrap();
274 assert!(found.is_some());
275 assert_eq!(found.unwrap().name, "w");
276 assert!(store.get_session("missing").unwrap().is_none());
277 }
278}