use chrono::{DateTime, Utc};
use sqlx::PgPool;
use uuid::Uuid;
use super::models::*;
use super::DbError;
/// Data-access handle for the `users` table.
///
/// Holds a borrowed Postgres connection pool for the lifetime `'a`; the pool itself is
/// owned elsewhere.
pub struct UserRepository<'a> {
/// Connection pool that every query issued by this repository runs against.
pool: &'a PgPool,
}
impl<'a> UserRepository<'a> {
    /// Creates a repository that runs its queries on `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Inserts a new user and returns the stored row.
    ///
    /// The email, password hash and consent version are taken from `user`;
    /// `gdpr_consent_at` is set by the database to `NOW()`.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the insert is converted into [`DbError`] by `?`.
    pub async fn create(&self, user: CreateUser) -> Result<User, DbError> {
        let row = sqlx::query_as::<_, User>(
            r#"
INSERT INTO users (email, password_hash, gdpr_consent_at, gdpr_consent_version)
VALUES ($1, $2, NOW(), $3)
RETURNING *
"#,
        )
        .bind(&user.email)
        .bind(&user.password_hash)
        .bind(&user.gdpr_consent_version)
        .fetch_one(self.pool)
        .await?;
        Ok(row)
    }

    /// Loads the user with the given `id`, ignoring soft-deleted rows.
    ///
    /// # Errors
    ///
    /// `fetch_one` fails when no row matches, so a missing or soft-deleted user is
    /// reported as an error (converted into [`DbError`]) rather than as an `Option`.
    pub async fn find_by_id(&self, id: Uuid) -> Result<User, DbError> {
        let row =
            sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1 AND deleted_at IS NULL")
                .bind(id)
                .fetch_one(self.pool)
                .await?;
        Ok(row)
    }

    /// Loads the user with the given `email`, ignoring soft-deleted rows.
    ///
    /// # Errors
    ///
    /// Fails (converted into [`DbError`]) when no matching row exists or the query fails.
    pub async fn find_by_email(&self, email: &str) -> Result<User, DbError> {
        let row = sqlx::query_as::<_, User>(
            "SELECT * FROM users WHERE email = $1 AND deleted_at IS NULL",
        )
        .bind(email)
        .fetch_one(self.pool)
        .await?;
        Ok(row)
    }

    /// Returns `true` when a user with `email` exists and is not soft-deleted.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the query is converted into [`DbError`].
    pub async fn email_exists(&self, email: &str) -> Result<bool, DbError> {
        // EXISTS yields a boolean directly and lets Postgres stop at the first match,
        // so no row count has to be computed and compared.
        let exists = sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS(SELECT 1 FROM users WHERE email = $1 AND deleted_at IS NULL)",
        )
        .bind(email)
        .fetch_one(self.pool)
        .await?;
        Ok(exists)
    }

    /// Stamps `email_verified_at` (and `updated_at`) with `NOW()` for an active user.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::NotFound`] when no active (not soft-deleted) user has
    /// `user_id`, so callers are not told the verification succeeded when nothing was
    /// written. Other `sqlx` errors are converted into [`DbError`].
    pub async fn verify_email(&self, user_id: Uuid) -> Result<(), DbError> {
        let result = sqlx::query(
            "UPDATE users SET email_verified_at = NOW(), updated_at = NOW() \
             WHERE id = $1 AND deleted_at IS NULL",
        )
        .bind(user_id)
        .execute(self.pool)
        .await?;
        if result.rows_affected() == 0 {
            return Err(DbError::NotFound);
        }
        Ok(())
    }

    /// Replaces the stored password hash of an active user and bumps `updated_at`.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::NotFound`] when no active (not soft-deleted) user has
    /// `user_id`; a password change that touched no row must not look successful.
    /// Other `sqlx` errors are converted into [`DbError`].
    pub async fn update_password(&self, user_id: Uuid, password_hash: &str) -> Result<(), DbError> {
        let result = sqlx::query(
            "UPDATE users SET password_hash = $1, updated_at = NOW() \
             WHERE id = $2 AND deleted_at IS NULL",
        )
        .bind(password_hash)
        .bind(user_id)
        .execute(self.pool)
        .await?;
        if result.rows_affected() == 0 {
            return Err(DbError::NotFound);
        }
        Ok(())
    }

    /// Soft-deletes a user by setting `deleted_at` (and `updated_at`) to `NOW()`.
    ///
    /// The call is idempotent: it returns `Ok(())` whether or not a row was changed.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the update is converted into [`DbError`].
    pub async fn soft_delete(&self, user_id: Uuid) -> Result<(), DbError> {
        // Only rows that are still active are touched, so repeating the call keeps the
        // timestamp of the first deletion instead of overwriting it.
        sqlx::query(
            "UPDATE users SET deleted_at = NOW(), updated_at = NOW() \
             WHERE id = $1 AND deleted_at IS NULL",
        )
        .bind(user_id)
        .execute(self.pool)
        .await?;
        Ok(())
    }
}
/// Data-access handle for the `sessions` table.
///
/// Holds a borrowed Postgres connection pool for the lifetime `'a`; the pool itself is
/// owned elsewhere.
pub struct SessionRepository<'a> {
/// Connection pool that every query issued by this repository runs against.
pool: &'a PgPool,
}
impl<'a> SessionRepository<'a> {
    /// Creates a repository that runs its queries on `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Inserts a new session row built from `session` and returns it as stored.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the insert is converted into [`DbError`] by `?`.
    pub async fn create(&self, session: CreateSession) -> Result<Session, DbError> {
        let row = sqlx::query_as::<_, Session>(
            r#"
INSERT INTO sessions (user_id, refresh_token_hash, device_fingerprint, ip_address, user_agent, expires_at)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *
"#,
        )
        .bind(session.user_id)
        .bind(&session.refresh_token_hash)
        .bind(&session.device_fingerprint)
        .bind(&session.ip_address)
        .bind(&session.user_agent)
        .bind(session.expires_at)
        .fetch_one(self.pool)
        .await?;
        Ok(row)
    }

    /// Looks up the active session whose refresh-token hash equals `token_hash`.
    ///
    /// Only sessions that are neither revoked nor expired are considered.
    ///
    /// # Errors
    ///
    /// `fetch_one` fails when no row matches, so an unknown, revoked or expired token
    /// is reported as an error (converted into [`DbError`]).
    pub async fn find_by_token_hash(&self, token_hash: &str) -> Result<Session, DbError> {
        let row = sqlx::query_as::<_, Session>(
            r#"
SELECT * FROM sessions
WHERE refresh_token_hash = $1
AND revoked_at IS NULL
AND expires_at > NOW()
"#,
        )
        .bind(token_hash)
        .fetch_one(self.pool)
        .await?;
        Ok(row)
    }

    /// Marks one session as revoked by setting `revoked_at` to `NOW()`.
    ///
    /// The call is idempotent: it returns `Ok(())` whether or not a row was changed.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the update is converted into [`DbError`].
    pub async fn revoke(&self, session_id: Uuid) -> Result<(), DbError> {
        // Same guard as `revoke_all_for_user`: an already revoked session keeps the
        // timestamp of its first revocation.
        sqlx::query(
            "UPDATE sessions SET revoked_at = NOW() WHERE id = $1 AND revoked_at IS NULL",
        )
        .bind(session_id)
        .execute(self.pool)
        .await?;
        Ok(())
    }

    /// Revokes every not-yet-revoked session that belongs to `user_id`.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the update is converted into [`DbError`].
    pub async fn revoke_all_for_user(&self, user_id: Uuid) -> Result<(), DbError> {
        sqlx::query(
            "UPDATE sessions SET revoked_at = NOW() WHERE user_id = $1 AND revoked_at IS NULL",
        )
        .bind(user_id)
        .execute(self.pool)
        .await?;
        Ok(())
    }

    /// Lists the user's sessions that are neither revoked nor expired, newest first.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the query is converted into [`DbError`].
    pub async fn list_active(&self, user_id: Uuid) -> Result<Vec<Session>, DbError> {
        let rows = sqlx::query_as::<_, Session>(
            r#"
SELECT * FROM sessions
WHERE user_id = $1
AND revoked_at IS NULL
AND expires_at > NOW()
ORDER BY created_at DESC
"#,
        )
        .bind(user_id)
        .fetch_all(self.pool)
        .await?;
        Ok(rows)
    }

    /// Deletes expired sessions and returns how many rows were removed.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the delete is converted into [`DbError`].
    pub async fn cleanup_expired(&self) -> Result<u64, DbError> {
        // The lookups treat a session as active only while `expires_at > NOW()`, so the
        // exact complement (`<=`) is what counts as expired here.
        let result = sqlx::query("DELETE FROM sessions WHERE expires_at <= NOW()")
            .execute(self.pool)
            .await?;
        Ok(result.rows_affected())
    }
}
/// Data-access handle for the `api_keys` table.
///
/// Holds a borrowed Postgres connection pool for the lifetime `'a`; the pool itself is
/// owned elsewhere.
pub struct ApiKeyRepository<'a> {
/// Connection pool that every query issued by this repository runs against.
pool: &'a PgPool,
}
impl<'a> ApiKeyRepository<'a> {
    /// Creates a repository that runs its queries on `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Inserts a new API key row built from `key` and returns it as stored.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the insert is converted into [`DbError`] by `?`.
    pub async fn create(&self, key: CreateApiKey) -> Result<ApiKeyRecord, DbError> {
        let row = sqlx::query_as::<_, ApiKeyRecord>(
            r#"
INSERT INTO api_keys (user_id, name, key_prefix, key_hash, scopes, rate_limit_rpm, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#,
        )
        .bind(key.user_id)
        .bind(&key.name)
        .bind(&key.key_prefix)
        .bind(&key.key_hash)
        .bind(&key.scopes)
        .bind(key.rate_limit_rpm)
        .bind(key.expires_at)
        .fetch_one(self.pool)
        .await?;
        Ok(row)
    }

    /// Looks up a usable key by its hash.
    ///
    /// A key is returned only if it is not revoked and its `expires_at` is either
    /// `NULL` or in the future.
    ///
    /// # Errors
    ///
    /// `fetch_one` fails when no row matches, so an unknown, revoked or expired key is
    /// reported as an error (converted into [`DbError`]).
    pub async fn find_by_hash(&self, key_hash: &str) -> Result<ApiKeyRecord, DbError> {
        let row = sqlx::query_as::<_, ApiKeyRecord>(
            r#"
SELECT * FROM api_keys
WHERE key_hash = $1
AND revoked_at IS NULL
AND (expires_at IS NULL OR expires_at > NOW())
"#,
        )
        .bind(key_hash)
        .fetch_one(self.pool)
        .await?;
        Ok(row)
    }

    /// Lists the user's keys that have not been revoked, newest first.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the query is converted into [`DbError`].
    pub async fn list_for_user(&self, user_id: Uuid) -> Result<Vec<ApiKeyRecord>, DbError> {
        let rows = sqlx::query_as::<_, ApiKeyRecord>(
            r#"
SELECT * FROM api_keys
WHERE user_id = $1
AND revoked_at IS NULL
ORDER BY created_at DESC
"#,
        )
        .bind(user_id)
        .fetch_all(self.pool)
        .await?;
        Ok(rows)
    }

    /// Revokes the key `key_id`, but only if it belongs to `user_id`.
    ///
    /// Repeating the call on an already revoked key still succeeds and leaves the
    /// original `revoked_at` untouched.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::NotFound`] when no row has that id/owner pair. Other `sqlx`
    /// errors are converted into [`DbError`].
    pub async fn revoke(&self, key_id: Uuid, user_id: Uuid) -> Result<(), DbError> {
        // COALESCE keeps the first revocation time; the row is still counted as
        // affected, so the NotFound check below only fires for a missing id/owner pair.
        let result = sqlx::query(
            "UPDATE api_keys SET revoked_at = COALESCE(revoked_at, NOW()) \
             WHERE id = $1 AND user_id = $2",
        )
        .bind(key_id)
        .bind(user_id)
        .execute(self.pool)
        .await?;
        if result.rows_affected() == 0 {
            return Err(DbError::NotFound);
        }
        Ok(())
    }

    /// Sets `last_used_at` of the key `key_id` to `NOW()`.
    ///
    /// Returns `Ok(())` even when no row has that id.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the update is converted into [`DbError`].
    pub async fn update_last_used(&self, key_id: Uuid) -> Result<(), DbError> {
        sqlx::query("UPDATE api_keys SET last_used_at = NOW() WHERE id = $1")
            .bind(key_id)
            .execute(self.pool)
            .await?;
        Ok(())
    }

    /// Counts the user's keys that have not been revoked.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the query is converted into [`DbError`].
    pub async fn count_for_user(&self, user_id: Uuid) -> Result<i64, DbError> {
        let count = sqlx::query_scalar::<_, i64>(
            "SELECT COUNT(*) FROM api_keys WHERE user_id = $1 AND revoked_at IS NULL",
        )
        .bind(user_id)
        .fetch_one(self.pool)
        .await?;
        Ok(count)
    }
}
/// Data-access handle for the `settings` table.
///
/// Holds a borrowed Postgres connection pool for the lifetime `'a`; the pool itself is
/// owned elsewhere.
pub struct SettingsRepository<'a> {
/// Connection pool that every query issued by this repository runs against.
pool: &'a PgPool,
}
impl<'a> SettingsRepository<'a> {
    /// Builds a repository whose queries run on the borrowed `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Fetches the single setting stored for `user_id` under `key`.
    ///
    /// # Errors
    ///
    /// `fetch_one` fails when no row matches, so a missing setting is reported as an
    /// error (converted into [`DbError`]) rather than as an `Option`.
    pub async fn get(&self, user_id: Uuid, key: &str) -> Result<Setting, DbError> {
        const SELECT_ONE: &str = "SELECT * FROM settings WHERE user_id = $1 AND key = $2";

        let lookup = sqlx::query_as::<_, Setting>(SELECT_ONE)
            .bind(user_id)
            .bind(key);
        Ok(lookup.fetch_one(self.pool).await?)
    }

    /// Stores `value` for `user_id` under `key` and returns the resulting row.
    ///
    /// A new row is inserted when the `(user_id, key)` pair does not exist yet;
    /// otherwise the existing row gets the new value, its `version` is incremented by
    /// one and `updated_at` is set to `NOW()`.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the statement is converted into [`DbError`].
    pub async fn set(
        &self,
        user_id: Uuid,
        key: &str,
        value: serde_json::Value,
    ) -> Result<Setting, DbError> {
        // Line continuations keep the statement text identical to a multi-line literal
        // that starts and ends with a newline.
        const UPSERT: &str = "\n\
            INSERT INTO settings (user_id, key, value)\n\
            VALUES ($1, $2, $3)\n\
            ON CONFLICT (user_id, key) DO UPDATE\n\
            SET value = EXCLUDED.value,\n\
            version = settings.version + 1,\n\
            updated_at = NOW()\n\
            RETURNING *\n";

        let upsert = sqlx::query_as::<_, Setting>(UPSERT)
            .bind(user_id)
            .bind(key)
            .bind(value);
        Ok(upsert.fetch_one(self.pool).await?)
    }

    /// Returns every setting stored for `user_id`, ordered by key.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the query is converted into [`DbError`].
    pub async fn get_all(&self, user_id: Uuid) -> Result<Vec<Setting>, DbError> {
        const SELECT_ALL: &str = "SELECT * FROM settings WHERE user_id = $1 ORDER BY key";

        let listing = sqlx::query_as::<_, Setting>(SELECT_ALL).bind(user_id);
        Ok(listing.fetch_all(self.pool).await?)
    }

    /// Removes the setting stored for `user_id` under `key`.
    ///
    /// Succeeds whether or not a row was actually deleted.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the delete is converted into [`DbError`].
    pub async fn delete(&self, user_id: Uuid, key: &str) -> Result<(), DbError> {
        const DELETE_ONE: &str = "DELETE FROM settings WHERE user_id = $1 AND key = $2";

        let removal = sqlx::query(DELETE_ONE).bind(user_id).bind(key);
        // The query result (row count) is intentionally not inspected.
        let _ = removal.execute(self.pool).await?;
        Ok(())
    }

    /// Returns the user's settings whose `version` is greater than `since_version`,
    /// ordered by ascending version.
    ///
    /// # Errors
    ///
    /// Any `sqlx` error raised by the query is converted into [`DbError`].
    pub async fn get_changes_since(
        &self,
        user_id: Uuid,
        since_version: i32,
    ) -> Result<Vec<Setting>, DbError> {
        const SELECT_NEWER: &str =
            "SELECT * FROM settings WHERE user_id = $1 AND version > $2 ORDER BY version";

        let newer = sqlx::query_as::<_, Setting>(SELECT_NEWER)
            .bind(user_id)
            .bind(since_version);
        Ok(newer.fetch_all(self.pool).await?)
    }
}