athena_rs 2.9.1

Database gateway API
Documentation
//! HTTP mapping for gateway fetch success and error paths.

use actix_web::HttpResponse;
use actix_web::http::StatusCode;
use actix_web::web::Data;
use serde_json::{Value, json};
use std::time::Instant;
use tracing::error;

use crate::AppState;
use crate::data::parse::strip_nulls::strip_nulls_from_key;
use crate::utils::format::normalize_rows;
use crate::utils::request_logging::{LoggedRequest, log_operation_event};

use super::post_processing::apply_post_processing;
use super::types::PostProcessingConfig;

pub(crate) fn missing_client_header_response() -> HttpResponse {
    HttpResponse::BadRequest().json(json!({
        "status": "error",
        "code": "missing_client_header",
        "message": "X-Athena-Client header is required and cannot be empty",
    }))
}

/// Normalizes rows, optional strip-nulls, post-processing, and success logging.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn respond_fetch_ok(
    app_state: Data<AppState>,
    data: &[Value],
    hashed_cache_key: &str,
    strip_nulls: bool,
    post_processing_config: &PostProcessingConfig,
    force_camel_case_to_snake_case: bool,
    table_name: &str,
    logged_request: &LoggedRequest,
    operation_start: Instant,
) -> HttpResponse {
    let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);
    let mut data_val: Value = json!({ "data": normalized_rows, "cache_key": hashed_cache_key });

    if strip_nulls {
        let data_stripped_result: Option<Value> = strip_nulls_from_key(&mut data_val, "data").await;

        if let Some(data_stripped) = data_stripped_result {
            data_val = data_stripped.clone();
        } else {
            error!("Failed to strip nulls from data");
            return HttpResponse::InternalServerError().json(json!({
                    "error": "Failed to strip nulls from data"
                }
            ));
        }
    }

    let row_snapshot: Vec<Value> = data_val
        .get("data")
        .and_then(|v| v.as_array())
        .cloned()
        .unwrap_or_default();
    match apply_post_processing(&row_snapshot, post_processing_config) {
        Ok(Some(post_processing)) => {
            data_val["post_processing"] = post_processing;
        }
        Ok(None) => {}
        Err(err) => {
            error!("Post-processing error: {}", err);
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Post-processing failure: {}", err),
                "cache_key": hashed_cache_key,
            }));
        }
    }

    log_operation_event(
        Some(app_state.get_ref()),
        logged_request,
        "fetch",
        Some(table_name),
        operation_start.elapsed().as_millis(),
        StatusCode::OK,
        Some(json!({
            "cache_key": hashed_cache_key,
            "row_count": normalized_rows.len()
        })),
    );

    HttpResponse::Ok().json(data_val)
}

/// Maps backend error strings (including processed sqlx JSON) to HTTP responses.
#[allow(clippy::too_many_arguments)]
pub(crate) fn respond_fetch_err(
    err: String,
    app_state: Data<AppState>,
    table_name: &str,
    client_name: &str,
    user_id: &str,
    hashed_cache_key: &str,
    start_time: Instant,
    operation_start: Instant,
    logged_request: &LoggedRequest,
) -> HttpResponse {
    if let Ok(error_json) = serde_json::from_str::<Value>(&err) {
        if error_json.get("code").is_some() && error_json.get("trace_id").is_some() {
            let status = error_json
                .get("status_code")
                .and_then(Value::as_u64)
                .and_then(|code| StatusCode::from_u16(code as u16).ok())
                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
            error!(
                table = %table_name,
                client = %client_name,
                user_id = %user_id,
                cache_key = %hashed_cache_key,
                duration_ms = %start_time.elapsed().as_millis(),
                error_code = %error_json["code"].as_str().unwrap_or("unknown"),
                trace_id = %error_json["trace_id"].as_str().unwrap_or("unknown"),
                "fetch POST failed with processed error"
            );
            log_operation_event(
                Some(app_state.get_ref()),
                logged_request,
                "fetch",
                Some(table_name),
                operation_start.elapsed().as_millis(),
                status,
                Some(json!({
                    "cache_key": hashed_cache_key,
                    "error_code": error_json["code"],
                    "message": error_json["message"]
                })),
            );
            return HttpResponse::build(status).json(error_json);
        }

        error!(
            table = %table_name,
            client = %client_name,
            user_id = %user_id,
            cache_key = %hashed_cache_key,
            duration_ms = %start_time.elapsed().as_millis(),
            error = %err,
            "fetch POST failed"
        );
        log_operation_event(
            Some(app_state.get_ref()),
            logged_request,
            "fetch",
            Some(table_name),
            operation_start.elapsed().as_millis(),
            StatusCode::INTERNAL_SERVER_ERROR,
            Some(json!({
                "cache_key": hashed_cache_key,
                "error": err
            })),
        );
        return HttpResponse::InternalServerError().json(
            json!({"status": "error", "message": "Failed to fetch data", "error": err, "cache_key": hashed_cache_key}),
        );
    }

    if err.starts_with("HostOffline:") {
        let parts: Vec<&str> = err.splitn(3, ':').collect();
        let host = parts.get(1).unwrap_or(&client_name);
        let until_secs = parts.get(2).and_then(|s| s.parse::<u64>().ok()).unwrap_or(60);
        app_state
            .metrics_state
            .record_gateway_backend_unavailable("fetch", client_name);
        let msg = format!(
            "Backend '{}' temporarily unavailable; retry after {}s",
            host, until_secs
        );
        log_operation_event(
            Some(app_state.get_ref()),
            logged_request,
            "fetch",
            Some(table_name),
            operation_start.elapsed().as_millis(),
            StatusCode::SERVICE_UNAVAILABLE,
            Some(json!({
                "cache_key": hashed_cache_key,
                "host": host,
                "until_secs": until_secs,
            })),
        );
        return HttpResponse::ServiceUnavailable().json(json!({
            "status": "error",
            "code": "backend_unavailable",
            "message": msg,
            "cache_key": hashed_cache_key,
            "until_secs": until_secs,
        }));
    }

    if err.starts_with("Unknown client name:") {
        return HttpResponse::BadRequest().json(json!({
            "status": "error",
            "code": "unknown_client",
            "message": "X-Athena-Client does not match a configured client.",
            "details": {
                "client": client_name,
                "cache_key": hashed_cache_key
            }
        }));
    }

    error!(
        table = %table_name,
        client = %client_name,
        user_id = %user_id,
        cache_key = %hashed_cache_key,
        duration_ms = %start_time.elapsed().as_millis(),
        error = %err,
        "fetch POST failed"
    );
    log_operation_event(
        Some(app_state.get_ref()),
        logged_request,
        "fetch",
        Some(table_name),
        operation_start.elapsed().as_millis(),
        StatusCode::INTERNAL_SERVER_ERROR,
        Some(json!({
            "cache_key": hashed_cache_key,
            "error": err
        })),
    );
    HttpResponse::InternalServerError().json(
        json!({"status": "error", "message": "Failed to fetch data", "error": err, "cache_key": hashed_cache_key}),
    )
}