use pubky_common::crypto::PublicKey;
use crate::persistence::sql::user::{UserEntity, UserRepository};
use crate::persistence::sql::{uexecutor, SqlDb, UnifiedExecutor};
use crate::shared::user_quota::{UserQuota, UserQuotaPatch};
use crate::shared::{HttpError, HttpResult};
use super::quota_cache::{CachedEntry, QuotaCache};
/// Fixed size value (256) associated with file metadata.
///
/// NOTE(review): presumably the number of bytes charged per stored file for
/// its metadata during quota accounting — the constant is not referenced in
/// this file, so confirm against its call sites.
pub const FILE_METADATA_SIZE: u64 = 256;
/// Service for reading and modifying user records and their quotas.
///
/// Combines SQL access through `UserRepository` with a `QuotaCache` keyed by
/// the user's public key.
#[derive(Clone, Debug)]
pub struct UserService {
/// Database handle; its connection pool backs every repository call made here.
sql_db: SqlDb,
/// Cache of resolved quotas per public key, including "user not found" entries.
quota_cache: QuotaCache,
}
impl UserService {
/// Builds a service on top of `sql_db`, starting with a newly created quota cache.
pub fn new(sql_db: SqlDb) -> Self {
    Self {
        sql_db,
        quota_cache: QuotaCache::new(),
    }
}
pub fn pool(&self) -> &sqlx::PgPool {
self.sql_db.pool()
}
/// Fetches the user identified by `pubkey` through
/// `UserRepository::get_for_update`, running on the supplied `executor`.
///
/// NOTE(review): the name suggests the row is locked for the lifetime of the
/// surrounding transaction — confirm in the repository implementation.
///
/// # Errors
/// Returns whatever `sqlx::Error` the repository reports, unchanged.
pub async fn get_for_update<'a>(
    &self,
    pubkey: &PublicKey,
    executor: &mut UnifiedExecutor<'a>,
) -> Result<UserEntity, sqlx::Error> {
    let lookup = UserRepository::get_for_update(pubkey, executor);
    lookup.await
}
/// Persists `user` via `UserRepository::update` on the supplied `executor`
/// and returns the entity the repository hands back.
///
/// # Errors
/// Returns whatever `sqlx::Error` the repository reports, unchanged.
pub async fn update<'a>(
    &self,
    user: &UserEntity,
    executor: &mut UnifiedExecutor<'a>,
) -> Result<UserEntity, sqlx::Error> {
    let write = UserRepository::update(user, executor);
    write.await
}
/// Loads the user for `pubkey` straight from the database pool, translating
/// failures into HTTP errors.
///
/// # Errors
/// * `HttpError::not_found()` when no row exists for `pubkey`.
/// * A forbidden error with the message "User is disabled" when
///   `err_if_disabled` is set and the user is flagged as disabled.
/// * Any other `sqlx::Error`, converted into an `HttpError`.
pub async fn get_or_http_error(
    &self,
    pubkey: &PublicKey,
    err_if_disabled: bool,
) -> HttpResult<UserEntity> {
    let lookup = UserRepository::get(pubkey, &mut self.sql_db.pool().into()).await;

    let user = match lookup {
        Err(sqlx::Error::RowNotFound) => {
            tracing::warn!("User {} not found. Forbid access.", pubkey);
            return Err(HttpError::not_found());
        }
        // Success passes through; every remaining error is converted and returned.
        other => other?,
    };

    if user.disabled && err_if_disabled {
        tracing::warn!("User {} is disabled. Forbid access.", pubkey);
        return Err(HttpError::forbidden_with_message("User is disabled"));
    }

    Ok(user)
}
pub async fn resolve_quota(
&self,
pubkey: &PublicKey,
) -> Result<Option<UserQuota>, sqlx::Error> {
if let Some(cached) = self.quota_cache.get(pubkey) {
return Ok(cached);
}
self.quota_cache.remove(pubkey);
self.quota_cache.make_room();
match UserRepository::get(pubkey, &mut self.sql_db.pool().into()).await {
Ok(user) => {
let resolved = user.quota();
self.quota_cache
.insert(pubkey.clone(), CachedEntry::found(resolved.clone()));
Ok(Some(resolved))
}
Err(sqlx::Error::RowNotFound) => {
self.quota_cache
.insert(pubkey.clone(), CachedEntry::not_found());
Ok(None)
}
Err(e) => Err(e),
}
}
/// Creates a user with the given quota inside the caller-provided transaction.
///
/// Inserts the user row for `public_key`, stores `quota` for it, and commits
/// `tx`. Only after the commit succeeds is the quota cache updated with the
/// new user's quota, replacing any entry previously stored under that key.
///
/// # Errors
/// Any failure from the repository calls or from the commit is propagated via
/// `?`; the cache is left untouched in that case.
pub async fn create_user(
    &self,
    public_key: &PublicKey,
    quota: &UserQuota,
    tx: sqlx::Transaction<'static, sqlx::Postgres>,
) -> HttpResult<UserEntity> {
    let mut tx = tx;
    let user = UserRepository::create(public_key, uexecutor!(tx)).await?;
    let user = UserRepository::set_quota(user.id, quota, uexecutor!(tx)).await?;
    tx.commit().await?;

    // Same discipline as `resolve_quota`: let the cache make room before each
    // insert so user creation cannot grow it past the bound `make_room` keeps.
    self.quota_cache.make_room();
    self.quota_cache
        .insert(public_key.clone(), CachedEntry::found(user.quota()));

    Ok(user)
}
/// Applies `patch` to the stored quota of the user identified by `pubkey`.
///
/// Runs in its own transaction: the user is fetched via `get_for_update`, the
/// patch is merged into the current quota, the result is validated, written
/// back, and the transaction is committed. After the commit the cached quota
/// for `pubkey` is removed so the next lookup reloads it.
///
/// # Errors
/// * `HttpError::not_found()` when the user does not exist.
/// * `422 Unprocessable Entity` carrying the validation message when the
///   merged quota fails `validate()`.
/// * Any other database error, converted into an `HttpError`.
///
/// Every error path returns before `commit`, so the transaction is dropped
/// uncommitted.
pub async fn patch_quota(
    &self,
    pubkey: &PublicKey,
    patch: &UserQuotaPatch,
) -> HttpResult<UserEntity> {
    let mut tx = self.sql_db.pool().begin().await?;

    let lookup = self.get_for_update(pubkey, uexecutor!(tx)).await;
    let user = match lookup {
        Err(sqlx::Error::RowNotFound) => return Err(HttpError::not_found()),
        // Success passes through; every remaining error is converted and returned.
        other => other?,
    };

    let mut merged = user.quota();
    merged.merge(patch);
    if let Err(reason) = merged.validate() {
        return Err(HttpError::new_with_message(
            axum::http::StatusCode::UNPROCESSABLE_ENTITY,
            reason,
        ));
    }

    let updated = UserRepository::set_quota(user.id, &merged, uexecutor!(tx)).await?;
    tx.commit().await?;

    // Invalidate only after the commit so a failed write leaves the cache intact.
    self.quota_cache.remove(pubkey);
    Ok(updated)
}
}