use axum::{
extract::State,
http::{header, HeaderMap},
response::IntoResponse,
Json,
};
use chrono::{Duration, Utc};
use serde::Deserialize;
use std::sync::Arc;
use crate::callback::{AuthCallback, AuthCallbackPayload};
use crate::errors::AppError;
use crate::handlers::auth::{
call_authenticated_callback_with_timeout, call_registered_callback_with_timeout,
};
use crate::models::{AuthMethod, AuthResponse, MessageResponse};
use crate::repositories::{
default_expiry, generate_api_key, generate_verification_token, hash_verification_token,
normalize_email, ApiKeyEntity, AuditEventType, MembershipEntity, SessionEntity, TokenType,
UserEntity,
};
use crate::services::EmailService;
use crate::utils::{
build_json_response_with_cookies, compute_post_login, extract_client_ip_with_fallback,
get_default_org_context, hash_refresh_token, is_new_device, resolve_org_assignment,
user_entity_to_auth_user, DeviceInfo, PeerIp,
};
use crate::AppState;
use uuid::Uuid;
use serde_json::json;
use tokio::time::{Duration as TokioDuration, Instant as TokioInstant};
#[derive(Debug, Deserialize)]
pub struct InstantLinkRequest {
pub email: String,
pub referral: Option<String>,
pub access_code: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct VerifyInstantLinkRequest {
pub token: String,
}
pub async fn send_instant_link<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
Json(req): Json<InstantLinkRequest>,
) -> Result<(axum::http::StatusCode, Json<MessageResponse>), AppError> {
let started_at = TokioInstant::now();
const MIN_DURATION: TokioDuration = TokioDuration::from_millis(150);
let enabled = state
.settings_service
.get_bool("auth_instantlink_enabled")
.await
.ok()
.flatten()
.unwrap_or(state.config.email.enabled);
if !enabled {
return Err(AppError::NotFound("Instant link auth disabled".into()));
}
let response = (
axum::http::StatusCode::OK,
Json(MessageResponse {
message: "A sign-in link has been sent to your email".to_string(),
}),
);
let email = normalize_email(&req.email);
let throttle_key = format!("instant_link:{}", email);
let throttle_status = state
.login_attempt_repo
.record_failed_attempt_atomic(None, &throttle_key, None, &state.login_attempt_config)
.await?;
if throttle_status.is_locked {
if let Some(remaining) = throttle_status.lockout_remaining_secs {
return Err(AppError::TooManyRequests(format!(
"Too many instant link requests. Try again in {} seconds",
remaining
)));
}
return Err(AppError::RateLimited);
}
let user = match state.user_repo.find_by_email(&email).await? {
Some(u) => u,
None => {
let gate_result = state
.signup_gating_service
.check_signup(req.access_code.as_deref())
.await?;
let referrals_enabled = state
.settings_service
.get_bool("feature_referrals_enabled")
.await
.ok()
.flatten()
.unwrap_or(false);
let referred_by = if referrals_enabled {
if let Some(ref code) = req.referral {
match state.user_repo.find_by_referral_code(code).await {
Ok(Some(referrer)) => Some(referrer.id),
Ok(None) => {
tracing::debug!(referral_code = %code, "Referral code not found, ignoring");
None
}
Err(e) => {
tracing::warn!(error = %e, "Failed to look up referral code, ignoring");
None
}
}
} else {
None
}
} else {
None
};
let now = Utc::now();
let new_user = UserEntity {
id: Uuid::new_v4(),
email: Some(email.clone()),
email_verified: false,
password_hash: None,
name: None,
username: None,
picture: None,
wallet_address: None,
google_id: None,
apple_id: None,
stripe_customer_id: None,
auth_methods: vec![AuthMethod::Email],
is_system_admin: false,
created_at: now,
updated_at: now,
last_login_at: None,
welcome_completed_at: None,
referral_code: crate::repositories::generate_referral_code(),
referred_by,
payout_wallet_address: None,
kyc_status: "none".to_string(),
kyc_verified_at: None,
kyc_expires_at: None,
accreditation_status: "none".to_string(),
accreditation_verified_at: None,
accreditation_expires_at: None,
};
let created_user = match state.user_repo.create(new_user).await {
Ok(created) => created,
Err(AppError::EmailExists) => {
state
.user_repo
.find_by_email(&email)
.await?
.ok_or_else(|| {
AppError::Internal(anyhow::anyhow!(
"User vanished after EmailExists"
))
})?
}
Err(e) => return Err(e),
};
if let Some(code_id) = gate_result.access_code_id {
if let Err(e) = state.signup_gating_service.mark_code_used(code_id).await {
tracing::warn!(
user_id = %created_user.id,
code_id = %code_id,
error = %e,
"Failed to mark access code as used"
);
}
}
created_user
}
};
state
.verification_repo
.delete_for_user(user.id, TokenType::InstantLink)
.await?;
let token = generate_verification_token();
let token_hash = hash_verification_token(&token);
state
.verification_repo
.create(
user.id,
&token_hash,
TokenType::InstantLink,
default_expiry(TokenType::InstantLink),
)
.await
.map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to create token: {}", e)))?;
state
.comms_service
.queue_instant_link_email(&email, user.name.as_deref(), &token, Some(user.id))
.await?;
let _ = state
.audit_service
.log_user_event(
AuditEventType::InstantLinkRequested,
user.id,
Some(&headers),
)
.await;
let elapsed = started_at.elapsed();
if elapsed < MIN_DURATION {
tokio::time::sleep(MIN_DURATION - elapsed).await;
}
Ok(response)
}
pub async fn verify_instant_link<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
PeerIp(peer_ip): PeerIp,
Json(req): Json<VerifyInstantLinkRequest>,
) -> Result<impl IntoResponse, AppError> {
let enabled = state
.settings_service
.get_bool("auth_instantlink_enabled")
.await
.ok()
.flatten()
.unwrap_or(state.config.email.enabled);
if !enabled {
return Err(AppError::NotFound("Instant link auth disabled".into()));
}
state.sanctions_service.check_country_from_request(&headers).await?;
let token_hash = hash_verification_token(&req.token);
let token = state
.verification_repo
.consume_if_valid(&token_hash)
.await
.map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to consume token: {}", e)))?
.ok_or_else(|| AppError::Validation("Invalid or expired link".to_string()))?;
if token.token_type != TokenType::InstantLink {
return Err(AppError::Validation("Invalid or expired link".to_string()));
}
let mut user = state
.user_repo
.find_by_id(token.user_id)
.await?
.ok_or(AppError::NotFound("User not found".into()))?;
if !user.email_verified {
state.user_repo.set_email_verified(user.id, true).await?;
user.email_verified = true;
}
let has_mfa = state.totp_repo.has_mfa_enabled(user.id).await?;
if has_mfa {
let mfa_token = generate_verification_token();
let mfa_token_hash = hash_verification_token(&mfa_token);
let _ = state
.verification_repo
.delete_for_user(user.id, TokenType::MfaPending)
.await;
state
.verification_repo
.create(
user.id,
&mfa_token_hash,
TokenType::MfaPending,
default_expiry(TokenType::MfaPending),
)
.await
.map_err(|e| {
AppError::Internal(anyhow::anyhow!("Failed to create MFA token: {}", e))
})?;
let _ = state
.audit_service
.log_user_event(AuditEventType::MfaChallengeIssued, user.id, Some(&headers))
.await;
return Ok(Json(json!({
"mfaRequired": true,
"mfaToken": mfa_token,
"userId": user.id
}))
.into_response());
}
let memberships = state.membership_repo.find_by_user(user.id).await?;
let (is_new_user, raw_api_key) = if memberships.is_empty() {
let org_assignment = resolve_org_assignment(&state, user.id).await?;
let membership = MembershipEntity::new(user.id, org_assignment.org_id, org_assignment.role);
state.membership_repo.create(membership).await?;
let raw = generate_api_key();
let api_key_entity = ApiKeyEntity::new(user.id, &raw, "default");
state.api_key_repo.create(api_key_entity).await?;
if let Some(referrer_id) = user.referred_by {
if let Err(e) = crate::services::referral_reward_service::issue_signup_reward(
&*state.user_repo,
&*state.credit_repo,
&*state.referral_payout_repo,
&state.settings_service,
&*state.callback,
user.id,
referrer_id,
&state.config.privacy.company_currency,
)
.await
{
tracing::warn!(
user_id = %user.id,
referrer_id = %referrer_id,
error = %e,
"Failed to issue referral signup reward"
);
}
}
(true, Some(raw))
} else {
(false, None)
};
let memberships = if is_new_user {
state.membership_repo.find_by_user(user.id).await?
} else {
memberships
};
let token_context = get_default_org_context(&memberships, user.is_system_admin, user.email_verified);
let session_id = uuid::Uuid::new_v4();
let token_pair =
state
.jwt_service
.generate_token_pair_with_context(user.id, session_id, &token_context)?;
let refresh_expiry =
Utc::now() + Duration::seconds(state.jwt_service.refresh_expiry_secs() as i64);
let ip_address =
extract_client_ip_with_fallback(&headers, state.config.server.trust_proxy, peer_ip);
let user_agent = headers
.get(header::USER_AGENT)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let mut session = SessionEntity::new_with_id(
session_id,
user.id,
hash_refresh_token(&token_pair.refresh_token, &state.config.jwt.secret),
refresh_expiry,
ip_address.clone(),
user_agent.clone(),
);
session.last_strong_auth_at = Some(Utc::now());
state.session_repo.create(session).await?;
if let Some(email) = &user.email {
let previous_sessions = state
.session_repo
.find_recent_by_user_id(user.id, 10)
.await?;
let previous_user_agents: Vec<Option<String>> = previous_sessions
.iter()
.filter(|s| s.id != session_id) .map(|s| s.user_agent.clone())
.collect();
let device_info = DeviceInfo::from_user_agent(user_agent.as_deref());
if !previous_user_agents.is_empty()
&& is_new_device(&device_info.fingerprint, &previous_user_agents)
{
let login_time = Utc::now().format("%B %d, %Y at %H:%M UTC").to_string();
let _ = state
.comms_service
.queue_security_alert_email(
email,
user.name.as_deref(),
user.id,
&login_time,
ip_address.as_deref(),
Some(&device_info.device_type),
Some(&device_info.browser),
)
.await;
}
}
let auth_user = user_entity_to_auth_user(&user);
let payload = AuthCallbackPayload {
user: auth_user.clone(),
method: AuthMethod::Email, is_new_user,
session_id: session_id.to_string(),
ip_address,
user_agent,
referral: None, };
let callback_data = if is_new_user {
call_registered_callback_with_timeout(&state.callback, &payload).await
} else {
call_authenticated_callback_with_timeout(&state.callback, &payload).await
};
let audit_event = if is_new_user {
AuditEventType::UserRegister
} else {
AuditEventType::UserLogin
};
let _ = state
.audit_service
.log_user_event(audit_event, user.id, Some(&headers))
.await;
let response_tokens = if state.config.cookie.enabled {
None
} else {
Some(token_pair.clone())
};
let response = AuthResponse {
user: auth_user,
tokens: response_tokens,
is_new_user,
callback_data,
api_key: raw_api_key,
email_queued: None,
post_login: compute_post_login(&user, &state.settings_service, &*state.totp_repo, &*state.credential_repo, &*state.wallet_material_repo, &*state.storage.pending_wallet_recovery_repo).await,
};
Ok(build_json_response_with_cookies(
&state.config.cookie,
&token_pair,
state.jwt_service.refresh_expiry_secs(),
response,
))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_instant_link_request_deserialize() {
let json = r#"{"email": "test@example.com"}"#;
let req: InstantLinkRequest = serde_json::from_str(json).unwrap();
assert_eq!(req.email, "test@example.com");
}
#[test]
fn test_verify_instant_link_request_deserialize() {
let json = r#"{"token": "abc123xyz"}"#;
let req: VerifyInstantLinkRequest = serde_json::from_str(json).unwrap();
assert_eq!(req.token, "abc123xyz");
}
}