Skip to main content

athena_gateway/
structured_fetch_sql.rs

1//! Structured SQL compilation and execution support for `/gateway/fetch`.
2//!
3//! This module owns the database-aware half of structured fetch:
4//!
5//! - `information_schema` catalog loading for referenced relations,
6//! - SQL compilation for nested structured selections,
7//! - portable execution helpers for compiled Postgres structured-fetch SQL.
8
9use crate::{
10    StructuredFilter, StructuredFilterOperator, StructuredGatewayFetchPlan, StructuredJoinKind,
11    StructuredOrderBy, StructuredRelationField, StructuredSelectField, StructuredSelectOperation,
12    StructuredSelectQuery, StructuredSortDirection, sanitize_identifier,
13};
14use athena_driver::postgresql::column_resolver::resolve_information_schema_targets;
15use athena_driver::postgresql::raw_sql::{
16    PostgresSqlExecutionResult, execute_postgres_sql, execute_postgres_sql_in_schema,
17};
18use serde_json::Value;
19use sqlx::PgPool;
20use std::collections::{BTreeSet, HashMap, HashSet};
21
22fn sanitize_qualified_identifier(identifier: &str) -> Option<String> {
23    let mut parts = Vec::new();
24    for segment in identifier.split('.') {
25        let trimmed = segment.trim().trim_matches('"');
26        if trimmed.is_empty() {
27            return None;
28        }
29        parts.push(sanitize_identifier(trimmed)?);
30    }
31    if parts.is_empty() {
32        return None;
33    }
34    Some(parts.join("."))
35}
36
/// Schema-qualified table identity used as a lookup key in the schema catalog.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StructuredTableRef {
    /// Schema the table lives in.
    schema_name: String,
    /// Table name within `schema_name`.
    table_name: String,
}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
44struct StructuredForeignKeyMetadata {
45    child_table: StructuredTableRef,
46    child_column: String,
47    parent_table: StructuredTableRef,
48    parent_column: String,
49}
50
51#[derive(Debug, Default, Clone)]
52struct StructuredSchemaCatalog {
53    table_columns: HashMap<StructuredTableRef, HashSet<String>>,
54    foreign_keys: Vec<StructuredForeignKeyMetadata>,
55}
56
57impl StructuredSchemaCatalog {
58    fn table_has_column(&self, table: &StructuredTableRef, column: &str) -> bool {
59        self.table_columns
60            .get(table)
61            .map(|columns| columns.contains(column))
62            .unwrap_or(false)
63    }
64}
65
66struct StructuredSqlEmitter<'a> {
67    alias_counter: usize,
68    catalog: &'a StructuredSchemaCatalog,
69    default_schema: Option<&'a str>,
70}
71
72/// Loads relation metadata and compiles SQL for a parsed structured fetch plan.
73pub async fn render_structured_fetch_sql(
74    pool: &PgPool,
75    plan: &StructuredGatewayFetchPlan,
76) -> Result<String, String> {
77    let catalog = load_structured_schema_catalog(pool, plan)
78        .await
79        .map_err(|err| err.to_string())?;
80    compile_structured_fetch_sql_for_catalog(plan, &catalog)
81}
82
83/// Executes compiled structured fetch SQL against the selected Postgres pool.
84pub async fn execute_structured_fetch_sql(
85    pool: &PgPool,
86    plan: &StructuredGatewayFetchPlan,
87    sql: &str,
88) -> Result<PostgresSqlExecutionResult, sqlx::Error> {
89    match plan.schema_name.as_deref() {
90        Some(schema_name) => execute_postgres_sql_in_schema(pool, sql, schema_name).await,
91        None => execute_postgres_sql(pool, sql).await,
92    }
93}
94
/// Escapes `input` for use inside a single-quoted SQL string literal by
/// doubling every single quote.
fn escape_sql_string(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
98
99fn format_sql_value(value: &Value) -> String {
100    match value {
101        Value::Null => "NULL".to_string(),
102        Value::Bool(flag) => flag.to_string(),
103        Value::Number(number) => number.to_string(),
104        Value::String(text) => format!("'{}'", escape_sql_string(text)),
105        Value::Array(_) | Value::Object(_) => {
106            let json_text = serde_json::to_string(value).unwrap_or_else(|_| "null".to_string());
107            format!("'{}'::jsonb", escape_sql_string(&json_text))
108        }
109    }
110}
111
112fn sanitize_type_cast_sql(raw_cast: &str) -> Result<String, String> {
113    sanitize_qualified_identifier(raw_cast)
114        .ok_or_else(|| format!("invalid SQL type cast '{raw_cast}'"))
115}
116
117fn apply_sql_cast(expression: String, raw_cast: Option<&str>) -> Result<String, String> {
118    match raw_cast {
119        Some(raw_cast) => Ok(format!(
120            "({expression})::{}",
121            sanitize_type_cast_sql(raw_cast)?
122        )),
123        None => Ok(expression),
124    }
125}
126
/// Naively singularizes an identifier by dropping one trailing `s`.
///
/// Identifiers ending in `ss` (for example `address`, `class`) are treated as
/// already singular and returned unchanged. At most one `s` is removed, so
/// `users` becomes `user`; the previous `trim_end_matches('s')` stripped every
/// trailing `s` and turned `address` into `addre`.
fn singularize_identifier(raw: &str) -> &str {
    if raw.ends_with("ss") {
        return raw;
    }
    raw.strip_suffix('s').unwrap_or(raw)
}
130
131fn collect_query_table_refs(
132    query: &StructuredSelectQuery,
133    default_schema: Option<&str>,
134    refs: &mut HashSet<StructuredTableRef>,
135) -> Result<(), String> {
136    refs.insert(resolve_structured_table_ref(&query.from, default_schema)?);
137
138    for field in &query.fields {
139        if let StructuredSelectField::Relation(relation) = field {
140            collect_query_table_refs(&relation.query, default_schema, refs)?;
141        }
142    }
143
144    Ok(())
145}
146
147fn resolve_structured_table_ref(
148    raw: &str,
149    default_schema: Option<&str>,
150) -> Result<StructuredTableRef, String> {
151    let trimmed = raw.trim();
152    if trimmed.is_empty() {
153        return Err("structured relation table cannot be empty".to_string());
154    }
155
156    let segments: Vec<&str> = trimmed.split('.').map(str::trim).collect();
157    match segments.as_slice() {
158        [table_name] => {
159            let schema_name = default_schema.unwrap_or("public");
160            let (_, resolved_table_name) = resolve_information_schema_targets(table_name, true)
161                .map_err(|err| err.to_string())?;
162            Ok(StructuredTableRef {
163                schema_name: schema_name.to_string(),
164                table_name: resolved_table_name,
165            })
166        }
167        [schema_name, table_name] => {
168            let (resolved_schema, resolved_table_name) =
169                resolve_information_schema_targets(trimmed, true).map_err(|err| err.to_string())?;
170            if !schema_name.eq_ignore_ascii_case(&resolved_schema)
171                || !table_name.eq_ignore_ascii_case(&resolved_table_name)
172            {
173                return Err(format!("invalid table selector '{trimmed}'"));
174            }
175            Ok(StructuredTableRef {
176                schema_name: resolved_schema,
177                table_name: resolved_table_name,
178            })
179        }
180        _ => Err(format!(
181            "table reference '{trimmed}' must be 'table' or 'schema.table'"
182        )),
183    }
184}
185
186fn render_structured_table_ref(table_ref: &StructuredTableRef) -> String {
187    format!("{}.{}", table_ref.schema_name, table_ref.table_name)
188}
189
190fn missing_structured_table_error(missing_tables: &[StructuredTableRef]) -> String {
191    match missing_tables {
192        [] => "structured fetch references no tables".to_string(),
193        [table_ref] => format!(
194            "table '{}' was not found for structured gateway fetch",
195            render_structured_table_ref(table_ref)
196        ),
197        _ => {
198            let tables = missing_tables
199                .iter()
200                .map(render_structured_table_ref)
201                .collect::<Vec<String>>()
202                .join(", ");
203            format!("tables were not found for structured gateway fetch: {tables}")
204        }
205    }
206}
207
208fn render_sql_string_list(values: &BTreeSet<String>) -> String {
209    values
210        .iter()
211        .map(|value| format!("'{}'", escape_sql_string(value)))
212        .collect::<Vec<String>>()
213        .join(", ")
214}
215
216async fn load_structured_schema_catalog(
217    pool: &PgPool,
218    plan: &StructuredGatewayFetchPlan,
219) -> Result<StructuredSchemaCatalog, sqlx::Error> {
220    let mut table_refs = HashSet::new();
221    collect_query_table_refs(&plan.query, plan.schema_name.as_deref(), &mut table_refs)
222        .map_err(sqlx::Error::Protocol)?;
223
224    let mut schemas = BTreeSet::new();
225    for table_ref in &table_refs {
226        schemas.insert(table_ref.schema_name.clone());
227    }
228    let schema_filter = render_sql_string_list(&schemas);
229
230    let column_rows: Vec<(String, String, String)> =
231        sqlx::query_as::<_, (String, String, String)>(&format!(
232            "SELECT table_schema, table_name, column_name \
233             FROM information_schema.columns \
234             WHERE table_schema IN ({schema_filter})"
235        ))
236        .fetch_all(pool)
237        .await?;
238
239    let mut table_columns: HashMap<StructuredTableRef, HashSet<String>> = HashMap::new();
240    for (schema_name, table_name, column_name) in column_rows {
241        let table_ref = StructuredTableRef {
242            schema_name,
243            table_name,
244        };
245        if table_refs.contains(&table_ref) {
246            table_columns
247                .entry(table_ref)
248                .or_default()
249                .insert(column_name);
250        }
251    }
252
253    let foreign_key_rows: Vec<(String, String, String, String, String, String)> =
254        sqlx::query_as::<_, (String, String, String, String, String, String)>(
255            &format!(
256                "SELECT \
257                     tc.table_schema, \
258                     tc.table_name, \
259                     kcu.column_name, \
260                     ccu.table_schema, \
261                     ccu.table_name, \
262                     ccu.column_name \
263                 FROM information_schema.table_constraints AS tc \
264                 JOIN information_schema.key_column_usage AS kcu \
265                   ON tc.constraint_name = kcu.constraint_name \
266                  AND tc.constraint_schema = kcu.constraint_schema \
267                  AND tc.table_schema = kcu.table_schema \
268                  AND tc.table_name = kcu.table_name \
269                 JOIN information_schema.constraint_column_usage AS ccu \
270                   ON ccu.constraint_name = tc.constraint_name \
271                  AND ccu.constraint_schema = tc.constraint_schema \
272                 WHERE tc.constraint_type = 'FOREIGN KEY' \
273                   AND (tc.table_schema IN ({schema_filter}) OR ccu.table_schema IN ({schema_filter}))"
274            ),
275        )
276        .fetch_all(pool)
277        .await?;
278
279    let foreign_keys = foreign_key_rows
280        .into_iter()
281        .filter_map(
282            |(
283                child_schema,
284                child_table,
285                child_column,
286                parent_schema,
287                parent_table,
288                parent_column,
289            )| {
290                let child_table_ref = StructuredTableRef {
291                    schema_name: child_schema,
292                    table_name: child_table,
293                };
294                let parent_table_ref = StructuredTableRef {
295                    schema_name: parent_schema,
296                    table_name: parent_table,
297                };
298
299                if !table_refs.contains(&child_table_ref) && !table_refs.contains(&parent_table_ref)
300                {
301                    return None;
302                }
303
304                Some(StructuredForeignKeyMetadata {
305                    child_table: child_table_ref,
306                    child_column,
307                    parent_table: parent_table_ref,
308                    parent_column,
309                })
310            },
311        )
312        .collect();
313
314    Ok(StructuredSchemaCatalog {
315        table_columns,
316        foreign_keys,
317    })
318}
319
320fn compile_structured_fetch_sql_for_catalog(
321    plan: &StructuredGatewayFetchPlan,
322    catalog: &StructuredSchemaCatalog,
323) -> Result<String, String> {
324    let mut referenced_tables = HashSet::new();
325    collect_query_table_refs(
326        &plan.query,
327        plan.schema_name.as_deref(),
328        &mut referenced_tables,
329    )?;
330    let mut missing_tables: Vec<StructuredTableRef> = referenced_tables
331        .into_iter()
332        .filter(|table_ref| !catalog.table_columns.contains_key(table_ref))
333        .collect();
334    missing_tables.sort_by(|left, right| {
335        left.schema_name
336            .cmp(&right.schema_name)
337            .then(left.table_name.cmp(&right.table_name))
338    });
339    if !missing_tables.is_empty() {
340        return Err(missing_structured_table_error(&missing_tables));
341    }
342
343    StructuredSqlEmitter {
344        alias_counter: 0,
345        catalog,
346        default_schema: plan.schema_name.as_deref(),
347    }
348    .emit_root(&plan.query)
349}
350
351fn distribute_filters(
352    fields: &mut [StructuredSelectField],
353    filters: Vec<StructuredFilter>,
354) -> Vec<StructuredFilter> {
355    let mut remaining = Vec::new();
356
357    for filter in filters {
358        let Some((head, tail)) = filter.column.split_once('.') else {
359            remaining.push(filter);
360            continue;
361        };
362
363        let maybe_relation = fields.iter_mut().find_map(|field| match field {
364            StructuredSelectField::Relation(relation)
365                if relation.display_name() == head || relation.name == head =>
366            {
367                Some(relation)
368            }
369            _ => None,
370        });
371
372        if let Some(relation) = maybe_relation {
373            relation.query.filters.push(StructuredFilter {
374                column: tail.to_string(),
375                operator: filter.operator,
376                values: filter.values,
377                column_cast: filter.column_cast,
378                value_cast: filter.value_cast,
379            });
380        } else {
381            remaining.push(filter);
382        }
383    }
384
385    remaining
386}
387
388impl<'a> StructuredSqlEmitter<'a> {
389    fn emit_root(mut self, query: &StructuredSelectQuery) -> Result<String, String> {
390        if query.operation != StructuredSelectOperation::Select {
391            return Err("only structured select queries are supported".to_string());
392        }
393
394        let mut query: StructuredSelectQuery = query.clone();
395        query.filters = distribute_filters(&mut query.fields, query.filters.clone());
396
397        let base_alias: &str = "t0";
398        let table_sql: String = sanitize_root_table_name(&query.from)?;
399        let mut select_exprs: Vec<String> = Vec::new();
400        let mut joins: Vec<String> = Vec::new();
401
402        for field in &query.fields {
403            self.emit_field(
404                field,
405                base_alias,
406                &query.from,
407                &mut select_exprs,
408                &mut joins,
409            )?;
410        }
411
412        if select_exprs.is_empty() {
413            return Err(
414                "structured gateway fetch requires at least one projected field".to_string(),
415            );
416        }
417
418        let mut sql: String = format!(
419            "SELECT {} FROM {} {}",
420            select_exprs.join(", "),
421            table_sql,
422            base_alias
423        );
424
425        if !joins.is_empty() {
426            sql.push('\n');
427            sql.push_str(&joins.join("\n"));
428        }
429
430        let where_clause: String = build_where_clause(&query.filters, base_alias)?;
431        if !where_clause.is_empty() {
432            sql.push('\n');
433            sql.push_str(&where_clause);
434        }
435
436        let order_clause: String = build_order_clause(&query.order_by, base_alias)?;
437        if !order_clause.is_empty() {
438            sql.push('\n');
439            sql.push_str(&order_clause);
440        }
441
442        if let Some(limit) = query.limit {
443            sql.push_str(&format!("\nLIMIT {limit}"));
444        }
445        if let Some(offset) = query.offset {
446            sql.push_str(&format!("\nOFFSET {offset}"));
447        }
448
449        Ok(sql)
450    }
451
452    fn emit_field(
453        &mut self,
454        field: &StructuredSelectField,
455        parent_alias: &str,
456        parent_table_selector: &str,
457        select_exprs: &mut Vec<String>,
458        joins: &mut Vec<String>,
459    ) -> Result<(), String> {
460        match field {
461            StructuredSelectField::Column(column) => {
462                let column_sql = sanitize_identifier(&column.name)
463                    .ok_or_else(|| format!("invalid projected column '{}'", column.name))?;
464                match column.alias.as_deref() {
465                    Some(alias) => {
466                        let alias_sql = sanitize_identifier(alias)
467                            .ok_or_else(|| format!("invalid projected alias '{alias}'"))?;
468                        select_exprs.push(format!("{parent_alias}.{column_sql} AS {alias_sql}"));
469                    }
470                    None => {
471                        select_exprs.push(format!("{parent_alias}.{column_sql}"));
472                    }
473                }
474            }
475            StructuredSelectField::Relation(relation) => {
476                let child_alias: String = self.next_alias(&relation.name);
477                let agg_alias: String = format!("{child_alias}_agg");
478                let join_sql: String = self.emit_relation_subquery(
479                    relation,
480                    parent_alias,
481                    parent_table_selector,
482                    &child_alias,
483                    &agg_alias,
484                )?;
485                joins.push(join_sql);
486                let output_alias =
487                    sanitize_identifier(relation.display_name()).ok_or_else(|| {
488                        format!(
489                            "invalid relation output alias '{}'",
490                            relation.display_name()
491                        )
492                    })?;
493                select_exprs.push(format!("{agg_alias}.data AS {output_alias}"));
494            }
495        }
496
497        Ok(())
498    }
499
500    fn emit_relation_subquery(
501        &mut self,
502        relation: &StructuredRelationField,
503        parent_alias: &str,
504        parent_table_selector: &str,
505        child_alias: &str,
506        agg_alias: &str,
507    ) -> Result<String, String> {
508        let mut query: StructuredSelectQuery = relation.query.clone();
509        query.filters = distribute_filters(&mut query.fields, query.filters.clone());
510
511        let child_table_sql = sanitize_root_table_name(&query.from)?;
512        let (row_expr, nested_joins) = self.emit_relation_row(&query, child_alias)?;
513        let (join_condition, many_to_one) = relation_join_condition(
514            self.catalog,
515            self.default_schema,
516            relation,
517            parent_alias,
518            parent_table_selector,
519            child_alias,
520            &query.from,
521        )?;
522        let local_where: String = build_where_clause(&query.filters, child_alias)?;
523        let join_type: &str = match relation.join {
524            StructuredJoinKind::Left => "LEFT JOIN",
525            StructuredJoinKind::Inner => "JOIN",
526        };
527
528        let subquery: String = if many_to_one {
529            let limit: i64 = match query.limit {
530                Some(value) if value > 1 => {
531                    return Err(format!(
532                        "relation '{}' resolves to one row; limit must be 0 or 1",
533                        relation.display_name()
534                    ));
535                }
536                Some(value) => value,
537                None => 1,
538            };
539            let order_clause = build_order_clause(&query.order_by, child_alias)?;
540            let mut sql =
541                format!("SELECT {row_expr} AS data\nFROM {child_table_sql} {child_alias}");
542            if !nested_joins.is_empty() {
543                sql.push('\n');
544                sql.push_str(&nested_joins.join("\n"));
545            }
546            sql.push_str(&format!("\nWHERE {join_condition}"));
547            if !local_where.is_empty() {
548                sql.push_str(" AND ");
549                sql.push_str(local_where.trim_start_matches("WHERE "));
550            }
551            if !order_clause.is_empty() {
552                sql.push('\n');
553                sql.push_str(&order_clause);
554            }
555            if let Some(offset) = query.offset {
556                sql.push_str(&format!("\nOFFSET {offset}"));
557            }
558            sql.push_str(&format!("\nLIMIT {limit}"));
559            sql
560        } else {
561            let orderings: Vec<(String, StructuredSortDirection, String)> =
562                effective_relation_ordering(&query.order_by, child_alias)?;
563            let inner_order: String = orderings
564                .iter()
565                .map(|(sql_expr, direction, _)| format!("{sql_expr} {}", direction.sql_keyword()))
566                .collect::<Vec<String>>()
567                .join(", ");
568            let aggregate_order: String = orderings
569                .iter()
570                .map(|(_, direction, alias)| {
571                    format!("rel_window.{alias} {}", direction.sql_keyword())
572                })
573                .collect::<Vec<String>>()
574                .join(", ");
575            let order_select_columns: String = orderings
576                .iter()
577                .map(|(sql_expr, _, alias)| format!(", {sql_expr} AS {alias}"))
578                .collect::<String>();
579
580            let mut inner: String = format!(
581                "SELECT {row_expr} AS row_data{order_select_columns}\nFROM {child_table_sql} {child_alias}"
582            );
583            if !nested_joins.is_empty() {
584                inner.push('\n');
585                inner.push_str(&nested_joins.join("\n"));
586            }
587            inner.push_str(&format!("\nWHERE {join_condition}"));
588            if !local_where.is_empty() {
589                inner.push_str(" AND ");
590                inner.push_str(local_where.trim_start_matches("WHERE "));
591            }
592            if !inner_order.is_empty() {
593                inner.push_str(&format!("\nORDER BY {inner_order}"));
594            }
595            if let Some(limit) = query.limit {
596                inner.push_str(&format!("\nLIMIT {limit}"));
597            }
598            if let Some(offset) = query.offset {
599                inner.push_str(&format!("\nOFFSET {offset}"));
600            }
601
602            if aggregate_order.is_empty() {
603                format!(
604                    "SELECT COALESCE(jsonb_agg(rel_window.row_data) FILTER (WHERE rel_window.row_data IS NOT NULL), '[]'::jsonb) AS data\nFROM (\n{inner}\n) rel_window"
605                )
606            } else {
607                format!(
608                    "SELECT COALESCE(jsonb_agg(rel_window.row_data ORDER BY {aggregate_order}) FILTER (WHERE rel_window.row_data IS NOT NULL), '[]'::jsonb) AS data\nFROM (\n{inner}\n) rel_window"
609                )
610            }
611        };
612
613        let join_predicate = match relation.join {
614            StructuredJoinKind::Inner if many_to_one => format!("{agg_alias}.data IS NOT NULL"),
615            StructuredJoinKind::Inner => format!("jsonb_array_length({agg_alias}.data) > 0"),
616            StructuredJoinKind::Left => "TRUE".to_string(),
617        };
618
619        Ok(format!(
620            "{join_type} LATERAL (\n{subquery}\n) {agg_alias} ON {join_predicate}"
621        ))
622    }
623
624    fn emit_relation_row(
625        &mut self,
626        query: &StructuredSelectQuery,
627        alias: &str,
628    ) -> Result<(String, Vec<String>), String> {
629        let mut field_pairs = Vec::new();
630        let mut joins = Vec::new();
631
632        for field in &query.fields {
633            match field {
634                StructuredSelectField::Column(column) => {
635                    let json_key = column.alias.as_deref().unwrap_or(&column.name);
636                    let column_sql = sanitize_identifier(&column.name)
637                        .ok_or_else(|| format!("invalid relation column '{}'", column.name))?;
638                    field_pairs.push(format!("'{}', {}.{}", json_key, alias, column_sql));
639                }
640                StructuredSelectField::Relation(relation) => {
641                    let child_alias = self.next_alias(&relation.name);
642                    let agg_alias = format!("{child_alias}_agg");
643                    let join_sql = self.emit_relation_subquery(
644                        relation,
645                        alias,
646                        &query.from,
647                        &child_alias,
648                        &agg_alias,
649                    )?;
650                    joins.push(join_sql);
651                    field_pairs.push(format!("'{}', {}.data", relation.display_name(), agg_alias));
652                }
653            }
654        }
655
656        let row_expr: String = if field_pairs.is_empty() {
657            format!("to_jsonb({alias})")
658        } else {
659            format!("jsonb_build_object({})", field_pairs.join(", "))
660        };
661
662        Ok((row_expr, joins))
663    }
664
665    fn next_alias(&mut self, base: &str) -> String {
666        let normalized: String = base
667            .chars()
668            .filter(|ch| ch.is_ascii_alphanumeric() || *ch == '_')
669            .collect::<String>();
670        let stem = if normalized.is_empty() {
671            "rel"
672        } else {
673            &normalized
674        };
675        let alias = format!("{stem}_{}", self.alias_counter);
676        self.alias_counter += 1;
677        alias
678    }
679}
680
681fn sanitize_root_table_name(table_name: &str) -> Result<String, String> {
682    if table_name.contains('.') {
683        sanitize_qualified_identifier(table_name)
684            .ok_or_else(|| format!("invalid table selector '{table_name}'"))
685    } else {
686        sanitize_identifier(table_name)
687            .ok_or_else(|| format!("invalid table selector '{table_name}'"))
688    }
689}
690
691fn build_where_clause(filters: &[StructuredFilter], alias: &str) -> Result<String, String> {
692    if filters.is_empty() {
693        return Ok(String::new());
694    }
695
696    let mut clauses: Vec<String> = Vec::new();
697    for filter in filters {
698        let column_sql = apply_sql_cast(
699            qualified_local_column(alias, &filter.column)?,
700            filter.column_cast.as_deref(),
701        )?;
702        match filter.operator {
703            StructuredFilterOperator::Eq => {
704                let value = filter
705                    .values
706                    .first()
707                    .ok_or_else(|| format!("missing eq value for '{}'", filter.column))?;
708                if value.is_null() {
709                    clauses.push(format!("{column_sql} IS NULL"));
710                } else {
711                    clauses.push(format!(
712                        "{column_sql} = {}",
713                        apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
714                    ));
715                }
716            }
717            StructuredFilterOperator::Neq => {
718                let value = filter
719                    .values
720                    .first()
721                    .ok_or_else(|| format!("missing neq value for '{}'", filter.column))?;
722                if value.is_null() {
723                    clauses.push(format!("{column_sql} IS NOT NULL"));
724                } else {
725                    clauses.push(format!(
726                        "{column_sql} <> {}",
727                        apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
728                    ));
729                }
730            }
731            StructuredFilterOperator::Gt => {
732                let value = filter
733                    .values
734                    .first()
735                    .ok_or_else(|| format!("missing gt value for '{}'", filter.column))?;
736                clauses.push(format!(
737                    "{column_sql} > {}",
738                    apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
739                ));
740            }
741            StructuredFilterOperator::Lt => {
742                let value = filter
743                    .values
744                    .first()
745                    .ok_or_else(|| format!("missing lt value for '{}'", filter.column))?;
746                clauses.push(format!(
747                    "{column_sql} < {}",
748                    apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
749                ));
750            }
751            StructuredFilterOperator::In => {
752                if filter.values.is_empty() {
753                    clauses.push("FALSE".to_string());
754                } else {
755                    let values = filter
756                        .values
757                        .iter()
758                        .map(|value| {
759                            apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())
760                        })
761                        .collect::<Result<Vec<String>, String>>()?
762                        .join(", ");
763                    clauses.push(format!("{column_sql} IN ({values})"));
764                }
765            }
766        }
767    }
768
769    Ok(format!("WHERE {}", clauses.join(" AND ")))
770}
771
772fn build_order_clause(order_by: &[StructuredOrderBy], alias: &str) -> Result<String, String> {
773    if order_by.is_empty() {
774        return Ok(String::new());
775    }
776    let fragments: Vec<String> = order_by
777        .iter()
778        .map(|order| {
779            let column_sql = qualified_local_column(alias, &order.column)?;
780            Ok(format!("{column_sql} {}", order.direction.sql_keyword()))
781        })
782        .collect::<Result<Vec<String>, String>>()?;
783    Ok(format!("ORDER BY {}", fragments.join(", ")))
784}
785
786fn effective_relation_ordering(
787    order_by: &[StructuredOrderBy],
788    source_alias: &str,
789) -> Result<Vec<(String, StructuredSortDirection, String)>, String> {
790    if order_by.is_empty() {
791        return Ok(Vec::new());
792    }
793
794    order_by
795        .iter()
796        .cloned()
797        .into_iter()
798        .enumerate()
799        .map(|(index, order)| {
800            let column_sql = qualified_local_column(source_alias, &order.column)?;
801            Ok((
802                column_sql,
803                order.direction,
804                format!("__athena_order_{index}"),
805            ))
806        })
807        .collect()
808}
809
810fn qualified_local_column(alias: &str, column: &str) -> Result<String, String> {
811    if column.contains('.') {
812        return Err(format!(
813            "unsupported dotted column reference '{column}' at this query level"
814        ));
815    }
816    let column_sql = sanitize_identifier(column)
817        .ok_or_else(|| format!("invalid column identifier '{column}'"))?;
818    Ok(format!("{alias}.{column_sql}"))
819}
820
821fn relation_join_condition(
822    catalog: &StructuredSchemaCatalog,
823    default_schema: Option<&str>,
824    relation: &StructuredRelationField,
825    parent_alias: &str,
826    parent_table_selector: &str,
827    child_alias: &str,
828    child_table_selector: &str,
829) -> Result<(String, bool), String> {
830    let parent_table = resolve_structured_table_ref(parent_table_selector, default_schema)?;
831    let child_table = resolve_structured_table_ref(child_table_selector, default_schema)?;
832
833    if let Some(foreign_key) = relation.foreign_key.as_deref() {
834        if let Some(stripped) = foreign_key.strip_prefix("parent.") {
835            let fk = sanitize_identifier(stripped)
836                .ok_or_else(|| format!("invalid parent foreign key '{foreign_key}'"))?;
837            return Ok((format!("{parent_alias}.{fk} = {child_alias}.\"id\""), true));
838        }
839        if let Some(stripped) = foreign_key.strip_prefix("child.") {
840            let fk = sanitize_identifier(stripped)
841                .ok_or_else(|| format!("invalid child foreign key '{foreign_key}'"))?;
842            return Ok((format!("{child_alias}.{fk} = {parent_alias}.\"id\""), false));
843        }
844
845        let fk = sanitize_identifier(foreign_key)
846            .ok_or_else(|| format!("invalid foreign key '{foreign_key}'"))?;
847        let fk_on_parent = catalog.table_has_column(&parent_table, foreign_key);
848        let fk_on_child = catalog.table_has_column(&child_table, foreign_key);
849
850        return match (fk_on_parent, fk_on_child) {
851            (true, false) => Ok((format!("{parent_alias}.{fk} = {child_alias}.\"id\""), true)),
852            (false, true) => Ok((format!("{child_alias}.{fk} = {parent_alias}.\"id\""), false)),
853            (true, true) => Err(format!(
854                "foreign_key '{}' exists on both '{}' and '{}'; prefix it with parent. or child.",
855                foreign_key, parent_table.table_name, child_table.table_name
856            )),
857            (false, false) => Err(format!(
858                "foreign_key '{}' was not found on '{}' or '{}'",
859                foreign_key, parent_table.table_name, child_table.table_name
860            )),
861        };
862    }
863
864    let child_to_parent: Vec<&StructuredForeignKeyMetadata> = catalog
865        .foreign_keys
866        .iter()
867        .filter(|metadata| {
868            metadata.child_table == child_table && metadata.parent_table == parent_table
869        })
870        .collect();
871    let parent_to_child: Vec<&StructuredForeignKeyMetadata> = catalog
872        .foreign_keys
873        .iter()
874        .filter(|metadata| {
875            metadata.child_table == parent_table && metadata.parent_table == child_table
876        })
877        .collect();
878
879    match (child_to_parent.as_slice(), parent_to_child.as_slice()) {
880        ([metadata], []) => {
881            let child_fk = sanitize_identifier(&metadata.child_column).ok_or_else(|| {
882                format!(
883                    "invalid inferred child foreign key '{}'",
884                    metadata.child_column
885                )
886            })?;
887            let parent_pk = sanitize_identifier(&metadata.parent_column).ok_or_else(|| {
888                format!(
889                    "invalid inferred parent reference column '{}'",
890                    metadata.parent_column
891                )
892            })?;
893            return Ok((
894                format!("{child_alias}.{child_fk} = {parent_alias}.{parent_pk}"),
895                false,
896            ));
897        }
898        ([], [metadata]) => {
899            let parent_fk = sanitize_identifier(&metadata.child_column).ok_or_else(|| {
900                format!(
901                    "invalid inferred parent foreign key '{}'",
902                    metadata.child_column
903                )
904            })?;
905            let child_pk = sanitize_identifier(&metadata.parent_column).ok_or_else(|| {
906                format!(
907                    "invalid inferred child reference column '{}'",
908                    metadata.parent_column
909                )
910            })?;
911            return Ok((
912                format!("{parent_alias}.{parent_fk} = {child_alias}.{child_pk}"),
913                true,
914            ));
915        }
916        ([], []) => {}
917        _ => {
918            return Err(format!(
919                "relation '{}' between '{}' and '{}' is ambiguous; provide foreign_key",
920                relation.display_name(),
921                parent_table.table_name,
922                child_table.table_name
923            ));
924        }
925    }
926
927    let child_fk: String = format!("{}_id", singularize_identifier(&parent_table.table_name));
928    let parent_fk: String = format!("{}_id", singularize_identifier(&child_table.table_name));
929    let child_has_fk = catalog.table_has_column(&child_table, &child_fk);
930    let parent_has_fk = catalog.table_has_column(&parent_table, &parent_fk);
931
932    if child_has_fk && !parent_has_fk {
933        let child_fk_sql = sanitize_identifier(&child_fk)
934            .ok_or_else(|| format!("invalid inferred child foreign key '{child_fk}'"))?;
935        Ok((
936            format!("{child_alias}.{child_fk_sql} = {parent_alias}.\"id\""),
937            false,
938        ))
939    } else if parent_has_fk && !child_has_fk {
940        let parent_fk_sql = sanitize_identifier(&parent_fk)
941            .ok_or_else(|| format!("invalid inferred parent foreign key '{parent_fk}'"))?;
942        Ok((
943            format!("{parent_alias}.{parent_fk_sql} = {child_alias}.\"id\""),
944            true,
945        ))
946    } else if child_has_fk && parent_has_fk {
947        Err(format!(
948            "relation '{}' between '{}' and '{}' is ambiguous; provide foreign_key",
949            relation.display_name(),
950            parent_table.table_name,
951            child_table.table_name
952        ))
953    } else {
954        Err(format!(
955            "could not resolve relation '{}' between '{}' and '{}'; add a foreign key constraint or provide foreign_key",
956            relation.display_name(),
957            parent_table.table_name,
958            child_table.table_name
959        ))
960    }
961}
962
/// Unit tests for structured-fetch plan building and SQL compilation against
/// hand-built schema catalogs (no database connection involved).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::build_structured_fetch_plan;
    use serde_json::json;

    /// Foreign key `instruments.orchestral_section_id -> orchestral_sections.id`,
    /// both tables in `public`, in the tuple layout expected by `test_catalog`:
    /// `(child table, child column, parent table, parent column, child schema, parent schema)`.
    const INSTRUMENTS_SECTION_FK: (&str, &str, &str, &str, &str, &str) = (
        "instruments",
        "orchestral_section_id",
        "orchestral_sections",
        "id",
        "public",
        "public",
    );

    /// Builds a structured fetch plan from `body`, panicking when plan building
    /// fails or when the body does not produce a structured plan.
    fn build_plan(
        body: &Value,
        force_camel_case_to_snake_case: bool,
    ) -> StructuredGatewayFetchPlan {
        let maybe_plan =
            build_structured_fetch_plan(body, force_camel_case_to_snake_case).expect("plan ok");
        maybe_plan.expect("structured plan")
    }

    /// Resolves `raw` into a table reference with `public` as the default schema.
    fn table_ref(raw: &str) -> StructuredTableRef {
        resolve_structured_table_ref(raw, Some("public")).expect("table ref")
    }

    /// Assembles an in-memory catalog.
    ///
    /// Each `foreign_keys` entry is
    /// `(child table, child column, parent table, parent column, child schema, parent schema)`;
    /// it registers both columns on their tables and records the foreign key.
    /// Each `extra_columns` entry adds further columns to a table.
    fn test_catalog(
        foreign_keys: &[(&str, &str, &str, &str, &str, &str)],
        extra_columns: &[(&str, &[&str])],
    ) -> StructuredSchemaCatalog {
        let mut catalog = StructuredSchemaCatalog::default();

        for &(child_table, child_column, parent_table, parent_column, child_schema, parent_schema) in
            foreign_keys
        {
            let child_ref = resolve_structured_table_ref(
                &format!("{child_schema}.{child_table}"),
                Some("public"),
            )
            .expect("child ref");
            let parent_ref = resolve_structured_table_ref(
                &format!("{parent_schema}.{parent_table}"),
                Some("public"),
            )
            .expect("parent ref");

            catalog
                .table_columns
                .entry(child_ref.clone())
                .or_default()
                .insert(child_column.to_string());
            catalog
                .table_columns
                .entry(parent_ref.clone())
                .or_default()
                .insert(parent_column.to_string());

            catalog.foreign_keys.push(StructuredForeignKeyMetadata {
                child_table: child_ref,
                child_column: child_column.to_string(),
                parent_table: parent_ref,
                parent_column: parent_column.to_string(),
            });
        }

        for &(table_name, columns) in extra_columns {
            catalog
                .table_columns
                .entry(table_ref(table_name))
                .or_default()
                .extend(columns.iter().map(|column| (*column).to_string()));
        }

        catalog
    }

    /// Builds the plan for `body` and compiles it against `catalog`, panicking on
    /// any failure.
    fn compile_sql(
        body: &Value,
        force_camel_case_to_snake_case: bool,
        catalog: &StructuredSchemaCatalog,
    ) -> String {
        compile_structured_fetch_sql_for_catalog(
            &build_plan(body, force_camel_case_to_snake_case),
            catalog,
        )
        .expect("compiled sql")
    }

    /// An object-style `select` with a nested relation compiles to a lateral
    /// join carrying the relation's own filter, ordering and limit.
    #[test]
    fn structured_select_object_builds_nested_sql() {
        let body = json!({
            "table_name": "orchestral_sections",
            "select": {
                "name": true,
                "instruments": {
                    "select": {
                        "name": true,
                        "active": true
                    },
                    "where": {
                        "active": { "eq": true }
                    },
                    "orderBy": {
                        "name": "asc"
                    },
                    "limit": 10
                }
            },
            "where": {
                "name": { "neq": "brass" }
            },
            "orderBy": {
                "name": "desc"
            },
            "limit": 100
        });

        let plan = build_plan(&body, false);
        let catalog = test_catalog(&[INSTRUMENTS_SECTION_FK], &[]);
        let sql = compile_sql(&body, false, &catalog);

        assert_eq!(plan.table_name, "orchestral_sections");
        assert!(sql.contains("FROM \"orchestral_sections\" t0"));
        assert!(sql.contains("LEFT JOIN LATERAL"));
        assert!(sql.contains("FROM (\nSELECT jsonb_build_object"));
        assert!(sql.contains("instruments_0.\"active\" = true"));
        assert!(sql.contains("ORDER BY instruments_0.\"name\" ASC"));
        assert!(sql.contains("LIMIT 10"));
        assert!(sql.contains("ORDER BY t0.\"name\" DESC"));
        assert!(sql.contains("LIMIT 100"));
    }

    /// A request body written directly as an AST is rejected by the plan builder.
    #[test]
    fn structured_fetch_rejects_top_level_direct_ast_body() {
        let body = json!({
            "operation": "select",
            "from": "orchestral_sections",
            "fields": [
                {
                    "kind": "column",
                    "name": "name"
                }
            ]
        });

        let err = build_structured_fetch_plan(&body, false).expect_err("ast should be rejected");
        assert!(err.contains("first-class direct AST request bodies are not supported"));
    }

    /// A dotted top-level filter (`instruments.name`) is applied on the relation
    /// alias rather than on the root table.
    #[test]
    fn structured_select_string_distributes_relation_filters() {
        let body = json!({
            "table_name": "orchestral_sections",
            "select": "id,name,instruments(name)",
            "where": {
                "instruments.name": { "eq": "flute" }
            }
        });

        let catalog = test_catalog(&[INSTRUMENTS_SECTION_FK], &[]);
        let sql = compile_sql(&body, false, &catalog);

        assert!(sql.contains("instruments_0.\"name\" = 'flute'"));
        assert!(!sql.contains("WHERE t0.\"instruments\""));
    }

    /// With camel-case normalization enabled, `createdAt` is emitted as
    /// `created_at` in both the selection and the filter.
    #[test]
    fn structured_select_normalizes_camel_case_when_requested() {
        let body = json!({
            "table_name": "client_statistics",
            "select": {
                "createdAt": true
            },
            "where": {
                "createdAt": { "gt": "2026-01-01T00:00:00Z" }
            }
        });

        let catalog = test_catalog(&[], &[("client_statistics", &["created_at"])]);
        let sql = compile_sql(&body, true, &catalog);

        assert!(sql.contains("t0.\"created_at\""));
        assert!(sql.contains("t0.\"created_at\" > '2026-01-01T00:00:00Z'"));
    }

    /// `resource_names` reports the root table and every nested relation.
    #[test]
    fn structured_select_collects_nested_resource_names() {
        let body = json!({
            "table_name": "orchestral_sections",
            "select": {
                "name": true,
                "instruments": {
                    "select": {
                        "name": true,
                        "players": {
                            "select": {
                                "display_name": true
                            }
                        }
                    }
                }
            }
        });

        let plan = build_plan(&body, false);
        let expected = vec![
            "instruments".to_string(),
            "orchestral_sections".to_string(),
            "players".to_string(),
        ];

        assert_eq!(plan.resource_names(), expected);
    }

    /// A string `select` with an aliased, schema-qualified relation joins across
    /// schemas using the catalog foreign key.
    #[test]
    fn structured_select_cross_schema_relation_string_builds_sql() {
        let body = json!({
            "table_name": "public.chat_subscriptions",
            "select": "user_id,users:athena.users(id,username,image)",
            "where_filters": [
                {
                    "column": "user_id",
                    "operator": "eq",
                    "value": "ef7a4c74-cc35-4d32-945a-a5271279ecdb",
                    "column_cast": "text"
                }
            ],
            "orderBy": [
                {
                    "column": "user_id",
                    "direction": "desc"
                }
            ],
            "limit": 1
        });

        let catalog = test_catalog(
            &[(
                "chat_subscriptions",
                "user_id",
                "users",
                "id",
                "public",
                "athena",
            )],
            &[("athena.users", &["id", "username", "image"])],
        );
        let sql = compile_sql(&body, false, &catalog);

        assert!(sql.contains("FROM \"public\".\"chat_subscriptions\" t0"));
        assert!(sql.contains("FROM \"athena\".\"users\" athenausers_0"));
        assert!(sql.contains("t0.\"user_id\" = athenausers_0.\"id\""));
        assert!(sql.contains("t0.\"user_id\""));
        assert!(sql.contains("ef7a4c74-cc35-4d32-945a-a5271279ecdb"));
        assert!(sql.contains("ORDER BY t0.\"user_id\" DESC"));
        assert!(sql.contains("LIMIT 1"));
    }

    /// A relation target that is absent from the catalog fails compilation with
    /// a "table not found" message.
    #[test]
    fn structured_select_missing_cross_schema_relation_target_fails_validation() {
        let body = json!({
            "table_name": "chat_subscriptions",
            "schema_name": "public",
            "select": "user_id,users:athena.user(id)"
        });
        let plan = build_plan(&body, false);
        let catalog = test_catalog(&[], &[("public.chat_subscriptions", &["id", "user_id"])]);

        let err = compile_structured_fetch_sql_for_catalog(&plan, &catalog)
            .expect_err("missing relation target should fail before SQL execution");

        assert_eq!(
            err,
            "table 'athena.user' was not found for structured gateway fetch"
        );
    }

    /// A body with `where` but no `select` yields no structured plan.
    #[test]
    fn structured_select_where_only_body_stays_legacy() {
        let body = json!({
            "table_name": "users",
            "where": {
                "id": { "eq": 1 }
            }
        });

        assert!(
            build_structured_fetch_plan(&body, false)
                .expect("plan ok")
                .is_none()
        );
    }

    /// `!inner` on a collection relation filters on a non-empty aggregate
    /// rather than on a non-null one.
    #[test]
    fn structured_select_inner_join_filters_empty_collections() {
        let body = json!({
            "table_name": "orchestral_sections",
            "select": "name,instruments!inner(name)"
        });

        let catalog = test_catalog(&[INSTRUMENTS_SECTION_FK], &[]);
        let sql = compile_sql(&body, false, &catalog);

        assert!(sql.contains("jsonb_array_length(instruments_0_agg.data) > 0"));
        assert!(!sql.contains("instruments_0_agg.data IS NOT NULL"));
    }

    /// When the foreign key lives on the root table, the join runs from the
    /// parent column to the relation's `id` and the output contains `LIMIT 1`.
    #[test]
    fn structured_select_uses_catalog_for_many_to_one_relations() {
        let body = json!({
            "table_name": "invoices",
            "select": {
                "invoice_number": true,
                "customer": {
                    "from": "customers",
                    "select": {
                        "name": true
                    }
                }
            }
        });

        let catalog = test_catalog(
            &[(
                "invoices",
                "customer_id",
                "customers",
                "id",
                "public",
                "public",
            )],
            &[
                ("invoices", &["invoice_number", "customer_id"]),
                ("customers", &["id", "name"]),
            ],
        );
        let sql = compile_sql(&body, false, &catalog);

        assert!(sql.contains("t0.\"customer_id\" = customer_0.\"id\""));
        assert!(sql.contains("LIMIT 1"));
    }

    /// A relation without `orderBy` emits neither synthetic order columns nor an
    /// implicit ordering on `id`.
    #[test]
    fn structured_select_relation_without_order_by_does_not_require_id() {
        let body = json!({
            "table_name": "orchestral_sections",
            "select": {
                "name": true,
                "instruments": {
                    "select": {
                        "name": true
                    }
                }
            }
        });

        let catalog = test_catalog(
            &[INSTRUMENTS_SECTION_FK],
            &[
                ("orchestral_sections", &["id", "name"]),
                ("instruments", &["name", "orchestral_section_id"]),
            ],
        );
        let sql = compile_sql(&body, false, &catalog);

        assert!(!sql.contains("__athena_order_"));
        assert!(!sql.contains("ORDER BY instruments_0.\"id\""));
    }

    /// With no catalog foreign key and convention-named columns on both tables,
    /// compilation fails with an "ambiguous" message.
    #[test]
    fn structured_select_ambiguous_relation_requires_foreign_key() {
        let body = json!({
            "table_name": "orchestral_sections",
            "select": {
                "instruments": {
                    "select": {
                        "name": true
                    }
                }
            }
        });

        let plan = build_plan(&body, false);
        let catalog = test_catalog(
            &[],
            &[
                ("orchestral_sections", &["id", "instrument_id"]),
                ("instruments", &["id", "orchestral_section_id", "name"]),
            ],
        );

        let err = compile_structured_fetch_sql_for_catalog(&plan, &catalog)
            .expect_err("ambiguous relation should fail");

        assert!(err.contains("ambiguous"));
    }
}