athena_rs 3.18.0

Hyper performant polyglot Database driver
Documentation
//! GET `/data` route: maps query params to an internal call to the fetch handler.

use actix_web::HttpRequest;
use actix_web::http::StatusCode;
use actix_web::web;
use actix_web::web::Data;
use actix_web::{HttpResponse, Responder, get};
use athena_gateway::{
    GatewayGetFetchCompatibilityError, build_gateway_get_fetch_compatibility_plan,
};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::time::Instant;

use crate::AppState;
use crate::api::cache::check::check_cache_control_and_get_response_v2;
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{
    GatewayAuthOutcome, authorize_gateway_request, read_right_for_resource,
};
use crate::api::gateway::contracts::GatewayOperationKind;
use crate::api::gateway::response::{gateway_bad_request, gateway_internal_error};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::utils::cache_key_token::cache_key_client_token;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};

use super::response::missing_client_header_response;

#[get("/data")]
/// GET entry point that translates query parameters into a POST body before delegating.
pub async fn get_data_route(
    req: HttpRequest,
    query: web::Query<HashMap<String, String>>,
    app_state: Data<AppState>,
) -> impl Responder {
    let start_time: Instant = Instant::now();
    let client_name: String = x_athena_client(&req.clone());
    if client_name.is_empty() {
        return missing_client_header_response();
    }
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;

    let view: String = match query.get("view") {
        Some(v) => v.clone(),
        None => {
            let auth: GatewayAuthOutcome = authorize_gateway_request(
                &req,
                app_state.get_ref(),
                Some(&client_name),
                vec![read_right_for_resource(None)],
            )
            .await;
            let _logged_request: LoggedRequest = log_request(
                req.clone(),
                Some(app_state.get_ref()),
                Some(auth.request_id.clone()),
                Some(&auth.log_context),
            );
            if let Some(resp) = auth.response {
                return resp;
            }
            return gateway_bad_request(
                GatewayOperationKind::Fetch,
                "Missing required parameter",
                "view",
            );
        }
    };
    let auth: GatewayAuthOutcome = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&view))],
    )
    .await;
    if let Some(resp) = auth.response {
        return resp;
    }

    let compatibility_plan = match build_gateway_get_fetch_compatibility_plan(
        &query,
        &client_name,
        force_camel_case_to_snake_case,
    ) {
        Ok(plan) => plan,
        Err(GatewayGetFetchCompatibilityError::MissingView) => unreachable!("view handled above"),
        Err(err) => {
            return gateway_bad_request(
                GatewayOperationKind::Fetch,
                "Missing required parameter",
                err.parameter_name(),
            );
        }
    };
    let legacy_cache_key = compatibility_plan.legacy_cache_key.clone();
    let hashed_cache_key = compatibility_plan.hashed_cache_key.clone();
    let legacy_hashed_cache_key = compatibility_plan.legacy_hashed_cache_key.clone();

    let cache_result_hashed: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;

    if let Some(cached_response) = cache_result_hashed {
        return cached_response;
    }

    if legacy_hashed_cache_key != hashed_cache_key {
        let cache_result_legacy_hashed: Option<HttpResponse> =
            check_cache_control_and_get_response_v2(
                &req,
                app_state.clone(),
                &legacy_hashed_cache_key,
            )
            .await;

        if let Some(cached_response) = cache_result_legacy_hashed {
            tracing::info!(
                cache_key = %hashed_cache_key,
                legacy_hashed_cache_key = %legacy_hashed_cache_key,
                duration_ms = %start_time.elapsed().as_millis(),
                "cache hit (GET, legacy hashed)"
            );
            return cached_response;
        }
    }

    let cache_result_legacy: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &legacy_cache_key).await;

    if let Some(cached_response) = cache_result_legacy {
        tracing::info!(cache_key = %legacy_cache_key, duration_ms = %start_time.elapsed().as_millis(), "cache hit (GET, legacy)");
        return cached_response;
    }

    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );

    let fetch_response: HttpResponse = super::routes::handle_fetch_data_route(
        req,
        Some(web::Json(compatibility_plan.request_body)),
        app_state.clone(),
    )
    .await;

    if fetch_response.status().is_success() {
        let body_bytes: web::Bytes = actix_web::body::to_bytes(fetch_response.into_body())
            .await
            .unwrap_or_default();
        if let Ok(parsed) = serde_json::from_slice::<Value>(&body_bytes) {
            hydrate_cache_and_return_json(
                app_state.clone(),
                hashed_cache_key.clone(),
                vec![json!({"data": parsed.clone()})],
            )
            .await;
            hydrate_cache_and_return_json(
                app_state.clone(),
                legacy_cache_key.clone(),
                vec![json!({"data": parsed.clone()})],
            )
            .await;

            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "fetch_get",
                Some(&compatibility_plan.view),
                start_time.elapsed().as_millis(),
                StatusCode::OK,
                Some(json!({
                    "legacy_cache_key": legacy_cache_key,
                    "hash": cache_key_client_token(&hashed_cache_key)
                })),
            );

            return HttpResponse::Ok().json(parsed);
        }
    }

    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "fetch_get",
        Some(&compatibility_plan.view),
        start_time.elapsed().as_millis(),
        StatusCode::INTERNAL_SERVER_ERROR,
        Some(json!({ "error": "fetch delegation failed" })),
    );
    gateway_internal_error(
        GatewayOperationKind::Fetch,
        "Failed to process request",
        "fetch delegation failed",
    )
}