#[allow(deprecated)]
use actix_web::HttpRequest;
use actix_web::web::{Data, Json};
use actix_web::{HttpResponse, post};
use serde_json::{Value, json};
use std::time::Instant;
use tracing::{error, info};
use crate::AppState;
use crate::api::cache::check::check_cache_control_and_get_response_v2;
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{authorize_gateway_request, read_right_for_resource};
use crate::api::headers::x_athena_client::x_athena_client;
#[cfg(feature = "deadpool_experimental")]
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::headers::x_strip_nulls::get_x_strip_nulls;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::parser::query_builder::Condition;
use crate::utils::format::{normalize_column_name, normalize_rows};
use crate::utils::request_logging::{LoggedRequest, log_request};
use super::conditions::{RequestCondition, to_query_conditions};
use super::execution::execute_gateway_fetch_data;
use super::request::{
build_fetch_hashed_cache_key, parse_gateway_fetch_conditions, parse_sort_options_from_body,
};
use super::response::{missing_client_header_response, respond_fetch_err, respond_fetch_ok};
use super::types::{PostProcessingConfig, SortOptions};
/// Shared handler behind the POST gateway fetch routes.
///
/// Flow, as implemented below:
/// 1. Resolve the client name via `x_athena_client`; an empty name short-circuits
///    with `missing_client_header_response`.
/// 2. Read `limit`, `columns`, `current_page`, `page_size`, `offset`, `table_name`
///    and the filter conditions from the optional JSON body.
/// 3. Authorize the request with `authorize_gateway_request` and log it with
///    `log_request`; a response carried by the auth result is returned as-is.
/// 4. Ask `check_cache_control_and_get_response_v2` for a cached response; on a
///    miss, run `execute_gateway_fetch_data`, hand successful rows to
///    `hydrate_cache_and_return_json`, and answer through `respond_fetch_ok` /
///    `respond_fetch_err`.
///
/// # Parameters
/// * `req` - incoming HTTP request (headers are consulted for client name, API key,
///   user id and null-stripping).
/// * `body` - optional JSON payload; when absent the handler answers
///   `400 table_name is required` (after authorization).
/// * `app_state` - shared application state.
///
/// # Returns
/// The `HttpResponse` to send back to the caller.
///
/// Numeric paging inputs come from the request body as `u64`; they are converted
/// to `i64` with saturation and the offset arithmetic saturates as well, so
/// oversized client values can neither wrap to negative numbers nor trigger an
/// arithmetic-overflow panic.
pub(crate) async fn handle_fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    /// Converts a client-supplied `u64` into `i64`, clamping at `i64::MAX`
    /// instead of wrapping to a negative value like an `as` cast would.
    fn saturating_i64(value: u64) -> i64 {
        i64::try_from(value).unwrap_or(i64::MAX)
    }

    let operation_start: Instant = Instant::now();
    let start_time: Instant = Instant::now();

    let client_name: String = x_athena_client(&req);
    if client_name.is_empty() {
        return missing_client_header_response();
    }

    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
    let auto_cast_uuid_filter_values_to_text =
        app_state.gateway_auto_cast_uuid_filter_values_to_text;

    let mut table_name: String = String::new();
    let mut current_page: i64 = 1;
    let mut page_size: i64 = 100;
    let mut offset: i64 = 0;
    let mut conditions: Vec<RequestCondition> = vec![];

    // The `x-athena-key` header is only kept when it equals the
    // SUITSBOOKS_API_ADMIN_KEY environment variable (empty string when unset).
    let apikey: Option<String> = req
        .headers()
        .get("x-athena-key")
        .and_then(|value| value.to_str().ok())
        .map(|s| s.to_string())
        .filter(|key| key == &std::env::var("SUITSBOOKS_API_ADMIN_KEY").unwrap_or_default());

    // Prefer the explicit user-id header, fall back to the matched admin key.
    let user_id: String = get_x_user_id(&req)
        .or_else(|| apikey.clone())
        .unwrap_or_default();

    // `page_size` still holds its initial default here: the body's `page_size`
    // is applied further down and intentionally does not affect this fallback.
    let limit: i64 = match &body {
        Some(json_body) => json_body
            .get("limit")
            .and_then(Value::as_u64)
            .map_or(page_size, saturating_i64),
        None => page_size,
    };

    // `columns` may be a JSON array of strings or a comma-separated string.
    let mut columns_vec: Vec<String> = vec![];
    if let Some(ref json_body) = body
        && let Some(cols_val) = json_body.get("columns")
    {
        if let Some(arr) = cols_val.as_array() {
            columns_vec = arr
                .iter()
                .filter_map(|v| v.as_str().map(|s| s.to_string()))
                .collect();
        } else if let Some(s) = cols_val.as_str() {
            columns_vec = s
                .split(',')
                .map(|p| p.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect();
        }
    }
    if columns_vec.is_empty() {
        columns_vec.push("*".to_string());
    }
    if force_camel_case_to_snake_case {
        columns_vec = columns_vec
            .into_iter()
            .map(|col| normalize_column_name(&col, true))
            .collect();
    }

    let strip_nulls: bool = match get_x_strip_nulls(&req) {
        Some(value) => value.to_lowercase() == "true",
        None => false,
    };

    let post_processing_config: PostProcessingConfig =
        PostProcessingConfig::from_body(body.as_deref(), force_camel_case_to_snake_case);
    let sort_options: Option<SortOptions> =
        parse_sort_options_from_body(body.as_deref(), force_camel_case_to_snake_case);

    // Paging overrides; non-numeric or negative JSON values are ignored by
    // `as_u64` and leave the defaults in place.
    if let Some(ref json_body) = body {
        if let Some(page) = json_body.get("current_page").and_then(Value::as_u64) {
            current_page = saturating_i64(page);
        }
        if let Some(size) = json_body.get("page_size").and_then(Value::as_u64) {
            page_size = saturating_i64(size);
        }
        if let Some(off) = json_body.get("offset").and_then(Value::as_u64) {
            offset = saturating_i64(off);
        }
    }

    if let Some(ref json_body) = body {
        if let Some(name) = json_body.get("table_name").and_then(Value::as_str) {
            table_name = name.to_string();
        }
        // Note: condition parsing (and its error response) happens before the
        // authorization call below.
        conditions = match parse_gateway_fetch_conditions(json_body, force_camel_case_to_snake_case)
        {
            Ok(c) => c,
            Err(resp) => return resp,
        };
    } else {
        // No body at all: still authorize and log so the rejection is recorded,
        // then fail because no table can be resolved.
        let auth = authorize_gateway_request(
            &req,
            app_state.get_ref(),
            Some(&client_name),
            vec![read_right_for_resource(None)],
        )
        .await;
        let _logged_request: LoggedRequest = log_request(
            req.clone(),
            Some(app_state.get_ref()),
            Some(auth.request_id.clone()),
            Some(&auth.log_context),
        );
        if let Some(resp) = auth.response {
            return resp;
        }
        return HttpResponse::BadRequest().json(json!({
            "error": "table_name is required"
        }));
    }

    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&table_name))],
    )
    .await;
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }

    #[cfg(feature = "deadpool_experimental")]
    let deadpool_requested = x_athena_deadpool_enable(&req, Some(&auth.request_id));
    #[cfg(not(feature = "deadpool_experimental"))]
    let deadpool_requested = false;

    if table_name.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "table_name is required"
        }));
    }

    // Sort conditions by column before they are fed into the cache key.
    conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));

    // NOTE(review): `current_page`, `page_size` and `offset` are not passed to the
    // key builder; unless paging is accounted for elsewhere, different pages of the
    // same query may share one cache entry — confirm against the cache helpers.
    let hashed_cache_key: String = build_fetch_hashed_cache_key(
        &table_name,
        &conditions,
        &columns_vec,
        limit,
        strip_nulls,
        &client_name,
        sort_options.as_ref(),
    );

    let cache_result: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;
    match cache_result {
        Some(cached_response) => {
            return cached_response;
        }
        None => {
            info!(cache_key = %hashed_cache_key, "cache miss (POST gateway fetch)");
        }
    }

    // `json!` serializes interpolated expressions by reference, so no clone of
    // the condition values is needed.
    let conditions_json: Vec<Value> = conditions
        .iter()
        .map(|c| {
            json!({
                "eq_column": c.eq_column,
                "eq_value": c.eq_value
            })
        })
        .collect();
    let columns_refs: Vec<&str> = columns_vec.iter().map(|s| s.as_str()).collect();
    let pg_conditions: Vec<Condition> = to_query_conditions(
        &conditions[..],
        force_camel_case_to_snake_case,
        auto_cast_uuid_filter_values_to_text,
    );

    // Pages are 1-based; anything below 1 is treated as the first page. The
    // arithmetic saturates because every operand is client-controlled.
    let page_offset: i64 = current_page.max(1);
    let calculated_offset: i64 = (page_offset - 1)
        .saturating_mul(page_size)
        .saturating_add(offset);

    let data_result: Result<Vec<Value>, String> = execute_gateway_fetch_data(
        app_state.get_ref(),
        &req,
        &auth.request_id,
        &client_name,
        &table_name,
        columns_refs,
        &pg_conditions,
        conditions_json,
        limit,
        current_page,
        page_size,
        offset,
        calculated_offset,
        sort_options.as_ref(),
        deadpool_requested,
    )
    .await;

    if let Ok(data) = &data_result {
        let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);
        hydrate_cache_and_return_json(
            app_state.clone(),
            hashed_cache_key.clone(),
            vec![json!({"data": normalized_rows})],
        )
        .await;
    } else {
        error!("Failed to rehydrate cache due to data fetch error");
    }

    match data_result {
        Ok(data) => {
            respond_fetch_ok(
                app_state.clone(),
                &data,
                &hashed_cache_key,
                strip_nulls,
                &post_processing_config,
                force_camel_case_to_snake_case,
                &table_name,
                &logged_request,
                operation_start,
            )
            .await
        }
        Err(err) => respond_fetch_err(
            err,
            app_state.clone(),
            &table_name,
            &client_name,
            &user_id,
            &hashed_cache_key,
            start_time,
            operation_start,
            &logged_request,
        ),
    }
}
/// `POST /gateway/data` endpoint.
///
/// Thin route wrapper: forwards the request, the optional JSON body and the
/// shared application state unchanged to `handle_fetch_data_route` and returns
/// its response.
#[post("/gateway/data")]
pub async fn fetch_data_route(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> HttpResponse {
handle_fetch_data_route(req, body, app_state).await
}
/// `POST /gateway/fetch` endpoint.
///
/// Second route bound to the same logic as `/gateway/data`: forwards the
/// request, the optional JSON body and the shared application state unchanged
/// to `handle_fetch_data_route` and returns its response.
#[post("/gateway/fetch")]
pub async fn proxy_fetch_data_route(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> HttpResponse {
handle_fetch_data_route(req, body, app_state).await
}