1use crate::{
10 StructuredFilter, StructuredFilterOperator, StructuredGatewayFetchPlan, StructuredJoinKind,
11 StructuredOrderBy, StructuredRelationField, StructuredSelectField, StructuredSelectOperation,
12 StructuredSelectQuery, StructuredSortDirection, sanitize_identifier,
13};
14use athena_driver::postgresql::column_resolver::resolve_information_schema_targets;
15use athena_driver::postgresql::raw_sql::{
16 PostgresSqlExecutionResult, execute_postgres_sql, execute_postgres_sql_in_schema,
17};
18use serde_json::Value;
19use sqlx::PgPool;
20use std::collections::{BTreeSet, HashMap, HashSet};
21
22fn sanitize_qualified_identifier(identifier: &str) -> Option<String> {
23 let mut parts = Vec::new();
24 for segment in identifier.split('.') {
25 let trimmed = segment.trim().trim_matches('"');
26 if trimmed.is_empty() {
27 return None;
28 }
29 parts.push(sanitize_identifier(trimmed)?);
30 }
31 if parts.is_empty() {
32 return None;
33 }
34 Some(parts.join("."))
35}
36
/// A table identified by an explicit schema name and table name.
///
/// Used as the key of the per-table column catalog and as both endpoints of
/// foreign-key metadata.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StructuredTableRef {
    /// Schema the table lives in.
    schema_name: String,
    /// Name of the table inside `schema_name`.
    table_name: String,
}
42
/// One foreign-key column pairing: `child_table.child_column` references
/// `parent_table.parent_column`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct StructuredForeignKeyMetadata {
    /// Table that owns the foreign-key constraint.
    child_table: StructuredTableRef,
    /// Column on `child_table` that holds the reference.
    child_column: String,
    /// Table the constraint points at.
    parent_table: StructuredTableRef,
    /// Column on `parent_table` that is referenced.
    parent_column: String,
}
50
/// Schema facts used while compiling a structured fetch: the known columns of
/// each referenced table and the foreign keys that touch those tables.
#[derive(Debug, Default, Clone)]
struct StructuredSchemaCatalog {
    /// Column names per table; a table missing from this map is reported as
    /// "not found" when a plan is compiled.
    table_columns: HashMap<StructuredTableRef, HashSet<String>>,
    /// Foreign-key pairings used to infer relation join conditions.
    foreign_keys: Vec<StructuredForeignKeyMetadata>,
}
56
57impl StructuredSchemaCatalog {
58 fn table_has_column(&self, table: &StructuredTableRef, column: &str) -> bool {
59 self.table_columns
60 .get(table)
61 .map(|columns| columns.contains(column))
62 .unwrap_or(false)
63 }
64}
65
/// Mutable state carried while turning one structured query into SQL.
struct StructuredSqlEmitter<'a> {
    /// Number of relation aliases handed out so far; used as the alias suffix.
    alias_counter: usize,
    /// Column and foreign-key information used to resolve relation joins.
    catalog: &'a StructuredSchemaCatalog,
    /// Schema applied to unqualified table selectors, if the plan names one.
    default_schema: Option<&'a str>,
}
71
72pub async fn render_structured_fetch_sql(
74 pool: &PgPool,
75 plan: &StructuredGatewayFetchPlan,
76) -> Result<String, String> {
77 let catalog = load_structured_schema_catalog(pool, plan)
78 .await
79 .map_err(|err| err.to_string())?;
80 compile_structured_fetch_sql_for_catalog(plan, &catalog)
81}
82
83pub async fn execute_structured_fetch_sql(
85 pool: &PgPool,
86 plan: &StructuredGatewayFetchPlan,
87 sql: &str,
88) -> Result<PostgresSqlExecutionResult, sqlx::Error> {
89 match plan.schema_name.as_deref() {
90 Some(schema_name) => execute_postgres_sql_in_schema(pool, sql, schema_name).await,
91 None => execute_postgres_sql(pool, sql).await,
92 }
93}
94
/// Escapes `input` for use inside a single-quoted SQL string literal by
/// doubling every single quote.
fn escape_sql_string(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
98
99fn format_sql_value(value: &Value) -> String {
100 match value {
101 Value::Null => "NULL".to_string(),
102 Value::Bool(flag) => flag.to_string(),
103 Value::Number(number) => number.to_string(),
104 Value::String(text) => format!("'{}'", escape_sql_string(text)),
105 Value::Array(_) | Value::Object(_) => {
106 let json_text = serde_json::to_string(value).unwrap_or_else(|_| "null".to_string());
107 format!("'{}'::jsonb", escape_sql_string(&json_text))
108 }
109 }
110}
111
112fn sanitize_type_cast_sql(raw_cast: &str) -> Result<String, String> {
113 sanitize_qualified_identifier(raw_cast)
114 .ok_or_else(|| format!("invalid SQL type cast '{raw_cast}'"))
115}
116
117fn apply_sql_cast(expression: String, raw_cast: Option<&str>) -> Result<String, String> {
118 match raw_cast {
119 Some(raw_cast) => Ok(format!(
120 "({expression})::{}",
121 sanitize_type_cast_sql(raw_cast)?
122 )),
123 None => Ok(expression),
124 }
125}
126
/// Naively singularizes a table name by dropping one trailing `s`.
///
/// The result is used to guess conventional `<singular>_id` foreign-key
/// columns. Names ending in `ss` (for example `class` or `address`) are
/// returned unchanged: they are already singular, and stripping every trailing
/// `s` would mangle them (`class` -> `cla`).
fn singularize_identifier(raw: &str) -> &str {
    if raw.ends_with("ss") {
        return raw;
    }
    // `strip_suffix` removes at most one occurrence, unlike `trim_end_matches`.
    raw.strip_suffix('s').unwrap_or(raw)
}
130
131fn collect_query_table_refs(
132 query: &StructuredSelectQuery,
133 default_schema: Option<&str>,
134 refs: &mut HashSet<StructuredTableRef>,
135) -> Result<(), String> {
136 refs.insert(resolve_structured_table_ref(&query.from, default_schema)?);
137
138 for field in &query.fields {
139 if let StructuredSelectField::Relation(relation) = field {
140 collect_query_table_refs(&relation.query, default_schema, refs)?;
141 }
142 }
143
144 Ok(())
145}
146
147fn resolve_structured_table_ref(
148 raw: &str,
149 default_schema: Option<&str>,
150) -> Result<StructuredTableRef, String> {
151 let trimmed = raw.trim();
152 if trimmed.is_empty() {
153 return Err("structured relation table cannot be empty".to_string());
154 }
155
156 let segments: Vec<&str> = trimmed.split('.').map(str::trim).collect();
157 match segments.as_slice() {
158 [table_name] => {
159 let schema_name = default_schema.unwrap_or("public");
160 let (_, resolved_table_name) = resolve_information_schema_targets(table_name, true)
161 .map_err(|err| err.to_string())?;
162 Ok(StructuredTableRef {
163 schema_name: schema_name.to_string(),
164 table_name: resolved_table_name,
165 })
166 }
167 [schema_name, table_name] => {
168 let (resolved_schema, resolved_table_name) =
169 resolve_information_schema_targets(trimmed, true).map_err(|err| err.to_string())?;
170 if !schema_name.eq_ignore_ascii_case(&resolved_schema)
171 || !table_name.eq_ignore_ascii_case(&resolved_table_name)
172 {
173 return Err(format!("invalid table selector '{trimmed}'"));
174 }
175 Ok(StructuredTableRef {
176 schema_name: resolved_schema,
177 table_name: resolved_table_name,
178 })
179 }
180 _ => Err(format!(
181 "table reference '{trimmed}' must be 'table' or 'schema.table'"
182 )),
183 }
184}
185
186fn render_structured_table_ref(table_ref: &StructuredTableRef) -> String {
187 format!("{}.{}", table_ref.schema_name, table_ref.table_name)
188}
189
190fn missing_structured_table_error(missing_tables: &[StructuredTableRef]) -> String {
191 match missing_tables {
192 [] => "structured fetch references no tables".to_string(),
193 [table_ref] => format!(
194 "table '{}' was not found for structured gateway fetch",
195 render_structured_table_ref(table_ref)
196 ),
197 _ => {
198 let tables = missing_tables
199 .iter()
200 .map(render_structured_table_ref)
201 .collect::<Vec<String>>()
202 .join(", ");
203 format!("tables were not found for structured gateway fetch: {tables}")
204 }
205 }
206}
207
208fn render_sql_string_list(values: &BTreeSet<String>) -> String {
209 values
210 .iter()
211 .map(|value| format!("'{}'", escape_sql_string(value)))
212 .collect::<Vec<String>>()
213 .join(", ")
214}
215
216async fn load_structured_schema_catalog(
217 pool: &PgPool,
218 plan: &StructuredGatewayFetchPlan,
219) -> Result<StructuredSchemaCatalog, sqlx::Error> {
220 let mut table_refs = HashSet::new();
221 collect_query_table_refs(&plan.query, plan.schema_name.as_deref(), &mut table_refs)
222 .map_err(sqlx::Error::Protocol)?;
223
224 let mut schemas = BTreeSet::new();
225 for table_ref in &table_refs {
226 schemas.insert(table_ref.schema_name.clone());
227 }
228 let schema_filter = render_sql_string_list(&schemas);
229
230 let column_rows: Vec<(String, String, String)> =
231 sqlx::query_as::<_, (String, String, String)>(&format!(
232 "SELECT table_schema, table_name, column_name \
233 FROM information_schema.columns \
234 WHERE table_schema IN ({schema_filter})"
235 ))
236 .fetch_all(pool)
237 .await?;
238
239 let mut table_columns: HashMap<StructuredTableRef, HashSet<String>> = HashMap::new();
240 for (schema_name, table_name, column_name) in column_rows {
241 let table_ref = StructuredTableRef {
242 schema_name,
243 table_name,
244 };
245 if table_refs.contains(&table_ref) {
246 table_columns
247 .entry(table_ref)
248 .or_default()
249 .insert(column_name);
250 }
251 }
252
253 let foreign_key_rows: Vec<(String, String, String, String, String, String)> =
254 sqlx::query_as::<_, (String, String, String, String, String, String)>(
255 &format!(
256 "SELECT \
257 tc.table_schema, \
258 tc.table_name, \
259 kcu.column_name, \
260 ccu.table_schema, \
261 ccu.table_name, \
262 ccu.column_name \
263 FROM information_schema.table_constraints AS tc \
264 JOIN information_schema.key_column_usage AS kcu \
265 ON tc.constraint_name = kcu.constraint_name \
266 AND tc.constraint_schema = kcu.constraint_schema \
267 AND tc.table_schema = kcu.table_schema \
268 AND tc.table_name = kcu.table_name \
269 JOIN information_schema.constraint_column_usage AS ccu \
270 ON ccu.constraint_name = tc.constraint_name \
271 AND ccu.constraint_schema = tc.constraint_schema \
272 WHERE tc.constraint_type = 'FOREIGN KEY' \
273 AND (tc.table_schema IN ({schema_filter}) OR ccu.table_schema IN ({schema_filter}))"
274 ),
275 )
276 .fetch_all(pool)
277 .await?;
278
279 let foreign_keys = foreign_key_rows
280 .into_iter()
281 .filter_map(
282 |(
283 child_schema,
284 child_table,
285 child_column,
286 parent_schema,
287 parent_table,
288 parent_column,
289 )| {
290 let child_table_ref = StructuredTableRef {
291 schema_name: child_schema,
292 table_name: child_table,
293 };
294 let parent_table_ref = StructuredTableRef {
295 schema_name: parent_schema,
296 table_name: parent_table,
297 };
298
299 if !table_refs.contains(&child_table_ref) && !table_refs.contains(&parent_table_ref)
300 {
301 return None;
302 }
303
304 Some(StructuredForeignKeyMetadata {
305 child_table: child_table_ref,
306 child_column,
307 parent_table: parent_table_ref,
308 parent_column,
309 })
310 },
311 )
312 .collect();
313
314 Ok(StructuredSchemaCatalog {
315 table_columns,
316 foreign_keys,
317 })
318}
319
320fn compile_structured_fetch_sql_for_catalog(
321 plan: &StructuredGatewayFetchPlan,
322 catalog: &StructuredSchemaCatalog,
323) -> Result<String, String> {
324 let mut referenced_tables = HashSet::new();
325 collect_query_table_refs(
326 &plan.query,
327 plan.schema_name.as_deref(),
328 &mut referenced_tables,
329 )?;
330 let mut missing_tables: Vec<StructuredTableRef> = referenced_tables
331 .into_iter()
332 .filter(|table_ref| !catalog.table_columns.contains_key(table_ref))
333 .collect();
334 missing_tables.sort_by(|left, right| {
335 left.schema_name
336 .cmp(&right.schema_name)
337 .then(left.table_name.cmp(&right.table_name))
338 });
339 if !missing_tables.is_empty() {
340 return Err(missing_structured_table_error(&missing_tables));
341 }
342
343 StructuredSqlEmitter {
344 alias_counter: 0,
345 catalog,
346 default_schema: plan.schema_name.as_deref(),
347 }
348 .emit_root(&plan.query)
349}
350
351fn distribute_filters(
352 fields: &mut [StructuredSelectField],
353 filters: Vec<StructuredFilter>,
354) -> Vec<StructuredFilter> {
355 let mut remaining = Vec::new();
356
357 for filter in filters {
358 let Some((head, tail)) = filter.column.split_once('.') else {
359 remaining.push(filter);
360 continue;
361 };
362
363 let maybe_relation = fields.iter_mut().find_map(|field| match field {
364 StructuredSelectField::Relation(relation)
365 if relation.display_name() == head || relation.name == head =>
366 {
367 Some(relation)
368 }
369 _ => None,
370 });
371
372 if let Some(relation) = maybe_relation {
373 relation.query.filters.push(StructuredFilter {
374 column: tail.to_string(),
375 operator: filter.operator,
376 values: filter.values,
377 column_cast: filter.column_cast,
378 value_cast: filter.value_cast,
379 });
380 } else {
381 remaining.push(filter);
382 }
383 }
384
385 remaining
386}
387
388impl<'a> StructuredSqlEmitter<'a> {
389 fn emit_root(mut self, query: &StructuredSelectQuery) -> Result<String, String> {
390 if query.operation != StructuredSelectOperation::Select {
391 return Err("only structured select queries are supported".to_string());
392 }
393
394 let mut query: StructuredSelectQuery = query.clone();
395 query.filters = distribute_filters(&mut query.fields, query.filters.clone());
396
397 let base_alias: &str = "t0";
398 let table_sql: String = sanitize_root_table_name(&query.from)?;
399 let mut select_exprs: Vec<String> = Vec::new();
400 let mut joins: Vec<String> = Vec::new();
401
402 for field in &query.fields {
403 self.emit_field(
404 field,
405 base_alias,
406 &query.from,
407 &mut select_exprs,
408 &mut joins,
409 )?;
410 }
411
412 if select_exprs.is_empty() {
413 return Err(
414 "structured gateway fetch requires at least one projected field".to_string(),
415 );
416 }
417
418 let mut sql: String = format!(
419 "SELECT {} FROM {} {}",
420 select_exprs.join(", "),
421 table_sql,
422 base_alias
423 );
424
425 if !joins.is_empty() {
426 sql.push('\n');
427 sql.push_str(&joins.join("\n"));
428 }
429
430 let where_clause: String = build_where_clause(&query.filters, base_alias)?;
431 if !where_clause.is_empty() {
432 sql.push('\n');
433 sql.push_str(&where_clause);
434 }
435
436 let order_clause: String = build_order_clause(&query.order_by, base_alias)?;
437 if !order_clause.is_empty() {
438 sql.push('\n');
439 sql.push_str(&order_clause);
440 }
441
442 if let Some(limit) = query.limit {
443 sql.push_str(&format!("\nLIMIT {limit}"));
444 }
445 if let Some(offset) = query.offset {
446 sql.push_str(&format!("\nOFFSET {offset}"));
447 }
448
449 Ok(sql)
450 }
451
452 fn emit_field(
453 &mut self,
454 field: &StructuredSelectField,
455 parent_alias: &str,
456 parent_table_selector: &str,
457 select_exprs: &mut Vec<String>,
458 joins: &mut Vec<String>,
459 ) -> Result<(), String> {
460 match field {
461 StructuredSelectField::Column(column) => {
462 let column_sql = sanitize_identifier(&column.name)
463 .ok_or_else(|| format!("invalid projected column '{}'", column.name))?;
464 match column.alias.as_deref() {
465 Some(alias) => {
466 let alias_sql = sanitize_identifier(alias)
467 .ok_or_else(|| format!("invalid projected alias '{alias}'"))?;
468 select_exprs.push(format!("{parent_alias}.{column_sql} AS {alias_sql}"));
469 }
470 None => {
471 select_exprs.push(format!("{parent_alias}.{column_sql}"));
472 }
473 }
474 }
475 StructuredSelectField::Relation(relation) => {
476 let child_alias: String = self.next_alias(&relation.name);
477 let agg_alias: String = format!("{child_alias}_agg");
478 let join_sql: String = self.emit_relation_subquery(
479 relation,
480 parent_alias,
481 parent_table_selector,
482 &child_alias,
483 &agg_alias,
484 )?;
485 joins.push(join_sql);
486 let output_alias =
487 sanitize_identifier(relation.display_name()).ok_or_else(|| {
488 format!(
489 "invalid relation output alias '{}'",
490 relation.display_name()
491 )
492 })?;
493 select_exprs.push(format!("{agg_alias}.data AS {output_alias}"));
494 }
495 }
496
497 Ok(())
498 }
499
500 fn emit_relation_subquery(
501 &mut self,
502 relation: &StructuredRelationField,
503 parent_alias: &str,
504 parent_table_selector: &str,
505 child_alias: &str,
506 agg_alias: &str,
507 ) -> Result<String, String> {
508 let mut query: StructuredSelectQuery = relation.query.clone();
509 query.filters = distribute_filters(&mut query.fields, query.filters.clone());
510
511 let child_table_sql = sanitize_root_table_name(&query.from)?;
512 let (row_expr, nested_joins) = self.emit_relation_row(&query, child_alias)?;
513 let (join_condition, many_to_one) = relation_join_condition(
514 self.catalog,
515 self.default_schema,
516 relation,
517 parent_alias,
518 parent_table_selector,
519 child_alias,
520 &query.from,
521 )?;
522 let local_where: String = build_where_clause(&query.filters, child_alias)?;
523 let join_type: &str = match relation.join {
524 StructuredJoinKind::Left => "LEFT JOIN",
525 StructuredJoinKind::Inner => "JOIN",
526 };
527
528 let subquery: String = if many_to_one {
529 let limit: i64 = match query.limit {
530 Some(value) if value > 1 => {
531 return Err(format!(
532 "relation '{}' resolves to one row; limit must be 0 or 1",
533 relation.display_name()
534 ));
535 }
536 Some(value) => value,
537 None => 1,
538 };
539 let order_clause = build_order_clause(&query.order_by, child_alias)?;
540 let mut sql =
541 format!("SELECT {row_expr} AS data\nFROM {child_table_sql} {child_alias}");
542 if !nested_joins.is_empty() {
543 sql.push('\n');
544 sql.push_str(&nested_joins.join("\n"));
545 }
546 sql.push_str(&format!("\nWHERE {join_condition}"));
547 if !local_where.is_empty() {
548 sql.push_str(" AND ");
549 sql.push_str(local_where.trim_start_matches("WHERE "));
550 }
551 if !order_clause.is_empty() {
552 sql.push('\n');
553 sql.push_str(&order_clause);
554 }
555 if let Some(offset) = query.offset {
556 sql.push_str(&format!("\nOFFSET {offset}"));
557 }
558 sql.push_str(&format!("\nLIMIT {limit}"));
559 sql
560 } else {
561 let orderings: Vec<(String, StructuredSortDirection, String)> =
562 effective_relation_ordering(&query.order_by, child_alias)?;
563 let inner_order: String = orderings
564 .iter()
565 .map(|(sql_expr, direction, _)| format!("{sql_expr} {}", direction.sql_keyword()))
566 .collect::<Vec<String>>()
567 .join(", ");
568 let aggregate_order: String = orderings
569 .iter()
570 .map(|(_, direction, alias)| {
571 format!("rel_window.{alias} {}", direction.sql_keyword())
572 })
573 .collect::<Vec<String>>()
574 .join(", ");
575 let order_select_columns: String = orderings
576 .iter()
577 .map(|(sql_expr, _, alias)| format!(", {sql_expr} AS {alias}"))
578 .collect::<String>();
579
580 let mut inner: String = format!(
581 "SELECT {row_expr} AS row_data{order_select_columns}\nFROM {child_table_sql} {child_alias}"
582 );
583 if !nested_joins.is_empty() {
584 inner.push('\n');
585 inner.push_str(&nested_joins.join("\n"));
586 }
587 inner.push_str(&format!("\nWHERE {join_condition}"));
588 if !local_where.is_empty() {
589 inner.push_str(" AND ");
590 inner.push_str(local_where.trim_start_matches("WHERE "));
591 }
592 if !inner_order.is_empty() {
593 inner.push_str(&format!("\nORDER BY {inner_order}"));
594 }
595 if let Some(limit) = query.limit {
596 inner.push_str(&format!("\nLIMIT {limit}"));
597 }
598 if let Some(offset) = query.offset {
599 inner.push_str(&format!("\nOFFSET {offset}"));
600 }
601
602 if aggregate_order.is_empty() {
603 format!(
604 "SELECT COALESCE(jsonb_agg(rel_window.row_data) FILTER (WHERE rel_window.row_data IS NOT NULL), '[]'::jsonb) AS data\nFROM (\n{inner}\n) rel_window"
605 )
606 } else {
607 format!(
608 "SELECT COALESCE(jsonb_agg(rel_window.row_data ORDER BY {aggregate_order}) FILTER (WHERE rel_window.row_data IS NOT NULL), '[]'::jsonb) AS data\nFROM (\n{inner}\n) rel_window"
609 )
610 }
611 };
612
613 let join_predicate = match relation.join {
614 StructuredJoinKind::Inner if many_to_one => format!("{agg_alias}.data IS NOT NULL"),
615 StructuredJoinKind::Inner => format!("jsonb_array_length({agg_alias}.data) > 0"),
616 StructuredJoinKind::Left => "TRUE".to_string(),
617 };
618
619 Ok(format!(
620 "{join_type} LATERAL (\n{subquery}\n) {agg_alias} ON {join_predicate}"
621 ))
622 }
623
624 fn emit_relation_row(
625 &mut self,
626 query: &StructuredSelectQuery,
627 alias: &str,
628 ) -> Result<(String, Vec<String>), String> {
629 let mut field_pairs = Vec::new();
630 let mut joins = Vec::new();
631
632 for field in &query.fields {
633 match field {
634 StructuredSelectField::Column(column) => {
635 let json_key = column.alias.as_deref().unwrap_or(&column.name);
636 let column_sql = sanitize_identifier(&column.name)
637 .ok_or_else(|| format!("invalid relation column '{}'", column.name))?;
638 field_pairs.push(format!("'{}', {}.{}", json_key, alias, column_sql));
639 }
640 StructuredSelectField::Relation(relation) => {
641 let child_alias = self.next_alias(&relation.name);
642 let agg_alias = format!("{child_alias}_agg");
643 let join_sql = self.emit_relation_subquery(
644 relation,
645 alias,
646 &query.from,
647 &child_alias,
648 &agg_alias,
649 )?;
650 joins.push(join_sql);
651 field_pairs.push(format!("'{}', {}.data", relation.display_name(), agg_alias));
652 }
653 }
654 }
655
656 let row_expr: String = if field_pairs.is_empty() {
657 format!("to_jsonb({alias})")
658 } else {
659 format!("jsonb_build_object({})", field_pairs.join(", "))
660 };
661
662 Ok((row_expr, joins))
663 }
664
665 fn next_alias(&mut self, base: &str) -> String {
666 let normalized: String = base
667 .chars()
668 .filter(|ch| ch.is_ascii_alphanumeric() || *ch == '_')
669 .collect::<String>();
670 let stem = if normalized.is_empty() {
671 "rel"
672 } else {
673 &normalized
674 };
675 let alias = format!("{stem}_{}", self.alias_counter);
676 self.alias_counter += 1;
677 alias
678 }
679}
680
681fn sanitize_root_table_name(table_name: &str) -> Result<String, String> {
682 if table_name.contains('.') {
683 sanitize_qualified_identifier(table_name)
684 .ok_or_else(|| format!("invalid table selector '{table_name}'"))
685 } else {
686 sanitize_identifier(table_name)
687 .ok_or_else(|| format!("invalid table selector '{table_name}'"))
688 }
689}
690
691fn build_where_clause(filters: &[StructuredFilter], alias: &str) -> Result<String, String> {
692 if filters.is_empty() {
693 return Ok(String::new());
694 }
695
696 let mut clauses: Vec<String> = Vec::new();
697 for filter in filters {
698 let column_sql = apply_sql_cast(
699 qualified_local_column(alias, &filter.column)?,
700 filter.column_cast.as_deref(),
701 )?;
702 match filter.operator {
703 StructuredFilterOperator::Eq => {
704 let value = filter
705 .values
706 .first()
707 .ok_or_else(|| format!("missing eq value for '{}'", filter.column))?;
708 if value.is_null() {
709 clauses.push(format!("{column_sql} IS NULL"));
710 } else {
711 clauses.push(format!(
712 "{column_sql} = {}",
713 apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
714 ));
715 }
716 }
717 StructuredFilterOperator::Neq => {
718 let value = filter
719 .values
720 .first()
721 .ok_or_else(|| format!("missing neq value for '{}'", filter.column))?;
722 if value.is_null() {
723 clauses.push(format!("{column_sql} IS NOT NULL"));
724 } else {
725 clauses.push(format!(
726 "{column_sql} <> {}",
727 apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
728 ));
729 }
730 }
731 StructuredFilterOperator::Gt => {
732 let value = filter
733 .values
734 .first()
735 .ok_or_else(|| format!("missing gt value for '{}'", filter.column))?;
736 clauses.push(format!(
737 "{column_sql} > {}",
738 apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
739 ));
740 }
741 StructuredFilterOperator::Lt => {
742 let value = filter
743 .values
744 .first()
745 .ok_or_else(|| format!("missing lt value for '{}'", filter.column))?;
746 clauses.push(format!(
747 "{column_sql} < {}",
748 apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())?
749 ));
750 }
751 StructuredFilterOperator::In => {
752 if filter.values.is_empty() {
753 clauses.push("FALSE".to_string());
754 } else {
755 let values = filter
756 .values
757 .iter()
758 .map(|value| {
759 apply_sql_cast(format_sql_value(value), filter.value_cast.as_deref())
760 })
761 .collect::<Result<Vec<String>, String>>()?
762 .join(", ");
763 clauses.push(format!("{column_sql} IN ({values})"));
764 }
765 }
766 }
767 }
768
769 Ok(format!("WHERE {}", clauses.join(" AND ")))
770}
771
772fn build_order_clause(order_by: &[StructuredOrderBy], alias: &str) -> Result<String, String> {
773 if order_by.is_empty() {
774 return Ok(String::new());
775 }
776 let fragments: Vec<String> = order_by
777 .iter()
778 .map(|order| {
779 let column_sql = qualified_local_column(alias, &order.column)?;
780 Ok(format!("{column_sql} {}", order.direction.sql_keyword()))
781 })
782 .collect::<Result<Vec<String>, String>>()?;
783 Ok(format!("ORDER BY {}", fragments.join(", ")))
784}
785
786fn effective_relation_ordering(
787 order_by: &[StructuredOrderBy],
788 source_alias: &str,
789) -> Result<Vec<(String, StructuredSortDirection, String)>, String> {
790 if order_by.is_empty() {
791 return Ok(Vec::new());
792 }
793
794 order_by
795 .iter()
796 .cloned()
797 .into_iter()
798 .enumerate()
799 .map(|(index, order)| {
800 let column_sql = qualified_local_column(source_alias, &order.column)?;
801 Ok((
802 column_sql,
803 order.direction,
804 format!("__athena_order_{index}"),
805 ))
806 })
807 .collect()
808}
809
810fn qualified_local_column(alias: &str, column: &str) -> Result<String, String> {
811 if column.contains('.') {
812 return Err(format!(
813 "unsupported dotted column reference '{column}' at this query level"
814 ));
815 }
816 let column_sql = sanitize_identifier(column)
817 .ok_or_else(|| format!("invalid column identifier '{column}'"))?;
818 Ok(format!("{alias}.{column_sql}"))
819}
820
821fn relation_join_condition(
822 catalog: &StructuredSchemaCatalog,
823 default_schema: Option<&str>,
824 relation: &StructuredRelationField,
825 parent_alias: &str,
826 parent_table_selector: &str,
827 child_alias: &str,
828 child_table_selector: &str,
829) -> Result<(String, bool), String> {
830 let parent_table = resolve_structured_table_ref(parent_table_selector, default_schema)?;
831 let child_table = resolve_structured_table_ref(child_table_selector, default_schema)?;
832
833 if let Some(foreign_key) = relation.foreign_key.as_deref() {
834 if let Some(stripped) = foreign_key.strip_prefix("parent.") {
835 let fk = sanitize_identifier(stripped)
836 .ok_or_else(|| format!("invalid parent foreign key '{foreign_key}'"))?;
837 return Ok((format!("{parent_alias}.{fk} = {child_alias}.\"id\""), true));
838 }
839 if let Some(stripped) = foreign_key.strip_prefix("child.") {
840 let fk = sanitize_identifier(stripped)
841 .ok_or_else(|| format!("invalid child foreign key '{foreign_key}'"))?;
842 return Ok((format!("{child_alias}.{fk} = {parent_alias}.\"id\""), false));
843 }
844
845 let fk = sanitize_identifier(foreign_key)
846 .ok_or_else(|| format!("invalid foreign key '{foreign_key}'"))?;
847 let fk_on_parent = catalog.table_has_column(&parent_table, foreign_key);
848 let fk_on_child = catalog.table_has_column(&child_table, foreign_key);
849
850 return match (fk_on_parent, fk_on_child) {
851 (true, false) => Ok((format!("{parent_alias}.{fk} = {child_alias}.\"id\""), true)),
852 (false, true) => Ok((format!("{child_alias}.{fk} = {parent_alias}.\"id\""), false)),
853 (true, true) => Err(format!(
854 "foreign_key '{}' exists on both '{}' and '{}'; prefix it with parent. or child.",
855 foreign_key, parent_table.table_name, child_table.table_name
856 )),
857 (false, false) => Err(format!(
858 "foreign_key '{}' was not found on '{}' or '{}'",
859 foreign_key, parent_table.table_name, child_table.table_name
860 )),
861 };
862 }
863
864 let child_to_parent: Vec<&StructuredForeignKeyMetadata> = catalog
865 .foreign_keys
866 .iter()
867 .filter(|metadata| {
868 metadata.child_table == child_table && metadata.parent_table == parent_table
869 })
870 .collect();
871 let parent_to_child: Vec<&StructuredForeignKeyMetadata> = catalog
872 .foreign_keys
873 .iter()
874 .filter(|metadata| {
875 metadata.child_table == parent_table && metadata.parent_table == child_table
876 })
877 .collect();
878
879 match (child_to_parent.as_slice(), parent_to_child.as_slice()) {
880 ([metadata], []) => {
881 let child_fk = sanitize_identifier(&metadata.child_column).ok_or_else(|| {
882 format!(
883 "invalid inferred child foreign key '{}'",
884 metadata.child_column
885 )
886 })?;
887 let parent_pk = sanitize_identifier(&metadata.parent_column).ok_or_else(|| {
888 format!(
889 "invalid inferred parent reference column '{}'",
890 metadata.parent_column
891 )
892 })?;
893 return Ok((
894 format!("{child_alias}.{child_fk} = {parent_alias}.{parent_pk}"),
895 false,
896 ));
897 }
898 ([], [metadata]) => {
899 let parent_fk = sanitize_identifier(&metadata.child_column).ok_or_else(|| {
900 format!(
901 "invalid inferred parent foreign key '{}'",
902 metadata.child_column
903 )
904 })?;
905 let child_pk = sanitize_identifier(&metadata.parent_column).ok_or_else(|| {
906 format!(
907 "invalid inferred child reference column '{}'",
908 metadata.parent_column
909 )
910 })?;
911 return Ok((
912 format!("{parent_alias}.{parent_fk} = {child_alias}.{child_pk}"),
913 true,
914 ));
915 }
916 ([], []) => {}
917 _ => {
918 return Err(format!(
919 "relation '{}' between '{}' and '{}' is ambiguous; provide foreign_key",
920 relation.display_name(),
921 parent_table.table_name,
922 child_table.table_name
923 ));
924 }
925 }
926
927 let child_fk: String = format!("{}_id", singularize_identifier(&parent_table.table_name));
928 let parent_fk: String = format!("{}_id", singularize_identifier(&child_table.table_name));
929 let child_has_fk = catalog.table_has_column(&child_table, &child_fk);
930 let parent_has_fk = catalog.table_has_column(&parent_table, &parent_fk);
931
932 if child_has_fk && !parent_has_fk {
933 let child_fk_sql = sanitize_identifier(&child_fk)
934 .ok_or_else(|| format!("invalid inferred child foreign key '{child_fk}'"))?;
935 Ok((
936 format!("{child_alias}.{child_fk_sql} = {parent_alias}.\"id\""),
937 false,
938 ))
939 } else if parent_has_fk && !child_has_fk {
940 let parent_fk_sql = sanitize_identifier(&parent_fk)
941 .ok_or_else(|| format!("invalid inferred parent foreign key '{parent_fk}'"))?;
942 Ok((
943 format!("{parent_alias}.{parent_fk_sql} = {child_alias}.\"id\""),
944 true,
945 ))
946 } else if child_has_fk && parent_has_fk {
947 Err(format!(
948 "relation '{}' between '{}' and '{}' is ambiguous; provide foreign_key",
949 relation.display_name(),
950 parent_table.table_name,
951 child_table.table_name
952 ))
953 } else {
954 Err(format!(
955 "could not resolve relation '{}' between '{}' and '{}'; add a foreign key constraint or provide foreign_key",
956 relation.display_name(),
957 parent_table.table_name,
958 child_table.table_name
959 ))
960 }
961}
962
963#[cfg(test)]
964mod tests {
965 use super::*;
966 use crate::build_structured_fetch_plan;
967 use serde_json::json;
968
969 fn build_plan(
970 body: &Value,
971 force_camel_case_to_snake_case: bool,
972 ) -> StructuredGatewayFetchPlan {
973 build_structured_fetch_plan(body, force_camel_case_to_snake_case)
974 .expect("plan ok")
975 .expect("structured plan")
976 }
977
978 fn table_ref(raw: &str) -> StructuredTableRef {
979 resolve_structured_table_ref(raw, Some("public")).expect("table ref")
980 }
981
982 fn test_catalog(
983 foreign_keys: &[(&str, &str, &str, &str, &str, &str)],
984 extra_columns: &[(&str, &[&str])],
985 ) -> StructuredSchemaCatalog {
986 let mut table_columns: HashMap<StructuredTableRef, HashSet<String>> = HashMap::new();
987 let mut catalog_foreign_keys = Vec::new();
988
989 for (child_table, child_column, parent_table, parent_column, child_schema, parent_schema) in
990 foreign_keys
991 {
992 let child_ref = resolve_structured_table_ref(
993 &format!("{child_schema}.{child_table}"),
994 Some("public"),
995 )
996 .expect("child ref");
997 let parent_ref = resolve_structured_table_ref(
998 &format!("{parent_schema}.{parent_table}"),
999 Some("public"),
1000 )
1001 .expect("parent ref");
1002
1003 table_columns
1004 .entry(child_ref.clone())
1005 .or_default()
1006 .insert((*child_column).to_string());
1007 table_columns
1008 .entry(parent_ref.clone())
1009 .or_default()
1010 .insert((*parent_column).to_string());
1011
1012 catalog_foreign_keys.push(StructuredForeignKeyMetadata {
1013 child_table: child_ref,
1014 child_column: (*child_column).to_string(),
1015 parent_table: parent_ref,
1016 parent_column: (*parent_column).to_string(),
1017 });
1018 }
1019
1020 for (table_name, columns) in extra_columns {
1021 let table_ref = table_ref(table_name);
1022 let entry = table_columns.entry(table_ref).or_default();
1023 for column in *columns {
1024 entry.insert((*column).to_string());
1025 }
1026 }
1027
1028 StructuredSchemaCatalog {
1029 table_columns,
1030 foreign_keys: catalog_foreign_keys,
1031 }
1032 }
1033
1034 fn compile_sql(
1035 body: &Value,
1036 force_camel_case_to_snake_case: bool,
1037 catalog: &StructuredSchemaCatalog,
1038 ) -> String {
1039 let plan = build_plan(body, force_camel_case_to_snake_case);
1040 compile_structured_fetch_sql_for_catalog(&plan, catalog).expect("compiled sql")
1041 }
1042
/// An object-form `select` with a nested relation compiles to a lateral join whose
/// relation-level filter, ordering and limit are kept separate from the top-level ones.
#[test]
fn structured_select_object_builds_nested_sql() {
    // True when `clause` occurs in `sql` at least once without an ASCII digit right after it.
    // A plain `contains("LIMIT 10")` would also match inside "LIMIT 100" and could never fail
    // while the top-level limit is present.
    fn contains_exact_clause(sql: &str, clause: &str) -> bool {
        sql.match_indices(clause).any(|(start, matched)| {
            !sql[start + matched.len()..].starts_with(|c: char| c.is_ascii_digit())
        })
    }

    let body = json!({
        "table_name": "orchestral_sections",
        "select": {
            "name": true,
            "instruments": {
                "select": {
                    "name": true,
                    "active": true
                },
                "where": {
                    "active": { "eq": true }
                },
                "orderBy": {
                    "name": "asc"
                },
                "limit": 10
            }
        },
        "where": {
            "name": { "neq": "brass" }
        },
        "orderBy": {
            "name": "desc"
        },
        "limit": 100
    });

    let plan = build_plan(&body, false);
    let catalog = test_catalog(
        &[(
            "instruments",
            "orchestral_section_id",
            "orchestral_sections",
            "id",
            "public",
            "public",
        )],
        &[],
    );
    let sql = compile_sql(&body, false, &catalog);

    assert_eq!(plan.table_name, "orchestral_sections");
    assert!(sql.contains("FROM \"orchestral_sections\" t0"));
    assert!(sql.contains("LEFT JOIN LATERAL"));
    assert!(sql.contains("FROM (\nSELECT jsonb_build_object"));
    assert!(sql.contains("instruments_0.\"active\" = true"));
    assert!(sql.contains("ORDER BY instruments_0.\"name\" ASC"));
    // Nested relation limit: must be exactly 10, not merely a prefix of the top-level 100.
    assert!(
        contains_exact_clause(&sql, "LIMIT 10"),
        "nested relation limit missing from SQL:\n{sql}"
    );
    assert!(sql.contains("ORDER BY t0.\"name\" DESC"));
    assert!(sql.contains("LIMIT 100"));
}
1099
/// A request body shaped like a direct AST (`operation` / `from` / `fields`) must make
/// `build_structured_fetch_plan` return an error.
#[test]
fn structured_fetch_rejects_top_level_direct_ast_body() {
    let ast_body = json!({
        "operation": "select",
        "from": "orchestral_sections",
        "fields": [{ "kind": "column", "name": "name" }]
    });

    let outcome = build_structured_fetch_plan(&ast_body, false);
    let message = outcome.expect_err("ast should be rejected");

    assert!(message.contains("first-class direct AST request bodies are not supported"));
}
1116
/// A dotted `where` key (`instruments.name`) combined with a string `select` must produce a
/// predicate on the relation alias and no `WHERE t0."instruments"` fragment on the root alias.
#[test]
fn structured_select_string_distributes_relation_filters() {
    let request = json!({
        "table_name": "orchestral_sections",
        "select": "id,name,instruments(name)",
        "where": { "instruments.name": { "eq": "flute" } }
    });
    let catalog = test_catalog(
        &[(
            "instruments",
            "orchestral_section_id",
            "orchestral_sections",
            "id",
            "public",
            "public",
        )],
        &[],
    );

    let rendered = compile_sql(&request, false, &catalog);

    assert!(rendered.contains("instruments_0.\"name\" = 'flute'"));
    assert!(!rendered.contains("WHERE t0.\"instruments\""));
}
1146
/// With camel-case normalization switched on, `createdAt` must appear as `created_at` both in
/// the selected column and in the filter predicate.
#[test]
fn structured_select_normalizes_camel_case_when_requested() {
    let request = json!({
        "table_name": "client_statistics",
        "select": { "createdAt": true },
        "where": { "createdAt": { "gt": "2026-01-01T00:00:00Z" } }
    });
    let catalog = test_catalog(&[], &[("client_statistics", &["created_at"])]);

    let rendered = compile_sql(&request, true, &catalog);

    assert!(rendered.contains("t0.\"created_at\""));
    assert!(rendered.contains("t0.\"created_at\" > '2026-01-01T00:00:00Z'"));
}
1168
/// `resource_names()` must list the root table and every nested relation, in the order
/// asserted below.
#[test]
fn structured_select_collects_nested_resource_names() {
    let request = json!({
        "table_name": "orchestral_sections",
        "select": {
            "name": true,
            "instruments": {
                "select": {
                    "name": true,
                    "players": { "select": { "display_name": true } }
                }
            }
        }
    });

    let plan = build_plan(&request, false);
    let expected: Vec<String> = ["instruments", "orchestral_sections", "players"]
        .iter()
        .map(|name| name.to_string())
        .collect();

    assert_eq!(plan.resource_names(), expected);
}
1199
/// A string `select` that aliases a relation living in another schema
/// (`users:athena.users(...)`) must compile to SQL that schema-qualifies both tables and
/// joins them on the foreign key registered in the catalog.
#[test]
fn structured_select_cross_schema_relation_string_builds_sql() {
    let request = json!({
        "table_name": "public.chat_subscriptions",
        "select": "user_id,users:athena.users(id,username,image)",
        "where_filters": [{
            "column": "user_id",
            "operator": "eq",
            "value": "ef7a4c74-cc35-4d32-945a-a5271279ecdb",
            "column_cast": "text"
        }],
        "orderBy": [{ "column": "user_id", "direction": "desc" }],
        "limit": 1
    });
    let catalog = test_catalog(
        &[(
            "chat_subscriptions",
            "user_id",
            "users",
            "id",
            "public",
            "athena",
        )],
        &[("athena.users", &["id", "username", "image"])],
    );

    let rendered = compile_sql(&request, false, &catalog);

    // Both tables are schema-qualified and joined through the catalog foreign key.
    assert!(rendered.contains("FROM \"public\".\"chat_subscriptions\" t0"));
    assert!(rendered.contains("FROM \"athena\".\"users\" athenausers_0"));
    assert!(rendered.contains("t0.\"user_id\" = athenausers_0.\"id\""));
    // Top-level filter value, ordering and limit.
    assert!(rendered.contains("t0.\"user_id\""));
    assert!(rendered.contains("ef7a4c74-cc35-4d32-945a-a5271279ecdb"));
    assert!(rendered.contains("ORDER BY t0.\"user_id\" DESC"));
    // NOTE(review): structured_select_uses_catalog_for_many_to_one_relations expects
    // `LIMIT 1` from a many-to-one relation with no top-level limit, so this check may not
    // isolate the top-level limit here — confirm.
    assert!(rendered.contains("LIMIT 1"));
}
1246
/// Selecting a relation whose target table (`athena.user`) is absent from the catalog must
/// fail compilation with the exact "not found" message.
#[test]
fn structured_select_missing_cross_schema_relation_target_fails_validation() {
    let request = json!({
        "table_name": "chat_subscriptions",
        "schema_name": "public",
        "select": "user_id,users:athena.user(id)"
    });
    let plan = build_plan(&request, false);
    let catalog = test_catalog(&[], &[("public.chat_subscriptions", &["id", "user_id"])]);

    let outcome = compile_structured_fetch_sql_for_catalog(&plan, &catalog);
    let message = outcome.expect_err("missing relation target should fail before SQL execution");

    assert_eq!(
        message,
        "table 'athena.user' was not found for structured gateway fetch"
    );
}
1266
/// A body carrying only `table_name` and `where` (no `select`) must build successfully but
/// yield no structured plan.
#[test]
fn structured_select_where_only_body_stays_legacy() {
    let request = json!({
        "table_name": "users",
        "where": { "id": { "eq": 1 } }
    });

    let maybe_plan = build_structured_fetch_plan(&request, false).expect("plan ok");

    assert!(maybe_plan.is_none());
}
1282
/// The `!inner` relation modifier must emit a `jsonb_array_length(...) > 0` predicate on the
/// aggregated relation and must not emit an `IS NOT NULL` check on it.
#[test]
fn structured_select_inner_join_filters_empty_collections() {
    let request = json!({
        "table_name": "orchestral_sections",
        "select": "name,instruments!inner(name)"
    });
    let catalog = test_catalog(
        &[(
            "instruments",
            "orchestral_section_id",
            "orchestral_sections",
            "id",
            "public",
            "public",
        )],
        &[],
    );

    let rendered = compile_sql(&request, false, &catalog);

    assert!(rendered.contains("jsonb_array_length(instruments_0_agg.data) > 0"));
    assert!(!rendered.contains("instruments_0_agg.data IS NOT NULL"));
}
1309
/// When the root table holds the foreign key (`invoices.customer_id` -> `customers.id`), the
/// relation must join on that key and the SQL must contain `LIMIT 1`.
#[test]
fn structured_select_uses_catalog_for_many_to_one_relations() {
    let request = json!({
        "table_name": "invoices",
        "select": {
            "invoice_number": true,
            "customer": {
                "from": "customers",
                "select": { "name": true }
            }
        }
    });
    let catalog = test_catalog(
        &[(
            "invoices",
            "customer_id",
            "customers",
            "id",
            "public",
            "public",
        )],
        &[
            ("invoices", &["invoice_number", "customer_id"]),
            ("customers", &["id", "name"]),
        ],
    );

    let rendered = compile_sql(&request, false, &catalog);

    assert!(rendered.contains("t0.\"customer_id\" = customer_0.\"id\""));
    assert!(rendered.contains("LIMIT 1"));
}
1347
/// A relation selected without `orderBy` must compile even though the relation table has no
/// `id` column in the catalog, and the SQL must contain neither an `__athena_order_` marker
/// nor an `ORDER BY` on the relation's `id`.
#[test]
fn structured_select_relation_without_order_by_does_not_require_id() {
    let request = json!({
        "table_name": "orchestral_sections",
        "select": {
            "name": true,
            "instruments": { "select": { "name": true } }
        }
    });
    let catalog = test_catalog(
        &[(
            "instruments",
            "orchestral_section_id",
            "orchestral_sections",
            "id",
            "public",
            "public",
        )],
        &[
            ("orchestral_sections", &["id", "name"]),
            ("instruments", &["name", "orchestral_section_id"]),
        ],
    );

    let rendered = compile_sql(&request, false, &catalog);

    assert!(!rendered.contains("__athena_order_"));
    assert!(!rendered.contains("ORDER BY instruments_0.\"id\""));
}
1384
/// With no foreign key registered in the catalog, while both tables carry a column that
/// could point at the other, compilation must fail with an error mentioning "ambiguous".
#[test]
fn structured_select_ambiguous_relation_requires_foreign_key() {
    let request = json!({
        "table_name": "orchestral_sections",
        "select": {
            "instruments": { "select": { "name": true } }
        }
    });
    let plan = build_plan(&request, false);
    let catalog = test_catalog(
        &[],
        &[
            ("orchestral_sections", &["id", "instrument_id"]),
            ("instruments", &["id", "orchestral_section_id", "name"]),
        ],
    );

    let outcome = compile_structured_fetch_sql_for_catalog(&plan, &catalog);
    let message = outcome.expect_err("ambiguous relation should fail");

    assert!(message.contains("ambiguous"));
}
1413}