use actix_web::HttpRequest;
use actix_web::http::StatusCode;
use actix_web::web;
use actix_web::web::Data;
use actix_web::{HttpResponse, Responder, get};
use athena_gateway::{
GatewayGetFetchCompatibilityError, build_gateway_get_fetch_compatibility_plan,
};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::time::Instant;
use crate::AppState;
use crate::api::cache::check::check_cache_control_and_get_response_v2;
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{
GatewayAuthOutcome, authorize_gateway_request, read_right_for_resource,
};
use crate::api::gateway::contracts::GatewayOperationKind;
use crate::api::gateway::response::{
GATEWAY_ERROR_CODE_FETCH_DELEGATION_FAILED, gateway_bad_request_with_code,
gateway_internal_error_with_code, missing_client_header_response,
};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::utils::cache_key_token::cache_key_client_token;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
/// `GET /data` — query-string variant of the gateway fetch operation.
///
/// Flow, in order:
/// 1. Resolve the client name via `x_athena_client`; an empty name yields
///    `missing_client_header_response`.
/// 2. Require the `view` query parameter. When it is absent the request is
///    still authorized (against no specific resource) and logged before the
///    400 "Missing required parameter" response is returned.
/// 3. Authorize the request for read access on `view`.
/// 4. Build the GET compatibility plan (request body plus cache keys) from the
///    query map; any plan error becomes a 400 response.
/// 5. Probe the cache with the hashed key, then the legacy hashed key (only if
///    it differs from the hashed key), then the legacy key. The first hit is
///    returned as-is.
/// 6. On a miss, log the request and delegate to
///    `super::routes::handle_fetch_data_route` with the plan's request body.
/// 7. If the delegate succeeds with a JSON body, hand `{"data": <body>}` to
///    `hydrate_cache_and_return_json` for the hashed and legacy keys, log a
///    `fetch_get` operation event and answer `200` with the parsed body.
///
/// Any non-success delegate response, or a success response whose body cannot
/// be parsed as JSON, is reported as a `fetch delegation failed` internal
/// error.
#[get("/data")]
pub async fn get_data_route(
    req: HttpRequest,
    query: web::Query<HashMap<String, String>>,
    app_state: Data<AppState>,
) -> impl Responder {
    let start_time: Instant = Instant::now();

    // Borrow the request directly; no clone is needed just to read a header.
    let client_name: String = x_athena_client(&req);
    if client_name.is_empty() {
        return missing_client_header_response(GatewayOperationKind::Fetch);
    }

    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;

    let view: String = match query.get("view") {
        Some(v) => v.clone(),
        None => {
            // Authorize even without a view so that an auth failure takes
            // precedence over the parameter error.
            let auth: GatewayAuthOutcome = authorize_gateway_request(
                &req,
                app_state.get_ref(),
                Some(&client_name),
                vec![read_right_for_resource(None)],
            )
            .await;
            let _logged_request: LoggedRequest = log_request(
                req.clone(),
                Some(app_state.get_ref()),
                Some(auth.request_id.clone()),
                Some(&auth.log_context),
            );
            if let Some(resp) = auth.response {
                return resp;
            }
            return gateway_bad_request_with_code(
                GatewayGetFetchCompatibilityError::MissingView.code(),
                GatewayOperationKind::Fetch,
                "Missing required parameter",
                "view",
            );
        }
    };

    let auth: GatewayAuthOutcome = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&view))],
    )
    .await;
    if let Some(resp) = auth.response {
        return resp;
    }

    let compatibility_plan = match build_gateway_get_fetch_compatibility_plan(
        &query,
        &client_name,
        force_camel_case_to_snake_case,
    ) {
        Ok(plan) => plan,
        // The check above only proves the `view` key is present. The plan
        // builder is defined in another crate and may apply stricter rules
        // (NOTE(review): e.g. rejecting an empty value — confirm). Answer with
        // the same 400 as the missing-key path instead of panicking inside a
        // request handler on client-controlled input.
        Err(GatewayGetFetchCompatibilityError::MissingView) => {
            return gateway_bad_request_with_code(
                GatewayGetFetchCompatibilityError::MissingView.code(),
                GatewayOperationKind::Fetch,
                "Missing required parameter",
                "view",
            );
        }
        Err(err) => {
            return gateway_bad_request_with_code(
                err.code(),
                GatewayOperationKind::Fetch,
                "Missing required parameter",
                err.parameter_name(),
            );
        }
    };

    // Copy the keys out: `request_body` is moved out of the plan further down.
    let legacy_cache_key = compatibility_plan.legacy_cache_key.clone();
    let hashed_cache_key = compatibility_plan.hashed_cache_key.clone();
    let legacy_hashed_cache_key = compatibility_plan.legacy_hashed_cache_key.clone();

    // Cache probe 1: current hashed key.
    let cache_result_hashed: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;
    if let Some(cached_response) = cache_result_hashed {
        return cached_response;
    }

    // Cache probe 2: legacy hashed key, skipped when it equals the hashed key
    // because that lookup has just missed.
    if legacy_hashed_cache_key != hashed_cache_key {
        let cache_result_legacy_hashed: Option<HttpResponse> =
            check_cache_control_and_get_response_v2(
                &req,
                app_state.clone(),
                &legacy_hashed_cache_key,
            )
            .await;
        if let Some(cached_response) = cache_result_legacy_hashed {
            tracing::info!(
                cache_key = %hashed_cache_key,
                legacy_hashed_cache_key = %legacy_hashed_cache_key,
                duration_ms = %start_time.elapsed().as_millis(),
                "cache hit (GET, legacy hashed)"
            );
            return cached_response;
        }
    }

    // Cache probe 3: legacy (unhashed) key.
    let cache_result_legacy: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &legacy_cache_key).await;
    if let Some(cached_response) = cache_result_legacy {
        tracing::info!(cache_key = %legacy_cache_key, duration_ms = %start_time.elapsed().as_millis(), "cache hit (GET, legacy)");
        return cached_response;
    }

    // Cache miss: record the request, then delegate to the fetch handler.
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    let fetch_response: HttpResponse = super::routes::handle_fetch_data_route(
        req,
        Some(web::Json(compatibility_plan.request_body)),
        app_state.clone(),
    )
    .await;

    if fetch_response.status().is_success() {
        // A body that cannot be collected becomes empty bytes, which then
        // fails JSON parsing and falls through to the error path below.
        let body_bytes: web::Bytes = actix_web::body::to_bytes(fetch_response.into_body())
            .await
            .unwrap_or_default();
        if let Ok(parsed) = serde_json::from_slice::<Value>(&body_bytes) {
            // Build the cache payload once; `json!` interpolates by reference,
            // so `parsed` stays available for the response below.
            let cache_payload: Value = json!({"data": &parsed});
            hydrate_cache_and_return_json(
                app_state.clone(),
                hashed_cache_key.clone(),
                vec![cache_payload.clone()],
            )
            .await;
            hydrate_cache_and_return_json(
                app_state.clone(),
                legacy_cache_key.clone(),
                vec![cache_payload],
            )
            .await;
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "fetch_get",
                Some(&compatibility_plan.view),
                start_time.elapsed().as_millis(),
                StatusCode::OK,
                Some(json!({
                    "legacy_cache_key": legacy_cache_key,
                    "hash": cache_key_client_token(&hashed_cache_key)
                })),
            );
            return HttpResponse::Ok().json(parsed);
        }
    }

    // NOTE(review): the delegate's own status and body are discarded here, so
    // a client error from the fetch handler is also surfaced as a 500 —
    // confirm this masking is intended.
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "fetch_get",
        Some(&compatibility_plan.view),
        start_time.elapsed().as_millis(),
        StatusCode::INTERNAL_SERVER_ERROR,
        Some(json!({ "error": "fetch delegation failed" })),
    );
    gateway_internal_error_with_code(
        GATEWAY_ERROR_CODE_FETCH_DELEGATION_FAILED,
        GatewayOperationKind::Fetch,
        "Failed to process request",
        "fetch delegation failed",
    )
}