use crate::types::*;
use anyhow::Result;
use rusqlite::{params, Connection};
use std::{path::Path, sync::Mutex};
/// SQLite-backed persistence handle for sessions, orchestrators, PRs, CI
/// status and review comments.
///
/// Holds a single [`Connection`] behind a [`Mutex`], so every method that
/// touches the database takes the lock for the duration of its call.
pub struct Store {
/// The one database connection; locked by each method before use.
conn: Mutex<Connection>,
}
impl Store {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let conn = Connection::open(path)?;
conn.execute_batch("
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY, orchestrator_id TEXT,
name TEXT NOT NULL, repo TEXT NOT NULL,
status TEXT NOT NULL, agent_type TEXT NOT NULL,
cost_usd REAL NOT NULL DEFAULT 0, started_at INTEGER NOT NULL,
pr_number INTEGER, pr_id INTEGER,
workspace_path TEXT, pid INTEGER
);
CREATE TABLE IF NOT EXISTS orchestrators (
id TEXT PRIMARY KEY, name TEXT NOT NULL, created_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS prs (
id INTEGER PRIMARY KEY, number INTEGER NOT NULL,
title TEXT NOT NULL, url TEXT NOT NULL,
body TEXT NOT NULL, session_id TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS ci_status (
pr_id INTEGER PRIMARY KEY, total INTEGER NOT NULL,
passing INTEGER NOT NULL, failing INTEGER NOT NULL,
pending INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS review_comments (
id INTEGER PRIMARY KEY, pr_id INTEGER NOT NULL,
author TEXT NOT NULL, body TEXT NOT NULL,
path TEXT, line INTEGER, created_at INTEGER NOT NULL
);
")?;
Ok(Self { conn: Mutex::new(conn) })
}
pub fn upsert_session(&self, s: &Session) -> Result<()> {
let status = serde_json::to_string(&s.status)?.replace('"', "");
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO sessions (id,orchestrator_id,name,repo,status,agent_type,
cost_usd,started_at,pr_number,pr_id,workspace_path,pid)
VALUES(?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12)
ON CONFLICT(id) DO UPDATE SET
status=excluded.status,cost_usd=excluded.cost_usd,
pr_number=excluded.pr_number,pr_id=excluded.pr_id,
workspace_path=excluded.workspace_path,pid=excluded.pid",
params![
s.id, s.orchestrator_id, s.name, s.repo, status, s.agent_type,
s.cost_usd, s.started_at, s.pr_number, s.pr_id,
s.workspace_path, s.pid
],
)?;
Ok(())
}
pub fn list_sessions(&self) -> Result<Vec<Session>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id,orchestrator_id,name,repo,status,agent_type,cost_usd,
started_at,pr_number,pr_id,workspace_path,pid
FROM sessions ORDER BY started_at DESC",
)?;
let rows = stmt.query_map([], |r| {
Ok((
r.get::<_, String>(0)?,
r.get::<_, Option<String>>(1)?,
r.get::<_, String>(2)?,
r.get::<_, String>(3)?,
r.get::<_, String>(4)?,
r.get::<_, String>(5)?,
r.get::<_, f64>(6)?,
r.get::<_, i64>(7)?,
r.get::<_, Option<u64>>(8)?,
r.get::<_, Option<i64>>(9)?,
r.get::<_, Option<String>>(10)?,
r.get::<_, Option<u32>>(11)?,
))
})?;
rows.map(|r| {
let (id, orchestrator_id, name, repo, status_str, agent_type,
cost_usd, started_at, pr_number, pr_id, workspace_path, pid) = r?;
let status = serde_json::from_str(&format!("\"{status_str}\""))
.unwrap_or(SessionStatus::Working);
Ok(Session {
id, orchestrator_id, name, repo, status, agent_type,
cost_usd, started_at, pr_number, pr_id, workspace_path, pid,
})
})
.collect()
}
pub fn get_session(&self, id: &str) -> Result<Option<Session>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id,orchestrator_id,name,repo,status,agent_type,cost_usd,
started_at,pr_number,pr_id,workspace_path,pid
FROM sessions WHERE id = ?1",
)?;
let mut rows = stmt.query_map([id], |r| {
Ok((
r.get::<_, String>(0)?,
r.get::<_, Option<String>>(1)?,
r.get::<_, String>(2)?,
r.get::<_, String>(3)?,
r.get::<_, String>(4)?,
r.get::<_, String>(5)?,
r.get::<_, f64>(6)?,
r.get::<_, i64>(7)?,
r.get::<_, Option<u64>>(8)?,
r.get::<_, Option<i64>>(9)?,
r.get::<_, Option<String>>(10)?,
r.get::<_, Option<u32>>(11)?,
))
})?;
match rows.next() {
None => Ok(None),
Some(r) => {
let (id, orchestrator_id, name, repo, status_str, agent_type,
cost_usd, started_at, pr_number, pr_id, workspace_path, pid) = r?;
let status = serde_json::from_str(&format!("\"{status_str}\""))
.unwrap_or(SessionStatus::Working);
Ok(Some(Session {
id, orchestrator_id, name, repo, status, agent_type,
cost_usd, started_at, pr_number, pr_id, workspace_path, pid,
}))
}
}
}
pub fn upsert_orchestrator(&self, o: &Orchestrator) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO orchestrators(id,name,created_at) VALUES(?1,?2,?3)
ON CONFLICT(id) DO UPDATE SET name=excluded.name",
params![o.id, o.name, o.created_at],
)?;
Ok(())
}
pub fn sessions_by_orchestrator(&self, orchestrator_id: &str) -> Result<Vec<Session>> {
let sessions = self.list_sessions()?;
Ok(sessions.into_iter().filter(|s| s.orchestrator_id.as_deref() == Some(orchestrator_id)).collect())
}
pub fn delete_session(&self, id: &str) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
Ok(())
}
pub fn delete_orchestrator(&self, id: &str) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute("DELETE FROM sessions WHERE orchestrator_id = ?1", [id])?;
conn.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
conn.execute("DELETE FROM orchestrators WHERE id = ?1", [id])?;
Ok(())
}
pub fn upsert_pr(&self, pr: &PR) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT OR REPLACE INTO prs
(id, number, title, url, body, session_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![pr.id, pr.number, pr.title, pr.url, pr.body, pr.session_id],
)?;
Ok(())
}
pub fn upsert_ci_status(&self, ci: &CIStatus) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT OR REPLACE INTO ci_status
(pr_id, total, passing, failing, pending)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![ci.pr_id, ci.total, ci.passing, ci.failing, ci.pending],
)?;
Ok(())
}
pub fn upsert_comment(&self, c: &Comment) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT OR REPLACE INTO review_comments
(id, pr_id, author, body, path, line, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
params![c.id, c.pr_id, c.author, c.body, c.path, c.line, c.created_at],
)?;
Ok(())
}
pub fn list_orchestrators(&self) -> Result<Vec<Orchestrator>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id,name,created_at FROM orchestrators ORDER BY created_at DESC",
)?;
let rows = stmt.query_map([], |r| {
Ok(Orchestrator {
id: r.get(0)?,
name: r.get(1)?,
created_at: r.get(2)?,
})
})?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Opens a store backed by a database file in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store so the directory is
    /// removed when the test ends instead of being leaked on disk. Bind it
    /// first (`let (_dir, store) = …`) so the store, declared later, is
    /// dropped before the directory is deleted.
    fn test_store() -> (TempDir, Store) {
        let dir = tempdir().unwrap();
        let store = Store::open(dir.path().join("t.db")).unwrap();
        (dir, store)
    }

    /// Builds a minimal working session with the given id and no orchestrator.
    fn session(id: &str) -> Session {
        Session {
            id: id.into(),
            orchestrator_id: None,
            name: "worker-1".into(),
            repo: "slievr/Athene".into(),
            status: SessionStatus::Working,
            agent_type: "claude-code".into(),
            cost_usd: 0.0,
            started_at: 0,
            pr_number: None,
            pr_id: None,
            workspace_path: None,
            pid: None,
        }
    }

    #[test]
    fn upsert_and_list_session() {
        let (_dir, store) = test_store();
        store.upsert_session(&session("s1")).unwrap();

        let list = store.list_sessions().unwrap();
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].id, "s1");
    }

    #[test]
    fn upsert_updates_status() {
        let (_dir, store) = test_store();
        let mut s = session("s1");
        store.upsert_session(&s).unwrap();

        s.status = SessionStatus::Done;
        store.upsert_session(&s).unwrap();

        let list = store.list_sessions().unwrap();
        assert_eq!(list.len(), 1);
        assert!(matches!(list[0].status, SessionStatus::Done));
    }

    #[test]
    fn get_session_by_id() {
        let (_dir, store) = test_store();
        store.upsert_session(&session("s1")).unwrap();

        let found = store.get_session("s1").unwrap();
        assert_eq!(found.map(|s| s.name).as_deref(), Some("worker-1"));
        assert!(store.get_session("missing").unwrap().is_none());
    }

    #[test]
    fn sessions_by_orchestrator_returns_only_matching_sessions() {
        let (_dir, store) = test_store();
        let mut owned = session("s1");
        owned.orchestrator_id = Some("o1".into());
        let mut other = session("s2");
        other.orchestrator_id = Some("o2".into());
        store.upsert_session(&owned).unwrap();
        store.upsert_session(&other).unwrap();
        store.upsert_session(&session("s3")).unwrap();

        let found = store.sessions_by_orchestrator("o1").unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].id, "s1");
        assert!(store.sessions_by_orchestrator("nope").unwrap().is_empty());
    }
}