Skip to main content

polyglot_sql/
query_analysis.rs

1//! Compact query analysis facts.
2//!
3//! This module intentionally builds on the existing parser, scope builder, type
4//! annotator, and lineage implementation. It is a convenience API: callers that
5//! need the full AST or full lineage graph should continue using those lower
6//! level APIs directly.
7
8use crate::ast_transforms::get_output_column_names;
9use crate::dialects::{Dialect, DialectType};
10use crate::expressions::{DataType, Expression, JoinKind, TableRef, With};
11use crate::lineage::{lineage_by_index_from_expression, LineageNode};
12use crate::optimizer::annotate_types::annotate_types;
13use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
14use crate::schema::{MappingSchema, Schema};
15use crate::scope::{build_scope, Scope, SourceInfo, SourceKind};
16use crate::traversal::{contains_aggregate, ExpressionWalk};
17use crate::validation::{mapping_schema_from_validation_schema, ValidationSchema};
18use crate::{parse_data_type, parse_one, Error, Result};
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21
/// Options for [`analyze_query`].
///
/// (De)serialized with camelCase keys; because of the container-level
/// `#[serde(default)]`, keys missing from the input take their `Default`
/// value.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct AnalyzeQueryOptions {
    /// SQL dialect used for parsing and dialect-aware rendering.
    pub dialect: DialectType,
    /// Optional validation schema used for qualification and type annotation.
    /// When `None`, column qualification is skipped and type annotation runs
    /// without schema information.
    pub schema: Option<ValidationSchema>,
}
31
/// Compact facts about a query's output shape and data dependencies.
///
/// Produced by [`analyze_query`]; serialized with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryAnalysis {
    /// Whether the analyzed root is a plain SELECT or a set operation.
    pub shape: QueryShape,
    /// CTE names in first-seen order, collected from the root WITH clause,
    /// set-operation branches, subquery wrappers, and nested CTE bodies.
    pub ctes: Vec<String>,
    /// Facts for the CTEs of the root WITH clause, taken from the query as
    /// written (before qualification).
    pub cte_facts: Vec<CteFact>,
    /// One fact per output projection; for set operations these follow the
    /// left-most branch.
    pub projections: Vec<ProjectionFact>,
    /// Relations derived from the root scope.
    pub relations: Vec<RelationFact>,
    /// Base-table facts derived from the root scope.
    pub base_tables: Vec<RelationFact>,
    /// Star projections found in the query as written (before qualification).
    pub star_projections: Vec<StarProjectionFact>,
    /// Facts about set operations in the query.
    pub set_operations: Vec<SetOperationFact>,
}
45
/// Top-level query shape.
///
/// Serialized as a snake_case string (`select`, `set_operation`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueryShape {
    /// The root expression is not a UNION / INTERSECT / EXCEPT.
    Select,
    /// The root expression is a UNION, INTERSECT, or EXCEPT.
    SetOperation,
}
53
/// Compact fact about one output projection.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProjectionFact {
    /// Zero-based position of the projection in the select list.
    pub index: usize,
    /// Output column name; falls back to a name derived from the projection
    /// expression itself, and is `None` when neither source yields one.
    pub name: Option<String>,
    /// Whether the projection (after unwrapping aliases/parentheses) is a star.
    pub is_star: bool,
    /// Table qualifier reported for the projection by `projection_star_table`.
    pub star_table: Option<String>,
    /// High-level classification of the projection expression.
    pub transform_kind: TransformKind,
    /// Function-like transform details; omitted from serialized output when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub transform_function: Option<TransformFunctionFact>,
    /// Cast target rendered for the requested dialect, when one is reported.
    pub cast_type: Option<String>,
    /// Inferred type of the projection rendered for the requested dialect,
    /// when type annotation produced one.
    pub type_hint: Option<String>,
    /// Conservative nullability classification.
    pub nullability: ProjectionNullability,
    /// Upstream column references: terminal lineage references when lineage
    /// resolves to a non-empty set, otherwise references collected directly
    /// from the expression.
    pub upstream: Vec<ColumnReferenceFact>,
}
70
/// Compact fact about a function-like projection transform.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransformFunctionFact {
    /// Function name: the parsed name for generic and aggregate functions, or
    /// a fixed upper-case name (e.g. `DATE_TRUNC`, `EXTRACT`) for dedicated
    /// AST nodes.
    pub name: String,
    /// Literal-like arguments rendered as text. For generic functions these
    /// are the arguments that are literals, booleans, NULL, identifiers, vars,
    /// or data types; for date/time nodes this carries the unit or field name.
    pub literal_args: Vec<String>,
    /// De-duplicated column references found in the function's arguments.
    pub column_args: Vec<ColumnReferenceFact>,
}
79
/// Compact fact about one top-level CTE definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CteFact {
    /// CTE alias name.
    pub name: String,
    /// Column names declared on the CTE itself (`WITH name(col, ...)`);
    /// empty when none are declared.
    pub columns: Vec<String>,
    /// CTE body regenerated as SQL in the requested dialect.
    pub body_sql: String,
    /// Output column names computed from the CTE body.
    pub output_columns: Vec<String>,
}
89
/// Compact fact about one original star projection.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StarProjectionFact {
    /// Zero-based position of the star in the select list as written.
    pub index: usize,
    /// Table qualifier of the star (`t` in `t.*`), `None` for a bare `*`.
    pub table: Option<String>,
    /// Columns contributed by the matching sources, in source order; empty
    /// when no columns are known for them.
    pub expanded_columns: Vec<String>,
}
98
/// Compact fact about an upstream column reference.
///
/// NOTE(review): values are constructed by helpers outside this view
/// (`terminal_references_from_lineage`, `fallback_column_references`); the
/// field notes below are assumptions to confirm against those helpers.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnReferenceFact {
    /// Presumably the name of the source the column resolves to.
    pub source_name: Option<String>,
    /// Presumably the alias under which that source is visible in the query.
    pub source_alias: Option<String>,
    /// Kind of the source, as reported by the scope builder.
    pub source_kind: SourceKind,
    /// Presumably the underlying table name, when the source is a table.
    pub table: Option<String>,
    /// Referenced column name.
    pub column: String,
    /// Presumably true when the reference carried no table qualifier.
    pub unqualified: bool,
    /// How confidently the reference was resolved.
    pub confidence: ReferenceConfidence,
}
111
/// Compact fact about a relation visible in the root scope.
///
/// NOTE(review): values are built by `relation_facts` / `base_table_facts`,
/// which are outside this view; confirm the field notes against them.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelationFact {
    /// Name of the relation.
    pub name: String,
    /// Alias of the relation, if any.
    pub alias: Option<String>,
    /// Kind of source, as reported by the scope builder.
    pub kind: SourceKind,
    /// Known column names of the relation (presumably schema-derived).
    pub columns: Vec<String>,
    /// Presumably the catalog part of a qualified table name.
    pub catalog: Option<String>,
    /// Presumably the schema part of a qualified table name.
    pub schema: Option<String>,
    /// Presumably the bare table name.
    pub table: Option<String>,
}
124
/// Compact fact about a set operation.
///
/// NOTE(review): values are built by `set_operation_facts`, which is outside
/// this view; confirm the field notes against it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationFact {
    /// Operation kind as text (presumably union / intersect / except).
    pub kind: String,
    /// Presumably whether the `ALL` modifier was present.
    pub all: bool,
    /// Presumably whether the `DISTINCT` modifier was present.
    pub distinct: bool,
    /// Output column names of the set operation.
    pub output_columns: Vec<String>,
    /// Facts for the immediate branches of the operation.
    pub branches: Vec<SetOperationBranchFact>,
}
135
/// Compact facts for one immediate set-operation branch.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationBranchFact {
    /// Position of the branch within its set operation (presumably
    /// zero-based, left first — confirm in `set_operation_facts`).
    pub index: usize,
    /// Projection facts computed for this branch.
    pub projections: Vec<ProjectionFact>,
}
143
/// High-level kind of transformation performed by a projection.
///
/// Serialized as a snake_case string (e.g. `direct`, `aggregation`).
/// NOTE(review): the classification rules live in `transform_kind`, which is
/// outside this view; consult it for the exact meaning of each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransformKind {
    Direct,
    Cast,
    Aggregation,
    Constant,
    Expression,
    Star,
}
155
/// Confidence level for a compact upstream column reference.
///
/// Serialized as a snake_case string (`resolved`, `ambiguous`, `unknown`).
/// NOTE(review): the rules that assign each level live in the reference
/// builders, which are outside this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReferenceConfidence {
    Resolved,
    Ambiguous,
    Unknown,
}
164
/// Conservative nullability classification for one output projection.
///
/// Serialized as a snake_case string (`non_null`, `nullable`, `unknown`).
/// NOTE(review): the classification itself is done by
/// `projection_nullability`, which is outside this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectionNullability {
    NonNull,
    Nullable,
    Unknown,
}
173
174/// Analyze a single SELECT or set-operation query.
175pub fn analyze_query(sql: &str, options: AnalyzeQueryOptions) -> Result<QueryAnalysis> {
176    let mut expression = parse_one(sql, options.dialect)?;
177    expression = effective_query(expression);
178    ensure_query(&expression)?;
179    let original_expression = expression.clone();
180
181    let mapping_schema = options
182        .schema
183        .as_ref()
184        .map(|schema| analysis_mapping_schema(schema, options.dialect));
185    let schema_info = options.schema.as_ref().map(AnalysisSchemaInfo::from_schema);
186    let cte_facts = top_level_cte_facts(&original_expression, options.dialect)?;
187    let star_projections = star_projection_facts(&original_expression, mapping_schema.as_ref());
188
189    if let Some(schema) = mapping_schema.as_ref() {
190        let qualify_options = QualifyColumnsOptions::new().with_dialect(options.dialect);
191        expression = qualify_columns(expression, schema, &qualify_options)
192            .map_err(|e| Error::internal(format!("query analysis qualification failed: {e}")))?;
193    }
194
195    let annotation_schema = mapping_schema.as_ref().map(|schema| {
196        let mut alias_schema = schema.clone();
197        add_scope_aliases_to_schema(
198            &build_scope(&expression),
199            schema,
200            &mut alias_schema,
201            options.dialect,
202        );
203        alias_schema
204    });
205
206    annotate_types(
207        &mut expression,
208        annotation_schema
209            .as_ref()
210            .map(|schema| schema as &dyn Schema),
211        Some(options.dialect),
212    );
213    crate::lineage::expand_cte_stars(
214        &mut expression,
215        annotation_schema
216            .as_ref()
217            .or(mapping_schema.as_ref())
218            .map(|schema| schema as &dyn Schema),
219    );
220
221    let scope = build_scope(&expression);
222    let nullability_context = NullabilityContext {
223        schema: schema_info.as_ref(),
224        nullable_sources: nullable_source_names(&expression),
225    };
226    let shape = if is_set_operation(&expression) {
227        QueryShape::SetOperation
228    } else {
229        QueryShape::Select
230    };
231
232    Ok(QueryAnalysis {
233        shape,
234        ctes: collect_cte_names(&expression),
235        cte_facts,
236        projections: projection_facts_for_query(
237            &expression,
238            &scope,
239            options.dialect,
240            &nullability_context,
241        ),
242        relations: relation_facts(&scope, mapping_schema.as_ref()),
243        base_tables: base_table_facts(&scope, mapping_schema.as_ref()),
244        star_projections,
245        set_operations: set_operation_facts(&expression, &scope, options.dialect),
246    })
247}
248
249fn analysis_mapping_schema(schema: &ValidationSchema, dialect: DialectType) -> MappingSchema {
250    let broad_schema = mapping_schema_from_validation_schema(schema);
251    let mut mapping_schema = MappingSchema::with_dialect(dialect);
252
253    for table in &schema.tables {
254        let table_names = validation_table_names(table);
255        if table_names.is_empty() {
256            continue;
257        }
258
259        let fallback_table = table_names[0].as_str();
260        let columns: Vec<(String, DataType)> = table
261            .columns
262            .iter()
263            .map(|column| {
264                let data_type = parse_analysis_data_type(&column.data_type, dialect)
265                    .unwrap_or_else(|| {
266                        broad_schema
267                            .get_column_type(fallback_table, &column.name)
268                            .unwrap_or(DataType::Unknown)
269                    });
270                (column.name.to_ascii_lowercase(), data_type)
271            })
272            .collect();
273
274        for table_name in table_names {
275            let _ = mapping_schema.add_table(&table_name, &columns, Some(dialect));
276        }
277    }
278
279    mapping_schema
280}
281
282fn validation_table_names(table: &crate::validation::SchemaTable) -> Vec<String> {
283    let mut names = Vec::new();
284
285    names.push(table.name.to_ascii_lowercase());
286    if let Some(schema_name) = &table.schema {
287        names.push(format!(
288            "{}.{}",
289            schema_name.to_ascii_lowercase(),
290            table.name.to_ascii_lowercase()
291        ));
292    }
293    for alias in &table.aliases {
294        names.push(alias.to_ascii_lowercase());
295    }
296
297    names.sort();
298    names.dedup();
299    names
300}
301
302fn parse_analysis_data_type(data_type: &str, dialect: DialectType) -> Option<DataType> {
303    let trimmed = data_type.trim();
304    if trimmed.is_empty() {
305        return None;
306    }
307    parse_data_type(trimmed, dialect).ok()
308}
309
310fn add_scope_aliases_to_schema(
311    scope: &Scope,
312    source_schema: &MappingSchema,
313    target_schema: &mut MappingSchema,
314    dialect: DialectType,
315) {
316    for child_scope in scope.traverse() {
317        for (source_name, source) in &child_scope.sources {
318            if source.kind != SourceKind::Table {
319                continue;
320            }
321            if let Some(table_name) = source_table_name(source) {
322                if source_name == &table_name {
323                    continue;
324                }
325                if let Ok(column_names) = source_schema.column_names(&table_name) {
326                    let columns: Vec<(String, DataType)> = column_names
327                        .iter()
328                        .map(|column| {
329                            (
330                                column.clone(),
331                                source_schema
332                                    .get_column_type(&table_name, column)
333                                    .unwrap_or(DataType::Unknown),
334                            )
335                        })
336                        .collect();
337                    let _ = target_schema.add_table(source_name, &columns, Some(dialect));
338                }
339            }
340        }
341    }
342}
343
/// Per-column schema details consulted during nullability analysis.
#[derive(Debug, Clone)]
struct AnalysisColumnInfo {
    /// Declared nullability copied from the validation schema; `None` when the
    /// schema does not state it.
    nullable: Option<bool>,
    /// True when the column is flagged as a primary key itself or is listed in
    /// its table's primary-key column list.
    primary_key: bool,
}
349
/// Lookup table of column details derived from a validation schema.
#[derive(Debug, Clone)]
struct AnalysisSchemaInfo {
    /// Column details keyed by `(table name, column name)`, both normalized
    /// with `normalize_lookup_name`. Each table contributes one entry per
    /// registered name (bare, schema-qualified, alias).
    columns: HashMap<(String, String), AnalysisColumnInfo>,
}
354
355impl AnalysisSchemaInfo {
356    fn from_schema(schema: &ValidationSchema) -> Self {
357        let mut columns = HashMap::new();
358
359        for table in &schema.tables {
360            let table_names = validation_table_names(table);
361            let primary_keys: HashSet<String> = table
362                .primary_key
363                .iter()
364                .map(|column| column.to_ascii_lowercase())
365                .collect();
366
367            for column in &table.columns {
368                let info = AnalysisColumnInfo {
369                    nullable: column.nullable,
370                    primary_key: column.primary_key
371                        || primary_keys.contains(&column.name.to_ascii_lowercase()),
372                };
373
374                for table_name in &table_names {
375                    columns.insert(
376                        (
377                            normalize_lookup_name(table_name),
378                            normalize_lookup_name(&column.name),
379                        ),
380                        info.clone(),
381                    );
382                }
383            }
384        }
385
386        Self { columns }
387    }
388
389    fn column(&self, table: &str, column: &str) -> Option<&AnalysisColumnInfo> {
390        self.columns
391            .get(&(normalize_lookup_name(table), normalize_lookup_name(column)))
392    }
393}
394
/// Inputs shared by the per-projection nullability classification.
struct NullabilityContext<'a> {
    /// Column details from the validation schema, when one was supplied.
    schema: Option<&'a AnalysisSchemaInfo>,
    /// Normalized names of sources that an outer join can fill with NULLs.
    nullable_sources: HashSet<String>,
}
399
400fn top_level_cte_facts(expression: &Expression, dialect: DialectType) -> Result<Vec<CteFact>> {
401    let Some(with_clause) = with_clause(expression) else {
402        return Ok(Vec::new());
403    };
404
405    with_clause
406        .ctes
407        .iter()
408        .map(|cte| {
409            Ok(CteFact {
410                name: cte.alias.name.clone(),
411                columns: cte
412                    .columns
413                    .iter()
414                    .map(|column| column.name.clone())
415                    .collect(),
416                body_sql: Dialect::get(dialect).generate(&cte.this)?,
417                output_columns: get_output_column_names(&cte.this),
418            })
419        })
420        .collect()
421}
422
423fn star_projection_facts(
424    expression: &Expression,
425    mapping_schema: Option<&MappingSchema>,
426) -> Vec<StarProjectionFact> {
427    let scope = build_scope(expression);
428    let ordered_sources = ordered_source_names_for_query(expression);
429
430    select_expressions_for_query(expression)
431        .iter()
432        .enumerate()
433        .filter_map(|(index, projection)| {
434            let inner = unwrap_projection_alias(projection);
435            if !projection_is_star(inner) {
436                return None;
437            }
438
439            let table = projection_star_table(inner);
440            let expanded_columns =
441                expanded_star_columns(table.as_deref(), &scope, &ordered_sources, mapping_schema);
442
443            Some(StarProjectionFact {
444                index,
445                table,
446                expanded_columns,
447            })
448        })
449        .collect()
450}
451
452fn expanded_star_columns(
453    star_table: Option<&str>,
454    scope: &Scope,
455    ordered_sources: &[String],
456    mapping_schema: Option<&MappingSchema>,
457) -> Vec<String> {
458    let mut columns = Vec::new();
459    let mut source_names: Vec<String> = if ordered_sources.is_empty() {
460        let mut names: Vec<_> = scope.sources.keys().cloned().collect();
461        names.sort();
462        names
463    } else {
464        ordered_sources.to_vec()
465    };
466
467    source_names.dedup();
468
469    for source_name in source_names {
470        let Some(source) = scope.sources.get(&source_name) else {
471            continue;
472        };
473
474        if let Some(star_table) = star_table {
475            let matches = source_name.eq_ignore_ascii_case(star_table)
476                || source
477                    .alias
478                    .as_deref()
479                    .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
480                || source_table_name(source)
481                    .is_some_and(|table| table.eq_ignore_ascii_case(star_table));
482
483            if !matches {
484                continue;
485            }
486        }
487
488        columns.extend(source_columns(source, mapping_schema));
489    }
490
491    columns
492}
493
494fn ordered_source_names_for_query(expression: &Expression) -> Vec<String> {
495    match expression {
496        Expression::Select(select) => ordered_source_names_for_select(select),
497        Expression::Union(union) => ordered_source_names_for_query(&union.left),
498        Expression::Intersect(intersect) => ordered_source_names_for_query(&intersect.left),
499        Expression::Except(except) => ordered_source_names_for_query(&except.left),
500        Expression::Subquery(subquery) => ordered_source_names_for_query(&subquery.this),
501        _ => Vec::new(),
502    }
503}
504
505fn ordered_source_names_for_select(select: &crate::expressions::Select) -> Vec<String> {
506    let mut sources = Vec::new();
507
508    if let Some(from) = &select.from {
509        for expression in &from.expressions {
510            if let Some(source_name) = expression_source_name(expression) {
511                sources.push(source_name);
512            }
513        }
514    }
515
516    for join in &select.joins {
517        if let Some(source_name) = expression_source_name(&join.this) {
518            sources.push(source_name);
519        }
520    }
521
522    sources
523}
524
525fn nullable_source_names(expression: &Expression) -> HashSet<String> {
526    match expression {
527        Expression::Select(select) => nullable_source_names_for_select(select),
528        Expression::Union(union) => nullable_source_names(&union.left),
529        Expression::Intersect(intersect) => nullable_source_names(&intersect.left),
530        Expression::Except(except) => nullable_source_names(&except.left),
531        Expression::Subquery(subquery) => nullable_source_names(&subquery.this),
532        _ => HashSet::new(),
533    }
534}
535
536fn nullable_source_names_for_select(select: &crate::expressions::Select) -> HashSet<String> {
537    let mut nullable = HashSet::new();
538    let mut left_sources = Vec::new();
539
540    if let Some(from) = &select.from {
541        for expression in &from.expressions {
542            if let Some(source_name) = expression_source_name(expression) {
543                left_sources.push(source_name);
544            }
545        }
546    }
547
548    for join in &select.joins {
549        let right_source = expression_source_name(&join.this);
550
551        if join_nullable_left(join.kind) {
552            for source_name in &left_sources {
553                nullable.insert(normalize_lookup_name(source_name));
554            }
555        }
556
557        if join_nullable_right(join.kind) {
558            if let Some(source_name) = &right_source {
559                nullable.insert(normalize_lookup_name(source_name));
560            }
561        }
562
563        if let Some(source_name) = right_source {
564            left_sources.push(source_name);
565        }
566    }
567
568    nullable
569}
570
571fn join_nullable_left(kind: JoinKind) -> bool {
572    matches!(
573        kind,
574        JoinKind::Right
575            | JoinKind::NaturalRight
576            | JoinKind::AsOfRight
577            | JoinKind::Full
578            | JoinKind::NaturalFull
579            | JoinKind::Outer
580    )
581}
582
583fn join_nullable_right(kind: JoinKind) -> bool {
584    matches!(
585        kind,
586        JoinKind::Left
587            | JoinKind::NaturalLeft
588            | JoinKind::AsOfLeft
589            | JoinKind::LeftLateral
590            | JoinKind::OuterApply
591            | JoinKind::LeftArray
592            | JoinKind::Full
593            | JoinKind::NaturalFull
594            | JoinKind::Outer
595    )
596}
597
598fn expression_source_name(expression: &Expression) -> Option<String> {
599    match expression {
600        Expression::Table(table) => table
601            .alias
602            .as_ref()
603            .map(|alias| alias.name.clone())
604            .or_else(|| Some(table.name.name.clone())),
605        Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
606        Expression::Alias(alias) => Some(alias.alias.name.clone()),
607        Expression::Cte(cte) => Some(cte.alias.name.clone()),
608        _ => None,
609    }
610}
611
/// Normalize a table or column name for case-insensitive lookups by
/// lower-casing its ASCII letters; all other characters are kept as they are.
fn normalize_lookup_name(name: &str) -> String {
    name.chars()
        .map(|character| character.to_ascii_lowercase())
        .collect()
}
615
616fn effective_query(expression: Expression) -> Expression {
617    match expression {
618        Expression::Prepare(prepare) => prepare.statement,
619        Expression::Subquery(subquery) if subquery.alias.is_none() => subquery.this,
620        other => other,
621    }
622}
623
624fn ensure_query(expression: &Expression) -> Result<()> {
625    if matches!(
626        expression,
627        Expression::Select(_)
628            | Expression::Union(_)
629            | Expression::Intersect(_)
630            | Expression::Except(_)
631    ) {
632        Ok(())
633    } else {
634        Err(Error::internal(
635            "analyze_query requires a SELECT or set operation query",
636        ))
637    }
638}
639
640fn is_set_operation(expression: &Expression) -> bool {
641    matches!(
642        expression,
643        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
644    )
645}
646
647fn collect_cte_names(expression: &Expression) -> Vec<String> {
648    let mut names = Vec::new();
649    let mut seen = HashSet::new();
650    collect_cte_names_inner(expression, &mut names, &mut seen);
651    names
652}
653
654fn collect_cte_names_inner(
655    expression: &Expression,
656    names: &mut Vec<String>,
657    seen: &mut HashSet<String>,
658) {
659    if let Some(with_clause) = with_clause(expression) {
660        collect_with_names(with_clause, names, seen);
661    }
662
663    match expression {
664        Expression::Union(union) => {
665            collect_cte_names_inner(&union.left, names, seen);
666            collect_cte_names_inner(&union.right, names, seen);
667        }
668        Expression::Intersect(intersect) => {
669            collect_cte_names_inner(&intersect.left, names, seen);
670            collect_cte_names_inner(&intersect.right, names, seen);
671        }
672        Expression::Except(except) => {
673            collect_cte_names_inner(&except.left, names, seen);
674            collect_cte_names_inner(&except.right, names, seen);
675        }
676        Expression::Subquery(subquery) => collect_cte_names_inner(&subquery.this, names, seen),
677        _ => {}
678    }
679}
680
681fn collect_with_names(with_clause: &With, names: &mut Vec<String>, seen: &mut HashSet<String>) {
682    for cte in &with_clause.ctes {
683        if seen.insert(cte.alias.name.clone()) {
684            names.push(cte.alias.name.clone());
685        }
686        collect_cte_names_inner(&cte.this, names, seen);
687    }
688}
689
690fn with_clause(expression: &Expression) -> Option<&With> {
691    match expression {
692        Expression::Select(select) => select.with.as_ref(),
693        Expression::Union(union) => union.with.as_ref(),
694        Expression::Intersect(intersect) => intersect.with.as_ref(),
695        Expression::Except(except) => except.with.as_ref(),
696        _ => None,
697    }
698}
699
700fn projection_facts_for_query(
701    expression: &Expression,
702    scope: &Scope,
703    dialect: DialectType,
704    nullability_context: &NullabilityContext<'_>,
705) -> Vec<ProjectionFact> {
706    let expressions = select_expressions_for_query(expression);
707    let names = get_output_column_names(expression);
708
709    expressions
710        .iter()
711        .enumerate()
712        .map(|(index, projection)| {
713            projection_fact(
714                index,
715                names
716                    .get(index)
717                    .cloned()
718                    .or_else(|| projection_name(projection)),
719                projection,
720                expression,
721                scope,
722                dialect,
723                nullability_context,
724            )
725        })
726        .collect()
727}
728
729fn select_expressions_for_query(expression: &Expression) -> Vec<&Expression> {
730    match expression {
731        Expression::Select(select) => select.expressions.iter().collect(),
732        Expression::Union(union) => select_expressions_for_query(&union.left),
733        Expression::Intersect(intersect) => select_expressions_for_query(&intersect.left),
734        Expression::Except(except) => select_expressions_for_query(&except.left),
735        Expression::Subquery(subquery) => select_expressions_for_query(&subquery.this),
736        _ => Vec::new(),
737    }
738}
739
740fn projection_fact(
741    index: usize,
742    name: Option<String>,
743    projection: &Expression,
744    query: &Expression,
745    scope: &Scope,
746    dialect: DialectType,
747    nullability_context: &NullabilityContext<'_>,
748) -> ProjectionFact {
749    let inner = unwrap_projection_alias(projection);
750    let is_star = projection_is_star(inner);
751    let upstream = lineage_by_index_from_expression(index, query, Some(dialect), false)
752        .map(|node| terminal_references_from_lineage(&node))
753        .ok()
754        .filter(|refs| !refs.is_empty())
755        .unwrap_or_else(|| fallback_column_references(inner, scope));
756
757    ProjectionFact {
758        index,
759        name,
760        is_star,
761        star_table: projection_star_table(inner),
762        transform_kind: transform_kind(inner),
763        transform_function: transform_function_fact(inner, scope, dialect),
764        cast_type: cast_type(inner, dialect),
765        type_hint: projection
766            .inferred_type()
767            .or_else(|| inner.inferred_type())
768            .and_then(|data_type| render_data_type(data_type, dialect)),
769        nullability: projection_nullability(inner, scope, nullability_context),
770        upstream,
771    }
772}
773
774fn transform_function_fact(
775    expression: &Expression,
776    scope: &Scope,
777    dialect: DialectType,
778) -> Option<TransformFunctionFact> {
779    match expression {
780        Expression::Function(function) => Some(transform_function_from_args(
781            &function.name,
782            &function.args,
783            scope,
784            dialect,
785        )),
786        Expression::AggregateFunction(function) => Some(transform_function_from_args(
787            &function.name,
788            &function.args,
789            scope,
790            dialect,
791        )),
792        Expression::WindowFunction(function) => {
793            transform_function_fact(&function.this, scope, dialect)
794        }
795        Expression::DateTrunc(function) => Some(transform_function_from_parts(
796            "DATE_TRUNC",
797            vec![datetime_field_name(&function.unit)],
798            vec![&function.this],
799            scope,
800            dialect,
801        )),
802        Expression::TimestampTrunc(function) => Some(transform_function_from_parts(
803            "TIMESTAMP_TRUNC",
804            vec![datetime_field_name(&function.unit)],
805            vec![&function.this],
806            scope,
807            dialect,
808        )),
809        Expression::TimeTrunc(function) => {
810            let mut args = vec![function.this.as_ref()];
811            if let Some(zone) = function.zone.as_deref() {
812                args.push(zone);
813            }
814            Some(transform_function_from_parts(
815                "TIME_TRUNC",
816                vec![function.unit.clone()],
817                args,
818                scope,
819                dialect,
820            ))
821        }
822        Expression::Extract(function) => Some(transform_function_from_parts(
823            "EXTRACT",
824            vec![datetime_field_name(&function.field)],
825            vec![&function.this],
826            scope,
827            dialect,
828        )),
829        Expression::DateAdd(function) => Some(transform_function_from_parts(
830            "DATE_ADD",
831            Vec::new(),
832            vec![&function.this, &function.interval],
833            scope,
834            dialect,
835        )),
836        Expression::DateSub(function) => Some(transform_function_from_parts(
837            "DATE_SUB",
838            Vec::new(),
839            vec![&function.this, &function.interval],
840            scope,
841            dialect,
842        )),
843        Expression::DateDiff(function) => Some(transform_function_from_parts(
844            "DATE_DIFF",
845            Vec::new(),
846            vec![&function.this, &function.expression],
847            scope,
848            dialect,
849        )),
850        _ => None,
851    }
852}
853
854fn transform_function_from_args(
855    name: &str,
856    args: &[Expression],
857    scope: &Scope,
858    dialect: DialectType,
859) -> TransformFunctionFact {
860    let literal_args = args
861        .iter()
862        .filter_map(|arg| literal_argument(arg, dialect))
863        .collect();
864    transform_function_from_parts(name, literal_args, args.iter().collect(), scope, dialect)
865}
866
867fn transform_function_from_parts(
868    name: &str,
869    literal_args: Vec<String>,
870    args: Vec<&Expression>,
871    scope: &Scope,
872    _dialect: DialectType,
873) -> TransformFunctionFact {
874    let column_args = dedupe_column_refs(
875        args.into_iter()
876            .flat_map(|arg| fallback_column_references(arg, scope))
877            .collect(),
878    );
879
880    TransformFunctionFact {
881        name: name.to_string(),
882        literal_args,
883        column_args,
884    }
885}
886
887fn literal_argument(expression: &Expression, dialect: DialectType) -> Option<String> {
888    match expression {
889        Expression::Literal(literal) => Some(literal.value_str().to_string()),
890        Expression::Boolean(boolean) => Some(boolean.value.to_string()),
891        Expression::Null(_) => Some("NULL".to_string()),
892        Expression::Identifier(identifier) => Some(identifier.name.clone()),
893        Expression::Var(var) => Some(var.this.clone()),
894        Expression::DataType(data_type) => render_data_type(data_type, dialect),
895        _ => None,
896    }
897}
898
899fn datetime_field_name(field: &crate::expressions::DateTimeField) -> String {
900    match field {
901        crate::expressions::DateTimeField::Year => "year".to_string(),
902        crate::expressions::DateTimeField::Month => "month".to_string(),
903        crate::expressions::DateTimeField::Day => "day".to_string(),
904        crate::expressions::DateTimeField::Hour => "hour".to_string(),
905        crate::expressions::DateTimeField::Minute => "minute".to_string(),
906        crate::expressions::DateTimeField::Second => "second".to_string(),
907        crate::expressions::DateTimeField::Millisecond => "millisecond".to_string(),
908        crate::expressions::DateTimeField::Microsecond => "microsecond".to_string(),
909        crate::expressions::DateTimeField::DayOfWeek => "day_of_week".to_string(),
910        crate::expressions::DateTimeField::DayOfYear => "day_of_year".to_string(),
911        crate::expressions::DateTimeField::Week => "week".to_string(),
912        crate::expressions::DateTimeField::WeekWithModifier(modifier) => {
913            format!("week({modifier})")
914        }
915        crate::expressions::DateTimeField::Quarter => "quarter".to_string(),
916        crate::expressions::DateTimeField::Epoch => "epoch".to_string(),
917        crate::expressions::DateTimeField::Timezone => "timezone".to_string(),
918        crate::expressions::DateTimeField::TimezoneHour => "timezone_hour".to_string(),
919        crate::expressions::DateTimeField::TimezoneMinute => "timezone_minute".to_string(),
920        crate::expressions::DateTimeField::Date => "date".to_string(),
921        crate::expressions::DateTimeField::Time => "time".to_string(),
922        crate::expressions::DateTimeField::Custom(name) => name.clone(),
923    }
924}
925
926fn unwrap_projection_alias(expression: &Expression) -> &Expression {
927    match expression {
928        Expression::Alias(alias) => unwrap_projection_alias(&alias.this),
929        Expression::Annotated(annotated) => unwrap_projection_alias(&annotated.this),
930        Expression::Paren(paren) => unwrap_projection_alias(&paren.this),
931        _ => expression,
932    }
933}
934
935fn projection_name(expression: &Expression) -> Option<String> {
936    match expression {
937        Expression::Alias(alias) => Some(alias.alias.name.clone()),
938        Expression::Column(column) => Some(column.name.name.clone()),
939        Expression::Identifier(identifier) => Some(identifier.name.clone()),
940        Expression::Star(_) => Some("*".to_string()),
941        Expression::Annotated(annotated) => projection_name(&annotated.this),
942        _ => None,
943    }
944}
945
946fn projection_is_star(expression: &Expression) -> bool {
947    matches!(expression, Expression::Star(_))
948        || matches!(expression, Expression::Column(column) if column.name.name == "*")
949}
950
951fn projection_star_table(expression: &Expression) -> Option<String> {
952    match expression {
953        Expression::Star(star) => star
954            .table
955            .as_ref()
956            .map(|identifier| identifier.name.clone()),
957        Expression::Column(column) if column.name.name == "*" => column
958            .table
959            .as_ref()
960            .map(|identifier| identifier.name.clone()),
961        _ => None,
962    }
963}
964
965fn transform_kind(expression: &Expression) -> TransformKind {
966    if projection_is_star(expression) {
967        TransformKind::Star
968    } else if is_cast_expression(expression) {
969        TransformKind::Cast
970    } else if contains_aggregate(expression) {
971        TransformKind::Aggregation
972    } else if matches!(
973        expression,
974        Expression::Column(_) | Expression::Identifier(_)
975    ) {
976        TransformKind::Direct
977    } else if is_simple_constant(expression) {
978        TransformKind::Constant
979    } else {
980        TransformKind::Expression
981    }
982}
983
984fn is_cast_expression(expression: &Expression) -> bool {
985    matches!(
986        expression,
987        Expression::Cast(_) | Expression::TryCast(_) | Expression::SafeCast(_)
988    )
989}
990
991fn cast_type(expression: &Expression, dialect: DialectType) -> Option<String> {
992    match expression {
993        Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
994            render_data_type(&cast.to, dialect)
995        }
996        _ => None,
997    }
998}
999
1000fn render_data_type(data_type: &DataType, dialect: DialectType) -> Option<String> {
1001    Dialect::get(dialect)
1002        .generate(&Expression::DataType(data_type.clone()))
1003        .ok()
1004}
1005
1006fn is_simple_constant(expression: &Expression) -> bool {
1007    match expression {
1008        Expression::Literal(_) | Expression::Boolean(_) | Expression::Null(_) => true,
1009        Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1010            is_simple_constant(&cast.this)
1011        }
1012        Expression::Neg(unary) | Expression::BitwiseNot(unary) => is_simple_constant(&unary.this),
1013        _ => false,
1014    }
1015}
1016
1017fn projection_nullability(
1018    expression: &Expression,
1019    scope: &Scope,
1020    context: &NullabilityContext<'_>,
1021) -> ProjectionNullability {
1022    match expression {
1023        Expression::Alias(alias) => projection_nullability(&alias.this, scope, context),
1024        Expression::Annotated(annotated) => projection_nullability(&annotated.this, scope, context),
1025        Expression::Paren(paren) => projection_nullability(&paren.this, scope, context),
1026        Expression::Literal(_) | Expression::Boolean(_) => ProjectionNullability::NonNull,
1027        Expression::Null(_) => ProjectionNullability::Nullable,
1028        Expression::Count(_) | Expression::CountIf(_) => ProjectionNullability::NonNull,
1029        Expression::Cast(cast) => projection_nullability(&cast.this, scope, context),
1030        Expression::TryCast(_) | Expression::SafeCast(_) => ProjectionNullability::Unknown,
1031        Expression::Column(column) => column_nullability(
1032            &column.name.name,
1033            column.table.as_ref().map(|table| table.name.as_str()),
1034            scope,
1035            context,
1036        ),
1037        Expression::Identifier(identifier) => {
1038            column_nullability(&identifier.name, None, scope, context)
1039        }
1040        Expression::Coalesce(func) => coalesce_nullability(&func.expressions, scope, context),
1041        _ => ProjectionNullability::Unknown,
1042    }
1043}
1044
1045fn column_nullability(
1046    column_name: &str,
1047    source_name: Option<&str>,
1048    scope: &Scope,
1049    context: &NullabilityContext<'_>,
1050) -> ProjectionNullability {
1051    let resolved_source_name = source_name
1052        .map(str::to_string)
1053        .or_else(|| single_scope_source_name(scope));
1054
1055    if let Some(source_name) = &resolved_source_name {
1056        if context
1057            .nullable_sources
1058            .contains(&normalize_lookup_name(source_name))
1059        {
1060            return ProjectionNullability::Nullable;
1061        }
1062    }
1063
1064    let Some(schema) = context.schema else {
1065        return ProjectionNullability::Unknown;
1066    };
1067
1068    let table_name = resolved_source_name
1069        .as_ref()
1070        .and_then(|name| scope.sources.get(name).and_then(source_table_name))
1071        .or(resolved_source_name);
1072
1073    let Some(table_name) = table_name else {
1074        return ProjectionNullability::Unknown;
1075    };
1076
1077    match schema.column(&table_name, column_name) {
1078        Some(info) if info.primary_key || info.nullable == Some(false) => {
1079            ProjectionNullability::NonNull
1080        }
1081        Some(info) if info.nullable == Some(true) => ProjectionNullability::Nullable,
1082        Some(_) | None => ProjectionNullability::Unknown,
1083    }
1084}
1085
1086fn single_scope_source_name(scope: &Scope) -> Option<String> {
1087    if scope.sources.len() == 1 {
1088        scope.sources.keys().next().cloned()
1089    } else {
1090        None
1091    }
1092}
1093
1094fn coalesce_nullability(
1095    expressions: &[Expression],
1096    scope: &Scope,
1097    context: &NullabilityContext<'_>,
1098) -> ProjectionNullability {
1099    if expressions.is_empty() {
1100        return ProjectionNullability::Unknown;
1101    }
1102
1103    let mut all_nullable = true;
1104
1105    for expression in expressions {
1106        match projection_nullability(unwrap_projection_alias(expression), scope, context) {
1107            ProjectionNullability::NonNull => return ProjectionNullability::NonNull,
1108            ProjectionNullability::Nullable => {}
1109            ProjectionNullability::Unknown => all_nullable = false,
1110        }
1111    }
1112
1113    if all_nullable {
1114        ProjectionNullability::Nullable
1115    } else {
1116        ProjectionNullability::Unknown
1117    }
1118}
1119
1120fn terminal_references_from_lineage(node: &LineageNode) -> Vec<ColumnReferenceFact> {
1121    let mut refs = Vec::new();
1122    collect_terminal_references(node, &mut refs);
1123    dedupe_column_refs(refs)
1124}
1125
1126fn collect_terminal_references(node: &LineageNode, refs: &mut Vec<ColumnReferenceFact>) {
1127    if node.downstream.is_empty() {
1128        if let Some(reference) = column_reference_from_lineage_node(node) {
1129            refs.push(reference);
1130        }
1131        return;
1132    }
1133
1134    for child in &node.downstream {
1135        collect_terminal_references(child, refs);
1136    }
1137}
1138
1139fn column_reference_from_lineage_node(node: &LineageNode) -> Option<ColumnReferenceFact> {
1140    match &node.expression {
1141        Expression::Column(column) => {
1142            let source_name = non_empty_string(node.source_name.clone());
1143            let table =
1144                lineage_node_table(node).or_else(|| column.table.as_ref().map(|t| t.name.clone()));
1145            let confidence = if node.source_kind == SourceKind::Unknown && source_name.is_none() {
1146                ReferenceConfidence::Unknown
1147            } else {
1148                ReferenceConfidence::Resolved
1149            };
1150            Some(ColumnReferenceFact {
1151                source_name,
1152                source_alias: node.source_alias.clone(),
1153                source_kind: node.source_kind,
1154                table,
1155                column: column.name.name.clone(),
1156                unqualified: column.table.is_none(),
1157                confidence,
1158            })
1159        }
1160        Expression::Star(_) => Some(ColumnReferenceFact {
1161            source_name: non_empty_string(node.source_name.clone()),
1162            source_alias: node.source_alias.clone(),
1163            source_kind: node.source_kind,
1164            table: lineage_node_table(node),
1165            column: "*".to_string(),
1166            unqualified: true,
1167            confidence: if node.source_kind == SourceKind::Unknown {
1168                ReferenceConfidence::Unknown
1169            } else {
1170                ReferenceConfidence::Resolved
1171            },
1172        }),
1173        _ => None,
1174    }
1175}
1176
1177fn lineage_node_table(node: &LineageNode) -> Option<String> {
1178    match &node.source {
1179        Expression::Table(table) => Some(table_name(table)),
1180        _ => None,
1181    }
1182}
1183
1184fn fallback_column_references(expression: &Expression, scope: &Scope) -> Vec<ColumnReferenceFact> {
1185    let mut refs = Vec::new();
1186    let source_count = scope.sources.len();
1187    let single_source = if source_count == 1 {
1188        scope.sources.iter().next()
1189    } else {
1190        None
1191    };
1192
1193    for column_expr in expression.find_all(|candidate| matches!(candidate, Expression::Column(_))) {
1194        if let Expression::Column(column) = column_expr {
1195            if column.name.name == "*" {
1196                continue;
1197            }
1198            let source = column
1199                .table
1200                .as_ref()
1201                .and_then(|table| scope.sources.get(&table.name));
1202            let (source_name, source_alias, source_kind, table, confidence) =
1203                if let Some(table_identifier) = &column.table {
1204                    if let Some(source) = source {
1205                        (
1206                            Some(table_identifier.name.clone()),
1207                            source.alias.clone(),
1208                            source.kind,
1209                            source_table_name(source)
1210                                .or_else(|| Some(table_identifier.name.clone())),
1211                            ReferenceConfidence::Resolved,
1212                        )
1213                    } else {
1214                        (
1215                            Some(table_identifier.name.clone()),
1216                            None,
1217                            SourceKind::Unknown,
1218                            Some(table_identifier.name.clone()),
1219                            ReferenceConfidence::Unknown,
1220                        )
1221                    }
1222                } else if let Some((name, source)) = single_source {
1223                    (
1224                        Some(name.clone()),
1225                        source.alias.clone(),
1226                        source.kind,
1227                        source_table_name(source).or_else(|| Some(name.clone())),
1228                        ReferenceConfidence::Resolved,
1229                    )
1230                } else if source_count > 1 {
1231                    (
1232                        None,
1233                        None,
1234                        SourceKind::Unknown,
1235                        None,
1236                        ReferenceConfidence::Ambiguous,
1237                    )
1238                } else {
1239                    (
1240                        None,
1241                        None,
1242                        SourceKind::Unknown,
1243                        None,
1244                        ReferenceConfidence::Unknown,
1245                    )
1246                };
1247
1248            refs.push(ColumnReferenceFact {
1249                source_name,
1250                source_alias,
1251                source_kind,
1252                table,
1253                column: column.name.name.clone(),
1254                unqualified: column.table.is_none(),
1255                confidence,
1256            });
1257        }
1258    }
1259
1260    dedupe_column_refs(refs)
1261}
1262
1263fn dedupe_column_refs(refs: Vec<ColumnReferenceFact>) -> Vec<ColumnReferenceFact> {
1264    let mut seen = HashSet::new();
1265    let mut deduped = Vec::new();
1266
1267    for reference in refs {
1268        let key = (
1269            reference.source_name.clone(),
1270            reference.source_alias.clone(),
1271            reference.table.clone(),
1272            reference.column.clone(),
1273            format!("{:?}", reference.source_kind),
1274            reference.unqualified,
1275            format!("{:?}", reference.confidence),
1276        );
1277        if seen.insert(key) {
1278            deduped.push(reference);
1279        }
1280    }
1281
1282    deduped
1283}
1284
1285fn relation_facts(
1286    scope: &Scope,
1287    mapping_schema: Option<&crate::schema::MappingSchema>,
1288) -> Vec<RelationFact> {
1289    let mut relations = Vec::new();
1290    let mut seen = HashSet::new();
1291    collect_relation_facts(scope, mapping_schema, &mut seen, &mut relations);
1292
1293    relations.sort_by(|left, right| {
1294        left.name
1295            .cmp(&right.name)
1296            .then_with(|| left.alias.cmp(&right.alias))
1297    });
1298    relations
1299}
1300
1301fn collect_relation_facts(
1302    scope: &Scope,
1303    mapping_schema: Option<&crate::schema::MappingSchema>,
1304    seen: &mut HashSet<String>,
1305    relations: &mut Vec<RelationFact>,
1306) {
1307    for relation in scope.sources.iter().map(|(source_name, source)| {
1308        let identity = source_table_identity(source);
1309        RelationFact {
1310            name: source
1311                .lineage_name
1312                .clone()
1313                .or_else(|| identity.as_ref().map(|identity| identity.name.clone()))
1314                .unwrap_or_else(|| source_name.clone()),
1315            alias: source.alias.clone().or_else(|| source_alias(source)),
1316            kind: source.kind,
1317            columns: source_columns(source, mapping_schema),
1318            catalog: identity
1319                .as_ref()
1320                .and_then(|identity| identity.catalog.clone()),
1321            schema: identity
1322                .as_ref()
1323                .and_then(|identity| identity.schema.clone()),
1324            table: identity
1325                .as_ref()
1326                .and_then(|identity| identity.table.clone()),
1327        }
1328    }) {
1329        let key = format!("{:?}|{}|{:?}", relation.kind, relation.name, relation.alias);
1330        if seen.insert(key) {
1331            relations.push(relation);
1332        }
1333    }
1334
1335    for branch_scope in &scope.union_scopes {
1336        collect_relation_facts(branch_scope, mapping_schema, seen, relations);
1337    }
1338}
1339
1340fn base_table_facts(
1341    scope: &Scope,
1342    mapping_schema: Option<&crate::schema::MappingSchema>,
1343) -> Vec<RelationFact> {
1344    let mut relations = Vec::new();
1345    let mut seen = HashSet::new();
1346
1347    collect_base_table_facts(scope, mapping_schema, &mut seen, &mut relations);
1348
1349    relations.sort_by(|left, right| left.name.cmp(&right.name));
1350    relations
1351}
1352
1353fn collect_base_table_facts(
1354    scope: &Scope,
1355    mapping_schema: Option<&crate::schema::MappingSchema>,
1356    seen: &mut HashSet<String>,
1357    relations: &mut Vec<RelationFact>,
1358) {
1359    for source in scope.sources.values() {
1360        if source.kind != SourceKind::Table {
1361            continue;
1362        }
1363
1364        let Some(identity) = source_table_identity(source) else {
1365            continue;
1366        };
1367
1368        if seen.insert(identity.name.clone()) {
1369            relations.push(RelationFact {
1370                name: identity.name,
1371                alias: source.alias.clone().or_else(|| source_alias(source)),
1372                kind: SourceKind::Table,
1373                columns: source_columns(source, mapping_schema),
1374                catalog: identity.catalog,
1375                schema: identity.schema,
1376                table: identity.table,
1377            });
1378        }
1379    }
1380
1381    for child_scope in scope
1382        .cte_scopes
1383        .iter()
1384        .chain(scope.union_scopes.iter())
1385        .chain(scope.table_scopes.iter())
1386        .chain(scope.derived_table_scopes.iter())
1387        .chain(scope.subquery_scopes.iter())
1388    {
1389        collect_base_table_facts(child_scope, mapping_schema, seen, relations);
1390    }
1391}
1392
1393fn source_columns(
1394    source: &SourceInfo,
1395    mapping_schema: Option<&crate::schema::MappingSchema>,
1396) -> Vec<String> {
1397    match &source.expression {
1398        Expression::Table(table) => mapping_schema
1399            .and_then(|schema| schema.column_names(&table_name(table)).ok())
1400            .unwrap_or_default(),
1401        Expression::Select(_)
1402        | Expression::Union(_)
1403        | Expression::Intersect(_)
1404        | Expression::Except(_) => get_output_column_names(&source.expression),
1405        Expression::Subquery(subquery) => get_output_column_names(&subquery.this),
1406        Expression::Cte(cte) if !cte.columns.is_empty() => cte
1407            .columns
1408            .iter()
1409            .map(|column| column.name.clone())
1410            .collect(),
1411        Expression::Cte(cte) => get_output_column_names(&cte.this),
1412        _ => Vec::new(),
1413    }
1414}
1415
1416fn source_table_name(source: &SourceInfo) -> Option<String> {
1417    source_table_identity(source).map(|identity| identity.name)
1418}
1419
1420fn source_alias(source: &SourceInfo) -> Option<String> {
1421    match &source.expression {
1422        Expression::Table(table) => table.alias.as_ref().map(|alias| alias.name.clone()),
1423        Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
1424        _ => None,
1425    }
1426}
1427
1428fn table_name(table: &TableRef) -> String {
1429    let mut parts = Vec::new();
1430    if let Some(catalog) = &table.catalog {
1431        parts.push(catalog.name.clone());
1432    }
1433    if let Some(schema) = &table.schema {
1434        parts.push(schema.name.clone());
1435    }
1436    parts.push(table.name.name.clone());
1437    parts.join(".")
1438}
1439
/// Identifying parts of a table reference, as produced by `table_identity`.
#[derive(Debug, Clone)]
struct RelationIdentity {
    /// Dotted name built from the catalog, schema, and table segments that
    /// are present.
    name: String,
    /// Catalog segment, when the reference has one.
    catalog: Option<String>,
    /// Schema segment, when the reference has one.
    schema: Option<String>,
    /// Bare table name without any qualifier.
    table: Option<String>,
}
1447
1448fn source_table_identity(source: &SourceInfo) -> Option<RelationIdentity> {
1449    match &source.expression {
1450        Expression::Table(table) => Some(table_identity(table)),
1451        _ => None,
1452    }
1453}
1454
1455fn table_identity(table: &TableRef) -> RelationIdentity {
1456    RelationIdentity {
1457        name: table_name(table),
1458        catalog: table.catalog.as_ref().map(|catalog| catalog.name.clone()),
1459        schema: table.schema.as_ref().map(|schema| schema.name.clone()),
1460        table: Some(table.name.name.clone()),
1461    }
1462}
1463
1464fn set_operation_facts(
1465    expression: &Expression,
1466    scope: &Scope,
1467    dialect: DialectType,
1468) -> Vec<SetOperationFact> {
1469    let mut facts = Vec::new();
1470    collect_set_operation_facts(expression, scope, dialect, &mut facts);
1471    facts
1472}
1473
1474fn collect_set_operation_facts(
1475    expression: &Expression,
1476    scope: &Scope,
1477    dialect: DialectType,
1478    facts: &mut Vec<SetOperationFact>,
1479) {
1480    match expression {
1481        Expression::Union(union) => {
1482            facts.push(SetOperationFact {
1483                kind: "union".to_string(),
1484                all: union.all,
1485                distinct: union.distinct,
1486                output_columns: get_output_column_names(expression),
1487                branches: set_operation_branches(&union.left, &union.right, scope, dialect),
1488            });
1489            collect_set_operation_facts(&union.left, scope, dialect, facts);
1490            collect_set_operation_facts(&union.right, scope, dialect, facts);
1491        }
1492        Expression::Intersect(intersect) => {
1493            facts.push(SetOperationFact {
1494                kind: "intersect".to_string(),
1495                all: intersect.all,
1496                distinct: intersect.distinct,
1497                output_columns: get_output_column_names(expression),
1498                branches: set_operation_branches(&intersect.left, &intersect.right, scope, dialect),
1499            });
1500            collect_set_operation_facts(&intersect.left, scope, dialect, facts);
1501            collect_set_operation_facts(&intersect.right, scope, dialect, facts);
1502        }
1503        Expression::Except(except) => {
1504            facts.push(SetOperationFact {
1505                kind: "except".to_string(),
1506                all: except.all,
1507                distinct: except.distinct,
1508                output_columns: get_output_column_names(expression),
1509                branches: set_operation_branches(&except.left, &except.right, scope, dialect),
1510            });
1511            collect_set_operation_facts(&except.left, scope, dialect, facts);
1512            collect_set_operation_facts(&except.right, scope, dialect, facts);
1513        }
1514        Expression::Subquery(subquery) => {
1515            collect_set_operation_facts(&subquery.this, scope, dialect, facts);
1516        }
1517        _ => {}
1518    }
1519}
1520
1521fn set_operation_branches(
1522    left: &Expression,
1523    right: &Expression,
1524    scope: &Scope,
1525    dialect: DialectType,
1526) -> Vec<SetOperationBranchFact> {
1527    vec![
1528        SetOperationBranchFact {
1529            index: 0,
1530            projections: projection_facts_for_branch(left, scope, dialect),
1531        },
1532        SetOperationBranchFact {
1533            index: 1,
1534            projections: projection_facts_for_branch(right, scope, dialect),
1535        },
1536    ]
1537}
1538
1539fn projection_facts_for_branch(
1540    expression: &Expression,
1541    root_scope: &Scope,
1542    dialect: DialectType,
1543) -> Vec<ProjectionFact> {
1544    let branch_scope = build_scope(expression);
1545    let scope = if branch_scope.sources.is_empty() {
1546        root_scope
1547    } else {
1548        &branch_scope
1549    };
1550    let nullability_context = NullabilityContext {
1551        schema: None,
1552        nullable_sources: nullable_source_names(expression),
1553    };
1554    projection_facts_for_query(expression, scope, dialect, &nullability_context)
1555}
1556
/// Converts an empty string to `None` and passes any other string through
/// unchanged. Whitespace-only strings count as non-empty.
fn non_empty_string(value: String) -> Option<String> {
    Some(value).filter(|text| !text.is_empty())
}