use axum::{
extract::{Multipart, Path, Query, State},
http::HeaderMap,
Json,
};
use chrono::{DateTime, Utc};
use std::sync::Arc;
use uuid::Uuid;
use crate::callback::AuthCallback;
use crate::errors::AppError;
use crate::models::ListUsersQueryParams;
use crate::repositories::{
pagination::cap_limit, pagination::cap_offset, AccreditationDocumentEntity,
AccreditationSubmissionEntity,
};
use crate::services::{EmailService, ImageStorageService};
use crate::utils::authenticate;
use crate::AppState;
use axum::body::Bytes;
use crate::handlers::admin::validate_system_admin;
use crate::handlers::upload::build_storage_service;
fn format_dt(dt: &DateTime<Utc>) -> String {
dt.to_rfc3339()
}
fn validate_accreditation_status(status: &str) -> Result<(), AppError> {
match status {
"none" | "pending" | "approved" | "rejected" | "expired" => Ok(()),
other => Err(AppError::Validation(format!(
"Invalid accreditation status '{}'. Expected one of: none, pending, approved, rejected, expired",
other
))),
}
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AccreditationStatusApiResponse {
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub verified_at: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires_at: Option<String>,
pub enforcement_mode: String,
}
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubmitAccreditationRequest {
pub method: String,
pub income_type: Option<String>,
pub stated_amount_usd: Option<f64>,
pub crd_number: Option<String>,
pub license_type: Option<String>,
pub investment_commitment_usd: Option<f64>,
pub entity_type: Option<String>,
pub user_statement: Option<String>,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SubmitAccreditationResponse {
pub submission_id: String,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AccreditationDocumentItem {
pub id: String,
pub submission_id: String,
pub document_type: String,
pub s3_key: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub original_filename: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub content_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub file_size_bytes: Option<i64>,
pub uploaded_at: String,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentUploadResponse {
pub document_id: String,
pub submission_id: String,
pub s3_key: String,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentPresignedUrlResponse {
pub url: String,
pub expires_in: u32,
}
const ALLOWED_DOC_CONTENT_TYPES: &[&str] = &[
"application/pdf",
"image/jpeg",
"image/jpg",
"image/png",
"image/tiff",
"image/tif",
];
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AccreditationSubmissionItem {
pub id: String,
pub user_id: String,
pub method: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub income_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stated_amount_usd: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub crd_number: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub license_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub investment_commitment_usd: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub entity_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub user_statement: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reviewed_by: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reviewed_at: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reviewer_notes: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rejection_reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires_at: Option<String>,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SubmissionsListResponse {
pub submissions: Vec<AccreditationSubmissionItem>,
pub total: u64,
pub limit: u32,
pub offset: u32,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PendingListResponse {
pub submissions: Vec<AccreditationSubmissionItem>,
pub total: u64,
pub limit: u32,
pub offset: u32,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AdminUserAccreditationResponse {
pub user_id: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub verified_at: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires_at: Option<String>,
pub submissions: Vec<AccreditationSubmissionItem>,
pub total_submissions: u64,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AdminSubmissionDetailResponse {
pub submission: AccreditationSubmissionItem,
pub documents: Vec<AccreditationDocumentItem>,
}
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReviewAccreditationRequest {
pub approved: bool,
pub reviewer_notes: Option<String>,
pub rejection_reason: Option<String>,
pub expiry_days: Option<i64>,
}
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AccreditationOverrideRequest {
pub status: String,
pub expires_at: Option<String>,
}
fn map_submission(s: AccreditationSubmissionEntity) -> AccreditationSubmissionItem {
AccreditationSubmissionItem {
id: s.id.to_string(),
user_id: s.user_id.to_string(),
method: s.method,
status: s.status,
income_type: s.income_type,
stated_amount_usd: s.stated_amount_usd,
crd_number: s.crd_number,
license_type: s.license_type,
investment_commitment_usd: s.investment_commitment_usd,
entity_type: s.entity_type,
user_statement: s.user_statement,
reviewed_by: s.reviewed_by.map(|u| u.to_string()),
reviewed_at: s.reviewed_at.as_ref().map(format_dt),
reviewer_notes: s.reviewer_notes,
rejection_reason: s.rejection_reason,
expires_at: s.expires_at.as_ref().map(format_dt),
created_at: format_dt(&s.created_at),
updated_at: format_dt(&s.updated_at),
}
}
fn map_document(d: AccreditationDocumentEntity) -> AccreditationDocumentItem {
AccreditationDocumentItem {
id: d.id.to_string(),
submission_id: d.submission_id.to_string(),
document_type: d.document_type,
s3_key: d.s3_key,
original_filename: d.original_filename,
content_type: d.content_type,
file_size_bytes: d.file_size_bytes,
uploaded_at: format_dt(&d.uploaded_at),
}
}
pub async fn accreditation_status<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
) -> Result<Json<AccreditationStatusApiResponse>, AppError> {
let auth_user = authenticate(&state, &headers).await?;
let accreditation_service = state
.accreditation_service
.as_ref()
.ok_or_else(|| AppError::NotFound("Accreditation not available".into()))?;
let status = accreditation_service.get_status(auth_user.user_id).await?;
Ok(Json(AccreditationStatusApiResponse {
status: status.status,
verified_at: status.verified_at.as_ref().map(format_dt),
expires_at: status.expires_at.as_ref().map(format_dt),
enforcement_mode: status.enforcement_mode,
}))
}
pub async fn submit_accreditation<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
Json(request): Json<SubmitAccreditationRequest>,
) -> Result<Json<SubmitAccreditationResponse>, AppError> {
let auth_user = authenticate(&state, &headers).await?;
let accreditation_service = state
.accreditation_service
.as_ref()
.ok_or_else(|| AppError::NotFound("Accreditation not available".into()))?;
if request.method.is_empty() {
return Err(AppError::Validation("method is required".into()));
}
let data = crate::services::SubmitAccreditationData {
method: request.method,
income_type: request.income_type,
stated_amount_usd: request.stated_amount_usd,
crd_number: request.crd_number,
license_type: request.license_type,
investment_commitment_usd: request.investment_commitment_usd,
entity_type: request.entity_type,
user_statement: request.user_statement,
};
let result = accreditation_service
.submit_verification(auth_user.user_id, data)
.await?;
tracing::info!(
user_id = %auth_user.user_id,
submission_id = %result.submission_id,
"Accreditation submission created"
);
Ok(Json(SubmitAccreditationResponse {
submission_id: result.submission_id.to_string(),
}))
}
pub async fn upload_accreditation_document<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
mut multipart: Multipart,
) -> Result<Json<DocumentUploadResponse>, AppError> {
let auth_user = authenticate(&state, &headers).await?;
let accreditation_service = state
.accreditation_service
.as_ref()
.ok_or_else(|| AppError::NotFound("Accreditation not available".into()))?;
let mut submission_id_str: Option<String> = None;
let mut document_type: Option<String> = None;
let mut file_bytes: Option<Bytes> = None;
let mut filename: Option<String> = None;
let mut content_type_str: Option<String> = None;
while let Some(field) = multipart
.next_field()
.await
.map_err(|e| AppError::Validation(format!("Invalid multipart data: {}", e)))?
{
let field_name = field.name().unwrap_or("").to_string();
match field_name.as_str() {
"submissionId" => {
let val = field.text().await.map_err(|e| {
AppError::Validation(format!("Failed to read submissionId: {}", e))
})?;
submission_id_str = Some(val);
}
"documentType" => {
let val = field.text().await.map_err(|e| {
AppError::Validation(format!("Failed to read documentType: {}", e))
})?;
document_type = Some(val);
}
"file" => {
filename = field.file_name().map(str::to_string);
content_type_str = field.content_type().map(|ct| ct.to_string());
let data = field
.bytes()
.await
.map_err(|e| AppError::Validation(format!("Failed to read file: {}", e)))?;
file_bytes = Some(data);
}
_ => {
let _ = field.bytes().await;
}
}
}
let submission_id_str =
submission_id_str.ok_or_else(|| AppError::Validation("submissionId is required".into()))?;
let submission_id = Uuid::parse_str(&submission_id_str)
.map_err(|_| AppError::Validation("submissionId must be a valid UUID".into()))?;
let doc_type =
document_type.ok_or_else(|| AppError::Validation("documentType is required".into()))?;
let file_data = file_bytes.ok_or_else(|| AppError::Validation("file is required".into()))?;
let content_type_validated = content_type_str
.as_deref()
.unwrap_or("application/octet-stream");
if !ALLOWED_DOC_CONTENT_TYPES.contains(&content_type_validated) {
return Err(AppError::Validation(format!(
"Unsupported document type '{}'. Allowed: PDF, JPEG, PNG, TIFF.",
content_type_validated
)));
}
let file_size = file_data.len() as i64;
let ext = filename
.as_deref()
.and_then(|f| f.rsplit('.').next())
.unwrap_or("bin");
let doc_id = Uuid::new_v4();
let s3_key = format!(
"accreditation/{}/{}/{}.{}",
auth_user.user_id, submission_id, doc_id, ext
);
match build_storage_service(&state).await {
Ok(storage) => {
storage
.upload_document(&s3_key, &file_data, content_type_validated)
.await?;
}
Err(AppError::Validation(msg)) if msg.contains("not configured") => {
tracing::warn!(
s3_key = %s3_key,
"Image storage not configured; document metadata stored without S3 upload"
);
}
Err(e) => return Err(e),
}
let doc = accreditation_service
.add_document(
auth_user.user_id,
submission_id,
doc_type,
s3_key.clone(),
filename,
content_type_str,
Some(file_size),
)
.await?;
tracing::info!(
user_id = %auth_user.user_id,
submission_id = %submission_id,
document_id = %doc.id,
s3_key = %s3_key,
"Accreditation document uploaded"
);
Ok(Json(DocumentUploadResponse {
document_id: doc.id.to_string(),
submission_id: doc.submission_id.to_string(),
s3_key,
}))
}
pub async fn list_accreditation_submissions<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
Query(params): Query<ListUsersQueryParams>,
) -> Result<Json<SubmissionsListResponse>, AppError> {
let auth_user = authenticate(&state, &headers).await?;
let accreditation_service = state
.accreditation_service
.as_ref()
.ok_or_else(|| AppError::NotFound("Accreditation not available".into()))?;
let limit = cap_limit(params.limit);
let offset = cap_offset(params.offset);
let (submissions, total) = tokio::join!(
accreditation_service.list_submissions(auth_user.user_id, limit, offset),
accreditation_service.count_submissions(auth_user.user_id)
);
let submissions = submissions?;
let total = total?;
Ok(Json(SubmissionsListResponse {
submissions: submissions.into_iter().map(map_submission).collect(),
total,
limit,
offset,
}))
}
pub async fn list_pending_accreditations<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
Query(params): Query<ListUsersQueryParams>,
) -> Result<Json<PendingListResponse>, AppError> {
validate_system_admin(&state, &headers).await?;
let accreditation_service = state
.accreditation_service
.as_ref()
.ok_or_else(|| AppError::NotFound("Accreditation not available".into()))?;
let limit = cap_limit(params.limit);
let offset = cap_offset(params.offset);
let (items, total) = accreditation_service
.admin_list_pending(limit, offset)
.await?;
Ok(Json(PendingListResponse {
submissions: items.into_iter().map(map_submission).collect(),
total,
limit,
offset,
}))
}
pub async fn get_user_accreditation<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
Path(user_id): Path<Uuid>,
) -> Result<Json<AdminUserAccreditationResponse>, AppError> {
validate_system_admin(&state, &headers).await?;
let accreditation_service = state
.accreditation_service
.as_ref()
.ok_or_else(|| AppError::NotFound("Accreditation not available".into()))?;
let user = state
.user_repo
.find_by_id(user_id)
.await?
.ok_or(AppError::NotFound("User not found".into()))?;
let (submissions, total) = tokio::join!(
accreditation_service.list_submissions(user_id, 20, 0),
accreditation_service.count_submissions(user_id)
);
let submissions = submissions?;
let total = total?;
Ok(Json(AdminUserAccreditationResponse {
user_id: user_id.to_string(),
status: user.accreditation_status,
verified_at: user.accreditation_verified_at.as_ref().map(format_dt),
expires_at: user.accreditation_expires_at.as_ref().map(format_dt),
submissions: submissions.into_iter().map(map_submission).collect(),
total_submissions: total,
}))
}
pub async fn get_accreditation_submission<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
Path(submission_id): Path<Uuid>,
) -> Result<Json<AdminSubmissionDetailResponse>, AppError> {
validate_system_admin(&state, &headers).await?;
let accreditation_service = state
.accreditation_service
.as_ref()
.ok_or_else(|| AppError::NotFound("Accreditation not available".into()))?;
let submission = accreditation_service
.admin_get_submission(submission_id)
.await?
.ok_or_else(|| AppError::NotFound("Submission not found".into()))?;
let documents = accreditation_service
.admin_list_documents(submission_id)
.await?;
Ok(Json(AdminSubmissionDetailResponse {
submission: map_submission(submission),
documents: documents.into_iter().map(map_document).collect(),
}))
}
pub async fn review_accreditation<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
Path(submission_id): Path<Uuid>,
Json(request): Json<ReviewAccreditationRequest>,
) -> Result<Json<serde_json::Value>, AppError> {
let admin_id = validate_system_admin(&state, &headers).await?;
let accreditation_service = state
.accreditation_service
.as_ref()
.ok_or_else(|| AppError::NotFound("Accreditation not available".into()))?;
let _ = accreditation_service
.admin_get_submission(submission_id)
.await?
.ok_or_else(|| AppError::NotFound("Submission not found".into()))?;
accreditation_service
.admin_review(
submission_id,
admin_id,
request.approved,
request.reviewer_notes.clone(),
request.rejection_reason.clone(),
request.expiry_days.map(|d| d as u32),
)
.await?;
let outcome = if request.approved {
"approved"
} else {
"rejected"
};
tracing::info!(
admin_id = %admin_id,
submission_id = %submission_id,
outcome,
"Accreditation submission reviewed"
);
Ok(Json(serde_json::json!({
"ok": true,
"submissionId": submission_id.to_string(),
"approved": request.approved,
})))
}
pub async fn override_accreditation_status<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
Path(user_id): Path<Uuid>,
Json(request): Json<AccreditationOverrideRequest>,
) -> Result<Json<serde_json::Value>, AppError> {
let admin_id = validate_system_admin(&state, &headers).await?;
validate_accreditation_status(&request.status)?;
let _ = state
.user_repo
.find_by_id(user_id)
.await?
.ok_or(AppError::NotFound("User not found".into()))?;
let expires_at: Option<DateTime<Utc>> = match request.expires_at.as_deref() {
None | Some("") => None,
Some(s) => {
let dt = DateTime::parse_from_rfc3339(s)
.map_err(|_| {
AppError::Validation(format!("Invalid expires_at timestamp: '{}'", s))
})?
.with_timezone(&Utc);
Some(dt)
}
};
let verified_at: Option<DateTime<Utc>> = if request.status == "approved" {
Some(Utc::now())
} else {
None
};
state
.user_repo
.set_accreditation_status(user_id, &request.status, verified_at, expires_at)
.await?;
tracing::info!(
admin_id = %admin_id,
user_id = %user_id,
status = %request.status,
"Admin accreditation status override applied"
);
Ok(Json(serde_json::json!({
"ok": true,
"userId": user_id.to_string(),
"status": request.status,
})))
}
pub async fn get_document_presigned_url<C: AuthCallback, E: EmailService>(
State(state): State<Arc<AppState<C, E>>>,
headers: HeaderMap,
Path(doc_id): Path<Uuid>,
) -> Result<Json<DocumentPresignedUrlResponse>, AppError> {
validate_system_admin(&state, &headers).await?;
let accreditation_service = state
.accreditation_service
.as_ref()
.ok_or_else(|| AppError::NotFound("Accreditation not available".into()))?;
let doc = accreditation_service
.admin_get_document(doc_id)
.await?
.ok_or_else(|| AppError::NotFound("Document not found".into()))?;
const EXPIRY_SECS: u32 = 900;
let storage = build_storage_service(&state).await?;
let url = storage.presign_get(&doc.s3_key, EXPIRY_SECS).await?;
Ok(Json(DocumentPresignedUrlResponse {
url,
expires_in: EXPIRY_SECS,
}))
}