Skip to main content

polyglot_sql/
query_analysis.rs

1//! Compact query analysis facts.
2//!
3//! This module intentionally builds on the existing parser, scope builder, type
4//! annotator, and lineage implementation. It is a convenience API: callers that
5//! need the full AST or full lineage graph should continue using those lower
6//! level APIs directly.
7
8use crate::ast_transforms::get_output_column_names;
9use crate::dialects::{Dialect, DialectType};
10use crate::expressions::{DataType, Expression, JoinKind, TableRef, With};
11use crate::lineage::{lineage_by_index_from_expression, LineageNode};
12use crate::optimizer::annotate_types::annotate_types;
13use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
14use crate::schema::{MappingSchema, Schema};
15use crate::scope::{build_scope, Scope, SourceInfo, SourceKind};
16use crate::traversal::{contains_aggregate, ExpressionWalk};
17use crate::validation::{mapping_schema_from_validation_schema, ValidationSchema};
18use crate::{parse_data_type, parse_one, Error, Result};
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21
/// Options for [`analyze_query`].
///
/// (De)serialized with camelCase keys. Because of the container-level
/// `#[serde(default)]`, fields missing from the input fall back to the value
/// produced by this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct AnalyzeQueryOptions {
    /// SQL dialect used for parsing and dialect-aware rendering.
    pub dialect: DialectType,
    /// Optional validation schema used for qualification and type annotation.
    /// When `None`, [`analyze_query`] skips column qualification and annotates
    /// types without schema information.
    pub schema: Option<ValidationSchema>,
}
31
/// Compact facts about a query's output shape and data dependencies.
///
/// This is the value returned by [`analyze_query`]; it serializes with
/// camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryAnalysis {
    /// Whether the top-level query is a plain SELECT or a set operation.
    pub shape: QueryShape,
    /// Distinct CTE names in first-seen order, including CTEs found inside
    /// set-operation branches, subquery wrappers, and CTE bodies.
    pub ctes: Vec<String>,
    /// Facts for the CTEs of the top-level `WITH` clause, collected from the
    /// query before column qualification.
    pub cte_facts: Vec<CteFact>,
    /// One fact per output projection. For a set operation these come from
    /// the leftmost branch.
    pub projections: Vec<ProjectionFact>,
    /// Relations derived from the root scope (built by `relation_facts`).
    pub relations: Vec<RelationFact>,
    /// Relations reported by `base_table_facts` for the root scope.
    pub base_tables: Vec<RelationFact>,
    /// Star projections as written in the query, collected before
    /// qualification and star expansion.
    pub star_projections: Vec<StarProjectionFact>,
    /// Set-operation facts produced by `set_operation_facts`.
    pub set_operations: Vec<SetOperationFact>,
}
45
/// Top-level query shape.
///
/// Serialized as a snake_case string (`select`, `set_operation`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueryShape {
    /// The analyzed root is not a `UNION` / `INTERSECT` / `EXCEPT` node.
    Select,
    /// The analyzed root is a `UNION`, `INTERSECT`, or `EXCEPT` node.
    SetOperation,
}
53
/// Compact fact about one output projection.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProjectionFact {
    /// Zero-based position of the projection in the select list.
    pub index: usize,
    /// Output column name: taken from `get_output_column_names` when it has an
    /// entry at this index, otherwise from `projection_name`.
    pub name: Option<String>,
    /// Whether the projection (with alias/parenthesis wrappers removed) is a star.
    pub is_star: bool,
    /// Value of `projection_star_table` for the unwrapped projection
    /// (presumably the qualifier of `t.*` — confirm against that helper).
    pub star_table: Option<String>,
    /// High-level classification of the projection expression.
    pub transform_kind: TransformKind,
    /// Present only when exactly one function-like node is found in the
    /// projection; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub transform_function: Option<TransformFunctionFact>,
    /// Value of the `cast_type` helper for the unwrapped projection, rendered
    /// for the analysis dialect.
    pub cast_type: Option<String>,
    /// Rendered inferred type of the projection, falling back to the inferred
    /// type of the unwrapped inner expression.
    pub type_hint: Option<String>,
    /// Nullability classification computed by `projection_nullability`.
    pub nullability: ProjectionNullability,
    /// Upstream column references: terminal lineage references when lineage
    /// succeeds and is non-empty, otherwise the references found directly in
    /// the projection expression.
    pub upstream: Vec<ColumnReferenceFact>,
}
70
/// Compact fact about a function-like projection transform.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransformFunctionFact {
    /// Function name. Generic and aggregate function nodes use the name stored
    /// on the node; dedicated date/time nodes use a fixed name such as
    /// `DATE_TRUNC` or `EXTRACT`.
    pub name: String,
    /// Literal-like arguments rendered as strings. For date/time truncation
    /// and `EXTRACT` nodes this holds the unit/field name.
    pub literal_args: Vec<String>,
    /// De-duplicated column references found in the function arguments.
    pub column_args: Vec<ColumnReferenceFact>,
}
79
/// Compact fact about one top-level CTE definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CteFact {
    /// CTE alias name.
    pub name: String,
    /// Names from the CTE's explicit column list; empty when the CTE declares none.
    pub columns: Vec<String>,
    /// SQL generated for the CTE body using the analysis dialect.
    pub body_sql: String,
    /// Output column names of the CTE body as reported by
    /// `get_output_column_names`.
    pub output_columns: Vec<String>,
}
89
/// Compact fact about one original star projection.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StarProjectionFact {
    /// Zero-based position of the star in the select list.
    pub index: usize,
    /// Value of `projection_star_table` for the star (presumably the
    /// qualifier of `t.*`; `None` for a bare `*` — confirm against that helper).
    pub table: Option<String>,
    /// Columns collected from the sources the star matches (every source when
    /// `table` is `None`), following FROM/JOIN order when that order is known.
    pub expanded_columns: Vec<String>,
}
98
/// Compact fact about an upstream column reference.
///
/// Instances are produced by helpers defined outside this section (from
/// lineage terminal nodes, or from a direct scan of an expression), so the
/// field notes below are hedged where the producing code is not visible.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnReferenceFact {
    /// Name of the source the column comes from, when known.
    pub source_name: Option<String>,
    /// Alias of that source, when one exists.
    pub source_alias: Option<String>,
    /// Scope-level kind of the source.
    pub source_kind: SourceKind,
    /// NOTE(review): presumably the underlying table name — confirm against the producer.
    pub table: Option<String>,
    /// Column name.
    pub column: String,
    /// NOTE(review): presumably true when the reference carried no table qualifier — confirm.
    pub unqualified: bool,
    /// How confidently the reference was resolved.
    pub confidence: ReferenceConfidence,
}
111
/// Compact fact about a relation visible in the root scope.
///
/// Built by `relation_facts` / `base_table_facts`, which are defined outside
/// this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelationFact {
    /// Relation name.
    pub name: String,
    /// Alias of the relation, if any.
    pub alias: Option<String>,
    /// Scope-level kind of the relation.
    pub kind: SourceKind,
    /// Column names known for the relation.
    pub columns: Vec<String>,
    /// Catalog part of the relation reference, when present.
    pub catalog: Option<String>,
    /// Schema part of the relation reference, when present.
    pub schema: Option<String>,
    /// Table part of the relation reference, when present.
    pub table: Option<String>,
}
124
/// Compact fact about a set operation.
///
/// Built by `set_operation_facts`, which is defined outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationFact {
    /// Set-operation kind as a string (NOTE(review): exact spelling is decided
    /// by the producer — confirm before matching on it).
    pub kind: String,
    /// Whether the `ALL` modifier applies.
    pub all: bool,
    /// Whether the `DISTINCT` modifier applies.
    pub distinct: bool,
    /// Output column names of the set operation.
    pub output_columns: Vec<String>,
    /// Facts for the operation's immediate branches.
    pub branches: Vec<SetOperationBranchFact>,
}
135
/// Compact facts for one immediate set-operation branch.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationBranchFact {
    /// Position of the branch within its set operation.
    pub index: usize,
    /// Projection facts for this branch.
    pub projections: Vec<ProjectionFact>,
}
143
/// High-level kind of transformation performed by a projection.
///
/// Serialized as a snake_case string (for example `direct` or `aggregation`).
/// The classification itself is computed by the `transform_kind` helper on
/// the projection with its alias/parenthesis wrappers removed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransformKind {
    Direct,
    Cast,
    Aggregation,
    Constant,
    Expression,
    Star,
}
155
/// Confidence level for a compact upstream column reference.
///
/// Serialized as a snake_case string (`resolved`, `ambiguous`, `unknown`) and
/// carried in [`ColumnReferenceFact::confidence`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReferenceConfidence {
    Resolved,
    Ambiguous,
    Unknown,
}
164
/// Conservative nullability classification for one output projection.
///
/// Serialized as a snake_case string (`non_null`, `nullable`, `unknown`) and
/// computed per projection by the `projection_nullability` helper.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectionNullability {
    NonNull,
    Nullable,
    Unknown,
}
173
174/// Analyze a single SELECT or set-operation query.
175pub fn analyze_query(sql: &str, options: AnalyzeQueryOptions) -> Result<QueryAnalysis> {
176    let mut expression = parse_one(sql, options.dialect)?;
177    expression = effective_query(expression);
178    ensure_query(&expression)?;
179    let original_expression = expression.clone();
180
181    let mapping_schema = options
182        .schema
183        .as_ref()
184        .map(|schema| analysis_mapping_schema(schema, options.dialect));
185    let schema_info = options.schema.as_ref().map(AnalysisSchemaInfo::from_schema);
186    let cte_facts = top_level_cte_facts(&original_expression, options.dialect)?;
187    let star_projections = star_projection_facts(&original_expression, mapping_schema.as_ref());
188
189    if let Some(schema) = mapping_schema.as_ref() {
190        let qualify_options = QualifyColumnsOptions::new().with_dialect(options.dialect);
191        expression = qualify_columns(expression, schema, &qualify_options)
192            .map_err(|e| Error::internal(format!("query analysis qualification failed: {e}")))?;
193    }
194
195    let annotation_schema = mapping_schema.as_ref().map(|schema| {
196        let mut alias_schema = schema.clone();
197        add_scope_aliases_to_schema(
198            &build_scope(&expression),
199            schema,
200            &mut alias_schema,
201            options.dialect,
202        );
203        alias_schema
204    });
205
206    annotate_types(
207        &mut expression,
208        annotation_schema
209            .as_ref()
210            .map(|schema| schema as &dyn Schema),
211        Some(options.dialect),
212    );
213    crate::lineage::expand_cte_stars(
214        &mut expression,
215        annotation_schema
216            .as_ref()
217            .or(mapping_schema.as_ref())
218            .map(|schema| schema as &dyn Schema),
219    );
220
221    let scope = build_scope(&expression);
222    let nullability_context = NullabilityContext {
223        schema: schema_info.as_ref(),
224        nullable_sources: nullable_source_names(&expression),
225    };
226    let shape = if is_set_operation(&expression) {
227        QueryShape::SetOperation
228    } else {
229        QueryShape::Select
230    };
231
232    Ok(QueryAnalysis {
233        shape,
234        ctes: collect_cte_names(&expression),
235        cte_facts,
236        projections: projection_facts_for_query(
237            &expression,
238            &scope,
239            options.dialect,
240            &nullability_context,
241        ),
242        relations: relation_facts(&scope, mapping_schema.as_ref()),
243        base_tables: base_table_facts(&scope, mapping_schema.as_ref()),
244        star_projections,
245        set_operations: set_operation_facts(&expression, &scope, options.dialect),
246    })
247}
248
249fn analysis_mapping_schema(schema: &ValidationSchema, dialect: DialectType) -> MappingSchema {
250    let broad_schema = mapping_schema_from_validation_schema(schema);
251    let mut mapping_schema = MappingSchema::with_dialect(dialect);
252
253    for table in &schema.tables {
254        let table_names = validation_table_names(table);
255        if table_names.is_empty() {
256            continue;
257        }
258
259        let fallback_table = table_names[0].as_str();
260        let columns: Vec<(String, DataType)> = table
261            .columns
262            .iter()
263            .map(|column| {
264                let data_type = parse_analysis_data_type(&column.data_type, dialect)
265                    .unwrap_or_else(|| {
266                        broad_schema
267                            .get_column_type(fallback_table, &column.name)
268                            .unwrap_or(DataType::Unknown)
269                    });
270                (column.name.to_ascii_lowercase(), data_type)
271            })
272            .collect();
273
274        for table_name in table_names {
275            let _ = mapping_schema.add_table(&table_name, &columns, Some(dialect));
276        }
277    }
278
279    mapping_schema
280}
281
282fn validation_table_names(table: &crate::validation::SchemaTable) -> Vec<String> {
283    let mut names = Vec::new();
284
285    names.push(table.name.to_ascii_lowercase());
286    if let Some(schema_name) = &table.schema {
287        names.push(format!(
288            "{}.{}",
289            schema_name.to_ascii_lowercase(),
290            table.name.to_ascii_lowercase()
291        ));
292    }
293    for alias in &table.aliases {
294        names.push(alias.to_ascii_lowercase());
295    }
296
297    names.sort();
298    names.dedup();
299    names
300}
301
302fn parse_analysis_data_type(data_type: &str, dialect: DialectType) -> Option<DataType> {
303    let trimmed = data_type.trim();
304    if trimmed.is_empty() {
305        return None;
306    }
307    parse_data_type(trimmed, dialect).ok()
308}
309
310fn add_scope_aliases_to_schema(
311    scope: &Scope,
312    source_schema: &MappingSchema,
313    target_schema: &mut MappingSchema,
314    dialect: DialectType,
315) {
316    for child_scope in scope.traverse() {
317        for (source_name, source) in &child_scope.sources {
318            if source.kind != SourceKind::Table {
319                continue;
320            }
321            if let Some(table_name) = source_table_name(source) {
322                if source_name == &table_name {
323                    continue;
324                }
325                if let Ok(column_names) = source_schema.column_names(&table_name) {
326                    let columns: Vec<(String, DataType)> = column_names
327                        .iter()
328                        .map(|column| {
329                            (
330                                column.clone(),
331                                source_schema
332                                    .get_column_type(&table_name, column)
333                                    .unwrap_or(DataType::Unknown),
334                            )
335                        })
336                        .collect();
337                    let _ = target_schema.add_table(source_name, &columns, Some(dialect));
338                }
339            }
340        }
341    }
342}
343
/// Per-column schema facts kept for nullability analysis.
#[derive(Debug, Clone)]
struct AnalysisColumnInfo {
    /// Declared nullability copied from the validation schema column
    /// (`None` when the schema does not state it).
    nullable: Option<bool>,
    /// True when the column is flagged as a primary key itself or is listed in
    /// its table's primary-key list.
    primary_key: bool,
}
349
/// Lookup table of column facts derived from a `ValidationSchema`.
#[derive(Debug, Clone)]
struct AnalysisSchemaInfo {
    /// Column facts keyed by `(table name, column name)`, both normalized with
    /// `normalize_lookup_name`. A table appears once per name it is known by.
    columns: HashMap<(String, String), AnalysisColumnInfo>,
}
354
355impl AnalysisSchemaInfo {
356    fn from_schema(schema: &ValidationSchema) -> Self {
357        let mut columns = HashMap::new();
358
359        for table in &schema.tables {
360            let table_names = validation_table_names(table);
361            let primary_keys: HashSet<String> = table
362                .primary_key
363                .iter()
364                .map(|column| column.to_ascii_lowercase())
365                .collect();
366
367            for column in &table.columns {
368                let info = AnalysisColumnInfo {
369                    nullable: column.nullable,
370                    primary_key: column.primary_key
371                        || primary_keys.contains(&column.name.to_ascii_lowercase()),
372                };
373
374                for table_name in &table_names {
375                    columns.insert(
376                        (
377                            normalize_lookup_name(table_name),
378                            normalize_lookup_name(&column.name),
379                        ),
380                        info.clone(),
381                    );
382                }
383            }
384        }
385
386        Self { columns }
387    }
388
389    fn column(&self, table: &str, column: &str) -> Option<&AnalysisColumnInfo> {
390        self.columns
391            .get(&(normalize_lookup_name(table), normalize_lookup_name(column)))
392    }
393}
394
/// Inputs shared by the per-projection nullability classification.
struct NullabilityContext<'a> {
    /// Column facts from the validation schema, when one was supplied.
    schema: Option<&'a AnalysisSchemaInfo>,
    /// Normalized names of sources that an outer join can make nullable
    /// (see `nullable_source_names`).
    nullable_sources: HashSet<String>,
}
399
400fn top_level_cte_facts(expression: &Expression, dialect: DialectType) -> Result<Vec<CteFact>> {
401    let Some(with_clause) = with_clause(expression) else {
402        return Ok(Vec::new());
403    };
404
405    with_clause
406        .ctes
407        .iter()
408        .map(|cte| {
409            Ok(CteFact {
410                name: cte.alias.name.clone(),
411                columns: cte
412                    .columns
413                    .iter()
414                    .map(|column| column.name.clone())
415                    .collect(),
416                body_sql: Dialect::get(dialect).generate(&cte.this)?,
417                output_columns: get_output_column_names(&cte.this),
418            })
419        })
420        .collect()
421}
422
423fn star_projection_facts(
424    expression: &Expression,
425    mapping_schema: Option<&MappingSchema>,
426) -> Vec<StarProjectionFact> {
427    let scope = build_scope(expression);
428    let ordered_sources = ordered_source_names_for_query(expression);
429
430    select_expressions_for_query(expression)
431        .iter()
432        .enumerate()
433        .filter_map(|(index, projection)| {
434            let inner = unwrap_projection_alias(projection);
435            if !projection_is_star(inner) {
436                return None;
437            }
438
439            let table = projection_star_table(inner);
440            let expanded_columns =
441                expanded_star_columns(table.as_deref(), &scope, &ordered_sources, mapping_schema);
442
443            Some(StarProjectionFact {
444                index,
445                table,
446                expanded_columns,
447            })
448        })
449        .collect()
450}
451
452fn expanded_star_columns(
453    star_table: Option<&str>,
454    scope: &Scope,
455    ordered_sources: &[String],
456    mapping_schema: Option<&MappingSchema>,
457) -> Vec<String> {
458    let mut columns = Vec::new();
459    let mut source_names: Vec<String> = if ordered_sources.is_empty() {
460        let mut names: Vec<_> = scope.sources.keys().cloned().collect();
461        names.sort();
462        names
463    } else {
464        ordered_sources.to_vec()
465    };
466
467    source_names.dedup();
468
469    for source_name in source_names {
470        let Some(source) = scope.sources.get(&source_name) else {
471            continue;
472        };
473
474        if let Some(star_table) = star_table {
475            let matches = source_name.eq_ignore_ascii_case(star_table)
476                || source
477                    .alias
478                    .as_deref()
479                    .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
480                || source_table_name(source)
481                    .is_some_and(|table| table.eq_ignore_ascii_case(star_table));
482
483            if !matches {
484                continue;
485            }
486        }
487
488        columns.extend(source_columns(source, mapping_schema));
489    }
490
491    columns
492}
493
494fn ordered_source_names_for_query(expression: &Expression) -> Vec<String> {
495    match expression {
496        Expression::Select(select) => ordered_source_names_for_select(select),
497        Expression::Union(union) => ordered_source_names_for_query(&union.left),
498        Expression::Intersect(intersect) => ordered_source_names_for_query(&intersect.left),
499        Expression::Except(except) => ordered_source_names_for_query(&except.left),
500        Expression::Subquery(subquery) => ordered_source_names_for_query(&subquery.this),
501        _ => Vec::new(),
502    }
503}
504
505fn ordered_source_names_for_select(select: &crate::expressions::Select) -> Vec<String> {
506    let mut sources = Vec::new();
507
508    if let Some(from) = &select.from {
509        for expression in &from.expressions {
510            if let Some(source_name) = expression_source_name(expression) {
511                sources.push(source_name);
512            }
513        }
514    }
515
516    for join in &select.joins {
517        if let Some(source_name) = expression_source_name(&join.this) {
518            sources.push(source_name);
519        }
520    }
521
522    sources
523}
524
525fn nullable_source_names(expression: &Expression) -> HashSet<String> {
526    match expression {
527        Expression::Select(select) => nullable_source_names_for_select(select),
528        Expression::Union(union) => nullable_source_names(&union.left),
529        Expression::Intersect(intersect) => nullable_source_names(&intersect.left),
530        Expression::Except(except) => nullable_source_names(&except.left),
531        Expression::Subquery(subquery) => nullable_source_names(&subquery.this),
532        _ => HashSet::new(),
533    }
534}
535
536fn nullable_source_names_for_select(select: &crate::expressions::Select) -> HashSet<String> {
537    let mut nullable = HashSet::new();
538    let mut left_sources = Vec::new();
539
540    if let Some(from) = &select.from {
541        for expression in &from.expressions {
542            if let Some(source_name) = expression_source_name(expression) {
543                left_sources.push(source_name);
544            }
545        }
546    }
547
548    for join in &select.joins {
549        let right_source = expression_source_name(&join.this);
550
551        if join_nullable_left(join.kind) {
552            for source_name in &left_sources {
553                nullable.insert(normalize_lookup_name(source_name));
554            }
555        }
556
557        if join_nullable_right(join.kind) {
558            if let Some(source_name) = &right_source {
559                nullable.insert(normalize_lookup_name(source_name));
560            }
561        }
562
563        if let Some(source_name) = right_source {
564            left_sources.push(source_name);
565        }
566    }
567
568    nullable
569}
570
571fn join_nullable_left(kind: JoinKind) -> bool {
572    matches!(
573        kind,
574        JoinKind::Right
575            | JoinKind::NaturalRight
576            | JoinKind::AsOfRight
577            | JoinKind::Full
578            | JoinKind::NaturalFull
579            | JoinKind::Outer
580    )
581}
582
583fn join_nullable_right(kind: JoinKind) -> bool {
584    matches!(
585        kind,
586        JoinKind::Left
587            | JoinKind::NaturalLeft
588            | JoinKind::AsOfLeft
589            | JoinKind::LeftLateral
590            | JoinKind::OuterApply
591            | JoinKind::LeftArray
592            | JoinKind::Full
593            | JoinKind::NaturalFull
594            | JoinKind::Outer
595    )
596}
597
598fn expression_source_name(expression: &Expression) -> Option<String> {
599    match expression {
600        Expression::Table(table) => table
601            .alias
602            .as_ref()
603            .map(|alias| alias.name.clone())
604            .or_else(|| Some(table.name.name.clone())),
605        Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
606        Expression::Alias(alias) => Some(alias.alias.name.clone()),
607        Expression::Cte(cte) => Some(cte.alias.name.clone()),
608        _ => None,
609    }
610}
611
/// Normalize a table or column name for lookups by lower-casing its ASCII
/// letters; non-ASCII characters are left untouched.
fn normalize_lookup_name(name: &str) -> String {
    let mut normalized = String::from(name);
    normalized.make_ascii_lowercase();
    normalized
}
615
616fn effective_query(expression: Expression) -> Expression {
617    match expression {
618        Expression::Prepare(prepare) => prepare.statement,
619        Expression::Subquery(subquery) if subquery.alias.is_none() => subquery.this,
620        other => other,
621    }
622}
623
624fn ensure_query(expression: &Expression) -> Result<()> {
625    if matches!(
626        expression,
627        Expression::Select(_)
628            | Expression::Union(_)
629            | Expression::Intersect(_)
630            | Expression::Except(_)
631    ) {
632        Ok(())
633    } else {
634        Err(Error::internal(
635            "analyze_query requires a SELECT or set operation query",
636        ))
637    }
638}
639
640fn is_set_operation(expression: &Expression) -> bool {
641    matches!(
642        expression,
643        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
644    )
645}
646
647fn collect_cte_names(expression: &Expression) -> Vec<String> {
648    let mut names = Vec::new();
649    let mut seen = HashSet::new();
650    collect_cte_names_inner(expression, &mut names, &mut seen);
651    names
652}
653
654fn collect_cte_names_inner(
655    expression: &Expression,
656    names: &mut Vec<String>,
657    seen: &mut HashSet<String>,
658) {
659    if let Some(with_clause) = with_clause(expression) {
660        collect_with_names(with_clause, names, seen);
661    }
662
663    match expression {
664        Expression::Union(union) => {
665            collect_cte_names_inner(&union.left, names, seen);
666            collect_cte_names_inner(&union.right, names, seen);
667        }
668        Expression::Intersect(intersect) => {
669            collect_cte_names_inner(&intersect.left, names, seen);
670            collect_cte_names_inner(&intersect.right, names, seen);
671        }
672        Expression::Except(except) => {
673            collect_cte_names_inner(&except.left, names, seen);
674            collect_cte_names_inner(&except.right, names, seen);
675        }
676        Expression::Subquery(subquery) => collect_cte_names_inner(&subquery.this, names, seen),
677        _ => {}
678    }
679}
680
681fn collect_with_names(with_clause: &With, names: &mut Vec<String>, seen: &mut HashSet<String>) {
682    for cte in &with_clause.ctes {
683        if seen.insert(cte.alias.name.clone()) {
684            names.push(cte.alias.name.clone());
685        }
686        collect_cte_names_inner(&cte.this, names, seen);
687    }
688}
689
690fn with_clause(expression: &Expression) -> Option<&With> {
691    match expression {
692        Expression::Select(select) => select.with.as_ref(),
693        Expression::Union(union) => union.with.as_ref(),
694        Expression::Intersect(intersect) => intersect.with.as_ref(),
695        Expression::Except(except) => except.with.as_ref(),
696        _ => None,
697    }
698}
699
700fn projection_facts_for_query(
701    expression: &Expression,
702    scope: &Scope,
703    dialect: DialectType,
704    nullability_context: &NullabilityContext<'_>,
705) -> Vec<ProjectionFact> {
706    let expressions = select_expressions_for_query(expression);
707    let names = get_output_column_names(expression);
708
709    expressions
710        .iter()
711        .enumerate()
712        .map(|(index, projection)| {
713            projection_fact(
714                index,
715                names
716                    .get(index)
717                    .cloned()
718                    .or_else(|| projection_name(projection)),
719                projection,
720                expression,
721                scope,
722                dialect,
723                nullability_context,
724            )
725        })
726        .collect()
727}
728
729fn select_expressions_for_query(expression: &Expression) -> Vec<&Expression> {
730    match expression {
731        Expression::Select(select) => select.expressions.iter().collect(),
732        Expression::Union(union) => select_expressions_for_query(&union.left),
733        Expression::Intersect(intersect) => select_expressions_for_query(&intersect.left),
734        Expression::Except(except) => select_expressions_for_query(&except.left),
735        Expression::Subquery(subquery) => select_expressions_for_query(&subquery.this),
736        _ => Vec::new(),
737    }
738}
739
740fn projection_fact(
741    index: usize,
742    name: Option<String>,
743    projection: &Expression,
744    query: &Expression,
745    scope: &Scope,
746    dialect: DialectType,
747    nullability_context: &NullabilityContext<'_>,
748) -> ProjectionFact {
749    let inner = unwrap_projection_alias(projection);
750    let is_star = projection_is_star(inner);
751    let upstream = lineage_by_index_from_expression(index, query, Some(dialect), false)
752        .map(|node| terminal_references_from_lineage(&node))
753        .ok()
754        .filter(|refs| !refs.is_empty())
755        .unwrap_or_else(|| fallback_column_references(inner, scope));
756
757    ProjectionFact {
758        index,
759        name,
760        is_star,
761        star_table: projection_star_table(inner),
762        transform_kind: transform_kind(inner),
763        transform_function: transform_function_fact(inner, scope, dialect),
764        cast_type: cast_type(inner, dialect),
765        type_hint: projection
766            .inferred_type()
767            .or_else(|| inner.inferred_type())
768            .and_then(|data_type| render_data_type(data_type, dialect)),
769        nullability: projection_nullability(inner, scope, nullability_context),
770        upstream,
771    }
772}
773
774fn transform_function_fact(
775    expression: &Expression,
776    scope: &Scope,
777    dialect: DialectType,
778) -> Option<TransformFunctionFact> {
779    let mut matches = expression
780        .find_all(|candidate| transform_function_fact_for_node(candidate, scope, dialect).is_some())
781        .into_iter();
782
783    let first = matches.next()?;
784    if matches.next().is_some() {
785        return None;
786    }
787
788    transform_function_fact_for_node(first, scope, dialect)
789}
790
791fn transform_function_fact_for_node(
792    expression: &Expression,
793    scope: &Scope,
794    dialect: DialectType,
795) -> Option<TransformFunctionFact> {
796    match expression {
797        Expression::Function(function) => Some(transform_function_from_args(
798            &function.name,
799            &function.args,
800            scope,
801            dialect,
802        )),
803        Expression::AggregateFunction(function) => Some(transform_function_from_args(
804            &function.name,
805            &function.args,
806            scope,
807            dialect,
808        )),
809        Expression::DateTrunc(function) => Some(transform_function_from_parts(
810            "DATE_TRUNC",
811            vec![datetime_field_name(&function.unit)],
812            vec![&function.this],
813            scope,
814            dialect,
815        )),
816        Expression::TimestampTrunc(function) => Some(transform_function_from_parts(
817            "TIMESTAMP_TRUNC",
818            vec![datetime_field_name(&function.unit)],
819            vec![&function.this],
820            scope,
821            dialect,
822        )),
823        Expression::TimeTrunc(function) => {
824            let mut args = vec![function.this.as_ref()];
825            if let Some(zone) = function.zone.as_deref() {
826                args.push(zone);
827            }
828            Some(transform_function_from_parts(
829                "TIME_TRUNC",
830                vec![function.unit.clone()],
831                args,
832                scope,
833                dialect,
834            ))
835        }
836        Expression::Extract(function) => Some(transform_function_from_parts(
837            "EXTRACT",
838            vec![datetime_field_name(&function.field)],
839            vec![&function.this],
840            scope,
841            dialect,
842        )),
843        Expression::DateAdd(function) => Some(transform_function_from_parts(
844            "DATE_ADD",
845            Vec::new(),
846            vec![&function.this, &function.interval],
847            scope,
848            dialect,
849        )),
850        Expression::DateSub(function) => Some(transform_function_from_parts(
851            "DATE_SUB",
852            Vec::new(),
853            vec![&function.this, &function.interval],
854            scope,
855            dialect,
856        )),
857        Expression::DateDiff(function) => Some(transform_function_from_parts(
858            "DATE_DIFF",
859            Vec::new(),
860            vec![&function.this, &function.expression],
861            scope,
862            dialect,
863        )),
864        _ => None,
865    }
866}
867
868fn transform_function_from_args(
869    name: &str,
870    args: &[Expression],
871    scope: &Scope,
872    dialect: DialectType,
873) -> TransformFunctionFact {
874    let literal_args = args
875        .iter()
876        .filter_map(|arg| literal_argument(arg, dialect))
877        .collect();
878    transform_function_from_parts(name, literal_args, args.iter().collect(), scope, dialect)
879}
880
881fn transform_function_from_parts(
882    name: &str,
883    literal_args: Vec<String>,
884    args: Vec<&Expression>,
885    scope: &Scope,
886    _dialect: DialectType,
887) -> TransformFunctionFact {
888    let column_args = dedupe_column_refs(
889        args.into_iter()
890            .flat_map(|arg| fallback_column_references(arg, scope))
891            .collect(),
892    );
893
894    TransformFunctionFact {
895        name: name.to_string(),
896        literal_args,
897        column_args,
898    }
899}
900
901fn literal_argument(expression: &Expression, dialect: DialectType) -> Option<String> {
902    match expression {
903        Expression::Literal(literal) => Some(literal.value_str().to_string()),
904        Expression::Boolean(boolean) => Some(boolean.value.to_string()),
905        Expression::Null(_) => Some("NULL".to_string()),
906        Expression::Identifier(identifier) => Some(identifier.name.clone()),
907        Expression::Var(var) => Some(var.this.clone()),
908        Expression::DataType(data_type) => render_data_type(data_type, dialect),
909        _ => None,
910    }
911}
912
913fn datetime_field_name(field: &crate::expressions::DateTimeField) -> String {
914    match field {
915        crate::expressions::DateTimeField::Year => "year".to_string(),
916        crate::expressions::DateTimeField::Month => "month".to_string(),
917        crate::expressions::DateTimeField::Day => "day".to_string(),
918        crate::expressions::DateTimeField::Hour => "hour".to_string(),
919        crate::expressions::DateTimeField::Minute => "minute".to_string(),
920        crate::expressions::DateTimeField::Second => "second".to_string(),
921        crate::expressions::DateTimeField::Millisecond => "millisecond".to_string(),
922        crate::expressions::DateTimeField::Microsecond => "microsecond".to_string(),
923        crate::expressions::DateTimeField::DayOfWeek => "day_of_week".to_string(),
924        crate::expressions::DateTimeField::DayOfYear => "day_of_year".to_string(),
925        crate::expressions::DateTimeField::Week => "week".to_string(),
926        crate::expressions::DateTimeField::WeekWithModifier(modifier) => {
927            format!("week({modifier})")
928        }
929        crate::expressions::DateTimeField::Quarter => "quarter".to_string(),
930        crate::expressions::DateTimeField::Epoch => "epoch".to_string(),
931        crate::expressions::DateTimeField::Timezone => "timezone".to_string(),
932        crate::expressions::DateTimeField::TimezoneHour => "timezone_hour".to_string(),
933        crate::expressions::DateTimeField::TimezoneMinute => "timezone_minute".to_string(),
934        crate::expressions::DateTimeField::Date => "date".to_string(),
935        crate::expressions::DateTimeField::Time => "time".to_string(),
936        crate::expressions::DateTimeField::Custom(name) => name.clone(),
937    }
938}
939
940fn unwrap_projection_alias(expression: &Expression) -> &Expression {
941    match expression {
942        Expression::Alias(alias) => unwrap_projection_alias(&alias.this),
943        Expression::Annotated(annotated) => unwrap_projection_alias(&annotated.this),
944        Expression::Paren(paren) => unwrap_projection_alias(&paren.this),
945        _ => expression,
946    }
947}
948
949fn projection_name(expression: &Expression) -> Option<String> {
950    match expression {
951        Expression::Alias(alias) => Some(alias.alias.name.clone()),
952        Expression::Column(column) => Some(column.name.name.clone()),
953        Expression::Identifier(identifier) => Some(identifier.name.clone()),
954        Expression::Star(_) => Some("*".to_string()),
955        Expression::Annotated(annotated) => projection_name(&annotated.this),
956        _ => None,
957    }
958}
959
960fn projection_is_star(expression: &Expression) -> bool {
961    matches!(expression, Expression::Star(_))
962        || matches!(expression, Expression::Column(column) if column.name.name == "*")
963}
964
965fn projection_star_table(expression: &Expression) -> Option<String> {
966    match expression {
967        Expression::Star(star) => star
968            .table
969            .as_ref()
970            .map(|identifier| identifier.name.clone()),
971        Expression::Column(column) if column.name.name == "*" => column
972            .table
973            .as_ref()
974            .map(|identifier| identifier.name.clone()),
975        _ => None,
976    }
977}
978
979fn transform_kind(expression: &Expression) -> TransformKind {
980    if projection_is_star(expression) {
981        TransformKind::Star
982    } else if is_cast_expression(expression) {
983        TransformKind::Cast
984    } else if contains_aggregate(expression) {
985        TransformKind::Aggregation
986    } else if matches!(
987        expression,
988        Expression::Column(_) | Expression::Identifier(_)
989    ) {
990        TransformKind::Direct
991    } else if is_simple_constant(expression) {
992        TransformKind::Constant
993    } else {
994        TransformKind::Expression
995    }
996}
997
998fn is_cast_expression(expression: &Expression) -> bool {
999    matches!(
1000        expression,
1001        Expression::Cast(_) | Expression::TryCast(_) | Expression::SafeCast(_)
1002    )
1003}
1004
1005fn cast_type(expression: &Expression, dialect: DialectType) -> Option<String> {
1006    match expression {
1007        Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1008            render_data_type(&cast.to, dialect)
1009        }
1010        _ => None,
1011    }
1012}
1013
1014fn render_data_type(data_type: &DataType, dialect: DialectType) -> Option<String> {
1015    Dialect::get(dialect)
1016        .generate(&Expression::DataType(data_type.clone()))
1017        .ok()
1018}
1019
1020fn is_simple_constant(expression: &Expression) -> bool {
1021    match expression {
1022        Expression::Literal(_) | Expression::Boolean(_) | Expression::Null(_) => true,
1023        Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1024            is_simple_constant(&cast.this)
1025        }
1026        Expression::Neg(unary) | Expression::BitwiseNot(unary) => is_simple_constant(&unary.this),
1027        _ => false,
1028    }
1029}
1030
1031fn projection_nullability(
1032    expression: &Expression,
1033    scope: &Scope,
1034    context: &NullabilityContext<'_>,
1035) -> ProjectionNullability {
1036    match expression {
1037        Expression::Alias(alias) => projection_nullability(&alias.this, scope, context),
1038        Expression::Annotated(annotated) => projection_nullability(&annotated.this, scope, context),
1039        Expression::Paren(paren) => projection_nullability(&paren.this, scope, context),
1040        Expression::Literal(_) | Expression::Boolean(_) => ProjectionNullability::NonNull,
1041        Expression::Null(_) => ProjectionNullability::Nullable,
1042        Expression::Count(_) | Expression::CountIf(_) => ProjectionNullability::NonNull,
1043        Expression::Cast(cast) => projection_nullability(&cast.this, scope, context),
1044        Expression::TryCast(_) | Expression::SafeCast(_) => ProjectionNullability::Unknown,
1045        Expression::Column(column) => column_nullability(
1046            &column.name.name,
1047            column.table.as_ref().map(|table| table.name.as_str()),
1048            scope,
1049            context,
1050        ),
1051        Expression::Identifier(identifier) => {
1052            column_nullability(&identifier.name, None, scope, context)
1053        }
1054        Expression::Coalesce(func) => coalesce_nullability(&func.expressions, scope, context),
1055        _ => ProjectionNullability::Unknown,
1056    }
1057}
1058
1059fn column_nullability(
1060    column_name: &str,
1061    source_name: Option<&str>,
1062    scope: &Scope,
1063    context: &NullabilityContext<'_>,
1064) -> ProjectionNullability {
1065    let resolved_source_name = source_name
1066        .map(str::to_string)
1067        .or_else(|| single_scope_source_name(scope));
1068
1069    if let Some(source_name) = &resolved_source_name {
1070        if context
1071            .nullable_sources
1072            .contains(&normalize_lookup_name(source_name))
1073        {
1074            return ProjectionNullability::Nullable;
1075        }
1076    }
1077
1078    let Some(schema) = context.schema else {
1079        return ProjectionNullability::Unknown;
1080    };
1081
1082    let table_name = resolved_source_name
1083        .as_ref()
1084        .and_then(|name| scope.sources.get(name).and_then(source_table_name))
1085        .or(resolved_source_name);
1086
1087    let Some(table_name) = table_name else {
1088        return ProjectionNullability::Unknown;
1089    };
1090
1091    match schema.column(&table_name, column_name) {
1092        Some(info) if info.primary_key || info.nullable == Some(false) => {
1093            ProjectionNullability::NonNull
1094        }
1095        Some(info) if info.nullable == Some(true) => ProjectionNullability::Nullable,
1096        Some(_) | None => ProjectionNullability::Unknown,
1097    }
1098}
1099
1100fn single_scope_source_name(scope: &Scope) -> Option<String> {
1101    if scope.sources.len() == 1 {
1102        scope.sources.keys().next().cloned()
1103    } else {
1104        None
1105    }
1106}
1107
1108fn coalesce_nullability(
1109    expressions: &[Expression],
1110    scope: &Scope,
1111    context: &NullabilityContext<'_>,
1112) -> ProjectionNullability {
1113    if expressions.is_empty() {
1114        return ProjectionNullability::Unknown;
1115    }
1116
1117    let mut all_nullable = true;
1118
1119    for expression in expressions {
1120        match projection_nullability(unwrap_projection_alias(expression), scope, context) {
1121            ProjectionNullability::NonNull => return ProjectionNullability::NonNull,
1122            ProjectionNullability::Nullable => {}
1123            ProjectionNullability::Unknown => all_nullable = false,
1124        }
1125    }
1126
1127    if all_nullable {
1128        ProjectionNullability::Nullable
1129    } else {
1130        ProjectionNullability::Unknown
1131    }
1132}
1133
1134fn terminal_references_from_lineage(node: &LineageNode) -> Vec<ColumnReferenceFact> {
1135    let mut refs = Vec::new();
1136    collect_terminal_references(node, &mut refs);
1137    dedupe_column_refs(refs)
1138}
1139
1140fn collect_terminal_references(node: &LineageNode, refs: &mut Vec<ColumnReferenceFact>) {
1141    if node.downstream.is_empty() {
1142        if let Some(reference) = column_reference_from_lineage_node(node) {
1143            refs.push(reference);
1144        }
1145        return;
1146    }
1147
1148    for child in &node.downstream {
1149        collect_terminal_references(child, refs);
1150    }
1151}
1152
1153fn column_reference_from_lineage_node(node: &LineageNode) -> Option<ColumnReferenceFact> {
1154    match &node.expression {
1155        Expression::Column(column) => {
1156            let source_name = non_empty_string(node.source_name.clone());
1157            let table =
1158                lineage_node_table(node).or_else(|| column.table.as_ref().map(|t| t.name.clone()));
1159            let confidence = if node.source_kind == SourceKind::Unknown && source_name.is_none() {
1160                ReferenceConfidence::Unknown
1161            } else {
1162                ReferenceConfidence::Resolved
1163            };
1164            Some(ColumnReferenceFact {
1165                source_name,
1166                source_alias: node.source_alias.clone(),
1167                source_kind: node.source_kind,
1168                table,
1169                column: column.name.name.clone(),
1170                unqualified: column.table.is_none(),
1171                confidence,
1172            })
1173        }
1174        Expression::Star(_) => Some(ColumnReferenceFact {
1175            source_name: non_empty_string(node.source_name.clone()),
1176            source_alias: node.source_alias.clone(),
1177            source_kind: node.source_kind,
1178            table: lineage_node_table(node),
1179            column: "*".to_string(),
1180            unqualified: true,
1181            confidence: if node.source_kind == SourceKind::Unknown {
1182                ReferenceConfidence::Unknown
1183            } else {
1184                ReferenceConfidence::Resolved
1185            },
1186        }),
1187        _ => None,
1188    }
1189}
1190
1191fn lineage_node_table(node: &LineageNode) -> Option<String> {
1192    match &node.source {
1193        Expression::Table(table) => Some(table_name(table)),
1194        _ => None,
1195    }
1196}
1197
1198fn fallback_column_references(expression: &Expression, scope: &Scope) -> Vec<ColumnReferenceFact> {
1199    let mut refs = Vec::new();
1200    let source_count = scope.sources.len();
1201    let single_source = if source_count == 1 {
1202        scope.sources.iter().next()
1203    } else {
1204        None
1205    };
1206
1207    for column_expr in expression.find_all(|candidate| matches!(candidate, Expression::Column(_))) {
1208        if let Expression::Column(column) = column_expr {
1209            if column.name.name == "*" {
1210                continue;
1211            }
1212            let source = column
1213                .table
1214                .as_ref()
1215                .and_then(|table| scope.sources.get(&table.name));
1216            let (source_name, source_alias, source_kind, table, confidence) =
1217                if let Some(table_identifier) = &column.table {
1218                    if let Some(source) = source {
1219                        (
1220                            Some(table_identifier.name.clone()),
1221                            source.alias.clone(),
1222                            source.kind,
1223                            source_table_name(source)
1224                                .or_else(|| Some(table_identifier.name.clone())),
1225                            ReferenceConfidence::Resolved,
1226                        )
1227                    } else {
1228                        (
1229                            Some(table_identifier.name.clone()),
1230                            None,
1231                            SourceKind::Unknown,
1232                            Some(table_identifier.name.clone()),
1233                            ReferenceConfidence::Unknown,
1234                        )
1235                    }
1236                } else if let Some((name, source)) = single_source {
1237                    (
1238                        Some(name.clone()),
1239                        source.alias.clone(),
1240                        source.kind,
1241                        source_table_name(source).or_else(|| Some(name.clone())),
1242                        ReferenceConfidence::Resolved,
1243                    )
1244                } else if source_count > 1 {
1245                    (
1246                        None,
1247                        None,
1248                        SourceKind::Unknown,
1249                        None,
1250                        ReferenceConfidence::Ambiguous,
1251                    )
1252                } else {
1253                    (
1254                        None,
1255                        None,
1256                        SourceKind::Unknown,
1257                        None,
1258                        ReferenceConfidence::Unknown,
1259                    )
1260                };
1261
1262            refs.push(ColumnReferenceFact {
1263                source_name,
1264                source_alias,
1265                source_kind,
1266                table,
1267                column: column.name.name.clone(),
1268                unqualified: column.table.is_none(),
1269                confidence,
1270            });
1271        }
1272    }
1273
1274    dedupe_column_refs(refs)
1275}
1276
1277fn dedupe_column_refs(refs: Vec<ColumnReferenceFact>) -> Vec<ColumnReferenceFact> {
1278    let mut seen = HashSet::new();
1279    let mut deduped = Vec::new();
1280
1281    for reference in refs {
1282        let key = (
1283            reference.source_name.clone(),
1284            reference.source_alias.clone(),
1285            reference.table.clone(),
1286            reference.column.clone(),
1287            format!("{:?}", reference.source_kind),
1288            reference.unqualified,
1289            format!("{:?}", reference.confidence),
1290        );
1291        if seen.insert(key) {
1292            deduped.push(reference);
1293        }
1294    }
1295
1296    deduped
1297}
1298
1299fn relation_facts(
1300    scope: &Scope,
1301    mapping_schema: Option<&crate::schema::MappingSchema>,
1302) -> Vec<RelationFact> {
1303    let mut relations = Vec::new();
1304    let mut seen = HashSet::new();
1305    collect_relation_facts(scope, mapping_schema, &mut seen, &mut relations);
1306
1307    relations.sort_by(|left, right| {
1308        left.name
1309            .cmp(&right.name)
1310            .then_with(|| left.alias.cmp(&right.alias))
1311    });
1312    relations
1313}
1314
1315fn collect_relation_facts(
1316    scope: &Scope,
1317    mapping_schema: Option<&crate::schema::MappingSchema>,
1318    seen: &mut HashSet<String>,
1319    relations: &mut Vec<RelationFact>,
1320) {
1321    for relation in scope.sources.iter().map(|(source_name, source)| {
1322        let identity = source_table_identity(source);
1323        RelationFact {
1324            name: source
1325                .lineage_name
1326                .clone()
1327                .or_else(|| identity.as_ref().map(|identity| identity.name.clone()))
1328                .unwrap_or_else(|| source_name.clone()),
1329            alias: source.alias.clone().or_else(|| source_alias(source)),
1330            kind: source.kind,
1331            columns: source_columns(source, mapping_schema),
1332            catalog: identity
1333                .as_ref()
1334                .and_then(|identity| identity.catalog.clone()),
1335            schema: identity
1336                .as_ref()
1337                .and_then(|identity| identity.schema.clone()),
1338            table: identity
1339                .as_ref()
1340                .and_then(|identity| identity.table.clone()),
1341        }
1342    }) {
1343        let key = format!("{:?}|{}|{:?}", relation.kind, relation.name, relation.alias);
1344        if seen.insert(key) {
1345            relations.push(relation);
1346        }
1347    }
1348
1349    for branch_scope in &scope.union_scopes {
1350        collect_relation_facts(branch_scope, mapping_schema, seen, relations);
1351    }
1352}
1353
1354fn base_table_facts(
1355    scope: &Scope,
1356    mapping_schema: Option<&crate::schema::MappingSchema>,
1357) -> Vec<RelationFact> {
1358    let mut relations = Vec::new();
1359    let mut seen = HashSet::new();
1360
1361    collect_base_table_facts(scope, mapping_schema, &mut seen, &mut relations);
1362
1363    relations.sort_by(|left, right| left.name.cmp(&right.name));
1364    relations
1365}
1366
1367fn collect_base_table_facts(
1368    scope: &Scope,
1369    mapping_schema: Option<&crate::schema::MappingSchema>,
1370    seen: &mut HashSet<String>,
1371    relations: &mut Vec<RelationFact>,
1372) {
1373    for source in scope.sources.values() {
1374        if source.kind != SourceKind::Table {
1375            continue;
1376        }
1377
1378        let Some(identity) = source_table_identity(source) else {
1379            continue;
1380        };
1381
1382        if seen.insert(identity.name.clone()) {
1383            relations.push(RelationFact {
1384                name: identity.name,
1385                alias: source.alias.clone().or_else(|| source_alias(source)),
1386                kind: SourceKind::Table,
1387                columns: source_columns(source, mapping_schema),
1388                catalog: identity.catalog,
1389                schema: identity.schema,
1390                table: identity.table,
1391            });
1392        }
1393    }
1394
1395    for child_scope in scope
1396        .cte_scopes
1397        .iter()
1398        .chain(scope.union_scopes.iter())
1399        .chain(scope.table_scopes.iter())
1400        .chain(scope.derived_table_scopes.iter())
1401        .chain(scope.subquery_scopes.iter())
1402    {
1403        collect_base_table_facts(child_scope, mapping_schema, seen, relations);
1404    }
1405}
1406
1407fn source_columns(
1408    source: &SourceInfo,
1409    mapping_schema: Option<&crate::schema::MappingSchema>,
1410) -> Vec<String> {
1411    match &source.expression {
1412        Expression::Table(table) => mapping_schema
1413            .and_then(|schema| schema.column_names(&table_name(table)).ok())
1414            .unwrap_or_default(),
1415        Expression::Select(_)
1416        | Expression::Union(_)
1417        | Expression::Intersect(_)
1418        | Expression::Except(_) => get_output_column_names(&source.expression),
1419        Expression::Subquery(subquery) => get_output_column_names(&subquery.this),
1420        Expression::Cte(cte) if !cte.columns.is_empty() => cte
1421            .columns
1422            .iter()
1423            .map(|column| column.name.clone())
1424            .collect(),
1425        Expression::Cte(cte) => get_output_column_names(&cte.this),
1426        _ => Vec::new(),
1427    }
1428}
1429
1430fn source_table_name(source: &SourceInfo) -> Option<String> {
1431    source_table_identity(source).map(|identity| identity.name)
1432}
1433
1434fn source_alias(source: &SourceInfo) -> Option<String> {
1435    match &source.expression {
1436        Expression::Table(table) => table.alias.as_ref().map(|alias| alias.name.clone()),
1437        Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
1438        _ => None,
1439    }
1440}
1441
1442fn table_name(table: &TableRef) -> String {
1443    let mut parts = Vec::new();
1444    if let Some(catalog) = &table.catalog {
1445        parts.push(catalog.name.clone());
1446    }
1447    if let Some(schema) = &table.schema {
1448        parts.push(schema.name.clone());
1449    }
1450    parts.push(table.name.name.clone());
1451    parts.join(".")
1452}
1453
/// Name parts of a table-backed relation, as produced by `table_identity`.
#[derive(Debug, Clone)]
struct RelationIdentity {
    /// Dot-separated qualified name, as built by `table_name`.
    name: String,
    /// Catalog part of the reference, when one was written.
    catalog: Option<String>,
    /// Schema part of the reference, when one was written.
    schema: Option<String>,
    /// Unqualified table name.
    table: Option<String>,
}
1461
1462fn source_table_identity(source: &SourceInfo) -> Option<RelationIdentity> {
1463    match &source.expression {
1464        Expression::Table(table) => Some(table_identity(table)),
1465        _ => None,
1466    }
1467}
1468
1469fn table_identity(table: &TableRef) -> RelationIdentity {
1470    RelationIdentity {
1471        name: table_name(table),
1472        catalog: table.catalog.as_ref().map(|catalog| catalog.name.clone()),
1473        schema: table.schema.as_ref().map(|schema| schema.name.clone()),
1474        table: Some(table.name.name.clone()),
1475    }
1476}
1477
1478fn set_operation_facts(
1479    expression: &Expression,
1480    scope: &Scope,
1481    dialect: DialectType,
1482) -> Vec<SetOperationFact> {
1483    let mut facts = Vec::new();
1484    collect_set_operation_facts(expression, scope, dialect, &mut facts);
1485    facts
1486}
1487
1488fn collect_set_operation_facts(
1489    expression: &Expression,
1490    scope: &Scope,
1491    dialect: DialectType,
1492    facts: &mut Vec<SetOperationFact>,
1493) {
1494    match expression {
1495        Expression::Union(union) => {
1496            facts.push(SetOperationFact {
1497                kind: "union".to_string(),
1498                all: union.all,
1499                distinct: union.distinct,
1500                output_columns: get_output_column_names(expression),
1501                branches: set_operation_branches(&union.left, &union.right, scope, dialect),
1502            });
1503            collect_set_operation_facts(&union.left, scope, dialect, facts);
1504            collect_set_operation_facts(&union.right, scope, dialect, facts);
1505        }
1506        Expression::Intersect(intersect) => {
1507            facts.push(SetOperationFact {
1508                kind: "intersect".to_string(),
1509                all: intersect.all,
1510                distinct: intersect.distinct,
1511                output_columns: get_output_column_names(expression),
1512                branches: set_operation_branches(&intersect.left, &intersect.right, scope, dialect),
1513            });
1514            collect_set_operation_facts(&intersect.left, scope, dialect, facts);
1515            collect_set_operation_facts(&intersect.right, scope, dialect, facts);
1516        }
1517        Expression::Except(except) => {
1518            facts.push(SetOperationFact {
1519                kind: "except".to_string(),
1520                all: except.all,
1521                distinct: except.distinct,
1522                output_columns: get_output_column_names(expression),
1523                branches: set_operation_branches(&except.left, &except.right, scope, dialect),
1524            });
1525            collect_set_operation_facts(&except.left, scope, dialect, facts);
1526            collect_set_operation_facts(&except.right, scope, dialect, facts);
1527        }
1528        Expression::Subquery(subquery) => {
1529            collect_set_operation_facts(&subquery.this, scope, dialect, facts);
1530        }
1531        _ => {}
1532    }
1533}
1534
1535fn set_operation_branches(
1536    left: &Expression,
1537    right: &Expression,
1538    scope: &Scope,
1539    dialect: DialectType,
1540) -> Vec<SetOperationBranchFact> {
1541    vec![
1542        SetOperationBranchFact {
1543            index: 0,
1544            projections: projection_facts_for_branch(left, scope, dialect),
1545        },
1546        SetOperationBranchFact {
1547            index: 1,
1548            projections: projection_facts_for_branch(right, scope, dialect),
1549        },
1550    ]
1551}
1552
1553fn projection_facts_for_branch(
1554    expression: &Expression,
1555    root_scope: &Scope,
1556    dialect: DialectType,
1557) -> Vec<ProjectionFact> {
1558    let branch_scope = build_scope(expression);
1559    let scope = if branch_scope.sources.is_empty() {
1560        root_scope
1561    } else {
1562        &branch_scope
1563    };
1564    let nullability_context = NullabilityContext {
1565        schema: None,
1566        nullable_sources: nullable_source_names(expression),
1567    };
1568    projection_facts_for_query(expression, scope, dialect, &nullability_context)
1569}
1570
/// Converts an empty string to `None` and wraps any other string in `Some`.
///
/// Only the zero-length string counts as empty; whitespace is kept as is.
fn non_empty_string(value: String) -> Option<String> {
    Some(value).filter(|text| !text.is_empty())
}