athena_rs 2.9.1

Database gateway API
Documentation
//! GET `/data` route: maps query params to an internal call to the fetch handler.

use actix_web::HttpRequest;
use actix_web::http::StatusCode;
use actix_web::web;
use actix_web::web::Data;
use actix_web::{HttpResponse, Responder, get};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::time::Instant;

use crate::AppState;
use crate::api::cache::check::check_cache_control_and_get_response_v2;
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{authorize_gateway_request, read_right_for_resource};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::utils::format::normalize_columns_csv;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};

use super::response::missing_client_header_response;

#[get("/data")]
/// GET entry point that translates query parameters into a POST body before delegating.
pub async fn get_data_route(
    req: HttpRequest,
    query: web::Query<HashMap<String, String>>,
    app_state: Data<AppState>,
) -> impl Responder {
    let start_time: Instant = Instant::now();
    let client_name: String = x_athena_client(&req.clone());
    if client_name.is_empty() {
        return missing_client_header_response();
    }
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;

    let view: String = match query.get("view") {
        Some(v) => v.clone(),
        None => {
            let auth = authorize_gateway_request(
                &req,
                app_state.get_ref(),
                Some(&client_name),
                vec![read_right_for_resource(None)],
            )
            .await;
            let _logged_request: LoggedRequest = log_request(
                req.clone(),
                Some(app_state.get_ref()),
                Some(auth.request_id.clone()),
                Some(&auth.log_context),
            );
            if let Some(resp) = auth.response {
                return resp;
            }
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: view"
            }));
        }
    };
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&view))],
    )
    .await;
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }

    let eq_column_raw: String = match query.get("eq_column") {
        Some(c) => c.clone(),
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: eq_column"
            }));
        }
    };
    let eq_column: String = eq_column_raw.clone();

    let eq_value: String = match query.get("eq_value") {
        Some(v) => v.clone(),
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: eq_value"
            }));
        }
    };

    let columns: Option<String> = query.get("columns").cloned();
    let limit: Option<i32> = query.get("limit").and_then(|l| l.parse::<i32>().ok());
    let current_page: Option<i32> = query
        .get("current_page")
        .and_then(|p| p.parse::<i32>().ok());
    let page_size: Option<i32> = query.get("page_size").and_then(|s| s.parse::<i32>().ok());
    let offset: Option<i32> = query.get("offset").and_then(|o| o.parse::<i32>().ok());
    let total_pages: Option<i32> = query.get("total_pages").and_then(|t| t.parse::<i32>().ok());
    let strip_nulls: bool = query
        .get("strip_nulls")
        .and_then(|s| s.parse::<bool>().ok())
        .unwrap_or(false);
    let sort_by: Option<String> = query.get("sort_by").cloned();
    let sort_direction: Option<String> = query.get("sort_direction").cloned();

    let normalized_columns_param: Option<String> = columns
        .as_ref()
        .map(|cols| normalize_columns_csv(cols, force_camel_case_to_snake_case));

    let legacy_cache_key: String = format!(
        "get_data_route:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}",
        view,
        eq_column,
        eq_value,
        normalized_columns_param.as_deref().unwrap_or(""),
        limit.unwrap_or(0),
        current_page.unwrap_or(1),
        page_size.unwrap_or(100),
        offset.unwrap_or(0),
        total_pages.unwrap_or(1),
        strip_nulls,
        sort_by.as_deref().unwrap_or(""),
        sort_direction.as_deref().unwrap_or(""),
        client_name
    );

    let get_hash_input: Value = json!({
        "view": view,
        "eq_column": eq_column,
        "eq_value": eq_value,
        "columns": normalized_columns_param.clone(),
        "limit": limit,
        "current_page": current_page,
        "page_size": page_size,
        "offset": offset,
        "total_pages": total_pages,
        "strip_nulls": strip_nulls,
        "sort_by": sort_by,
        "sort_direction": sort_direction,
        "client": client_name,
    });
    let get_hash_str: String =
        sha256::digest(serde_json::to_string(&get_hash_input).unwrap_or_default());
    let get_short_hash: &str = &get_hash_str[..8];
    let hashed_cache_key: String = format!("{}-h{}", legacy_cache_key, get_short_hash);

    let cache_result_hashed: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;

    if let Some(cached_response) = cache_result_hashed {
        return cached_response;
    }

    let cache_result_legacy: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &legacy_cache_key).await;

    if let Some(cached_response) = cache_result_legacy {
        tracing::info!(cache_key = %legacy_cache_key, duration_ms = %start_time.elapsed().as_millis(), "cache hit (GET, legacy)");
        return cached_response;
    }

    let condition: Value = json!({
        "eq_column": eq_column,
        "eq_value": eq_value
    });

    let mut request_body: Value = json!({
        "table_name": view,
        "conditions": [condition]
    });

    if let Some(cols) = normalized_columns_param.clone() {
        request_body["columns"] = json!(cols);
    }

    if let Some(l) = limit {
        request_body["limit"] = json!(l);
    }

    if let Some(cp) = current_page {
        request_body["current_page"] = json!(cp);
    }

    if let Some(ps) = page_size {
        request_body["page_size"] = json!(ps);
    }

    if let Some(o) = offset {
        request_body["offset"] = json!(o);
    }

    if let Some(tp) = total_pages {
        request_body["total_pages"] = json!(tp);
    }

    if strip_nulls {
        request_body["strip_nulls"] = json!(strip_nulls);
    }

    if let Some(ref col) = sort_by
        && !col.is_empty()
    {
        let dir = sort_direction.as_deref().unwrap_or("asc").to_lowercase();
        let direction = if dir == "desc" || dir == "descending" {
            "desc"
        } else {
            "asc"
        };
        request_body["sortBy"] = json!({ "field": col, "direction": direction });
    }

    let fetch_response =
        super::routes::handle_fetch_data_route(req, Some(web::Json(request_body)), app_state.clone())
            .await;

    if fetch_response.status().is_success() {
        let body_bytes = actix_web::body::to_bytes(fetch_response.into_body())
            .await
            .unwrap_or_default();
        if let Ok(parsed) = serde_json::from_slice::<Value>(&body_bytes) {
                hydrate_cache_and_return_json(
                    app_state.clone(),
                    hashed_cache_key.clone(),
                    vec![json!({"data": parsed.clone()})],
                )
                .await;
                hydrate_cache_and_return_json(
                    app_state.clone(),
                    legacy_cache_key.clone(),
                    vec![json!({"data": parsed.clone()})],
                )
                .await;

                log_operation_event(
                    Some(app_state.get_ref()),
                    &logged_request,
                    "fetch_get",
                    Some(&view),
                    start_time.elapsed().as_millis(),
                    StatusCode::OK,
                    Some(json!({
                        "legacy_cache_key": legacy_cache_key,
                        "hash": hashed_cache_key
                    })),
                );

            return HttpResponse::Ok().json(parsed);
        }
    }

    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "fetch_get",
        Some(&view),
        start_time.elapsed().as_millis(),
        StatusCode::INTERNAL_SERVER_ERROR,
        Some(json!({ "error": "fetch delegation failed" })),
    );
    HttpResponse::InternalServerError().json(json!({
        "error": "Failed to process request"
    }))
}