Skip to main content

athena_gateway/
structured_fetch.rs

1//! Portable structured-fetch plan and parser support for `/gateway/fetch`.
2//!
3//! This module owns the request-domain half of structured fetch:
4//!
5//! - the parsed query/field/filter/order model,
6//! - request-body validation,
7//! - compatibility `select` string parsing,
8//! - structured cache-key generation.
9//!
10//! SQL compilation, catalog loading, and backend execution stay in `athena_rs`.
11
12use crate::{
13    normalize_column_name, normalize_gateway_schema_name, qualify_gateway_table_name,
14    sanitize_identifier, schema_name_from_body,
15};
16use serde::Serialize;
17use serde_json::{Map, Value, json};
18use std::collections::BTreeSet;
19use std::iter::Peekable;
20
/// Parsed structured gateway fetch request plus the normalized root selector.
///
/// Produced by [`build_structured_fetch_plan`] from a request body that carries
/// `table_name` and `select`.
#[derive(Debug, Clone)]
pub struct StructuredGatewayFetchPlan {
    /// Normalized root table selector in `table` or `schema.table` form.
    ///
    /// This is the value returned by `qualify_gateway_table_name` for the request's
    /// `table_name` and resolved schema.
    pub table_name: String,
    /// Optional explicit root schema, as returned by `normalize_gateway_schema_name`.
    pub schema_name: Option<String>,
    /// Parsed structured query model.
    ///
    /// Note that `query.from` holds the trimmed `table_name` exactly as sent in the request,
    /// not the qualified selector stored in [`Self::table_name`].
    pub query: StructuredSelectQuery,
}
31
32impl StructuredGatewayFetchPlan {
33    /// Returns the top-level limit, if present.
34    pub fn limit(&self) -> Option<i64> {
35        self.query.limit
36    }
37
38    /// Returns the unique resource names referenced by the root query and nested relations.
39    pub fn resource_names(&self) -> Vec<String> {
40        let mut resources = BTreeSet::new();
41        collect_query_resource_names(&self.query, &mut resources);
42        resources.into_iter().collect()
43    }
44}
45
/// Structured-fetch operation kinds currently accepted by the public contract.
///
/// Only read (`select`) requests exist today; the parser rejects any other operation name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum StructuredSelectOperation {
    /// A read query; spelled `"select"` in request bodies and also used when no operation
    /// is given.
    Select,
}
51
/// Join behavior for nested structured relation selections.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum StructuredJoinKind {
    /// Left join; spelled `"left"` in request bodies and used when no join kind is given.
    Left,
    /// Inner join; spelled `"inner"` in request bodies and `!inner` in select strings.
    Inner,
}
58
/// Supported filter operators in structured fetch requests.
///
/// The request spelling of each operator is the lower-case variant name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum StructuredFilterOperator {
    /// `"eq"`; also used when a filter is given as a bare scalar value.
    Eq,
    /// `"neq"`.
    Neq,
    /// `"gt"`.
    Gt,
    /// `"lt"`.
    Lt,
    /// `"in"`; the operand must be an array. Also used when a filter is given as a bare array.
    In,
}
68
/// Supported sort directions in structured fetch requests.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum StructuredSortDirection {
    /// Ascending order; accepted as `asc` or `ascending` (case-insensitive).
    Asc,
    /// Descending order; accepted as `desc` or `descending` (case-insensitive).
    Desc,
}
75
76impl StructuredSortDirection {
77    /// Returns the SQL keyword form used by the server-side emitter.
78    pub fn sql_keyword(&self) -> &'static str {
79        match self {
80            Self::Asc => "ASC",
81            Self::Desc => "DESC",
82        }
83    }
84}
85
/// Parsed structured fetch query.
///
/// Used both for the root query of a request and, recursively, for each nested relation.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct StructuredSelectQuery {
    /// Operation kind; always [`StructuredSelectOperation::Select`] today.
    pub operation: StructuredSelectOperation,
    /// Source table/resource name for this query level (trimmed, otherwise as supplied).
    pub from: String,
    /// Projected columns and nested relations.
    pub fields: Vec<StructuredSelectField>,
    /// Filter clauses; empty when the request carried no `where`.
    pub filters: Vec<StructuredFilter>,
    /// Ordering clauses; empty when the request carried no ordering.
    pub order_by: Vec<StructuredOrderBy>,
    /// Optional row limit; the parser only accepts non-negative integers.
    pub limit: Option<i64>,
    /// Optional row offset; the parser only accepts non-negative integers.
    pub offset: Option<i64>,
}
97
/// Field projection entry in a structured fetch query.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum StructuredSelectField {
    /// A plain column of the current query level.
    Column(StructuredColumnField),
    /// A nested relation carrying its own sub-query.
    Relation(StructuredRelationField),
}
104
/// Column projection inside a structured fetch query.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StructuredColumnField {
    /// Normalized column name (a dot-separated identifier path).
    pub name: String,
    /// Optional alias for the column; `None` when the request gave none.
    pub alias: Option<String>,
}
111
/// Nested relation projection inside a structured fetch query.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct StructuredRelationField {
    /// Normalized relation name.
    pub name: String,
    /// Optional alias; when present it takes precedence in [`Self::display_name`].
    pub alias: Option<String>,
    /// Join kind; the parsers default to [`StructuredJoinKind::Left`].
    pub join: StructuredJoinKind,
    /// Optional foreign-key hint: an identifier, optionally prefixed with `parent.` or `child.`.
    pub foreign_key: Option<String>,
    /// Sub-query evaluated for the related resource.
    pub query: StructuredSelectQuery,
}
121
122impl StructuredRelationField {
123    /// Returns the response field name for this relation.
124    pub fn display_name(&self) -> &str {
125        self.alias.as_deref().unwrap_or(&self.name)
126    }
127}
128
/// Structured filter clause.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct StructuredFilter {
    /// Normalized column path the filter applies to.
    pub column: String,
    /// Comparison operator.
    pub operator: StructuredFilterOperator,
    /// Operand values: the array contents for `In`, a single element for every other operator.
    pub values: Vec<Value>,
    /// Optional SQL type to cast the column to; only the array form of `where` can set it.
    pub column_cast: Option<String>,
    /// Optional SQL type to cast the value(s) to; only the array form of `where` can set it.
    pub value_cast: Option<String>,
}
138
/// Structured ordering clause.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StructuredOrderBy {
    /// Normalized column path to sort by.
    pub column: String,
    /// Sort direction for this column.
    pub direction: StructuredSortDirection,
}
145
146/// Builds a parsed structured fetch plan when the request body uses the structured read contract.
147pub fn build_structured_fetch_plan(
148    body: &Value,
149    force_camel_case_to_snake_case: bool,
150) -> Result<Option<StructuredGatewayFetchPlan>, String> {
151    if uses_top_level_direct_ast_payload(body) {
152        return Err(
153            "first-class direct AST request bodies are not supported on /gateway/fetch in this Athena release; use table_name plus select"
154                .to_string(),
155        );
156    }
157
158    if !is_structured_fetch_body(body) {
159        return Ok(None);
160    }
161
162    let schema_name = normalize_gateway_schema_name(schema_name_from_body(body).as_deref())?;
163    let from = body
164        .get("table_name")
165        .and_then(Value::as_str)
166        .map(str::trim)
167        .filter(|value| !value.is_empty())
168        .ok_or_else(|| "table_name is required for structured gateway fetch".to_string())?;
169
170    let qualified_table_name = qualify_gateway_table_name(from, schema_name.as_deref())?;
171    let query = parse_find_many_query(body, from, force_camel_case_to_snake_case)?;
172
173    Ok(Some(StructuredGatewayFetchPlan {
174        table_name: qualified_table_name,
175        schema_name,
176        query,
177    }))
178}
179
180/// Builds the structured fetch cache key from the normalized plan and compiled SQL.
181pub fn build_structured_fetch_cache_key(
182    plan: &StructuredGatewayFetchPlan,
183    sql: &str,
184    strip_nulls: bool,
185    client_name: &str,
186) -> String {
187    let hash_input = json!({
188        "table_name": plan.table_name,
189        "schema_name": plan.schema_name,
190        "sql": sql,
191        "strip_nulls": strip_nulls,
192        "client": client_name,
193    });
194    let digest = sha256::digest(serde_json::to_string(&hash_input).unwrap_or_default());
195    let short_hash = &digest[..16.min(digest.len())];
196    format!(
197        "{}:structured:{}:{}:{}",
198        plan.table_name, client_name, strip_nulls, short_hash
199    )
200}
201
202fn is_structured_fetch_body(body: &Value) -> bool {
203    body.get("select").is_some() || uses_top_level_direct_ast_payload(body)
204}
205
206fn uses_top_level_direct_ast_payload(body: &Value) -> bool {
207    body.get("operation").is_some()
208        || body.get("fields").is_some()
209        || (body.get("from").is_some() && body.get("table_name").is_none())
210}
211
212fn parse_find_many_query(
213    body: &Value,
214    from: &str,
215    force_camel_case_to_snake_case: bool,
216) -> Result<StructuredSelectQuery, String> {
217    let select_value = body
218        .get("select")
219        .ok_or_else(|| "select is required for structured gateway fetch".to_string())?;
220    let fields = parse_select_input(
221        select_value,
222        force_camel_case_to_snake_case,
223        Some(normalize_query_table_name(from)),
224    )?;
225
226    Ok(StructuredSelectQuery {
227        operation: StructuredSelectOperation::Select,
228        from: from.to_string(),
229        fields,
230        filters: parse_where_clause(
231            body.get("where_filters")
232                .or_else(|| body.get("where"))
233                .or_else(|| body.get("where_clause")),
234            force_camel_case_to_snake_case,
235        )?,
236        order_by: parse_order_by_clause(
237            body.get("orderBy").or_else(|| body.get("order_by")),
238            force_camel_case_to_snake_case,
239        )?,
240        limit: parse_optional_i64(body.get("limit"), "limit")?,
241        offset: parse_optional_i64(body.get("offset"), "offset")?,
242    })
243}
244
245fn parse_ast_query(
246    value: &Value,
247    fallback_from: Option<&str>,
248    force_camel_case_to_snake_case: bool,
249) -> Result<StructuredSelectQuery, String> {
250    let object = value
251        .as_object()
252        .ok_or_else(|| "structured gateway fetch body must be a JSON object".to_string())?;
253
254    let operation = match object.get("operation").and_then(Value::as_str) {
255        None | Some("select") => StructuredSelectOperation::Select,
256        Some(other) => {
257            return Err(format!(
258                "unsupported structured gateway fetch operation '{other}'"
259            ));
260        }
261    };
262
263    let from = object
264        .get("from")
265        .and_then(Value::as_str)
266        .or(fallback_from)
267        .map(str::trim)
268        .filter(|candidate| !candidate.is_empty())
269        .ok_or_else(|| "from is required for structured gateway fetch query".to_string())?
270        .to_string();
271
272    let fields = if let Some(fields_value) = object.get("fields") {
273        parse_ast_fields(
274            fields_value,
275            force_camel_case_to_snake_case,
276            Some(normalize_query_table_name(&from)),
277        )?
278    } else if let Some(select_value) = object.get("select") {
279        parse_select_input(
280            select_value,
281            force_camel_case_to_snake_case,
282            Some(normalize_query_table_name(&from)),
283        )?
284    } else {
285        return Err("fields or select is required for structured gateway fetch query".to_string());
286    };
287
288    Ok(StructuredSelectQuery {
289        operation,
290        from,
291        fields,
292        filters: parse_where_clause(
293            object
294                .get("where_filters")
295                .or_else(|| object.get("where"))
296                .or_else(|| object.get("where_clause")),
297            force_camel_case_to_snake_case,
298        )?,
299        order_by: parse_order_by_clause(
300            object.get("orderBy").or_else(|| object.get("order_by")),
301            force_camel_case_to_snake_case,
302        )?,
303        limit: parse_optional_i64(object.get("limit"), "limit")?,
304        offset: parse_optional_i64(object.get("offset"), "offset")?,
305    })
306}
307
308fn parse_select_input(
309    value: &Value,
310    force_camel_case_to_snake_case: bool,
311    parent_table: Option<String>,
312) -> Result<Vec<StructuredSelectField>, String> {
313    if let Some(select_object) = value.as_object() {
314        return parse_select_object(select_object, force_camel_case_to_snake_case);
315    }
316    if let Some(select_string) = value.as_str() {
317        return parse_select_string(select_string, parent_table, force_camel_case_to_snake_case);
318    }
319    if value.is_array() {
320        return parse_ast_fields(value, force_camel_case_to_snake_case, parent_table);
321    }
322    Err("select must be an object, string, or fields array".to_string())
323}
324
325fn parse_select_object(
326    object: &Map<String, Value>,
327    force_camel_case_to_snake_case: bool,
328) -> Result<Vec<StructuredSelectField>, String> {
329    let mut fields = Vec::new();
330
331    for (raw_key, value) in object {
332        let key = normalize_path(raw_key, force_camel_case_to_snake_case)?;
333        if matches!(value, Value::Bool(true)) {
334            fields.push(StructuredSelectField::Column(StructuredColumnField {
335                name: key,
336                alias: None,
337            }));
338            continue;
339        }
340
341        let relation_object = value.as_object().ok_or_else(|| {
342            format!("select['{raw_key}'] must be true for a column or an object with nested select")
343        })?;
344        if relation_object.get("select").is_none() && relation_object.get("fields").is_none() {
345            return Err(format!(
346                "select['{raw_key}'] is missing nested select/fields for relation projection"
347            ));
348        }
349
350        let nested_query = parse_ast_query(value, Some(&key), force_camel_case_to_snake_case)?;
351        fields.push(StructuredSelectField::Relation(StructuredRelationField {
352            name: key,
353            alias: None,
354            join: StructuredJoinKind::Left,
355            foreign_key: None,
356            query: nested_query,
357        }));
358    }
359
360    if fields.is_empty() {
361        return Err("select must include at least one column or relation".to_string());
362    }
363
364    Ok(fields)
365}
366
367fn parse_ast_fields(
368    value: &Value,
369    force_camel_case_to_snake_case: bool,
370    parent_table: Option<String>,
371) -> Result<Vec<StructuredSelectField>, String> {
372    let array = value
373        .as_array()
374        .ok_or_else(|| "fields must be an array".to_string())?;
375    let mut fields = Vec::new();
376
377    for field in array {
378        let object = field
379            .as_object()
380            .ok_or_else(|| "each fields entry must be an object".to_string())?;
381        let kind = object
382            .get("kind")
383            .and_then(Value::as_str)
384            .ok_or_else(|| "each fields entry must include kind".to_string())?;
385
386        match kind {
387            "column" => {
388                let raw_name = object
389                    .get("name")
390                    .and_then(Value::as_str)
391                    .ok_or_else(|| "column fields require name".to_string())?;
392                let alias = parse_optional_alias(object.get("alias"))?;
393                fields.push(StructuredSelectField::Column(StructuredColumnField {
394                    name: normalize_path(raw_name, force_camel_case_to_snake_case)?,
395                    alias,
396                }));
397            }
398            "relation" => {
399                let raw_name = object
400                    .get("name")
401                    .and_then(Value::as_str)
402                    .ok_or_else(|| "relation fields require name".to_string())?;
403                let name = normalize_path(raw_name, force_camel_case_to_snake_case)?;
404                let alias = parse_optional_alias(object.get("alias"))?;
405                let join = parse_join_kind(object.get("join").or_else(|| object.get("join_type")))?;
406                let foreign_key = parse_optional_foreign_key(object.get("foreign_key"))?;
407                let nested_query = parse_ast_query(
408                    object
409                        .get("query")
410                        .ok_or_else(|| "relation fields require query".to_string())?,
411                    Some(&name),
412                    force_camel_case_to_snake_case,
413                )?;
414
415                fields.push(StructuredSelectField::Relation(StructuredRelationField {
416                    name,
417                    alias,
418                    join,
419                    foreign_key,
420                    query: nested_query,
421                }));
422            }
423            other => {
424                return Err(format!("unsupported field kind '{other}'"));
425            }
426        }
427    }
428
429    if fields.is_empty() {
430        return Err("fields must include at least one entry".to_string());
431    }
432
433    if let Some(parent_table) = parent_table
434        && fields
435            .iter()
436            .all(|field| matches!(field, StructuredSelectField::Relation(_)))
437    {
438        return Err(format!(
439            "structured select for '{parent_table}' must include at least one column"
440        ));
441    }
442
443    Ok(fields)
444}
445
446fn parse_where_clause(
447    value: Option<&Value>,
448    force_camel_case_to_snake_case: bool,
449) -> Result<Vec<StructuredFilter>, String> {
450    let Some(value) = value else {
451        return Ok(Vec::new());
452    };
453
454    if let Some(array) = value.as_array() {
455        return parse_where_filters_array(array, force_camel_case_to_snake_case);
456    }
457
458    let object = value
459        .as_object()
460        .ok_or_else(|| "where/where_clause must be an object".to_string())?;
461    let mut filters = Vec::new();
462
463    for (raw_column, raw_filter) in object {
464        let column = normalize_path(raw_column, force_camel_case_to_snake_case)?;
465        match raw_filter {
466            Value::Object(filter_object) => {
467                let mut pushed = false;
468                for (raw_operator, operand) in filter_object {
469                    let operator = match raw_operator.as_str() {
470                        "eq" => StructuredFilterOperator::Eq,
471                        "neq" => StructuredFilterOperator::Neq,
472                        "gt" => StructuredFilterOperator::Gt,
473                        "lt" => StructuredFilterOperator::Lt,
474                        "in" => StructuredFilterOperator::In,
475                        other => {
476                            return Err(format!(
477                                "unsupported where operator '{other}' for column '{raw_column}'"
478                            ));
479                        }
480                    };
481                    let values = if operator == StructuredFilterOperator::In {
482                        operand.as_array().cloned().ok_or_else(|| {
483                            format!("where.{raw_column}.in must be an array of values")
484                        })?
485                    } else {
486                        vec![operand.clone()]
487                    };
488                    filters.push(StructuredFilter {
489                        column: column.clone(),
490                        operator,
491                        values,
492                        column_cast: None,
493                        value_cast: None,
494                    });
495                    pushed = true;
496                }
497                if !pushed {
498                    return Err(format!(
499                        "where.{raw_column} must include at least one supported operator"
500                    ));
501                }
502            }
503            Value::Array(values) => {
504                filters.push(StructuredFilter {
505                    column,
506                    operator: StructuredFilterOperator::In,
507                    values: values.clone(),
508                    column_cast: None,
509                    value_cast: None,
510                });
511            }
512            scalar => {
513                filters.push(StructuredFilter {
514                    column,
515                    operator: StructuredFilterOperator::Eq,
516                    values: vec![scalar.clone()],
517                    column_cast: None,
518                    value_cast: None,
519                });
520            }
521        }
522    }
523
524    Ok(filters)
525}
526
527fn parse_where_filters_array(
528    array: &[Value],
529    force_camel_case_to_snake_case: bool,
530) -> Result<Vec<StructuredFilter>, String> {
531    let mut filters = Vec::new();
532
533    for entry in array {
534        let object = entry
535            .as_object()
536            .ok_or_else(|| "where/where_clause array entries must be objects".to_string())?;
537        let raw_column = object
538            .get("column")
539            .and_then(Value::as_str)
540            .ok_or_else(|| "where/where_clause array entries require column".to_string())?;
541        let raw_operator = object
542            .get("operator")
543            .and_then(Value::as_str)
544            .ok_or_else(|| "where/where_clause array entries require operator".to_string())?;
545        let operator = parse_structured_filter_operator(raw_operator, raw_column)?;
546        let column = normalize_path(raw_column, force_camel_case_to_snake_case)?;
547        let column_cast = parse_optional_type_cast(
548            object
549                .get("column_cast")
550                .or_else(|| object.get("columnCast")),
551            "column_cast",
552        )?;
553        let value_cast = parse_optional_type_cast(
554            object.get("value_cast").or_else(|| object.get("valueCast")),
555            "value_cast",
556        )?;
557        let values = if operator == StructuredFilterOperator::In {
558            object
559                .get("values")
560                .or_else(|| object.get("value"))
561                .and_then(Value::as_array)
562                .cloned()
563                .ok_or_else(|| {
564                    "where/where_clause array entry with operator 'in' requires values array"
565                        .to_string()
566                })?
567        } else {
568            vec![object.get("value").cloned().ok_or_else(|| {
569                "where/where_clause array entries require value for scalar operators".to_string()
570            })?]
571        };
572
573        filters.push(StructuredFilter {
574            column,
575            operator,
576            values,
577            column_cast,
578            value_cast,
579        });
580    }
581
582    Ok(filters)
583}
584
585fn parse_structured_filter_operator(
586    raw_operator: &str,
587    raw_column: &str,
588) -> Result<StructuredFilterOperator, String> {
589    match raw_operator {
590        "eq" => Ok(StructuredFilterOperator::Eq),
591        "neq" => Ok(StructuredFilterOperator::Neq),
592        "gt" => Ok(StructuredFilterOperator::Gt),
593        "lt" => Ok(StructuredFilterOperator::Lt),
594        "in" => Ok(StructuredFilterOperator::In),
595        other => Err(format!(
596            "unsupported where operator '{other}' for column '{raw_column}'"
597        )),
598    }
599}
600
601fn parse_order_by_clause(
602    value: Option<&Value>,
603    force_camel_case_to_snake_case: bool,
604) -> Result<Vec<StructuredOrderBy>, String> {
605    let Some(value) = value else {
606        return Ok(Vec::new());
607    };
608
609    if let Some(object) = value.as_object() {
610        let mut order_by = Vec::new();
611        for (raw_column, raw_direction) in object {
612            let direction = parse_sort_direction(raw_direction, raw_column)?;
613            order_by.push(StructuredOrderBy {
614                column: normalize_path(raw_column, force_camel_case_to_snake_case)?,
615                direction,
616            });
617        }
618        return Ok(order_by);
619    }
620
621    if let Some(array) = value.as_array() {
622        let mut order_by = Vec::new();
623        for entry in array {
624            let object = entry
625                .as_object()
626                .ok_or_else(|| "orderBy/order_by array entries must be objects".to_string())?;
627            let raw_column = object
628                .get("column")
629                .or_else(|| object.get("field"))
630                .and_then(Value::as_str)
631                .ok_or_else(|| "orderBy/order_by array entries require column/field".to_string())?;
632            let direction = parse_sort_direction(
633                object
634                    .get("direction")
635                    .or_else(|| object.get("order"))
636                    .unwrap_or(&Value::String("asc".to_string())),
637                raw_column,
638            )?;
639            order_by.push(StructuredOrderBy {
640                column: normalize_path(raw_column, force_camel_case_to_snake_case)?,
641                direction,
642            });
643        }
644        return Ok(order_by);
645    }
646
647    Err("orderBy/order_by must be an object or array".to_string())
648}
649
650fn parse_optional_i64(value: Option<&Value>, label: &str) -> Result<Option<i64>, String> {
651    let Some(value) = value else {
652        return Ok(None);
653    };
654    let parsed = value
655        .as_i64()
656        .ok_or_else(|| format!("{label} must be an integer"))?;
657    if parsed < 0 {
658        return Err(format!("{label} must be greater than or equal to 0"));
659    }
660    Ok(Some(parsed))
661}
662
663fn parse_optional_alias(value: Option<&Value>) -> Result<Option<String>, String> {
664    let Some(value) = value else {
665        return Ok(None);
666    };
667    let alias = value
668        .as_str()
669        .map(str::trim)
670        .filter(|candidate| !candidate.is_empty())
671        .ok_or_else(|| "alias must be a non-empty string".to_string())?;
672    validate_identifier(alias, "alias")?;
673    Ok(Some(alias.to_string()))
674}
675
676fn parse_optional_type_cast(value: Option<&Value>, label: &str) -> Result<Option<String>, String> {
677    let Some(value) = value else {
678        return Ok(None);
679    };
680    let raw_cast = value
681        .as_str()
682        .map(str::trim)
683        .filter(|candidate| !candidate.is_empty())
684        .ok_or_else(|| format!("{label} must be a non-empty string"))?;
685    sanitize_qualified_identifier(raw_cast)
686        .ok_or_else(|| format!("{label} '{raw_cast}' must be a valid SQL type identifier"))?;
687    Ok(Some(raw_cast.to_string()))
688}
689
690fn parse_optional_foreign_key(value: Option<&Value>) -> Result<Option<String>, String> {
691    let Some(value) = value else {
692        return Ok(None);
693    };
694    let raw = value
695        .as_str()
696        .map(str::trim)
697        .filter(|candidate| !candidate.is_empty())
698        .ok_or_else(|| "foreign_key must be a non-empty string".to_string())?;
699    validate_foreign_key_hint(raw)?;
700    Ok(Some(raw.to_string()))
701}
702
703fn parse_join_kind(value: Option<&Value>) -> Result<StructuredJoinKind, String> {
704    match value.and_then(Value::as_str) {
705        None | Some("left") => Ok(StructuredJoinKind::Left),
706        Some("inner") => Ok(StructuredJoinKind::Inner),
707        Some(other) => Err(format!("unsupported relation join kind '{other}'")),
708    }
709}
710
711fn parse_sort_direction(value: &Value, context: &str) -> Result<StructuredSortDirection, String> {
712    let raw = value
713        .as_str()
714        .ok_or_else(|| format!("sort direction for '{context}' must be a string"))?;
715    match raw.to_ascii_lowercase().as_str() {
716        "asc" | "ascending" => Ok(StructuredSortDirection::Asc),
717        "desc" | "descending" => Ok(StructuredSortDirection::Desc),
718        other => Err(format!(
719            "unsupported sort direction '{other}' for '{context}'"
720        )),
721    }
722}
723
724fn validate_foreign_key_hint(raw: &str) -> Result<(), String> {
725    if let Some(stripped) = raw.strip_prefix("parent.") {
726        validate_identifier(stripped, "foreign_key")?;
727        return Ok(());
728    }
729    if let Some(stripped) = raw.strip_prefix("child.") {
730        validate_identifier(stripped, "foreign_key")?;
731        return Ok(());
732    }
733    validate_identifier(raw, "foreign_key")
734}
735
736fn normalize_path(raw: &str, force_camel_case_to_snake_case: bool) -> Result<String, String> {
737    let mut normalized_segments = Vec::new();
738    for segment in raw.split('.') {
739        let trimmed = segment.trim();
740        if trimmed.is_empty() {
741            return Err(format!("invalid identifier path '{raw}'"));
742        }
743        let normalized = normalize_column_name(trimmed, force_camel_case_to_snake_case);
744        validate_identifier(&normalized, "identifier")?;
745        normalized_segments.push(normalized);
746    }
747    Ok(normalized_segments.join("."))
748}
749
750fn validate_identifier(identifier: &str, label: &str) -> Result<(), String> {
751    sanitize_identifier(identifier)
752        .map(|_| ())
753        .ok_or_else(|| format!("{label} '{identifier}' must be a valid SQL identifier"))
754}
755
/// Returns the unqualified table name: the text after the last `.` (or the whole input when
/// there is no dot), trimmed of surrounding whitespace.
fn normalize_query_table_name(raw: &str) -> String {
    let last_segment = match raw.rfind('.') {
        // '.' is a single byte, so `dot + 1` is always a valid boundary.
        Some(dot) => &raw[dot + 1..],
        None => raw,
    };
    last_segment.trim().to_string()
}
759
760fn collect_query_resource_names(query: &StructuredSelectQuery, resources: &mut BTreeSet<String>) {
761    resources.insert(normalize_query_table_name(&query.from));
762
763    for field in &query.fields {
764        if let StructuredSelectField::Relation(relation) = field {
765            collect_query_resource_names(&relation.query, resources);
766        }
767    }
768}
769
770fn parse_select_string(
771    input: &str,
772    parent_table: Option<String>,
773    force_camel_case_to_snake_case: bool,
774) -> Result<Vec<StructuredSelectField>, String> {
775    let mut iter = input.chars().peekable();
776    let fields = parse_string_nodes(&mut iter, parent_table, force_camel_case_to_snake_case)?;
777    skip_whitespace(&mut iter);
778    if iter.peek().is_some() {
779        return Err("invalid select string: trailing tokens".to_string());
780    }
781    Ok(fields)
782}
783
784fn parse_string_nodes<I>(
785    iter: &mut Peekable<I>,
786    parent_table: Option<String>,
787    force_camel_case_to_snake_case: bool,
788) -> Result<Vec<StructuredSelectField>, String>
789where
790    I: Iterator<Item = char>,
791{
792    let mut fields = Vec::new();
793    loop {
794        skip_whitespace(iter);
795        if iter.peek().is_none() || matches!(iter.peek(), Some(')')) {
796            break;
797        }
798        fields.push(parse_string_node(
799            iter,
800            parent_table.clone(),
801            force_camel_case_to_snake_case,
802        )?);
803        skip_whitespace(iter);
804        if matches!(iter.peek(), Some(',')) {
805            iter.next();
806        }
807    }
808    Ok(fields)
809}
810
811fn parse_string_node<I>(
812    iter: &mut Peekable<I>,
813    parent_table: Option<String>,
814    force_camel_case_to_snake_case: bool,
815) -> Result<StructuredSelectField, String>
816where
817    I: Iterator<Item = char>,
818{
819    let token = read_string_token(iter)?;
820    let mut alias = None;
821    let mut name = token.clone();
822
823    if matches!(iter.peek(), Some(':')) {
824        iter.next();
825        alias = Some(token);
826        name = read_string_token(iter)?;
827    }
828
829    let mut join = StructuredJoinKind::Left;
830    let mut foreign_key = None;
831
832    if matches!(iter.peek(), Some('!')) {
833        iter.next();
834        let modifier = read_modifier_token(iter)?;
835        if modifier.eq_ignore_ascii_case("inner") {
836            join = StructuredJoinKind::Inner;
837        } else if !modifier.is_empty() {
838            validate_foreign_key_hint(&modifier)?;
839            foreign_key = Some(modifier);
840        }
841    }
842
843    if matches!(iter.peek(), Some('(')) {
844        iter.next();
845        let relation_name = normalize_path(&name, force_camel_case_to_snake_case)?;
846        let nested_fields = parse_string_nodes(
847            iter,
848            Some(relation_name.clone()),
849            force_camel_case_to_snake_case,
850        )?;
851        if iter.next() != Some(')') {
852            return Err("invalid select string: missing ')'".to_string());
853        }
854        let relation_alias = match alias {
855            Some(raw_alias) => Some(normalize_path(&raw_alias, false)?),
856            None => None,
857        };
858        Ok(StructuredSelectField::Relation(StructuredRelationField {
859            name: relation_name.clone(),
860            alias: relation_alias,
861            join,
862            foreign_key,
863            query: StructuredSelectQuery {
864                operation: StructuredSelectOperation::Select,
865                from: relation_name,
866                fields: nested_fields,
867                filters: Vec::new(),
868                order_by: Vec::new(),
869                limit: None,
870                offset: None,
871            },
872        }))
873    } else {
874        let column_name = normalize_path(&name, force_camel_case_to_snake_case)?;
875        let column_alias = match alias {
876            Some(raw_alias) => Some(normalize_path(&raw_alias, false)?),
877            None => None,
878        };
879        if column_name == "*" && parent_table.is_some() {
880            return Err(
881                "'*' is not supported inside structured nested select projections".to_string(),
882            );
883        }
884        Ok(StructuredSelectField::Column(StructuredColumnField {
885            name: column_name,
886            alias: column_alias,
887        }))
888    }
889}
890
/// Reads one name/alias token from a compatibility `select` string.
///
/// Characters are consumed up to, but not including, the next structural
/// delimiter (`!`, `:`, `(`, `)` or `,`) or the end of input. Surrounding
/// whitespace is trimmed from the result.
///
/// # Errors
///
/// Returns an error when nothing but whitespace precedes the delimiter.
fn read_string_token<I>(iter: &mut Peekable<I>) -> Result<String, String>
where
    I: Iterator<Item = char>,
{
    const TERMINATORS: [char; 5] = ['!', ':', '(', ')', ','];

    let mut raw = String::new();
    // `next_if` only advances when the predicate holds, so the terminator stays
    // in the stream for the caller to inspect.
    while let Some(accepted) = iter.next_if(|candidate| !TERMINATORS.contains(candidate)) {
        raw.push(accepted);
    }

    match raw.trim() {
        "" => Err("invalid select string: empty token".to_string()),
        text => Ok(text.to_string()),
    }
}
909
/// Reads the modifier text that follows a `!` in a compatibility `select` string.
///
/// Unlike plain tokens, a modifier may itself contain `!` and `:`; only `(`,
/// `)`, `,` or the end of input terminate it. The terminator is left in the
/// stream and surrounding whitespace is trimmed from the result.
///
/// # Errors
///
/// Returns an error when the modifier is empty after trimming.
fn read_modifier_token<I>(iter: &mut Peekable<I>) -> Result<String, String>
where
    I: Iterator<Item = char>,
{
    let collected: String =
        std::iter::from_fn(|| iter.next_if(|ch| !matches!(*ch, '(' | ')' | ','))).collect();

    let modifier = collected.trim();
    if modifier.is_empty() {
        Err("invalid select string: empty relation modifier".to_string())
    } else {
        Ok(modifier.to_string())
    }
}
928
/// Advances `iter` past any leading whitespace characters.
///
/// The first non-whitespace character, if any, is left unconsumed.
fn skip_whitespace<I>(iter: &mut Peekable<I>)
where
    I: Iterator<Item = char>,
{
    // `next_if` consumes a character only while the predicate holds.
    while iter.next_if(|ch| ch.is_whitespace()).is_some() {}
}
937
938fn sanitize_qualified_identifier(identifier: &str) -> Option<String> {
939    let mut parts = Vec::new();
940    for segment in identifier.split('.') {
941        let trimmed = segment.trim().trim_matches('"');
942        if trimmed.is_empty() {
943            return None;
944        }
945        parts.push(sanitize_identifier(trimmed)?);
946    }
947    if parts.is_empty() {
948        return None;
949    }
950    Some(parts.join("."))
951}
952
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a plan from `body`, panicking if planning fails or if the body
    /// is not recognized as a structured request.
    fn build_plan(
        body: &Value,
        force_camel_case_to_snake_case: bool,
    ) -> StructuredGatewayFetchPlan {
        let outcome = build_structured_fetch_plan(body, force_camel_case_to_snake_case);
        outcome.expect("plan ok").expect("structured plan")
    }

    /// A body with top-level `operation`/`from`/`fields` keys must be refused.
    #[test]
    fn structured_fetch_rejects_top_level_direct_ast_body() {
        let request = json!({
            "operation": "select",
            "from": "orchestral_sections",
            "fields": [
                { "kind": "column", "name": "name" }
            ]
        });

        let message =
            build_structured_fetch_plan(&request, false).expect_err("ast should be rejected");

        assert!(message.contains("first-class direct AST request bodies are not supported"));
    }

    /// Resource names include the root table and every nested relation, in sorted order.
    #[test]
    fn structured_select_collects_nested_resource_names() {
        let request = json!({
            "table_name": "orchestral_sections",
            "select": {
                "name": true,
                "instruments": {
                    "select": {
                        "name": true,
                        "players": {
                            "select": { "display_name": true }
                        }
                    }
                }
            }
        });

        let expected: Vec<String> = ["instruments", "orchestral_sections", "players"]
            .iter()
            .map(|resource| resource.to_string())
            .collect();

        assert_eq!(build_plan(&request, false).resource_names(), expected);
    }

    /// A body carrying only `where` (no `select`) yields no structured plan.
    #[test]
    fn structured_select_where_only_body_stays_legacy() {
        let request = json!({
            "table_name": "users",
            "where": {
                "id": { "eq": 1 }
            }
        });

        let outcome = build_structured_fetch_plan(&request, false).expect("plan ok");

        assert!(outcome.is_none());
    }

    /// The string form `alias:relation!inner(cols)` parses into an aliased inner-join relation.
    #[test]
    fn structured_select_string_alias_and_join_parse() {
        let request = json!({
            "table_name": "public.chat_subscriptions",
            "select": "user_id,users:athena.users!inner(id,username)"
        });

        let plan = build_plan(&request, false);
        assert_eq!(plan.table_name, "public.chat_subscriptions");
        assert_eq!(plan.query.fields.len(), 2);

        let relation = match &plan.query.fields[1] {
            StructuredSelectField::Relation(relation) => relation,
            other => panic!("expected relation field, got {other:?}"),
        };

        assert_eq!(relation.name, "athena.users");
        assert_eq!(relation.alias.as_deref(), Some("users"));
        assert_eq!(relation.join, StructuredJoinKind::Inner);
        assert_eq!(relation.display_name(), "users");
    }
}