mod auth_rights;
mod delete;
mod fetch;
mod fetch_body_plan;
mod fetch_conditions;
mod fetch_get_compat;
mod fetch_singleflight;
mod postgrest;
mod query_plan;
mod relation_select_compatibility;
mod resource_id;
mod rpc_compat;
mod sql;
mod structured_fetch;
mod structured_fetch_sql;
mod update_plan;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::str::FromStr;
/// Validates `identifier` as a plain SQL identifier and returns it wrapped in double quotes.
///
/// The input is accepted only when its first character is an ASCII letter or `_` and every
/// following character is an ASCII letter, ASCII digit, or `_`. The empty string and any
/// other input yield `None`.
fn sanitize_identifier(identifier: &str) -> Option<String> {
    if identifier.is_empty() {
        return None;
    }
    // A single pass: the position decides whether digits are allowed.
    let well_formed = identifier.char_indices().all(|(position, symbol)| match symbol {
        '_' => true,
        _ if position == 0 => symbol.is_ascii_alphabetic(),
        _ => symbol.is_ascii_alphanumeric(),
    });
    if !well_formed {
        return None;
    }
    let mut quoted = String::with_capacity(identifier.len() + 2);
    quoted.push('"');
    quoted.push_str(identifier);
    quoted.push('"');
    Some(quoted)
}
/// Converts a camelCase / PascalCase name to snake_case.
///
/// Every ASCII uppercase letter is lowercased. An underscore is inserted before it when the
/// preceding character is an ASCII lowercase letter or digit, or when the preceding character
/// is ASCII uppercase and the following one is ASCII lowercase (the last capital of an acronym
/// run, e.g. `HTTPServer` -> `http_server`). All other characters are copied unchanged.
pub fn camel_to_snake_case(input: &str) -> String {
    let letters: Vec<char> = input.chars().collect();
    let mut output = String::with_capacity(input.len() * 2);
    for (index, &current) in letters.iter().enumerate() {
        if !current.is_ascii_uppercase() {
            output.push(current);
            continue;
        }
        // `letters[..index]` is empty for the first character, so no separator is emitted there.
        if let Some(&before) = letters[..index].last() {
            let follows_word = before.is_ascii_lowercase() || before.is_ascii_digit();
            let closes_acronym = before.is_ascii_uppercase()
                && matches!(letters.get(index + 1), Some(after) if after.is_ascii_lowercase());
            if follows_word || closes_acronym {
                output.push('_');
            }
        }
        output.push(current.to_ascii_lowercase());
    }
    output
}
/// Returns the column name to use for `column`.
///
/// When `force` is `false`, or when `column` is the wildcard `*`, the input is returned
/// unchanged; otherwise it is converted with [`camel_to_snake_case`].
pub fn normalize_column_name(column: &str, force: bool) -> String {
    match (force, column) {
        (false, _) | (true, "*") => column.to_owned(),
        (true, name) => camel_to_snake_case(name),
    }
}
pub use auth_rights::{
delete_right_for_resource, missing_required_rights, query_right, read_right_for_resource,
right_matches, rpc_right, storage_proxy_right, typesense_proxy_right, write_right_for_resource,
};
pub use delete::{
GatewayDeleteRequestPlan, GatewayDeleteRequestPlanError, GatewayDeleteResourceIdPlan,
build_gateway_delete_request_plan, delete_request_required_right,
normalize_delete_resource_id_column_name, normalize_delete_resource_id_lookup_table,
plan_delete_resource_id_resolution,
};
pub use fetch::{
AggregationStrategy, GatewayFetchConditionError, PostProcessingConfig, SortOptions,
TimeGranularity, apply_post_processing, build_fetch_hashed_cache_key,
build_fetch_hashed_cache_key_legacy8, coerce_room_id_eq_value, parse_gateway_fetch_conditions,
parse_room_id_value, parse_sort_options_from_body,
};
pub use fetch_body_plan::{
GatewayFetchBodyPlan, GatewayFetchBodyPlanError, build_gateway_fetch_body_plan,
};
pub use fetch_conditions::{
RequestCondition, merge_column_types_with_stats_fallback, resolve_where_column_types,
to_query_conditions, to_query_conditions_with_types,
};
pub use fetch_get_compat::{
GatewayGetFetchCompatibilityError, GatewayGetFetchCompatibilityPlan,
build_gateway_get_fetch_compatibility_plan,
};
pub use fetch_singleflight::{
GatewayFetchRowsResult, GatewayFetchSingleflight, GatewayFetchSingleflightRole,
GatewayInFlightFetch,
};
pub use postgrest::{
OrderSpec, PostgrestFilter, PostgrestFilterOperator, PostgrestQuery,
convert_postgrest_filter_to_condition, convert_postgrest_filters_to_conditions,
convert_postgrest_or_filter_groups_to_conditions, parse_postgrest_query,
};
pub use query_plan::{
GatewayQueryCompatibilityPlan, GatewayQueryRequestParseError, GatewayQueryRequestPlan,
GatewayQueryRequestPlanError, build_gateway_query_request_plan,
parse_gateway_query_request_body,
};
pub use relation_select_compatibility::{
GatewayRelationSelectRewrite, GatewayRelationSelectTableRef, try_rewrite_relation_select_query,
};
pub use resource_id::{
find_closest_uuid_column, get_resource_id_key_with_uuid_loader,
parse_uuid_columns_from_schema_rows,
};
pub use rpc_compat::{
parse_rpc_argument_value, parse_rpc_filter_expression, parse_rpc_order,
rpc_request_from_get_compat, rpc_request_from_post_compat,
};
pub use sql::{
GatewayApiRequest, GatewayApiRequestPayload, GatewaySqlExecutionMode,
GatewaySqlExecutionRequest, GatewaySqlRequest,
};
pub use structured_fetch::{
StructuredColumnField, StructuredFilter, StructuredFilterOperator, StructuredGatewayFetchPlan,
StructuredJoinKind, StructuredOrderBy, StructuredRelationField, StructuredSelectField,
StructuredSelectOperation, StructuredSelectQuery, StructuredSortDirection,
build_structured_fetch_cache_key, build_structured_fetch_plan,
};
pub use structured_fetch_sql::{execute_structured_fetch_sql, render_structured_fetch_sql};
pub use update_plan::{
GatewayUpdateRequestPlan, GatewayUpdateRequestPlanError, build_gateway_update_request_plan,
};
/// The kinds of operation the gateway distinguishes.
///
/// Serde (de)serializes the variants using their snake_case names
/// (`rename_all = "snake_case"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum GatewayOperationKind {
Fetch,
Insert,
Update,
Delete,
Query,
Rpc,
}
impl GatewayOperationKind {
    /// Every variant, in declaration order; drives the reverse (text -> variant) lookups.
    const ALL: [Self; 6] = [
        Self::Fetch,
        Self::Insert,
        Self::Update,
        Self::Delete,
        Self::Query,
        Self::Rpc,
    ];

    /// Returns the lowercase public name of the operation (e.g. `"fetch"`).
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Rpc => "rpc",
            Self::Query => "query",
            Self::Delete => "delete",
            Self::Update => "update",
            Self::Insert => "insert",
            Self::Fetch => "fetch",
        }
    }

    /// Returns the `gateway_`-prefixed tag used for deferred requests (e.g. `"gateway_fetch"`).
    pub const fn deferred_kind(self) -> &'static str {
        match self {
            Self::Rpc => "gateway_rpc",
            Self::Query => "gateway_query",
            Self::Delete => "gateway_delete",
            Self::Update => "gateway_update",
            Self::Insert => "gateway_insert",
            Self::Fetch => "gateway_fetch",
        }
    }

    /// Parses a public operation name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Returns `None` when `raw` does not name any operation.
    pub fn from_public_route_op(raw: &str) -> Option<Self> {
        let wanted = raw.trim();
        Self::ALL
            .iter()
            .copied()
            .find(|kind| wanted.eq_ignore_ascii_case(kind.as_str()))
    }

    /// Parses a deferred-request tag, ignoring surrounding whitespace and ASCII case.
    ///
    /// Returns `None` when `raw` is not one of the `gateway_*` tags.
    pub fn from_deferred_kind(raw: &str) -> Option<Self> {
        let wanted = raw.trim();
        Self::ALL
            .iter()
            .copied()
            .find(|kind| wanted.eq_ignore_ascii_case(kind.deferred_kind()))
    }
}
/// Parses the public operation name; equivalent to
/// [`GatewayOperationKind::from_public_route_op`] with `Err(())` in place of `None`.
impl FromStr for GatewayOperationKind {
    type Err = ();

    fn from_str(value: &str) -> Result<Self, Self::Err> {
        match Self::from_public_route_op(value) {
            Some(kind) => Ok(kind),
            None => Err(()),
        }
    }
}
/// A single request condition pairing a column name with a JSON value.
///
/// NOTE(review): the `eq_` prefix suggests an equality filter (`column = value`); how the
/// condition is applied is not visible in this file — confirm against the consumers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GatewayRequestCondition {
/// Name of the column the condition refers to.
pub eq_column: String,
/// JSON value associated with the column.
pub eq_value: Value,
}
impl GatewayRequestCondition {
pub fn new(eq_column: String, eq_value: Value) -> Self {
Self {
eq_column,
eq_value,
}
}
}
/// A fetch request: target table, optional schema, selected columns, conditions and paging.
///
/// Every field except `table_name` is optional when deserializing (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayFetchRequest {
/// Name of the table to read from.
pub table_name: String,
/// Optional schema name; also accepted under the key `schema` when deserializing.
#[serde(default, alias = "schema")]
pub schema_name: Option<String>,
/// Requested column names; empty when absent from the serialized form.
#[serde(default)]
pub columns: Vec<String>,
/// Conditions attached to the request.
#[serde(default)]
pub conditions: Vec<GatewayRequestCondition>,
/// Optional `current_page` value supplied by the client.
#[serde(default)]
pub current_page: Option<i64>,
/// Optional `page_size` value supplied by the client.
#[serde(default)]
pub page_size: Option<i64>,
/// Optional `limit` value supplied by the client.
#[serde(default)]
pub limit: Option<i64>,
/// Optional `offset` value supplied by the client.
#[serde(default)]
pub offset: Option<i64>,
}
impl GatewayFetchRequest {
    /// Builds a fetch request leniently from a raw JSON body.
    ///
    /// * `table_name` becomes the empty string when missing or not a string.
    /// * `columns` falls back to `["*"]` when none are supplied; when
    ///   `force_camel_case_to_snake_case` is set each column goes through
    ///   [`normalize_column_name`].
    /// * `current_page`, `page_size`, `limit` and `offset` are taken only when they are JSON
    ///   integers representable as `i64`.
    pub fn from_body(body: &Value, force_camel_case_to_snake_case: bool) -> Self {
        let integer_field = |key: &str| body.get(key).and_then(Value::as_i64);

        let table_name = match body.get("table_name") {
            Some(Value::String(name)) => name.clone(),
            _ => String::new(),
        };

        let requested = parse_columns_from_body(body);
        let selected = if requested.is_empty() {
            vec!["*".to_string()]
        } else {
            requested
        };
        let columns = if force_camel_case_to_snake_case {
            selected
                .iter()
                .map(|name| normalize_column_name(name, true))
                .collect()
        } else {
            selected
        };

        Self {
            table_name,
            schema_name: schema_name_from_body(body),
            columns,
            conditions: parse_conditions_from_body(body),
            current_page: integer_field("current_page"),
            page_size: integer_field("page_size"),
            limit: integer_field("limit"),
            offset: integer_field("offset"),
        }
    }

    /// Returns the table name qualified with the schema (when one is set), or a validation
    /// message; see [`qualify_gateway_table_name`].
    pub fn qualified_table_name(&self) -> Result<String, String> {
        let schema = self.schema_name.as_deref();
        qualify_gateway_table_name(self.table_name.as_str(), schema)
    }
}
/// An insert request: target table, optional schema, the payload to insert and an optional
/// `update_body`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayInsertRequest {
/// Name of the table to insert into.
pub table_name: String,
/// Optional schema name; also accepted under the key `schema` when deserializing.
#[serde(default, alias = "schema")]
pub schema_name: Option<String>,
/// JSON payload to insert.
pub insert_body: Value,
/// Optional secondary payload.
/// NOTE(review): presumably used for upsert-style updates — confirm against the consumer.
#[serde(default)]
pub update_body: Option<Value>,
}
impl GatewayInsertRequest {
    /// Extracts `table_name` from the body.
    ///
    /// Returns `None` when the key is missing, is not a string, or is blank after trimming.
    /// The returned name is the original (untrimmed) string.
    pub fn table_name_from_body(body: &Value) -> Option<String> {
        let name = body.get("table_name")?.as_str()?;
        if name.trim().is_empty() {
            None
        } else {
            Some(name.to_owned())
        }
    }

    /// Builds an insert request from a raw JSON body.
    ///
    /// Returns `None` when the table name or the insert payload cannot be extracted.
    pub fn from_body(body: &Value) -> Option<Self> {
        let table_name = Self::table_name_from_body(body)?;
        let insert_body = Self::insert_body_from_body(body)?;
        Some(Self {
            table_name,
            schema_name: schema_name_from_body(body),
            insert_body,
            update_body: body.get("update_body").cloned(),
        })
    }

    /// Returns a copy of the insert payload: the `insert_body` key when present, otherwise
    /// the `data` key.
    pub fn insert_body_from_body(body: &Value) -> Option<Value> {
        let payload = match body.get("insert_body") {
            Some(found) => found,
            None => body.get("data")?,
        };
        Some(payload.clone())
    }

    /// Returns the table name qualified with the schema (when one is set), or a validation
    /// message; see [`qualify_gateway_table_name`].
    pub fn qualified_table_name(&self) -> Result<String, String> {
        let schema = self.schema_name.as_deref();
        qualify_gateway_table_name(self.table_name.as_str(), schema)
    }
}
/// An update request: target table, optional schema, conditions and the new column values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayUpdateRequest {
/// Name of the table to update.
pub table_name: String,
/// Optional schema name; also accepted under the key `schema` when deserializing.
#[serde(default, alias = "schema")]
pub schema_name: Option<String>,
/// Conditions attached to the request; empty when absent from the serialized form.
#[serde(default)]
pub conditions: Vec<GatewayRequestCondition>,
/// JSON payload holding the values to write.
pub data: Value,
}
impl GatewayUpdateRequest {
    /// Builds an update request from a raw JSON body.
    ///
    /// Returns `None` when `table_name` is missing, not a string, or blank, or when
    /// [`extract_update_payload`] finds no payload. The payload is stored as a JSON object
    /// in `data`.
    ///
    /// NOTE(review): conditions come from [`parse_conditions_from_body`], which skips
    /// malformed entries, so a body with only malformed conditions produces a request with
    /// no conditions at all — confirm callers guard against unconditional updates.
    pub fn from_body(body: &Value, force_camel_case_to_snake_case: bool) -> Option<Self> {
        let raw_name = body.get("table_name")?.as_str()?;
        if raw_name.trim().is_empty() {
            return None;
        }
        let table_name = raw_name.to_owned();
        let fields = extract_update_payload(body, force_camel_case_to_snake_case)?;
        Some(Self {
            table_name,
            schema_name: schema_name_from_body(body),
            conditions: parse_conditions_from_body(body),
            data: Value::Object(fields),
        })
    }

    /// Returns the table name qualified with the schema (when one is set), or a validation
    /// message; see [`qualify_gateway_table_name`].
    pub fn qualified_table_name(&self) -> Result<String, String> {
        let schema = self.schema_name.as_deref();
        qualify_gateway_table_name(self.table_name.as_str(), schema)
    }
}
/// A delete request identifying one resource in a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayDeleteRequest {
/// Name of the table to delete from.
pub table_name: String,
/// Optional schema name; also accepted under the key `schema` when deserializing.
#[serde(default, alias = "schema")]
pub schema_name: Option<String>,
/// Identifier of the resource to delete.
pub resource_id: String,
/// Optional name of the identifier column; also accepted under the keys
/// `resource_id_column` and `id_column` when deserializing.
#[serde(default, alias = "resource_id_column", alias = "id_column")]
pub column_name: Option<String>,
}
impl GatewayDeleteRequest {
    /// Returns the table name qualified with the schema (when one is set), or a validation
    /// message; see [`qualify_gateway_table_name`].
    pub fn qualified_table_name(&self) -> Result<String, String> {
        let schema = self.schema_name.as_deref();
        qualify_gateway_table_name(self.table_name.as_str(), schema)
    }
}
/// Comparison operators accepted in RPC filters.
///
/// Each variant is (de)serialized under its lowercase name (`eq`, `neq`, `gt`, `gte`, `lt`,
/// `lte`, `in`, `like`, `ilike`, `is`); `ILike` is additionally accepted as `i_like` when
/// deserializing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum GatewayRpcFilterOperator {
    Eq,
    Neq,
    Gt,
    Gte,
    Lt,
    Lte,
    In,
    Like,
    #[serde(alias = "i_like")]
    ILike,
    Is,
}
/// One filter applied to an RPC result: a column, an operator and a comparison value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GatewayRpcFilter {
/// Name of the column the filter applies to.
pub column: String,
/// Comparison operator.
pub operator: GatewayRpcFilterOperator,
/// Comparison value; takes `Value::default()` when absent from the serialized form.
#[serde(default)]
pub value: Value,
}
/// Ordering applied to an RPC result.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GatewayRpcOrder {
/// Name of the column to order by.
pub column: String,
/// Sort direction; defaults to the value of `default_rpc_order_ascending` when absent.
#[serde(default = "default_rpc_order_ascending")]
pub ascending: bool,
}
/// Serde default for `GatewayRpcOrder::ascending`: ascending order.
fn default_rpc_order_ascending() -> bool {
true
}
/// An RPC (function call) request together with optional shaping of its result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayRpcRequest {
/// Name of the function to call; also accepted under the key `function_name`.
#[serde(alias = "function_name")]
pub function: String,
/// Schema of the function; defaults to the value of `default_rpc_schema` when absent.
#[serde(default = "default_rpc_schema")]
pub schema: String,
/// Function arguments; defaults to the value of `default_rpc_args` when absent.
#[serde(default = "default_rpc_args")]
pub args: Value,
/// Optional `select` expression supplied by the client.
#[serde(default)]
pub select: Option<String>,
/// Filters supplied with the request; empty when absent.
#[serde(default)]
pub filters: Vec<GatewayRpcFilter>,
/// Optional `count` setting supplied by the client.
#[serde(default)]
pub count: Option<String>,
/// Optional `limit` value supplied by the client.
#[serde(default)]
pub limit: Option<i64>,
/// Optional `offset` value supplied by the client.
#[serde(default)]
pub offset: Option<i64>,
/// Optional ordering supplied by the client.
#[serde(default)]
pub order: Option<GatewayRpcOrder>,
}
/// Serde default for `GatewayRpcRequest::schema`: the `public` schema.
fn default_rpc_schema() -> String {
    String::from("public")
}
fn default_rpc_args() -> Value {
Value::Object(Map::new())
}
// Deferred-request kind tags, one per `GatewayOperationKind` variant; each is the value
// returned by that variant's `deferred_kind()`.
/// Deferred-request tag for query operations.
pub const GATEWAY_DEFERRED_KIND_QUERY: &str = GatewayOperationKind::Query.deferred_kind();
/// Deferred-request tag for fetch operations.
pub const GATEWAY_DEFERRED_KIND_FETCH: &str = GatewayOperationKind::Fetch.deferred_kind();
/// Deferred-request tag for insert operations.
pub const GATEWAY_DEFERRED_KIND_INSERT: &str = GatewayOperationKind::Insert.deferred_kind();
/// Deferred-request tag for update operations.
pub const GATEWAY_DEFERRED_KIND_UPDATE: &str = GatewayOperationKind::Update.deferred_kind();
/// Deferred-request tag for delete operations.
pub const GATEWAY_DEFERRED_KIND_DELETE: &str = GatewayOperationKind::Delete.deferred_kind();
/// Deferred-request tag for RPC operations.
pub const GATEWAY_DEFERRED_KIND_RPC: &str = GatewayOperationKind::Rpc.deferred_kind();
// Error codes carried in the `code` field of `GatewayErrorEnvelope`.
/// Error code string `VALIDATION_FAILED`.
pub const GATEWAY_ERROR_CODE_VALIDATION_FAILED: &str = "VALIDATION_FAILED";
/// Error code string `INTERNAL_ERROR`.
pub const GATEWAY_ERROR_CODE_INTERNAL_ERROR: &str = "INTERNAL_ERROR";
/// Error code string `SERVICE_UNAVAILABLE`.
pub const GATEWAY_ERROR_CODE_SERVICE_UNAVAILABLE: &str = "SERVICE_UNAVAILABLE";
/// A gateway request recorded for deferred handling.
///
/// Optional fields are omitted from the serialized form when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayDeferredRequest {
/// Kind tag of the deferred operation (see the `GATEWAY_DEFERRED_KIND_*` constants).
pub kind: String,
/// Identifier of the request.
pub request_id: String,
/// Name of the client the request belongs to.
pub client_name: String,
/// Original JSON request body, when the request carries one.
#[serde(skip_serializing_if = "Option::is_none")]
pub request_body: Option<Value>,
/// Query text, when the request carries one directly.
#[serde(skip_serializing_if = "Option::is_none")]
pub query: Option<String>,
/// Optional reason attached to the request.
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
/// Optional request timestamp; the field name indicates Unix milliseconds.
#[serde(skip_serializing_if = "Option::is_none")]
pub requested_at_unix_ms: Option<i64>,
}
impl GatewayDeferredRequest {
pub fn for_query(
request_id: impl Into<String>,
client_name: impl Into<String>,
query: impl Into<String>,
) -> Self {
Self {
kind: GATEWAY_DEFERRED_KIND_QUERY.to_string(),
request_id: request_id.into(),
client_name: client_name.into(),
request_body: None,
query: Some(query.into()),
reason: None,
requested_at_unix_ms: None,
}
}
pub fn for_request_body(
kind: impl Into<String>,
request_id: impl Into<String>,
client_name: impl Into<String>,
request_body: Value,
) -> Self {
Self {
kind: kind.into(),
request_id: request_id.into(),
client_name: client_name.into(),
request_body: Some(request_body),
query: None,
reason: None,
requested_at_unix_ms: None,
}
}
pub fn with_reason(mut self, reason: Option<impl Into<String>>) -> Self {
self.reason = reason.map(|value| value.into());
self
}
pub fn with_requested_at_unix_ms(mut self, requested_at_unix_ms: i64) -> Self {
self.requested_at_unix_ms = Some(requested_at_unix_ms);
self
}
pub fn query_text(&self) -> Option<String> {
if let Some(query) = self.query.as_ref() {
return Some(query.clone());
}
self.request_body
.as_ref()
.and_then(|body| body.get("query"))
.and_then(Value::as_str)
.map(str::to_string)
}
pub fn schema_name(&self) -> Option<String> {
self.request_body.as_ref().and_then(schema_name_from_body)
}
}
/// Execution metadata attached to a rows response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayRowsMeta {
/// Name of the backend that produced the result.
pub backend: String,
/// Number of statements (as reported by the producer of this metadata).
pub statement_count: usize,
/// Number of rows affected.
pub rows_affected: u64,
/// Number of rows returned.
pub returned_row_count: usize,
}
/// A response carrying result rows and optional execution metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayRowsResponse {
/// Result rows as JSON values.
pub data: Vec<Value>,
/// Optional metadata; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub meta: Option<GatewayRowsMeta>,
}
impl GatewayRowsResponse {
    /// Creates a response holding `data` and no metadata.
    pub fn new(data: Vec<Value>) -> Self {
        let meta = None;
        Self { data, meta }
    }

    /// Returns the response with `meta` attached, replacing any previous metadata.
    pub fn with_meta(self, meta: GatewayRowsMeta) -> Self {
        Self {
            meta: Some(meta),
            ..self
        }
    }
}
/// Context carried in the `data` field of an error envelope.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GatewayErrorData {
/// Name of the operation that failed.
pub operation: String,
/// Optional structured details; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<Value>,
}
/// Error payload returned by the gateway.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GatewayErrorEnvelope {
/// Status marker; `GatewayErrorEnvelope::new` always sets it to `"error"`.
pub status: String,
/// Error code (see the `GATEWAY_ERROR_CODE_*` constants).
pub code: String,
/// Message text supplied by the caller.
pub message: String,
/// Error text supplied by the caller.
pub error: String,
/// Operation name and optional details.
pub data: GatewayErrorData,
}
impl GatewayErrorEnvelope {
    /// Builds an error envelope.
    ///
    /// `status` is always `"error"`; `data.operation` is the public name of `operation`
    /// (see [`GatewayOperationKind::as_str`]).
    pub fn new(
        code: impl Into<String>,
        operation: GatewayOperationKind,
        message: impl Into<String>,
        error: impl Into<String>,
        details: Option<Value>,
    ) -> Self {
        let code: String = code.into();
        let message: String = message.into();
        let error: String = error.into();
        let data = GatewayErrorData {
            operation: String::from(operation.as_str()),
            details,
        };
        Self {
            status: String::from("error"),
            code,
            message,
            error,
            data,
        }
    }
}
pub fn parse_columns_from_body(body: &Value) -> Vec<String> {
let Some(columns_value) = body.get("columns") else {
return Vec::new();
};
if let Some(array) = columns_value.as_array() {
return array
.iter()
.filter_map(|value| value.as_str().map(str::to_string))
.collect();
}
if let Some(csv) = columns_value.as_str() {
return csv
.split(',')
.map(str::trim)
.filter(|token| !token.is_empty())
.map(str::to_string)
.collect();
}
Vec::new()
}
/// Reads the `conditions` array of a request body.
///
/// Each entry needs a string `eq_column` and an `eq_value` key (any JSON value, including
/// `null`). Entries missing either are skipped silently. A missing or non-array
/// `conditions` key yields an empty list.
pub fn parse_conditions_from_body(body: &Value) -> Vec<GatewayRequestCondition> {
    let Some(entries) = body.get("conditions").and_then(Value::as_array) else {
        return Vec::new();
    };
    let mut parsed = Vec::new();
    for entry in entries {
        let column = entry.get("eq_column").and_then(Value::as_str);
        let value = entry.get("eq_value");
        if let (Some(column), Some(value)) = (column, value) {
            parsed.push(GatewayRequestCondition {
                eq_column: column.to_owned(),
                eq_value: value.clone(),
            });
        }
    }
    parsed
}
/// Copies every entry of `fields` into `payload`, normalizing keys with
/// [`normalize_column_name`]; later entries overwrite earlier ones with the same key.
fn merge_update_fields(
    payload: &mut Map<String, Value>,
    fields: &Map<String, Value>,
    force_camel_case_to_snake_case: bool,
) {
    for (key, value) in fields {
        payload.insert(
            normalize_column_name(key, force_camel_case_to_snake_case),
            value.clone(),
        );
    }
}

/// Extracts the column/value payload of an update request body.
///
/// The first of these sources that is present with the expected shape is used:
///
/// 1. `columns` — an array; every object element is merged in order (non-object elements
///    are skipped).
/// 2. `data` — an object.
/// 3. `set` — an object.
///
/// When `force_camel_case_to_snake_case` is set, keys are converted with
/// [`normalize_column_name`].
///
/// Returns `None` when no source matches or the resulting payload is empty. Note that an
/// array-valued `columns` that produces nothing does NOT fall back to `data` / `set`.
pub fn extract_update_payload(
    body: &Value,
    force_camel_case_to_snake_case: bool,
) -> Option<Map<String, Value>> {
    let mut payload = Map::new();
    if let Some(columns) = body.get("columns").and_then(Value::as_array) {
        for fields in columns.iter().filter_map(Value::as_object) {
            merge_update_fields(&mut payload, fields, force_camel_case_to_snake_case);
        }
    } else {
        // `data` takes precedence over `set`; the first key holding an object wins.
        let fields = ["data", "set"]
            .iter()
            .find_map(|key| body.get(*key).and_then(Value::as_object))?;
        merge_update_fields(&mut payload, fields, force_camel_case_to_snake_case);
    }
    if payload.is_empty() {
        None
    } else {
        Some(payload)
    }
}
pub fn schema_name_from_body(body: &Value) -> Option<String> {
body.get("schema_name")
.or_else(|| body.get("schema"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
/// Validates an optional schema name.
///
/// * `None` passes through as `Ok(None)`.
/// * A blank name (after trimming) is rejected with `"schema_name must not be empty"`.
/// * A name rejected by `sanitize_identifier` yields
///   `"schema_name must be a valid SQL identifier"`.
/// * Otherwise the trimmed, unquoted name is returned.
pub fn normalize_gateway_schema_name(schema_name: Option<&str>) -> Result<Option<String>, String> {
    let candidate = match schema_name {
        Some(raw) => raw.trim(),
        None => return Ok(None),
    };
    if candidate.is_empty() {
        return Err(String::from("schema_name must not be empty"));
    }
    match sanitize_identifier(candidate) {
        // The helper returns the name wrapped in double quotes; strip them again.
        Some(quoted) => Ok(Some(quoted.trim_matches('"').to_owned())),
        None => Err(String::from("schema_name must be a valid SQL identifier")),
    }
}
/// Combines a table name with an optional schema name into `schema.table`.
///
/// * A blank `table_name` is rejected with `"table_name is required"`.
/// * `schema_name` is validated by [`normalize_gateway_schema_name`].
/// * Without a schema, the trimmed table name is returned as-is; it is not checked against
///   the identifier rules in that case.
/// * With a schema, the table name must not contain `.` and must itself be a valid
///   identifier; the result is the unquoted `schema.table`.
pub fn qualify_gateway_table_name(
    table_name: &str,
    schema_name: Option<&str>,
) -> Result<String, String> {
    let table = table_name.trim();
    if table.is_empty() {
        return Err(String::from("table_name is required"));
    }
    let schema = match normalize_gateway_schema_name(schema_name)? {
        Some(schema) => schema,
        None => return Ok(table.to_owned()),
    };
    if table.contains('.') {
        return Err(String::from(
            "table_name must not include a schema prefix when schema_name is provided",
        ));
    }
    if sanitize_identifier(table).is_none() {
        return Err(String::from("table_name must be a valid SQL identifier"));
    }
    if sanitize_identifier(&schema).is_none() {
        return Err(String::from("schema_name must be a valid SQL identifier"));
    }
    Ok([schema.as_str(), table].join("."))
}
// Unit tests for the schema/table helpers, body parsing and the error envelope.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// A plain identifier is accepted and returned unquoted.
#[test]
fn normalize_gateway_schema_name_accepts_valid_identifier() {
assert_eq!(
normalize_gateway_schema_name(Some("analytics")).expect("schema should be valid"),
Some("analytics".to_string())
);
}
// Input containing `;` and spaces is rejected as a non-identifier.
#[test]
fn normalize_gateway_schema_name_rejects_invalid_identifier() {
let err = normalize_gateway_schema_name(Some("public;drop schema public"))
.expect_err("invalid identifier should fail");
assert!(err.contains("valid SQL identifier"));
}
// Schema and table are joined with a dot, without quoting.
#[test]
fn qualify_gateway_table_name_with_schema_builds_qualified_target() {
let qualified = qualify_gateway_table_name("events", Some("analytics"))
.expect("qualifying table should succeed");
assert_eq!(qualified, "analytics.events");
}
// A dotted table name combined with an explicit schema is ambiguous and rejected.
#[test]
fn qualify_gateway_table_name_rejects_ambiguous_schema_sources() {
let err = qualify_gateway_table_name("public.events", Some("analytics"))
.expect_err("schema_name with qualified table should fail");
assert!(err.contains("must not include a schema prefix"));
}
// Both the `schema` and `schema_name` keys are recognized.
#[test]
fn schema_name_from_body_supports_alias() {
assert_eq!(
schema_name_from_body(&json!({ "schema": "analytics" })),
Some("analytics".to_string())
);
assert_eq!(
schema_name_from_body(&json!({ "schema_name": "reporting" })),
Some("reporting".to_string())
);
}
// With the force flag set, camelCase column names are converted to snake_case.
#[test]
fn fetch_request_from_body_normalizes_columns_when_forced() {
let request = GatewayFetchRequest::from_body(
&json!({
"table_name": "events",
"columns": ["userId", "createdAt"],
}),
true,
);
assert_eq!(request.columns, vec!["user_id", "created_at"]);
}
// The `set` key is accepted as a payload source and its keys are normalized.
#[test]
fn update_payload_supports_set_alias() {
let payload = extract_update_payload(
&json!({
"set": {
"displayName": "Athena",
}
}),
true,
)
.expect("payload should parse");
assert_eq!(
payload.get("display_name"),
Some(&Value::String("Athena".to_string()))
);
}
// The constructor fills `status`, the operation name and the details.
#[test]
fn gateway_error_envelope_serializes_expected_shape() {
let envelope = GatewayErrorEnvelope::new(
GATEWAY_ERROR_CODE_VALIDATION_FAILED,
GatewayOperationKind::Fetch,
"Invalid request",
"table_name is required",
Some(json!({ "field": "table_name" })),
);
assert_eq!(envelope.status, "error");
assert_eq!(envelope.data.operation, "fetch");
assert_eq!(
envelope.data.details,
Some(json!({ "field": "table_name" }))
);
}
}