use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use tokio::sync::RwLock;
use unicode_normalization::UnicodeNormalization;
use uuid::Uuid;
use crate::errors::AppError;
use crate::models::AuthMethod;
/// Canonicalizes an email address for storage and index lookups.
///
/// The input is rewritten to Unicode NFKC form first (so compatibility
/// characters such as fullwidth letters collapse to their plain
/// equivalents) and the result is then lowercased.
pub fn normalize_email(email: &str) -> String {
    let composed: String = email.nfkc().collect();
    composed.to_lowercase()
}
/// Rejects addresses whose local part (everything before the last `@`)
/// contains non-ASCII characters.
///
/// Input without any `@` is accepted as-is; this function checks only the
/// local part and performs no other email validation.
///
/// # Errors
///
/// Returns `AppError::Validation` when the local part is not pure ASCII.
pub fn validate_email_ascii_local(email: &str) -> Result<(), crate::errors::AppError> {
    // `rsplit_once` splits at the LAST '@', so the local part may itself
    // contain '@' characters and they are inspected too.
    let local_is_ascii = email
        .rsplit_once('@')
        .map_or(true, |(local, _domain)| local.is_ascii());

    if local_is_ascii {
        Ok(())
    } else {
        Err(crate::errors::AppError::Validation(
            "Email local part must contain only ASCII characters".into(),
        ))
    }
}
/// Produces a random 8-character referral code drawn from `A-Z` and `0-9`.
///
/// Uses the thread-local RNG from `rand`. Uniqueness is not checked here.
pub fn generate_referral_code() -> String {
    use rand::Rng;
    const CODE_LEN: usize = 8;
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

    let mut rng = rand::thread_rng();
    let mut code = String::with_capacity(CODE_LEN);
    for _ in 0..CODE_LEN {
        let pick = rng.gen_range(0..ALPHABET.len());
        code.push(ALPHABET[pick] as char);
    }
    code
}
/// A user account record as handled by the repository layer.
///
/// Identity fields are optional; which of them are populated depends on how
/// the account was created (see `auth_methods`).
#[derive(Debug, Clone)]
pub struct UserEntity {
pub id: Uuid,
/// Email address. `new_email_user` stores it already passed through
/// `normalize_email`.
pub email: Option<String>,
pub email_verified: bool,
pub password_hash: Option<String>,
/// Display name. Anonymization in the in-memory repository overwrites it
/// with "Deleted Account", which `is_deleted` checks for.
pub name: Option<String>,
pub username: Option<String>,
pub picture: Option<String>,
pub wallet_address: Option<String>,
pub google_id: Option<String>,
pub apple_id: Option<String>,
pub stripe_customer_id: Option<String>,
/// Sign-in methods attached to the account; empty for anonymized accounts.
pub auth_methods: Vec<AuthMethod>,
pub is_system_admin: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub last_login_at: Option<DateTime<Utc>>,
pub welcome_completed_at: Option<DateTime<Utc>>,
/// This user's own referral code (`new_email_user` fills it from
/// `generate_referral_code`).
pub referral_code: String,
/// Id of the user who referred this one, if any.
pub referred_by: Option<Uuid>,
pub payout_wallet_address: Option<String>,
/// KYC state as a free-form string; new accounts start at "none".
pub kyc_status: String,
pub kyc_verified_at: Option<DateTime<Utc>>,
pub kyc_expires_at: Option<DateTime<Utc>>,
/// Accreditation state as a free-form string; new accounts start at "none".
pub accreditation_status: String,
pub accreditation_verified_at: Option<DateTime<Utc>>,
pub accreditation_expires_at: Option<DateTime<Utc>>,
}
impl UserEntity {
    /// Builds a brand-new, unverified account that signs in with email and
    /// password.
    ///
    /// The email is stored normalized, a random id and referral code are
    /// generated, `created_at`/`updated_at` share one timestamp, and the KYC
    /// and accreditation statuses start at "none".
    pub fn new_email_user(email: String, password_hash: String, name: Option<String>) -> Self {
        let created = Utc::now();
        let canonical_email = normalize_email(&email);

        Self {
            // Identity and credentials.
            id: Uuid::new_v4(),
            email: Some(canonical_email),
            email_verified: false,
            password_hash: Some(password_hash),
            auth_methods: vec![AuthMethod::Email],
            is_system_admin: false,

            // Profile.
            name,
            username: None,
            picture: None,

            // External identities: none for a plain email sign-up.
            wallet_address: None,
            google_id: None,
            apple_id: None,
            stripe_customer_id: None,

            // Timestamps.
            created_at: created,
            updated_at: created,
            last_login_at: None,
            welcome_completed_at: None,

            // Referral programme.
            referral_code: generate_referral_code(),
            referred_by: None,
            payout_wallet_address: None,

            // Compliance.
            kyc_status: "none".to_string(),
            kyc_verified_at: None,
            kyc_expires_at: None,
            accreditation_status: "none".to_string(),
            accreditation_verified_at: None,
            accreditation_expires_at: None,
        }
    }

    /// Reports whether this record is an anonymized tombstone: no sign-in
    /// methods, no email/password/wallet/Google/Apple identity, and the name
    /// set to exactly "Deleted Account".
    pub fn is_deleted(&self) -> bool {
        let still_has_identity = !self.auth_methods.is_empty()
            || self.email.is_some()
            || self.password_hash.is_some()
            || self.wallet_address.is_some()
            || self.google_id.is_some()
            || self.apple_id.is_some();
        if still_has_identity {
            return false;
        }
        matches!(self.name.as_deref(), Some("Deleted Account"))
    }
}
/// Persistence interface for [`UserEntity`] records.
///
/// Every operation is async and fails with [`AppError`]; implementors must be
/// `Send + Sync`. The trait itself only fixes signatures. Behavioral notes
/// below that mention "the in-memory implementation" describe
/// `InMemoryUserRepository` in this file; other backends presumably match,
/// but that is not visible here.
#[async_trait]
pub trait UserRepository: Send + Sync {
/// Looks up a user by id.
async fn find_by_id(&self, id: Uuid) -> Result<Option<UserEntity>, AppError>;
/// Looks up a user by email. The in-memory implementation normalizes
/// `email` with `normalize_email` before the lookup.
async fn find_by_email(&self, email: &str) -> Result<Option<UserEntity>, AppError>;
/// Looks up a user by wallet address (exact string match in the in-memory
/// implementation).
async fn find_by_wallet(&self, wallet: &str) -> Result<Option<UserEntity>, AppError>;
/// Looks up a user by Google account id.
async fn find_by_google_id(&self, google_id: &str) -> Result<Option<UserEntity>, AppError>;
/// Looks up a user by Apple account id.
async fn find_by_apple_id(&self, apple_id: &str) -> Result<Option<UserEntity>, AppError>;
/// Looks up a user by Stripe customer id.
async fn find_by_stripe_customer_id(
&self,
stripe_customer_id: &str,
) -> Result<Option<UserEntity>, AppError>;
/// Stores a new user and returns it.
async fn create(&self, user: UserEntity) -> Result<UserEntity, AppError>;
/// Replaces the stored record for `user.id` and returns the new value.
async fn update(&self, user: UserEntity) -> Result<UserEntity, AppError>;
/// Whether any user has this email.
async fn email_exists(&self, email: &str) -> Result<bool, AppError>;
/// Whether any user has this wallet address.
async fn wallet_exists(&self, wallet: &str) -> Result<bool, AppError>;
/// Sets the `email_verified` flag. The in-memory implementation silently
/// succeeds when the id is unknown.
async fn set_email_verified(&self, id: Uuid, verified: bool) -> Result<(), AppError>;
/// Stores a new password hash. The in-memory implementation silently
/// succeeds when the id is unknown.
async fn update_password(&self, id: Uuid, password_hash: &str) -> Result<(), AppError>;
/// Returns one page of users. The in-memory implementation orders by
/// `created_at` descending and caps `limit` at 100.
async fn list_all(&self, limit: u32, offset: u32) -> Result<Vec<UserEntity>, AppError>;
/// Total number of stored users.
async fn count(&self) -> Result<u64, AppError>;
/// Sets the `is_system_admin` flag.
async fn set_system_admin(&self, id: Uuid, is_admin: bool) -> Result<(), AppError>;
/// Links a Stripe customer id to the user. The in-memory implementation
/// returns `NotFound` for an unknown user and `Validation` when the
/// customer id already belongs to a different user.
async fn set_stripe_customer_id(
&self,
id: Uuid,
stripe_customer_id: &str,
) -> Result<(), AppError>;
/// Number of users with `is_system_admin` set.
async fn count_system_admins(&self) -> Result<u64, AppError>;
/// Removes the user record entirely.
async fn delete(&self, id: Uuid) -> Result<(), AppError>;
/// Strips identifying data from the user while keeping the record, and
/// assigns `replacement_referral_code` as its referral code. Returns the
/// anonymized record.
async fn anonymize_for_deletion(
&self,
id: Uuid,
replacement_referral_code: &str,
) -> Result<UserEntity, AppError>;
/// Counts users per auth method. The in-memory implementation uses the
/// keys "email", "google", "apple", "solana", "webauthn" and "sso".
async fn count_by_auth_methods(
&self,
) -> Result<std::collections::HashMap<String, u64>, AppError>;
/// Records a login for the user (the in-memory implementation stamps
/// `last_login_at` with the current time).
async fn update_last_login(&self, id: Uuid) -> Result<(), AppError>;
/// Marks the welcome flow as completed (the in-memory implementation
/// stamps `welcome_completed_at` with the current time).
async fn set_welcome_completed(&self, id: Uuid) -> Result<(), AppError>;
/// Whether the username is taken. The in-memory implementation compares
/// case-insensitively via `to_lowercase`.
async fn username_exists(&self, username: &str) -> Result<bool, AppError>;
/// Sets the user's username.
async fn set_username(&self, id: Uuid, username: &str) -> Result<(), AppError>;
/// Resolves a referral code to its owner. The in-memory implementation
/// also resolves previously used codes and never returns anonymized users.
async fn find_by_referral_code(&self, code: &str) -> Result<Option<UserEntity>, AppError>;
/// Number of users whose `referred_by` is `user_id`.
async fn count_referrals(&self, user_id: Uuid) -> Result<u64, AppError>;
/// Replaces the user's referral code with a freshly generated one and
/// returns the new code.
async fn regenerate_referral_code(&self, id: Uuid) -> Result<String, AppError>;
/// Assigns a specific referral code to the user. The in-memory
/// implementation returns `Validation` when another user owns `code`.
async fn set_referral_code(&self, id: Uuid, code: &str) -> Result<(), AppError>;
/// Sets or clears (`None`) the payout wallet address.
async fn set_payout_wallet_address(
&self,
id: Uuid,
address: Option<&str>,
) -> Result<(), AppError>;
/// Number of users that have any referrer.
async fn count_referred(&self) -> Result<u64, AppError>;
/// Number of referred users with `created_at >= since`.
async fn count_referred_since(&self, since: DateTime<Utc>) -> Result<u64, AppError>;
/// The referrers with the most referred users, at most `limit` rows.
async fn top_referrers(&self, limit: u32) -> Result<Vec<TopReferrerRow>, AppError>;
/// One page of the users referred by `referrer_id`. The in-memory
/// implementation orders by `created_at` descending and caps `limit` at 100.
async fn find_referred_by(
&self,
referrer_id: Uuid,
limit: u32,
offset: u32,
) -> Result<Vec<UserEntity>, AppError>;
/// Number of users referred by `referrer_id`.
async fn count_referred_by(&self, referrer_id: Uuid) -> Result<u64, AppError>;
/// Overwrites the KYC status and its verified/expiry timestamps.
async fn set_kyc_status(
&self,
id: Uuid,
status: &str,
verified_at: Option<DateTime<Utc>>,
expires_at: Option<DateTime<Utc>>,
) -> Result<(), AppError>;
/// Overwrites the accreditation status and its verified/expiry timestamps.
async fn set_accreditation_status(
&self,
id: Uuid,
status: &str,
verified_at: Option<DateTime<Utc>>,
expires_at: Option<DateTime<Utc>>,
) -> Result<(), AppError>;
/// Number of users with `created_at >= since`.
async fn count_created_since(&self, since: DateTime<Utc>) -> Result<u64, AppError>;
}
/// One row of the "top referrers" report returned by
/// `UserRepository::top_referrers`: a referrer plus how many users name them
/// in `referred_by`.
#[derive(Debug, Clone)]
pub struct TopReferrerRow {
/// Id of the referring user.
pub user_id: Uuid,
pub email: Option<String>,
pub name: Option<String>,
/// The referrer's current referral code.
pub referral_code: String,
/// Number of users referred by `user_id`.
pub referral_count: u64,
}
/// `UserRepository` backed by in-process hash maps, each behind its own async
/// `RwLock`.
///
/// `users` is the source of truth; the other maps are secondary indexes from
/// a lookup key to the owning user's id.
pub struct InMemoryUserRepository {
users: RwLock<HashMap<Uuid, UserEntity>>,
// Keyed by the normalized email (see `normalize_email`).
email_index: RwLock<HashMap<String, Uuid>>,
// The remaining indexes are keyed by the raw string value.
wallet_index: RwLock<HashMap<String, Uuid>>,
google_id_index: RwLock<HashMap<String, Uuid>>,
apple_id_index: RwLock<HashMap<String, Uuid>>,
stripe_customer_id_index: RwLock<HashMap<String, Uuid>>,
// Current referral code -> owner.
referral_code_index: RwLock<HashMap<String, Uuid>>,
// Previously used referral codes -> the user who held them, recorded when a
// code is regenerated or explicitly replaced; consulted as a fallback by
// `find_by_referral_code`.
referral_history: RwLock<HashMap<String, Uuid>>,
}
impl InMemoryUserRepository {
pub fn new() -> Self {
Self {
users: RwLock::new(HashMap::new()),
email_index: RwLock::new(HashMap::new()),
wallet_index: RwLock::new(HashMap::new()),
google_id_index: RwLock::new(HashMap::new()),
apple_id_index: RwLock::new(HashMap::new()),
stripe_customer_id_index: RwLock::new(HashMap::new()),
referral_code_index: RwLock::new(HashMap::new()),
referral_history: RwLock::new(HashMap::new()),
}
}
}
impl Default for InMemoryUserRepository {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl UserRepository for InMemoryUserRepository {
/// Returns a clone of the user stored under `id`, or `None` when absent.
async fn find_by_id(&self, id: Uuid) -> Result<Option<UserEntity>, AppError> {
    let found = self.users.read().await.get(&id).cloned();
    Ok(found)
}
/// Resolves an email to its user via the email index.
///
/// The address is normalized first, so lookups are insensitive to case and
/// to Unicode compatibility forms.
async fn find_by_email(&self, email: &str) -> Result<Option<UserEntity>, AppError> {
    let key = normalize_email(email);
    // The index guard is a temporary, so it is released before the user
    // table is locked.
    let owner = self.email_index.read().await.get(&key).copied();
    if let Some(user_id) = owner {
        let users = self.users.read().await;
        return Ok(users.get(&user_id).cloned());
    }
    Ok(None)
}
/// Resolves a wallet address (exact string match) to its user.
async fn find_by_wallet(&self, wallet: &str) -> Result<Option<UserEntity>, AppError> {
    // The index guard is released before the user table is locked.
    let owner = self.wallet_index.read().await.get(wallet).copied();
    if let Some(user_id) = owner {
        let users = self.users.read().await;
        return Ok(users.get(&user_id).cloned());
    }
    Ok(None)
}
/// Resolves a Google account id (exact string match) to its user.
async fn find_by_google_id(&self, google_id: &str) -> Result<Option<UserEntity>, AppError> {
    // The index guard is released before the user table is locked.
    let owner = self.google_id_index.read().await.get(google_id).copied();
    if let Some(user_id) = owner {
        let users = self.users.read().await;
        return Ok(users.get(&user_id).cloned());
    }
    Ok(None)
}
/// Resolves an Apple account id (exact string match) to its user.
async fn find_by_apple_id(&self, apple_id: &str) -> Result<Option<UserEntity>, AppError> {
    // The index guard is released before the user table is locked.
    let owner = self.apple_id_index.read().await.get(apple_id).copied();
    if let Some(user_id) = owner {
        let users = self.users.read().await;
        return Ok(users.get(&user_id).cloned());
    }
    Ok(None)
}
/// Resolves a Stripe customer id (exact string match) to its user.
async fn find_by_stripe_customer_id(
    &self,
    stripe_customer_id: &str,
) -> Result<Option<UserEntity>, AppError> {
    // The index guard is released before the user table is locked.
    let owner = self
        .stripe_customer_id_index
        .read()
        .await
        .get(stripe_customer_id)
        .copied();
    if let Some(user_id) = owner {
        let users = self.users.read().await;
        return Ok(users.get(&user_id).cloned());
    }
    Ok(None)
}
async fn create(&self, user: UserEntity) -> Result<UserEntity, AppError> {
let mut users = self.users.write().await;
if let Some(ref email) = user.email {
let mut email_index = self.email_index.write().await;
email_index.insert(normalize_email(email), user.id);
}
if let Some(ref wallet) = user.wallet_address {
let mut wallet_index = self.wallet_index.write().await;
wallet_index.insert(wallet.clone(), user.id);
}
if let Some(ref google_id) = user.google_id {
let mut google_id_index = self.google_id_index.write().await;
google_id_index.insert(google_id.clone(), user.id);
}
if let Some(ref apple_id) = user.apple_id {
let mut apple_id_index = self.apple_id_index.write().await;
apple_id_index.insert(apple_id.clone(), user.id);
}
if let Some(ref stripe_customer_id) = user.stripe_customer_id {
let mut stripe_index = self.stripe_customer_id_index.write().await;
stripe_index.insert(stripe_customer_id.clone(), user.id);
}
{
let mut referral_index = self.referral_code_index.write().await;
referral_index.insert(user.referral_code.clone(), user.id);
}
users.insert(user.id, user.clone());
Ok(user)
}
async fn update(&self, user: UserEntity) -> Result<UserEntity, AppError> {
let mut users = self.users.write().await;
let mut email_index = self.email_index.write().await;
if let Some(old_user) = users.get(&user.id) {
if let Some(ref old_email) = old_user.email {
let old_normalized = normalize_email(old_email);
let new_normalized = user.email.as_ref().map(|e| normalize_email(e));
if new_normalized.as_ref() != Some(&old_normalized) {
email_index.remove(&old_normalized);
}
}
}
if let Some(ref email) = user.email {
email_index.insert(normalize_email(email), user.id);
}
if let Some(old_user) = users.get(&user.id) {
if old_user.wallet_address != user.wallet_address {
let mut wallet_index = self.wallet_index.write().await;
if let Some(ref old_wallet) = old_user.wallet_address {
wallet_index.remove(old_wallet);
}
if let Some(ref new_wallet) = user.wallet_address {
wallet_index.insert(new_wallet.clone(), user.id);
}
}
if old_user.google_id != user.google_id {
let mut google_id_index = self.google_id_index.write().await;
if let Some(ref old_google_id) = old_user.google_id {
google_id_index.remove(old_google_id);
}
if let Some(ref new_google_id) = user.google_id {
google_id_index.insert(new_google_id.clone(), user.id);
}
}
if old_user.apple_id != user.apple_id {
let mut apple_id_index = self.apple_id_index.write().await;
if let Some(ref old_apple_id) = old_user.apple_id {
apple_id_index.remove(old_apple_id);
}
if let Some(ref new_apple_id) = user.apple_id {
apple_id_index.insert(new_apple_id.clone(), user.id);
}
}
if old_user.stripe_customer_id != user.stripe_customer_id {
let mut stripe_index = self.stripe_customer_id_index.write().await;
if let Some(ref old_scid) = old_user.stripe_customer_id {
stripe_index.remove(old_scid);
}
if let Some(ref new_scid) = user.stripe_customer_id {
stripe_index.insert(new_scid.clone(), user.id);
}
}
}
users.insert(user.id, user.clone());
Ok(user)
}
/// Whether `find_by_email` resolves `email` to a user.
async fn email_exists(&self, email: &str) -> Result<bool, AppError> {
    let hit = self.find_by_email(email).await?;
    Ok(hit.is_some())
}
/// Whether `find_by_wallet` resolves `wallet` to a user.
async fn wallet_exists(&self, wallet: &str) -> Result<bool, AppError> {
    let hit = self.find_by_wallet(wallet).await?;
    Ok(hit.is_some())
}
/// Sets `email_verified` and bumps `updated_at`.
///
/// An unknown `id` is not an error: the call is a no-op returning `Ok(())`.
async fn set_email_verified(&self, id: Uuid, verified: bool) -> Result<(), AppError> {
    if let Some(user) = self.users.write().await.get_mut(&id) {
        user.email_verified = verified;
        user.updated_at = Utc::now();
    }
    Ok(())
}
/// Stores a new password hash and bumps `updated_at`.
///
/// An unknown `id` is not an error: the call is a no-op returning `Ok(())`.
async fn update_password(&self, id: Uuid, password_hash: &str) -> Result<(), AppError> {
    if let Some(user) = self.users.write().await.get_mut(&id) {
        user.password_hash = Some(String::from(password_hash));
        user.updated_at = Utc::now();
    }
    Ok(())
}
/// Returns one page of users, newest (`created_at`) first.
///
/// `limit` is capped at 100; `offset` is the number of users to skip.
async fn list_all(&self, limit: u32, offset: u32) -> Result<Vec<UserEntity>, AppError> {
    const MAX_PAGE_SIZE: u32 = 100;
    let page_len = std::cmp::min(limit, MAX_PAGE_SIZE) as usize;
    let start = offset as usize;

    let mut newest_first: Vec<UserEntity> =
        self.users.read().await.values().cloned().collect();
    // Stable sort, descending by creation time.
    newest_first.sort_by_key(|user| std::cmp::Reverse(user.created_at));

    let page: Vec<UserEntity> = newest_first
        .into_iter()
        .skip(start)
        .take(page_len)
        .collect();
    Ok(page)
}
/// Number of records in the user table (anonymized records included).
async fn count(&self) -> Result<u64, AppError> {
    let total = self.users.read().await.len();
    Ok(total as u64)
}
/// Sets `is_system_admin` and bumps `updated_at`.
///
/// An unknown `id` is not an error: the call is a no-op returning `Ok(())`.
async fn set_system_admin(&self, id: Uuid, is_admin: bool) -> Result<(), AppError> {
    if let Some(user) = self.users.write().await.get_mut(&id) {
        user.is_system_admin = is_admin;
        user.updated_at = Utc::now();
    }
    Ok(())
}
/// Links `stripe_customer_id` to the user and keeps the Stripe index in sync.
///
/// # Errors
///
/// * `AppError::NotFound` when no user has this `id`.
/// * `AppError::Validation` when the customer id is already indexed for a
///   different user.
async fn set_stripe_customer_id(
    &self,
    id: Uuid,
    stripe_customer_id: &str,
) -> Result<(), AppError> {
    let mut users = self.users.write().await;
    let user = match users.get_mut(&id) {
        Some(user) => user,
        None => return Err(AppError::NotFound("User not found".into())),
    };

    // Already linked to exactly this customer: nothing to change, and the
    // index is not even locked.
    if user.stripe_customer_id.as_deref() == Some(stripe_customer_id) {
        return Ok(());
    }

    let mut stripe_index = self.stripe_customer_id_index.write().await;
    let owned_by_other = stripe_index
        .get(stripe_customer_id)
        .map_or(false, |owner| *owner != id);
    if owned_by_other {
        return Err(AppError::Validation(
            "Stripe customer ID is already linked to another user".into(),
        ));
    }

    if let Some(previous) = user.stripe_customer_id.take() {
        stripe_index.remove(&previous);
    }
    let linked = stripe_customer_id.to_string();
    stripe_index.insert(linked.clone(), id);
    user.stripe_customer_id = Some(linked);
    user.updated_at = Utc::now();
    Ok(())
}
/// Number of users with `is_system_admin` set.
async fn count_system_admins(&self) -> Result<u64, AppError> {
    let users = self.users.read().await;
    let mut admins: u64 = 0;
    for user in users.values() {
        if user.is_system_admin {
            admins += 1;
        }
    }
    Ok(admins)
}
/// Removes the user record and every index entry derived from it.
///
/// The user table stays write-locked for the whole call. Entries in the
/// referral history are left in place.
///
/// # Errors
///
/// `AppError::NotFound` when no user has this `id`.
async fn delete(&self, id: Uuid) -> Result<(), AppError> {
    let mut users = self.users.write().await;
    let removed = match users.remove(&id) {
        Some(user) => user,
        None => return Err(AppError::NotFound("User not found".into())),
    };

    if let Some(email) = removed.email.as_deref() {
        self.email_index
            .write()
            .await
            .remove(&normalize_email(email));
    }
    if let Some(wallet) = removed.wallet_address.as_deref() {
        self.wallet_index.write().await.remove(wallet);
    }
    if let Some(google_id) = removed.google_id.as_deref() {
        self.google_id_index.write().await.remove(google_id);
    }
    if let Some(apple_id) = removed.apple_id.as_deref() {
        self.apple_id_index.write().await.remove(apple_id);
    }
    if let Some(customer) = removed.stripe_customer_id.as_deref() {
        self.stripe_customer_id_index.write().await.remove(customer);
    }
    self.referral_code_index
        .write()
        .await
        .remove(&removed.referral_code);
    Ok(())
}
/// Turns the user into an anonymized tombstone and returns the result.
///
/// All identifying and state fields are cleared, the name becomes
/// "Deleted Account", both compliance statuses go back to "none", and the
/// referral code is swapped for `replacement_referral_code`. `id` and
/// `created_at` are kept. The matching index entries are removed after the
/// user table lock has been released, and the referral index is pointed at
/// the replacement code.
///
/// # Errors
///
/// `AppError::NotFound` when no user has this `id`.
async fn anonymize_for_deletion(
    &self,
    id: Uuid,
    replacement_referral_code: &str,
) -> Result<UserEntity, AppError> {
    let mut users = self.users.write().await;
    let user = match users.get_mut(&id) {
        Some(user) => user,
        None => return Err(AppError::NotFound("User not found".into())),
    };

    // Pull each indexed key out of the record, blanking it in the same step.
    let old_email = user.email.take();
    let old_wallet = user.wallet_address.take();
    let old_google_id = user.google_id.take();
    let old_apple_id = user.apple_id.take();
    let old_stripe_customer_id = user.stripe_customer_id.take();
    let old_referral_code = std::mem::replace(
        &mut user.referral_code,
        replacement_referral_code.to_string(),
    );

    // Reset everything else that could identify or privilege the account.
    user.email_verified = false;
    user.password_hash = None;
    user.name = Some("Deleted Account".to_string());
    user.username = None;
    user.picture = None;
    user.auth_methods.clear();
    user.is_system_admin = false;
    user.last_login_at = None;
    user.welcome_completed_at = None;
    user.referred_by = None;
    user.payout_wallet_address = None;
    user.kyc_status = "none".to_string();
    user.kyc_verified_at = None;
    user.kyc_expires_at = None;
    user.accreditation_status = "none".to_string();
    user.accreditation_verified_at = None;
    user.accreditation_expires_at = None;
    user.updated_at = Utc::now();

    let updated = user.clone();
    // Release the user table before touching the indexes.
    drop(users);

    if let Some(email) = old_email.as_deref() {
        self.email_index
            .write()
            .await
            .remove(&normalize_email(email));
    }
    if let Some(wallet) = old_wallet.as_deref() {
        self.wallet_index.write().await.remove(wallet);
    }
    if let Some(google_id) = old_google_id.as_deref() {
        self.google_id_index.write().await.remove(google_id);
    }
    if let Some(apple_id) = old_apple_id.as_deref() {
        self.apple_id_index.write().await.remove(apple_id);
    }
    if let Some(customer) = old_stripe_customer_id.as_deref() {
        self.stripe_customer_id_index.write().await.remove(customer);
    }

    let mut referral_index = self.referral_code_index.write().await;
    referral_index.remove(&old_referral_code);
    referral_index.insert(replacement_referral_code.to_string(), id);
    Ok(updated)
}
async fn count_by_auth_methods(
&self,
) -> Result<std::collections::HashMap<String, u64>, AppError> {
let users = self.users.read().await;
let mut counts = std::collections::HashMap::new();
for user in users.values() {
for method in &user.auth_methods {
let method_str = match method {
crate::models::AuthMethod::Email => "email",
crate::models::AuthMethod::Google => "google",
crate::models::AuthMethod::Apple => "apple",
crate::models::AuthMethod::Solana => "solana",
crate::models::AuthMethod::WebAuthn => "webauthn",
crate::models::AuthMethod::Sso => "sso",
};
*counts.entry(method_str.to_string()).or_insert(0) += 1;
}
}
Ok(counts)
}
/// Stamps `last_login_at` with the current time.
///
/// `updated_at` is left untouched. An unknown `id` is a no-op returning
/// `Ok(())`.
async fn update_last_login(&self, id: Uuid) -> Result<(), AppError> {
    if let Some(user) = self.users.write().await.get_mut(&id) {
        user.last_login_at = Some(Utc::now());
    }
    Ok(())
}
/// Stamps `welcome_completed_at` with the current time and bumps
/// `updated_at`.
///
/// An unknown `id` is a no-op returning `Ok(())`.
async fn set_welcome_completed(&self, id: Uuid) -> Result<(), AppError> {
    if let Some(user) = self.users.write().await.get_mut(&id) {
        // The clock is read once per field.
        user.welcome_completed_at = Some(Utc::now());
        user.updated_at = Utc::now();
    }
    Ok(())
}
/// Whether any user already has this username, compared case-insensitively
/// (both sides go through `to_lowercase`). Scans the whole user table.
async fn username_exists(&self, username: &str) -> Result<bool, AppError> {
    let wanted = username.to_lowercase();
    let users = self.users.read().await;
    let taken = users
        .values()
        .filter_map(|user| user.username.as_deref())
        .any(|existing| existing.to_lowercase() == wanted);
    Ok(taken)
}
/// Stores `username` verbatim and bumps `updated_at`.
///
/// No uniqueness check is made here. An unknown `id` is a no-op returning
/// `Ok(())`.
async fn set_username(&self, id: Uuid, username: &str) -> Result<(), AppError> {
    if let Some(user) = self.users.write().await.get_mut(&id) {
        user.username = Some(String::from(username));
        user.updated_at = Utc::now();
    }
    Ok(())
}
/// Resolves a referral code to its owner.
///
/// The index of current codes is consulted first; if the code is not there,
/// the history of previously used codes is tried. Anonymized users
/// (`is_deleted`) are never returned.
async fn find_by_referral_code(&self, code: &str) -> Result<Option<UserEntity>, AppError> {
    // Each guard is a temporary, so only one lock is held at a time.
    let current_owner = self.referral_code_index.read().await.get(code).copied();
    let owner = match current_owner {
        Some(id) => Some(id),
        None => self.referral_history.read().await.get(code).copied(),
    };

    if let Some(user_id) = owner {
        let users = self.users.read().await;
        let live = users
            .get(&user_id)
            .filter(|candidate| !candidate.is_deleted())
            .cloned();
        return Ok(live);
    }
    Ok(None)
}
/// Number of users whose `referred_by` equals `user_id`.
async fn count_referrals(&self, user_id: Uuid) -> Result<u64, AppError> {
    let wanted = Some(user_id);
    let users = self.users.read().await;
    let mut referred: u64 = 0;
    for user in users.values() {
        if user.referred_by == wanted {
            referred += 1;
        }
    }
    Ok(referred)
}
/// Replaces the user's referral code with a freshly generated one.
///
/// The new code is drawn repeatedly until it is not present in the referral
/// index, so it can never take over an entry that belongs to another user.
/// The previous code is recorded in the referral history (first owner wins),
/// so `find_by_referral_code` still resolves it.
///
/// Returns the new code.
///
/// # Errors
///
/// `AppError::NotFound` when no user has this `id`.
async fn regenerate_referral_code(&self, id: Uuid) -> Result<String, AppError> {
    let mut users = self.users.write().await;
    let user = users
        .get_mut(&id)
        .ok_or_else(|| AppError::NotFound("User not found".into()))?;
    let mut referral_index = self.referral_code_index.write().await;

    // Draw until the code is free. The old code is normally still in the
    // index at this point, so this also rules out re-issuing the same code.
    let mut new_code = generate_referral_code();
    while referral_index.contains_key(&new_code) {
        new_code = generate_referral_code();
    }

    let old_code = std::mem::replace(&mut user.referral_code, new_code.clone());
    referral_index.remove(&old_code);
    referral_index.insert(new_code.clone(), id);
    user.updated_at = Utc::now();

    // Release both locks before taking the history lock.
    drop(referral_index);
    drop(users);

    let mut history = self.referral_history.write().await;
    history.entry(old_code).or_insert(id);
    Ok(new_code)
}
/// Assigns a caller-chosen referral code to the user.
///
/// The availability check and the write happen under the same write locks
/// (user table first, then referral index, matching the lock order of the
/// other writers), so two concurrent callers cannot both claim one code.
/// The previous code is recorded in the referral history (first owner wins).
///
/// If the index already maps `code` to this user the call is a no-op.
///
/// # Errors
///
/// * `AppError::Validation` when `code` is indexed for a different user
///   (checked before the user lookup, as before).
/// * `AppError::NotFound` when no user has this `id`.
async fn set_referral_code(&self, id: Uuid, code: &str) -> Result<(), AppError> {
    let mut users = self.users.write().await;
    let mut referral_index = self.referral_code_index.write().await;

    // Check-and-claim is atomic: both locks stay held until the index and
    // the user record have been updated.
    if let Some(&owner) = referral_index.get(code) {
        return if owner == id {
            Ok(())
        } else {
            Err(AppError::Validation(
                "Referral code is already taken".into(),
            ))
        };
    }

    let user = users
        .get_mut(&id)
        .ok_or_else(|| AppError::NotFound("User not found".into()))?;
    let old_code = std::mem::replace(&mut user.referral_code, code.to_string());
    referral_index.remove(&old_code);
    referral_index.insert(code.to_string(), id);
    user.updated_at = Utc::now();

    // Release both locks before taking the history lock.
    drop(referral_index);
    drop(users);

    let mut history = self.referral_history.write().await;
    history.entry(old_code).or_insert(id);
    Ok(())
}
/// Sets (`Some`) or clears (`None`) the payout wallet address and bumps
/// `updated_at`.
///
/// An unknown `id` is a no-op returning `Ok(())`.
async fn set_payout_wallet_address(
    &self,
    id: Uuid,
    address: Option<&str>,
) -> Result<(), AppError> {
    if let Some(user) = self.users.write().await.get_mut(&id) {
        user.payout_wallet_address = address.map(String::from);
        user.updated_at = Utc::now();
    }
    Ok(())
}
/// Number of users that have a referrer recorded.
async fn count_referred(&self) -> Result<u64, AppError> {
    let users = self.users.read().await;
    let referred = users.values().filter_map(|user| user.referred_by).count();
    Ok(referred as u64)
}
/// Number of referred users created at or after `since`.
async fn count_referred_since(&self, since: DateTime<Utc>) -> Result<u64, AppError> {
    let users = self.users.read().await;
    let mut referred: u64 = 0;
    for user in users.values() {
        if user.referred_by.is_some() && user.created_at >= since {
            referred += 1;
        }
    }
    Ok(referred)
}
/// Ranks referrers by how many users name them in `referred_by`.
///
/// Referrer ids that no longer exist in the user table are skipped. Rows are
/// sorted by count, highest first, and cut to at most `limit`.
async fn top_referrers(&self, limit: u32) -> Result<Vec<TopReferrerRow>, AppError> {
    let users = self.users.read().await;

    let mut tally: HashMap<Uuid, u64> = HashMap::new();
    for referrer_id in users.values().filter_map(|user| user.referred_by) {
        *tally.entry(referrer_id).or_default() += 1;
    }

    let mut rows: Vec<TopReferrerRow> = Vec::with_capacity(tally.len());
    for (referrer_id, referral_count) in tally {
        if let Some(referrer) = users.get(&referrer_id) {
            rows.push(TopReferrerRow {
                user_id: referrer_id,
                email: referrer.email.clone(),
                name: referrer.name.clone(),
                referral_code: referrer.referral_code.clone(),
                referral_count,
            });
        }
    }

    // Stable sort, descending by referral count.
    rows.sort_by_key(|row| std::cmp::Reverse(row.referral_count));
    rows.truncate(limit as usize);
    Ok(rows)
}
/// Returns one page of the users referred by `referrer_id`, newest
/// (`created_at`) first.
///
/// `limit` is capped at 100; `offset` is the number of matches to skip.
async fn find_referred_by(
    &self,
    referrer_id: Uuid,
    limit: u32,
    offset: u32,
) -> Result<Vec<UserEntity>, AppError> {
    const MAX_PAGE_SIZE: u32 = 100;
    let page_len = std::cmp::min(limit, MAX_PAGE_SIZE) as usize;
    let start = offset as usize;
    let wanted = Some(referrer_id);

    let mut newest_first: Vec<UserEntity> = self
        .users
        .read()
        .await
        .values()
        .filter(|user| user.referred_by == wanted)
        .cloned()
        .collect();
    // Stable sort, descending by creation time.
    newest_first.sort_by_key(|user| std::cmp::Reverse(user.created_at));

    let page: Vec<UserEntity> = newest_first
        .into_iter()
        .skip(start)
        .take(page_len)
        .collect();
    Ok(page)
}
/// Number of users whose `referred_by` equals `referrer_id`.
async fn count_referred_by(&self, referrer_id: Uuid) -> Result<u64, AppError> {
    let wanted = Some(referrer_id);
    let users = self.users.read().await;
    let referred = users
        .values()
        .filter(|user| user.referred_by == wanted)
        .count();
    Ok(referred as u64)
}
/// Overwrites the KYC status together with its verified/expiry timestamps
/// and bumps `updated_at`.
///
/// # Errors
///
/// `AppError::NotFound` when no user has this `id`.
async fn set_kyc_status(
    &self,
    id: Uuid,
    status: &str,
    verified_at: Option<DateTime<Utc>>,
    expires_at: Option<DateTime<Utc>>,
) -> Result<(), AppError> {
    let mut users = self.users.write().await;
    match users.get_mut(&id) {
        Some(user) => {
            user.kyc_status = String::from(status);
            user.kyc_verified_at = verified_at;
            user.kyc_expires_at = expires_at;
            user.updated_at = Utc::now();
            Ok(())
        }
        None => Err(AppError::NotFound("User not found".into())),
    }
}
/// Overwrites the accreditation status together with its verified/expiry
/// timestamps and bumps `updated_at`.
///
/// # Errors
///
/// `AppError::NotFound` when no user has this `id`.
async fn set_accreditation_status(
    &self,
    id: Uuid,
    status: &str,
    verified_at: Option<DateTime<Utc>>,
    expires_at: Option<DateTime<Utc>>,
) -> Result<(), AppError> {
    let mut users = self.users.write().await;
    match users.get_mut(&id) {
        Some(user) => {
            user.accreditation_status = String::from(status);
            user.accreditation_verified_at = verified_at;
            user.accreditation_expires_at = expires_at;
            user.updated_at = Utc::now();
            Ok(())
        }
        None => Err(AppError::NotFound("User not found".into())),
    }
}
/// Number of users created at or after `since`.
async fn count_created_since(&self, since: DateTime<Utc>) -> Result<u64, AppError> {
    let users = self.users.read().await;
    let mut created: u64 = 0;
    for user in users.values() {
        if user.created_at >= since {
            created += 1;
        }
    }
    Ok(created)
}
}
#[cfg(test)]
mod tests {
use super::*;
// A created user can be fetched back by id with its email intact.
#[tokio::test]
async fn test_create_and_find_user() {
    let repo = InMemoryUserRepository::new();
    let email = String::from("test@example.com");
    let password_hash = String::from("hash123");
    let name = Some(String::from("Test User"));
    let user = UserEntity::new_email_user(email, password_hash, name);
    let user_id = user.id;

    repo.create(user).await.unwrap();

    let found = repo.find_by_id(user_id).await.unwrap();
    assert!(found.is_some());
    assert_eq!(found.unwrap().email, Some("test@example.com".to_string()));
}
// The constructor stores the email lowercased.
#[test]
fn test_new_email_user_normalizes_email() {
    let mixed_case = String::from("Test@Example.com");
    let user = UserEntity::new_email_user(mixed_case, String::from("hash123"), None);
    assert_eq!(user.email.as_deref(), Some("test@example.com"));
}
// Email lookup ignores letter case on both the stored and the queried side.
#[tokio::test]
async fn test_find_by_email_case_insensitive() {
    let repo = InMemoryUserRepository::new();
    let mixed_case = String::from("Test@Example.com");
    let user = UserEntity::new_email_user(mixed_case, String::from("hash123"), None);
    repo.create(user).await.unwrap();

    let by_lowercase = repo.find_by_email("test@example.com").await.unwrap();
    assert!(by_lowercase.is_some());

    let by_uppercase = repo.find_by_email("TEST@EXAMPLE.COM").await.unwrap();
    assert!(by_uppercase.is_some());
}
// A query written with fullwidth letters finds the plain-ASCII address.
#[tokio::test]
async fn test_find_by_email_unicode_normalized() {
    let repo = InMemoryUserRepository::new();
    let plain = String::from("user@example.com");
    let user = UserEntity::new_email_user(plain, String::from("hash123"), None);
    repo.create(user).await.unwrap();

    // "user" spelled with fullwidth u, s, e, r.
    let fullwidth_email = "\u{FF55}\u{FF53}\u{FF45}\u{FF52}@example.com";
    let found = repo.find_by_email(fullwidth_email).await.unwrap();
    assert!(
        found.is_some(),
        "Should find user with fullwidth local part"
    );
}
// Lowercasing, NFKC folding of fullwidth letters, and '+' tags left alone.
#[test]
fn test_normalize_email_function() {
    assert_eq!(normalize_email("Test@Example.COM"), "test@example.com");

    // Fullwidth capital A folds to ASCII and is then lowercased.
    let fullwidth_a = "\u{FF21}";
    let leading_fullwidth = format!("{}bc@test.com", fullwidth_a);
    assert_eq!(normalize_email(&leading_fullwidth), "abc@test.com");

    // A local part spelled entirely in fullwidth letters.
    let fullwidth_user = "\u{FF55}\u{FF53}\u{FF45}\u{FF52}";
    let fullwidth_local = format!("{}@test.com", fullwidth_user);
    assert_eq!(normalize_email(&fullwidth_local), "user@test.com");

    assert_eq!(normalize_email("user+tag@gmail.com"), "user+tag@gmail.com");
}
// `email_exists` is true for a stored address and false for an unknown one.
#[tokio::test]
async fn test_email_exists() {
    let repo = InMemoryUserRepository::new();
    let stored = String::from("exists@example.com");
    let user = UserEntity::new_email_user(stored, String::from("hash123"), None);
    repo.create(user).await.unwrap();

    let known = repo.email_exists("exists@example.com").await.unwrap();
    assert!(known);

    let unknown = repo.email_exists("notexists@example.com").await.unwrap();
    assert!(!unknown);
}
#[tokio::test]
async fn test_email_index_updated_on_email_change() {
let repo = InMemoryUserRepository::new();
let mut user =
UserEntity::new_email_user("old@example.com".to_string(), "hash123".to_string(), None);
let user_id = user.id;
repo.create(user.clone()).await.unwrap();
assert!(repo
.find_by_email("old@example.com")
.await
.unwrap()
.is_some());
user.email = Some("new@example.com".to_string());
repo.update(user).await.unwrap();
assert!(repo
.find_by_email("old@example.com")
.await
.unwrap()
.is_none());
let found = repo.find_by_email("new@example.com").await.unwrap();
assert!(found.is_some());
assert_eq!(found.unwrap().id, user_id);
}
}