use std::convert::Infallible;
use postgres_native_tls::MakeTlsConnector;
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use hyper::body;
use hyper::header::HeaderValue;
use hyper::Body;
use hyper::HeaderMap;
use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use kafka_threadpool::kafka_publisher::KafkaPublisher;
use crate::core::core_config::CoreConfig;
use crate::is3::s3_upload_buffer::s3_upload_buffer;
use crate::kafka::publish_msg::publish_msg;
use crate::requests::auth::validate_user_token::validate_user_token;
use crate::utils::get_uuid::get_uuid;
/// Serializable container for the raw bytes of an upload.
///
/// NOTE(review): `upload_user_data` in this file reads the request body
/// directly and never constructs this type - confirm whether other
/// modules or clients still depend on it.
#[derive(Serialize, Deserialize, Clone)]
pub struct ApiReqUserUploadData {
/// Raw payload bytes.
pub data: Vec<u8>,
}
/// JSON response body returned by `upload_user_data`.
///
/// On success the fields mirror the row returned by the `users_data`
/// insert and `msg` is `"success"`. On failure the handler sets both ids
/// to `-1`, the size to `0`, every other string field to `""`, and puts
/// the error description in `msg`.
#[derive(Serialize, Deserialize, Clone)]
pub struct ApiResUserUploadData {
/// Value of the `users_data.user_id` column (`-1` on error).
pub user_id: i32,
/// Value of the `users_data.id` column for the new row (`-1` on error).
pub data_id: i32,
/// Value of the `users_data.filename` column.
pub filename: String,
/// Value of the `users_data.data_type` column.
pub data_type: String,
/// Value of the `users_data.size_in_bytes` column (`0` on error).
pub size_in_bytes: i64,
/// Value of the `users_data.comments` column.
pub comments: String,
/// Value of the `users_data.encoding` column.
pub encoding: String,
/// Value of the `users_data.sloc` column: the caller-supplied `sloc`
/// header, or an `s3://bucket/key` url built by the handler when the
/// header is absent or empty.
pub sloc: String,
/// `"success"` or a human-readable error description.
pub msg: String,
}
pub async fn upload_user_data(
tracking_label: &str,
config: &CoreConfig,
db_pool: &Pool<PostgresConnectionManager<MakeTlsConnector>>,
kafka_pool: &KafkaPublisher,
headers: &HeaderMap<HeaderValue>,
body: hyper::Body,
) -> std::result::Result<Response<Body>, Infallible> {
if !headers.contains_key("user_id") {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(
&ApiResUserUploadData {
user_id: -1,
data_id: -1,
filename: "".to_string(),
data_type: "".to_string(),
size_in_bytes: 0,
comments: "".to_string(),
encoding: "".to_string(),
sloc: "".to_string(),
msg: (
"Missing required header 'user_id' key (i.e. curl -H 'user_id: INT'"
).to_string(),
}
).unwrap()))
.unwrap();
return Ok(response);
}
let user_id_str = headers.get("user_id").unwrap().to_str().unwrap();
let user_id: i32 = match user_id_str.parse::<i32>() {
Ok(user_id) => user_id,
Err(_) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(
&ApiResUserUploadData {
user_id: -1,
data_id: -1,
filename: "".to_string(),
data_type: "".to_string(),
size_in_bytes: 0,
comments: "".to_string(),
encoding: "".to_string(),
sloc: "".to_string(),
msg: (
"user_id must be a postive number that is the actual user_id for the token"
).to_string(),
}
).unwrap()))
.unwrap();
return Ok(response);
}
};
if !headers.contains_key("filename") {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(
&ApiResUserUploadData {
user_id: -1,
data_id: -1,
filename: "".to_string(),
data_type: "".to_string(),
size_in_bytes: 0,
comments: "".to_string(),
encoding: "".to_string(),
sloc: "".to_string(),
msg: (
"Missing required header 'filename' key (i.e. curl -H 'user_id: INT'"
).to_string(),
}
).unwrap()))
.unwrap();
return Ok(response);
}
let file_name_str = headers.get("filename").unwrap().to_str().unwrap();
let file_name_len = file_name_str.len();
if !(1..=511).contains(&file_name_len) {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(
&ApiResUserUploadData {
user_id: -1,
data_id: -1,
filename: "".to_string(),
data_type: "".to_string(),
size_in_bytes: 0,
comments: "".to_string(),
encoding: "".to_string(),
sloc: "".to_string(),
msg: (
"The header value for 'filename' must be between 1 and 511 characters"
).to_string(),
}
).unwrap()))
.unwrap();
return Ok(response);
}
let encoding = match headers.get("encoding") {
Some(v) => v.to_str().unwrap().to_string(),
None => "na".to_string(),
};
let comments = match headers.get("comments") {
Some(v) => v.to_str().unwrap().to_string(),
None => "file".to_string(),
};
let data_type = match headers.get("data_type") {
Some(v) => v.to_str().unwrap().to_string(),
None => "file".to_string(),
};
let sloc_start = match headers.get("sloc") {
Some(v) => v.to_str().unwrap().to_string(),
None => "".to_string(),
};
let should_upload_to_s3 = match headers.get("s3_enable") {
Some(_) => true,
None => {
std::env::var("S3_DATA_UPLOAD_TO_S3")
.unwrap_or_else(|_| "0".to_string())
== *"0"
}
};
let s3_bucket = std::env::var("S3_DATA_BUCKET")
.unwrap_or_else(|_| "BUCKET_NAME".to_string());
let s3_prefix = std::env::var("S3_DATA_PREFIX")
.unwrap_or_else(|_| "user/data/file".to_string());
let now = chrono::Utc::now();
let now_str = now.format("%Y/%m/%d");
let s3_uuid = get_uuid();
let s3_key_dst = format!(
"{s3_prefix}/\
{user_id}/\
{now_str}/\
{s3_uuid}.{file_name_str}"
);
let sloc = match sloc_start.len() {
0 => {
format!("s3://{s3_bucket}/{s3_key_dst}")
}
_ => sloc_start,
};
{
let conn = db_pool.get().await.unwrap();
let _token = match validate_user_token(
tracking_label,
config,
&conn,
headers,
user_id,
)
.await
{
Ok(_token) => _token,
Err(_) => {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(
&ApiResUserUploadData {
user_id: -1,
data_id: -1,
filename: "".to_string(),
data_type: "".to_string(),
size_in_bytes: 0,
comments: "".to_string(),
encoding: "".to_string(),
sloc: "".to_string(),
msg: ("
User data upload failed due to invalid token"
).to_string(),
}
).unwrap()))
.unwrap();
return Ok(response);
}
};
}
info!("{tracking_label} - receiving user_id={user_id} name={file_name_str} data");
let bytes = body::to_bytes(body).await.unwrap();
let file_contents_size: usize = bytes.len() as usize;
if file_contents_size < 1 {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserUploadData {
user_id: -1,
data_id: -1,
filename: "".to_string(),
data_type: "".to_string(),
size_in_bytes: 0,
comments: "".to_string(),
encoding: "".to_string(),
sloc: "".to_string(),
msg: ("No data uploaded in the body").to_string(),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
let file_contents_size_in_mb: f32 =
file_contents_size as f32 / 1024.0 / 1024.0;
info!(
"{tracking_label} - processing data for user_id={user_id} \
name={file_name_str} \
size={file_contents_size_in_mb:.2}mb \
upload_to_s3={should_upload_to_s3} \
{sloc}"
);
if should_upload_to_s3 {
match s3_upload_buffer(tracking_label, &s3_bucket, &s3_key_dst, &bytes)
.await
{
Ok(good_msg) => {
info!("{good_msg} - done uploading - {sloc}")
}
Err(emsg) => {
info!("{emsg} - failed uploading {sloc}")
}
}
} else {
info!("{tracking_label} - not uploading to s3");
}
let conn = db_pool.get().await.unwrap();
let cur_query = format!(
"INSERT INTO \
users_data (\
user_id, \
filename, \
data_type, \
size_in_bytes, \
comments, \
encoding, \
sloc) \
VALUES (\
{user_id},
'{file_name_str}',
'{data_type}',
{file_contents_size},
'{comments}',
'{encoding}',
'{sloc}') \
RETURNING \
users_data.id,
users_data.user_id,
users_data.filename,
users_data.data_type,
users_data.size_in_bytes,
users_data.comments,
users_data.encoding,
users_data.sloc;"
);
let stmt = conn.prepare(&cur_query).await.unwrap();
let query_result = match conn.query(&stmt, &[]).await {
Ok(query_result) => query_result,
Err(e) => {
let err_msg = format!("{}", e);
let response = Response::builder()
.status(500)
.body(Body::from(
serde_json::to_string(&ApiResUserUploadData {
user_id: -1,
data_id: -1,
filename: "".to_string(),
data_type: "".to_string(),
size_in_bytes: 0,
comments: "".to_string(),
encoding: "".to_string(),
sloc: "".to_string(),
msg: format!(
"User data upload failed for user_id={user_id} \
with err='{err_msg}'"
),
})
.unwrap(),
))
.unwrap();
return Ok(response);
}
};
let mut row_list: Vec<ApiResUserUploadData> = Vec::with_capacity(1);
for row in query_result.iter() {
let found_data_id: i32 = row.try_get("id").unwrap();
let found_user_id: i32 = row.try_get("user_id").unwrap();
let found_filename: String = row.try_get("filename").unwrap();
let found_data_type: String = row.try_get("data_type").unwrap();
let found_size_in_bytes: i64 = row.try_get("size_in_bytes").unwrap();
let found_comments: String = row.try_get("comments").unwrap();
let found_encoding: String = row.try_get("encoding").unwrap();
let found_sloc: String = row.try_get("sloc").unwrap();
row_list.push(ApiResUserUploadData {
user_id: found_user_id,
data_id: found_data_id,
filename: found_filename,
data_type: found_data_type,
size_in_bytes: found_size_in_bytes,
comments: found_comments,
encoding: found_encoding,
sloc: found_sloc,
msg: "success".to_string(),
});
}
if row_list.is_empty() {
let response = Response::builder()
.status(400)
.body(Body::from(
serde_json::to_string(&ApiResUserUploadData {
user_id: -1,
data_id: -1,
filename: "".to_string(),
data_type: "".to_string(),
size_in_bytes: 0,
comments: "".to_string(),
encoding: "".to_string(),
sloc: "".to_string(),
msg: ("no upload data found in db").to_string(),
})
.unwrap(),
))
.unwrap();
Ok(response)
} else {
if config.kafka_publish_events {
publish_msg(
kafka_pool,
"user.events",
&format!("user-{}", user_id),
None,
&format!("UPLOAD_USER_DATA user={user_id}"),
)
.await;
}
let response = Response::builder()
.status(200)
.body(Body::from(serde_json::to_string(&row_list[0]).unwrap()))
.unwrap();
Ok(response)
}
}