use crate::{
StructuredFilter, StructuredFilterOperator, StructuredGatewayFetchPlan, StructuredJoinKind,
StructuredOrderBy, StructuredRelationField, StructuredSelectField, StructuredSelectOperation,
StructuredSelectQuery, StructuredSortDirection, sanitize_identifier,
};
use athena_driver::postgresql::column_resolver::resolve_information_schema_targets;
use athena_driver::postgresql::raw_sql::{
PostgresSqlExecutionResult, execute_postgres_sql, execute_postgres_sql_in_schema,
};
use serde_json::Value;
use sqlx::PgPool;
use std::collections::{BTreeSet, HashMap, HashSet};
/// Sanitizes a dot-separated identifier such as `schema.table`.
///
/// Each segment is stripped of surrounding whitespace and double quotes and
/// then passed through `sanitize_identifier`. Returns `None` as soon as a
/// segment is empty or rejected; otherwise the sanitized segments are joined
/// back together with `.`.
fn sanitize_qualified_identifier(identifier: &str) -> Option<String> {
    // `split` always yields at least one segment, so the collected vector is
    // never empty when every segment passes.
    let sanitized_segments = identifier
        .split('.')
        .map(|segment| {
            let bare = segment.trim().trim_matches('"');
            if bare.is_empty() {
                None
            } else {
                sanitize_identifier(bare)
            }
        })
        .collect::<Option<Vec<String>>>()?;
    Some(sanitized_segments.join("."))
}
/// A table identified by schema name and table name.
///
/// Derives `Eq` and `Hash` so it can be used as a key in hash-based
/// collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StructuredTableRef {
/// Name of the schema that holds the table.
schema_name: String,
/// Name of the table inside `schema_name`.
table_name: String,
}
/// One foreign-key column pairing between two tables.
///
/// The "child" side is the table/column carrying the foreign key and the
/// "parent" side is the table/column it points at.
#[derive(Debug, Clone, PartialEq, Eq)]
struct StructuredForeignKeyMetadata {
/// Table that carries the foreign-key column.
child_table: StructuredTableRef,
/// Foreign-key column on `child_table`.
child_column: String,
/// Table referenced by the foreign key.
parent_table: StructuredTableRef,
/// Referenced column on `parent_table`.
parent_column: String,
}
/// Schema metadata consulted while compiling a structured fetch:
/// known columns per table plus foreign-key pairings.
#[derive(Debug, Default, Clone)]
struct StructuredSchemaCatalog {
/// Column names known for each table.
table_columns: HashMap<StructuredTableRef, HashSet<String>>,
/// Foreign-key pairings available for relation inference.
foreign_keys: Vec<StructuredForeignKeyMetadata>,
}
impl StructuredSchemaCatalog {
    /// Reports whether `column` is recorded for `table`.
    ///
    /// A table missing from the catalog is treated as having no columns,
    /// so the answer is `false`.
    fn table_has_column(&self, table: &StructuredTableRef, column: &str) -> bool {
        match self.table_columns.get(table) {
            Some(known_columns) => known_columns.contains(column),
            None => false,
        }
    }
}
/// Mutable state used while emitting SQL for one structured fetch plan.
struct StructuredSqlEmitter<'a> {
/// Counter appended to generated relation aliases; incremented per alias.
alias_counter: usize,
/// Schema metadata used to resolve relation join conditions.
catalog: &'a StructuredSchemaCatalog,
/// Schema applied to unqualified table selectors, if the plan names one.
default_schema: Option<&'a str>,
}
/// Loads schema metadata for `plan` from the database and compiles the plan
/// into a SQL string.
///
/// # Errors
///
/// Returns the stringified database error when the catalog cannot be loaded,
/// or the compiler's error message when the plan cannot be rendered.
pub async fn render_structured_fetch_sql(
    pool: &PgPool,
    plan: &StructuredGatewayFetchPlan,
) -> Result<String, String> {
    match load_structured_schema_catalog(pool, plan).await {
        Ok(catalog) => compile_structured_fetch_sql_for_catalog(plan, &catalog),
        Err(err) => Err(err.to_string()),
    }
}
/// Runs previously rendered `sql` against `pool`.
///
/// When the plan names a schema the statement is executed through
/// `execute_postgres_sql_in_schema`; otherwise through `execute_postgres_sql`.
pub async fn execute_structured_fetch_sql(
    pool: &PgPool,
    plan: &StructuredGatewayFetchPlan,
    sql: &str,
) -> Result<PostgresSqlExecutionResult, sqlx::Error> {
    if let Some(schema_name) = plan.schema_name.as_deref() {
        return execute_postgres_sql_in_schema(pool, sql, schema_name).await;
    }
    execute_postgres_sql(pool, sql).await
}
/// Doubles every single quote in `input` so the text can be embedded inside
/// a single-quoted SQL string literal.
fn escape_sql_string(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
/// Renders a JSON value as an inline SQL literal.
///
/// * `null` becomes `NULL`; booleans and numbers use their textual form.
/// * Strings become single-quoted literals with quotes doubled.
/// * Arrays and objects are serialized to JSON text and emitted as a quoted
///   literal cast to `jsonb`.
fn format_sql_value(value: &Value) -> String {
    let quote = |raw: &str| format!("'{}'", escape_sql_string(raw));
    match value {
        Value::Null => String::from("NULL"),
        Value::Bool(flag) => flag.to_string(),
        Value::Number(number) => number.to_string(),
        Value::String(text) => quote(text),
        Value::Array(_) | Value::Object(_) => {
            // Serialization failure falls back to the JSON text `null`.
            let json_text = match serde_json::to_string(value) {
                Ok(encoded) => encoded,
                Err(_) => String::from("null"),
            };
            format!("{}::jsonb", quote(&json_text))
        }
    }
}
/// Validates a caller-supplied cast target by sanitizing it as a (possibly
/// qualified) identifier.
///
/// # Errors
///
/// Returns `invalid SQL type cast '<raw_cast>'` when sanitization rejects it.
fn sanitize_type_cast_sql(raw_cast: &str) -> Result<String, String> {
    match sanitize_qualified_identifier(raw_cast) {
        Some(cast_sql) => Ok(cast_sql),
        None => Err(format!("invalid SQL type cast '{raw_cast}'")),
    }
}
/// Wraps `expression` in `(<expression>)::<type>` when a cast is requested,
/// and returns it untouched otherwise.
///
/// # Errors
///
/// Propagates the error from `sanitize_type_cast_sql` for an invalid cast.
fn apply_sql_cast(expression: String, raw_cast: Option<&str>) -> Result<String, String> {
    let Some(requested_cast) = raw_cast else {
        return Ok(expression);
    };
    let cast_sql = sanitize_type_cast_sql(requested_cast)?;
    Ok(format!("({expression})::{cast_sql}"))
}
/// Produces a naive singular form of a table name by dropping one trailing
/// `s` (`customers` -> `customer`).
///
/// Names ending in `ss` (for example `address` or `class`) are returned
/// unchanged: they are typically already singular, and stripping every
/// trailing `s` would mangle them (`address` -> `addre`) and break the
/// `<table>_id` foreign-key naming heuristic that relies on this helper.
fn singularize_identifier(raw: &str) -> &str {
    if raw.ends_with("ss") {
        return raw;
    }
    raw.strip_suffix('s').unwrap_or(raw)
}
/// Recursively gathers every table referenced by `query` into `refs`.
///
/// The query's own `from` table is resolved first; each relation field then
/// contributes the tables of its nested query.
///
/// # Errors
///
/// Propagates the first table-resolution error encountered.
fn collect_query_table_refs(
    query: &StructuredSelectQuery,
    default_schema: Option<&str>,
    refs: &mut HashSet<StructuredTableRef>,
) -> Result<(), String> {
    let root_table = resolve_structured_table_ref(&query.from, default_schema)?;
    refs.insert(root_table);
    query.fields.iter().try_for_each(|field| match field {
        StructuredSelectField::Relation(relation) => {
            collect_query_table_refs(&relation.query, default_schema, refs)
        }
        _ => Ok(()),
    })
}
/// Resolves a `table` or `schema.table` selector into a [`StructuredTableRef`].
///
/// * `table`: the table name is normalized through
///   `resolve_information_schema_targets` and paired with `default_schema`
///   (or `public` when none is given).
/// * `schema.table`: both parts come from
///   `resolve_information_schema_targets` and must match the provided parts
///   ignoring ASCII case.
///
/// # Errors
///
/// Fails for an empty selector, a selector with more than two segments, a
/// resolver error, or a resolved name that differs from what was supplied.
fn resolve_structured_table_ref(
    raw: &str,
    default_schema: Option<&str>,
) -> Result<StructuredTableRef, String> {
    let selector = raw.trim();
    if selector.is_empty() {
        return Err("structured relation table cannot be empty".to_string());
    }
    // Peek at up to three segments: one and two are valid, anything more is not.
    let mut pieces = selector.split('.').map(str::trim);
    match (pieces.next(), pieces.next(), pieces.next()) {
        (Some(table_name), None, _) => {
            let (_, resolved_table_name) = resolve_information_schema_targets(table_name, true)
                .map_err(|err| err.to_string())?;
            Ok(StructuredTableRef {
                schema_name: default_schema.unwrap_or("public").to_string(),
                table_name: resolved_table_name,
            })
        }
        (Some(schema_name), Some(table_name), None) => {
            let (resolved_schema, resolved_table_name) =
                resolve_information_schema_targets(selector, true)
                    .map_err(|err| err.to_string())?;
            let schema_matches = schema_name.eq_ignore_ascii_case(&resolved_schema);
            let table_matches = table_name.eq_ignore_ascii_case(&resolved_table_name);
            if !(schema_matches && table_matches) {
                return Err(format!("invalid table selector '{selector}'"));
            }
            Ok(StructuredTableRef {
                schema_name: resolved_schema,
                table_name: resolved_table_name,
            })
        }
        _ => Err(format!(
            "table reference '{selector}' must be 'table' or 'schema.table'"
        )),
    }
}
fn render_structured_table_ref(table_ref: &StructuredTableRef) -> String {
format!("{}.{}", table_ref.schema_name, table_ref.table_name)
}
/// Builds the error message reported when referenced tables are absent from
/// the catalog, with distinct wording for zero, one, and several tables.
fn missing_structured_table_error(missing_tables: &[StructuredTableRef]) -> String {
    match missing_tables.len() {
        0 => "structured fetch references no tables".to_string(),
        1 => format!(
            "table '{}' was not found for structured gateway fetch",
            render_structured_table_ref(&missing_tables[0])
        ),
        _ => {
            let mut rendered: Vec<String> = Vec::with_capacity(missing_tables.len());
            for table_ref in missing_tables {
                rendered.push(render_structured_table_ref(table_ref));
            }
            let tables = rendered.join(", ");
            format!("tables were not found for structured gateway fetch: {tables}")
        }
    }
}
/// Renders `values` as a comma-separated list of single-quoted, escaped SQL
/// string literals (set iteration order), suitable for an `IN (...)` list.
fn render_sql_string_list(values: &BTreeSet<String>) -> String {
    let mut rendered = String::new();
    for (index, value) in values.iter().enumerate() {
        if index > 0 {
            rendered.push_str(", ");
        }
        rendered.push('\'');
        rendered.push_str(&escape_sql_string(value));
        rendered.push('\'');
    }
    rendered
}
async fn load_structured_schema_catalog(
pool: &PgPool,
plan: &StructuredGatewayFetchPlan,
) -> Result<StructuredSchemaCatalog, sqlx::Error> {
let mut table_refs = HashSet::new();
collect_query_table_refs(&plan.query, plan.schema_name.as_deref(), &mut table_refs)
.map_err(sqlx::Error::Protocol)?;
let mut schemas = BTreeSet::new();
for table_ref in &table_refs {
schemas.insert(table_ref.schema_name.clone());
}
let schema_filter = render_sql_string_list(&schemas);
let column_rows: Vec<(String, String, String)> =
sqlx::query_as::<_, (String, String, String)>(&format!(
"SELECT table_schema, table_name, column_name \
FROM information_schema.columns \
WHERE table_schema IN ({schema_filter})"
))
.fetch_all(pool)
.await?;
let mut table_columns: HashMap<StructuredTableRef, HashSet<String>> = HashMap::new();
for (schema_name, table_name, column_name) in column_rows {
let table_ref = StructuredTableRef {
schema_name,
table_name,
};
if table_refs.contains(&table_ref) {
table_columns
.entry(table_ref)
.or_default()
.insert(column_name);
}
}
let foreign_key_rows: Vec<(String, String, String, String, String, String)> =
sqlx::query_as::<_, (String, String, String, String, String, String)>(
&format!(
"SELECT \
tc.table_schema, \
tc.table_name, \
kcu.column_name, \
ccu.table_schema, \
ccu.table_name, \
ccu.column_name \
FROM information_schema.table_constraints AS tc \
JOIN information_schema.key_column_usage AS kcu \
ON tc.constraint_name = kcu.constraint_name \
AND tc.constraint_schema = kcu.constraint_schema \
AND tc.table_schema = kcu.table_schema \
AND tc.table_name = kcu.table_name \
JOIN information_schema.constraint_column_usage AS ccu \
ON ccu.constraint_name = tc.constraint_name \
AND ccu.constraint_schema = tc.constraint_schema \
WHERE tc.constraint_type = 'FOREIGN KEY' \
AND (tc.table_schema IN ({schema_filter}) OR ccu.table_schema IN ({schema_filter}))"
),
)
.fetch_all(pool)
.await?;
let foreign_keys = foreign_key_rows
.into_iter()
.filter_map(
|(
child_schema,
child_table,
child_column,
parent_schema,
parent_table,
parent_column,
)| {
let child_table_ref = StructuredTableRef {
schema_name: child_schema,
table_name: child_table,
};
let parent_table_ref = StructuredTableRef {
schema_name: parent_schema,
table_name: parent_table,
};
if !table_refs.contains(&child_table_ref) && !table_refs.contains(&parent_table_ref)
{
return None;
}
Some(StructuredForeignKeyMetadata {
child_table: child_table_ref,
child_column,
parent_table: parent_table_ref,
parent_column,
})
},
)
.collect();
Ok(StructuredSchemaCatalog {
table_columns,
foreign_keys,
})
}
fn compile_structured_fetch_sql_for_catalog(
plan: &StructuredGatewayFetchPlan,
catalog: &StructuredSchemaCatalog,
) -> Result<String, String> {
let mut referenced_tables = HashSet::new();
collect_query_table_refs(
&plan.query,
plan.schema_name.as_deref(),
&mut referenced_tables,
)?;
let mut missing_tables: Vec<StructuredTableRef> = referenced_tables
.into_iter()
.filter(|table_ref| !catalog.table_columns.contains_key(table_ref))
.collect();
missing_tables.sort_by(|left, right| {
left.schema_name
.cmp(&right.schema_name)
.then(left.table_name.cmp(&right.table_name))
});
if !missing_tables.is_empty() {
return Err(missing_structured_table_error(&missing_tables));
}
StructuredSqlEmitter {
alias_counter: 0,
catalog,
default_schema: plan.schema_name.as_deref(),
}
.emit_root(&plan.query)
}
/// Pushes dotted filters (`relation.column`) down into the matching relation
/// field and returns the filters that stay at the current query level.
///
/// A filter is moved when the text before its first `.` equals a relation's
/// display name or name; the remainder after the dot becomes the nested
/// filter's column. Filters without a dot, or with no matching relation, are
/// returned in their original order.
fn distribute_filters(
    fields: &mut [StructuredSelectField],
    filters: Vec<StructuredFilter>,
) -> Vec<StructuredFilter> {
    let mut kept = Vec::new();
    'filters: for filter in filters {
        if let Some((head, tail)) = filter.column.split_once('.') {
            for field in fields.iter_mut() {
                let StructuredSelectField::Relation(relation) = field else {
                    continue;
                };
                if relation.display_name() == head || relation.name == head {
                    relation.query.filters.push(StructuredFilter {
                        column: tail.to_string(),
                        operator: filter.operator,
                        values: filter.values,
                        column_cast: filter.column_cast,
                        value_cast: filter.value_cast,
                    });
                    // Only the first matching relation receives the filter.
                    continue 'filters;
                }
            }
        }
        kept.push(filter);
    }
    kept
}
impl<'a> StructuredSqlEmitter<'a> {
fn emit_root(mut self, query: &StructuredSelectQuery) -> Result<String, String> {
if query.operation != StructuredSelectOperation::Select {
return Err("only structured select queries are supported".to_string());
}
let mut query: StructuredSelectQuery = query.clone();
query.filters = distribute_filters(&mut query.fields, query.filters.clone());
let base_alias: &str = "t0";
let table_sql: String = sanitize_root_table_name(&query.from)?;
let mut select_exprs: Vec<String> = Vec::new();
let mut joins: Vec<String> = Vec::new();
for field in &query.fields {
self.emit_field(
field,
base_alias,
&query.from,
&mut select_exprs,
&mut joins,
)?;
}
if select_exprs.is_empty() {
return Err(
"structured gateway fetch requires at least one projected field".to_string(),
);
}
let mut sql: String = format!(
"SELECT {} FROM {} {}",
select_exprs.join(", "),
table_sql,
base_alias
);
if !joins.is_empty() {
sql.push('\n');
sql.push_str(&joins.join("\n"));
}
let where_clause: String = build_where_clause(&query.filters, base_alias)?;
if !where_clause.is_empty() {
sql.push('\n');
sql.push_str(&where_clause);
}
let order_clause: String = build_order_clause(&query.order_by, base_alias)?;
if !order_clause.is_empty() {
sql.push('\n');
sql.push_str(&order_clause);
}
if let Some(limit) = query.limit {
sql.push_str(&format!("\nLIMIT {limit}"));
}
if let Some(offset) = query.offset {
sql.push_str(&format!("\nOFFSET {offset}"));
}
Ok(sql)
}
fn emit_field(
&mut self,
field: &StructuredSelectField,
parent_alias: &str,
parent_table_selector: &str,
select_exprs: &mut Vec<String>,
joins: &mut Vec<String>,
) -> Result<(), String> {
match field {
StructuredSelectField::Column(column) => {
let column_sql = sanitize_identifier(&column.name)
.ok_or_else(|| format!("invalid projected column '{}'", column.name))?;
match column.alias.as_deref() {
Some(alias) => {
let alias_sql = sanitize_identifier(alias)
.ok_or_else(|| format!("invalid projected alias '{alias}'"))?;
select_exprs.push(format!("{parent_alias}.{column_sql} AS {alias_sql}"));
}
None => {
select_exprs.push(format!("{parent_alias}.{column_sql}"));
}
}
}
StructuredSelectField::Relation(relation) => {
let child_alias: String = self.next_alias(&relation.name);
let agg_alias: String = format!("{child_alias}_agg");
let join_sql: String = self.emit_relation_subquery(
relation,
parent_alias,
parent_table_selector,
&child_alias,
&agg_alias,
)?;
joins.push(join_sql);
let output_alias =
sanitize_identifier(relation.display_name()).ok_or_else(|| {
format!(
"invalid relation output alias '{}'",
relation.display_name()
)
})?;
select_exprs.push(format!("{agg_alias}.data AS {output_alias}"));
}
}
Ok(())
}
fn emit_relation_subquery(
&mut self,
relation: &StructuredRelationField,
parent_alias: &str,
parent_table_selector: &str,
child_alias: &str,
agg_alias: &str,
) -> Result<String, String> {
let mut query: StructuredSelectQuery = relation.query.clone();
query.filters = distribute_filters(&mut query.fields, query.filters.clone());
let child_table_sql = sanitize_root_table_name(&query.from)?;
let (row_expr, nested_joins) = self.emit_relation_row(&query, child_alias)?;
let (join_condition, many_to_one) = relation_join_condition(
self.catalog,
self.default_schema,
relation,
parent_alias,
parent_table_selector,
child_alias,
&query.from,
)?;
let local_where: String = build_where_clause(&query.filters, child_alias)?;
let join_type: &str = match relation.join {
StructuredJoinKind::Left => "LEFT JOIN",
StructuredJoinKind::Inner => "JOIN",
};
let subquery: String = if many_to_one {
let limit: i64 = match query.limit {
Some(value) if value > 1 => {
return Err(format!(
"relation '{}' resolves to one row; limit must be 0 or 1",
relation.display_name()
));
}
Some(value) => value,
None => 1,
};
let order_clause = build_order_clause(&query.order_by, child_alias)?;
let mut sql =
format!("SELECT {row_expr} AS data\nFROM {child_table_sql} {child_alias}");
if !nested_joins.is_empty() {
sql.push('\n');
sql.push_str(&nested_joins.join("\n"));
}
sql.push_str(&format!("\nWHERE {join_condition}"));
if !local_where.is_empty() {
sql.push_str(" AND ");
sql.push_str(local_where.trim_start_matches("WHERE "));
}
if !order_clause.is_empty() {
sql.push('\n');
sql.push_str(&order_clause);
}
if let Some(offset) = query.offset {
sql.push_str(&format!("\nOFFSET {offset}"));
}
sql.push_str(&format!("\nLIMIT {limit}"));
sql
} else {
let orderings: Vec<(String, StructuredSortDirection, String)> =
effective_relation_ordering(&query.order_by, child_alias)?;
let inner_order: String = orderings
.iter()
.map(|(sql_expr, direction, _)| format!("{sql_expr} {}", direction.sql_keyword()))
.collect::<Vec<String>>()
.join(", ");
let aggregate_order: String = orderings
.iter()
.map(|(_, direction, alias)| {
format!("rel_window.{alias} {}", direction.sql_keyword())
})
.collect::<Vec<String>>()
.join(", ");
let order_select_columns: String = orderings
.iter()
.map(|(sql_expr, _, alias)| format!(", {sql_expr} AS {alias}"))
.collect::<String>();
let mut inner: String = format!(
"SELECT {row_expr} AS row_data{order_select_columns}\nFROM {child_table_sql} {child_alias}"
);
if !nested_joins.is_empty() {
inner.push('\n');
inner.push_str(&nested_joins.join("\n"));
}
inner.push_str(&format!("\nWHERE {join_condition}"));
if !local_where.is_empty() {
inner.push_str(" AND ");
inner.push_str(local_where.trim_start_matches("WHERE "));
}
if !inner_order.is_empty() {
inner.push_str(&format!("\nORDER BY {inner_order}"));
}
if let Some(limit) = query.limit {
inner.push_str(&format!("\nLIMIT {limit}"));
}
if let Some(offset) = query.offset {
inner.push_str(&format!("\nOFFSET {offset}"));
}
if aggregate_order.is_empty() {
format!(
"SELECT COALESCE(jsonb_agg(rel_window.row_data) FILTER (WHERE rel_window.row_data IS NOT NULL), '[]'::jsonb) AS data\nFROM (\n{inner}\n) rel_window"
)
} else {
format!(
"SELECT COALESCE(jsonb_agg(rel_window.row_data ORDER BY {aggregate_order}) FILTER (WHERE rel_window.row_data IS NOT NULL), '[]'::jsonb) AS data\nFROM (\n{inner}\n) rel_window"
)
}
};
let join_predicate = match relation.join {
StructuredJoinKind::Inner if many_to_one => format!("{agg_alias}.data IS NOT NULL"),
StructuredJoinKind::Inner => format!("jsonb_array_length({agg_alias}.data) > 0"),
StructuredJoinKind::Left => "TRUE".to_string(),
};
Ok(format!(
"{join_type} LATERAL (\n{subquery}\n) {agg_alias} ON {join_predicate}"
))
}
fn emit_relation_row(
&mut self,
query: &StructuredSelectQuery,
alias: &str,
) -> Result<(String, Vec<String>), String> {
let mut field_pairs = Vec::new();
let mut joins = Vec::new();
for field in &query.fields {
match field {
StructuredSelectField::Column(column) => {
let json_key = column.alias.as_deref().unwrap_or(&column.name);
let column_sql = sanitize_identifier(&column.name)
.ok_or_else(|| format!("invalid relation column '{}'", column.name))?;
field_pairs.push(format!("'{}', {}.{}", json_key, alias, column_sql));
}
StructuredSelectField::Relation(relation) => {
let child_alias = self.next_alias(&relation.name);
let agg_alias = format!("{child_alias}_agg");
let join_sql = self.emit_relation_subquery(
relation,
alias,
&query.from,
&child_alias,
&agg_alias,
)?;
joins.push(join_sql);
field_pairs.push(format!("'{}', {}.data", relation.display_name(), agg_alias));
}
}
}
let row_expr: String = if field_pairs.is_empty() {
format!("to_jsonb({alias})")
} else {
format!("jsonb_build_object({})", field_pairs.join(", "))
};
Ok((row_expr, joins))
}
fn next_alias(&mut self, base: &str) -> String {
let normalized: String = base
.chars()
.filter(|ch| ch.is_ascii_alphanumeric() || *ch == '_')
.collect::<String>();
let stem = if normalized.is_empty() {
"rel"
} else {
&normalized
};
let alias = format!("{stem}_{}", self.alias_counter);
self.alias_counter += 1;
alias
}
}
/// Sanitizes a table selector for use in a `FROM` clause.
///
/// Selectors containing a dot are treated as qualified identifiers; all
/// others as a single identifier.
///
/// # Errors
///
/// Returns `invalid table selector '<table_name>'` when sanitization fails.
fn sanitize_root_table_name(table_name: &str) -> Result<String, String> {
    let sanitized = if table_name.contains('.') {
        sanitize_qualified_identifier(table_name)
    } else {
        sanitize_identifier(table_name)
    };
    match sanitized {
        Some(table_sql) => Ok(table_sql),
        None => Err(format!("invalid table selector '{table_name}'")),
    }
}
fn build_where_clause(filters: &[StructuredFilter], alias: &str) -> Result<String, String> {
if filters.is_empty() {
return Ok(String::new());
}
let mut clauses: Vec<String> = Vec::new();
for filter in filters {
let column_sql = apply_sql_cast(
qualified_local_column(alias, &filter.column)?,
filter.column_cast.as_deref(),
)?;
match filter.operator {
StructuredFilterOperator::Eq => {
let value = filter
.values
.first()
.ok_or_else(|| format!("missing eq value for '{}'", filter.column))?;
if value.is_null() {
clauses.push(format!("{column_sql} IS NULL"));
} else {
clauses.push(format!(
"{column_sql} = {}",
apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
));
}
}
StructuredFilterOperator::Neq => {
let value = filter
.values
.first()
.ok_or_else(|| format!("missing neq value for '{}'", filter.column))?;
if value.is_null() {
clauses.push(format!("{column_sql} IS NOT NULL"));
} else {
clauses.push(format!(
"{column_sql} <> {}",
apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
));
}
}
StructuredFilterOperator::Gt => {
let value = filter
.values
.first()
.ok_or_else(|| format!("missing gt value for '{}'", filter.column))?;
clauses.push(format!(
"{column_sql} > {}",
apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
));
}
StructuredFilterOperator::Lt => {
let value = filter
.values
.first()
.ok_or_else(|| format!("missing lt value for '{}'", filter.column))?;
clauses.push(format!(
"{column_sql} < {}",
apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
));
}
StructuredFilterOperator::In => {
if filter.values.is_empty() {
clauses.push("FALSE".to_string());
} else {
let values = filter
.values
.iter()
.map(|value| {
apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())
})
.collect::<Result<Vec<String>, String>>()?
.join(", ");
clauses.push(format!("{column_sql} IN ({values})"));
}
}
}
}
Ok(format!("WHERE {}", clauses.join(" AND ")))
}
/// Renders `order_by` as an `ORDER BY ...` clause qualified with `alias`,
/// or an empty string when there is nothing to order by.
///
/// # Errors
///
/// Fails when an ordering column is dotted or not a valid identifier.
fn build_order_clause(order_by: &[StructuredOrderBy], alias: &str) -> Result<String, String> {
    if order_by.is_empty() {
        return Ok(String::new());
    }
    let mut terms: Vec<String> = Vec::with_capacity(order_by.len());
    for order in order_by {
        let column_sql = qualified_local_column(alias, &order.column)?;
        terms.push(format!("{column_sql} {}", order.direction.sql_keyword()));
    }
    Ok(format!("ORDER BY {}", terms.join(", ")))
}
/// Expands a relation's ordering into `(qualified column, direction,
/// synthetic alias)` triples, where the alias is `__athena_order_<index>`.
///
/// An empty `order_by` yields an empty vector.
///
/// # Errors
///
/// Fails when an ordering column is dotted or not a valid identifier.
fn effective_relation_ordering(
    order_by: &[StructuredOrderBy],
    source_alias: &str,
) -> Result<Vec<(String, StructuredSortDirection, String)>, String> {
    let mut orderings = Vec::with_capacity(order_by.len());
    for (index, order) in order_by.iter().cloned().enumerate() {
        let column_sql = qualified_local_column(source_alias, &order.column)?;
        orderings.push((
            column_sql,
            order.direction,
            format!("__athena_order_{index}"),
        ));
    }
    Ok(orderings)
}
/// Qualifies a column of the current query level as `<alias>.<column>`.
///
/// # Errors
///
/// Dotted references are rejected as unsupported at this level, and names
/// refused by `sanitize_identifier` are reported as invalid.
fn qualified_local_column(alias: &str, column: &str) -> Result<String, String> {
    if column.contains('.') {
        return Err(format!(
            "unsupported dotted column reference '{column}' at this query level"
        ));
    }
    match sanitize_identifier(column) {
        Some(column_sql) => Ok(format!("{alias}.{column_sql}")),
        None => Err(format!("invalid column identifier '{column}'")),
    }
}
/// Determines the SQL predicate joining a relation's child alias to its
/// parent alias.
///
/// Returns `(predicate, flag)`. The flag is `true` when the foreign-key column
/// sits on the parent side (`parent.fk = child.col`) and `false` when it sits
/// on the child side (`child.fk = parent.col`).
///
/// Resolution proceeds in three stages, stopping at the first that applies:
/// 1. An explicit `relation.foreign_key`, optionally prefixed with `parent.`
///    or `child.`; unprefixed keys are located via the catalog's column sets.
///    The other side of the comparison is always the `"id"` column.
/// 2. Foreign-key metadata in the catalog linking exactly these two tables.
/// 3. A naming convention: `<singular table name>_id` present on exactly one
///    of the two tables, compared against the other side's `"id"` column.
///
/// # Errors
///
/// Fails when table selectors or identifiers are invalid, when the relation
/// is ambiguous, or when no stage can resolve it.
fn relation_join_condition(
catalog: &StructuredSchemaCatalog,
default_schema: Option<&str>,
relation: &StructuredRelationField,
parent_alias: &str,
parent_table_selector: &str,
child_alias: &str,
child_table_selector: &str,
) -> Result<(String, bool), String> {
let parent_table = resolve_structured_table_ref(parent_table_selector, default_schema)?;
let child_table = resolve_structured_table_ref(child_table_selector, default_schema)?;
// Stage 1: the caller named the foreign-key column explicitly.
if let Some(foreign_key) = relation.foreign_key.as_deref() {
// `parent.<column>`: the key lives on the parent table.
if let Some(stripped) = foreign_key.strip_prefix("parent.") {
let fk = sanitize_identifier(stripped)
.ok_or_else(|| format!("invalid parent foreign key '{foreign_key}'"))?;
return Ok((format!("{parent_alias}.{fk} = {child_alias}.\"id\""), true));
}
// `child.<column>`: the key lives on the child table.
if let Some(stripped) = foreign_key.strip_prefix("child.") {
let fk = sanitize_identifier(stripped)
.ok_or_else(|| format!("invalid child foreign key '{foreign_key}'"))?;
return Ok((format!("{child_alias}.{fk} = {parent_alias}.\"id\""), false));
}
// Unprefixed: look the raw column name up on both tables to pick a side.
let fk = sanitize_identifier(foreign_key)
.ok_or_else(|| format!("invalid foreign key '{foreign_key}'"))?;
let fk_on_parent = catalog.table_has_column(&parent_table, foreign_key);
let fk_on_child = catalog.table_has_column(&child_table, foreign_key);
return match (fk_on_parent, fk_on_child) {
(true, false) => Ok((format!("{parent_alias}.{fk} = {child_alias}.\"id\""), true)),
(false, true) => Ok((format!("{child_alias}.{fk} = {parent_alias}.\"id\""), false)),
(true, true) => Err(format!(
"foreign_key '{}' exists on both '{}' and '{}'; prefix it with parent. or child.",
foreign_key, parent_table.table_name, child_table.table_name
)),
(false, false) => Err(format!(
"foreign_key '{}' was not found on '{}' or '{}'",
foreign_key, parent_table.table_name, child_table.table_name
)),
};
}
// Stage 2: consult catalog foreign keys in both directions.
let child_to_parent: Vec<&StructuredForeignKeyMetadata> = catalog
.foreign_keys
.iter()
.filter(|metadata| {
metadata.child_table == child_table && metadata.parent_table == parent_table
})
.collect();
let parent_to_child: Vec<&StructuredForeignKeyMetadata> = catalog
.foreign_keys
.iter()
.filter(|metadata| {
metadata.child_table == parent_table && metadata.parent_table == child_table
})
.collect();
match (child_to_parent.as_slice(), parent_to_child.as_slice()) {
// Exactly one key from the relation's child table to its parent table.
([metadata], []) => {
let child_fk = sanitize_identifier(&metadata.child_column).ok_or_else(|| {
format!(
"invalid inferred child foreign key '{}'",
metadata.child_column
)
})?;
let parent_pk = sanitize_identifier(&metadata.parent_column).ok_or_else(|| {
format!(
"invalid inferred parent reference column '{}'",
metadata.parent_column
)
})?;
return Ok((
format!("{child_alias}.{child_fk} = {parent_alias}.{parent_pk}"),
false,
));
}
// Exactly one key from the relation's parent table to its child table.
([], [metadata]) => {
let parent_fk = sanitize_identifier(&metadata.child_column).ok_or_else(|| {
format!(
"invalid inferred parent foreign key '{}'",
metadata.child_column
)
})?;
let child_pk = sanitize_identifier(&metadata.parent_column).ok_or_else(|| {
format!(
"invalid inferred child reference column '{}'",
metadata.parent_column
)
})?;
return Ok((
format!("{parent_alias}.{parent_fk} = {child_alias}.{child_pk}"),
true,
));
}
// No catalog keys at all: fall through to the naming convention.
([], []) => {}
// Any other combination (several keys, or keys in both directions).
_ => {
return Err(format!(
"relation '{}' between '{}' and '{}' is ambiguous; provide foreign_key",
relation.display_name(),
parent_table.table_name,
child_table.table_name
));
}
}
// Stage 3: guess `<singular other table>_id` on either side.
let child_fk: String = format!("{}_id", singularize_identifier(&parent_table.table_name));
let parent_fk: String = format!("{}_id", singularize_identifier(&child_table.table_name));
let child_has_fk = catalog.table_has_column(&child_table, &child_fk);
let parent_has_fk = catalog.table_has_column(&parent_table, &parent_fk);
if child_has_fk && !parent_has_fk {
let child_fk_sql = sanitize_identifier(&child_fk)
.ok_or_else(|| format!("invalid inferred child foreign key '{child_fk}'"))?;
Ok((
format!("{child_alias}.{child_fk_sql} = {parent_alias}.\"id\""),
false,
))
} else if parent_has_fk && !child_has_fk {
let parent_fk_sql = sanitize_identifier(&parent_fk)
.ok_or_else(|| format!("invalid inferred parent foreign key '{parent_fk}'"))?;
Ok((
format!("{parent_alias}.{parent_fk_sql} = {child_alias}.\"id\""),
true,
))
} else if child_has_fk && parent_has_fk {
Err(format!(
"relation '{}' between '{}' and '{}' is ambiguous; provide foreign_key",
relation.display_name(),
parent_table.table_name,
child_table.table_name
))
} else {
Err(format!(
"could not resolve relation '{}' between '{}' and '{}'; add a foreign key constraint or provide foreign_key",
relation.display_name(),
parent_table.table_name,
child_table.table_name
))
}
}
// Unit tests for plan building and SQL compilation. They compile against an
// in-memory catalog, so no database connection is needed.
#[cfg(test)]
mod tests {
use super::*;
use crate::build_structured_fetch_plan;
use serde_json::json;
// Builds a plan from a request body, panicking if the body is rejected or is
// not recognized as a structured fetch.
fn build_plan(
body: &Value,
force_camel_case_to_snake_case: bool,
) -> StructuredGatewayFetchPlan {
build_structured_fetch_plan(body, force_camel_case_to_snake_case)
.expect("plan ok")
.expect("structured plan")
}
// Resolves a table selector using `public` as the default schema.
fn table_ref(raw: &str) -> StructuredTableRef {
resolve_structured_table_ref(raw, Some("public")).expect("table ref")
}
// Builds an in-memory catalog.
// `foreign_keys` tuples are (child_table, child_column, parent_table,
// parent_column, child_schema, parent_schema); both columns are also
// registered as table columns. `extra_columns` adds further columns per table.
fn test_catalog(
foreign_keys: &[(&str, &str, &str, &str, &str, &str)],
extra_columns: &[(&str, &[&str])],
) -> StructuredSchemaCatalog {
let mut table_columns: HashMap<StructuredTableRef, HashSet<String>> = HashMap::new();
let mut catalog_foreign_keys = Vec::new();
for (child_table, child_column, parent_table, parent_column, child_schema, parent_schema) in
foreign_keys
{
let child_ref = resolve_structured_table_ref(
&format!("{child_schema}.{child_table}"),
Some("public"),
)
.expect("child ref");
let parent_ref = resolve_structured_table_ref(
&format!("{parent_schema}.{parent_table}"),
Some("public"),
)
.expect("parent ref");
table_columns
.entry(child_ref.clone())
.or_default()
.insert((*child_column).to_string());
table_columns
.entry(parent_ref.clone())
.or_default()
.insert((*parent_column).to_string());
catalog_foreign_keys.push(StructuredForeignKeyMetadata {
child_table: child_ref,
child_column: (*child_column).to_string(),
parent_table: parent_ref,
parent_column: (*parent_column).to_string(),
});
}
for (table_name, columns) in extra_columns {
let table_ref = table_ref(table_name);
let entry = table_columns.entry(table_ref).or_default();
for column in *columns {
entry.insert((*column).to_string());
}
}
StructuredSchemaCatalog {
table_columns,
foreign_keys: catalog_foreign_keys,
}
}
// Builds the plan and compiles it against `catalog`, panicking on failure.
fn compile_sql(
body: &Value,
force_camel_case_to_snake_case: bool,
catalog: &StructuredSchemaCatalog,
) -> String {
let plan = build_plan(body, force_camel_case_to_snake_case);
compile_structured_fetch_sql_for_catalog(&plan, catalog).expect("compiled sql")
}
// Object-style select with a nested relation: checks the lateral join and
// that filters, ordering and limits land at the right query level.
#[test]
fn structured_select_object_builds_nested_sql() {
let body = json!({
"table_name": "orchestral_sections",
"select": {
"name": true,
"instruments": {
"select": {
"name": true,
"active": true
},
"where": {
"active": { "eq": true }
},
"orderBy": {
"name": "asc"
},
"limit": 10
}
},
"where": {
"name": { "neq": "brass" }
},
"orderBy": {
"name": "desc"
},
"limit": 100
});
let plan = build_plan(&body, false);
let sql = compile_sql(
&body,
false,
&test_catalog(
&[(
"instruments",
"orchestral_section_id",
"orchestral_sections",
"id",
"public",
"public",
)],
&[],
),
);
assert_eq!(plan.table_name, "orchestral_sections");
assert!(sql.contains("FROM \"orchestral_sections\" t0"));
assert!(sql.contains("LEFT JOIN LATERAL"));
assert!(sql.contains("FROM (\nSELECT jsonb_build_object"));
assert!(sql.contains("instruments_0.\"active\" = true"));
assert!(sql.contains("ORDER BY instruments_0.\"name\" ASC"));
assert!(sql.contains("LIMIT 10"));
assert!(sql.contains("ORDER BY t0.\"name\" DESC"));
assert!(sql.contains("LIMIT 100"));
}
// A raw AST-shaped body at the top level must be rejected by the plan builder.
#[test]
fn structured_fetch_rejects_top_level_direct_ast_body() {
let body = json!({
"operation": "select",
"from": "orchestral_sections",
"fields": [
{
"kind": "column",
"name": "name"
}
]
});
let err = build_structured_fetch_plan(&body, false).expect_err("ast should be rejected");
assert!(err.contains("first-class direct AST request bodies are not supported"));
}
// A dotted filter (`instruments.name`) is applied inside the relation
// subquery rather than on the root table.
#[test]
fn structured_select_string_distributes_relation_filters() {
let body = json!({
"table_name": "orchestral_sections",
"select": "id,name,instruments(name)",
"where": {
"instruments.name": { "eq": "flute" }
}
});
let sql = compile_sql(
&body,
false,
&test_catalog(
&[(
"instruments",
"orchestral_section_id",
"orchestral_sections",
"id",
"public",
"public",
)],
&[],
),
);
assert!(sql.contains("instruments_0.\"name\" = 'flute'"));
assert!(!sql.contains("WHERE t0.\"instruments\""));
}
// With camelCase normalization enabled, `createdAt` is emitted as `created_at`.
#[test]
fn structured_select_normalizes_camel_case_when_requested() {
let body = json!({
"table_name": "client_statistics",
"select": {
"createdAt": true
},
"where": {
"createdAt": { "gt": "2026-01-01T00:00:00Z" }
}
});
let sql = compile_sql(
&body,
true,
&test_catalog(&[], &[("client_statistics", &["created_at"])]),
);
assert!(sql.contains("t0.\"created_at\""));
assert!(sql.contains("t0.\"created_at\" > '2026-01-01T00:00:00Z'"));
}
// `resource_names` lists every table touched by nested relations, sorted.
#[test]
fn structured_select_collects_nested_resource_names() {
let body = json!({
"table_name": "orchestral_sections",
"select": {
"name": true,
"instruments": {
"select": {
"name": true,
"players": {
"select": {
"display_name": true
}
}
}
}
}
});
let plan = build_plan(&body, false);
assert_eq!(
plan.resource_names(),
vec![
"instruments".to_string(),
"orchestral_sections".to_string(),
"players".to_string(),
]
);
}
// A relation targeting another schema (`athena.users`) joins through the
// catalog foreign key, with the key on the root table.
#[test]
fn structured_select_cross_schema_relation_string_builds_sql() {
let body = json!({
"table_name": "public.chat_subscriptions",
"select": "user_id,users:athena.users(id,username,image)",
"where_filters": [
{
"column": "user_id",
"operator": "eq",
"value": "ef7a4c74-cc35-4d32-945a-a5271279ecdb",
"column_cast": "text"
}
],
"orderBy": [
{
"column": "user_id",
"direction": "desc"
}
],
"limit": 1
});
let sql = compile_sql(
&body,
false,
&test_catalog(
&[(
"chat_subscriptions",
"user_id",
"users",
"id",
"public",
"athena",
)],
&[("athena.users", &["id", "username", "image"])],
),
);
assert!(sql.contains("FROM \"public\".\"chat_subscriptions\" t0"));
assert!(sql.contains("FROM \"athena\".\"users\" athenausers_0"));
assert!(sql.contains("t0.\"user_id\" = athenausers_0.\"id\""));
assert!(sql.contains("t0.\"user_id\""));
assert!(sql.contains("ef7a4c74-cc35-4d32-945a-a5271279ecdb"));
assert!(sql.contains("ORDER BY t0.\"user_id\" DESC"));
assert!(sql.contains("LIMIT 1"));
}
// A relation target absent from the catalog fails with the missing-table
// message instead of producing SQL.
#[test]
fn structured_select_missing_cross_schema_relation_target_fails_validation() {
let body = json!({
"table_name": "chat_subscriptions",
"schema_name": "public",
"select": "user_id,users:athena.user(id)"
});
let plan = build_plan(&body, false);
let err = compile_structured_fetch_sql_for_catalog(
&plan,
&test_catalog(&[], &[("public.chat_subscriptions", &["id", "user_id"])]),
)
.expect_err("missing relation target should fail before SQL execution");
assert_eq!(
err,
"table 'athena.user' was not found for structured gateway fetch"
);
}
// A body with only `where` (no `select`) yields no structured plan.
#[test]
fn structured_select_where_only_body_stays_legacy() {
let body = json!({
"table_name": "users",
"where": {
"id": { "eq": 1 }
}
});
assert!(
build_structured_fetch_plan(&body, false)
.expect("plan ok")
.is_none()
);
}
// `!inner` on a one-to-many relation requires a non-empty aggregated array.
#[test]
fn structured_select_inner_join_filters_empty_collections() {
let body = json!({
"table_name": "orchestral_sections",
"select": "name,instruments!inner(name)"
});
let sql = compile_sql(
&body,
false,
&test_catalog(
&[(
"instruments",
"orchestral_section_id",
"orchestral_sections",
"id",
"public",
"public",
)],
&[],
),
);
assert!(sql.contains("jsonb_array_length(instruments_0_agg.data) > 0"));
assert!(!sql.contains("instruments_0_agg.data IS NOT NULL"));
}
// A foreign key on the root table produces a single-row (LIMIT 1) relation.
#[test]
fn structured_select_uses_catalog_for_many_to_one_relations() {
let body = json!({
"table_name": "invoices",
"select": {
"invoice_number": true,
"customer": {
"from": "customers",
"select": {
"name": true
}
}
}
});
let sql = compile_sql(
&body,
false,
&test_catalog(
&[(
"invoices",
"customer_id",
"customers",
"id",
"public",
"public",
)],
&[
("invoices", &["invoice_number", "customer_id"]),
("customers", &["id", "name"]),
],
),
);
assert!(sql.contains("t0.\"customer_id\" = customer_0.\"id\""));
assert!(sql.contains("LIMIT 1"));
}
// Without an explicit orderBy, no synthetic ordering columns or implicit
// `id` ordering are emitted for the relation.
#[test]
fn structured_select_relation_without_order_by_does_not_require_id() {
let body = json!({
"table_name": "orchestral_sections",
"select": {
"name": true,
"instruments": {
"select": {
"name": true
}
}
}
});
let sql = compile_sql(
&body,
false,
&test_catalog(
&[(
"instruments",
"orchestral_section_id",
"orchestral_sections",
"id",
"public",
"public",
)],
&[
("orchestral_sections", &["id", "name"]),
("instruments", &["name", "orchestral_section_id"]),
],
),
);
assert!(!sql.contains("__athena_order_"));
assert!(!sql.contains("ORDER BY instruments_0.\"id\""));
}
// With no catalog foreign keys and `<table>_id` columns on both sides, the
// relation is reported as ambiguous.
#[test]
fn structured_select_ambiguous_relation_requires_foreign_key() {
let body = json!({
"table_name": "orchestral_sections",
"select": {
"instruments": {
"select": {
"name": true
}
}
}
});
let plan = build_plan(&body, false);
let err = compile_structured_fetch_sql_for_catalog(
&plan,
&test_catalog(
&[],
&[
("orchestral_sections", &["id", "instrument_id"]),
("instruments", &["id", "orchestral_section_id", "name"]),
],
),
)
.expect_err("ambiguous relation should fail");
assert!(err.contains("ambiguous"));
}
}