bctx-cloud-core 0.1.6

bctx-cloud-core — cloud client and server for Vault sync, dashboard API, billing
Documentation
use anyhow::Result;
use rusqlite::{params, Connection};
use std::path::Path;
use std::sync::{Arc, Mutex};

pub struct CloudDb {
    conn: Arc<Mutex<Connection>>,
}

impl CloudDb {
    pub fn open(path: &Path) -> Result<Self> {
        let conn = Connection::open(path)?;
        // WAL mode: readers don't block writers, writers don't block readers.
        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
        let db = Self {
            conn: Arc::new(Mutex::new(conn)),
        };
        db.migrate()?;
        Ok(db)
    }

    pub fn open_in_memory() -> Result<Self> {
        let conn = Connection::open_in_memory()?;
        let db = Self {
            conn: Arc::new(Mutex::new(conn)),
        };
        db.migrate()?;
        Ok(db)
    }

    fn migrate(&self) -> Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute_batch(SCHEMA)?;
        Ok(())
    }

    pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().unwrap()
    }
}

impl Clone for CloudDb {
    fn clone(&self) -> Self {
        Self {
            conn: self.conn.clone(),
        }
    }
}

pub struct User {
    pub id: String,
    pub email: String,
    pub tier: String,
    pub created_at: String,
}

pub struct VaultRecord {
    pub project_hash: String,
    pub user_id: String,
    pub facts_json: String,
    pub server_version: i64,
    pub updated_at: String,
}

pub struct UsageRecord {
    pub user_id: String,
    pub tokens_sent: i64,
    pub tokens_saved: i64,
    pub skill: String,
    pub recorded_at: String,
}

impl CloudDb {
    pub fn upsert_user(&self, id: &str, email: &str, tier: &str) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO users(id, email, tier, created_at) VALUES(?1,?2,?3,datetime('now'))
             ON CONFLICT(id) DO UPDATE SET tier=excluded.tier",
            params![id, email, tier],
        )?;
        Ok(())
    }

    pub fn set_user_tier(&self, id: &str, tier: &str) -> Result<()> {
        let conn = self.conn();
        conn.execute("UPDATE users SET tier=?1 WHERE id=?2", params![tier, id])?;
        Ok(())
    }

    pub fn get_user(&self, id: &str) -> Option<User> {
        let conn = self.conn();
        conn.query_row(
            "SELECT id,email,tier,created_at FROM users WHERE id=?1",
            params![id],
            |row| {
                Ok(User {
                    id: row.get(0)?,
                    email: row.get(1)?,
                    tier: row.get(2)?,
                    created_at: row.get(3)?,
                })
            },
        )
        .ok()
    }

    pub fn upsert_vault(&self, user_id: &str, project_hash: &str, facts_json: &str) -> Result<i64> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO vaults(project_hash,user_id,facts_json,server_version,updated_at)
             VALUES(?1,?2,?3,1,datetime('now'))
             ON CONFLICT(project_hash,user_id) DO UPDATE SET
               facts_json=excluded.facts_json,
               server_version=server_version+1,
               updated_at=excluded.updated_at",
            params![project_hash, user_id, facts_json],
        )?;
        let version: i64 = conn.query_row(
            "SELECT server_version FROM vaults WHERE project_hash=?1 AND user_id=?2",
            params![project_hash, user_id],
            |row| row.get(0),
        )?;
        Ok(version)
    }

    pub fn get_vault(&self, user_id: &str, project_hash: &str) -> Option<VaultRecord> {
        let conn = self.conn();
        conn.query_row(
            "SELECT project_hash,user_id,facts_json,server_version,updated_at
             FROM vaults WHERE project_hash=?1 AND user_id=?2",
            params![project_hash, user_id],
            |row| {
                Ok(VaultRecord {
                    project_hash: row.get(0)?,
                    user_id: row.get(1)?,
                    facts_json: row.get(2)?,
                    server_version: row.get(3)?,
                    updated_at: row.get(4)?,
                })
            },
        )
        .ok()
    }

    /// Returns true if the user is within rate limits (120/min, 10_000/day).
    pub fn check_signal_rate_limit(&self, user_id: &str) -> bool {
        let conn = self.conn();
        let per_min: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM usage WHERE user_id=?1 AND recorded_at >= datetime('now','-1 minute')",
                params![user_id],
                |r| r.get(0),
            )
            .unwrap_or(0);
        if per_min >= 120 {
            return false;
        }
        let per_day: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM usage WHERE user_id=?1 AND recorded_at >= datetime('now','-1 day')",
                params![user_id],
                |r| r.get(0),
            )
            .unwrap_or(0);
        per_day < 10_000
    }

    pub fn record_usage(
        &self,
        user_id: &str,
        tokens_sent: i64,
        tokens_saved: i64,
        skill: &str,
    ) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO usage(user_id,tokens_sent,tokens_saved,skill,recorded_at)
             VALUES(?1,?2,?3,?4,datetime('now'))",
            params![user_id, tokens_sent, tokens_saved, skill],
        )?;
        Ok(())
    }

    pub fn usage_summary(&self, user_id: &str, days: i64) -> (i64, i64) {
        let conn = self.conn();
        conn.query_row(
            "SELECT COALESCE(SUM(tokens_sent),0), COALESCE(SUM(tokens_saved),0)
             FROM usage WHERE user_id=?1 AND recorded_at >= datetime('now', ?2)",
            params![user_id, format!("-{days} days")],
            |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
        )
        .unwrap_or((0, 0))
    }

    pub fn store_device_code(
        &self,
        device_code: &str,
        user_code: &str,
        email: &str,
        expires_at: &str,
    ) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT OR REPLACE INTO device_codes(device_code,user_code,email,authorized,expires_at)
             VALUES(?1,?2,?3,0,?4)",
            params![device_code, user_code, email, expires_at],
        )?;
        Ok(())
    }

    pub fn authorize_device_code(&self, user_code: &str, user_id: &str) -> Result<bool> {
        let conn = self.conn();
        let n = conn.execute(
            "UPDATE device_codes SET authorized=1,user_id=?1 WHERE user_code=?2 AND datetime('now')<expires_at",
            params![user_id, user_code],
        )?;
        Ok(n > 0)
    }

    pub fn poll_device_code(&self, device_code: &str) -> Option<String> {
        let conn = self.conn();
        conn.query_row(
            "SELECT user_id FROM device_codes WHERE device_code=?1 AND authorized=1 AND datetime('now')<expires_at",
            params![device_code],
            |row| row.get(0),
        ).ok()
    }

    /// Per-day usage over the last `days` days.
    /// Returns vec of (date_str, tokens_sent, tokens_saved).
    pub fn usage_by_day(&self, user_id: &str, days: i64) -> Vec<(String, i64, i64)> {
        let conn = self.conn();
        let window = format!("-{days} days");
        let mut stmt = match conn.prepare(
            "SELECT date(recorded_at) as day,
                    COALESCE(SUM(tokens_sent),0),
                    COALESCE(SUM(tokens_saved),0)
             FROM usage
             WHERE user_id=?1 AND recorded_at >= datetime('now', ?2)
             GROUP BY date(recorded_at)
             ORDER BY day ASC",
        ) {
            Ok(s) => s,
            Err(_) => return vec![],
        };
        stmt.query_map(params![user_id, window], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, i64>(1)?,
                row.get::<_, i64>(2)?,
            ))
        })
        .ok()
        .map(|rows| rows.filter_map(|r| r.ok()).collect())
        .unwrap_or_default()
    }

    /// Per-skill token totals over the last `days` days.
    /// Returns vec of (skill_name, tokens_sent) sorted descending.
    pub fn skill_breakdown(&self, user_id: &str, days: i64) -> Vec<(String, i64)> {
        let conn = self.conn();
        let window = format!("-{days} days");
        let mut stmt = match conn.prepare(
            "SELECT skill, COALESCE(SUM(tokens_sent),0) as total
             FROM usage
             WHERE user_id=?1 AND skill!='' AND recorded_at >= datetime('now', ?2)
             GROUP BY skill
             ORDER BY total DESC
             LIMIT 10",
        ) {
            Ok(s) => s,
            Err(_) => return vec![],
        };
        stmt.query_map(params![user_id, window], |row| {
            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
        })
        .ok()
        .map(|rows| rows.filter_map(|r| r.ok()).collect())
        .unwrap_or_default()
    }
}

const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS users (
    id TEXT PRIMARY KEY,
    email TEXT NOT NULL UNIQUE,
    tier TEXT NOT NULL DEFAULT 'free',
    created_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS vaults (
    project_hash TEXT NOT NULL,
    user_id TEXT NOT NULL,
    facts_json TEXT NOT NULL,
    server_version INTEGER NOT NULL DEFAULT 1,
    updated_at TEXT NOT NULL,
    PRIMARY KEY (project_hash, user_id)
);

CREATE TABLE IF NOT EXISTS usage (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT NOT NULL,
    tokens_sent INTEGER NOT NULL DEFAULT 0,
    tokens_saved INTEGER NOT NULL DEFAULT 0,
    skill TEXT NOT NULL DEFAULT '',
    recorded_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS device_codes (
    device_code TEXT PRIMARY KEY,
    user_code TEXT NOT NULL UNIQUE,
    email TEXT NOT NULL,
    authorized INTEGER NOT NULL DEFAULT 0,
    user_id TEXT,
    expires_at TEXT NOT NULL
);
";