// athena_rs 2.9.0
//
// Database gateway API
// Documentation
//! GET `/data` route: maps query params to a loopback POST to `/gateway/fetch`.

use actix_web::HttpRequest;
use actix_web::http::StatusCode;
use actix_web::web;
use actix_web::web::Data;
use actix_web::{HttpResponse, Responder, get};
use reqwest::{Client, RequestBuilder, Response};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::time::Instant;
use tracing::error;

use crate::AppState;
use crate::api::cache::check::check_cache_control_and_get_response_v2;
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{authorize_gateway_request, read_right_for_resource};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::utils::format::normalize_columns_csv;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};

use super::response::missing_client_header_response;

#[get("/data")]
/// GET entry point that translates query parameters into a POST body before delegating.
pub async fn get_data_route(
    req: HttpRequest,
    query: web::Query<HashMap<String, String>>,
    app_state: Data<AppState>,
) -> impl Responder {
    let start_time: Instant = Instant::now();
    let client_name: String = x_athena_client(&req.clone());
    if client_name.is_empty() {
        return missing_client_header_response();
    }
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;

    let view: String = match query.get("view") {
        Some(v) => v.clone(),
        None => {
            let auth = authorize_gateway_request(
                &req,
                app_state.get_ref(),
                Some(&client_name),
                vec![read_right_for_resource(None)],
            )
            .await;
            let _logged_request: LoggedRequest = log_request(
                req.clone(),
                Some(app_state.get_ref()),
                Some(auth.request_id.clone()),
                Some(&auth.log_context),
            );
            if let Some(resp) = auth.response {
                return resp;
            }
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: view"
            }));
        }
    };
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&view))],
    )
    .await;
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }

    let eq_column_raw: String = match query.get("eq_column") {
        Some(c) => c.clone(),
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: eq_column"
            }));
        }
    };
    let eq_column: String = eq_column_raw.clone();

    let eq_value: String = match query.get("eq_value") {
        Some(v) => v.clone(),
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: eq_value"
            }));
        }
    };

    let columns: Option<String> = query.get("columns").cloned();
    let limit: Option<i32> = query.get("limit").and_then(|l| l.parse::<i32>().ok());
    let current_page: Option<i32> = query
        .get("current_page")
        .and_then(|p| p.parse::<i32>().ok());
    let page_size: Option<i32> = query.get("page_size").and_then(|s| s.parse::<i32>().ok());
    let offset: Option<i32> = query.get("offset").and_then(|o| o.parse::<i32>().ok());
    let total_pages: Option<i32> = query.get("total_pages").and_then(|t| t.parse::<i32>().ok());
    let strip_nulls: bool = query
        .get("strip_nulls")
        .and_then(|s| s.parse::<bool>().ok())
        .unwrap_or(false);
    let sort_by: Option<String> = query.get("sort_by").cloned();
    let sort_direction: Option<String> = query.get("sort_direction").cloned();

    let normalized_columns_param: Option<String> = columns
        .as_ref()
        .map(|cols| normalize_columns_csv(cols, force_camel_case_to_snake_case));

    let legacy_cache_key: String = format!(
        "get_data_route:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}",
        view,
        eq_column,
        eq_value,
        normalized_columns_param.as_deref().unwrap_or(""),
        limit.unwrap_or(0),
        current_page.unwrap_or(1),
        page_size.unwrap_or(100),
        offset.unwrap_or(0),
        total_pages.unwrap_or(1),
        strip_nulls,
        sort_by.as_deref().unwrap_or(""),
        sort_direction.as_deref().unwrap_or(""),
        client_name
    );

    let get_hash_input: Value = json!({
        "view": view,
        "eq_column": eq_column,
        "eq_value": eq_value,
        "columns": normalized_columns_param.clone(),
        "limit": limit,
        "current_page": current_page,
        "page_size": page_size,
        "offset": offset,
        "total_pages": total_pages,
        "strip_nulls": strip_nulls,
        "sort_by": sort_by,
        "sort_direction": sort_direction,
        "client": client_name,
    });
    let get_hash_str: String =
        sha256::digest(serde_json::to_string(&get_hash_input).unwrap_or_default());
    let get_short_hash: &str = &get_hash_str[..8];
    let hashed_cache_key: String = format!("{}-h{}", legacy_cache_key, get_short_hash);

    let cache_result_hashed: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;

    if let Some(cached_response) = cache_result_hashed {
        return cached_response;
    }

    let cache_result_legacy: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &legacy_cache_key).await;

    if let Some(cached_response) = cache_result_legacy {
        tracing::info!(cache_key = %legacy_cache_key, duration_ms = %start_time.elapsed().as_millis(), "cache hit (GET, legacy)");
        return cached_response;
    }

    let condition: Value = json!({
        "eq_column": eq_column,
        "eq_value": eq_value
    });

    let mut request_body: Value = json!({
        "view_name": view,
        "conditions": [condition]
    });

    if let Some(cols) = normalized_columns_param.clone() {
        request_body["columns"] = json!(cols);
    }

    if let Some(l) = limit {
        request_body["limit"] = json!(l);
    }

    if let Some(cp) = current_page {
        request_body["current_page"] = json!(cp);
    }

    if let Some(ps) = page_size {
        request_body["page_size"] = json!(ps);
    }

    if let Some(o) = offset {
        request_body["offset"] = json!(o);
    }

    if let Some(tp) = total_pages {
        request_body["total_pages"] = json!(tp);
    }

    if strip_nulls {
        request_body["strip_nulls"] = json!(strip_nulls);
    }

    if let Some(ref col) = sort_by
        && !col.is_empty()
    {
        let dir = sort_direction.as_deref().unwrap_or("asc").to_lowercase();
        let direction = if dir == "desc" || dir == "descending" {
            "desc"
        } else {
            "asc"
        };
        request_body["sortBy"] = json!({ "field": col, "direction": direction });
    }

    let client: Client = Client::new();
    let mut req_builder: RequestBuilder = client
        .post("http://localhost:4052/gateway/fetch")
        .header("Content-Type", "application/json")
        .header(
            "X-User-Id",
            req.headers()
                .get("X-User-Id")
                .and_then(|h| h.to_str().ok())
                .unwrap_or(""),
        )
        .header("X-Athena-Client", &client_name);

    if let Some(api_key) = req.headers().get("apikey").and_then(|h| h.to_str().ok()) {
        req_builder = req_builder.header("apikey", api_key);
    }

    if let Some(api_key) = req.headers().get("x-api-key").and_then(|h| h.to_str().ok()) {
        req_builder = req_builder.header("x-api-key", api_key);
    }

    if let Some(url) = req
        .headers()
        .get("x-supabase-url")
        .and_then(|h| h.to_str().ok())
    {
        req_builder = req_builder.header("x-supabase-url", url);
    }
    if let Some(key) = req
        .headers()
        .get("x-supabase-key")
        .and_then(|h| h.to_str().ok())
    {
        req_builder = req_builder.header("x-supabase-key", key);
    }

    let response: Response = match req_builder.json(&request_body).send().await {
        Ok(resp) => resp,
        Err(e) => {
            error!("Failed to send POST request: {:#?}", e);
            return HttpResponse::InternalServerError().json(json!({
                "error": "Failed to process request"
            }));
        }
    };

    match response.json::<Value>().await {
        Ok(data) => {
            hydrate_cache_and_return_json(
                app_state.clone(),
                hashed_cache_key.clone(),
                vec![json!({"data": data.clone()})],
            )
            .await;
            hydrate_cache_and_return_json(
                app_state.clone(),
                legacy_cache_key.clone(),
                vec![json!({"data": data.clone()})],
            )
            .await;

            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "fetch_get",
                Some(&view),
                start_time.elapsed().as_millis(),
                StatusCode::OK,
                Some(json!({
                    "legacy_cache_key": legacy_cache_key,
                    "hash": hashed_cache_key
                })),
            );

            HttpResponse::Ok().json(data)
        }

        Err(e) => {
            error!(duration_ms = %start_time.elapsed().as_millis(), error = %e, "fetch GET failed to parse response");
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "fetch_get",
                Some(&view),
                start_time.elapsed().as_millis(),
                StatusCode::INTERNAL_SERVER_ERROR,
                Some(json!({ "error": e.to_string() })),
            );
            HttpResponse::InternalServerError().json(json!({
                "error": "Failed to parse response"
            }))
        }
    }
}