use anyhow::{Context, Result};
use rusqlite::{Connection, OptionalExtension, params};
use crate::paths;
pub struct Db {
pub conn: Connection,
}
#[derive(Debug, Clone, Default)]
pub struct Cursor {
pub byte_offset: u64,
pub last_uuid: Option<String>,
}
const SCHEMA: &str = r"
CREATE TABLE IF NOT EXISTS usage_events (
message_id TEXT PRIMARY KEY,
source TEXT NOT NULL DEFAULT 'claude_code',
uuid TEXT NOT NULL,
session_id TEXT NOT NULL,
project_path TEXT NOT NULL,
cwd TEXT,
transcript_path TEXT NOT NULL,
timestamp TEXT NOT NULL,
model TEXT NOT NULL,
is_sidechain INTEGER NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_5m INTEGER NOT NULL DEFAULT 0,
cache_creation_1h INTEGER NOT NULL DEFAULT 0,
cache_read_tokens INTEGER NOT NULL DEFAULT 0,
reasoning_tokens INTEGER NOT NULL DEFAULT 0,
cost_usd REAL NOT NULL,
pricing_version TEXT NOT NULL
) STRICT;
CREATE INDEX IF NOT EXISTS idx_ts ON usage_events(timestamp);
CREATE INDEX IF NOT EXISTS idx_model ON usage_events(model);
CREATE INDEX IF NOT EXISTS idx_session ON usage_events(session_id);
CREATE INDEX IF NOT EXISTS idx_project ON usage_events(project_path);
CREATE INDEX IF NOT EXISTS idx_source ON usage_events(source);
CREATE TABLE IF NOT EXISTS file_cursor (
transcript_path TEXT PRIMARY KEY,
byte_offset INTEGER NOT NULL,
last_uuid TEXT,
updated_at TEXT NOT NULL
) STRICT;
CREATE TABLE IF NOT EXISTS schema_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
) STRICT;
";
const SCHEMA_VERSION: &str = "2";
fn migrate(conn: &Connection) -> Result<()> {
let cols = column_names(conn, "usage_events")?;
if !cols.contains(&"source".to_string()) {
conn.execute(
"ALTER TABLE usage_events ADD COLUMN source TEXT NOT NULL DEFAULT 'claude_code'",
[],
)?;
}
if !cols.contains(&"reasoning_tokens".to_string()) {
conn.execute(
"ALTER TABLE usage_events ADD COLUMN reasoning_tokens INTEGER NOT NULL DEFAULT 0",
[],
)?;
}
Ok(())
}
fn column_names(conn: &Connection, table: &str) -> Result<Vec<String>> {
let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
let names = stmt
.query_map([], |r| r.get::<_, String>(1))?
.filter_map(std::result::Result::ok)
.collect();
Ok(names)
}
impl Db {
pub fn open() -> Result<Self> {
paths::ensure_data_dir()?;
let path = paths::db_path()?;
let conn =
Connection::open(&path).with_context(|| format!("opening {}", path.display()))?;
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
conn.pragma_update(None, "busy_timeout", 5000)?;
conn.pragma_update(None, "foreign_keys", "ON")?;
conn.execute_batch(SCHEMA).context("applying schema")?;
migrate(&conn).context("running migrations")?;
conn.execute(
"INSERT INTO schema_meta(key, value) VALUES('version', ?1)
ON CONFLICT(key) DO UPDATE SET value = excluded.value",
params![SCHEMA_VERSION],
)?;
Ok(Self { conn })
}
pub fn cursor_for(&self, transcript_path: &str) -> Result<Cursor> {
let row = self
.conn
.query_row(
"SELECT byte_offset, last_uuid FROM file_cursor WHERE transcript_path = ?1",
params![transcript_path],
|r| Ok((r.get::<_, i64>(0)?, r.get::<_, Option<String>>(1)?)),
)
.optional()?;
Ok(match row {
Some((off, uuid)) => Cursor {
byte_offset: off as u64,
last_uuid: uuid,
},
None => Cursor::default(),
})
}
pub fn set_cursor(
&self,
transcript_path: &str,
byte_offset: u64,
last_uuid: Option<&str>,
) -> Result<()> {
self.conn.execute(
"INSERT INTO file_cursor(transcript_path, byte_offset, last_uuid, updated_at)
VALUES (?1, ?2, ?3, datetime('now'))
ON CONFLICT(transcript_path) DO UPDATE SET
byte_offset = excluded.byte_offset,
last_uuid = excluded.last_uuid,
updated_at = excluded.updated_at",
params![transcript_path, byte_offset as i64, last_uuid],
)?;
Ok(())
}
pub fn vacuum(&self) -> Result<()> {
let mut stmt = self
.conn
.prepare("SELECT transcript_path FROM file_cursor")?;
let paths: Vec<String> = stmt
.query_map([], |r| r.get::<_, String>(0))?
.filter_map(std::result::Result::ok)
.collect();
drop(stmt);
for p in paths {
if !std::path::Path::new(&p).exists() {
self.conn.execute(
"DELETE FROM file_cursor WHERE transcript_path = ?1",
params![p],
)?;
}
}
self.conn.execute_batch("VACUUM;")?;
Ok(())
}
}