use std::convert::Infallible;
use hyper::body;
use hyper::Body;
use hyper::Method;
use hyper::Response;
use crate::monitoring::metrics::handle_showing_metrics;
use crate::monitoring::metrics::record_monitoring_metrics_api_after;
use crate::monitoring::metrics::record_monitoring_metrics_api_before;
use crate::core::server::core_http_request::CoreHttpRequest;
use crate::utils::get_server_address::get_server_address;
use crate::requests::auth::login_user::login_user;
use crate::requests::user::consume_user_otp::consume_user_otp;
use crate::requests::user::create_otp::create_otp;
use crate::requests::user::create_user::create_user;
use crate::requests::user::delete_user::delete_user;
use crate::requests::user::get_user::get_user;
use crate::requests::user::search_user_data::search_user_data;
use crate::requests::user::search_users::search_users;
use crate::requests::user::update_user::update_user;
use crate::requests::user::update_user_data::update_user_data;
use crate::requests::user::upload_user_data::upload_user_data;
use crate::requests::user::verify_user::verify_user;
pub async fn handle_request(
data: CoreHttpRequest,
) -> std::result::Result<Response<Body>, Infallible> {
let tracking_label = data.config.label.to_string();
let mut processed_result: std::result::Result<Response<Body>, Infallible> =
Ok(Response::new(Body::from("prep".to_string())));
let (parts, body) = data.request.into_parts();
let request_uri = parts.uri.path();
let request_method = parts.method;
match (request_method.clone(), request_uri) {
(Method::POST, "/") => {
if false {
println!("{:?}", processed_result);
}
record_monitoring_metrics_api_before(
request_uri,
"unknown",
"post",
);
let bytes = body::to_bytes(body).await.unwrap();
let response_str =
format!("valid POST uri=/ data size={} bytes", bytes.len());
processed_result = Ok(Response::new(Body::from(response_str)));
record_monitoring_metrics_api_after(
request_uri,
"unknown",
"post",
processed_result,
)
}
(Method::POST, "/user") => {
record_monitoring_metrics_api_before(request_uri, "user", "post");
let bytes = body::to_bytes(body).await.unwrap();
processed_result = create_user(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&bytes,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"user",
"post",
processed_result,
)
}
(Method::DELETE, "/user") => {
record_monitoring_metrics_api_before(request_uri, "user", "delete");
let bytes = body::to_bytes(body).await.unwrap();
processed_result = delete_user(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&parts.headers,
&bytes,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"user",
"delete",
processed_result,
)
}
(Method::PUT, "/user") => {
record_monitoring_metrics_api_before(request_uri, "user", "put");
let bytes = body::to_bytes(body).await.unwrap();
processed_result = update_user(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&parts.headers,
&bytes,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"user",
"put",
processed_result,
)
}
(Method::POST, "/user/search") => {
record_monitoring_metrics_api_before(request_uri, "user", "search");
let bytes = body::to_bytes(body).await.unwrap();
processed_result = search_users(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&parts.headers,
&bytes,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"user",
"search",
processed_result,
)
}
(Method::POST, "/user/data") => {
record_monitoring_metrics_api_before(request_uri, "data", "upload");
processed_result = upload_user_data(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&parts.headers,
body,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"data",
"upload",
processed_result,
)
}
(Method::PUT, "/user/data") => {
record_monitoring_metrics_api_before(request_uri, "data", "put");
let bytes = body::to_bytes(body).await.unwrap();
processed_result = update_user_data(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&parts.headers,
&bytes,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"data",
"put",
processed_result,
)
}
(Method::POST, "/user/data/search") => {
record_monitoring_metrics_api_before(request_uri, "data", "search");
let bytes = body::to_bytes(body).await.unwrap();
processed_result = search_user_data(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&parts.headers,
&bytes,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"data",
"search",
processed_result,
)
}
(Method::POST, "/user/password/reset") => {
record_monitoring_metrics_api_before(
request_uri,
"user",
"create_otp",
);
let bytes = body::to_bytes(body).await.unwrap();
processed_result = create_otp(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&parts.headers,
&bytes,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"user",
"create_otp",
processed_result,
)
}
(Method::POST, "/user/password/change") => {
record_monitoring_metrics_api_before(
request_uri,
"user",
"consume_otp",
);
let bytes = body::to_bytes(body).await.unwrap();
processed_result = consume_user_otp(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&parts.headers,
&bytes,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"user",
"consume_otp",
processed_result,
)
}
(Method::POST, "/login") => {
record_monitoring_metrics_api_before(request_uri, "auth", "login");
let bytes = body::to_bytes(body).await.unwrap();
processed_result = login_user(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&bytes,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"auth",
"login",
processed_result,
)
}
(Method::GET, "/metrics") => handle_showing_metrics(),
(Method::GET, "/favicon.ico") => {
let body = Body::from("no favicon.ico".to_string());
processed_result = Ok(Response::new(body));
processed_result
}
_ => {
if request_method == Method::GET
&& request_uri.contains("/user/verify")
{
record_monitoring_metrics_api_before(
request_uri,
"user",
"consume_verify",
);
let request_query_params = parts.uri.query().unwrap_or("");
let full_url = format!(
"https://{}{request_uri}?{request_query_params}",
get_server_address("api")
);
processed_result = verify_user(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&full_url,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"user",
"consume_verify",
processed_result,
)
}
else if request_method == Method::GET
&& request_uri.contains("/user/")
{
record_monitoring_metrics_api_before(
request_uri,
"user",
"get",
);
processed_result = get_user(
&tracking_label,
&data.config,
&data.db_pool,
&data.kafka_pool,
&parts.headers,
request_uri,
)
.await;
record_monitoring_metrics_api_after(
request_uri,
"user",
"get",
processed_result,
)
}
else {
record_monitoring_metrics_api_before(
request_uri,
"unknown",
"get",
);
let reason = format!(
"unsupported method and uri \
https://{}{request_uri} \
method={request_method}",
data.config.server_address
);
let err_msg =
format!("{{\"status\":400,\"reason\":\"{}\"}}", reason);
error!("{}", err_msg);
let body = Body::from(err_msg);
processed_result = Ok(Response::new(body));
record_monitoring_metrics_api_after(
request_uri,
"unknown",
"get",
processed_result,
)
}
}
}
}