use actix_web::http::StatusCode;
use actix_web::web::Data;
use actix_web::{HttpRequest, HttpResponse};
use serde_json::{Value, json};
use std::time::Instant;
use tracing::error;
use crate::AppState;
use crate::api::cache::check::CacheLookupOutcome;
use crate::api::gateway::contracts::GatewayOperationKind;
use crate::api::gateway::response::{
GATEWAY_ERROR_CODE_BACKEND_TEMPORARILY_UNAVAILABLE, GATEWAY_ERROR_CODE_FETCH_DATA_FAILED,
GATEWAY_ERROR_CODE_FETCH_RESPONSE_PROCESSING_FAILED, GATEWAY_ERROR_CODE_INVALID_CLIENT,
GATEWAY_ERROR_CODE_INVALID_POST_PROCESSING_CONFIGURATION,
GATEWAY_ERROR_CODE_OUTBOUND_REQUEST_RATE_LIMITED, gateway_bad_request_with_code,
gateway_internal_error_with_code, gateway_service_unavailable_with_code,
};
use crate::api::headers::response_headers::set_cache_headers;
use crate::data::parse::strip_nulls::strip_nulls_from_key;
use crate::utils::cache_key_token::cache_key_client_token;
use crate::utils::format::normalize_rows;
use crate::utils::request_logging::{
LoggedRequest, RequestCompletionLogContext, finalize_request_log, log_operation_event,
};
use super::post_processing::apply_post_processing;
use super::types::PostProcessingConfig;
fn strip_nulls_from_data_field(data_val: &mut Value) -> bool {
match strip_nulls_from_key(data_val, "data") {
Some(data_stripped) => {
data_val["data"] = data_stripped;
true
}
None => false,
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn respond_fetch_ok(
req: &HttpRequest,
route_key: &str,
request_payload: Option<Value>,
app_state: Data<AppState>,
data: &[Value],
count: Option<u64>,
hashed_cache_key: &str,
strip_nulls: bool,
post_processing_config: &PostProcessingConfig,
force_camel_case_to_snake_case: bool,
table_name: &str,
logged_request: &LoggedRequest,
operation_start: Instant,
cache_outcome: CacheLookupOutcome,
) -> HttpResponse {
let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);
let cache_key_public: String = cache_key_client_token(hashed_cache_key);
let mut data_val: Value = json!({ "data": normalized_rows, "cache_key": cache_key_public });
if let Some(count) = count {
data_val["count"] = json!(count);
}
if strip_nulls {
if !strip_nulls_from_data_field(&mut data_val) {
error!("Failed to strip nulls from data");
return gateway_internal_error_with_code(
GATEWAY_ERROR_CODE_FETCH_RESPONSE_PROCESSING_FAILED,
GatewayOperationKind::Fetch,
"Failed to process fetch response",
"Failed to strip nulls from data",
);
}
}
let row_snapshot: Vec<Value> = data_val
.get("data")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
match apply_post_processing(&row_snapshot, post_processing_config) {
Ok(Some(post_processing)) => {
data_val["post_processing"] = post_processing;
}
Ok(None) => {}
Err(err) => {
error!("Post-processing error: {}", err);
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_INVALID_POST_PROCESSING_CONFIGURATION,
GatewayOperationKind::Fetch,
"Invalid post-processing configuration",
format!(
"Post-processing failure: {} (cache_key={})",
err, cache_key_public
),
);
}
}
data_val["status"] = json!("success");
data_val["message"] = json!(format!("Fetched {} rows", row_snapshot.len()));
log_operation_event(
Some(app_state.get_ref()),
logged_request,
"fetch",
Some(table_name),
operation_start.elapsed().as_millis(),
StatusCode::OK,
Some(json!({
"cache_key": hashed_cache_key,
"row_count": normalized_rows.len(),
"count": count,
"cached": false,
"cache_source": "database",
"cache_lookup_outcome": cache_outcome.as_str()
})),
);
finalize_request_log(
Some(app_state.get_ref()),
&logged_request.request_id,
RequestCompletionLogContext {
status_code: Some(StatusCode::OK.as_u16()),
duration_ms: Some(operation_start.elapsed().as_millis()),
cached: Some(false),
cache_lookup_outcome: Some(cache_outcome.as_str().to_string()),
cache_source: Some("database".to_string()),
operation: Some("fetch".to_string()),
table_name: Some(table_name.to_string()),
},
);
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
req,
logged_request.client_name.as_str(),
route_key,
Some(table_name.to_string()),
Some(logged_request.request_id.clone()),
request_payload,
Some(data_val.clone()),
),
);
let mut response = HttpResponse::Ok().json(data_val);
set_cache_headers(
response.headers_mut(),
false,
Some(&cache_key_public),
Some(cache_outcome.as_str()),
Some("database"),
);
response
}
#[cfg(test)]
mod tests {
use super::strip_nulls_from_data_field;
use serde_json::json;
#[test]
fn strip_nulls_keeps_fetch_response_envelope_shape() {
let mut payload = json!({
"data": [null, { "id": 1 }, null],
"cache_key": "abc123"
});
assert!(strip_nulls_from_data_field(&mut payload));
assert_eq!(payload["cache_key"], "abc123");
assert_eq!(payload["data"], json!([{ "id": 1 }]));
assert!(payload.is_object());
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn respond_fetch_err(
err: String,
app_state: Data<AppState>,
table_name: &str,
client_name: &str,
user_id: &str,
hashed_cache_key: &str,
start_time: Instant,
operation_start: Instant,
logged_request: &LoggedRequest,
cache_outcome: CacheLookupOutcome,
) -> HttpResponse {
let cache_key_public: String = cache_key_client_token(hashed_cache_key);
if let Ok(error_json) = serde_json::from_str::<Value>(&err) {
if error_json.get("code").is_some() && error_json.get("trace_id").is_some() {
let status = error_json
.get("status_code")
.and_then(Value::as_u64)
.and_then(|code| StatusCode::from_u16(code as u16).ok())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
error!(
table = %table_name,
client = %client_name,
user_id = %user_id,
cache_key_digest = %cache_key_public,
duration_ms = %start_time.elapsed().as_millis(),
error_code = %error_json["code"].as_str().unwrap_or("unknown"),
trace_id = %error_json["trace_id"].as_str().unwrap_or("unknown"),
"fetch POST failed with processed error"
);
log_operation_event(
Some(app_state.get_ref()),
logged_request,
"fetch",
Some(table_name),
operation_start.elapsed().as_millis(),
status,
Some(json!({
"cache_key": hashed_cache_key,
"error_code": error_json["code"],
"message": error_json["message"],
"cached": false,
"cache_source": "database",
"cache_lookup_outcome": cache_outcome.as_str()
})),
);
finalize_request_log(
Some(app_state.get_ref()),
&logged_request.request_id,
RequestCompletionLogContext {
status_code: Some(status.as_u16()),
duration_ms: Some(operation_start.elapsed().as_millis()),
cached: Some(false),
cache_lookup_outcome: Some(cache_outcome.as_str().to_string()),
cache_source: Some("database".to_string()),
operation: Some("fetch".to_string()),
table_name: Some(table_name.to_string()),
},
);
return HttpResponse::build(status).json(error_json);
}
error!(
table = %table_name,
client = %client_name,
user_id = %user_id,
cache_key_digest = %cache_key_public,
duration_ms = %start_time.elapsed().as_millis(),
error = %err,
"fetch POST failed"
);
log_operation_event(
Some(app_state.get_ref()),
logged_request,
"fetch",
Some(table_name),
operation_start.elapsed().as_millis(),
StatusCode::INTERNAL_SERVER_ERROR,
Some(json!({
"cache_key": hashed_cache_key,
"error": err,
"cached": false,
"cache_source": "database",
"cache_lookup_outcome": cache_outcome.as_str()
})),
);
finalize_request_log(
Some(app_state.get_ref()),
&logged_request.request_id,
RequestCompletionLogContext {
status_code: Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16()),
duration_ms: Some(operation_start.elapsed().as_millis()),
cached: Some(false),
cache_lookup_outcome: Some(cache_outcome.as_str().to_string()),
cache_source: Some("database".to_string()),
operation: Some("fetch".to_string()),
table_name: Some(table_name.to_string()),
},
);
return gateway_internal_error_with_code(
GATEWAY_ERROR_CODE_FETCH_DATA_FAILED,
GatewayOperationKind::Fetch,
"Failed to fetch data",
format!("{} (cache_key={})", err, cache_key_public),
);
}
if err == "OutboundSupabaseThrottled" {
log_operation_event(
Some(app_state.get_ref()),
logged_request,
"fetch",
Some(table_name),
operation_start.elapsed().as_millis(),
StatusCode::SERVICE_UNAVAILABLE,
Some(json!({
"backend": "supabase",
"cache_key": hashed_cache_key,
"reason": "outbound_supabase_throttled",
"cached": false,
"cache_source": "database",
"cache_lookup_outcome": cache_outcome.as_str()
})),
);
finalize_request_log(
Some(app_state.get_ref()),
&logged_request.request_id,
RequestCompletionLogContext {
status_code: Some(StatusCode::SERVICE_UNAVAILABLE.as_u16()),
duration_ms: Some(operation_start.elapsed().as_millis()),
cached: Some(false),
cache_lookup_outcome: Some(cache_outcome.as_str().to_string()),
cache_source: Some("database".to_string()),
operation: Some("fetch".to_string()),
table_name: Some(table_name.to_string()),
},
);
return gateway_service_unavailable_with_code(
GATEWAY_ERROR_CODE_OUTBOUND_REQUEST_RATE_LIMITED,
GatewayOperationKind::Fetch,
"Outbound request rate limited",
format!(
"Supabase traffic is temporarily throttled (cache_key={})",
cache_key_public
),
);
}
if err.starts_with("HostOffline:") {
let parts: Vec<&str> = err.splitn(3, ':').collect();
let host = parts.get(1).unwrap_or(&client_name);
let until_secs = parts
.get(2)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(60);
app_state
.metrics_state
.record_gateway_backend_unavailable("fetch", "supabase");
let msg = format!(
"Backend '{}' temporarily unavailable; retry after {}s",
host, until_secs
);
log_operation_event(
Some(app_state.get_ref()),
logged_request,
"fetch",
Some(table_name),
operation_start.elapsed().as_millis(),
StatusCode::SERVICE_UNAVAILABLE,
Some(json!({
"backend": "supabase",
"cache_key": hashed_cache_key,
"host": host,
"until_secs": until_secs,
"cached": false,
"cache_source": "database",
"cache_lookup_outcome": cache_outcome.as_str()
})),
);
finalize_request_log(
Some(app_state.get_ref()),
&logged_request.request_id,
RequestCompletionLogContext {
status_code: Some(StatusCode::SERVICE_UNAVAILABLE.as_u16()),
duration_ms: Some(operation_start.elapsed().as_millis()),
cached: Some(false),
cache_lookup_outcome: Some(cache_outcome.as_str().to_string()),
cache_source: Some("database".to_string()),
operation: Some("fetch".to_string()),
table_name: Some(table_name.to_string()),
},
);
return gateway_service_unavailable_with_code(
GATEWAY_ERROR_CODE_BACKEND_TEMPORARILY_UNAVAILABLE,
GatewayOperationKind::Fetch,
msg,
format!("cache_key={}, until_secs={}", cache_key_public, until_secs),
);
}
if err.starts_with("Unknown client name:") {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_INVALID_CLIENT,
GatewayOperationKind::Fetch,
"Invalid client",
format!(
"X-Athena-Client does not match a configured client (client={}, cache_key={})",
client_name, cache_key_public
),
);
}
error!(
table = %table_name,
client = %client_name,
user_id = %user_id,
cache_key_digest = %cache_key_public,
duration_ms = %start_time.elapsed().as_millis(),
error = %err,
"fetch POST failed"
);
log_operation_event(
Some(app_state.get_ref()),
logged_request,
"fetch",
Some(table_name),
operation_start.elapsed().as_millis(),
StatusCode::INTERNAL_SERVER_ERROR,
Some(json!({
"cache_key": hashed_cache_key,
"error": err,
"cached": false,
"cache_source": "database",
"cache_lookup_outcome": cache_outcome.as_str()
})),
);
finalize_request_log(
Some(app_state.get_ref()),
&logged_request.request_id,
RequestCompletionLogContext {
status_code: Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16()),
duration_ms: Some(operation_start.elapsed().as_millis()),
cached: Some(false),
cache_lookup_outcome: Some(cache_outcome.as_str().to_string()),
cache_source: Some("database".to_string()),
operation: Some("fetch".to_string()),
table_name: Some(table_name.to_string()),
},
);
gateway_internal_error_with_code(
GATEWAY_ERROR_CODE_FETCH_DATA_FAILED,
GatewayOperationKind::Fetch,
"Failed to fetch data",
format!("{} (cache_key={})", err, cache_key_public),
)
}