use std::convert::Infallible;
use postgres_native_tls::MakeTlsConnector;
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use hyper::Body;
use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use kafka_threadpool::kafka_publisher::KafkaPublisher;
use crate::core::core_config::CoreConfig;
use crate::kafka::publish_msg::publish_msg;
use crate::requests::models::user::get_user_by_id;
use crate::requests::models::user_verify::get_user_verify_by_user_id;
use crate::requests::user::is_verification_enabled::is_verification_enabled;
use crate::utils::get_query_params_from_url::get_query_params_from_url;
/// Request shape for the user-verify endpoint.
///
/// The short field names match the query-string keys that `verify_user`
/// looks up (`u` and `t`).
#[derive(Clone, Deserialize, Serialize)]
pub struct ApiReqUserVerify {
    /// User id (query param `u`).
    pub u: i32,
    /// Verification token (query param `t`).
    pub t: String,
    /// Optional value; not read by `verify_user` in this file.
    /// NOTE(review): presumably an email address - confirm against callers.
    pub e: Option<String>,
}
/// JSON body returned by `verify_user` for both success and failure.
///
/// Failure responses generally carry placeholder values (`-1` for the
/// integer fields, empty strings for `email` and `role`) together with a
/// human-readable `msg`.
#[derive(Clone, Deserialize, Serialize)]
pub struct ApiResUserVerify {
    /// Id of the user, or `-1` when the request failed.
    pub user_id: i32,
    /// Email address associated with the user.
    pub email: String,
    /// `state` value copied from the user record.
    pub state: i32,
    /// Verification flag; on a successful verify this carries the `state`
    /// column returned by the `users_verified` update.
    pub verified: i32,
    /// `role` value copied from the user record.
    pub role: String,
    /// Human-readable outcome or error description.
    pub msg: String,
}
pub async fn verify_user(
tracking_label: &str,
config: &CoreConfig,
db_pool: &Pool<PostgresConnectionManager<MakeTlsConnector>>,
kafka_pool: &KafkaPublisher,
full_url: &str,
) -> std::result::Result<Response<Body>, Infallible> {
let params_map =
match get_query_params_from_url(tracking_label, full_url).await {
Ok(params_map) => params_map,
Err(_) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: ("Missing required query params").to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
let user_id: i32 = match params_map.get("u") {
Some(user_id_str) => {
let user_id: i32 = user_id_str.parse::<i32>().unwrap_or(-1);
user_id
}
None => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: ("Missing required query param: user id")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
let verify_token: String = match params_map.get("t") {
Some(verify_token) => verify_token.to_string(),
None => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: ("User verify failed - please ensure \
the verify token is correct and reach out \
to support for additional help")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
if user_id <= 0 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: ("User verify failed - please ensure \
the user id must be a non-negative number \
and reach out to support for additional help")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let verify_token_len = verify_token.len();
if verify_token_len < 20 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: format!(
"User verify failed - please ensure \
the verify token is valid \
({verify_token_len} is too short) \
and reach out to support for additional help"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
} else if verify_token_len > 256 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: format!(
"User verify failed - please ensure \
the verify token is valid \
({verify_token_len} is too long) \
and reach out to support for additional help"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let conn = db_pool.get().await.unwrap();
let user_model = match get_user_by_id(tracking_label, user_id, &conn).await
{
Ok(user_model) => user_model,
Err(_) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: ("User verify failed - please ensure \
the parameters are correct and reach out \
to support for additional help")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
if !is_verification_enabled() {
let response = Response::builder()
.status(200)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: user_model.id,
email: user_model.email,
state: user_model.state,
verified: user_model.verified,
role: user_model.role,
msg: ("User verification success").to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let user_email = user_model.email.clone();
if user_model.state != 0 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: user_model.email,
state: user_model.state,
verified: user_model.verified,
role: user_model.role,
msg: format!(
"User {user_id} is inactive - \
not able to verify {user_email}"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
if user_model.verified != 0 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: user_model.id,
email: user_model.email,
state: user_model.state,
verified: user_model.verified,
role: user_model.role,
msg: ("User already verified").to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let user_verify_model = match get_user_verify_by_user_id(
tracking_label,
user_id,
&conn,
)
.await
{
Ok(uvm) => uvm,
Err(_) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: ("User verification record does not exist")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
let now: chrono::DateTime<chrono::Utc> = chrono::Utc::now();
let exp_vs_now_diff =
now.signed_duration_since(user_verify_model.exp_date_utc);
let exp_date_vs_now = exp_vs_now_diff.num_seconds();
info!(
"{tracking_label} - user {user_id} verifying exp_date={} \
now={} \
num_seconds_expired={exp_date_vs_now}s",
user_verify_model.exp_date_utc.format("%Y-%m-%dT%H:%M:%SZ"),
now.format("%Y-%m-%dT%H:%M:%SZ")
);
if exp_date_vs_now > 0 {
let err_msg = format!(
"{tracking_label} - user {user_id} \
verify token {verify_token} \
expired on: \
exp_date={} \
duration_since={exp_date_vs_now}s",
user_verify_model.exp_date_utc
);
error!("{err_msg}");
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: format!("user {user_email} verification has expired"),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let query = format!(
"UPDATE \
users_verified \
SET \
email = '{user_email}', \
state = 1, \
verify_date = '{now}' \
WHERE \
users_verified.user_id = {user_id} \
RETURNING \
users_verified.user_id,
users_verified.token,
users_verified.email,
users_verified.state;"
);
let stmt = conn.prepare(&query).await.unwrap();
let query_result = match conn.query(&stmt, &[]).await {
Ok(query_result) => {
info!(
"{tracking_label} - \
user {user_id} email {user_email} token verified"
);
query_result
}
Err(e) => {
let err_msg = format!("{e}");
if err_msg.contains(
"db error: ERROR: duplicate key value \
violates unique constraint",
) && err_msg.contains("users_verified_email_key")
&& err_msg.contains("already exists")
{
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: format!(
"User email is already \
in use: {user_email}"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
} else {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: format!(
"User update failed for user_id={user_id} \
{user_email} \
with err='{err_msg}'"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
}
};
let query = format!(
"UPDATE \
users \
SET \
verified = 1 \
WHERE \
users.id = {user_id};"
);
let stmt = conn.prepare(&query).await.unwrap();
match conn.query(&stmt, &[]).await {
Ok(_) => {
info!(
"{tracking_label} - \
user {user_id} email {user_email} account verified"
);
}
Err(e) => {
let err_msg = format!("{e}");
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: format!(
"User table update failed for user verification \
user_id={user_id} {user_email}={verify_token} \
with err='{err_msg}'"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
if let Some(row) = query_result.first() {
let found_user_id: i32 = row.try_get("user_id").unwrap();
let email: String = row.try_get("email").unwrap();
let user_verify_state: i32 = row.try_get("state").unwrap();
if config.kafka_publish_events {
publish_msg(
kafka_pool,
"user.events",
&format!("user-{}", user_id),
None,
&format!("USER_VERIFY user={user_id} email={email}"),
)
.await;
}
let response = Response::builder()
.status(200)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: found_user_id,
email: email.clone(),
state: user_model.state,
verified: user_verify_state,
role: user_model.role,
msg: format!("user {found_user_id} verified {email}"),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
Ok(Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserVerify {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
msg: format!(
"User update failed - user does \
not exist with user_id={user_id} email={user_email}"
),
})
.unwrap(),
))
.unwrap())
}