#[allow(deprecated)]
use actix_web::HttpRequest;
use actix_web::web::{Data, Json};
use actix_web::{HttpResponse, post};
use serde_json::{Value, json};
use std::time::Instant;
use tracing::{error, info};
use crate::AppState;
use crate::api::cache::check::{
CacheLookupOutcome, check_cache_control_and_get_response_v2_with_outcome,
};
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{authorize_gateway_request, read_right_for_resource};
use crate::api::gateway::contracts::{
GATEWAY_DEFERRED_KIND_FETCH, GatewayDeferredRequest, GatewayFetchRequest,
};
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::headers::x_athena_client::x_athena_client;
#[cfg(feature = "deadpool_experimental")]
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::headers::x_strip_nulls::get_x_strip_nulls;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::api::response::api_accepted;
use crate::parser::query_builder::Condition;
use crate::utils::format::normalize_rows;
use crate::utils::request_logging::{
LoggedRequest, RequestCompletionLogContext, finalize_request_log, log_operation_event,
log_request,
};
use super::conditions::{RequestCondition, to_query_conditions};
use super::execution::execute_gateway_fetch_data;
use super::request::{
build_fetch_hashed_cache_key, build_fetch_hashed_cache_key_legacy8,
parse_gateway_fetch_conditions, parse_sort_options_from_body,
};
use super::response::{missing_client_header_response, respond_fetch_err, respond_fetch_ok};
use super::singleflight::{
SingleflightRole, acquire_fetch_singleflight, publish_fetch_singleflight_result,
wait_for_fetch_singleflight_result,
};
use super::types::{PostProcessingConfig, SortOptions};
/// Maps a cache lookup outcome onto the coarse "source" label that is
/// reported back to clients (response headers) and recorded in request logs.
fn cache_source_from_outcome(outcome: CacheLookupOutcome) -> &'static str {
    match outcome {
        // Either flavour of local-tier hit is reported simply as "local".
        CacheLookupOutcome::HitLocal | CacheLookupOutcome::HitLocalRaw => "local",
        // Hit served by the Redis tier.
        CacheLookupOutcome::HitRedis => "redis",
        // The caller explicitly opted out of caching via a no-cache header.
        CacheLookupOutcome::BypassNoCacheHeader => "bypass",
        // Every miss variant means the data will be produced by the database.
        CacheLookupOutcome::MissAfterRedisGetError
        | CacheLookupOutcome::MissAfterRedisGetTimeout
        | CacheLookupOutcome::MissAllTiers => "database",
    }
}
/// Shared handler behind `POST /gateway/data` and `POST /gateway/fetch`.
///
/// Flow, in order:
/// 1. require the `x-athena-client` header;
/// 2. parse the optional JSON body into a `GatewayFetchRequest` plus
///    conditions / sort / post-processing options;
/// 3. authorize the caller for read access on the requested table and log
///    the request;
/// 4. if auth mandates deferred execution, enqueue the request and return
///    202 Accepted;
/// 5. try the cache under the current hashed key, then (on miss) under the
///    legacy-8 hashed key for backward compatibility;
/// 6. on a full miss, run the database fetch deduplicated through a
///    singleflight keyed by the hashed cache key, hydrate the cache with the
///    normalized rows, and respond.
pub(crate) async fn handle_fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    // Two timers are kept: `operation_start` feeds the operation/request
    // logs below, while `start_time` is forwarded to the error responder.
    // They are started back-to-back so their values are effectively equal.
    let operation_start: Instant = Instant::now();
    let start_time: Instant = Instant::now();
    // NOTE(review): the `req.clone()` looks unnecessary — `&req` should
    // suffice since only a reference is taken. Confirm and simplify.
    let client_name: String = x_athena_client(&req.clone());
    if client_name.is_empty() {
        // The client header is mandatory for every gateway request.
        return missing_client_header_response();
    }
    // Feature toggles sourced from application configuration.
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
    let auto_cast_uuid_filter_values_to_text =
        app_state.gateway_auto_cast_uuid_filter_values_to_text;
    let table_name: String;
    // Pagination defaults; overridden from the parsed body further below.
    let mut current_page: i64 = 1;
    let mut page_size: i64 = 100;
    let mut offset: i64 = 0;
    let parsed_fetch_body: Option<GatewayFetchRequest> = body
        .as_ref()
        .map(|json_body| GatewayFetchRequest::from_body(json_body, force_camel_case_to_snake_case));
    let mut conditions: Vec<RequestCondition> = vec![];
    // Admin API key: only retained when it exactly matches the configured
    // admin key from the environment.
    // NOTE(review): if SUITSBOOKS_API_ADMIN_KEY is unset,
    // `unwrap_or_default()` yields "" and an *empty* `x-athena-key` header
    // would pass this filter — verify that is intended.
    let apikey: Option<String> = req
        .headers()
        .get("x-athena-key")
        .and_then(|value| value.to_str().ok())
        .map(|s| s.to_string())
        .filter(|key| key == &std::env::var("SUITSBOOKS_API_ADMIN_KEY").unwrap_or_default());
    // User identity for error reporting: explicit header first, then the
    // validated admin key, else empty.
    let user_id: String = get_x_user_id(&req)
        .or_else(|| apikey.clone())
        .unwrap_or_default();
    // NOTE(review): `limit` defaults from `page_size` *before* the body's
    // page_size override is applied below, so a body that sets page_size but
    // not limit still gets limit = 100. Confirm this is the intended
    // precedence.
    let limit: i64 = parsed_fetch_body
        .as_ref()
        .and_then(|request| request.limit)
        .unwrap_or(page_size);
    // Requested projection; an absent or empty list means "all columns".
    let mut columns_vec: Vec<String> = parsed_fetch_body
        .as_ref()
        .map(|request| request.columns.clone())
        .unwrap_or_else(|| vec!["*".to_string()]);
    if columns_vec.is_empty() {
        columns_vec.push("*".to_string());
    }
    // Opt-in null stripping via the x-strip-nulls header (case-insensitive
    // "true").
    let strip_nulls: bool = match get_x_strip_nulls(&req) {
        Some(value) => value.to_lowercase() == "true",
        None => false,
    };
    let post_processing_config: PostProcessingConfig =
        PostProcessingConfig::from_body(body.as_deref(), force_camel_case_to_snake_case);
    let sort_options: Option<SortOptions> =
        parse_sort_options_from_body(body.as_deref(), force_camel_case_to_snake_case);
    // Apply pagination overrides from the parsed body.
    if let Some(parsed_fetch) = parsed_fetch_body.as_ref() {
        if let Some(page) = parsed_fetch.current_page {
            current_page = page;
        }
        if let Some(size) = parsed_fetch.page_size {
            page_size = size;
        }
        if let Some(off) = parsed_fetch.offset {
            offset = off;
        }
    }
    if let Some(ref json_body) = body {
        // Body present: extract the target table and filter conditions.
        table_name = parsed_fetch_body
            .as_ref()
            .map(|request| request.table_name.clone())
            .unwrap_or_default();
        conditions = match parse_gateway_fetch_conditions(json_body, force_camel_case_to_snake_case)
        {
            Ok(c) => c,
            Err(resp) => return resp,
        };
    } else {
        // No body: still authorize and log (so the rejection is attributable
        // and auditable) before returning 400.
        let auth = authorize_gateway_request(
            &req,
            app_state.get_ref(),
            Some(&client_name),
            vec![read_right_for_resource(None)],
        )
        .await;
        let _logged_request: LoggedRequest = log_request(
            req.clone(),
            Some(app_state.get_ref()),
            Some(auth.request_id.clone()),
            Some(&auth.log_context),
        );
        if let Some(resp) = auth.response {
            return resp;
        }
        return HttpResponse::BadRequest().json(json!({
            "error": "table_name is required"
        }));
    }
    // Authorize with a read right scoped to the concrete table name.
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&table_name))],
    )
    .await;
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        // Authorization produced a terminal response (e.g. 401/403).
        return resp;
    }
    if auth.force_deferred_queue {
        // Auth fallback mode: instead of executing inline, queue the request
        // for deferred processing and acknowledge with 202.
        if table_name.is_empty() {
            return HttpResponse::BadRequest().json(json!({
                "error": "table_name is required"
            }));
        }
        let Some(request_body) = body.as_ref().map(|json_body| json_body.0.clone()) else {
            return HttpResponse::BadRequest().json(json!({
                "error": "request body is required for deferred /gateway/fetch execution"
            }));
        };
        // Best-effort payload size taken from Content-Length, if parseable.
        let request_bytes: Option<u64> = req
            .headers()
            .get(actix_web::http::header::CONTENT_LENGTH)
            .and_then(|value| value.to_str().ok())
            .and_then(|value| value.parse::<u64>().ok());
        let deferred_request = GatewayDeferredRequest::for_request_body(
            GATEWAY_DEFERRED_KIND_FETCH,
            auth.request_id.clone(),
            client_name.clone(),
            request_body,
        )
        .with_reason(auth.force_deferred_reason.clone())
        .with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
        if let Err(err) = enqueue_gateway_deferred_request(
            app_state.get_ref(),
            "POST",
            req.path(),
            request_bytes,
            &deferred_request,
        )
        .await
        {
            // Queue unavailable: surface 503 so the client can retry.
            return HttpResponse::ServiceUnavailable().json(json!({
                "status": "error",
                "code": "deferred_enqueue_unavailable",
                "message": "Deferred queue unavailable",
                "error": format!("Failed to queue deferred fetch request: {err}"),
            }));
        }
        return api_accepted(
            "Fetch request queued for deferred execution (auth fallback mode)",
            json!({
                "request_id": auth.request_id,
                "status": "queued",
                "route": req.path(),
            }),
        );
    }
    // Experimental deadpool opt-in is header-driven and only compiled in
    // behind the feature flag; otherwise it is hard-wired off.
    #[cfg(feature = "deadpool_experimental")]
    let deadpool_requested = x_athena_deadpool_enable(&req, Some(&auth.request_id));
    #[cfg(not(feature = "deadpool_experimental"))]
    let deadpool_requested = false;
    if table_name.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "table_name is required"
        }));
    }
    // Sort conditions by column so equivalent requests hash to the same
    // cache key regardless of condition order in the body.
    conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));
    // Two hashed keys are computed: the current format plus a legacy-8
    // format, so entries written before a key-format change remain readable.
    let hashed_cache_key: String = build_fetch_hashed_cache_key(
        &table_name,
        &conditions,
        &columns_vec,
        limit,
        strip_nulls,
        &client_name,
        sort_options.as_ref(),
    );
    let legacy_hashed_cache_key: String = build_fetch_hashed_cache_key_legacy8(
        &table_name,
        &conditions,
        &columns_vec,
        limit,
        strip_nulls,
        &client_name,
        sort_options.as_ref(),
    );
    let (cache_result, cache_outcome): (Option<HttpResponse>, CacheLookupOutcome) =
        check_cache_control_and_get_response_v2_with_outcome(
            &req,
            app_state.clone(),
            &hashed_cache_key,
            "gateway_fetch_cache_lookup",
        )
        .await;
    match cache_result {
        Some(mut cached_response) => {
            // Cache hit under the current key: annotate the response with
            // cache metadata headers, log, finalize, and return early.
            let cache_source = cache_source_from_outcome(cache_outcome);
            cached_response.headers_mut().insert(
                "X-Athena-Cache-Outcome".parse().unwrap(),
                cache_outcome.as_str().parse().unwrap(),
            );
            cached_response.headers_mut().insert(
                "X-Athena-Cache-Source".parse().unwrap(),
                cache_source.parse().unwrap(),
            );
            cached_response
                .headers_mut()
                .insert("X-Athena-Cached".parse().unwrap(), "true".parse().unwrap());
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "fetch",
                Some(&table_name),
                operation_start.elapsed().as_millis(),
                cached_response.status(),
                Some(json!({
                    "cache_key": hashed_cache_key,
                    "cached": true,
                    "cache_source": cache_source,
                    "cache_lookup_outcome": cache_outcome.as_str(),
                })),
            );
            finalize_request_log(
                Some(app_state.get_ref()),
                &logged_request.request_id,
                RequestCompletionLogContext {
                    status_code: Some(cached_response.status().as_u16()),
                    duration_ms: Some(operation_start.elapsed().as_millis()),
                    cached: Some(true),
                    cache_lookup_outcome: Some(cache_outcome.as_str().to_string()),
                    cache_source: Some(cache_source.to_string()),
                    operation: Some("fetch".to_string()),
                    table_name: Some(table_name.clone()),
                },
            );
            return cached_response;
        }
        None => {
            // Miss under the current key: retry under the legacy key, but
            // only when the two keys actually differ.
            if legacy_hashed_cache_key != hashed_cache_key {
                let (legacy_cache_result, legacy_cache_outcome): (
                    Option<HttpResponse>,
                    CacheLookupOutcome,
                ) = check_cache_control_and_get_response_v2_with_outcome(
                    &req,
                    app_state.clone(),
                    &legacy_hashed_cache_key,
                    "gateway_fetch_cache_lookup",
                )
                .await;
                if let Some(mut cached_response) = legacy_cache_result {
                    // Legacy-key hit: same annotation/logging as above, but
                    // recorded against the legacy key.
                    let cache_source = cache_source_from_outcome(legacy_cache_outcome);
                    cached_response.headers_mut().insert(
                        "X-Athena-Cache-Outcome".parse().unwrap(),
                        legacy_cache_outcome.as_str().parse().unwrap(),
                    );
                    cached_response.headers_mut().insert(
                        "X-Athena-Cache-Source".parse().unwrap(),
                        cache_source.parse().unwrap(),
                    );
                    cached_response
                        .headers_mut()
                        .insert("X-Athena-Cached".parse().unwrap(), "true".parse().unwrap());
                    info!(
                        cache_key = %hashed_cache_key,
                        legacy_cache_key = %legacy_hashed_cache_key,
                        outcome = %legacy_cache_outcome.as_str(),
                        "cache hit via legacy hashed key (POST gateway fetch)"
                    );
                    log_operation_event(
                        Some(app_state.get_ref()),
                        &logged_request,
                        "fetch",
                        Some(&table_name),
                        operation_start.elapsed().as_millis(),
                        cached_response.status(),
                        Some(json!({
                            "cache_key": legacy_hashed_cache_key,
                            "cached": true,
                            "cache_source": cache_source,
                            "cache_lookup_outcome": legacy_cache_outcome.as_str(),
                        })),
                    );
                    finalize_request_log(
                        Some(app_state.get_ref()),
                        &logged_request.request_id,
                        RequestCompletionLogContext {
                            status_code: Some(cached_response.status().as_u16()),
                            duration_ms: Some(operation_start.elapsed().as_millis()),
                            cached: Some(true),
                            cache_lookup_outcome: Some(legacy_cache_outcome.as_str().to_string()),
                            cache_source: Some(cache_source.to_string()),
                            operation: Some("fetch".to_string()),
                            table_name: Some(table_name.clone()),
                        },
                    );
                    return cached_response;
                }
            }
            info!(
                cache_key = %hashed_cache_key,
                legacy_cache_key = %legacy_hashed_cache_key,
                outcome = %cache_outcome.as_str(),
                "cache miss (POST gateway fetch)"
            );
        }
    }
    // Full cache miss: prepare the database fetch.
    // Serializable view of the conditions for logging/execution metadata.
    let conditions_json: Vec<Value> = conditions
        .iter()
        .map(|c| {
            json!({
                "eq_column": c.eq_column,
                "eq_value": c.eq_value.clone()
            })
        })
        .collect();
    let columns_refs: Vec<&str> = columns_vec.iter().map(|s| s.as_str()).collect();
    let pg_conditions: Vec<Condition> = to_query_conditions(
        &conditions[..],
        force_camel_case_to_snake_case,
        auto_cast_uuid_filter_values_to_text,
    );
    // Clamp the page to 1 and combine page-based and explicit offsets:
    // final offset = (page - 1) * page_size + offset.
    let page_offset: i64 = if current_page < 1 { 1 } else { current_page };
    let calculated_offset: i64 = (page_offset - 1) * page_size + offset;
    // Singleflight keyed by the hashed cache key: concurrent identical
    // requests collapse into one database execution.
    let singleflight_started_at = Instant::now();
    let singleflight_role = acquire_fetch_singleflight(&hashed_cache_key).await;
    let data_result: Result<Vec<Value>, String> = match singleflight_role {
        SingleflightRole::Leader(flight) => {
            // Leader: run the fetch and publish the result to any followers.
            app_state.metrics_state.record_management_mutation(
                "gateway_fetch_singleflight",
                "leader",
                singleflight_started_at.elapsed().as_secs_f64(),
            );
            let leader_result = execute_gateway_fetch_data(
                app_state.get_ref(),
                &req,
                &auth.request_id,
                &client_name,
                &table_name,
                columns_refs.clone(),
                &pg_conditions,
                conditions_json,
                limit,
                current_page,
                page_size,
                offset,
                calculated_offset,
                sort_options.as_ref(),
                deadpool_requested,
            )
            .await;
            publish_fetch_singleflight_result(&hashed_cache_key, flight, leader_result.clone())
                .await;
            leader_result
        }
        SingleflightRole::Follower(flight) => {
            // Follower: wait for the leader's shared result; if the wait
            // yields nothing (e.g. timeout), fall back to an independent
            // execution.
            let follower_result: Option<Result<Vec<Value>, String>> =
                wait_for_fetch_singleflight_result(flight).await;
            if let Some(shared_result) = follower_result {
                app_state.metrics_state.record_management_mutation(
                    "gateway_fetch_singleflight",
                    "follower_shared",
                    singleflight_started_at.elapsed().as_secs_f64(),
                );
                shared_result
            } else {
                app_state.metrics_state.record_management_mutation(
                    "gateway_fetch_singleflight",
                    "follower_timeout_fallback",
                    singleflight_started_at.elapsed().as_secs_f64(),
                );
                execute_gateway_fetch_data(
                    app_state.get_ref(),
                    &req,
                    &auth.request_id,
                    &client_name,
                    &table_name,
                    columns_refs,
                    &pg_conditions,
                    conditions_json,
                    limit,
                    current_page,
                    page_size,
                    offset,
                    calculated_offset,
                    sort_options.as_ref(),
                    deadpool_requested,
                )
                .await
            }
        }
    };
    if let Ok(data) = &data_result {
        // Successful fetch: normalize row casing and hydrate the cache under
        // the current key so subsequent requests hit.
        let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);
        hydrate_cache_and_return_json(
            app_state.clone(),
            hashed_cache_key.clone(),
            vec![json!({"data": normalized_rows.clone()})],
        )
        .await;
    } else {
        // NOTE(review): this message is misleading — the *fetch* failed, so
        // cache hydration is skipped; nothing was "rehydrated". Consider
        // rewording. (Kept as-is here: log text is runtime behavior.)
        error!("Failed to rehydrate cache due to data fetch error");
    }
    // Delegate the final response (including success/error logging) to the
    // shared responders.
    match data_result {
        Ok(data) => {
            respond_fetch_ok(
                app_state.clone(),
                &data,
                &hashed_cache_key,
                strip_nulls,
                &post_processing_config,
                force_camel_case_to_snake_case,
                &table_name,
                &logged_request,
                operation_start,
                cache_outcome,
            )
            .await
        }
        Err(err) => respond_fetch_err(
            err,
            app_state.clone(),
            &table_name,
            &client_name,
            &user_id,
            &hashed_cache_key,
            start_time,
            operation_start,
            &logged_request,
            cache_outcome,
        ),
    }
}
/// `POST /gateway/data` — thin routing shim that delegates to the shared
/// fetch handler. Kept separate from `/gateway/fetch` only so both paths
/// can be registered; behavior is identical.
#[post("/gateway/data")]
pub async fn fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}
/// `POST /gateway/fetch` — alias route for the shared fetch handler; the
/// request is processed identically to `POST /gateway/data`.
#[post("/gateway/fetch")]
pub async fn proxy_fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}