// athena_rs 2.12.1
//
// Database gateway API
// Documentation
use actix_web::{HttpRequest, HttpResponse, post, web::Data, web::Json};
use serde_json::{Value, json};
use std::time::Instant;

use crate::AppState;
use crate::api::gateway::delete::{DeleteRequest, handle_delete_data};
use crate::api::gateway::fetch::{handle_fetch_data_route, handle_gateway_update_route};
use crate::api::gateway::insert::handle_insert_data;
use crate::api::gateway::query::{GatewayQueryRequest, handle_gateway_query_route};
use crate::api::headers::request_context::{set_disallow_jdbc_routing, set_resolved_athena_client};
use crate::api::response::{bad_request, not_found, service_unavailable};
use crate::data::public_routes::get_public_gateway_route_by_key;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};

/// Operation segment accepted in the `/public/{route_key}/{op}` path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PublicRouteOp {
    Query,
    Fetch,
    Insert,
    Update,
    Delete,
}

impl PublicRouteOp {
    /// Every variant, in declaration order; used by [`PublicRouteOp::parse`].
    const ALL: [Self; 5] = [
        Self::Query,
        Self::Fetch,
        Self::Insert,
        Self::Update,
        Self::Delete,
    ];

    /// Returns the canonical lowercase name of the operation.
    fn as_str(self) -> &'static str {
        match self {
            Self::Query => "query",
            Self::Fetch => "fetch",
            Self::Insert => "insert",
            Self::Update => "update",
            Self::Delete => "delete",
        }
    }

    /// Parses an operation name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Returns `None` when the trimmed input matches none of the canonical names.
    fn parse(raw: &str) -> Option<Self> {
        let candidate = raw.trim();
        // Canonical names are lowercase ASCII, so an ASCII-case-insensitive
        // comparison is equivalent to lowercasing the input first.
        Self::ALL
            .iter()
            .copied()
            .find(|known| candidate.eq_ignore_ascii_case(known.as_str()))
    }
}

/// Builds the `bad_request` response returned when the `{op}` path segment is
/// not one of the supported operation names.
///
/// `op` is the raw segment as received; it is echoed back in the detail text.
fn invalid_op_response(op: &str) -> HttpResponse {
    let detail = format!(
        "Unsupported operation '{}'. Use one of: query, fetch, insert, update, delete.",
        op
    );
    bad_request("Invalid public route operation", detail)
}

#[post("/public/{route_key}/{op}")]
pub async fn public_route_dispatch(
    req: HttpRequest,
    path: actix_web::web::Path<(String, String)>,
    body: Json<Value>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let started = Instant::now();
    let logged_request: LoggedRequest =
        log_request(req.clone(), Some(app_state.get_ref()), None, None);
    let (route_key, op) = path.into_inner();
    let normalized_key = route_key.trim().to_ascii_lowercase();
    if normalized_key.is_empty() {
        app_state.metrics_state.record_management_mutation(
            "public_route_lookup",
            "invalid_key",
            started.elapsed().as_secs_f64(),
        );
        return bad_request(
            "Invalid route key",
            "Route key must not be empty in /public/{route_key}/{op}.",
        );
    }

    let op = match PublicRouteOp::parse(&op) {
        Some(value) => value,
        None => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "invalid_op",
                started.elapsed().as_secs_f64(),
            );
            return invalid_op_response(&op);
        }
    };

    let logging_client = match app_state.logging_client_name.as_ref() {
        Some(name) => name,
        None => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "catalog_unavailable",
                started.elapsed().as_secs_f64(),
            );
            return service_unavailable(
                "Route registry unavailable",
                "No logging client configured for public route lookup.",
            );
        }
    };

    let pool = match app_state.pg_registry.get_pool(logging_client) {
        Some(pool) => pool,
        None => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "catalog_unavailable",
                started.elapsed().as_secs_f64(),
            );
            return service_unavailable(
                "Route registry unavailable",
                format!("Logging client '{}' is not connected.", logging_client),
            );
        }
    };

    let route = match get_public_gateway_route_by_key(&pool, &normalized_key).await {
        Ok(Some(record)) => record,
        Ok(None) => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "route_not_found",
                started.elapsed().as_secs_f64(),
            );
            return not_found(
                "Public route not found",
                format!("No active public route '{}'.", normalized_key),
            );
        }
        Err(err) => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "lookup_failed",
                started.elapsed().as_secs_f64(),
            );
            return HttpResponse::InternalServerError().json(json!({
                "status": "error",
                "message": "Failed to resolve public route",
                "error": err.to_string(),
            }));
        }
    };

    if !route.is_active {
        app_state.metrics_state.record_management_mutation(
            "public_route_lookup",
            "route_inactive",
            started.elapsed().as_secs_f64(),
        );
        return HttpResponse::Forbidden().json(json!({
            "status": "error",
            "code": "route_inactive",
            "message": "Public route is inactive",
            "error": format!("Route '{}' is disabled.", route.route_key),
        }));
    }

    let allowed = route
        .allowed_ops
        .iter()
        .any(|allowed| allowed.eq_ignore_ascii_case(op.as_str()));
    if !allowed {
        app_state.metrics_state.record_management_mutation(
            "public_route_lookup",
            "op_not_allowed",
            started.elapsed().as_secs_f64(),
        );
        return HttpResponse::Forbidden().json(json!({
            "status": "error",
            "code": "op_not_allowed",
            "message": "Public route operation not allowed",
            "error": format!(
                "Route '{}' does not allow '{}'.",
                route.route_key,
                op.as_str()
            ),
        }));
    }

    set_resolved_athena_client(&req, route.client_name.clone());
    set_disallow_jdbc_routing(&req);

    app_state.metrics_state.record_management_mutation(
        "public_route_lookup",
        "hit",
        started.elapsed().as_secs_f64(),
    );

    let response = match op {
        PublicRouteOp::Query => {
            let parsed: GatewayQueryRequest = match serde_json::from_value(body.0.clone()) {
                Ok(v) => v,
                Err(_) => {
                    return bad_request(
                        "Invalid query payload",
                        "Expected body shape: { \"query\": \"...\" }",
                    );
                }
            };
            handle_gateway_query_route(req, parsed, app_state.clone()).await
        }
        PublicRouteOp::Fetch => {
            handle_fetch_data_route(req, Some(Json(body.0.clone())), app_state.clone()).await
        }
        PublicRouteOp::Insert => handle_insert_data(req, body.0.clone(), app_state.clone()).await,
        PublicRouteOp::Update => {
            handle_gateway_update_route(req, Some(Json(body.0.clone())), app_state.clone()).await
        }
        PublicRouteOp::Delete => {
            let parsed: DeleteRequest = match serde_json::from_value(body.0.clone()) {
                Ok(v) => v,
                Err(_) => {
                    return bad_request(
                        "Invalid delete payload",
                        "Expected body shape: { \"table_name\": \"...\", \"resource_id\": \"...\" }",
                    );
                }
            };
            handle_delete_data(req, parsed, app_state.clone()).await
        }
    };

    let status = if response.status().is_success() {
        "forwarded_ok"
    } else {
        "forwarded_error"
    };
    app_state.metrics_state.record_management_mutation(
        "public_route_dispatch",
        status,
        started.elapsed().as_secs_f64(),
    );
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "public_route_dispatch",
        None,
        started.elapsed().as_millis(),
        response.status(),
        Some(json!({
            "route_key": route.route_key,
            "resolved_client_name": route.client_name,
            "public_op": op.as_str(),
        })),
    );

    response
}