use serde_json::{Map, Value, json};
mod clauses;
mod joins;
mod literals;
mod syntax;
#[cfg(test)]
mod tests;
use clauses::*;
use joins::*;
use literals::*;
use syntax::*;
/// Identifies a table by name, optionally qualified by a schema.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GatewayRelationSelectTableRef {
/// Schema of the table, or `None` when no schema is known.
pub schema_name: Option<String>,
/// Name of the table.
pub table_name: String,
}
/// Successful result of [`try_rewrite_relation_select_query`].
#[derive(Debug, Clone, PartialEq)]
pub struct GatewayRelationSelectRewrite {
/// JSON request body assembled by `build_request_body`.
pub request_body: serde_json::Value,
/// Trimmed text found between `SELECT` and the top-level `FROM` of the query.
pub select: String,
/// Root table of the query; its schema is the one written in the query when
/// present, otherwise the schema passed in by the caller.
pub table: GatewayRelationSelectTableRef,
}
/// A table reference together with the alias (if any) it is bound to.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompatibilityTableBinding {
/// The table being referenced.
table: GatewayRelationSelectTableRef,
/// Optional alias; `matches_simple_qualifier` accepts it as a qualifier in
/// addition to the table name.
alias: Option<String>,
}
/// One entry of the parsed select list.
#[derive(Debug, Clone, PartialEq)]
enum CompatibilityProjectionItem {
/// A column of the root table; serialized with `"kind": "column"`.
Column(String),
/// A nested relation selection; serialized with `"kind": "relation"`.
Relation(CompatibilityRelationProjection),
}
/// A relation entry of the select list, e.g. `alias:relation!modifier(nested fields)`.
#[derive(Debug, Clone, PartialEq)]
struct CompatibilityRelationProjection {
/// Relation selector text with the `alias:` prefix and `!modifier` suffix removed.
selector: String,
/// Table reference obtained by passing `selector` to `parse_table_ref`.
table: GatewayRelationSelectTableRef,
/// Projection alias written before `:` in the select list, if any.
alias: Option<String>,
/// Trimmed text found between the relation's parentheses.
nested_select: String,
/// Join kind: `Left` unless the `!inner` modifier was given; overwritten by
/// the kind of a matching explicit JOIN clause.
join: CompatibilityJoinKind,
/// Foreign-key hint, taken from a non-`inner` modifier or derived from a
/// matching explicit JOIN clause.
foreign_key: Option<String>,
/// Alias of the matching explicit JOIN binding; `None` until
/// `apply_explicit_join_clauses` assigns it.
sql_alias: Option<String>,
/// Filters targeted at this relation; appended by `apply_targeted_filters`.
where_filters: Vec<Value>,
/// Ordering entries targeted at this relation; appended by `apply_targeted_order_by`.
order_by: Vec<Value>,
}
/// How a relation is joined to the root table.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CompatibilityJoinKind {
/// Default for relation projections; nothing is written to the request body for it.
Left,
/// Written to the request body as `"join": "inner"`.
Inner,
}
/// An explicit JOIN clause, consumed by `apply_explicit_join_clauses`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompatibilityJoinClause {
/// Join kind requested by the clause.
kind: CompatibilityJoinKind,
/// Joined table and its optional alias.
binding: CompatibilityTableBinding,
/// Join condition; handed to `derive_join_foreign_key` to obtain the foreign-key hint.
on: CompatibilityJoinPredicate,
}
/// The two column expressions that make up a join's `ON` condition.
///
/// NOTE(review): the comparison operator is not stored here; presumably only
/// equality is supported — confirm in the `joins` module.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompatibilityJoinPredicate {
/// Left-hand column expression.
left: CompatibilityColumnExpr,
/// Right-hand column expression.
right: CompatibilityColumnExpr,
}
/// A column reference with an optional cast.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompatibilityColumnExpr {
/// Column path; may contain `.` separators (root projections reject dotted
/// paths after normalization).
path: String,
/// Cast attached to the column, if any.
/// NOTE(review): presumably the type name following `::` — confirm in the parser.
cast: Option<String>,
}
/// A JSON value with an optional cast.
///
/// NOTE(review): not referenced by the code visible in this file; presumably
/// produced and consumed by the sibling parsing modules — confirm.
#[derive(Debug, Clone, PartialEq)]
struct CompatibilityValueExpr {
/// The value itself.
value: serde_json::Value,
/// Cast attached to the value, if any.
cast: Option<String>,
}
/// Which part of the query a filter, ordering entry or column applies to.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CompatibilityTargetScope {
/// Applies to the root table.
Root,
/// Applies to the relation projection with this display name (its alias, or
/// its selector when it has no alias).
Relation(String),
}
/// A JSON filter or ordering entry tagged with the scope it must be attached to.
#[derive(Debug, Clone, PartialEq)]
struct CompatibilityTargetedValue {
/// Root query or a specific relation projection.
target: CompatibilityTargetScope,
/// JSON payload copied unchanged into the request body.
value: Value,
}
/// Table bindings known to the query; built by `apply_explicit_join_clauses`
/// and handed to the WHERE and ORDER BY parsers.
#[derive(Debug, Clone)]
struct CompatibilityAliasContext {
/// Binding of the root table from the FROM clause.
root: CompatibilityTableBinding,
/// One binding per relation projection, in select-list order.
relations: Vec<CompatibilityRelationBinding>,
}
/// Associates a relation projection's display name with its table binding.
#[derive(Debug, Clone)]
struct CompatibilityRelationBinding {
/// Display name of the projection (its alias, or its selector when it has no alias).
display_name: String,
/// The relation's table together with the alias taken from a matching JOIN, if any.
binding: CompatibilityTableBinding,
}
/// Attempts to rewrite a relation-style `SELECT` statement into a gateway request body.
///
/// Surrounding whitespace and trailing `;` characters are dropped before parsing.
///
/// # Returns
/// * `Ok(None)` when the statement is not a candidate: it does not start with
///   `SELECT`, has no top-level `FROM`, or
///   `select_clause_uses_relation_compatibility` rejects its select list.
/// * `Ok(Some(rewrite))` with the request body, the raw select list and the
///   root table otherwise.
///
/// # Errors
/// Returns a message when the statement is a candidate but cannot be
/// translated: a sub-parser fails, `schema_name` disagrees (ASCII
/// case-insensitively) with the schema written in the query, or an
/// unrecognised clause follows the FROM clause.
pub fn try_rewrite_relation_select_query(
    normalized_query: &str,
    schema_name: Option<&str>,
) -> Result<Option<GatewayRelationSelectRewrite>, String> {
    let statement = normalized_query.trim().trim_end_matches(';').trim();
    if !starts_with_keyword(statement, "SELECT") {
        return Ok(None);
    }
    let after_select = &statement["SELECT".len()..];
    let from_index = match find_top_level_keyword(after_select, "FROM", 0) {
        Some(index) => index,
        None => return Ok(None),
    };
    let select = after_select[..from_index].trim().to_string();
    if !select_clause_uses_relation_compatibility(&select) {
        return Ok(None);
    }

    // The FROM clause runs up to the first trailing clause keyword, or to the end.
    let after_from = after_select[from_index + "FROM".len()..].trim();
    let from_end = match find_earliest_top_level_keyword(
        after_from,
        &["WHERE", "ORDER BY", "LIMIT", "OFFSET"],
    ) {
        Some((index, _)) => index,
        None => after_from.len(),
    };
    let (root_binding, joins) =
        parse_from_clause_for_compatibility(after_from[..from_end].trim())?;
    let mut rest = after_from[from_end..].trim();

    // A schema written in the query wins, but must agree with the request's schema.
    let effective_schema = match (root_binding.table.schema_name.as_deref(), schema_name) {
        (Some(query_schema), Some(request_schema))
            if !query_schema.eq_ignore_ascii_case(request_schema) =>
        {
            return Err(
                "schema_name does not match the schema-qualified table in the query".to_string(),
            );
        }
        (Some(query_schema), _) => Some(query_schema.to_string()),
        (None, request_schema) => request_schema.map(str::to_owned),
    };

    let mut projections = parse_select_projection_items(&select, &root_binding)?;
    let alias_context = apply_explicit_join_clauses(&mut projections, joins, &root_binding)?;

    let mut where_filters = Vec::new();
    let mut order_by = Vec::new();
    let mut limit = None;
    let mut offset = None;
    // Consume the trailing clauses one at a time; each clause ends where the
    // next keyword that may legally follow it begins.
    while !rest.is_empty() {
        if starts_with_keyword(rest, "WHERE") {
            let clause = rest["WHERE".len()..].trim();
            let end = match find_earliest_top_level_keyword(
                clause,
                &["ORDER BY", "LIMIT", "OFFSET"],
            ) {
                Some((index, _)) => index,
                None => clause.len(),
            };
            where_filters =
                parse_where_clause_for_compatibility(clause[..end].trim(), &alias_context)?;
            rest = clause[end..].trim();
        } else if starts_with_keyword(rest, "ORDER BY") {
            let clause = rest["ORDER BY".len()..].trim();
            let end = match find_earliest_top_level_keyword(clause, &["LIMIT", "OFFSET"]) {
                Some((index, _)) => index,
                None => clause.len(),
            };
            order_by = parse_order_by_for_compatibility(clause[..end].trim(), &alias_context)?;
            rest = clause[end..].trim();
        } else if starts_with_keyword(rest, "LIMIT") {
            let clause = rest["LIMIT".len()..].trim();
            let end = match find_earliest_top_level_keyword(clause, &["OFFSET"]) {
                Some((index, _)) => index,
                None => clause.len(),
            };
            limit = parse_i64_clause(clause[..end].trim(), "limit")?;
            rest = clause[end..].trim();
        } else if starts_with_keyword(rest, "OFFSET") {
            // OFFSET is the last supported clause, so it takes everything that is left.
            offset = parse_i64_clause(rest["OFFSET".len()..].trim(), "offset")?;
            rest = "";
        } else {
            return Err("unsupported SQL clause in relation-select compatibility mode".to_string());
        }
    }

    // Relation-scoped filters/orderings are moved onto their projections; the
    // remainder stays on the root query.
    let root_filters = apply_targeted_filters(&mut projections, where_filters)?;
    let root_order_by = apply_targeted_order_by(&mut projections, order_by)?;
    let request_body = build_request_body(
        &root_binding.table.table_name,
        effective_schema.as_deref(),
        &projections,
        &root_filters,
        &root_order_by,
        limit,
        offset,
    );
    let table = GatewayRelationSelectTableRef {
        schema_name: effective_schema,
        table_name: root_binding.table.table_name,
    };
    Ok(Some(GatewayRelationSelectRewrite {
        request_body,
        select,
        table,
    }))
}
fn build_request_body(
table_name: &str,
schema_name: Option<&str>,
projections: &[CompatibilityProjectionItem],
root_filters: &[Value],
root_order_by: &[Value],
limit: Option<i64>,
offset: Option<i64>,
) -> Value {
let mut fields = Vec::new();
for projection in projections {
match projection {
CompatibilityProjectionItem::Column(column) => {
fields.push(json!({
"kind": "column",
"name": column,
}));
}
CompatibilityProjectionItem::Relation(relation) => {
let mut query = Map::new();
query.insert(
"select".to_string(),
Value::String(relation.nested_select.clone()),
);
if !relation.where_filters.is_empty() {
query.insert(
"where_filters".to_string(),
Value::Array(relation.where_filters.clone()),
);
}
if !relation.order_by.is_empty() {
query.insert(
"orderBy".to_string(),
Value::Array(relation.order_by.clone()),
);
}
let mut field = Map::new();
field.insert("kind".to_string(), json!("relation"));
field.insert("name".to_string(), json!(relation.selector));
field.insert("query".to_string(), Value::Object(query));
if let Some(alias) = relation.alias.as_deref() {
field.insert("alias".to_string(), json!(alias));
}
if relation.join == CompatibilityJoinKind::Inner {
field.insert("join".to_string(), json!("inner"));
}
if let Some(foreign_key) = relation.foreign_key.as_deref() {
field.insert("foreign_key".to_string(), json!(foreign_key));
}
fields.push(Value::Object(field));
}
}
}
let mut body = Map::new();
body.insert("table_name".to_string(), json!(table_name));
body.insert("select".to_string(), Value::Array(fields));
if let Some(schema_name) = schema_name {
body.insert("schema_name".to_string(), json!(schema_name));
}
if !root_filters.is_empty() {
body.insert(
"where_filters".to_string(),
Value::Array(root_filters.to_vec()),
);
}
if !root_order_by.is_empty() {
body.insert("orderBy".to_string(), Value::Array(root_order_by.to_vec()));
}
if let Some(limit) = limit {
body.insert("limit".to_string(), json!(limit));
}
if let Some(offset) = offset {
body.insert("offset".to_string(), json!(offset));
}
Value::Object(body)
}
fn apply_targeted_filters(
projections: &mut [CompatibilityProjectionItem],
filters: Vec<CompatibilityTargetedValue>,
) -> Result<Vec<Value>, String> {
let mut root_filters = Vec::new();
for filter in filters {
match filter.target {
CompatibilityTargetScope::Root => root_filters.push(filter.value),
CompatibilityTargetScope::Relation(display_name) => {
let Some(relation) = find_relation_projection_mut(projections, &display_name)
else {
return Err(format!(
"relation '{}' was not projected in the compatibility select clause",
display_name
));
};
relation.where_filters.push(filter.value);
}
}
}
Ok(root_filters)
}
/// Distributes parsed ordering entries between the root query and relation projections.
///
/// Entries scoped to a relation are appended to that projection's `order_by`;
/// root-scoped entries are returned in their original order.
///
/// # Errors
/// Fails when an entry names a relation that has no projection in the select list.
fn apply_targeted_order_by(
    projections: &mut [CompatibilityProjectionItem],
    order_by: Vec<CompatibilityTargetedValue>,
) -> Result<Vec<Value>, String> {
    let mut root_order_by = Vec::new();
    for entry in order_by {
        if let CompatibilityTargetScope::Relation(display_name) = &entry.target {
            let relation =
                find_relation_projection_mut(projections, display_name).ok_or_else(|| {
                    format!(
                        "relation '{}' was not projected in the compatibility select clause",
                        display_name
                    )
                })?;
            relation.order_by.push(entry.value);
        } else {
            root_order_by.push(entry.value);
        }
    }
    Ok(root_order_by)
}
/// Returns the first relation projection whose display name equals
/// `display_name` (ASCII case-insensitive), or `None` when there is none.
fn find_relation_projection_mut<'a>(
    projections: &'a mut [CompatibilityProjectionItem],
    display_name: &str,
) -> Option<&'a mut CompatibilityRelationProjection> {
    for projection in projections.iter_mut() {
        if let CompatibilityProjectionItem::Relation(relation) = projection {
            if relation.display_name().eq_ignore_ascii_case(display_name) {
                return Some(relation);
            }
        }
    }
    None
}
/// Parses the select list into column and relation projections.
///
/// The list is split with `split_top_level_commas`; an item containing a
/// top-level `(` is parsed as a relation, anything else as a root column.
///
/// # Errors
/// Propagates the first item-level parse error, and fails when the list
/// yields no projections at all.
fn parse_select_projection_items(
    select: &str,
    root_binding: &CompatibilityTableBinding,
) -> Result<Vec<CompatibilityProjectionItem>, String> {
    let projections = split_top_level_commas(select)
        .into_iter()
        .map(|item| {
            if looks_like_relation_projection(&item) {
                parse_relation_projection_item(&item).map(CompatibilityProjectionItem::Relation)
            } else {
                parse_root_projection_column(&item, root_binding)
                    .map(CompatibilityProjectionItem::Column)
            }
        })
        .collect::<Result<Vec<_>, String>>()?;
    if projections.is_empty() {
        Err("select must include at least one column or relation".to_string())
    } else {
        Ok(projections)
    }
}
/// Reports whether a select-list item contains a top-level `(`, which marks it
/// as a relation projection rather than a plain column.
fn looks_like_relation_projection(input: &str) -> bool {
    matches!(find_top_level_char(input, b'('), Some(_))
}
/// Parses one relation entry of the select list, shaped like
/// `alias:selector!modifier(nested fields)` where alias and modifier are optional.
///
/// The `inner` modifier (ASCII case-insensitive) selects an inner join; any
/// other modifier is validated and stored as the foreign-key hint.
///
/// # Errors
/// Fails on missing or unbalanced parentheses, text after the closing `)`, an
/// empty nested selection, or when any of the head sub-parsers rejects its part.
fn parse_relation_projection_item(input: &str) -> Result<CompatibilityRelationProjection, String> {
    let item = input.trim();
    let Some(open) = find_top_level_char(item, b'(') else {
        return Err("relation projections must include nested field selections".to_string());
    };
    let Some(close) = find_matching_closing_paren(item, open) else {
        return Err("relation projection is missing closing ')'".to_string());
    };
    let trailing = item[close + 1..].trim();
    if !trailing.is_empty() {
        return Err("unsupported trailing tokens in relation projection".to_string());
    }

    let nested_select = item[open + 1..close].trim();
    if nested_select.is_empty() {
        return Err("relation projections must select at least one nested field".to_string());
    }

    // Head layout: [alias:]selector[!modifier]
    let (head, modifier) = parse_relation_modifier(item[..open].trim())?;
    let (alias, selector) = parse_projection_alias(&head)?;
    let table = parse_table_ref(&selector)?;

    let (join, foreign_key) = match modifier {
        None => (CompatibilityJoinKind::Left, None),
        Some(hint) if hint.eq_ignore_ascii_case("inner") => (CompatibilityJoinKind::Inner, None),
        Some(hint) => {
            validate_foreign_key_hint(&hint)?;
            (CompatibilityJoinKind::Left, Some(hint))
        }
    };

    Ok(CompatibilityRelationProjection {
        selector,
        table,
        alias,
        nested_select: nested_select.to_string(),
        join,
        foreign_key,
        sql_alias: None,
        where_filters: Vec::new(),
        order_by: Vec::new(),
    })
}
/// Splits a relation head at its single `!` into `(head, modifier)`.
///
/// Both parts are trimmed. A head without `!` is returned unchanged with no
/// modifier.
///
/// # Errors
/// Fails when the input is blank, contains more than one `!`, or has nothing
/// on either side of the `!`.
fn parse_relation_modifier(input: &str) -> Result<(String, Option<String>), String> {
    let head = input.trim();
    if head.is_empty() {
        return Err("relation projection is missing a relation selector".to_string());
    }
    match head.split_once('!') {
        None => Ok((head.to_string(), None)),
        Some((_, rest)) if rest.contains('!') => {
            Err("relation projections support only one modifier".to_string())
        }
        Some((selector, modifier)) => {
            let selector = selector.trim();
            let modifier = modifier.trim();
            if selector.is_empty() || modifier.is_empty() {
                Err("invalid relation projection modifier".to_string())
            } else {
                Ok((selector.to_string(), Some(modifier.to_string())))
            }
        }
    }
}
/// Splits a relation head of the form `alias:selector` into `(alias, selector)`.
///
/// Both parts are trimmed. A head without `:` is returned as the selector with
/// no alias. Only the first `:` separates alias and selector; later single
/// colons stay in the selector.
///
/// # Errors
/// * Any `::` in the head is rejected as an unsupported cast. This check runs
///   first so that cast syntax is reported as such instead of being mistaken
///   for a malformed alias or slipping through inside the selector.
/// * An empty alias or an empty selector around the `:` is rejected as an
///   invalid alias.
fn parse_projection_alias(input: &str) -> Result<(Option<String>, String), String> {
    let trimmed = input.trim();
    if trimmed.contains("::") {
        return Err("relation selectors do not support casts in compatibility mode".to_string());
    }
    let Some((alias, selector)) = trimmed.split_once(':') else {
        return Ok((None, trimmed.to_string()));
    };
    let alias = alias.trim();
    let selector = selector.trim();
    if alias.is_empty() || selector.is_empty() {
        return Err("invalid relation projection alias".to_string());
    }
    Ok((Some(alias.to_string()), selector.to_string()))
}
/// Parses a plain select-list item into the name of a root-table column.
///
/// # Errors
/// Fails when the column expression cannot be parsed or normalized, carries a
/// cast, resolves to anything other than the root scope, or still contains a
/// `.` after normalization.
fn parse_root_projection_column(
    input: &str,
    root_binding: &CompatibilityTableBinding,
) -> Result<String, String> {
    let expr = parse_column_expr(input)?;
    if expr.cast.is_some() {
        return Err("projected columns do not support casts in compatibility mode".to_string());
    }

    // No relation bindings are supplied here, hence the empty slice.
    let (scope, resolved) = normalize_column_target(expr, root_binding, &[], true)?;
    if !matches!(scope, CompatibilityTargetScope::Root) {
        return Err(
            "joined relation columns cannot be projected at the root level; use relation selectors"
                .to_string(),
        );
    }

    let path = resolved.path;
    if path.contains('.') {
        Err("projected root columns must reference local columns".to_string())
    } else {
        Ok(path)
    }
}
fn apply_explicit_join_clauses(
projections: &mut [CompatibilityProjectionItem],
joins: Vec<CompatibilityJoinClause>,
root_binding: &CompatibilityTableBinding,
) -> Result<CompatibilityAliasContext, String> {
for join in joins {
let relation_index = find_join_relation_index(projections, &join)?;
let relation = match &mut projections[relation_index] {
CompatibilityProjectionItem::Relation(relation) => relation,
CompatibilityProjectionItem::Column(_) => unreachable!(),
};
let derived_foreign_key = derive_join_foreign_key(&join.on, root_binding, &join.binding)?;
if relation.join == CompatibilityJoinKind::Inner && join.kind == CompatibilityJoinKind::Left
{
return Err(format!(
"JOIN target '{}' conflicts with the select-string !inner modifier",
render_table_ref(&join.binding.table)
));
}
if let Some(existing) = relation.foreign_key.as_deref()
&& !existing.eq_ignore_ascii_case(&derived_foreign_key)
{
return Err(format!(
"JOIN target '{}' conflicts with the select-string foreign_key modifier",
render_table_ref(&join.binding.table)
));
}
relation.join = join.kind;
relation.foreign_key = Some(derived_foreign_key);
relation.sql_alias = join.binding.alias.clone();
}
let relations = projections
.iter()
.filter_map(|projection| match projection {
CompatibilityProjectionItem::Relation(relation) => Some(CompatibilityRelationBinding {
display_name: relation.display_name().to_string(),
binding: CompatibilityTableBinding {
table: relation.table.clone(),
alias: relation.sql_alias.clone(),
},
}),
CompatibilityProjectionItem::Column(_) => None,
})
.collect();
Ok(CompatibilityAliasContext {
root: root_binding.clone(),
relations,
})
}
impl CompatibilityTableBinding {
    /// Reports whether `qualifier` names this binding, either by its alias or
    /// by its table name (ASCII case-insensitive).
    fn matches_simple_qualifier(&self, qualifier: &str) -> bool {
        let alias_matches = match self.alias.as_deref() {
            Some(alias) => alias.eq_ignore_ascii_case(qualifier),
            None => false,
        };
        alias_matches || self.table.table_name.eq_ignore_ascii_case(qualifier)
    }

    /// Reports whether `schema_name.table_name` names this binding's table
    /// (ASCII case-insensitive). Always `false` when the binding has no schema.
    fn matches_schema_table(&self, schema_name: &str, table_name: &str) -> bool {
        match self.table.schema_name.as_deref() {
            Some(schema) => {
                schema.eq_ignore_ascii_case(schema_name)
                    && self.table.table_name.eq_ignore_ascii_case(table_name)
            }
            None => false,
        }
    }
}
impl CompatibilityRelationProjection {
    /// Name under which this relation is addressed: its alias when one was
    /// given, otherwise its selector.
    fn display_name(&self) -> &str {
        match &self.alias {
            Some(alias) => alias.as_str(),
            None => self.selector.as_str(),
        }
    }
}