mod parser;
use actix_web::{
HttpRequest, HttpResponse, delete, get, patch, post,
web::{Data, Json, Path},
};
use serde_json::{Value, json};
use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{
authorize_gateway_request, delete_right_for_resource, read_right_for_resource,
write_right_for_resource,
};
use crate::api::gateway::postgrest::parser::{
FilterOperator, PostgrestFilter, parse_postgrest_query,
};
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::drivers::postgresql::sqlx_driver::{
PostgresInsertError, delete_rows, fetch_rows_with_columns_with_or_groups, insert_row,
insert_rows_bulk, update_rows, upsert_row,
};
use crate::parser::query_builder::{Condition, ConditionOperator};
use crate::utils::request_logging::{LoggedRequest, log_request};
/// GET `/rest/v1/{table}` — PostgREST-style read endpoint.
///
/// Flow: require the `X-Athena-Client` header, authorize the caller for
/// read access on `table`, log the request (even when auth fails, so denied
/// attempts are still recorded with the auth context), parse the PostgREST
/// query string, then run the select against the client's configured pool.
///
/// Responses:
/// - 400 for a missing client header, an unparsable query, or an unknown client
/// - the authorizer's response when authorization fails
/// - 200 with `{ "data": [...] }` plus a `Content-Range` header on success
/// - 500 when the database query fails
#[get("/rest/v1/{table}")]
pub async fn postgrest_get_route(
    req: HttpRequest,
    path: Path<String>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let table_name = path.into_inner();
    let client_name = match require_client_header(&req) {
        Ok(name) => name,
        Err(resp) => return resp,
    };
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&table_name))],
    )
    .await;
    // Log before checking the auth outcome so failed requests are still
    // captured with the request id / log context the authorizer produced.
    let _logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }
    let query = match parse_postgrest_query(
        &table_name,
        &req,
        app_state.gateway_force_camel_case_to_snake_case,
    ) {
        Ok(parsed) => parsed,
        Err(err) => {
            return HttpResponse::BadRequest().json(json!({ "error": err }));
        }
    };
    // Default page size is 100; limit is clamped to >= 1 and offset to >= 0.
    let limit = query.limit.unwrap_or(100).max(1);
    let offset = query.offset.unwrap_or(0).max(0);
    let pool = match app_state.pg_registry.get_pool(&client_name) {
        Some(pool) => pool,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Postgres client '{}' is not configured", client_name)
            }));
        }
    };
    let columns_refs: Vec<&str> = query.columns.iter().map(String::as_str).collect();
    // AND-ed filters plus OR groups, optionally casting uuid filter values
    // to text per the gateway configuration flag.
    let and_conditions = convert_filters(
        &query.filters,
        app_state.gateway_auto_cast_uuid_filter_values_to_text,
    );
    let or_condition_groups = convert_or_filter_groups(
        &query.or_filters,
        app_state.gateway_auto_cast_uuid_filter_values_to_text,
    );
    let order_by = query
        .order
        .as_ref()
        .map(|spec| (spec.column.as_str(), spec.ascending));
    match fetch_rows_with_columns_with_or_groups(
        &pool,
        &table_name,
        &columns_refs,
        &and_conditions,
        &or_condition_groups,
        limit,
        offset,
        order_by,
        app_state.gateway_allow_schema_names_prefixed_as_table_name,
    )
    .await
    {
        Ok(rows) => respond_with_content_range(&rows, offset),
        Err(err) => HttpResponse::InternalServerError().json(json!({ "error": err.to_string() })),
    }
}
/// POST `/rest/v1/{table}` — PostgREST-style insert/upsert endpoint.
///
/// Requires the `X-Athena-Client` header and write rights on `table`.
/// Behavior depends on the request body and the `Prefer` header:
/// - array body: bulk insert. NOTE(review): `resolution=merge-duplicates`
///   is ignored for array payloads — confirm whether bulk upsert is intended.
/// - object body + `Prefer: resolution=merge-duplicates`: upsert, with the
///   conflict column resolved via `get_resource_id_key`
/// - object body otherwise: single-row insert
///
/// On success the scoped gateway cache for the table is invalidated
/// (best-effort; errors ignored) and the response is 204 for
/// `Prefer: return=minimal`, otherwise 200 with `{ "data": [...] }`.
#[post("/rest/v1/{table}")]
pub async fn postgrest_post_route(
    req: HttpRequest,
    path: Path<String>,
    body: Json<Value>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let table_name = path.into_inner();
    let client_name = match require_client_header(&req) {
        Ok(name) => name,
        Err(resp) => return resp,
    };
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![write_right_for_resource(Some(&table_name))],
    )
    .await;
    // Log before checking the auth outcome so failed requests are still
    // captured with the request id / log context the authorizer produced.
    let _logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }
    let pool = match app_state.pg_registry.get_pool(&client_name) {
        Some(pool) => pool,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Postgres client '{}' is not configured", client_name)
            }));
        }
    };
    // `prefer_header_value` lower-cases the header, so these substring
    // checks are effectively case-insensitive.
    let prefer_header = prefer_header_value(&req).unwrap_or_default();
    let upsert = prefer_header.contains("resolution=merge-duplicates");
    let minimal = prefer_header.contains("return=minimal");
    let payload = body.into_inner();
    let inserted_rows_result: Result<Vec<Value>, HttpResponse> = if payload.is_array() {
        let rows = payload.as_array().cloned().unwrap_or_default();
        match insert_rows_bulk(&pool, &table_name, &rows).await {
            Ok(rows) => Ok(rows),
            Err(err) => Err(map_postgres_insert_error(err)),
        }
    } else if upsert {
        let conflict_column = get_resource_id_key(&table_name).await;
        match upsert_row(&pool, &table_name, &payload, &conflict_column).await {
            Ok(row) => Ok(vec![row]),
            Err(err) => Err(map_postgres_insert_error(err)),
        }
    } else {
        match insert_row(&pool, &table_name, &payload).await {
            Ok(row) => Ok(vec![row]),
            Err(err) => Err(map_postgres_insert_error(err)),
        }
    };
    let inserted_rows = match inserted_rows_result {
        Ok(rows) => rows,
        Err(resp) => return resp,
    };
    // Best-effort cache invalidation; failures are deliberately ignored.
    let _ = invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name).await;
    // NOTE(review): successful inserts return 200 here; PostgREST itself
    // uses 201 Created — confirm which contract clients expect.
    if minimal {
        HttpResponse::NoContent().finish()
    } else {
        HttpResponse::Ok().json(json!({ "data": inserted_rows }))
    }
}
/// PATCH `/rest/v1/{table}` — PostgREST-style update endpoint.
///
/// Requires the `X-Athena-Client` header and write rights on `table`.
/// The query string must contain at least one filter — this guards against
/// accidentally updating every row in the table — and the body must be a
/// JSON object of column/value pairs to apply.
///
/// On success the scoped gateway cache for the table is invalidated
/// (best-effort; errors ignored) and the response is 204 for
/// `Prefer: return=minimal`, otherwise 200 with `{ "data": [...] }`.
#[patch("/rest/v1/{table}")]
pub async fn postgrest_patch_route(
    req: HttpRequest,
    path: Path<String>,
    body: Json<Value>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let table_name = path.into_inner();
    let client_name = match require_client_header(&req) {
        Ok(name) => name,
        Err(resp) => return resp,
    };
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![write_right_for_resource(Some(&table_name))],
    )
    .await;
    // Log before checking the auth outcome so failed requests are still
    // captured with the request id / log context the authorizer produced.
    let _logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }
    let query = match parse_postgrest_query(
        &table_name,
        &req,
        app_state.gateway_force_camel_case_to_snake_case,
    ) {
        Ok(parsed) => parsed,
        Err(err) => return HttpResponse::BadRequest().json(json!({ "error": err })),
    };
    let conditions = convert_filters(
        &query.filters,
        app_state.gateway_auto_cast_uuid_filter_values_to_text,
    );
    // Refuse unfiltered updates — prevents an accidental full-table write.
    if conditions.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "filters are required for update"
        }));
    }
    let pool = match app_state.pg_registry.get_pool(&client_name) {
        Some(pool) => pool,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Postgres client '{}' is not configured", client_name)
            }));
        }
    };
    let prefer_header = prefer_header_value(&req).unwrap_or_default();
    let minimal = prefer_header.contains("return=minimal");
    let payload = body.into_inner();
    if !payload.is_object() {
        return HttpResponse::BadRequest().json(json!({
            "error": "patch body must be an object"
        }));
    }
    let update_result = update_rows(&pool, &table_name, &conditions, &payload).await;
    let updated_rows = match update_result {
        Ok(rows) => rows,
        Err(err) => {
            return HttpResponse::InternalServerError().json(json!({
                "error": err.to_string()
            }));
        }
    };
    // Best-effort cache invalidation; failures are deliberately ignored.
    let _ = invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name).await;
    if minimal {
        HttpResponse::NoContent().finish()
    } else {
        HttpResponse::Ok().json(json!({ "data": updated_rows }))
    }
}
/// DELETE `/rest/v1/{table}` — PostgREST-style delete endpoint.
///
/// Requires the `X-Athena-Client` header and delete rights on `table`.
/// The query string must contain at least one filter — this guards against
/// accidentally deleting every row in the table.
///
/// On success the scoped gateway cache for the table is invalidated
/// (best-effort; errors ignored) and the response is 204 for
/// `Prefer: return=minimal`, otherwise 200 with `{ "data": [...] }`
/// containing the deleted rows.
#[delete("/rest/v1/{table}")]
pub async fn postgrest_delete_route(
    req: HttpRequest,
    path: Path<String>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let table_name = path.into_inner();
    let client_name = match require_client_header(&req) {
        Ok(name) => name,
        Err(resp) => return resp,
    };
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![delete_right_for_resource(Some(&table_name))],
    )
    .await;
    // Log before checking the auth outcome so failed requests are still
    // captured with the request id / log context the authorizer produced.
    let _logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }
    let query = match parse_postgrest_query(
        &table_name,
        &req,
        app_state.gateway_force_camel_case_to_snake_case,
    ) {
        Ok(parsed) => parsed,
        Err(err) => return HttpResponse::BadRequest().json(json!({ "error": err })),
    };
    let conditions = convert_filters(
        &query.filters,
        app_state.gateway_auto_cast_uuid_filter_values_to_text,
    );
    // Refuse unfiltered deletes — prevents an accidental full-table wipe.
    if conditions.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "filters are required for delete"
        }));
    }
    let pool = match app_state.pg_registry.get_pool(&client_name) {
        Some(pool) => pool,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Postgres client '{}' is not configured", client_name)
            }));
        }
    };
    let prefer_header = prefer_header_value(&req).unwrap_or_default();
    let minimal = prefer_header.contains("return=minimal");
    let delete_result = delete_rows(&pool, &table_name, &conditions).await;
    let deleted_rows = match delete_result {
        Ok(rows) => rows,
        Err(err) => {
            return HttpResponse::InternalServerError().json(json!({
                "error": err.to_string()
            }));
        }
    };
    // Best-effort cache invalidation; failures are deliberately ignored.
    let _ = invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name).await;
    if minimal {
        HttpResponse::NoContent().finish()
    } else {
        HttpResponse::Ok().json(json!({ "data": deleted_rows }))
    }
}
/// Translate parsed PostgREST filters into query-builder `Condition`s,
/// dropping any filter that does not convert.
fn convert_filters(
    filters: &[PostgrestFilter],
    auto_cast_uuid_filter_values_to_text: bool,
) -> Vec<Condition> {
    let mut conditions = Vec::with_capacity(filters.len());
    for filter in filters {
        if let Some(condition) =
            convert_filter_to_condition(filter, auto_cast_uuid_filter_values_to_text)
        {
            conditions.push(condition);
        }
    }
    conditions
}
/// Convert each OR group of PostgREST filters into a group of `Condition`s,
/// discarding any group that ends up empty after conversion.
fn convert_or_filter_groups(
    or_groups: &[Vec<PostgrestFilter>],
    auto_cast_uuid_filter_values_to_text: bool,
) -> Vec<Vec<Condition>> {
    let mut converted_groups = Vec::with_capacity(or_groups.len());
    for group in or_groups {
        let converted = convert_filters(group, auto_cast_uuid_filter_values_to_text);
        if !converted.is_empty() {
            converted_groups.push(converted);
        }
    }
    converted_groups
}
fn map_filter_operator(op: FilterOperator) -> ConditionOperator {
match op {
FilterOperator::Eq => ConditionOperator::Eq,
FilterOperator::Neq => ConditionOperator::Neq,
FilterOperator::Gt => ConditionOperator::Gt,
FilterOperator::Lt => ConditionOperator::Lt,
FilterOperator::Gte => ConditionOperator::Gte,
FilterOperator::Lte => ConditionOperator::Lte,
FilterOperator::Like => ConditionOperator::Like,
FilterOperator::ILike => ConditionOperator::ILike,
FilterOperator::Is => ConditionOperator::Is,
FilterOperator::In => ConditionOperator::In,
FilterOperator::Contains => ConditionOperator::Contains,
FilterOperator::Contained => ConditionOperator::Contained,
}
}
/// Build a single `Condition` from a PostgREST filter.
///
/// Always succeeds today; the `Option` return type is kept so callers can
/// keep filtering out unconvertible filters without an interface change.
fn convert_filter_to_condition(
    filter: &PostgrestFilter,
    auto_cast_uuid_filter_values_to_text: bool,
) -> Option<Condition> {
    let condition = Condition::new(
        filter.column.clone(),
        map_filter_operator(filter.operator),
        filter.values.clone(),
        filter.negated,
    );
    Some(condition.with_uuid_value_text_cast(auto_cast_uuid_filter_values_to_text))
}
/// Read the `Prefer` request header, lower-cased so callers can do
/// case-insensitive substring checks. Returns `None` when the header is
/// absent or is not valid UTF-8.
fn prefer_header_value(req: &HttpRequest) -> Option<String> {
    let raw_value = req.headers().get("Prefer")?;
    let text = raw_value.to_str().ok()?;
    Some(text.to_lowercase())
}
/// Extract the `X-Athena-Client` header value, or produce the 400 response
/// to return when it is missing/empty.
fn require_client_header(req: &HttpRequest) -> Result<String, HttpResponse> {
    let client_name = x_athena_client(req);
    if !client_name.is_empty() {
        return Ok(client_name);
    }
    Err(HttpResponse::BadRequest().json(json!({
        "error": "X-Athena-Client header is required"
    })))
}
fn map_postgres_insert_error(err: PostgresInsertError) -> HttpResponse {
HttpResponse::InternalServerError().json(json!({
"error": format!("failed to insert rows: {:?}", err)
}))
}
/// Build a 200 response carrying the page of rows plus a PostgREST-style
/// `Content-Range` header describing the returned slice.
///
/// `offset` is the zero-based index of the first returned row. The total is
/// always reported as `*` because no count query is performed.
fn respond_with_content_range(rows: &[Value], offset: i64) -> HttpResponse {
    // Bug fix: an empty page has no valid start-end pair; the previous code
    // computed `end = offset - 1`, which at offset 0 produced the malformed
    // header `items 0--1/*`. Report an unsatisfied range as `*/*` instead,
    // following the PostgREST convention for empty result sets.
    let range = if rows.is_empty() {
        "items */*".to_string()
    } else {
        let end = offset + (rows.len() as i64) - 1;
        format!("items {}-{}/{}", offset, end, "*")
    };
    HttpResponse::Ok()
        .insert_header(("Content-Range", range))
        .json(json!({ "data": rows }))
}