use std::convert::Infallible;
use postgres_native_tls::MakeTlsConnector;
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use hyper::Body;
use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use argon2::hash_encoded as argon_hash_encoded;
use argon2::Config as argon_config;
use kafka_threadpool::kafka_publisher::KafkaPublisher;
use crate::core::core_config::CoreConfig;
use crate::kafka::publish_msg::publish_msg;
use crate::requests::auth::create_user_token::create_user_token;
use crate::requests::auth::login_user::ApiResUserLogin;
use crate::requests::user::is_verification_enabled::is_verification_enabled;
use crate::requests::user::upsert_user_verification::upsert_user_verification;
use crate::utils::get_server_address::get_server_address;
#[derive(Serialize, Deserialize, Clone)]
pub struct ApiReqUserCreate {
pub email: String,
pub password: String,
}
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct ApiResUserCreate {
pub user_id: i32,
pub email: String,
pub state: i32,
pub role: String,
pub token: String,
pub msg: String,
}
pub async fn create_user(
tracking_label: &str,
config: &CoreConfig,
db_pool: &Pool<PostgresConnectionManager<MakeTlsConnector>>,
kafka_pool: &KafkaPublisher,
bytes: &[u8],
) -> std::result::Result<Response<Body>, Infallible> {
let user_object: ApiReqUserCreate = serde_json::from_slice(bytes).unwrap();
if user_object.password.len() < 4 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserCreate {
user_id: -1,
email: "".to_string(),
state: -1,
role: "".to_string(),
token: "".to_string(),
msg: ("User password must be more than 4 characters")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let mut user_role = "user";
if user_object.email == "admin@email.com" {
user_role = "admin";
}
let user_verification_enabled = is_verification_enabled();
let user_start_state_value = 0;
let user_verified_value = match user_verification_enabled {
true => 0,
false => 1,
};
let argon_config = argon_config::default();
let hash = argon_hash_encoded(
user_object.password.as_bytes(),
&config.server_password_salt,
&argon_config,
)
.unwrap();
let insert_query = format!(
"INSERT INTO \
users (\
email, \
password, \
state, \
verified, \
role) \
VALUES (\
'{}', \
'{hash}', \
{user_start_state_value}, \
{user_verified_value}, \
'{user_role}') \
RETURNING \
users.id, \
users.email, \
users.password, \
users.state, \
users.verified, \
users.role;",
user_object.email
);
let conn = db_pool.get().await.unwrap();
let stmt = conn.prepare(&insert_query).await.unwrap();
let query_result = match conn.query(&stmt, &[]).await {
Ok(query_result) => query_result,
Err(e) => {
let err_msg = format!("{e}");
if err_msg.contains("duplicate key value violates") {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserCreate {
user_id: -1,
email: "".to_string(),
state: -1,
role: "".to_string(),
token: "".to_string(),
msg: format!(
"User email {} already registered",
user_object.email
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
} else {
let response = Response::builder()
.status(500)
.body(Body::from(
serde_json::to_string(
&ApiResUserCreate {
user_id: -1,
email: "".to_string(),
state: -1,
role: "".to_string(),
token: "".to_string(),
msg: format!(
"User creation failed for email={} with err='{err_msg}'",
user_object.email)
}).unwrap()))
.unwrap();
return Ok(response);
}
}
};
let mut row_list: Vec<(i32, String, String, i32, i32, String)> =
Vec::with_capacity(1);
for row in query_result.iter() {
let id: i32 = row.try_get("id").unwrap();
let email: String = row.try_get("email").unwrap();
let password: String = row.try_get("password").unwrap();
if password != hash {
error!("BAD PASSWORD FOUND DURING USER CREATION:\npassword=\n{password}\n!=\nsalt=\n{hash}");
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserLogin {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
token: "".to_string(),
msg: ("User login failed - invalid password")
.to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let user_state: i32 = row.try_get("state").unwrap();
let user_verified_db: i32 = row.try_get("verified").unwrap();
let role: String = row.try_get("role").unwrap();
row_list.push((id, email, password, user_state, user_verified_db, role))
}
if row_list.is_empty() {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(
&ApiResUserLogin {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
token: "".to_string(),
msg: format!(
"User creation failed - user does not exist with email={}",
user_object.email)
}
).unwrap()))
.unwrap();
Ok(response)
} else {
let user_id = row_list[0].0;
let user_email = row_list[0].1.clone();
let user_token = match create_user_token(
tracking_label,
config,
&conn,
&user_email,
user_id,
)
.await
{
Ok(user_token) => user_token,
Err(_) => {
let response = Response::builder()
.status(500)
.body(Body::from(
serde_json::to_string(
&ApiResUserLogin {
user_id: -1,
email: "".to_string(),
state: -1,
verified: -1,
role: "".to_string(),
token: "".to_string(),
msg: format!("User token creation failed - {user_id} {user_email}"),
}
).unwrap()))
.unwrap();
return Ok(response);
}
};
if user_verification_enabled {
match upsert_user_verification(
tracking_label,
user_id,
&user_email,
true, 0, &conn,
)
.await
{
Ok(verification_token) => {
info!(
"{tracking_label} - verify token created user={user_id} \
{user_email} - verify url:\
curl -ks \
\"https://{}/user/verify?u={user_id}&t={verification_token}\" \
| jq",
get_server_address("api"));
}
Err(e) => {
error!(
"{tracking_label} - \
failed to generate verify token for user {user_id} \
{user_email} with err='{e}'"
);
}
};
}
if config.kafka_publish_events {
publish_msg(
kafka_pool,
"user.events",
&format!("user-{}", user_id),
None,
&format!("USER_CREATE user={user_id} email={user_email}"),
)
.await;
}
let response = Response::builder()
.status(201)
.body(Body::from(
serde_json::to_string(&ApiResUserLogin {
user_id,
email: user_email,
state: row_list[0].3,
verified: row_list[0].4,
role: row_list[0].5.clone(),
token: user_token,
msg: "success".to_string(),
})
.unwrap(),
))
.unwrap();
Ok(response)
}
}