mod parser;
use actix_web::{
HttpRequest, HttpResponse, delete, get, patch, post,
web::{Data, Json, Path},
};
use serde_json::{Value, json};
use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{
GatewayAuthOutcome, authorize_gateway_request, delete_right_for_resource,
read_right_for_resource, write_right_for_resource,
};
use crate::api::gateway::postgrest::parser::{
FilterOperator, PostgrestFilter, parse_postgrest_query,
};
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::drivers::postgresql::sqlx_driver::{
PostgresInsertError, delete_rows, fetch_rows_with_columns_with_or_groups, insert_row,
insert_rows_bulk, update_rows, upsert_row,
};
use crate::parser::query_builder::{Condition, ConditionOperator};
use crate::utils::request_logging::{LoggedRequest, log_request};
/// Handles `GET /rest/v1/{table}`: a PostgREST-style read of rows from `table`.
///
/// Steps, in order:
/// 1. Resolve the client name via `require_client_header` (400 when it is empty).
/// 2. Request authorization with the read right for the table and log the
///    request. If the authorization outcome carries a response, that response
///    is returned as-is (the request has already been logged at that point).
/// 3. Parse the query string (400 with the parser's error on failure).
/// 4. Look up the Postgres pool registered under the client name (400 if absent).
/// 5. Fetch the rows, hand a `{ "data": rows }` snapshot to the gateway webhook
///    dispatcher, and reply with the rows plus a `Content-Range` header.
///
/// `limit` defaults to 100 and is raised to at least 1; `offset` defaults to 0
/// and is raised to at least 0. No upper bound is applied to `limit` here.
/// A fetch failure is returned as 500 with the error text.
#[deprecated(
    note = "This endpoint will be deprecated soon. Please migrate to the new gateway endpoints under /gateway* endpoints"
)]
#[get("/rest/v1/{table}")]
pub async fn postgrest_get_route(
    req: HttpRequest,
    path: Path<String>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let table_name: String = path.into_inner();
    let client_name: String = match require_client_header(&req) {
        Ok(name) => name,
        Err(resp) => return resp,
    };

    let auth: GatewayAuthOutcome = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&table_name))],
    )
    .await;
    // Logging happens before the auth short-circuit so rejected requests are logged too.
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }

    let query: parser::PostgrestQuery = match parse_postgrest_query(
        &table_name,
        &req,
        app_state.gateway_force_camel_case_to_snake_case,
    ) {
        Ok(parsed) => parsed,
        Err(err) => {
            return HttpResponse::BadRequest().json(json!({ "error": err }));
        }
    };
    let limit: i64 = query.limit.unwrap_or(100).max(1);
    let offset: i64 = query.offset.unwrap_or(0).max(0);

    let pool = match app_state.pg_registry.get_pool(&client_name) {
        Some(pool) => pool,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Postgres client '{}' is not configured", client_name)
            }));
        }
    };

    let columns_refs: Vec<&str> = query.columns.iter().map(String::as_str).collect();
    let and_conditions: Vec<Condition> = convert_filters(
        &query.filters,
        app_state.gateway_auto_cast_uuid_filter_values_to_text,
    );
    let or_condition_groups: Vec<Vec<Condition>> = convert_or_filter_groups(
        &query.or_filters,
        app_state.gateway_auto_cast_uuid_filter_values_to_text,
    );
    let order_by: Option<(&str, bool)> = query
        .order
        .as_ref()
        .map(|spec| (spec.column.as_str(), spec.ascending));

    match fetch_rows_with_columns_with_or_groups(
        &pool,
        &table_name,
        &columns_refs,
        &and_conditions,
        &or_condition_groups,
        limit,
        offset,
        order_by,
        app_state.gateway_allow_schema_names_prefixed_as_table_name,
    )
    .await
    {
        Ok(rows) => {
            // `json!` serializes interpolated expressions by reference, so borrowing
            // `rows` yields the same document without first deep-cloning the result set.
            let response_body: Value = json!({ "data": &rows });
            crate::webhooks::spawn_gateway_webhook_dispatch(
                app_state.clone(),
                crate::webhooks::gateway_webhook_trigger_from_http(
                    &req,
                    &client_name,
                    crate::webhooks::ROUTE_REST_GET,
                    Some(table_name.clone()),
                    Some(logged_request.request_id.clone()),
                    None,
                    Some(response_body),
                ),
            );
            respond_with_content_range(&rows, offset)
        }
        Err(err) => HttpResponse::InternalServerError().json(json!({ "error": err.to_string() })),
    }
}
#[deprecated(
note = "This endpoint will be deprecated soon. Please migrate to the new gateway endpoints under /api/v2"
)]
#[post("/rest/v1/{table}")]
pub async fn postgrest_post_route(
req: HttpRequest,
path: Path<String>,
body: Json<Value>,
app_state: Data<AppState>,
) -> HttpResponse {
let table_name: String = path.into_inner();
let client_name: String = match require_client_header(&req) {
Ok(name) => name,
Err(resp) => return resp,
};
let auth: GatewayAuthOutcome = authorize_gateway_request(
&req,
app_state.get_ref(),
Some(&client_name),
vec![write_right_for_resource(Some(&table_name))],
)
.await;
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
let pool: sqlx::Pool<sqlx::Postgres> = match app_state.pg_registry.get_pool(&client_name) {
Some(pool) => pool,
None => {
return HttpResponse::BadRequest().json(json!({
"error": format!("Postgres client '{}' is not configured", client_name)
}));
}
};
let prefer_header: String = prefer_header_value(&req).unwrap_or_default();
let upsert: bool = prefer_header.contains("resolution=merge-duplicates");
let minimal: bool = prefer_header.contains("return=minimal");
let payload: Value = body.into_inner();
let inserted_rows_result: Result<Vec<Value>, HttpResponse> = if payload.is_array() {
let rows = payload.as_array().cloned().unwrap_or_default();
match insert_rows_bulk(&pool, &table_name, &rows).await {
Ok(rows) => Ok(rows),
Err(err) => Err(map_postgres_insert_error(err)),
}
} else if upsert {
let conflict_column = get_resource_id_key(&table_name).await;
match upsert_row(&pool, &table_name, &payload, &conflict_column).await {
Ok(row) => Ok(vec![row]),
Err(err) => Err(map_postgres_insert_error(err)),
}
} else {
match insert_row(&pool, &table_name, &payload).await {
Ok(row) => Ok(vec![row]),
Err(err) => Err(map_postgres_insert_error(err)),
}
};
let inserted_rows: Vec<Value> = match inserted_rows_result {
Ok(rows) => rows,
Err(resp) => return resp,
};
let _ = invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name).await;
let rest_response: Value = if minimal {
json!({ "return": "minimal", "count": inserted_rows.len() })
} else {
json!({ "data": inserted_rows })
};
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
&client_name,
crate::webhooks::ROUTE_REST_POST,
Some(table_name.clone()),
Some(logged_request.request_id.clone()),
Some(payload.clone()),
Some(rest_response.clone()),
),
);
if minimal {
HttpResponse::NoContent().finish()
} else {
HttpResponse::Ok().json(rest_response)
}
}
/// Handles `PATCH /rest/v1/{table}`: updates the rows of `table` matched by the
/// query-string filters with the fields of the JSON object body.
///
/// Steps, in order:
/// 1. Resolve the client name via `require_client_header` (400 when it is empty).
/// 2. Request authorization with the write right for the table and log the
///    request. If the authorization outcome carries a response, it is returned as-is.
/// 3. Parse the query string (400 with the parser's error on failure).
/// 4. Require at least one AND filter (400 otherwise) and reject requests that
///    carry OR filter groups (400), because `update_rows` is only given the AND
///    conditions and would otherwise touch more rows than the caller selected.
/// 5. Look up the Postgres pool registered under the client name (400 if absent).
/// 6. Require the body to be a JSON object (400 otherwise) and run the update
///    (500 with the error text on failure).
/// 7. Invalidate the scoped gateway cache (its result is ignored) and dispatch
///    the gateway webhook with the request payload and the response summary.
///
/// When the lower-cased `Prefer` header contains `return=minimal` the reply is
/// 204 with no body; otherwise it is 200 with `{ "data": rows }`.
#[deprecated(
    note = "This endpoint will be deprecated soon. Please migrate to the new gateway endpoints under /gateway * endpoints"
)]
#[patch("/rest/v1/{table}")]
pub async fn postgrest_patch_route(
    req: HttpRequest,
    path: Path<String>,
    body: Json<Value>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let table_name: String = path.into_inner();
    let client_name: String = match require_client_header(&req) {
        Ok(name) => name,
        Err(resp) => return resp,
    };

    let auth: GatewayAuthOutcome = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![write_right_for_resource(Some(&table_name))],
    )
    .await;
    // Logging happens before the auth short-circuit so rejected requests are logged too.
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }

    let query: parser::PostgrestQuery = match parse_postgrest_query(
        &table_name,
        &req,
        app_state.gateway_force_camel_case_to_snake_case,
    ) {
        Ok(parsed) => parsed,
        Err(err) => return HttpResponse::BadRequest().json(json!({ "error": err })),
    };
    let conditions: Vec<Condition> = convert_filters(
        &query.filters,
        app_state.gateway_auto_cast_uuid_filter_values_to_text,
    );
    if conditions.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "filters are required for update"
        }));
    }
    // Only the AND conditions are forwarded to `update_rows`. Silently dropping an
    // `or=(...)` group would widen the update beyond what the caller selected, so
    // fail closed instead.
    if query.or_filters.iter().any(|group| !group.is_empty()) {
        return HttpResponse::BadRequest().json(json!({
            "error": "or filters are not supported for update"
        }));
    }

    let pool: sqlx::Pool<sqlx::Postgres> = match app_state.pg_registry.get_pool(&client_name) {
        Some(pool) => pool,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Postgres client '{}' is not configured", client_name)
            }));
        }
    };

    let prefer_header: String = prefer_header_value(&req).unwrap_or_default();
    let minimal: bool = prefer_header.contains("return=minimal");
    let payload: Value = body.into_inner();
    if !payload.is_object() {
        return HttpResponse::BadRequest().json(json!({
            "error": "patch body must be an object"
        }));
    }

    let update_result: Result<Vec<Value>, anyhow::Error> =
        update_rows(&pool, &table_name, &conditions, &payload).await;
    let updated_rows: Vec<Value> = match update_result {
        Ok(rows) => rows,
        Err(err) => {
            return HttpResponse::InternalServerError().json(json!({
                "error": err.to_string()
            }));
        }
    };

    // Cache invalidation result is deliberately ignored.
    let _ = invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name).await;

    let rest_response: Value = if minimal {
        json!({ "return": "minimal", "count": updated_rows.len() })
    } else {
        json!({ "data": updated_rows })
    };
    crate::webhooks::spawn_gateway_webhook_dispatch(
        app_state.clone(),
        crate::webhooks::gateway_webhook_trigger_from_http(
            &req,
            &client_name,
            crate::webhooks::ROUTE_REST_PATCH,
            Some(table_name.clone()),
            Some(logged_request.request_id.clone()),
            // Last use of the payload: hand over ownership instead of cloning it.
            Some(payload),
            Some(rest_response.clone()),
        ),
    );

    if minimal {
        HttpResponse::NoContent().finish()
    } else {
        HttpResponse::Ok().json(rest_response)
    }
}
/// Handles `DELETE /rest/v1/{table}`: deletes the rows of `table` matched by the
/// query-string filters.
///
/// Steps, in order:
/// 1. Resolve the client name via `require_client_header` (400 when it is empty).
/// 2. Request authorization with the delete right for the table and log the
///    request. If the authorization outcome carries a response, it is returned as-is.
/// 3. Parse the query string (400 with the parser's error on failure).
/// 4. Require at least one AND filter (400 otherwise) and reject requests that
///    carry OR filter groups (400), because `delete_rows` is only given the AND
///    conditions and would otherwise delete more rows than the caller selected.
/// 5. Look up the Postgres pool registered under the client name (400 if absent).
/// 6. Run the delete (500 with the error text on failure).
/// 7. Invalidate the scoped gateway cache (its result is ignored) and dispatch
///    the gateway webhook with the response summary.
///
/// When the lower-cased `Prefer` header contains `return=minimal` the reply is
/// 204 with no body; otherwise it is 200 with `{ "data": rows }`.
#[deprecated(
    note = "This endpoint will be deprecated soon. Please migrate to the new gateway endpoints under /api/v2"
)]
#[delete("/rest/v1/{table}")]
pub async fn postgrest_delete_route(
    req: HttpRequest,
    path: Path<String>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let table_name: String = path.into_inner();
    let client_name: String = match require_client_header(&req) {
        Ok(name) => name,
        Err(resp) => return resp,
    };

    let auth: GatewayAuthOutcome = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![delete_right_for_resource(Some(&table_name))],
    )
    .await;
    // Logging happens before the auth short-circuit so rejected requests are logged too.
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }

    let query: parser::PostgrestQuery = match parse_postgrest_query(
        &table_name,
        &req,
        app_state.gateway_force_camel_case_to_snake_case,
    ) {
        Ok(parsed) => parsed,
        Err(err) => return HttpResponse::BadRequest().json(json!({ "error": err })),
    };
    let conditions: Vec<Condition> = convert_filters(
        &query.filters,
        app_state.gateway_auto_cast_uuid_filter_values_to_text,
    );
    if conditions.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "filters are required for delete"
        }));
    }
    // Only the AND conditions are forwarded to `delete_rows`. Silently dropping an
    // `or=(...)` group would widen the delete beyond what the caller selected, so
    // fail closed instead.
    if query.or_filters.iter().any(|group| !group.is_empty()) {
        return HttpResponse::BadRequest().json(json!({
            "error": "or filters are not supported for delete"
        }));
    }

    let pool: sqlx::Pool<sqlx::Postgres> = match app_state.pg_registry.get_pool(&client_name) {
        Some(pool) => pool,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Postgres client '{}' is not configured", client_name)
            }));
        }
    };

    let prefer_header: String = prefer_header_value(&req).unwrap_or_default();
    let minimal: bool = prefer_header.contains("return=minimal");

    let delete_result: Result<Vec<Value>, anyhow::Error> =
        delete_rows(&pool, &table_name, &conditions).await;
    let deleted_rows: Vec<Value> = match delete_result {
        Ok(rows) => rows,
        Err(err) => {
            return HttpResponse::InternalServerError().json(json!({
                "error": err.to_string()
            }));
        }
    };

    // Cache invalidation result is deliberately ignored.
    let _ = invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name).await;

    let rest_response: Value = if minimal {
        json!({ "return": "minimal", "count": deleted_rows.len() })
    } else {
        json!({ "data": deleted_rows })
    };
    crate::webhooks::spawn_gateway_webhook_dispatch(
        app_state.clone(),
        crate::webhooks::gateway_webhook_trigger_from_http(
            &req,
            &client_name,
            crate::webhooks::ROUTE_REST_DELETE,
            Some(table_name.clone()),
            Some(logged_request.request_id.clone()),
            None,
            Some(rest_response.clone()),
        ),
    );

    if minimal {
        HttpResponse::NoContent().finish()
    } else {
        HttpResponse::Ok().json(rest_response)
    }
}
/// Converts parsed PostgREST filters into query-builder conditions, preserving order.
///
/// Each filter goes through `convert_filter_to_condition`; any filter for which
/// that returns `None` is skipped. `auto_cast_uuid_filter_values_to_text` is
/// forwarded unchanged to every conversion.
fn convert_filters(
    filters: &[PostgrestFilter],
    auto_cast_uuid_filter_values_to_text: bool,
) -> Vec<Condition> {
    let mut conditions: Vec<Condition> = Vec::new();
    for filter in filters {
        if let Some(condition) =
            convert_filter_to_condition(filter, auto_cast_uuid_filter_values_to_text)
        {
            conditions.push(condition);
        }
    }
    conditions
}
/// Converts each OR group of parsed filters into a group of conditions.
///
/// Every group is converted with `convert_filters`; groups that end up with no
/// conditions are dropped, so the result never contains an empty group.
fn convert_or_filter_groups(
    or_groups: &[Vec<PostgrestFilter>],
    auto_cast_uuid_filter_values_to_text: bool,
) -> Vec<Vec<Condition>> {
    or_groups
        .iter()
        .filter_map(|raw_group| {
            let converted = convert_filters(raw_group, auto_cast_uuid_filter_values_to_text);
            if converted.is_empty() {
                None
            } else {
                Some(converted)
            }
        })
        .collect()
}
fn map_filter_operator(op: FilterOperator) -> ConditionOperator {
match op {
FilterOperator::Eq => ConditionOperator::Eq,
FilterOperator::Neq => ConditionOperator::Neq,
FilterOperator::Gt => ConditionOperator::Gt,
FilterOperator::Lt => ConditionOperator::Lt,
FilterOperator::Gte => ConditionOperator::Gte,
FilterOperator::Lte => ConditionOperator::Lte,
FilterOperator::Like => ConditionOperator::Like,
FilterOperator::ILike => ConditionOperator::ILike,
FilterOperator::Is => ConditionOperator::Is,
FilterOperator::In => ConditionOperator::In,
FilterOperator::Contains => ConditionOperator::Contains,
FilterOperator::Contained => ConditionOperator::Contained,
}
}
/// Builds a `Condition` from a single parsed PostgREST filter.
///
/// The filter's column and values are cloned, its operator is translated with
/// `map_filter_operator`, and its `negated` flag is passed through. The
/// `auto_cast_uuid_filter_values_to_text` flag is applied to the condition via
/// `with_uuid_value_text_cast`.
///
/// The current implementation always returns `Some`.
fn convert_filter_to_condition(
    filter: &PostgrestFilter,
    auto_cast_uuid_filter_values_to_text: bool,
) -> Option<Condition> {
    let column = filter.column.clone();
    let operator = map_filter_operator(filter.operator);
    let values = filter.values.clone();

    let condition = Condition::new(column, operator, values, filter.negated)
        .with_uuid_value_text_cast(auto_cast_uuid_filter_values_to_text);
    Some(condition)
}
/// Returns the request's `Prefer` header value, lower-cased.
///
/// Yields `None` when the header is absent or when `to_str` rejects its value.
fn prefer_header_value(req: &HttpRequest) -> Option<String> {
    let raw_value = req.headers().get("Prefer")?;
    let text = raw_value.to_str().ok()?;
    Some(text.to_lowercase())
}
/// Resolves the client name for the request via `x_athena_client`.
///
/// Returns the name when it is non-empty; otherwise returns a ready-to-send
/// 400 response stating that the `X-Athena-Client` header is required.
fn require_client_header(req: &HttpRequest) -> Result<String, HttpResponse> {
    let client_name: String = x_athena_client(req);
    if !client_name.is_empty() {
        return Ok(client_name);
    }

    let rejection = HttpResponse::BadRequest().json(json!({
        "error": "X-Athena-Client header is required"
    }));
    Err(rejection)
}
fn map_postgres_insert_error(err: PostgresInsertError) -> HttpResponse {
HttpResponse::InternalServerError().json(json!({
"error": format!("failed to insert rows: {:?}", err)
}))
}
/// Builds the 200 response for a read: `{ "data": rows }` plus a `Content-Range`
/// header describing which window of the result set was returned.
///
/// * Non-empty result: `items <offset>-<offset + len - 1>/*`.
/// * Empty result: `items */*`. There is no valid index range to report, and
///   computing `offset - 1` would yield a malformed value such as `0--1`.
///
/// The total count is always reported as `*` (unknown).
fn respond_with_content_range(rows: &[Value], offset: i64) -> HttpResponse {
    let content_range: String = if rows.is_empty() {
        String::from("items */*")
    } else {
        // Saturating arithmetic keeps an extreme offset from overflowing.
        let last_row_delta: i64 = i64::try_from(rows.len() - 1).unwrap_or(i64::MAX);
        let end: i64 = offset.saturating_add(last_row_delta);
        format!("items {}-{}/{}", offset, end, "*")
    };

    HttpResponse::Ok()
        .insert_header(("Content-Range", content_range))
        .json(json!({ "data": rows }))
}