use actix_web::{HttpRequest, HttpResponse, post, web::Data, web::Json};
use serde_json::{Value, json};
use std::time::Instant;
use crate::AppState;
use crate::api::gateway::contracts::{GatewayDeleteRequest, GatewaySqlRequest};
use crate::api::gateway::delete::handle_delete_data;
use crate::api::gateway::fetch::{handle_fetch_data_route, handle_gateway_update_route};
use crate::api::gateway::insert::handle_insert_data;
use crate::api::gateway::query::handle_gateway_query_route;
use crate::api::headers::request_context::{set_disallow_jdbc_routing, set_resolved_athena_client};
use crate::api::response::{bad_request, not_found, service_unavailable};
use crate::data::public_routes::get_public_gateway_route_by_key;
use crate::utils::request_logging::{
LoggedRequest, RouteRequestLogContext, log_operation_event, log_request,
log_route_request_event,
};
/// Operation segment accepted in `/public/{route_key}/{op}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PublicRouteOp {
    Query,
    Fetch,
    Insert,
    Update,
    Delete,
}

impl PublicRouteOp {
    /// Every variant, used by [`PublicRouteOp::parse`] as its lookup table.
    const ALL: [PublicRouteOp; 5] = [
        PublicRouteOp::Query,
        PublicRouteOp::Fetch,
        PublicRouteOp::Insert,
        PublicRouteOp::Update,
        PublicRouteOp::Delete,
    ];

    /// Returns the canonical lowercase name of the operation.
    fn as_str(self) -> &'static str {
        match self {
            PublicRouteOp::Query => "query",
            PublicRouteOp::Fetch => "fetch",
            PublicRouteOp::Insert => "insert",
            PublicRouteOp::Update => "update",
            PublicRouteOp::Delete => "delete",
        }
    }

    /// Parses an operation name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Returns `None` when the trimmed input is not one of the canonical names.
    fn parse(raw: &str) -> Option<Self> {
        let token = raw.trim();
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str().eq_ignore_ascii_case(token))
    }
}
/// Builds the `bad_request` response returned when the `{op}` path segment is
/// not a supported operation. The caller-supplied `op` is echoed in the detail.
fn invalid_op_response(op: &str) -> HttpResponse {
    let detail = format!(
        "Unsupported operation '{}'. Use one of: query, fetch, insert, update, delete.",
        op
    );
    bad_request("Invalid public route operation", detail)
}
/// Emits one route-request log event tagged with source `public_route`.
///
/// The request id, method and path are copied from `logged_request`; every
/// other argument is forwarded into the matching `RouteRequestLogContext`
/// field, and any field not listed here keeps its `Default` value.
fn log_public_route_request(
    app_state: &AppState,
    logged_request: &LoggedRequest,
    host: Option<String>,
    route_key: Option<String>,
    public_op: Option<String>,
    resolved_client: Option<String>,
    outcome: &str,
    status_code: u16,
    duration_ms: u128,
    error_message: Option<String>,
    metadata: Value,
) {
    let context = RouteRequestLogContext {
        request_id: Some(logged_request.request_id.clone()),
        source: Some(String::from("public_route")),
        outcome: Some(outcome.to_owned()),
        method: Some(logged_request.method.clone()),
        path: Some(logged_request.path.clone()),
        host,
        route_key,
        public_op,
        resolved_client,
        status_code: Some(status_code),
        duration_ms: Some(duration_ms),
        error_message,
        metadata: Some(metadata),
        ..Default::default()
    };

    log_route_request_event(Some(app_state), context);
}
/// Dispatches `POST /public/{route_key}/{op}` to the matching gateway handler.
///
/// Steps, in order:
/// 1. Normalize `route_key` (trim + ASCII lowercase) and reject it when empty.
/// 2. Parse `op` into a [`PublicRouteOp`]; reject unsupported operations.
/// 3. Look the route up through the pool registered for
///    `app_state.logging_client_name`.
/// 4. Reject inactive routes and operations missing from `allowed_ops`.
/// 5. Record the resolved client on the request, disallow JDBC routing, and
///    forward the JSON body to the query/fetch/insert/update/delete handler.
///
/// Every exit path emits a `public_route` request-log event; every exit path
/// except an invalid query/delete payload also records a
/// `public_route_lookup` metric. Forwarded requests additionally record a
/// `public_route_dispatch` metric and an operation event.
///
/// Status codes written to the request log per outcome: 400 (empty key,
/// unsupported op, malformed query/delete payload), 503 (no logging client or
/// pool), 404 (route not found), 500 (lookup failed), 403 (route inactive or
/// op not allowed); otherwise the status of the forwarded handler's response.
#[post("/public/{route_key}/{op}")]
pub async fn public_route_dispatch(
    req: HttpRequest,
    path: actix_web::web::Path<(String, String)>,
    body: Json<Value>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let started = Instant::now();
    let logged_request: LoggedRequest =
        log_request(req.clone(), Some(app_state.get_ref()), None, None);
    let host = req
        .headers()
        .get("Host")
        .and_then(|value| value.to_str().ok())
        .map(str::to_string);
    let (route_key, raw_op) = path.into_inner();
    let normalized_key = route_key.trim().to_ascii_lowercase();

    // NOTE: every rejecting branch below returns immediately, so `host` is
    // moved into the log call there instead of being cloned.
    if normalized_key.is_empty() {
        app_state.metrics_state.record_management_mutation(
            "public_route_lookup",
            "invalid_key",
            started.elapsed().as_secs_f64(),
        );
        log_public_route_request(
            app_state.get_ref(),
            &logged_request,
            host,
            None,
            Some(raw_op.clone()),
            None,
            "invalid_route_key",
            400,
            started.elapsed().as_millis(),
            Some("Route key must not be empty.".to_string()),
            json!({}),
        );
        return bad_request(
            "Invalid route key",
            "Route key must not be empty in /public/{route_key}/{op}.",
        );
    }

    let op = match PublicRouteOp::parse(&raw_op) {
        Some(value) => value,
        None => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "invalid_op",
                started.elapsed().as_secs_f64(),
            );
            log_public_route_request(
                app_state.get_ref(),
                &logged_request,
                host,
                Some(normalized_key.clone()),
                Some(raw_op.clone()),
                None,
                "invalid_op",
                400,
                started.elapsed().as_millis(),
                Some(format!("Unsupported operation '{}'.", raw_op)),
                json!({}),
            );
            return invalid_op_response(&raw_op);
        }
    };

    // The public route catalog is read through the logging client's pool.
    let logging_client = match app_state.logging_client_name.as_ref() {
        Some(name) => name,
        None => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "catalog_unavailable",
                started.elapsed().as_secs_f64(),
            );
            log_public_route_request(
                app_state.get_ref(),
                &logged_request,
                host,
                Some(normalized_key.clone()),
                Some(op.as_str().to_string()),
                None,
                "catalog_unavailable",
                503,
                started.elapsed().as_millis(),
                Some("No logging client configured for public route lookup.".to_string()),
                json!({}),
            );
            return service_unavailable(
                "Route registry unavailable",
                "No logging client configured for public route lookup.",
            );
        }
    };

    let pool = match app_state.pg_registry.get_pool(logging_client) {
        Some(pool) => pool,
        None => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "catalog_unavailable",
                started.elapsed().as_secs_f64(),
            );
            log_public_route_request(
                app_state.get_ref(),
                &logged_request,
                host,
                Some(normalized_key.clone()),
                Some(op.as_str().to_string()),
                None,
                "catalog_pool_unavailable",
                503,
                started.elapsed().as_millis(),
                Some(format!(
                    "Logging client '{}' is not connected.",
                    logging_client
                )),
                json!({}),
            );
            return service_unavailable(
                "Route registry unavailable",
                format!("Logging client '{}' is not connected.", logging_client),
            );
        }
    };

    let route = match get_public_gateway_route_by_key(&pool, &normalized_key).await {
        Ok(Some(record)) => record,
        Ok(None) => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "route_not_found",
                started.elapsed().as_secs_f64(),
            );
            log_public_route_request(
                app_state.get_ref(),
                &logged_request,
                host,
                Some(normalized_key.clone()),
                Some(op.as_str().to_string()),
                None,
                "route_not_found",
                404,
                started.elapsed().as_millis(),
                Some(format!("No active public route '{}'.", normalized_key)),
                json!({}),
            );
            return not_found(
                "Public route not found",
                format!("No active public route '{}'.", normalized_key),
            );
        }
        Err(err) => {
            app_state.metrics_state.record_management_mutation(
                "public_route_lookup",
                "lookup_failed",
                started.elapsed().as_secs_f64(),
            );
            log_public_route_request(
                app_state.get_ref(),
                &logged_request,
                host,
                Some(normalized_key.clone()),
                Some(op.as_str().to_string()),
                None,
                "lookup_failed",
                500,
                started.elapsed().as_millis(),
                Some(err.to_string()),
                json!({}),
            );
            // NOTE(review): the raw lookup error text is returned to the
            // caller of a public endpoint — confirm this disclosure is intended.
            return HttpResponse::InternalServerError().json(json!({
                "status": "error",
                "message": "Failed to resolve public route",
                "error": err.to_string(),
            }));
        }
    };

    if !route.is_active {
        app_state.metrics_state.record_management_mutation(
            "public_route_lookup",
            "route_inactive",
            started.elapsed().as_secs_f64(),
        );
        log_public_route_request(
            app_state.get_ref(),
            &logged_request,
            host,
            Some(route.route_key.clone()),
            Some(op.as_str().to_string()),
            Some(route.client_name.clone()),
            "route_inactive",
            403,
            started.elapsed().as_millis(),
            Some(format!("Route '{}' is disabled.", route.route_key)),
            json!({}),
        );
        return HttpResponse::Forbidden().json(json!({
            "status": "error",
            "code": "route_inactive",
            "message": "Public route is inactive",
            "error": format!("Route '{}' is disabled.", route.route_key),
        }));
    }

    let allowed = route
        .allowed_ops
        .iter()
        .any(|allowed| allowed.eq_ignore_ascii_case(op.as_str()));
    if !allowed {
        app_state.metrics_state.record_management_mutation(
            "public_route_lookup",
            "op_not_allowed",
            started.elapsed().as_secs_f64(),
        );
        log_public_route_request(
            app_state.get_ref(),
            &logged_request,
            host,
            Some(route.route_key.clone()),
            Some(op.as_str().to_string()),
            Some(route.client_name.clone()),
            "op_not_allowed",
            403,
            started.elapsed().as_millis(),
            Some(format!(
                "Route '{}' does not allow '{}'.",
                route.route_key,
                op.as_str()
            )),
            json!({
                "allowed_ops": route.allowed_ops
            }),
        );
        return HttpResponse::Forbidden().json(json!({
            "status": "error",
            "code": "op_not_allowed",
            "message": "Public route operation not allowed",
            "error": format!(
                "Route '{}' does not allow '{}'.",
                route.route_key,
                op.as_str()
            ),
        }));
    }

    // Attach the resolved client and the JDBC-routing restriction to the
    // request before it is handed to the downstream gateway handler.
    set_resolved_athena_client(&req, route.client_name.clone());
    set_disallow_jdbc_routing(&req);
    app_state.metrics_state.record_management_mutation(
        "public_route_lookup",
        "hit",
        started.elapsed().as_secs_f64(),
    );

    // Exactly one arm below runs and the body is not needed afterwards, so the
    // parsed JSON is moved into the handler instead of being deep-cloned.
    let payload: Value = body.into_inner();
    let response = match op {
        PublicRouteOp::Query => {
            let parsed: GatewaySqlRequest = match serde_json::from_value(payload) {
                Ok(v) => v,
                Err(_) => {
                    log_public_route_request(
                        app_state.get_ref(),
                        &logged_request,
                        host,
                        Some(route.route_key.clone()),
                        Some(op.as_str().to_string()),
                        Some(route.client_name.clone()),
                        "invalid_query_payload",
                        400,
                        started.elapsed().as_millis(),
                        Some("Expected body shape: { \"query\": \"...\" }".to_string()),
                        json!({}),
                    );
                    return bad_request(
                        "Invalid query payload",
                        "Expected body shape: { \"query\": \"...\" }",
                    );
                }
            };
            handle_gateway_query_route(req, parsed, app_state.clone()).await
        }
        PublicRouteOp::Fetch => {
            handle_fetch_data_route(req, Some(Json(payload)), app_state.clone()).await
        }
        PublicRouteOp::Insert => handle_insert_data(req, payload, app_state.clone()).await,
        PublicRouteOp::Update => {
            handle_gateway_update_route(req, Some(Json(payload)), app_state.clone()).await
        }
        PublicRouteOp::Delete => {
            let parsed: GatewayDeleteRequest = match serde_json::from_value(payload) {
                Ok(v) => v,
                Err(_) => {
                    log_public_route_request(
                        app_state.get_ref(),
                        &logged_request,
                        host,
                        Some(route.route_key.clone()),
                        Some(op.as_str().to_string()),
                        Some(route.client_name.clone()),
                        "invalid_delete_payload",
                        400,
                        started.elapsed().as_millis(),
                        Some(
                            "Expected body shape: { \"table_name\": \"...\", \"resource_id\": \"...\" }"
                                .to_string(),
                        ),
                        json!({}),
                    );
                    return bad_request(
                        "Invalid delete payload",
                        "Expected body shape: { \"table_name\": \"...\", \"resource_id\": \"...\" }",
                    );
                }
            };
            handle_delete_data(req, parsed, app_state.clone()).await
        }
    };

    // Read the downstream status once; it drives the outcome label, the
    // optional error message and both log records below.
    let status_code = response.status();
    let forwarded_ok = status_code.is_success();
    let status = if forwarded_ok {
        "forwarded_ok"
    } else {
        "forwarded_error"
    };
    let error_message = if forwarded_ok {
        None
    } else {
        Some(format!(
            "Public route dispatch returned HTTP {}.",
            status_code.as_u16()
        ))
    };

    log_public_route_request(
        app_state.get_ref(),
        &logged_request,
        host,
        Some(route.route_key.clone()),
        Some(op.as_str().to_string()),
        Some(route.client_name.clone()),
        status,
        status_code.as_u16(),
        started.elapsed().as_millis(),
        error_message,
        // `json!` serializes interpolated expressions by reference, so
        // `allowed_ops` does not need to be cloned here.
        json!({
            "allowed_ops": route.allowed_ops,
            "public_op": op.as_str()
        }),
    );
    app_state.metrics_state.record_management_mutation(
        "public_route_dispatch",
        status,
        started.elapsed().as_secs_f64(),
    );
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "public_route_dispatch",
        None,
        started.elapsed().as_millis(),
        status_code,
        Some(json!({
            "route_key": route.route_key,
            "resolved_client_name": route.client_name,
            "public_op": op.as_str(),
        })),
    );
    response
}