use actix_web::HttpRequest;
use actix_web::http::StatusCode;
use actix_web::web;
use actix_web::web::Data;
use actix_web::{HttpResponse, Responder, get};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::time::Instant;
use crate::AppState;
use crate::api::cache::check::check_cache_control_and_get_response_v2;
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{
GatewayAuthOutcome, authorize_gateway_request, read_right_for_resource,
};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::utils::format::normalize_columns_csv;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
use super::response::missing_client_header_response;
/// Handles `GET /data`: turns query-string parameters into a fetch request
/// body and delegates to `super::routes::handle_fetch_data_route`, with a
/// cache lookup placed in front of the delegation.
///
/// # Query parameters
/// - `view` (required): sent as `table_name` in the delegated request and
///   used as the resource for the read-right authorization check.
/// - `eq_column`, `eq_value` (required): the single equality condition.
/// - `columns` (optional): passed through `normalize_columns_csv`.
/// - `limit`, `current_page`, `page_size`, `offset`, `total_pages`
///   (optional): parsed as `i32`; values that fail to parse are treated as
///   absent.
/// - `strip_nulls` (optional): parsed as `bool`, defaults to `false`.
/// - `sort_by`, `sort_direction` (optional): forwarded as `sortBy`; any
///   direction other than `desc`/`descending` (case-insensitive) becomes
///   `asc`.
///
/// # Responses
/// - The missing-client-header response when `x_athena_client` yields an
///   empty client name.
/// - The response carried by the authorization outcome, when it has one.
/// - `400 Bad Request` when `view`, `eq_column`, or `eq_value` is missing.
/// - The cached response when any of the three cache keys produces one.
/// - `200 OK` with the delegated handler's JSON body on success; the body is
///   also handed to `hydrate_cache_and_return_json` under the hashed and the
///   legacy cache keys.
/// - `500 Internal Server Error` when the delegated handler returns a
///   non-success status or a body that is not valid JSON.
#[get("/data")]
pub async fn get_data_route(
    req: HttpRequest,
    query: web::Query<HashMap<String, String>>,
    app_state: Data<AppState>,
) -> impl Responder {
    let start_time: Instant = Instant::now();

    let client_name: String = x_athena_client(&req);
    if client_name.is_empty() {
        return missing_client_header_response();
    }
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;

    let Some(view) = query.get("view").cloned() else {
        // Without a view there is no resource to check against, so authorize
        // with `None`; an authorization response takes precedence over the
        // 400 for the missing parameter.
        let auth: GatewayAuthOutcome = authorize_gateway_request(
            &req,
            app_state.get_ref(),
            Some(&client_name),
            vec![read_right_for_resource(None)],
        )
        .await;
        let _logged_request: LoggedRequest = log_request(
            req.clone(),
            Some(app_state.get_ref()),
            Some(auth.request_id.clone()),
            Some(&auth.log_context),
        );
        if let Some(resp) = auth.response {
            return resp;
        }
        return HttpResponse::BadRequest().json(json!({
            "error": "Missing required parameter: view"
        }));
    };

    let auth: GatewayAuthOutcome = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&view))],
    )
    .await;
    if let Some(resp) = auth.response {
        return resp;
    }

    let Some(eq_column) = query.get("eq_column").cloned() else {
        return HttpResponse::BadRequest().json(json!({
            "error": "Missing required parameter: eq_column"
        }));
    };
    let Some(eq_value) = query.get("eq_value").cloned() else {
        return HttpResponse::BadRequest().json(json!({
            "error": "Missing required parameter: eq_value"
        }));
    };

    // Optional parameters. Numeric/bool values that fail to parse are
    // silently dropped rather than rejected.
    let columns: Option<String> = query.get("columns").cloned();
    let limit: Option<i32> = query.get("limit").and_then(|l| l.parse::<i32>().ok());
    let current_page: Option<i32> = query
        .get("current_page")
        .and_then(|p| p.parse::<i32>().ok());
    let page_size: Option<i32> = query.get("page_size").and_then(|s| s.parse::<i32>().ok());
    let offset: Option<i32> = query.get("offset").and_then(|o| o.parse::<i32>().ok());
    let total_pages: Option<i32> = query.get("total_pages").and_then(|t| t.parse::<i32>().ok());
    let strip_nulls: bool = query
        .get("strip_nulls")
        .and_then(|s| s.parse::<bool>().ok())
        .unwrap_or(false);
    let sort_by: Option<String> = query.get("sort_by").cloned();
    let sort_direction: Option<String> = query.get("sort_direction").cloned();

    let normalized_columns_param: Option<String> = columns
        .as_ref()
        .map(|cols| normalize_columns_csv(cols, force_camel_case_to_snake_case));

    // Colon-joined key. Absent optional values are rendered with defaults
    // here, so e.g. an absent `limit` and `limit=0` produce the same text;
    // the hash suffix below is computed from the un-defaulted values.
    let legacy_cache_key: String = format!(
        "get_data_route:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}",
        view,
        eq_column,
        eq_value,
        normalized_columns_param.as_deref().unwrap_or(""),
        limit.unwrap_or(0),
        current_page.unwrap_or(1),
        page_size.unwrap_or(100),
        offset.unwrap_or(0),
        total_pages.unwrap_or(1),
        strip_nulls,
        sort_by.as_deref().unwrap_or(""),
        sort_direction.as_deref().unwrap_or(""),
        client_name
    );

    // `json!` interpolates expressions by reference, so none of these values
    // are moved and no clones are needed.
    let get_hash_input: Value = json!({
        "view": view,
        "eq_column": eq_column,
        "eq_value": eq_value,
        "columns": normalized_columns_param,
        "limit": limit,
        "current_page": current_page,
        "page_size": page_size,
        "offset": offset,
        "total_pages": total_pages,
        "strip_nulls": strip_nulls,
        "sort_by": sort_by,
        "sort_direction": sort_direction,
        "client": client_name,
    });
    let get_hash_str: String =
        sha256::digest(serde_json::to_string(&get_hash_input).unwrap_or_default());

    // Current keys carry a 16-character hash suffix; the older format used 8.
    // Both slices are length-guarded so neither can panic.
    let get_short_hash: &str = &get_hash_str[..16.min(get_hash_str.len())];
    let legacy_short_hash: &str = &get_hash_str[..8.min(get_hash_str.len())];
    let hashed_cache_key: String = format!("{}-h{}", legacy_cache_key, get_short_hash);
    let legacy_hashed_cache_key: String = format!("{}-h{}", legacy_cache_key, legacy_short_hash);

    // Cache lookups, newest key format first.
    let cache_result_hashed: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;
    if let Some(cached_response) = cache_result_hashed {
        return cached_response;
    }

    if legacy_hashed_cache_key != hashed_cache_key {
        let cache_result_legacy_hashed: Option<HttpResponse> =
            check_cache_control_and_get_response_v2(
                &req,
                app_state.clone(),
                &legacy_hashed_cache_key,
            )
            .await;
        if let Some(cached_response) = cache_result_legacy_hashed {
            tracing::info!(
                cache_key = %hashed_cache_key,
                legacy_hashed_cache_key = %legacy_hashed_cache_key,
                duration_ms = %start_time.elapsed().as_millis(),
                "cache hit (GET, legacy hashed)"
            );
            return cached_response;
        }
    }

    let cache_result_legacy: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &legacy_cache_key).await;
    if let Some(cached_response) = cache_result_legacy {
        tracing::info!(
            cache_key = %legacy_cache_key,
            duration_ms = %start_time.elapsed().as_millis(),
            "cache hit (GET, legacy)"
        );
        return cached_response;
    }

    // Cache miss: log the request and build the body for the delegated handler.
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );

    let condition: Value = json!({
        "eq_column": eq_column,
        "eq_value": eq_value
    });
    let mut request_body: Value = json!({
        "table_name": view,
        "conditions": [condition]
    });
    if let Some(cols) = normalized_columns_param.as_deref() {
        request_body["columns"] = json!(cols);
    }
    if let Some(l) = limit {
        request_body["limit"] = json!(l);
    }
    if let Some(cp) = current_page {
        request_body["current_page"] = json!(cp);
    }
    if let Some(ps) = page_size {
        request_body["page_size"] = json!(ps);
    }
    if let Some(o) = offset {
        request_body["offset"] = json!(o);
    }
    if let Some(tp) = total_pages {
        request_body["total_pages"] = json!(tp);
    }
    if strip_nulls {
        request_body["strip_nulls"] = json!(strip_nulls);
    }
    if let Some(ref col) = sort_by
        && !col.is_empty()
    {
        // Anything that is not an explicit descending request sorts ascending.
        let dir: String = sort_direction.as_deref().unwrap_or("asc").to_lowercase();
        let direction: &str = if dir == "desc" || dir == "descending" {
            "desc"
        } else {
            "asc"
        };
        request_body["sortBy"] = json!({ "field": col, "direction": direction });
    }

    let fetch_response: HttpResponse = super::routes::handle_fetch_data_route(
        req,
        Some(web::Json(request_body)),
        app_state.clone(),
    )
    .await;

    if fetch_response.status().is_success() {
        let body_bytes: web::Bytes = actix_web::body::to_bytes(fetch_response.into_body())
            .await
            .unwrap_or_default();
        if let Ok(parsed) = serde_json::from_slice::<Value>(&body_bytes) {
            // Build the cache payload once and reuse it for both keys.
            let cache_payload: Value = json!({ "data": parsed });
            hydrate_cache_and_return_json(
                app_state.clone(),
                hashed_cache_key.clone(),
                vec![cache_payload.clone()],
            )
            .await;
            hydrate_cache_and_return_json(
                app_state.clone(),
                legacy_cache_key.clone(),
                vec![cache_payload],
            )
            .await;
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "fetch_get",
                Some(&view),
                start_time.elapsed().as_millis(),
                StatusCode::OK,
                Some(json!({
                    "legacy_cache_key": legacy_cache_key,
                    "hash": hashed_cache_key
                })),
            );
            return HttpResponse::Ok().json(parsed);
        }
    }

    // NOTE(review): every non-success status from the delegated handler
    // (including any 4xx it may produce) and every unparseable body is
    // reported to the client as a generic 500 — confirm this is intended.
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "fetch_get",
        Some(&view),
        start_time.elapsed().as_millis(),
        StatusCode::INTERNAL_SERVER_ERROR,
        Some(json!({ "error": "fetch delegation failed" })),
    );
    HttpResponse::InternalServerError().json(json!({
        "error": "Failed to process request"
    }))
}