reasonkit-web 0.1.7

High-performance MCP server for browser automation, web capture, and content extraction. Rust-powered CDP client for AI agents.
Documentation
//! # Database Queries
//!
//! CRUD operations for the portal entities: users, sessions, API keys, and per-user settings.

use chrono::{DateTime, Utc};
use sqlx::PgPool;
use uuid::Uuid;

use super::models::*;
use super::DbError;

/// User repository
pub struct UserRepository<'a> {
    pool: &'a PgPool,
}

impl<'a> UserRepository<'a> {
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Create a new user
    pub async fn create(&self, user: CreateUser) -> Result<User, DbError> {
        let row = sqlx::query_as::<_, User>(
            r#"
            INSERT INTO users (email, password_hash, gdpr_consent_at, gdpr_consent_version)
            VALUES ($1, $2, NOW(), $3)
            RETURNING *
            "#,
        )
        .bind(&user.email)
        .bind(&user.password_hash)
        .bind(&user.gdpr_consent_version)
        .fetch_one(self.pool)
        .await?;

        Ok(row)
    }

    /// Find user by ID
    pub async fn find_by_id(&self, id: Uuid) -> Result<User, DbError> {
        let row =
            sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1 AND deleted_at IS NULL")
                .bind(id)
                .fetch_one(self.pool)
                .await?;

        Ok(row)
    }

    /// Find user by email
    pub async fn find_by_email(&self, email: &str) -> Result<User, DbError> {
        let row = sqlx::query_as::<_, User>(
            "SELECT * FROM users WHERE email = $1 AND deleted_at IS NULL",
        )
        .bind(email)
        .fetch_one(self.pool)
        .await?;

        Ok(row)
    }

    /// Check if email exists
    pub async fn email_exists(&self, email: &str) -> Result<bool, DbError> {
        let row = sqlx::query_scalar::<_, i64>(
            "SELECT COUNT(*) FROM users WHERE email = $1 AND deleted_at IS NULL",
        )
        .bind(email)
        .fetch_one(self.pool)
        .await?;

        Ok(row > 0)
    }

    /// Update email verification timestamp
    pub async fn verify_email(&self, user_id: Uuid) -> Result<(), DbError> {
        sqlx::query("UPDATE users SET email_verified_at = NOW(), updated_at = NOW() WHERE id = $1")
            .bind(user_id)
            .execute(self.pool)
            .await?;

        Ok(())
    }

    /// Update password
    pub async fn update_password(&self, user_id: Uuid, password_hash: &str) -> Result<(), DbError> {
        sqlx::query("UPDATE users SET password_hash = $1, updated_at = NOW() WHERE id = $2")
            .bind(password_hash)
            .bind(user_id)
            .execute(self.pool)
            .await?;

        Ok(())
    }

    /// Soft delete user (GDPR)
    pub async fn soft_delete(&self, user_id: Uuid) -> Result<(), DbError> {
        sqlx::query("UPDATE users SET deleted_at = NOW(), updated_at = NOW() WHERE id = $1")
            .bind(user_id)
            .execute(self.pool)
            .await?;

        Ok(())
    }
}

/// Repository for the `sessions` table.
///
/// Holds a borrowed [`PgPool`]; each method runs a single statement on it.
/// A session counts as active while `revoked_at IS NULL` and
/// `expires_at > NOW()`.
pub struct SessionRepository<'a> {
    pool: &'a PgPool,
}

impl<'a> SessionRepository<'a> {
    /// Creates a repository that runs its queries on `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Inserts a new session and returns the stored row.
    ///
    /// All column values (user, refresh-token hash, device fingerprint, IP
    /// address, user agent, expiry) are taken from `session`.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn create(&self, session: CreateSession) -> Result<Session, DbError> {
        let row = sqlx::query_as::<_, Session>(
            r#"
            INSERT INTO sessions (user_id, refresh_token_hash, device_fingerprint, ip_address, user_agent, expires_at)
            VALUES ($1, $2, $3, $4, $5, $6)
            RETURNING *
            "#,
        )
        .bind(session.user_id)
        .bind(&session.refresh_token_hash)
        .bind(&session.device_fingerprint)
        .bind(&session.ip_address)
        .bind(&session.user_agent)
        .bind(session.expires_at)
        .fetch_one(self.pool)
        .await?;

        Ok(row)
    }

    /// Finds the active session with the given refresh-token hash.
    ///
    /// Revoked and expired sessions never match.
    ///
    /// # Errors
    ///
    /// Uses `fetch_one`, so the absence of a matching active session surfaces
    /// as an error rather than `None`; the `sqlx` error is converted into
    /// [`DbError`] by `?`.
    pub async fn find_by_token_hash(&self, token_hash: &str) -> Result<Session, DbError> {
        let row = sqlx::query_as::<_, Session>(
            r#"
            SELECT * FROM sessions
            WHERE refresh_token_hash = $1
            AND revoked_at IS NULL
            AND expires_at > NOW()
            "#,
        )
        .bind(token_hash)
        .fetch_one(self.pool)
        .await?;

        Ok(row)
    }

    /// Revokes a single session by setting `revoked_at` to `NOW()`.
    ///
    /// Sessions that are already revoked are skipped so the original
    /// revocation timestamp is preserved, mirroring `revoke_all_for_user`.
    /// The affected-row count is not inspected, so `Ok(())` is returned even
    /// when no row matched.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn revoke(&self, session_id: Uuid) -> Result<(), DbError> {
        sqlx::query("UPDATE sessions SET revoked_at = NOW() WHERE id = $1 AND revoked_at IS NULL")
            .bind(session_id)
            .execute(self.pool)
            .await?;

        Ok(())
    }

    /// Revokes every not-yet-revoked session that belongs to `user_id`.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn revoke_all_for_user(&self, user_id: Uuid) -> Result<(), DbError> {
        sqlx::query(
            "UPDATE sessions SET revoked_at = NOW() WHERE user_id = $1 AND revoked_at IS NULL",
        )
        .bind(user_id)
        .execute(self.pool)
        .await?;

        Ok(())
    }

    /// Lists the user's active (not revoked, not expired) sessions, newest
    /// first by `created_at`.
    ///
    /// Returns an empty vector when the user has no active session.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn list_active(&self, user_id: Uuid) -> Result<Vec<Session>, DbError> {
        let rows = sqlx::query_as::<_, Session>(
            r#"
            SELECT * FROM sessions
            WHERE user_id = $1
            AND revoked_at IS NULL
            AND expires_at > NOW()
            ORDER BY created_at DESC
            "#,
        )
        .bind(user_id)
        .fetch_all(self.pool)
        .await?;

        Ok(rows)
    }

    /// Hard-deletes every session whose `expires_at` lies in the past and
    /// returns the number of rows removed.
    ///
    /// Only the expiry is checked; revoked sessions that have not yet expired
    /// are kept.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn cleanup_expired(&self) -> Result<u64, DbError> {
        let result = sqlx::query("DELETE FROM sessions WHERE expires_at < NOW()")
            .execute(self.pool)
            .await?;

        Ok(result.rows_affected())
    }
}

/// Repository for the `api_keys` table.
///
/// Holds a borrowed [`PgPool`]; each method runs a single statement on it.
/// Keys are revoked by setting `revoked_at`, and a NULL `expires_at` is
/// treated as "never expires" by the lookup.
pub struct ApiKeyRepository<'a> {
    pool: &'a PgPool,
}

impl<'a> ApiKeyRepository<'a> {
    /// Creates a repository that runs its queries on `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Inserts a new API key and returns the stored row.
    ///
    /// All column values (owner, name, key prefix, key hash, scopes, rate
    /// limit, expiry) are taken from `key`.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn create(&self, key: CreateApiKey) -> Result<ApiKeyRecord, DbError> {
        let row = sqlx::query_as::<_, ApiKeyRecord>(
            r#"
            INSERT INTO api_keys (user_id, name, key_prefix, key_hash, scopes, rate_limit_rpm, expires_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            RETURNING *
            "#,
        )
        .bind(key.user_id)
        .bind(&key.name)
        .bind(&key.key_prefix)
        .bind(&key.key_hash)
        .bind(&key.scopes)
        .bind(key.rate_limit_rpm)
        .bind(key.expires_at)
        .fetch_one(self.pool)
        .await?;

        Ok(row)
    }

    /// Finds a usable API key by its hash.
    ///
    /// Only `key_hash` is matched (the key prefix is not part of the filter).
    /// Revoked keys and keys whose `expires_at` has passed never match; keys
    /// with a NULL `expires_at` do not expire.
    ///
    /// # Errors
    ///
    /// Uses `fetch_one`, so the absence of a matching usable key surfaces as
    /// an error rather than `None`; the `sqlx` error is converted into
    /// [`DbError`] by `?`.
    pub async fn find_by_hash(&self, key_hash: &str) -> Result<ApiKeyRecord, DbError> {
        let row = sqlx::query_as::<_, ApiKeyRecord>(
            r#"
            SELECT * FROM api_keys
            WHERE key_hash = $1
            AND revoked_at IS NULL
            AND (expires_at IS NULL OR expires_at > NOW())
            "#,
        )
        .bind(key_hash)
        .fetch_one(self.pool)
        .await?;

        Ok(row)
    }

    /// Lists the user's non-revoked API keys, newest first by `created_at`.
    ///
    /// Expired keys are included; only revoked ones are filtered out.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn list_for_user(&self, user_id: Uuid) -> Result<Vec<ApiKeyRecord>, DbError> {
        let rows = sqlx::query_as::<_, ApiKeyRecord>(
            r#"
            SELECT * FROM api_keys
            WHERE user_id = $1
            AND revoked_at IS NULL
            ORDER BY created_at DESC
            "#,
        )
        .bind(user_id)
        .fetch_all(self.pool)
        .await?;

        Ok(rows)
    }

    /// Revokes the API key `key_id`, but only if it belongs to `user_id`.
    ///
    /// Revoking an already revoked key succeeds and keeps the original
    /// revocation timestamp.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::NotFound`] when no key with this id is owned by
    /// `user_id`. Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn revoke(&self, key_id: Uuid, user_id: Uuid) -> Result<(), DbError> {
        // COALESCE keeps the first revocation time on repeated calls while the
        // row still counts as affected, so the NotFound check below is
        // unchanged for callers.
        let result = sqlx::query(
            r#"
            UPDATE api_keys
            SET revoked_at = COALESCE(revoked_at, NOW())
            WHERE id = $1 AND user_id = $2
            "#,
        )
        .bind(key_id)
        .bind(user_id)
        .execute(self.pool)
        .await?;

        if result.rows_affected() == 0 {
            return Err(DbError::NotFound);
        }

        Ok(())
    }

    /// Sets `last_used_at` of the key to `NOW()`.
    ///
    /// The affected-row count is not inspected, so `Ok(())` is returned even
    /// when no row matched.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn update_last_used(&self, key_id: Uuid) -> Result<(), DbError> {
        sqlx::query("UPDATE api_keys SET last_used_at = NOW() WHERE id = $1")
            .bind(key_id)
            .execute(self.pool)
            .await?;

        Ok(())
    }

    /// Counts the user's non-revoked API keys (expired keys are included).
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn count_for_user(&self, user_id: Uuid) -> Result<i64, DbError> {
        let count = sqlx::query_scalar::<_, i64>(
            "SELECT COUNT(*) FROM api_keys WHERE user_id = $1 AND revoked_at IS NULL",
        )
        .bind(user_id)
        .fetch_one(self.pool)
        .await?;

        Ok(count)
    }
}

/// Repository for the per-user key/value rows of the `settings` table.
///
/// Holds a borrowed [`PgPool`]; each method runs a single statement on it.
pub struct SettingsRepository<'a> {
    pool: &'a PgPool,
}

impl<'a> SettingsRepository<'a> {
    /// Wraps `pool`; every query of this repository runs on it.
    pub fn new(pool: &'a PgPool) -> Self {
        SettingsRepository { pool }
    }

    /// Fetches the setting stored under `key` for `user_id`.
    ///
    /// # Errors
    ///
    /// Uses `fetch_one`, so a missing setting surfaces as an error rather
    /// than `None`; the `sqlx` error is converted into [`DbError`] by `?`.
    pub async fn get(&self, user_id: Uuid, key: &str) -> Result<Setting, DbError> {
        let select_sql = "SELECT * FROM settings WHERE user_id = $1 AND key = $2";

        let setting = sqlx::query_as::<_, Setting>(select_sql)
            .bind(user_id)
            .bind(key)
            .fetch_one(self.pool)
            .await?;

        Ok(setting)
    }

    /// Stores `value` under `key` for `user_id` and returns the resulting row.
    ///
    /// When a row for `(user_id, key)` already exists, its value is replaced,
    /// its `version` is incremented by one and `updated_at` is set to `NOW()`.
    /// Otherwise a new row is inserted; its initial `version` is whatever the
    /// table defines (not visible here).
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn set(
        &self,
        user_id: Uuid,
        key: &str,
        value: serde_json::Value,
    ) -> Result<Setting, DbError> {
        let upsert_sql = r#"
            INSERT INTO settings (user_id, key, value)
            VALUES ($1, $2, $3)
            ON CONFLICT (user_id, key) DO UPDATE
            SET value = EXCLUDED.value,
                version = settings.version + 1,
                updated_at = NOW()
            RETURNING *
            "#;

        let stored = sqlx::query_as::<_, Setting>(upsert_sql)
            .bind(user_id)
            .bind(key)
            .bind(value)
            .fetch_one(self.pool)
            .await?;

        Ok(stored)
    }

    /// Returns every setting of `user_id`, ordered by key.
    ///
    /// The vector is empty when the user has no settings.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn get_all(&self, user_id: Uuid) -> Result<Vec<Setting>, DbError> {
        let select_sql = "SELECT * FROM settings WHERE user_id = $1 ORDER BY key";

        let settings = sqlx::query_as::<_, Setting>(select_sql)
            .bind(user_id)
            .fetch_all(self.pool)
            .await?;

        Ok(settings)
    }

    /// Removes the setting stored under `key` for `user_id`.
    ///
    /// The affected-row count is not inspected, so `Ok(())` is returned even
    /// when there was nothing to delete.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn delete(&self, user_id: Uuid, key: &str) -> Result<(), DbError> {
        let delete_sql = "DELETE FROM settings WHERE user_id = $1 AND key = $2";

        let _outcome = sqlx::query(delete_sql)
            .bind(user_id)
            .bind(key)
            .execute(self.pool)
            .await?;

        Ok(())
    }

    /// Returns the user's settings whose `version` is greater than
    /// `since_version`, ordered by ascending version.
    ///
    /// NOTE(review): `set` increments `version` per row, so this compares each
    /// row's own counter rather than a user-wide change sequence — confirm
    /// that this matches what callers expect.
    ///
    /// # Errors
    ///
    /// Any `sqlx` failure is converted into [`DbError`] by `?`.
    pub async fn get_changes_since(
        &self,
        user_id: Uuid,
        since_version: i32,
    ) -> Result<Vec<Setting>, DbError> {
        let select_sql =
            "SELECT * FROM settings WHERE user_id = $1 AND version > $2 ORDER BY version";

        let changed = sqlx::query_as::<_, Setting>(select_sql)
            .bind(user_id)
            .bind(since_version)
            .fetch_all(self.pool)
            .await?;

        Ok(changed)
    }
}