use anyhow::Result;
use chrono::{DateTime, Local};
use rusqlite::Row;
use super::Store;
use crate::types::*;
const CREATE_TABLES_SQL: &str = "CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
agent TEXT NOT NULL,
prompt TEXT NOT NULL,
resolved_prompt TEXT,
status TEXT NOT NULL DEFAULT 'pending',
parent_task_id TEXT,
workgroup_id TEXT,
caller_kind TEXT,
caller_session_id TEXT,
repo_path TEXT,
worktree_path TEXT,
worktree_branch TEXT,
start_sha TEXT,
log_path TEXT,
output_path TEXT,
tokens INTEGER,
duration_ms INTEGER,
model TEXT,
cost_usd REAL,
created_at TEXT NOT NULL,
completed_at TEXT,
completion_summary TEXT,
peer_review TEXT,
category TEXT,
pending_reason TEXT
);
CREATE TABLE IF NOT EXISTS workgroups (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
shared_context TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL REFERENCES tasks(id),
timestamp TEXT NOT NULL,
event_type TEXT NOT NULL,
detail TEXT NOT NULL,
metadata TEXT
);
CREATE TABLE IF NOT EXISTS findings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workgroup_id TEXT NOT NULL,
content TEXT NOT NULL,
source_task_id TEXT,
severity TEXT,
title TEXT,
file TEXT,
lines TEXT,
category TEXT,
confidence TEXT,
verdict TEXT,
score TEXT,
note TEXT,
created_at TEXT NOT NULL,
updated_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_findings_workgroup ON findings(workgroup_id);
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
memory_type TEXT NOT NULL,
content TEXT NOT NULL,
source_task_id TEXT,
agent TEXT,
project_path TEXT,
content_hash TEXT NOT NULL,
created_at TEXT NOT NULL,
expires_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_memories_project ON memories(project_path);
CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type);
CREATE INDEX IF NOT EXISTS idx_memories_hash ON memories(content_hash);
";
const CREATE_WORKGROUPS_SQL: &str = "CREATE TABLE IF NOT EXISTS workgroups (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
shared_context TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);";
const CREATE_MEMORIES_SQL: &str = "CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
memory_type TEXT NOT NULL,
content TEXT NOT NULL,
source_task_id TEXT,
agent TEXT,
project_path TEXT,
content_hash TEXT NOT NULL,
created_at TEXT NOT NULL,
expires_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_memories_project ON memories(project_path);
CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type);
CREATE INDEX IF NOT EXISTS idx_memories_hash ON memories(content_hash);";
const CREATE_FINDINGS_SQL: &str = "CREATE TABLE IF NOT EXISTS findings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workgroup_id TEXT NOT NULL,
content TEXT NOT NULL,
source_task_id TEXT,
severity TEXT,
title TEXT,
file TEXT,
lines TEXT,
category TEXT,
confidence TEXT,
verdict TEXT,
score TEXT,
note TEXT,
created_at TEXT NOT NULL,
updated_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_findings_workgroup ON findings(workgroup_id);";
pub(super) fn create_tables(store: &Store) -> Result<()> {
store.db().execute_batch(CREATE_TABLES_SQL)?;
Ok(())
}
pub(super) fn migrate(store: &Store) -> Result<()> {
let conn = store.db();
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN model TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN cost_usd REAL;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN exit_code INTEGER;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN parent_task_id TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN workgroup_id TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN caller_kind TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN caller_session_id TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN agent_session_id TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN repo_path TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN resolved_prompt TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN start_sha TEXT;");
let _ = conn.execute_batch(CREATE_WORKGROUPS_SQL);
let _ = conn.execute_batch(CREATE_MEMORIES_SQL);
let _ = conn.execute_batch("ALTER TABLE memories ADD COLUMN supersedes TEXT;");
let _ = conn.execute_batch("ALTER TABLE memories ADD COLUMN version INTEGER NOT NULL DEFAULT 1;");
let _ = conn.execute_batch("ALTER TABLE memories ADD COLUMN inject_count INTEGER NOT NULL DEFAULT 0;");
let _ = conn.execute_batch("ALTER TABLE memories ADD COLUMN last_injected_at TEXT;");
let _ = conn.execute_batch("ALTER TABLE memories ADD COLUMN success_count INTEGER NOT NULL DEFAULT 0;");
let _ = conn.execute_batch("ALTER TABLE events ADD COLUMN metadata TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN prompt_tokens INTEGER;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN verify TEXT;");
let _ =
conn.execute_batch("ALTER TABLE tasks ADD COLUMN read_only INTEGER NOT NULL DEFAULT 0;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN budget INTEGER NOT NULL DEFAULT 0;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN custom_agent_name TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN completion_summary TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN peer_review TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN category TEXT;");
let _ = conn.execute_batch("ALTER TABLE tasks ADD COLUMN pending_reason TEXT;");
let _ = conn.execute_batch(
"ALTER TABLE tasks ADD COLUMN verify_status TEXT NOT NULL DEFAULT 'skipped';",
);
let _ = conn.execute_batch("ALTER TABLE workgroups ADD COLUMN created_by TEXT;");
let _ = conn.execute_batch(CREATE_FINDINGS_SQL);
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN severity TEXT;");
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN title TEXT;");
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN file TEXT;");
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN lines TEXT;");
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN category TEXT;");
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN confidence TEXT;");
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN verdict TEXT;");
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN score TEXT;");
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN note TEXT;");
let _ = conn.execute_batch("ALTER TABLE findings ADD COLUMN updated_at TEXT;");
let _ = conn.execute_batch("CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON tasks(created_at);");
let _ = conn.execute_batch("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);");
let _ = conn.execute_batch("CREATE INDEX IF NOT EXISTS idx_tasks_workgroup ON tasks(workgroup_id);");
let _ = conn.execute_batch("CREATE INDEX IF NOT EXISTS idx_events_task_id ON events(task_id);");
let _ = conn.execute_batch("CREATE INDEX IF NOT EXISTS idx_events_task_kind ON events(task_id, event_type);");
Ok(())
}
pub(super) fn row_to_task(row: &Row) -> rusqlite::Result<Result<Task>> {
Ok(Ok(Task {
id: TaskId(row.get::<_, String>(0)?),
agent: AgentKind::parse_str(&row.get::<_, String>(1)?).unwrap_or(AgentKind::Custom),
custom_agent_name: row.get(25).ok().flatten(),
prompt: row.get(2)?,
resolved_prompt: row.get(3)?,
category: row.get(28).ok().flatten(),
status: TaskStatus::parse_str(&row.get::<_, String>(4)?).unwrap_or(TaskStatus::Pending),
parent_task_id: row.get(5)?,
workgroup_id: row.get(6)?,
caller_kind: row.get(7)?,
caller_session_id: row.get(8)?,
agent_session_id: row.get(9)?,
repo_path: row.get(10)?,
worktree_path: row.get(11)?,
worktree_branch: row.get(12)?,
start_sha: row.get(13)?,
log_path: row.get(14)?,
output_path: row.get(15)?,
tokens: row.get(16)?,
prompt_tokens: row.get(17)?,
duration_ms: row.get(18)?,
model: row.get(19)?,
cost_usd: row.get(20)?,
exit_code: row.get(28).ok().flatten(),
created_at: parse_dt(&row.get::<_, String>(21)?),
completed_at: row.get::<_, Option<String>>(22)?.map(|s| parse_dt(&s)),
verify: row.get(23)?,
verify_status: row
.get::<_, Option<String>>(27)?
.and_then(|s| VerifyStatus::parse_str(&s))
.unwrap_or(VerifyStatus::Skipped),
pending_reason: row.get(30).ok().flatten(),
read_only: row.get(24)?,
budget: row.get(25)?,
}))
}
pub(super) fn row_to_event(row: &Row) -> rusqlite::Result<TaskEvent> {
let metadata_str: Option<String> = row.get(4)?;
let metadata = metadata_str.and_then(|s| serde_json::from_str(&s).ok());
Ok(TaskEvent {
task_id: TaskId(row.get::<_, String>(0)?),
timestamp: parse_dt(&row.get::<_, String>(1)?),
event_kind: EventKind::parse_str(&row.get::<_, String>(2)?).unwrap_or(EventKind::Reasoning),
detail: row.get(3)?,
metadata,
})
}
pub(super) fn row_to_memory(row: &Row) -> rusqlite::Result<Result<Memory>> {
Ok(Ok(Memory {
id: MemoryId(row.get::<_, String>(0)?),
memory_type: MemoryType::parse_str(&row.get::<_, String>(1)?).unwrap_or(MemoryType::Fact),
content: row.get(2)?,
source_task_id: row.get(3)?,
agent: row.get(4)?,
project_path: row.get(5)?,
content_hash: row.get(6)?,
created_at: parse_dt(&row.get::<_, String>(7)?),
expires_at: row.get::<_, Option<String>>(8)?.map(|s| parse_dt(&s)),
supersedes: row
.get::<_, Option<String>>(9)?
.map(MemoryId),
version: row.get::<_, i64>(10)?,
inject_count: row.get::<_, i64>(11)?,
last_injected_at: row
.get::<_, Option<String>>(12)?
.map(|s| parse_dt(&s)),
success_count: row.get::<_, i64>(13)?,
}))
}
pub(super) fn parse_dt(s: &str) -> DateTime<Local> {
DateTime::parse_from_rfc3339(s)
.map(|dt| dt.with_timezone(&Local))
.unwrap_or_else(|_| Local::now())
}