mod signup;
use sqlx::PgPool;
use uuid::Uuid;
use serde_json::Value as JsonValue;
use crate::errors::AppError;
use crate::models::AuthMethod;
use crate::repositories::{CreateWalletMaterial, MembershipEntity, OrgRole, UserEntity};
pub struct TransactionalOps;
impl TransactionalOps {
pub async fn accept_invite_atomic(
pool: &PgPool,
invite_id: Uuid,
user_id: Uuid,
org_id: Uuid,
role: OrgRole,
) -> Result<MembershipEntity, AppError> {
let mut tx = pool
.begin()
.await
.map_err(|e| AppError::Database(format!("Failed to begin transaction: {}", e)))?;
let accepted_at = sqlx::query_scalar::<_, Option<chrono::DateTime<chrono::Utc>>>(
r#"
UPDATE invites
SET accepted_at = NOW()
WHERE id = $1
AND accepted_at IS NULL
AND expires_at > NOW()
RETURNING accepted_at
"#,
)
.bind(invite_id)
.fetch_optional(&mut *tx)
.await
.map_err(|e| AppError::Database(format!("Failed to accept invite: {}", e)))?;
if accepted_at.is_none() {
if let Err(e) = tx.rollback().await {
tracing::error!(
error = %e,
invite_id = %invite_id,
operation = "accept_invite_atomic",
step = "invite_validation_failed",
"Failed to rollback transaction after invite validation failure"
);
}
return Err(AppError::Validation(
"Invite not found, already accepted, or expired".into(),
));
}
let membership_id = Uuid::new_v4();
let row_result = sqlx::query_as::<_, MembershipRow>(
r#"
INSERT INTO memberships (id, user_id, org_id, role)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, org_id) DO NOTHING
RETURNING id, user_id, org_id, role, joined_at
"#,
)
.bind(membership_id)
.bind(user_id)
.bind(org_id)
.bind(role.as_str())
.fetch_optional(&mut *tx)
.await;
match row_result {
Ok(Some(row)) => {
tx.commit()
.await
.map_err(|e| AppError::Database(format!("Failed to commit: {}", e)))?;
Ok(MembershipEntity {
id: row.id,
user_id: row.user_id,
org_id: row.org_id,
role,
joined_at: row.joined_at,
})
}
Ok(None) => {
if let Err(e) = tx.rollback().await {
tracing::error!(
error = %e,
invite_id = %invite_id,
user_id = %user_id,
org_id = %org_id,
operation = "accept_invite_atomic",
step = "membership_conflict",
"Failed to rollback transaction after membership conflict"
);
}
Err(AppError::Validation(
"User is already a member of this organization".into(),
))
}
Err(e) => {
if let Err(rollback_err) = tx.rollback().await {
tracing::error!(
error = %rollback_err,
invite_id = %invite_id,
user_id = %user_id,
org_id = %org_id,
operation = "accept_invite_atomic",
step = "membership_creation_failed",
"Failed to rollback transaction after membership creation failure"
);
}
tracing::error!(
invite_id = %invite_id,
user_id = %user_id,
error = %e,
"CRITICAL: Membership creation failed after invite acceptance"
);
Err(AppError::Database(format!(
"Failed to create membership: {}",
e
)))
}
}
}
pub async fn create_user_with_membership_atomic(
pool: &PgPool,
user: UserEntity,
org_id: Uuid,
role: OrgRole,
) -> Result<(UserEntity, MembershipEntity), AppError> {
let mut tx = pool
.begin()
.await
.map_err(|e| AppError::Database(format!("Failed to begin transaction: {}", e)))?;
let now = chrono::Utc::now();
let auth_methods: Vec<String> = user
.auth_methods
.iter()
.map(|m| match m {
AuthMethod::Email => "email".to_string(),
AuthMethod::Google => "google".to_string(),
AuthMethod::Apple => "apple".to_string(),
AuthMethod::Solana => "solana".to_string(),
AuthMethod::WebAuthn => "webauthn".to_string(),
AuthMethod::Sso => "sso".to_string(),
})
.collect();
let user_result = sqlx::query_as::<_, UserRow>(
r#"
INSERT INTO users (id, email, email_verified, password_hash, name, username, picture,
wallet_address, google_id, apple_id, stripe_customer_id,
auth_methods, is_system_admin, created_at, updated_at, last_login_at,
welcome_completed_at, referral_code, referred_by)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $14, $15, $16, $17, $18)
ON CONFLICT (email) DO NOTHING
RETURNING id, email, email_verified, password_hash, name, username, picture,
wallet_address, google_id, apple_id, stripe_customer_id,
auth_methods, is_system_admin, created_at, updated_at, last_login_at,
welcome_completed_at, referral_code, referred_by, payout_wallet_address
"#,
)
.bind(user.id)
.bind(&user.email)
.bind(user.email_verified)
.bind(&user.password_hash)
.bind(&user.name)
.bind(&user.username)
.bind(&user.picture)
.bind(&user.wallet_address)
.bind(&user.google_id)
.bind(&user.apple_id)
.bind(&user.stripe_customer_id)
.bind(&auth_methods)
.bind(user.is_system_admin)
.bind(now)
.bind(user.last_login_at)
.bind(user.welcome_completed_at)
.bind(&user.referral_code)
.bind(user.referred_by)
.fetch_optional(&mut *tx)
.await;
let user_row = match user_result {
Ok(Some(row)) => row,
Ok(None) => {
if let Err(rollback_err) = tx.rollback().await {
tracing::error!(
error = %rollback_err,
user_id = %user.id,
email = ?user.email,
org_id = %org_id,
operation = "create_user_with_membership_atomic",
step = "email_conflict",
"Failed to rollback transaction after email conflict"
);
}
return Err(AppError::EmailExists);
}
Err(e) => {
if let Err(rollback_err) = tx.rollback().await {
tracing::error!(
error = %rollback_err,
user_id = %user.id,
email = ?user.email,
org_id = %org_id,
operation = "create_user_with_membership_atomic",
step = "user_creation_failed",
"Failed to rollback transaction after user creation failure"
);
}
if let sqlx::Error::Database(ref db_err) = e {
if db_err.is_unique_violation() {
let constraint = db_err.constraint().unwrap_or("");
if constraint.contains("wallet") {
return Err(AppError::WalletExists);
}
return Err(AppError::EmailExists);
}
}
return Err(AppError::Database(format!("Failed to create user: {}", e)));
}
};
let membership_id = Uuid::new_v4();
let membership_result = sqlx::query_as::<_, MembershipRow>(
r#"
INSERT INTO memberships (id, user_id, org_id, role)
VALUES ($1, $2, $3, $4)
RETURNING id, user_id, org_id, role, joined_at
"#,
)
.bind(membership_id)
.bind(user_row.id)
.bind(org_id)
.bind(role.as_str())
.fetch_one(&mut *tx)
.await;
let membership_row = match membership_result {
Ok(row) => row,
Err(e) => {
if let Err(rollback_err) = tx.rollback().await {
tracing::error!(
error = %rollback_err,
user_id = %user_row.id,
org_id = %org_id,
operation = "create_user_with_membership_atomic",
step = "membership_creation_failed",
"Failed to rollback transaction after membership creation failure"
);
}
return Err(AppError::Database(format!(
"Failed to create membership: {}",
e
)));
}
};
tx.commit()
.await
.map_err(|e| AppError::Database(format!("Failed to commit: {}", e)))?;
let created_user = UserEntity {
id: user_row.id,
email: user_row.email,
email_verified: user_row.email_verified,
password_hash: user_row.password_hash,
name: user_row.name,
username: user_row.username,
picture: user_row.picture,
wallet_address: user_row.wallet_address,
google_id: user_row.google_id,
apple_id: user_row.apple_id,
stripe_customer_id: user_row.stripe_customer_id,
auth_methods: user.auth_methods.clone(),
is_system_admin: user_row.is_system_admin,
created_at: user_row.created_at,
updated_at: user_row.updated_at,
last_login_at: user_row.last_login_at,
welcome_completed_at: user_row.welcome_completed_at,
referral_code: user_row.referral_code,
referred_by: user_row.referred_by,
payout_wallet_address: user_row.payout_wallet_address,
kyc_status: "none".to_string(),
kyc_verified_at: None,
kyc_expires_at: None,
accreditation_status: "none".to_string(),
accreditation_verified_at: None,
accreditation_expires_at: None,
};
let membership = MembershipEntity {
id: membership_row.id,
user_id: membership_row.user_id,
org_id: membership_row.org_id,
role,
joined_at: membership_row.joined_at,
};
Ok((created_user, membership))
}
pub async fn recover_wallet_atomic(
pool: &PgPool,
material: CreateWalletMaterial,
) -> Result<(), AppError> {
let kdf_params_json: Option<JsonValue> = material
.share_a_kdf_params
.as_ref()
.map(serde_json::to_value)
.transpose()
.map_err(|e| AppError::Internal(e.into()))?;
let auth_method_str = material.share_a_auth_method.to_string();
let new_id = Uuid::new_v4();
let mut tx = pool
.begin()
.await
.map_err(|e| AppError::Database(format!("Failed to begin transaction: {}", e)))?;
sqlx::query("DELETE FROM solana_wallet_material WHERE user_id = $1 AND api_key_id IS NULL")
.bind(material.user_id)
.execute(&mut *tx)
.await
.map_err(|e| AppError::Database(format!("Failed to delete wallet material: {}", e)))?;
sqlx::query(
r#"
INSERT INTO solana_wallet_material (
id, user_id, solana_pubkey, scheme_version,
share_a_auth_method, share_a_ciphertext, share_a_nonce,
share_a_kdf_salt, share_a_kdf_params_json, prf_salt, share_a_pin_hash,
share_b, api_key_id, shamir_t, shamir_n,
created_at, updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, 2, 3, NOW(), NOW())
"#,
)
.bind(new_id)
.bind(material.user_id)
.bind(&material.solana_pubkey)
.bind(2i16)
.bind(&auth_method_str)
.bind(&material.share_a_ciphertext)
.bind(&material.share_a_nonce)
.bind(&material.share_a_kdf_salt)
.bind(&kdf_params_json)
.bind(&material.prf_salt)
.bind(&material.share_a_pin_hash)
.bind(&material.share_b)
.bind(material.api_key_id)
.execute(&mut *tx)
.await
.map_err(|e| AppError::Database(format!("Failed to create wallet material: {}", e)))?;
tx.commit()
.await
.map_err(|e| AppError::Database(format!("Failed to commit: {}", e)))?;
Ok(())
}
}
#[derive(sqlx::FromRow)]
struct MembershipRow {
id: Uuid,
user_id: Uuid,
org_id: Uuid,
_role: String,
joined_at: chrono::DateTime<chrono::Utc>,
}
#[derive(sqlx::FromRow)]
struct UserRow {
id: Uuid,
email: Option<String>,
email_verified: bool,
password_hash: Option<String>,
name: Option<String>,
username: Option<String>,
picture: Option<String>,
wallet_address: Option<String>,
google_id: Option<String>,
apple_id: Option<String>,
stripe_customer_id: Option<String>,
_auth_methods: Vec<String>,
is_system_admin: bool,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
last_login_at: Option<chrono::DateTime<chrono::Utc>>,
welcome_completed_at: Option<chrono::DateTime<chrono::Utc>>,
referral_code: String,
referred_by: Option<uuid::Uuid>,
payout_wallet_address: Option<String>,
}
#[cfg(test)]
mod tests {
}