use actix_web::{HttpRequest, HttpResponse, post, web::Data, web::Json};
use serde_json::{Value, json};
use std::time::Instant;
use crate::AppState;
use crate::api::gateway::delete::{DeleteRequest, handle_delete_data};
use crate::api::gateway::fetch::{handle_fetch_data_route, handle_gateway_update_route};
use crate::api::gateway::insert::handle_insert_data;
use crate::api::gateway::query::{GatewayQueryRequest, handle_gateway_query_route};
use crate::api::headers::request_context::{set_disallow_jdbc_routing, set_resolved_athena_client};
use crate::api::response::{bad_request, not_found, service_unavailable};
use crate::data::public_routes::get_public_gateway_route_by_key;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PublicRouteOp {
Query,
Fetch,
Insert,
Update,
Delete,
}
impl PublicRouteOp {
fn as_str(self) -> &'static str {
match self {
Self::Query => "query",
Self::Fetch => "fetch",
Self::Insert => "insert",
Self::Update => "update",
Self::Delete => "delete",
}
}
fn parse(raw: &str) -> Option<Self> {
match raw.trim().to_ascii_lowercase().as_str() {
"query" => Some(Self::Query),
"fetch" => Some(Self::Fetch),
"insert" => Some(Self::Insert),
"update" => Some(Self::Update),
"delete" => Some(Self::Delete),
_ => None,
}
}
}
fn invalid_op_response(op: &str) -> HttpResponse {
bad_request(
"Invalid public route operation",
format!(
"Unsupported operation '{}'. Use one of: query, fetch, insert, update, delete.",
op
),
)
}
#[post("/public/{route_key}/{op}")]
pub async fn public_route_dispatch(
req: HttpRequest,
path: actix_web::web::Path<(String, String)>,
body: Json<Value>,
app_state: Data<AppState>,
) -> HttpResponse {
let started = Instant::now();
let logged_request: LoggedRequest =
log_request(req.clone(), Some(app_state.get_ref()), None, None);
let (route_key, op) = path.into_inner();
let normalized_key = route_key.trim().to_ascii_lowercase();
if normalized_key.is_empty() {
app_state.metrics_state.record_management_mutation(
"public_route_lookup",
"invalid_key",
started.elapsed().as_secs_f64(),
);
return bad_request(
"Invalid route key",
"Route key must not be empty in /public/{route_key}/{op}.",
);
}
let op = match PublicRouteOp::parse(&op) {
Some(value) => value,
None => {
app_state.metrics_state.record_management_mutation(
"public_route_lookup",
"invalid_op",
started.elapsed().as_secs_f64(),
);
return invalid_op_response(&op);
}
};
let logging_client = match app_state.logging_client_name.as_ref() {
Some(name) => name,
None => {
app_state.metrics_state.record_management_mutation(
"public_route_lookup",
"catalog_unavailable",
started.elapsed().as_secs_f64(),
);
return service_unavailable(
"Route registry unavailable",
"No logging client configured for public route lookup.",
);
}
};
let pool = match app_state.pg_registry.get_pool(logging_client) {
Some(pool) => pool,
None => {
app_state.metrics_state.record_management_mutation(
"public_route_lookup",
"catalog_unavailable",
started.elapsed().as_secs_f64(),
);
return service_unavailable(
"Route registry unavailable",
format!("Logging client '{}' is not connected.", logging_client),
);
}
};
let route = match get_public_gateway_route_by_key(&pool, &normalized_key).await {
Ok(Some(record)) => record,
Ok(None) => {
app_state.metrics_state.record_management_mutation(
"public_route_lookup",
"route_not_found",
started.elapsed().as_secs_f64(),
);
return not_found(
"Public route not found",
format!("No active public route '{}'.", normalized_key),
);
}
Err(err) => {
app_state.metrics_state.record_management_mutation(
"public_route_lookup",
"lookup_failed",
started.elapsed().as_secs_f64(),
);
return HttpResponse::InternalServerError().json(json!({
"status": "error",
"message": "Failed to resolve public route",
"error": err.to_string(),
}));
}
};
if !route.is_active {
app_state.metrics_state.record_management_mutation(
"public_route_lookup",
"route_inactive",
started.elapsed().as_secs_f64(),
);
return HttpResponse::Forbidden().json(json!({
"status": "error",
"code": "route_inactive",
"message": "Public route is inactive",
"error": format!("Route '{}' is disabled.", route.route_key),
}));
}
let allowed = route
.allowed_ops
.iter()
.any(|allowed| allowed.eq_ignore_ascii_case(op.as_str()));
if !allowed {
app_state.metrics_state.record_management_mutation(
"public_route_lookup",
"op_not_allowed",
started.elapsed().as_secs_f64(),
);
return HttpResponse::Forbidden().json(json!({
"status": "error",
"code": "op_not_allowed",
"message": "Public route operation not allowed",
"error": format!(
"Route '{}' does not allow '{}'.",
route.route_key,
op.as_str()
),
}));
}
set_resolved_athena_client(&req, route.client_name.clone());
set_disallow_jdbc_routing(&req);
app_state.metrics_state.record_management_mutation(
"public_route_lookup",
"hit",
started.elapsed().as_secs_f64(),
);
let response = match op {
PublicRouteOp::Query => {
let parsed: GatewayQueryRequest = match serde_json::from_value(body.0.clone()) {
Ok(v) => v,
Err(_) => {
return bad_request(
"Invalid query payload",
"Expected body shape: { \"query\": \"...\" }",
);
}
};
handle_gateway_query_route(req, parsed, app_state.clone()).await
}
PublicRouteOp::Fetch => {
handle_fetch_data_route(req, Some(Json(body.0.clone())), app_state.clone()).await
}
PublicRouteOp::Insert => handle_insert_data(req, body.0.clone(), app_state.clone()).await,
PublicRouteOp::Update => {
handle_gateway_update_route(req, Some(Json(body.0.clone())), app_state.clone()).await
}
PublicRouteOp::Delete => {
let parsed: DeleteRequest = match serde_json::from_value(body.0.clone()) {
Ok(v) => v,
Err(_) => {
return bad_request(
"Invalid delete payload",
"Expected body shape: { \"table_name\": \"...\", \"resource_id\": \"...\" }",
);
}
};
handle_delete_data(req, parsed, app_state.clone()).await
}
};
let status = if response.status().is_success() {
"forwarded_ok"
} else {
"forwarded_error"
};
app_state.metrics_state.record_management_mutation(
"public_route_dispatch",
status,
started.elapsed().as_secs_f64(),
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"public_route_dispatch",
None,
started.elapsed().as_millis(),
response.status(),
Some(json!({
"route_key": route.route_key,
"resolved_client_name": route.client_name,
"public_op": op.as_str(),
})),
);
response
}