#[allow(deprecated)]
use actix_web::HttpRequest;
use actix_web::web::{Data, Json};
use actix_web::{HttpResponse, post};
use serde_json::{Value, json};
use std::time::Instant;
use tracing::{error, info};
use crate::AppState;
use crate::api::cache::check::{
CacheLookupOutcome, check_cache_control_and_get_response_v2_with_outcome,
};
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{authorize_gateway_request, read_right_for_resource};
use crate::api::headers::x_athena_client::x_athena_client;
#[cfg(feature = "deadpool_experimental")]
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::headers::x_strip_nulls::get_x_strip_nulls;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::parser::query_builder::Condition;
use crate::utils::format::{normalize_column_name, normalize_rows};
use crate::utils::request_logging::{LoggedRequest, log_request};
use super::conditions::{RequestCondition, to_query_conditions};
use super::execution::execute_gateway_fetch_data;
use super::request::{
build_fetch_hashed_cache_key, build_fetch_hashed_cache_key_legacy8,
parse_gateway_fetch_conditions, parse_sort_options_from_body,
};
use super::response::{missing_client_header_response, respond_fetch_err, respond_fetch_ok};
use super::singleflight::{
SingleflightRole, acquire_fetch_singleflight, publish_fetch_singleflight_result,
wait_for_fetch_singleflight_result,
};
use super::types::{PostProcessingConfig, SortOptions};
pub(crate) async fn handle_fetch_data_route(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> HttpResponse {
let operation_start: Instant = Instant::now();
let start_time: Instant = Instant::now();
let client_name: String = x_athena_client(&req.clone());
if client_name.is_empty() {
return missing_client_header_response();
}
let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
let auto_cast_uuid_filter_values_to_text =
app_state.gateway_auto_cast_uuid_filter_values_to_text;
let mut table_name: String = String::new();
let mut current_page: i64 = 1;
let mut page_size: i64 = 100;
let mut offset: i64 = 0;
let mut conditions: Vec<RequestCondition> = vec![];
let apikey: Option<String> = req
.headers()
.get("x-athena-key")
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string())
.filter(|key| key == &std::env::var("SUITSBOOKS_API_ADMIN_KEY").unwrap_or_default());
let user_id: String = get_x_user_id(&req)
.or_else(|| apikey.clone())
.unwrap_or_default();
let limit: i64 = match &body {
Some(json_body) => json_body
.get("limit")
.and_then(Value::as_u64)
.unwrap_or(page_size as u64) as i64,
None => page_size,
};
let mut columns_vec: Vec<String> = vec![];
if let Some(ref json_body) = body
&& let Some(cols_val) = json_body.get("columns")
{
if let Some(arr) = cols_val.as_array() {
columns_vec = arr
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
} else if let Some(s) = cols_val.as_str() {
columns_vec = s
.split(',')
.map(|p| p.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
}
}
if columns_vec.is_empty() {
columns_vec.push("*".to_string());
}
if force_camel_case_to_snake_case {
columns_vec = columns_vec
.into_iter()
.map(|col| normalize_column_name(&col, true))
.collect();
}
let strip_nulls: bool = match get_x_strip_nulls(&req) {
Some(value) => value.to_lowercase() == "true",
None => false,
};
let post_processing_config: PostProcessingConfig =
PostProcessingConfig::from_body(body.as_deref(), force_camel_case_to_snake_case);
let sort_options: Option<SortOptions> =
parse_sort_options_from_body(body.as_deref(), force_camel_case_to_snake_case);
if let Some(ref json_body) = body {
if let Some(page) = json_body.get("current_page").and_then(Value::as_u64) {
current_page = page as i64;
}
if let Some(size) = json_body.get("page_size").and_then(Value::as_u64) {
page_size = size as i64;
}
if let Some(off) = json_body.get("offset").and_then(Value::as_u64) {
offset = off as i64;
}
}
if let Some(ref json_body) = body {
if let Some(name) = json_body.get("table_name").and_then(Value::as_str) {
table_name = name.to_string();
}
conditions = match parse_gateway_fetch_conditions(json_body, force_camel_case_to_snake_case)
{
Ok(c) => c,
Err(resp) => return resp,
};
} else {
let auth = authorize_gateway_request(
&req,
app_state.get_ref(),
Some(&client_name),
vec![read_right_for_resource(None)],
)
.await;
let _logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
return HttpResponse::BadRequest().json(json!({
"error": "table_name is required"
}));
}
let auth = authorize_gateway_request(
&req,
app_state.get_ref(),
Some(&client_name),
vec![read_right_for_resource(Some(&table_name))],
)
.await;
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
#[cfg(feature = "deadpool_experimental")]
let deadpool_requested = x_athena_deadpool_enable(&req, Some(&auth.request_id));
#[cfg(not(feature = "deadpool_experimental"))]
let deadpool_requested = false;
if table_name.is_empty() {
return HttpResponse::BadRequest().json(json!({
"error": "table_name is required"
}));
}
conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));
let hashed_cache_key: String = build_fetch_hashed_cache_key(
&table_name,
&conditions,
&columns_vec,
limit,
strip_nulls,
&client_name,
sort_options.as_ref(),
);
let legacy_hashed_cache_key: String = build_fetch_hashed_cache_key_legacy8(
&table_name,
&conditions,
&columns_vec,
limit,
strip_nulls,
&client_name,
sort_options.as_ref(),
);
let (cache_result, cache_outcome): (Option<HttpResponse>, CacheLookupOutcome) =
check_cache_control_and_get_response_v2_with_outcome(
&req,
app_state.clone(),
&hashed_cache_key,
)
.await;
match cache_result {
Some(cached_response) => {
return cached_response;
}
None => {
if legacy_hashed_cache_key != hashed_cache_key {
let (legacy_cache_result, legacy_cache_outcome): (
Option<HttpResponse>,
CacheLookupOutcome,
) = check_cache_control_and_get_response_v2_with_outcome(
&req,
app_state.clone(),
&legacy_hashed_cache_key,
)
.await;
if let Some(cached_response) = legacy_cache_result {
info!(
cache_key = %hashed_cache_key,
legacy_cache_key = %legacy_hashed_cache_key,
outcome = %legacy_cache_outcome.as_str(),
"cache hit via legacy hashed key (POST gateway fetch)"
);
return cached_response;
}
}
info!(
cache_key = %hashed_cache_key,
legacy_cache_key = %legacy_hashed_cache_key,
outcome = %cache_outcome.as_str(),
"cache miss (POST gateway fetch)"
);
}
}
let conditions_json: Vec<Value> = conditions
.iter()
.map(|c| {
json!({
"eq_column": c.eq_column,
"eq_value": c.eq_value.clone()
})
})
.collect();
let columns_refs: Vec<&str> = columns_vec.iter().map(|s| s.as_str()).collect();
let pg_conditions: Vec<Condition> = to_query_conditions(
&conditions[..],
force_camel_case_to_snake_case,
auto_cast_uuid_filter_values_to_text,
);
let page_offset: i64 = if current_page < 1 { 1 } else { current_page };
let calculated_offset: i64 = (page_offset - 1) * page_size + offset;
let singleflight_started_at = Instant::now();
let singleflight_role = acquire_fetch_singleflight(&hashed_cache_key).await;
let data_result: Result<Vec<Value>, String> = match singleflight_role {
SingleflightRole::Leader(flight) => {
app_state.metrics_state.record_management_mutation(
"gateway_fetch_singleflight",
"leader",
singleflight_started_at.elapsed().as_secs_f64(),
);
let leader_result = execute_gateway_fetch_data(
app_state.get_ref(),
&req,
&auth.request_id,
&client_name,
&table_name,
columns_refs.clone(),
&pg_conditions,
conditions_json,
limit,
current_page,
page_size,
offset,
calculated_offset,
sort_options.as_ref(),
deadpool_requested,
)
.await;
publish_fetch_singleflight_result(&hashed_cache_key, flight, leader_result.clone())
.await;
leader_result
}
SingleflightRole::Follower(flight) => {
let follower_result: Option<Result<Vec<Value>, String>> =
wait_for_fetch_singleflight_result(flight).await;
if let Some(shared_result) = follower_result {
app_state.metrics_state.record_management_mutation(
"gateway_fetch_singleflight",
"follower_shared",
singleflight_started_at.elapsed().as_secs_f64(),
);
shared_result
} else {
app_state.metrics_state.record_management_mutation(
"gateway_fetch_singleflight",
"follower_timeout_fallback",
singleflight_started_at.elapsed().as_secs_f64(),
);
execute_gateway_fetch_data(
app_state.get_ref(),
&req,
&auth.request_id,
&client_name,
&table_name,
columns_refs,
&pg_conditions,
conditions_json,
limit,
current_page,
page_size,
offset,
calculated_offset,
sort_options.as_ref(),
deadpool_requested,
)
.await
}
}
};
if let Ok(data) = &data_result {
let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);
hydrate_cache_and_return_json(
app_state.clone(),
hashed_cache_key.clone(),
vec![json!({"data": normalized_rows.clone()})],
)
.await;
} else {
error!("Failed to rehydrate cache due to data fetch error");
}
match data_result {
Ok(data) => {
respond_fetch_ok(
app_state.clone(),
&data,
&hashed_cache_key,
strip_nulls,
&post_processing_config,
force_camel_case_to_snake_case,
&table_name,
&logged_request,
operation_start,
)
.await
}
Err(err) => respond_fetch_err(
err,
app_state.clone(),
&table_name,
&client_name,
&user_id,
&hashed_cache_key,
start_time,
operation_start,
&logged_request,
),
}
}
/// POST `/gateway/data` — primary route; delegates to the shared fetch handler.
#[post("/gateway/data")]
pub async fn fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}
/// POST `/gateway/fetch` — alias route with identical behavior to
/// `/gateway/data`; delegates to the shared fetch handler.
#[post("/gateway/fetch")]
pub async fn proxy_fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}