use std::convert::Infallible;
use postgres_native_tls::MakeTlsConnector;
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use hyper::header::HeaderValue;
use hyper::Body;
use hyper::HeaderMap;
use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use argon2::hash_encoded as argon_hash_encoded;
use argon2::Config as argon_config;
use kafka_threadpool::kafka_publisher::KafkaPublisher;
use crate::core::core_config::CoreConfig;
use crate::kafka::publish_msg::publish_msg;
use crate::requests::auth::validate_user_token::validate_user_token;
use crate::requests::models::user::get_user_by_id;
use crate::requests::models::user_otp::get_user_otp;
#[derive(Serialize, Deserialize, Clone)]
pub struct ApiReqUserConsumeOtp {
pub user_id: i32,
pub email: String,
pub token: String,
pub password: String,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct ApiResUserConsumeOtp {
pub user_id: i32,
pub otp_id: i32,
pub msg: String,
}
pub async fn consume_user_otp(
tracking_label: &str,
config: &CoreConfig,
db_pool: &Pool<PostgresConnectionManager<MakeTlsConnector>>,
kafka_pool: &KafkaPublisher,
headers: &HeaderMap<HeaderValue>,
bytes: &[u8],
) -> std::result::Result<Response<Body>, Infallible> {
let req_object: ApiReqUserConsumeOtp = match serde_json::from_slice(bytes) {
Ok(uo) => uo,
Err(_) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: -1,
otp_id: -1,
msg: ("User consume one-time-password failed - \
please ensure \
user_id, email, token, and password \
were set correctly in the request")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
if req_object.user_id < 0 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User consume one-time-password failed \
please ensure the \
user_id is a non-negative number")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
} else if req_object.email.is_empty() {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User consume one-time-password failed \
please ensure the \
email is set to the user's email address")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
} else if req_object.password.is_empty() {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User consume one-time-password failed \
please ensure the \
passsword is set")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
} else if req_object.password.len() < 4 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User consume one-time-password failed \
please ensure the \
passsword is longer than 4 characters")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
} else if req_object.token.len() < 4 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User consume one-time-password failed \
please ensure the \
token is longer than 4 characters")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
} else if req_object.token.len() > 256 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User consume one-time-password failed \
please ensure the \
token is shorter than 256 characters")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let conn = db_pool.get().await.unwrap();
let user_clone = req_object.clone();
let user_id = user_clone.user_id;
let user_email = user_clone.email;
let _token = match validate_user_token(
tracking_label,
config,
&conn,
headers,
user_id,
)
.await
{
Ok(_token) => _token,
Err(_) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User consume one-time-password failed \
due to invalid token")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
let user_model = match get_user_by_id(tracking_label, user_id, &conn).await
{
Ok(user_model) => user_model,
Err(err_msg) => {
error!(
"{tracking_label} - \
failed to consume one-time-password user {user_id} \
with err='{err_msg}'"
);
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: format!(
"User consume one-time-password failed - \
unable to find user with id: {user_id}"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
if user_model.email != req_object.email && user_email != user_model.email {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: format!(
"User consume one-time-password failed - \
user_email does not match {}",
req_object.email
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let user_otp_model = match get_user_otp(
tracking_label,
user_id,
&req_object.email,
&req_object.token,
&conn,
)
.await
{
Ok(rec) => rec,
Err(_) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User one-time-password record does not exist")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
if req_object.token != user_otp_model.token {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: format!(
"User one-time-password token={} does not match \
db otp_token={}",
req_object.token, user_otp_model.token
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let now: chrono::DateTime<chrono::Utc> = chrono::Utc::now();
let exp_vs_now_diff =
now.signed_duration_since(user_otp_model.exp_date_utc);
let exp_date_vs_now = exp_vs_now_diff.num_seconds();
if exp_date_vs_now > 0 {
let err_msg = format!(
"{tracking_label} - user {user_id} \
one-time-password token {} \
expired on: \
exp_date={} \
duration_since={exp_date_vs_now}s",
req_object.token, user_otp_model.exp_date_utc
);
error!("{err_msg}");
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User one-time-password has expired").to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
info!(
"{tracking_label} - \
consuming user {user_id} otp"
);
let cur_query = format!(
"UPDATE \
users_otp \
SET \
state = 1, \
consumed_date = '{now}' \
WHERE \
user_id = {user_id} \
AND \
state = 0 \
AND \
token = '{}' \
AND \
email = '{user_email}' \
RETURNING \
users_otp.id, \
users_otp.user_id, \
users_otp.token, \
users_otp.email, \
users_otp.state, \
users_otp.exp_date;",
req_object.token
);
let stmt = conn.prepare(&cur_query).await.unwrap();
let query_result = match conn.query(&stmt, &[]).await {
Ok(query_result) => query_result,
Err(e) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: format!(
"User consume one-time-password failed \
for user_id={user_id} {user_email} \
with err='{e}'"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
if let Some(row) = query_result.first() {
let argon_config = argon_config::default();
let new_password = argon_hash_encoded(
req_object.password.as_bytes(),
&config.server_password_salt,
&argon_config,
)
.unwrap();
let update_user_query = format!(
"UPDATE \
users \
SET \
password = '{new_password}' \
WHERE \
users.id = {user_id};"
);
let stmt = conn.prepare(&update_user_query).await.unwrap();
let _ = match conn.query(&stmt, &[]).await {
Ok(query_result) => query_result,
Err(e) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: format!(
"User consume one-time-password failed \
to reset user's password for \
user_id={user_id} {user_email} \
with err='{e}'"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
let user_otp_id: i32 = row.try_get("id").unwrap();
if config.kafka_publish_events {
publish_msg(
kafka_pool,
"user.events",
&format!("user-{}", user_id),
None,
&format!("USER_CONSUME_OTP user={user_id}"),
)
.await;
}
let response = Response::builder()
.status(200)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id,
otp_id: user_otp_id,
msg: "success".to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserConsumeOtp {
user_id: req_object.user_id,
otp_id: -1,
msg: ("User consume one-time-password failed - \
no records found in db")
.to_string(),
})
.unwrap(),
))
.unwrap();
Ok(response)
}