use anyhow::Result;
use rusqlite::{params, Connection};
use std::path::Path;
use std::sync::{Arc, Mutex};
pub struct CloudDb {
conn: Arc<Mutex<Connection>>,
}
impl CloudDb {
pub fn open(path: &Path) -> Result<Self> {
let conn = Connection::open(path)?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
let db = Self {
conn: Arc::new(Mutex::new(conn)),
};
db.migrate()?;
Ok(db)
}
pub fn open_in_memory() -> Result<Self> {
let conn = Connection::open_in_memory()?;
let db = Self {
conn: Arc::new(Mutex::new(conn)),
};
db.migrate()?;
Ok(db)
}
fn migrate(&self) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute_batch(SCHEMA)?;
Ok(())
}
pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
self.conn.lock().unwrap()
}
}
impl Clone for CloudDb {
fn clone(&self) -> Self {
Self {
conn: self.conn.clone(),
}
}
}
pub struct User {
pub id: String,
pub email: String,
pub tier: String,
pub created_at: String,
}
pub struct VaultRecord {
pub project_hash: String,
pub user_id: String,
pub facts_json: String,
pub server_version: i64,
pub updated_at: String,
}
pub struct UsageRecord {
pub user_id: String,
pub tokens_sent: i64,
pub tokens_saved: i64,
pub skill: String,
pub recorded_at: String,
}
impl CloudDb {
pub fn upsert_user(&self, id: &str, email: &str, tier: &str) -> Result<()> {
let conn = self.conn();
conn.execute(
"INSERT INTO users(id, email, tier, created_at) VALUES(?1,?2,?3,datetime('now'))
ON CONFLICT(id) DO UPDATE SET tier=excluded.tier",
params![id, email, tier],
)?;
Ok(())
}
pub fn set_user_tier(&self, id: &str, tier: &str) -> Result<()> {
let conn = self.conn();
conn.execute("UPDATE users SET tier=?1 WHERE id=?2", params![tier, id])?;
Ok(())
}
pub fn get_user(&self, id: &str) -> Option<User> {
let conn = self.conn();
conn.query_row(
"SELECT id,email,tier,created_at FROM users WHERE id=?1",
params![id],
|row| {
Ok(User {
id: row.get(0)?,
email: row.get(1)?,
tier: row.get(2)?,
created_at: row.get(3)?,
})
},
)
.ok()
}
pub fn upsert_vault(&self, user_id: &str, project_hash: &str, facts_json: &str) -> Result<i64> {
let conn = self.conn();
conn.execute(
"INSERT INTO vaults(project_hash,user_id,facts_json,server_version,updated_at)
VALUES(?1,?2,?3,1,datetime('now'))
ON CONFLICT(project_hash,user_id) DO UPDATE SET
facts_json=excluded.facts_json,
server_version=server_version+1,
updated_at=excluded.updated_at",
params![project_hash, user_id, facts_json],
)?;
let version: i64 = conn.query_row(
"SELECT server_version FROM vaults WHERE project_hash=?1 AND user_id=?2",
params![project_hash, user_id],
|row| row.get(0),
)?;
Ok(version)
}
pub fn get_vault(&self, user_id: &str, project_hash: &str) -> Option<VaultRecord> {
let conn = self.conn();
conn.query_row(
"SELECT project_hash,user_id,facts_json,server_version,updated_at
FROM vaults WHERE project_hash=?1 AND user_id=?2",
params![project_hash, user_id],
|row| {
Ok(VaultRecord {
project_hash: row.get(0)?,
user_id: row.get(1)?,
facts_json: row.get(2)?,
server_version: row.get(3)?,
updated_at: row.get(4)?,
})
},
)
.ok()
}
pub fn check_signal_rate_limit(&self, user_id: &str) -> bool {
let conn = self.conn();
let per_min: i64 = conn
.query_row(
"SELECT COUNT(*) FROM usage WHERE user_id=?1 AND recorded_at >= datetime('now','-1 minute')",
params![user_id],
|r| r.get(0),
)
.unwrap_or(0);
if per_min >= 120 {
return false;
}
let per_day: i64 = conn
.query_row(
"SELECT COUNT(*) FROM usage WHERE user_id=?1 AND recorded_at >= datetime('now','-1 day')",
params![user_id],
|r| r.get(0),
)
.unwrap_or(0);
per_day < 10_000
}
pub fn record_usage(
&self,
user_id: &str,
tokens_sent: i64,
tokens_saved: i64,
skill: &str,
) -> Result<()> {
let conn = self.conn();
conn.execute(
"INSERT INTO usage(user_id,tokens_sent,tokens_saved,skill,recorded_at)
VALUES(?1,?2,?3,?4,datetime('now'))",
params![user_id, tokens_sent, tokens_saved, skill],
)?;
Ok(())
}
pub fn usage_summary(&self, user_id: &str, days: i64) -> (i64, i64) {
let conn = self.conn();
conn.query_row(
"SELECT COALESCE(SUM(tokens_sent),0), COALESCE(SUM(tokens_saved),0)
FROM usage WHERE user_id=?1 AND recorded_at >= datetime('now', ?2)",
params![user_id, format!("-{days} days")],
|row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
)
.unwrap_or((0, 0))
}
pub fn store_device_code(
&self,
device_code: &str,
user_code: &str,
email: &str,
expires_at: &str,
) -> Result<()> {
let conn = self.conn();
conn.execute(
"INSERT OR REPLACE INTO device_codes(device_code,user_code,email,authorized,expires_at)
VALUES(?1,?2,?3,0,?4)",
params![device_code, user_code, email, expires_at],
)?;
Ok(())
}
pub fn authorize_device_code(&self, user_code: &str, user_id: &str) -> Result<bool> {
let conn = self.conn();
let n = conn.execute(
"UPDATE device_codes SET authorized=1,user_id=?1 WHERE user_code=?2 AND datetime('now')<expires_at",
params![user_id, user_code],
)?;
Ok(n > 0)
}
pub fn poll_device_code(&self, device_code: &str) -> Option<String> {
let conn = self.conn();
conn.query_row(
"SELECT user_id FROM device_codes WHERE device_code=?1 AND authorized=1 AND datetime('now')<expires_at",
params![device_code],
|row| row.get(0),
).ok()
}
pub fn usage_by_day(&self, user_id: &str, days: i64) -> Vec<(String, i64, i64)> {
let conn = self.conn();
let window = format!("-{days} days");
let mut stmt = match conn.prepare(
"SELECT date(recorded_at) as day,
COALESCE(SUM(tokens_sent),0),
COALESCE(SUM(tokens_saved),0)
FROM usage
WHERE user_id=?1 AND recorded_at >= datetime('now', ?2)
GROUP BY date(recorded_at)
ORDER BY day ASC",
) {
Ok(s) => s,
Err(_) => return vec![],
};
stmt.query_map(params![user_id, window], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, i64>(2)?,
))
})
.ok()
.map(|rows| rows.filter_map(|r| r.ok()).collect())
.unwrap_or_default()
}
pub fn skill_breakdown(&self, user_id: &str, days: i64) -> Vec<(String, i64)> {
let conn = self.conn();
let window = format!("-{days} days");
let mut stmt = match conn.prepare(
"SELECT skill, COALESCE(SUM(tokens_sent),0) as total
FROM usage
WHERE user_id=?1 AND skill!='' AND recorded_at >= datetime('now', ?2)
GROUP BY skill
ORDER BY total DESC
LIMIT 10",
) {
Ok(s) => s,
Err(_) => return vec![],
};
stmt.query_map(params![user_id, window], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
})
.ok()
.map(|rows| rows.filter_map(|r| r.ok()).collect())
.unwrap_or_default()
}
}
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
email TEXT NOT NULL UNIQUE,
tier TEXT NOT NULL DEFAULT 'free',
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS vaults (
project_hash TEXT NOT NULL,
user_id TEXT NOT NULL,
facts_json TEXT NOT NULL,
server_version INTEGER NOT NULL DEFAULT 1,
updated_at TEXT NOT NULL,
PRIMARY KEY (project_hash, user_id)
);
CREATE TABLE IF NOT EXISTS usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
tokens_sent INTEGER NOT NULL DEFAULT 0,
tokens_saved INTEGER NOT NULL DEFAULT 0,
skill TEXT NOT NULL DEFAULT '',
recorded_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS device_codes (
device_code TEXT PRIMARY KEY,
user_code TEXT NOT NULL UNIQUE,
email TEXT NOT NULL,
authorized INTEGER NOT NULL DEFAULT 0,
user_id TEXT,
expires_at TEXT NOT NULL
);
";