Skip to main content

polyglot_sql/
query_analysis.rs

1//! Compact query analysis facts.
2//!
3//! This module intentionally builds on the existing parser, scope builder, type
4//! annotator, and lineage implementation. It is a convenience API: callers that
5//! need the full AST or full lineage graph should continue using those lower
6//! level APIs directly.
7
8use crate::ast_transforms::get_output_column_names;
9use crate::dialects::{Dialect, DialectType};
10use crate::expressions::{DataType, Expression, TableRef, With};
11use crate::lineage::{lineage_by_index_from_expression, LineageNode};
12use crate::optimizer::annotate_types::annotate_types;
13use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
14use crate::schema::{MappingSchema, Schema};
15use crate::scope::{build_scope, Scope, SourceInfo, SourceKind};
16use crate::traversal::{contains_aggregate, ExpressionWalk};
17use crate::validation::{mapping_schema_from_validation_schema, ValidationSchema};
18use crate::{parse_data_type, parse_one, Error, Result};
19use serde::{Deserialize, Serialize};
20use std::collections::HashSet;
21
/// Options for [`analyze_query`].
///
/// Serialized with camelCase keys. Because of the container-level
/// `#[serde(default)]`, any field missing from the input falls back to the
/// value provided by this struct's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct AnalyzeQueryOptions {
    /// SQL dialect used for parsing and dialect-aware rendering.
    pub dialect: DialectType,
    /// Optional validation schema used for qualification and type annotation.
    /// When `None`, `analyze_query` skips column qualification and annotates
    /// types without a schema.
    pub schema: Option<ValidationSchema>,
}
31
/// Compact facts about a query's output shape and data dependencies.
///
/// Produced by [`analyze_query`]; serialized with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryAnalysis {
    /// Whether the analyzed root is a plain SELECT or a set operation.
    pub shape: QueryShape,
    /// CTE names in first-seen order, without duplicates.
    pub ctes: Vec<String>,
    /// One fact per output projection; for set operations these are taken
    /// from the left-most SELECT.
    pub projections: Vec<ProjectionFact>,
    /// Relations visible in the root scope and its union branch scopes,
    /// sorted by name and then alias.
    pub relations: Vec<RelationFact>,
    /// Distinct table-kind sources found while traversing the scope tree,
    /// sorted by name.
    pub base_tables: Vec<RelationFact>,
    /// One fact per UNION / INTERSECT / EXCEPT node, outermost first.
    pub set_operations: Vec<SetOperationFact>,
}
43
/// Top-level query shape.
///
/// Serialized as snake_case strings (`"select"`, `"set_operation"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueryShape {
    /// The root expression is not a UNION, INTERSECT or EXCEPT.
    Select,
    /// The root expression is a UNION, INTERSECT or EXCEPT.
    SetOperation,
}
51
/// Compact fact about one output projection.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProjectionFact {
    /// Zero-based position of the projection in the select list.
    pub index: usize,
    /// Output column name, when one could be determined.
    pub name: Option<String>,
    /// `true` when the projection (after unwrapping aliases, annotations and
    /// parentheses) is a star or a column literally named `*`.
    pub is_star: bool,
    /// Table qualifier of a star projection (`t` in `t.*`), if any.
    pub star_table: Option<String>,
    /// High-level classification of what the projection does.
    pub transform_kind: TransformKind,
    /// Target type rendered for the analysis dialect when the projection is
    /// a cast; `None` otherwise or when rendering fails.
    pub cast_type: Option<String>,
    /// Inferred type of the projection rendered for the analysis dialect,
    /// when type annotation produced one.
    pub type_hint: Option<String>,
    /// Deduplicated upstream column references feeding this projection.
    pub upstream: Vec<ColumnReferenceFact>,
}
65
/// Compact fact about an upstream column reference.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnReferenceFact {
    /// Name of the source the column was attributed to; `None` when no
    /// source name is available.
    pub source_name: Option<String>,
    /// Alias of that source, when it has one.
    pub source_alias: Option<String>,
    /// Kind of the source (`SourceKind::Unknown` when it could not be matched).
    pub source_kind: SourceKind,
    /// Table name associated with the reference. For table-backed sources
    /// this is the dot-joined catalog/schema/table name; otherwise it may be
    /// the qualifier as written in the query, or `None`.
    pub table: Option<String>,
    /// Column name, or `"*"` for a star reference.
    pub column: String,
    /// `true` when the reference carried no table qualifier.
    pub unqualified: bool,
    /// How confidently the reference was attributed to its source.
    pub confidence: ReferenceConfidence,
}
78
/// Compact fact about a relation visible in the root scope.
///
/// The same shape is also used for the `base_tables` list of
/// [`QueryAnalysis`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelationFact {
    /// Name of the relation.
    pub name: String,
    /// Alias under which the relation is referenced, if any.
    pub alias: Option<String>,
    /// Kind of source backing the relation.
    pub kind: SourceKind,
    /// Column names known for the relation; empty when they cannot be
    /// determined (for example a table with no schema information).
    pub columns: Vec<String>,
}
88
/// Compact fact about a set operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationFact {
    /// Operation name: `"union"`, `"intersect"` or `"except"`.
    pub kind: String,
    /// Value of the operation's `all` flag as recorded on the AST node.
    pub all: bool,
    /// Value of the operation's `distinct` flag as recorded on the AST node.
    pub distinct: bool,
    /// Output column names of the set operation as a whole.
    pub output_columns: Vec<String>,
    /// Facts for the two immediate operands (left first, then right).
    pub branches: Vec<SetOperationBranchFact>,
}
99
/// Compact facts for one immediate set-operation branch.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationBranchFact {
    /// Branch position: `0` for the left operand, `1` for the right operand.
    pub index: usize,
    /// Projection facts computed for this branch.
    pub projections: Vec<ProjectionFact>,
}
107
/// High-level kind of transformation performed by a projection.
///
/// Serialized as snake_case strings. The variants are assigned by a
/// first-match-wins check in this order: star, cast, aggregation, direct,
/// constant, expression.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransformKind {
    /// A bare column or identifier.
    Direct,
    /// The projection itself is a `Cast`, `TryCast` or `SafeCast` node.
    Cast,
    /// The projection contains an aggregate.
    Aggregation,
    /// A literal, boolean or NULL, possibly wrapped in negation, bitwise-not
    /// or a nested cast.
    Constant,
    /// Anything that matches none of the other kinds.
    Expression,
    /// A star projection.
    Star,
}
119
/// Confidence level for a compact upstream column reference.
///
/// Serialized as snake_case strings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReferenceConfidence {
    /// The reference was attributed to a known source.
    Resolved,
    /// The reference is unqualified and more than one source is in scope.
    Ambiguous,
    /// The reference could not be attributed to any known source.
    Unknown,
}
128
129/// Analyze a single SELECT or set-operation query.
130pub fn analyze_query(sql: &str, options: AnalyzeQueryOptions) -> Result<QueryAnalysis> {
131    let mut expression = parse_one(sql, options.dialect)?;
132    expression = effective_query(expression);
133    ensure_query(&expression)?;
134
135    let mapping_schema = options
136        .schema
137        .as_ref()
138        .map(|schema| analysis_mapping_schema(schema, options.dialect));
139
140    if let Some(schema) = mapping_schema.as_ref() {
141        let qualify_options = QualifyColumnsOptions::new().with_dialect(options.dialect);
142        expression = qualify_columns(expression, schema, &qualify_options)
143            .map_err(|e| Error::internal(format!("query analysis qualification failed: {e}")))?;
144    }
145
146    let annotation_schema = mapping_schema.as_ref().map(|schema| {
147        let mut alias_schema = schema.clone();
148        add_scope_aliases_to_schema(
149            &build_scope(&expression),
150            schema,
151            &mut alias_schema,
152            options.dialect,
153        );
154        alias_schema
155    });
156
157    annotate_types(
158        &mut expression,
159        annotation_schema
160            .as_ref()
161            .map(|schema| schema as &dyn Schema),
162        Some(options.dialect),
163    );
164    crate::lineage::expand_cte_stars(
165        &mut expression,
166        annotation_schema
167            .as_ref()
168            .or(mapping_schema.as_ref())
169            .map(|schema| schema as &dyn Schema),
170    );
171
172    let scope = build_scope(&expression);
173    let shape = if is_set_operation(&expression) {
174        QueryShape::SetOperation
175    } else {
176        QueryShape::Select
177    };
178
179    Ok(QueryAnalysis {
180        shape,
181        ctes: collect_cte_names(&expression),
182        projections: projection_facts_for_query(&expression, &scope, options.dialect),
183        relations: relation_facts(&scope, mapping_schema.as_ref()),
184        base_tables: base_table_facts(&scope, mapping_schema.as_ref()),
185        set_operations: set_operation_facts(&expression, &scope, options.dialect),
186    })
187}
188
189fn analysis_mapping_schema(schema: &ValidationSchema, dialect: DialectType) -> MappingSchema {
190    let broad_schema = mapping_schema_from_validation_schema(schema);
191    let mut mapping_schema = MappingSchema::with_dialect(dialect);
192
193    for table in &schema.tables {
194        let table_names = validation_table_names(table);
195        if table_names.is_empty() {
196            continue;
197        }
198
199        let fallback_table = table_names[0].as_str();
200        let columns: Vec<(String, DataType)> = table
201            .columns
202            .iter()
203            .map(|column| {
204                let data_type = parse_analysis_data_type(&column.data_type, dialect)
205                    .unwrap_or_else(|| {
206                        broad_schema
207                            .get_column_type(fallback_table, &column.name)
208                            .unwrap_or(DataType::Unknown)
209                    });
210                (column.name.to_ascii_lowercase(), data_type)
211            })
212            .collect();
213
214        for table_name in table_names {
215            let _ = mapping_schema.add_table(&table_name, &columns, Some(dialect));
216        }
217    }
218
219    mapping_schema
220}
221
222fn validation_table_names(table: &crate::validation::SchemaTable) -> Vec<String> {
223    let mut names = Vec::new();
224
225    names.push(table.name.to_ascii_lowercase());
226    if let Some(schema_name) = &table.schema {
227        names.push(format!(
228            "{}.{}",
229            schema_name.to_ascii_lowercase(),
230            table.name.to_ascii_lowercase()
231        ));
232    }
233    for alias in &table.aliases {
234        names.push(alias.to_ascii_lowercase());
235    }
236
237    names.sort();
238    names.dedup();
239    names
240}
241
242fn parse_analysis_data_type(data_type: &str, dialect: DialectType) -> Option<DataType> {
243    let trimmed = data_type.trim();
244    if trimmed.is_empty() {
245        return None;
246    }
247    parse_data_type(trimmed, dialect).ok()
248}
249
250fn add_scope_aliases_to_schema(
251    scope: &Scope,
252    source_schema: &MappingSchema,
253    target_schema: &mut MappingSchema,
254    dialect: DialectType,
255) {
256    for child_scope in scope.traverse() {
257        for (source_name, source) in &child_scope.sources {
258            if source.kind != SourceKind::Table {
259                continue;
260            }
261            if let Some(table_name) = source_table_name(source) {
262                if source_name == &table_name {
263                    continue;
264                }
265                if let Ok(column_names) = source_schema.column_names(&table_name) {
266                    let columns: Vec<(String, DataType)> = column_names
267                        .iter()
268                        .map(|column| {
269                            (
270                                column.clone(),
271                                source_schema
272                                    .get_column_type(&table_name, column)
273                                    .unwrap_or(DataType::Unknown),
274                            )
275                        })
276                        .collect();
277                    let _ = target_schema.add_table(source_name, &columns, Some(dialect));
278                }
279            }
280        }
281    }
282}
283
284fn effective_query(expression: Expression) -> Expression {
285    match expression {
286        Expression::Prepare(prepare) => prepare.statement,
287        Expression::Subquery(subquery) if subquery.alias.is_none() => subquery.this,
288        other => other,
289    }
290}
291
292fn ensure_query(expression: &Expression) -> Result<()> {
293    if matches!(
294        expression,
295        Expression::Select(_)
296            | Expression::Union(_)
297            | Expression::Intersect(_)
298            | Expression::Except(_)
299    ) {
300        Ok(())
301    } else {
302        Err(Error::internal(
303            "analyze_query requires a SELECT or set operation query",
304        ))
305    }
306}
307
/// Return `true` when `expression` itself is a UNION, INTERSECT or EXCEPT
/// node. Wrappers such as subqueries are not looked through.
fn is_set_operation(expression: &Expression) -> bool {
    matches!(
        expression,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    )
}
314
315fn collect_cte_names(expression: &Expression) -> Vec<String> {
316    let mut names = Vec::new();
317    let mut seen = HashSet::new();
318    collect_cte_names_inner(expression, &mut names, &mut seen);
319    names
320}
321
322fn collect_cte_names_inner(
323    expression: &Expression,
324    names: &mut Vec<String>,
325    seen: &mut HashSet<String>,
326) {
327    if let Some(with_clause) = with_clause(expression) {
328        collect_with_names(with_clause, names, seen);
329    }
330
331    match expression {
332        Expression::Union(union) => {
333            collect_cte_names_inner(&union.left, names, seen);
334            collect_cte_names_inner(&union.right, names, seen);
335        }
336        Expression::Intersect(intersect) => {
337            collect_cte_names_inner(&intersect.left, names, seen);
338            collect_cte_names_inner(&intersect.right, names, seen);
339        }
340        Expression::Except(except) => {
341            collect_cte_names_inner(&except.left, names, seen);
342            collect_cte_names_inner(&except.right, names, seen);
343        }
344        Expression::Subquery(subquery) => collect_cte_names_inner(&subquery.this, names, seen),
345        _ => {}
346    }
347}
348
349fn collect_with_names(with_clause: &With, names: &mut Vec<String>, seen: &mut HashSet<String>) {
350    for cte in &with_clause.ctes {
351        if seen.insert(cte.alias.name.clone()) {
352            names.push(cte.alias.name.clone());
353        }
354        collect_cte_names_inner(&cte.this, names, seen);
355    }
356}
357
358fn with_clause(expression: &Expression) -> Option<&With> {
359    match expression {
360        Expression::Select(select) => select.with.as_ref(),
361        Expression::Union(union) => union.with.as_ref(),
362        Expression::Intersect(intersect) => intersect.with.as_ref(),
363        Expression::Except(except) => except.with.as_ref(),
364        _ => None,
365    }
366}
367
368fn projection_facts_for_query(
369    expression: &Expression,
370    scope: &Scope,
371    dialect: DialectType,
372) -> Vec<ProjectionFact> {
373    let expressions = select_expressions_for_query(expression);
374    let names = get_output_column_names(expression);
375
376    expressions
377        .iter()
378        .enumerate()
379        .map(|(index, projection)| {
380            projection_fact(
381                index,
382                names
383                    .get(index)
384                    .cloned()
385                    .or_else(|| projection_name(projection)),
386                projection,
387                expression,
388                scope,
389                dialect,
390            )
391        })
392        .collect()
393}
394
395fn select_expressions_for_query(expression: &Expression) -> Vec<&Expression> {
396    match expression {
397        Expression::Select(select) => select.expressions.iter().collect(),
398        Expression::Union(union) => select_expressions_for_query(&union.left),
399        Expression::Intersect(intersect) => select_expressions_for_query(&intersect.left),
400        Expression::Except(except) => select_expressions_for_query(&except.left),
401        Expression::Subquery(subquery) => select_expressions_for_query(&subquery.this),
402        _ => Vec::new(),
403    }
404}
405
406fn projection_fact(
407    index: usize,
408    name: Option<String>,
409    projection: &Expression,
410    query: &Expression,
411    scope: &Scope,
412    dialect: DialectType,
413) -> ProjectionFact {
414    let inner = unwrap_projection_alias(projection);
415    let is_star = projection_is_star(inner);
416    let upstream = lineage_by_index_from_expression(index, query, Some(dialect), false)
417        .map(|node| terminal_references_from_lineage(&node))
418        .ok()
419        .filter(|refs| !refs.is_empty())
420        .unwrap_or_else(|| fallback_column_references(inner, scope));
421
422    ProjectionFact {
423        index,
424        name,
425        is_star,
426        star_table: projection_star_table(inner),
427        transform_kind: transform_kind(inner),
428        cast_type: cast_type(inner, dialect),
429        type_hint: projection
430            .inferred_type()
431            .or_else(|| inner.inferred_type())
432            .and_then(|data_type| render_data_type(data_type, dialect)),
433        upstream,
434    }
435}
436
/// Peel `Alias`, `Annotated` and `Paren` wrappers off a projection, in any
/// nesting order, and return the first expression that is none of those.
fn unwrap_projection_alias(expression: &Expression) -> &Expression {
    match expression {
        Expression::Alias(alias) => unwrap_projection_alias(&alias.this),
        Expression::Annotated(annotated) => unwrap_projection_alias(&annotated.this),
        Expression::Paren(paren) => unwrap_projection_alias(&paren.this),
        // Anything else is the real projection expression.
        _ => expression,
    }
}
445
446fn projection_name(expression: &Expression) -> Option<String> {
447    match expression {
448        Expression::Alias(alias) => Some(alias.alias.name.clone()),
449        Expression::Column(column) => Some(column.name.name.clone()),
450        Expression::Identifier(identifier) => Some(identifier.name.clone()),
451        Expression::Star(_) => Some("*".to_string()),
452        Expression::Annotated(annotated) => projection_name(&annotated.this),
453        _ => None,
454    }
455}
456
457fn projection_is_star(expression: &Expression) -> bool {
458    matches!(expression, Expression::Star(_))
459        || matches!(expression, Expression::Column(column) if column.name.name == "*")
460}
461
462fn projection_star_table(expression: &Expression) -> Option<String> {
463    match expression {
464        Expression::Star(star) => star
465            .table
466            .as_ref()
467            .map(|identifier| identifier.name.clone()),
468        Expression::Column(column) if column.name.name == "*" => column
469            .table
470            .as_ref()
471            .map(|identifier| identifier.name.clone()),
472        _ => None,
473    }
474}
475
476fn transform_kind(expression: &Expression) -> TransformKind {
477    if projection_is_star(expression) {
478        TransformKind::Star
479    } else if is_cast_expression(expression) {
480        TransformKind::Cast
481    } else if contains_aggregate(expression) {
482        TransformKind::Aggregation
483    } else if matches!(
484        expression,
485        Expression::Column(_) | Expression::Identifier(_)
486    ) {
487        TransformKind::Direct
488    } else if is_simple_constant(expression) {
489        TransformKind::Constant
490    } else {
491        TransformKind::Expression
492    }
493}
494
/// Return `true` when `expression` itself is a `Cast`, `TryCast` or
/// `SafeCast` node. Casts nested deeper in the expression are not detected.
fn is_cast_expression(expression: &Expression) -> bool {
    matches!(
        expression,
        Expression::Cast(_) | Expression::TryCast(_) | Expression::SafeCast(_)
    )
}
501
502fn cast_type(expression: &Expression, dialect: DialectType) -> Option<String> {
503    match expression {
504        Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
505            render_data_type(&cast.to, dialect)
506        }
507        _ => None,
508    }
509}
510
511fn render_data_type(data_type: &DataType, dialect: DialectType) -> Option<String> {
512    Dialect::get(dialect)
513        .generate(&Expression::DataType(data_type.clone()))
514        .ok()
515}
516
/// Return `true` when `expression` is a literal, boolean or NULL, optionally
/// wrapped in any number of casts, negations or bitwise-not operators.
fn is_simple_constant(expression: &Expression) -> bool {
    match expression {
        Expression::Literal(_) | Expression::Boolean(_) | Expression::Null(_) => true,
        // A cast of a constant is still a constant.
        Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
            is_simple_constant(&cast.this)
        }
        // So is a unary operator applied to a constant.
        Expression::Neg(unary) | Expression::BitwiseNot(unary) => is_simple_constant(&unary.this),
        _ => false,
    }
}
527
528fn terminal_references_from_lineage(node: &LineageNode) -> Vec<ColumnReferenceFact> {
529    let mut refs = Vec::new();
530    collect_terminal_references(node, &mut refs);
531    dedupe_column_refs(refs)
532}
533
534fn collect_terminal_references(node: &LineageNode, refs: &mut Vec<ColumnReferenceFact>) {
535    if node.downstream.is_empty() {
536        if let Some(reference) = column_reference_from_lineage_node(node) {
537            refs.push(reference);
538        }
539        return;
540    }
541
542    for child in &node.downstream {
543        collect_terminal_references(child, refs);
544    }
545}
546
547fn column_reference_from_lineage_node(node: &LineageNode) -> Option<ColumnReferenceFact> {
548    match &node.expression {
549        Expression::Column(column) => {
550            let source_name = non_empty_string(node.source_name.clone());
551            let table =
552                lineage_node_table(node).or_else(|| column.table.as_ref().map(|t| t.name.clone()));
553            let confidence = if node.source_kind == SourceKind::Unknown && source_name.is_none() {
554                ReferenceConfidence::Unknown
555            } else {
556                ReferenceConfidence::Resolved
557            };
558            Some(ColumnReferenceFact {
559                source_name,
560                source_alias: node.source_alias.clone(),
561                source_kind: node.source_kind,
562                table,
563                column: column.name.name.clone(),
564                unqualified: column.table.is_none(),
565                confidence,
566            })
567        }
568        Expression::Star(_) => Some(ColumnReferenceFact {
569            source_name: non_empty_string(node.source_name.clone()),
570            source_alias: node.source_alias.clone(),
571            source_kind: node.source_kind,
572            table: lineage_node_table(node),
573            column: "*".to_string(),
574            unqualified: true,
575            confidence: if node.source_kind == SourceKind::Unknown {
576                ReferenceConfidence::Unknown
577            } else {
578                ReferenceConfidence::Resolved
579            },
580        }),
581        _ => None,
582    }
583}
584
585fn lineage_node_table(node: &LineageNode) -> Option<String> {
586    match &node.source {
587        Expression::Table(table) => Some(table_name(table)),
588        _ => None,
589    }
590}
591
592fn fallback_column_references(expression: &Expression, scope: &Scope) -> Vec<ColumnReferenceFact> {
593    let mut refs = Vec::new();
594    let source_count = scope.sources.len();
595    let single_source = if source_count == 1 {
596        scope.sources.iter().next()
597    } else {
598        None
599    };
600
601    for column_expr in expression.find_all(|candidate| matches!(candidate, Expression::Column(_))) {
602        if let Expression::Column(column) = column_expr {
603            if column.name.name == "*" {
604                continue;
605            }
606            let source = column
607                .table
608                .as_ref()
609                .and_then(|table| scope.sources.get(&table.name));
610            let (source_name, source_alias, source_kind, table, confidence) =
611                if let Some(table_identifier) = &column.table {
612                    if let Some(source) = source {
613                        (
614                            Some(table_identifier.name.clone()),
615                            source.alias.clone(),
616                            source.kind,
617                            source_table_name(source)
618                                .or_else(|| Some(table_identifier.name.clone())),
619                            ReferenceConfidence::Resolved,
620                        )
621                    } else {
622                        (
623                            Some(table_identifier.name.clone()),
624                            None,
625                            SourceKind::Unknown,
626                            Some(table_identifier.name.clone()),
627                            ReferenceConfidence::Unknown,
628                        )
629                    }
630                } else if let Some((name, source)) = single_source {
631                    (
632                        Some(name.clone()),
633                        source.alias.clone(),
634                        source.kind,
635                        source_table_name(source).or_else(|| Some(name.clone())),
636                        ReferenceConfidence::Resolved,
637                    )
638                } else if source_count > 1 {
639                    (
640                        None,
641                        None,
642                        SourceKind::Unknown,
643                        None,
644                        ReferenceConfidence::Ambiguous,
645                    )
646                } else {
647                    (
648                        None,
649                        None,
650                        SourceKind::Unknown,
651                        None,
652                        ReferenceConfidence::Unknown,
653                    )
654                };
655
656            refs.push(ColumnReferenceFact {
657                source_name,
658                source_alias,
659                source_kind,
660                table,
661                column: column.name.name.clone(),
662                unqualified: column.table.is_none(),
663                confidence,
664            });
665        }
666    }
667
668    dedupe_column_refs(refs)
669}
670
671fn dedupe_column_refs(refs: Vec<ColumnReferenceFact>) -> Vec<ColumnReferenceFact> {
672    let mut seen = HashSet::new();
673    let mut deduped = Vec::new();
674
675    for reference in refs {
676        let key = (
677            reference.source_name.clone(),
678            reference.source_alias.clone(),
679            reference.table.clone(),
680            reference.column.clone(),
681            format!("{:?}", reference.source_kind),
682            reference.unqualified,
683            format!("{:?}", reference.confidence),
684        );
685        if seen.insert(key) {
686            deduped.push(reference);
687        }
688    }
689
690    deduped
691}
692
693fn relation_facts(
694    scope: &Scope,
695    mapping_schema: Option<&crate::schema::MappingSchema>,
696) -> Vec<RelationFact> {
697    let mut relations = Vec::new();
698    let mut seen = HashSet::new();
699    collect_relation_facts(scope, mapping_schema, &mut seen, &mut relations);
700
701    relations.sort_by(|left, right| {
702        left.name
703            .cmp(&right.name)
704            .then_with(|| left.alias.cmp(&right.alias))
705    });
706    relations
707}
708
709fn collect_relation_facts(
710    scope: &Scope,
711    mapping_schema: Option<&crate::schema::MappingSchema>,
712    seen: &mut HashSet<String>,
713    relations: &mut Vec<RelationFact>,
714) {
715    for relation in scope
716        .sources
717        .iter()
718        .map(|(source_name, source)| RelationFact {
719            name: source
720                .lineage_name
721                .clone()
722                .or_else(|| source_table_name(source))
723                .unwrap_or_else(|| source_name.clone()),
724            alias: source.alias.clone().or_else(|| source_alias(source)),
725            kind: source.kind,
726            columns: source_columns(source, mapping_schema),
727        })
728    {
729        let key = format!("{:?}|{}|{:?}", relation.kind, relation.name, relation.alias);
730        if seen.insert(key) {
731            relations.push(relation);
732        }
733    }
734
735    for branch_scope in &scope.union_scopes {
736        collect_relation_facts(branch_scope, mapping_schema, seen, relations);
737    }
738}
739
740fn base_table_facts(
741    scope: &Scope,
742    mapping_schema: Option<&crate::schema::MappingSchema>,
743) -> Vec<RelationFact> {
744    let mut relations = Vec::new();
745    let mut seen = HashSet::new();
746
747    for child_scope in scope.traverse() {
748        for source in child_scope.sources.values() {
749            if source.kind != SourceKind::Table {
750                continue;
751            }
752
753            let Some(table_name) = source_table_name(source) else {
754                continue;
755            };
756
757            if seen.insert(table_name.clone()) {
758                relations.push(RelationFact {
759                    name: table_name,
760                    alias: source.alias.clone().or_else(|| source_alias(source)),
761                    kind: SourceKind::Table,
762                    columns: source_columns(source, mapping_schema),
763                });
764            }
765        }
766    }
767
768    relations.sort_by(|left, right| left.name.cmp(&right.name));
769    relations
770}
771
772fn source_columns(
773    source: &SourceInfo,
774    mapping_schema: Option<&crate::schema::MappingSchema>,
775) -> Vec<String> {
776    match &source.expression {
777        Expression::Table(table) => mapping_schema
778            .and_then(|schema| schema.column_names(&table_name(table)).ok())
779            .unwrap_or_default(),
780        Expression::Select(_)
781        | Expression::Union(_)
782        | Expression::Intersect(_)
783        | Expression::Except(_) => get_output_column_names(&source.expression),
784        Expression::Subquery(subquery) => get_output_column_names(&subquery.this),
785        Expression::Cte(cte) if !cte.columns.is_empty() => cte
786            .columns
787            .iter()
788            .map(|column| column.name.clone())
789            .collect(),
790        Expression::Cte(cte) => get_output_column_names(&cte.this),
791        _ => Vec::new(),
792    }
793}
794
795fn source_table_name(source: &SourceInfo) -> Option<String> {
796    match &source.expression {
797        Expression::Table(table) => Some(table_name(table)),
798        _ => None,
799    }
800}
801
802fn source_alias(source: &SourceInfo) -> Option<String> {
803    match &source.expression {
804        Expression::Table(table) => table.alias.as_ref().map(|alias| alias.name.clone()),
805        Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
806        _ => None,
807    }
808}
809
810fn table_name(table: &TableRef) -> String {
811    let mut parts = Vec::new();
812    if let Some(catalog) = &table.catalog {
813        parts.push(catalog.name.clone());
814    }
815    if let Some(schema) = &table.schema {
816        parts.push(schema.name.clone());
817    }
818    parts.push(table.name.name.clone());
819    parts.join(".")
820}
821
822fn set_operation_facts(
823    expression: &Expression,
824    scope: &Scope,
825    dialect: DialectType,
826) -> Vec<SetOperationFact> {
827    let mut facts = Vec::new();
828    collect_set_operation_facts(expression, scope, dialect, &mut facts);
829    facts
830}
831
832fn collect_set_operation_facts(
833    expression: &Expression,
834    scope: &Scope,
835    dialect: DialectType,
836    facts: &mut Vec<SetOperationFact>,
837) {
838    match expression {
839        Expression::Union(union) => {
840            facts.push(SetOperationFact {
841                kind: "union".to_string(),
842                all: union.all,
843                distinct: union.distinct,
844                output_columns: get_output_column_names(expression),
845                branches: set_operation_branches(&union.left, &union.right, scope, dialect),
846            });
847            collect_set_operation_facts(&union.left, scope, dialect, facts);
848            collect_set_operation_facts(&union.right, scope, dialect, facts);
849        }
850        Expression::Intersect(intersect) => {
851            facts.push(SetOperationFact {
852                kind: "intersect".to_string(),
853                all: intersect.all,
854                distinct: intersect.distinct,
855                output_columns: get_output_column_names(expression),
856                branches: set_operation_branches(&intersect.left, &intersect.right, scope, dialect),
857            });
858            collect_set_operation_facts(&intersect.left, scope, dialect, facts);
859            collect_set_operation_facts(&intersect.right, scope, dialect, facts);
860        }
861        Expression::Except(except) => {
862            facts.push(SetOperationFact {
863                kind: "except".to_string(),
864                all: except.all,
865                distinct: except.distinct,
866                output_columns: get_output_column_names(expression),
867                branches: set_operation_branches(&except.left, &except.right, scope, dialect),
868            });
869            collect_set_operation_facts(&except.left, scope, dialect, facts);
870            collect_set_operation_facts(&except.right, scope, dialect, facts);
871        }
872        Expression::Subquery(subquery) => {
873            collect_set_operation_facts(&subquery.this, scope, dialect, facts);
874        }
875        _ => {}
876    }
877}
878
879fn set_operation_branches(
880    left: &Expression,
881    right: &Expression,
882    scope: &Scope,
883    dialect: DialectType,
884) -> Vec<SetOperationBranchFact> {
885    vec![
886        SetOperationBranchFact {
887            index: 0,
888            projections: projection_facts_for_branch(left, scope, dialect),
889        },
890        SetOperationBranchFact {
891            index: 1,
892            projections: projection_facts_for_branch(right, scope, dialect),
893        },
894    ]
895}
896
897fn projection_facts_for_branch(
898    expression: &Expression,
899    root_scope: &Scope,
900    dialect: DialectType,
901) -> Vec<ProjectionFact> {
902    let branch_scope = build_scope(expression);
903    let scope = if branch_scope.sources.is_empty() {
904        root_scope
905    } else {
906        &branch_scope
907    };
908    projection_facts_for_query(expression, scope, dialect)
909}
910
/// Map an empty string to `None` and pass any other string through as
/// `Some`. Whitespace is not trimmed.
fn non_empty_string(value: String) -> Option<String> {
    Some(value).filter(|text| !text.is_empty())
}