use actix_web::HttpResponse;
use actix_web::http::StatusCode;
use actix_web::web::Data;
use serde_json::{Value, json};
use std::time::Instant;
use tracing::error;
use crate::AppState;
use crate::data::parse::strip_nulls::strip_nulls_from_key;
use crate::utils::format::normalize_rows;
use crate::utils::request_logging::{LoggedRequest, log_operation_event};
use super::post_processing::apply_post_processing;
use super::types::PostProcessingConfig;
pub(crate) fn missing_client_header_response() -> HttpResponse {
HttpResponse::BadRequest().json(json!({
"status": "error",
"code": "missing_client_header",
"message": "X-Athena-Client header is required and cannot be empty",
}))
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn respond_fetch_ok(
app_state: Data<AppState>,
data: &[Value],
hashed_cache_key: &str,
strip_nulls: bool,
post_processing_config: &PostProcessingConfig,
force_camel_case_to_snake_case: bool,
table_name: &str,
logged_request: &LoggedRequest,
operation_start: Instant,
) -> HttpResponse {
let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);
let mut data_val: Value = json!({ "data": normalized_rows, "cache_key": hashed_cache_key });
if strip_nulls {
let data_stripped_result: Option<Value> = strip_nulls_from_key(&mut data_val, "data").await;
if let Some(data_stripped) = data_stripped_result {
data_val = data_stripped.clone();
} else {
error!("Failed to strip nulls from data");
return HttpResponse::InternalServerError().json(json!({
"error": "Failed to strip nulls from data"
}
));
}
}
let row_snapshot: Vec<Value> = data_val
.get("data")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
match apply_post_processing(&row_snapshot, post_processing_config) {
Ok(Some(post_processing)) => {
data_val["post_processing"] = post_processing;
}
Ok(None) => {}
Err(err) => {
error!("Post-processing error: {}", err);
return HttpResponse::BadRequest().json(json!({
"error": format!("Post-processing failure: {}", err),
"cache_key": hashed_cache_key,
}));
}
}
log_operation_event(
Some(app_state.get_ref()),
logged_request,
"fetch",
Some(table_name),
operation_start.elapsed().as_millis(),
StatusCode::OK,
Some(json!({
"cache_key": hashed_cache_key,
"row_count": normalized_rows.len()
})),
);
HttpResponse::Ok().json(data_val)
}
/// Maps a fetch failure, carried as the string `err`, to an HTTP response.
///
/// The branches are tried in this order:
///
/// 1. **Processed error**: `err` parses as JSON and has both `code` and
///    `trace_id`. The response status comes from the JSON `status_code`
///    field; if that field is missing, not an unsigned integer, does not fit
///    in a `u16`, or is rejected by `StatusCode::from_u16`, the status is
///    `500`. The parsed JSON is returned as the body.
/// 2. **`HostOffline:<host>[:<secs>]`**: records a backend-unavailable
///    metric and returns `503` with code `backend_unavailable`. `<secs>`
///    defaults to `60` when absent or not a valid `u64`.
/// 3. **`Unknown client name:`** prefix: returns `400` with code
///    `unknown_client`.
/// 4. **Anything else** (including JSON without `code`/`trace_id`): returns
///    `500` with the raw error text in the body.
///
/// Every branch except (3) records a `"fetch"` operation event using the time
/// elapsed since `operation_start`. Branches (1) and (4) also emit a tracing
/// error whose `duration_ms` is measured from `start_time`.
// The argument list mirrors the request context the handler already holds.
#[allow(clippy::too_many_arguments)]
pub(crate) fn respond_fetch_err(
    err: String,
    app_state: Data<AppState>,
    table_name: &str,
    client_name: &str,
    user_id: &str,
    hashed_cache_key: &str,
    start_time: Instant,
    operation_start: Instant,
    logged_request: &LoggedRequest,
) -> HttpResponse {
    // JSON lacking either marker field is treated as a generic failure and
    // falls through to the end of the function. Valid JSON can never start
    // with the plain-text prefixes checked below, so those branches are
    // skipped for it.
    let processed_error = serde_json::from_str::<Value>(&err)
        .ok()
        .filter(|parsed| parsed.get("code").is_some() && parsed.get("trace_id").is_some());

    if let Some(error_json) = processed_error {
        let status = error_json
            .get("status_code")
            .and_then(Value::as_u64)
            // Checked narrowing: an `as u16` cast would wrap an out-of-range
            // value onto an unrelated (possibly 2xx) status code.
            .and_then(|code| u16::try_from(code).ok())
            .and_then(|code| StatusCode::from_u16(code).ok())
            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
        error!(
            table = %table_name,
            client = %client_name,
            user_id = %user_id,
            cache_key = %hashed_cache_key,
            duration_ms = %start_time.elapsed().as_millis(),
            error_code = %error_json["code"].as_str().unwrap_or("unknown"),
            trace_id = %error_json["trace_id"].as_str().unwrap_or("unknown"),
            "fetch POST failed with processed error"
        );
        log_operation_event(
            Some(app_state.get_ref()),
            logged_request,
            "fetch",
            Some(table_name),
            operation_start.elapsed().as_millis(),
            status,
            Some(json!({
                "cache_key": hashed_cache_key,
                "error_code": error_json["code"],
                "message": error_json["message"]
            })),
        );
        return HttpResponse::build(status).json(error_json);
    }

    if let Some(rest) = err.strip_prefix("HostOffline:") {
        // `rest` is `<host>` or `<host>:<secs>`; everything after the first
        // colon is treated as the retry delay.
        let (host, until_secs) = match rest.split_once(':') {
            Some((host, secs)) => (host, secs.parse::<u64>().unwrap_or(60)),
            None => (rest, 60),
        };
        app_state
            .metrics_state
            .record_gateway_backend_unavailable("fetch", client_name);
        let msg = format!(
            "Backend '{}' temporarily unavailable; retry after {}s",
            host, until_secs
        );
        log_operation_event(
            Some(app_state.get_ref()),
            logged_request,
            "fetch",
            Some(table_name),
            operation_start.elapsed().as_millis(),
            StatusCode::SERVICE_UNAVAILABLE,
            Some(json!({
                "cache_key": hashed_cache_key,
                "host": host,
                "until_secs": until_secs,
            })),
        );
        return HttpResponse::ServiceUnavailable().json(json!({
            "status": "error",
            "code": "backend_unavailable",
            "message": msg,
            "cache_key": hashed_cache_key,
            "until_secs": until_secs,
        }));
    }

    if err.starts_with("Unknown client name:") {
        // NOTE(review): unlike the other branches, this one neither emits a
        // tracing error nor records an operation event. Confirm that is intended.
        return HttpResponse::BadRequest().json(json!({
            "status": "error",
            "code": "unknown_client",
            "message": "X-Athena-Client does not match a configured client.",
            "details": {
                "client": client_name,
                "cache_key": hashed_cache_key
            }
        }));
    }

    error!(
        table = %table_name,
        client = %client_name,
        user_id = %user_id,
        cache_key = %hashed_cache_key,
        duration_ms = %start_time.elapsed().as_millis(),
        error = %err,
        "fetch POST failed"
    );
    log_operation_event(
        Some(app_state.get_ref()),
        logged_request,
        "fetch",
        Some(table_name),
        operation_start.elapsed().as_millis(),
        StatusCode::INTERNAL_SERVER_ERROR,
        Some(json!({
            "cache_key": hashed_cache_key,
            "error": err
        })),
    );
    HttpResponse::InternalServerError().json(
        json!({"status": "error", "message": "Failed to fetch data", "error": err, "cache_key": hashed_cache_key}),
    )
}