//! athena_rs 2.9.1
//!
//! Database gateway API documentation.
//! HTTP routes for gateway fetch: `/gateway/data`, `/gateway/fetch`.

#[allow(deprecated)]
use actix_web::HttpRequest;
use actix_web::web::{Data, Json};
use actix_web::{HttpResponse, post};
use serde_json::{Value, json};
use std::time::Instant;
use tracing::{error, info};

use crate::AppState;
use crate::api::cache::check::check_cache_control_and_get_response_v2;
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{authorize_gateway_request, read_right_for_resource};
use crate::api::headers::x_athena_client::x_athena_client;
#[cfg(feature = "deadpool_experimental")]
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::headers::x_strip_nulls::get_x_strip_nulls;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::parser::query_builder::Condition;
use crate::utils::format::{normalize_column_name, normalize_rows};
use crate::utils::request_logging::{LoggedRequest, log_request};

use super::conditions::{RequestCondition, to_query_conditions};
use super::execution::execute_gateway_fetch_data;
use super::request::{
    build_fetch_hashed_cache_key, parse_gateway_fetch_conditions, parse_sort_options_from_body,
};
use super::response::{missing_client_header_response, respond_fetch_err, respond_fetch_ok};
use super::types::{PostProcessingConfig, SortOptions};

/// Central handler for the `/gateway/data`, `/gateway/fetch`, and `/gateway/update` POST entry points.
///
/// It normalizes requested columns/conditions, consults cache headers, runs the query against Postgres or Supabase,
/// hydrates the cache, optionally strips nulls, and applies grouping/aggregation rules before responding.
pub(crate) async fn handle_fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let operation_start: Instant = Instant::now();
    let start_time: Instant = Instant::now();
    let client_name: String = x_athena_client(&req.clone());
    if client_name.is_empty() {
        return missing_client_header_response();
    }
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
    let auto_cast_uuid_filter_values_to_text =
        app_state.gateway_auto_cast_uuid_filter_values_to_text;
    let mut table_name: String = String::new();

    let mut current_page: i64 = 1;
    let mut page_size: i64 = 100;
    let mut offset: i64 = 0;

    let mut conditions: Vec<RequestCondition> = vec![];
    let apikey: Option<String> = req
        .headers()
        .get("x-athena-key")
        .and_then(|value| value.to_str().ok())
        .map(|s| s.to_string())
        .filter(|key| key == &std::env::var("SUITSBOOKS_API_ADMIN_KEY").unwrap_or_default());

    let user_id: String = get_x_user_id(&req)
        .or_else(|| apikey.clone())
        .unwrap_or_default();

    let limit: i64 = match &body {
        Some(json_body) => json_body
            .get("limit")
            .and_then(Value::as_u64)
            .unwrap_or(page_size as u64) as i64,
        None => page_size,
    };

    let mut columns_vec: Vec<String> = vec![];
    if let Some(ref json_body) = body
        && let Some(cols_val) = json_body.get("columns")
    {
        if let Some(arr) = cols_val.as_array() {
            columns_vec = arr
                .iter()
                .filter_map(|v| v.as_str().map(|s| s.to_string()))
                .collect();
        } else if let Some(s) = cols_val.as_str() {
            columns_vec = s
                .split(',')
                .map(|p| p.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect();
        }
    }
    if columns_vec.is_empty() {
        columns_vec.push("*".to_string());
    }

    if force_camel_case_to_snake_case {
        columns_vec = columns_vec
            .into_iter()
            .map(|col| normalize_column_name(&col, true))
            .collect();
    }

    let strip_nulls: bool = match get_x_strip_nulls(&req) {
        Some(value) => value.to_lowercase() == "true",
        None => false,
    };
    let post_processing_config: PostProcessingConfig =
        PostProcessingConfig::from_body(body.as_deref(), force_camel_case_to_snake_case);
    let sort_options: Option<SortOptions> =
        parse_sort_options_from_body(body.as_deref(), force_camel_case_to_snake_case);

    if let Some(ref json_body) = body {
        if let Some(page) = json_body.get("current_page").and_then(Value::as_u64) {
            current_page = page as i64;
        }
        if let Some(size) = json_body.get("page_size").and_then(Value::as_u64) {
            page_size = size as i64;
        }
        if let Some(off) = json_body.get("offset").and_then(Value::as_u64) {
            offset = off as i64;
        }
    }

    if let Some(ref json_body) = body {
        if let Some(name) = json_body.get("table_name").and_then(Value::as_str) {
            table_name = name.to_string();
        }

        conditions = match parse_gateway_fetch_conditions(json_body, force_camel_case_to_snake_case)
        {
            Ok(c) => c,
            Err(resp) => return resp,
        };
    } else {
        let auth = authorize_gateway_request(
            &req,
            app_state.get_ref(),
            Some(&client_name),
            vec![read_right_for_resource(None)],
        )
        .await;
        let _logged_request: LoggedRequest = log_request(
            req.clone(),
            Some(app_state.get_ref()),
            Some(auth.request_id.clone()),
            Some(&auth.log_context),
        );
        if let Some(resp) = auth.response {
            return resp;
        }
        return HttpResponse::BadRequest().json(json!({
            "error": "table_name is required"
        }));
    }

    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&table_name))],
    )
    .await;
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }

    #[cfg(feature = "deadpool_experimental")]
    let deadpool_requested = x_athena_deadpool_enable(&req, Some(&auth.request_id));
    #[cfg(not(feature = "deadpool_experimental"))]
    let deadpool_requested = false;

    if table_name.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "table_name is required"
        }));
    }

    conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));

    let hashed_cache_key: String = build_fetch_hashed_cache_key(
        &table_name,
        &conditions,
        &columns_vec,
        limit,
        strip_nulls,
        &client_name,
        sort_options.as_ref(),
    );

    let cache_result: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;

    match cache_result {
        Some(cached_response) => {
            return cached_response;
        }
        None => {
            info!(cache_key = %hashed_cache_key, "cache miss (POST gateway fetch)");
        }
    }

    let conditions_json: Vec<Value> = conditions
        .iter()
        .map(|c| {
            json!({
                "eq_column": c.eq_column,
                "eq_value": c.eq_value.clone()
            })
        })
        .collect();

    let columns_refs: Vec<&str> = columns_vec.iter().map(|s| s.as_str()).collect();
    let pg_conditions: Vec<Condition> = to_query_conditions(
        &conditions[..],
        force_camel_case_to_snake_case,
        auto_cast_uuid_filter_values_to_text,
    );

    let page_offset: i64 = if current_page < 1 { 1 } else { current_page };
    let calculated_offset: i64 = (page_offset - 1) * page_size + offset;

    let data_result: Result<Vec<Value>, String> = execute_gateway_fetch_data(
        app_state.get_ref(),
        &req,
        &auth.request_id,
        &client_name,
        &table_name,
        columns_refs,
        &pg_conditions,
        conditions_json,
        limit,
        current_page,
        page_size,
        offset,
        calculated_offset,
        sort_options.as_ref(),
        deadpool_requested,
    )
    .await;

    if let Ok(data) = &data_result {
        let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);

        hydrate_cache_and_return_json(
            app_state.clone(),
            hashed_cache_key.clone(),
            vec![json!({"data": normalized_rows.clone()})],
        )
        .await;
    } else {
        error!("Failed to rehydrate cache due to data fetch error");
    }

    match data_result {
        Ok(data) => {
            respond_fetch_ok(
                app_state.clone(),
                &data,
                &hashed_cache_key,
                strip_nulls,
                &post_processing_config,
                force_camel_case_to_snake_case,
                &table_name,
                &logged_request,
                operation_start,
            )
            .await
        }
        Err(err) => respond_fetch_err(
            err,
            app_state.clone(),
            &table_name,
            &client_name,
            &user_id,
            &hashed_cache_key,
            start_time,
            operation_start,
            &logged_request,
        ),
    }
}

/// POST entry point for `/gateway/data` that proxies to [`handle_fetch_data_route`].
///
/// The body is optional; validation (including the `table_name` requirement)
/// happens inside the shared handler.
#[post("/gateway/data")]
pub async fn fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}

/// Legacy POST route for `/gateway/fetch` that mirrors `/gateway/data`,
/// delegating to the same [`handle_fetch_data_route`] implementation.
#[post("/gateway/fetch")]
pub async fn proxy_fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}