Skip to main content

polyglot_sql/
query_analysis.rs

1//! Compact query analysis facts.
2//!
3//! This module intentionally builds on the existing parser, scope builder, type
4//! annotator, and lineage implementation. It is a convenience API: callers that
5//! need the full AST or full lineage graph should continue using those lower
6//! level APIs directly.
7
8use crate::ast_transforms::get_output_column_names;
9use crate::dialects::{Dialect, DialectType};
10use crate::expressions::{DataType, Expression, JoinKind, TableRef, With};
11use crate::lineage::{lineage_by_index_from_expression, LineageNode};
12use crate::optimizer::annotate_types::annotate_types;
13use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
14use crate::schema::{MappingSchema, Schema};
15use crate::scope::{build_scope, Scope, SourceInfo, SourceKind};
16use crate::traversal::{contains_aggregate, ExpressionWalk};
17use crate::validation::{mapping_schema_from_validation_schema, ValidationSchema};
18use crate::{parse_data_type, parse_one, Error, Result};
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21
22/// Options for [`analyze_query`].
23#[derive(Debug, Clone, Serialize, Deserialize, Default)]
24#[serde(rename_all = "camelCase", default)]
25pub struct AnalyzeQueryOptions {
26    /// SQL dialect used for parsing and dialect-aware rendering.
27    pub dialect: DialectType,
28    /// Optional validation schema used for qualification and type annotation.
29    pub schema: Option<ValidationSchema>,
30}
31
32/// Compact facts about a query's output shape and data dependencies.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34#[serde(rename_all = "camelCase")]
35pub struct QueryAnalysis {
36    pub shape: QueryShape,
37    pub ctes: Vec<String>,
38    pub cte_facts: Vec<CteFact>,
39    pub projections: Vec<ProjectionFact>,
40    pub relations: Vec<RelationFact>,
41    pub base_tables: Vec<RelationFact>,
42    pub star_projections: Vec<StarProjectionFact>,
43    pub set_operations: Vec<SetOperationFact>,
44}
45
46/// Top-level query shape.
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub enum QueryShape {
50    Select,
51    SetOperation,
52}
53
54/// Compact fact about one output projection.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56#[serde(rename_all = "camelCase")]
57pub struct ProjectionFact {
58    pub index: usize,
59    pub name: Option<String>,
60    pub is_star: bool,
61    pub star_table: Option<String>,
62    pub transform_kind: TransformKind,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub transform_function: Option<TransformFunctionFact>,
65    pub cast_type: Option<String>,
66    pub type_hint: Option<String>,
67    pub nullability: ProjectionNullability,
68    pub upstream: Vec<ColumnReferenceFact>,
69}
70
71/// Compact fact about a function-like projection transform.
72#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(rename_all = "camelCase")]
74pub struct TransformFunctionFact {
75    pub name: String,
76    pub literal_args: Vec<String>,
77    pub column_args: Vec<ColumnReferenceFact>,
78}
79
80/// Compact fact about one top-level CTE definition.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(rename_all = "camelCase")]
83pub struct CteFact {
84    pub name: String,
85    pub columns: Vec<String>,
86    pub body_sql: String,
87    pub output_columns: Vec<String>,
88}
89
90/// Compact fact about one original star projection.
91#[derive(Debug, Clone, Serialize, Deserialize)]
92#[serde(rename_all = "camelCase")]
93pub struct StarProjectionFact {
94    pub index: usize,
95    pub table: Option<String>,
96    pub expanded_columns: Vec<String>,
97}
98
99/// Compact fact about an upstream column reference.
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[serde(rename_all = "camelCase")]
102pub struct ColumnReferenceFact {
103    pub source_name: Option<String>,
104    pub source_alias: Option<String>,
105    pub source_kind: SourceKind,
106    pub table: Option<String>,
107    pub column: String,
108    pub unqualified: bool,
109    pub confidence: ReferenceConfidence,
110}
111
112/// Compact fact about a relation visible in the root scope.
113#[derive(Debug, Clone, Serialize, Deserialize)]
114#[serde(rename_all = "camelCase")]
115pub struct RelationFact {
116    pub name: String,
117    pub alias: Option<String>,
118    pub kind: SourceKind,
119    pub columns: Vec<String>,
120    pub catalog: Option<String>,
121    pub schema: Option<String>,
122    pub table: Option<String>,
123}
124
125/// Compact fact about a set operation.
126#[derive(Debug, Clone, Serialize, Deserialize)]
127#[serde(rename_all = "camelCase")]
128pub struct SetOperationFact {
129    pub kind: String,
130    pub all: bool,
131    pub distinct: bool,
132    pub output_columns: Vec<String>,
133    pub branches: Vec<SetOperationBranchFact>,
134}
135
136/// Compact facts for one immediate set-operation branch.
137#[derive(Debug, Clone, Serialize, Deserialize)]
138#[serde(rename_all = "camelCase")]
139pub struct SetOperationBranchFact {
140    pub index: usize,
141    pub projections: Vec<ProjectionFact>,
142}
143
144/// High-level kind of transformation performed by a projection.
145#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
146#[serde(rename_all = "snake_case")]
147pub enum TransformKind {
148    Direct,
149    Cast,
150    Aggregation,
151    Constant,
152    Expression,
153    Star,
154}
155
156/// Confidence level for a compact upstream column reference.
157#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum ReferenceConfidence {
160    Resolved,
161    Ambiguous,
162    Unknown,
163}
164
165/// Conservative nullability classification for one output projection.
166#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
167#[serde(rename_all = "snake_case")]
168pub enum ProjectionNullability {
169    NonNull,
170    Nullable,
171    Unknown,
172}
173
174/// Analyze a single SELECT or set-operation query.
175pub fn analyze_query(sql: &str, options: AnalyzeQueryOptions) -> Result<QueryAnalysis> {
176    let mut expression = parse_one(sql, options.dialect)?;
177    expression = effective_query(expression);
178    ensure_query(&expression)?;
179    let original_expression = expression.clone();
180
181    let mapping_schema = options
182        .schema
183        .as_ref()
184        .map(|schema| analysis_mapping_schema(schema, options.dialect));
185    let schema_info = options.schema.as_ref().map(AnalysisSchemaInfo::from_schema);
186    let cte_facts = top_level_cte_facts(&original_expression, options.dialect)?;
187    let star_projections = star_projection_facts(&original_expression, mapping_schema.as_ref());
188
189    if let Some(schema) = mapping_schema.as_ref() {
190        let qualify_options = QualifyColumnsOptions::new()
191            .with_dialect(options.dialect)
192            .with_allow_partial(true);
193        expression = qualify_columns(expression, schema, &qualify_options)
194            .map_err(|e| Error::internal(format!("query analysis qualification failed: {e}")))?;
195    }
196
197    let annotation_schema = mapping_schema.as_ref().map(|schema| {
198        let mut alias_schema = schema.clone();
199        add_scope_aliases_to_schema(
200            &build_scope(&expression),
201            schema,
202            &mut alias_schema,
203            options.dialect,
204        );
205        alias_schema
206    });
207
208    annotate_types(
209        &mut expression,
210        annotation_schema
211            .as_ref()
212            .map(|schema| schema as &dyn Schema),
213        Some(options.dialect),
214    );
215    crate::lineage::expand_cte_stars(
216        &mut expression,
217        annotation_schema
218            .as_ref()
219            .or(mapping_schema.as_ref())
220            .map(|schema| schema as &dyn Schema),
221    );
222
223    let scope = build_scope(&expression);
224    let nullability_context = NullabilityContext {
225        schema: schema_info.as_ref(),
226        nullable_sources: nullable_source_names(&expression),
227    };
228    let shape = if is_set_operation(&expression) {
229        QueryShape::SetOperation
230    } else {
231        QueryShape::Select
232    };
233
234    Ok(QueryAnalysis {
235        shape,
236        ctes: collect_cte_names(&expression),
237        cte_facts,
238        projections: projection_facts_for_query(
239            &expression,
240            &scope,
241            options.dialect,
242            &nullability_context,
243        ),
244        relations: relation_facts(&scope, mapping_schema.as_ref()),
245        base_tables: base_table_facts(&scope, mapping_schema.as_ref()),
246        star_projections,
247        set_operations: set_operation_facts(&expression, &scope, options.dialect),
248    })
249}
250
251fn analysis_mapping_schema(schema: &ValidationSchema, dialect: DialectType) -> MappingSchema {
252    let broad_schema = mapping_schema_from_validation_schema(schema);
253    let mut mapping_schema = MappingSchema::with_dialect(dialect);
254
255    for table in &schema.tables {
256        let table_names = validation_table_names(table);
257        if table_names.is_empty() {
258            continue;
259        }
260
261        let fallback_table = table_names[0].as_str();
262        let columns: Vec<(String, DataType)> = table
263            .columns
264            .iter()
265            .map(|column| {
266                let data_type = parse_analysis_data_type(&column.data_type, dialect)
267                    .unwrap_or_else(|| {
268                        broad_schema
269                            .get_column_type(fallback_table, &column.name)
270                            .unwrap_or(DataType::Unknown)
271                    });
272                (column.name.to_ascii_lowercase(), data_type)
273            })
274            .collect();
275
276        for table_name in table_names {
277            let _ = mapping_schema.add_table(&table_name, &columns, Some(dialect));
278        }
279    }
280
281    mapping_schema
282}
283
284fn validation_table_names(table: &crate::validation::SchemaTable) -> Vec<String> {
285    let mut names = Vec::new();
286
287    names.push(table.name.to_ascii_lowercase());
288    if let Some(schema_name) = &table.schema {
289        names.push(format!(
290            "{}.{}",
291            schema_name.to_ascii_lowercase(),
292            table.name.to_ascii_lowercase()
293        ));
294    }
295    for alias in &table.aliases {
296        names.push(alias.to_ascii_lowercase());
297    }
298
299    names.sort();
300    names.dedup();
301    names
302}
303
304fn parse_analysis_data_type(data_type: &str, dialect: DialectType) -> Option<DataType> {
305    let trimmed = data_type.trim();
306    if trimmed.is_empty() {
307        return None;
308    }
309    parse_data_type(trimmed, dialect).ok()
310}
311
312fn add_scope_aliases_to_schema(
313    scope: &Scope,
314    source_schema: &MappingSchema,
315    target_schema: &mut MappingSchema,
316    dialect: DialectType,
317) {
318    for child_scope in scope.traverse() {
319        for (source_name, source) in &child_scope.sources {
320            if source.kind != SourceKind::Table {
321                continue;
322            }
323            if let Some(table_name) = source_table_name(source) {
324                if source_name == &table_name {
325                    continue;
326                }
327                if let Ok(column_names) = source_schema.column_names(&table_name) {
328                    let columns: Vec<(String, DataType)> = column_names
329                        .iter()
330                        .map(|column| {
331                            (
332                                column.clone(),
333                                source_schema
334                                    .get_column_type(&table_name, column)
335                                    .unwrap_or(DataType::Unknown),
336                            )
337                        })
338                        .collect();
339                    let _ = target_schema.add_table(source_name, &columns, Some(dialect));
340                }
341            }
342        }
343    }
344}
345
/// Per-column constraint information extracted from the validation schema.
#[derive(Clone, Debug)]
struct AnalysisColumnInfo {
    /// Nullability flag copied from the schema column; `None` when the schema
    /// does not state it.
    nullable: Option<bool>,
    /// `true` when the column is flagged as a primary key itself or is listed
    /// in its table's primary key.
    primary_key: bool,
}
351
352#[derive(Debug, Clone)]
353struct AnalysisSchemaInfo {
354    columns: HashMap<(String, String), AnalysisColumnInfo>,
355}
356
357impl AnalysisSchemaInfo {
358    fn from_schema(schema: &ValidationSchema) -> Self {
359        let mut columns = HashMap::new();
360
361        for table in &schema.tables {
362            let table_names = validation_table_names(table);
363            let primary_keys: HashSet<String> = table
364                .primary_key
365                .iter()
366                .map(|column| column.to_ascii_lowercase())
367                .collect();
368
369            for column in &table.columns {
370                let info = AnalysisColumnInfo {
371                    nullable: column.nullable,
372                    primary_key: column.primary_key
373                        || primary_keys.contains(&column.name.to_ascii_lowercase()),
374                };
375
376                for table_name in &table_names {
377                    columns.insert(
378                        (
379                            normalize_lookup_name(table_name),
380                            normalize_lookup_name(&column.name),
381                        ),
382                        info.clone(),
383                    );
384                }
385            }
386        }
387
388        Self { columns }
389    }
390
391    fn column(&self, table: &str, column: &str) -> Option<&AnalysisColumnInfo> {
392        self.columns
393            .get(&(normalize_lookup_name(table), normalize_lookup_name(column)))
394    }
395}
396
397struct NullabilityContext<'a> {
398    schema: Option<&'a AnalysisSchemaInfo>,
399    nullable_sources: HashSet<String>,
400}
401
402fn top_level_cte_facts(expression: &Expression, dialect: DialectType) -> Result<Vec<CteFact>> {
403    let Some(with_clause) = with_clause(expression) else {
404        return Ok(Vec::new());
405    };
406
407    with_clause
408        .ctes
409        .iter()
410        .map(|cte| {
411            Ok(CteFact {
412                name: cte.alias.name.clone(),
413                columns: cte
414                    .columns
415                    .iter()
416                    .map(|column| column.name.clone())
417                    .collect(),
418                body_sql: Dialect::get(dialect).generate(&cte.this)?,
419                output_columns: get_output_column_names(&cte.this),
420            })
421        })
422        .collect()
423}
424
425fn star_projection_facts(
426    expression: &Expression,
427    mapping_schema: Option<&MappingSchema>,
428) -> Vec<StarProjectionFact> {
429    let scope = build_scope(expression);
430    let ordered_sources = ordered_source_names_for_query(expression);
431
432    select_expressions_for_query(expression)
433        .iter()
434        .enumerate()
435        .filter_map(|(index, projection)| {
436            let inner = unwrap_projection_alias(projection);
437            if !projection_is_star(inner) {
438                return None;
439            }
440
441            let table = projection_star_table(inner);
442            let expanded_columns =
443                expanded_star_columns(table.as_deref(), &scope, &ordered_sources, mapping_schema);
444
445            Some(StarProjectionFact {
446                index,
447                table,
448                expanded_columns,
449            })
450        })
451        .collect()
452}
453
454fn expanded_star_columns(
455    star_table: Option<&str>,
456    scope: &Scope,
457    ordered_sources: &[String],
458    mapping_schema: Option<&MappingSchema>,
459) -> Vec<String> {
460    let mut columns = Vec::new();
461    let mut source_names: Vec<String> = if ordered_sources.is_empty() {
462        let mut names: Vec<_> = scope.sources.keys().cloned().collect();
463        names.sort();
464        names
465    } else {
466        ordered_sources.to_vec()
467    };
468
469    source_names.dedup();
470
471    for source_name in source_names {
472        let Some(source) = scope.sources.get(&source_name) else {
473            continue;
474        };
475
476        if let Some(star_table) = star_table {
477            let matches = source_name.eq_ignore_ascii_case(star_table)
478                || source
479                    .alias
480                    .as_deref()
481                    .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
482                || source_table_name(source)
483                    .is_some_and(|table| table.eq_ignore_ascii_case(star_table));
484
485            if !matches {
486                continue;
487            }
488        }
489
490        columns.extend(source_columns(source, mapping_schema));
491    }
492
493    columns
494}
495
496fn ordered_source_names_for_query(expression: &Expression) -> Vec<String> {
497    match expression {
498        Expression::Select(select) => ordered_source_names_for_select(select),
499        Expression::Union(union) => ordered_source_names_for_query(&union.left),
500        Expression::Intersect(intersect) => ordered_source_names_for_query(&intersect.left),
501        Expression::Except(except) => ordered_source_names_for_query(&except.left),
502        Expression::Subquery(subquery) => ordered_source_names_for_query(&subquery.this),
503        _ => Vec::new(),
504    }
505}
506
507fn ordered_source_names_for_select(select: &crate::expressions::Select) -> Vec<String> {
508    let mut sources = Vec::new();
509
510    if let Some(from) = &select.from {
511        for expression in &from.expressions {
512            if let Some(source_name) = expression_source_name(expression) {
513                sources.push(source_name);
514            }
515        }
516    }
517
518    for join in &select.joins {
519        if let Some(source_name) = expression_source_name(&join.this) {
520            sources.push(source_name);
521        }
522    }
523
524    sources
525}
526
527fn nullable_source_names(expression: &Expression) -> HashSet<String> {
528    match expression {
529        Expression::Select(select) => nullable_source_names_for_select(select),
530        Expression::Union(union) => nullable_source_names(&union.left),
531        Expression::Intersect(intersect) => nullable_source_names(&intersect.left),
532        Expression::Except(except) => nullable_source_names(&except.left),
533        Expression::Subquery(subquery) => nullable_source_names(&subquery.this),
534        _ => HashSet::new(),
535    }
536}
537
538fn nullable_source_names_for_select(select: &crate::expressions::Select) -> HashSet<String> {
539    let mut nullable = HashSet::new();
540    let mut left_sources = Vec::new();
541
542    if let Some(from) = &select.from {
543        for expression in &from.expressions {
544            if let Some(source_name) = expression_source_name(expression) {
545                left_sources.push(source_name);
546            }
547        }
548    }
549
550    for join in &select.joins {
551        let right_source = expression_source_name(&join.this);
552
553        if join_nullable_left(join.kind) {
554            for source_name in &left_sources {
555                nullable.insert(normalize_lookup_name(source_name));
556            }
557        }
558
559        if join_nullable_right(join.kind) {
560            if let Some(source_name) = &right_source {
561                nullable.insert(normalize_lookup_name(source_name));
562            }
563        }
564
565        if let Some(source_name) = right_source {
566            left_sources.push(source_name);
567        }
568    }
569
570    nullable
571}
572
573fn join_nullable_left(kind: JoinKind) -> bool {
574    matches!(
575        kind,
576        JoinKind::Right
577            | JoinKind::NaturalRight
578            | JoinKind::AsOfRight
579            | JoinKind::Full
580            | JoinKind::NaturalFull
581            | JoinKind::Outer
582    )
583}
584
585fn join_nullable_right(kind: JoinKind) -> bool {
586    matches!(
587        kind,
588        JoinKind::Left
589            | JoinKind::NaturalLeft
590            | JoinKind::AsOfLeft
591            | JoinKind::LeftLateral
592            | JoinKind::OuterApply
593            | JoinKind::LeftArray
594            | JoinKind::Full
595            | JoinKind::NaturalFull
596            | JoinKind::Outer
597    )
598}
599
600fn expression_source_name(expression: &Expression) -> Option<String> {
601    match expression {
602        Expression::Table(table) => table
603            .alias
604            .as_ref()
605            .map(|alias| alias.name.clone())
606            .or_else(|| Some(table.name.name.clone())),
607        Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
608        Expression::Alias(alias) => Some(alias.alias.name.clone()),
609        Expression::Cte(cte) => Some(cte.alias.name.clone()),
610        _ => None,
611    }
612}
613
/// Normalize a table or column name for lookups by lower-casing its ASCII
/// letters; all other characters are left untouched.
fn normalize_lookup_name(name: &str) -> String {
    let mut lowered = name.to_owned();
    lowered.make_ascii_lowercase();
    lowered
}
617
618fn effective_query(expression: Expression) -> Expression {
619    match expression {
620        Expression::Prepare(prepare) => prepare.statement,
621        Expression::Subquery(subquery) if subquery.alias.is_none() => subquery.this,
622        other => other,
623    }
624}
625
626fn ensure_query(expression: &Expression) -> Result<()> {
627    if matches!(
628        expression,
629        Expression::Select(_)
630            | Expression::Union(_)
631            | Expression::Intersect(_)
632            | Expression::Except(_)
633    ) {
634        Ok(())
635    } else {
636        Err(Error::internal(
637            "analyze_query requires a SELECT or set operation query",
638        ))
639    }
640}
641
642fn is_set_operation(expression: &Expression) -> bool {
643    matches!(
644        expression,
645        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
646    )
647}
648
649fn collect_cte_names(expression: &Expression) -> Vec<String> {
650    let mut names = Vec::new();
651    let mut seen = HashSet::new();
652    collect_cte_names_inner(expression, &mut names, &mut seen);
653    names
654}
655
656fn collect_cte_names_inner(
657    expression: &Expression,
658    names: &mut Vec<String>,
659    seen: &mut HashSet<String>,
660) {
661    if let Some(with_clause) = with_clause(expression) {
662        collect_with_names(with_clause, names, seen);
663    }
664
665    match expression {
666        Expression::Union(union) => {
667            collect_cte_names_inner(&union.left, names, seen);
668            collect_cte_names_inner(&union.right, names, seen);
669        }
670        Expression::Intersect(intersect) => {
671            collect_cte_names_inner(&intersect.left, names, seen);
672            collect_cte_names_inner(&intersect.right, names, seen);
673        }
674        Expression::Except(except) => {
675            collect_cte_names_inner(&except.left, names, seen);
676            collect_cte_names_inner(&except.right, names, seen);
677        }
678        Expression::Subquery(subquery) => collect_cte_names_inner(&subquery.this, names, seen),
679        _ => {}
680    }
681}
682
683fn collect_with_names(with_clause: &With, names: &mut Vec<String>, seen: &mut HashSet<String>) {
684    for cte in &with_clause.ctes {
685        if seen.insert(cte.alias.name.clone()) {
686            names.push(cte.alias.name.clone());
687        }
688        collect_cte_names_inner(&cte.this, names, seen);
689    }
690}
691
692fn with_clause(expression: &Expression) -> Option<&With> {
693    match expression {
694        Expression::Select(select) => select.with.as_ref(),
695        Expression::Union(union) => union.with.as_ref(),
696        Expression::Intersect(intersect) => intersect.with.as_ref(),
697        Expression::Except(except) => except.with.as_ref(),
698        _ => None,
699    }
700}
701
702fn projection_facts_for_query(
703    expression: &Expression,
704    scope: &Scope,
705    dialect: DialectType,
706    nullability_context: &NullabilityContext<'_>,
707) -> Vec<ProjectionFact> {
708    let expressions = select_expressions_for_query(expression);
709    let names = get_output_column_names(expression);
710
711    expressions
712        .iter()
713        .enumerate()
714        .map(|(index, projection)| {
715            projection_fact(
716                index,
717                names
718                    .get(index)
719                    .cloned()
720                    .or_else(|| projection_name(projection)),
721                projection,
722                expression,
723                scope,
724                dialect,
725                nullability_context,
726            )
727        })
728        .collect()
729}
730
731fn select_expressions_for_query(expression: &Expression) -> Vec<&Expression> {
732    match expression {
733        Expression::Select(select) => select.expressions.iter().collect(),
734        Expression::Union(union) => select_expressions_for_query(&union.left),
735        Expression::Intersect(intersect) => select_expressions_for_query(&intersect.left),
736        Expression::Except(except) => select_expressions_for_query(&except.left),
737        Expression::Subquery(subquery) => select_expressions_for_query(&subquery.this),
738        _ => Vec::new(),
739    }
740}
741
742fn projection_fact(
743    index: usize,
744    name: Option<String>,
745    projection: &Expression,
746    query: &Expression,
747    scope: &Scope,
748    dialect: DialectType,
749    nullability_context: &NullabilityContext<'_>,
750) -> ProjectionFact {
751    let inner = unwrap_projection_alias(projection);
752    let is_star = projection_is_star(inner);
753    let upstream = lineage_by_index_from_expression(index, query, Some(dialect), false)
754        .map(|node| terminal_references_from_lineage(&node))
755        .ok()
756        .filter(|refs| !refs.is_empty())
757        .unwrap_or_else(|| fallback_column_references(inner, scope));
758
759    ProjectionFact {
760        index,
761        name,
762        is_star,
763        star_table: projection_star_table(inner),
764        transform_kind: transform_kind(inner),
765        transform_function: transform_function_fact(inner, scope, dialect),
766        cast_type: cast_type(inner, dialect),
767        type_hint: projection
768            .inferred_type()
769            .or_else(|| inner.inferred_type())
770            .and_then(|data_type| render_data_type(data_type, dialect)),
771        nullability: projection_nullability(inner, scope, nullability_context),
772        upstream,
773    }
774}
775
776fn transform_function_fact(
777    expression: &Expression,
778    scope: &Scope,
779    dialect: DialectType,
780) -> Option<TransformFunctionFact> {
781    let mut matches = expression
782        .find_all(|candidate| transform_function_fact_for_node(candidate, scope, dialect).is_some())
783        .into_iter();
784
785    let first = matches.next()?;
786    if matches.next().is_some() {
787        return None;
788    }
789
790    transform_function_fact_for_node(first, scope, dialect)
791}
792
793fn transform_function_fact_for_node(
794    expression: &Expression,
795    scope: &Scope,
796    dialect: DialectType,
797) -> Option<TransformFunctionFact> {
798    match expression {
799        Expression::Function(function) => Some(transform_function_from_args(
800            &function.name,
801            &function.args,
802            scope,
803            dialect,
804        )),
805        Expression::AggregateFunction(function) => Some(transform_function_from_args(
806            &function.name,
807            &function.args,
808            scope,
809            dialect,
810        )),
811        Expression::DateTrunc(function) => Some(transform_function_from_parts(
812            "DATE_TRUNC",
813            vec![datetime_field_name(&function.unit)],
814            vec![&function.this],
815            scope,
816            dialect,
817        )),
818        Expression::TimestampTrunc(function) => Some(transform_function_from_parts(
819            "TIMESTAMP_TRUNC",
820            vec![datetime_field_name(&function.unit)],
821            vec![&function.this],
822            scope,
823            dialect,
824        )),
825        Expression::TimeTrunc(function) => {
826            let mut args = vec![function.this.as_ref()];
827            if let Some(zone) = function.zone.as_deref() {
828                args.push(zone);
829            }
830            Some(transform_function_from_parts(
831                "TIME_TRUNC",
832                vec![function.unit.clone()],
833                args,
834                scope,
835                dialect,
836            ))
837        }
838        Expression::Extract(function) => Some(transform_function_from_parts(
839            "EXTRACT",
840            vec![datetime_field_name(&function.field)],
841            vec![&function.this],
842            scope,
843            dialect,
844        )),
845        Expression::DateAdd(function) => Some(transform_function_from_parts(
846            "DATE_ADD",
847            Vec::new(),
848            vec![&function.this, &function.interval],
849            scope,
850            dialect,
851        )),
852        Expression::DateSub(function) => Some(transform_function_from_parts(
853            "DATE_SUB",
854            Vec::new(),
855            vec![&function.this, &function.interval],
856            scope,
857            dialect,
858        )),
859        Expression::DateDiff(function) => Some(transform_function_from_parts(
860            "DATE_DIFF",
861            Vec::new(),
862            vec![&function.this, &function.expression],
863            scope,
864            dialect,
865        )),
866        _ => None,
867    }
868}
869
870fn transform_function_from_args(
871    name: &str,
872    args: &[Expression],
873    scope: &Scope,
874    dialect: DialectType,
875) -> TransformFunctionFact {
876    let literal_args = args
877        .iter()
878        .filter_map(|arg| literal_argument(arg, dialect))
879        .collect();
880    transform_function_from_parts(name, literal_args, args.iter().collect(), scope, dialect)
881}
882
883fn transform_function_from_parts(
884    name: &str,
885    literal_args: Vec<String>,
886    args: Vec<&Expression>,
887    scope: &Scope,
888    _dialect: DialectType,
889) -> TransformFunctionFact {
890    let column_args = dedupe_column_refs(
891        args.into_iter()
892            .flat_map(|arg| fallback_column_references(arg, scope))
893            .collect(),
894    );
895
896    TransformFunctionFact {
897        name: name.to_string(),
898        literal_args,
899        column_args,
900    }
901}
902
903fn literal_argument(expression: &Expression, dialect: DialectType) -> Option<String> {
904    match expression {
905        Expression::Literal(literal) => Some(literal.value_str().to_string()),
906        Expression::Boolean(boolean) => Some(boolean.value.to_string()),
907        Expression::Null(_) => Some("NULL".to_string()),
908        Expression::Identifier(identifier) => Some(identifier.name.clone()),
909        Expression::Var(var) => Some(var.this.clone()),
910        Expression::DataType(data_type) => render_data_type(data_type, dialect),
911        _ => None,
912    }
913}
914
915fn datetime_field_name(field: &crate::expressions::DateTimeField) -> String {
916    match field {
917        crate::expressions::DateTimeField::Year => "year".to_string(),
918        crate::expressions::DateTimeField::Month => "month".to_string(),
919        crate::expressions::DateTimeField::Day => "day".to_string(),
920        crate::expressions::DateTimeField::Hour => "hour".to_string(),
921        crate::expressions::DateTimeField::Minute => "minute".to_string(),
922        crate::expressions::DateTimeField::Second => "second".to_string(),
923        crate::expressions::DateTimeField::Millisecond => "millisecond".to_string(),
924        crate::expressions::DateTimeField::Microsecond => "microsecond".to_string(),
925        crate::expressions::DateTimeField::DayOfWeek => "day_of_week".to_string(),
926        crate::expressions::DateTimeField::DayOfYear => "day_of_year".to_string(),
927        crate::expressions::DateTimeField::Week => "week".to_string(),
928        crate::expressions::DateTimeField::WeekWithModifier(modifier) => {
929            format!("week({modifier})")
930        }
931        crate::expressions::DateTimeField::Quarter => "quarter".to_string(),
932        crate::expressions::DateTimeField::Epoch => "epoch".to_string(),
933        crate::expressions::DateTimeField::Timezone => "timezone".to_string(),
934        crate::expressions::DateTimeField::TimezoneHour => "timezone_hour".to_string(),
935        crate::expressions::DateTimeField::TimezoneMinute => "timezone_minute".to_string(),
936        crate::expressions::DateTimeField::Date => "date".to_string(),
937        crate::expressions::DateTimeField::Time => "time".to_string(),
938        crate::expressions::DateTimeField::Custom(name) => name.clone(),
939    }
940}
941
942fn unwrap_projection_alias(expression: &Expression) -> &Expression {
943    match expression {
944        Expression::Alias(alias) => unwrap_projection_alias(&alias.this),
945        Expression::Annotated(annotated) => unwrap_projection_alias(&annotated.this),
946        Expression::Paren(paren) => unwrap_projection_alias(&paren.this),
947        _ => expression,
948    }
949}
950
951fn projection_name(expression: &Expression) -> Option<String> {
952    match expression {
953        Expression::Alias(alias) => Some(alias.alias.name.clone()),
954        Expression::Column(column) => Some(column.name.name.clone()),
955        Expression::Identifier(identifier) => Some(identifier.name.clone()),
956        Expression::Star(_) => Some("*".to_string()),
957        Expression::Annotated(annotated) => projection_name(&annotated.this),
958        _ => None,
959    }
960}
961
962fn projection_is_star(expression: &Expression) -> bool {
963    matches!(expression, Expression::Star(_))
964        || matches!(expression, Expression::Column(column) if column.name.name == "*")
965}
966
967fn projection_star_table(expression: &Expression) -> Option<String> {
968    match expression {
969        Expression::Star(star) => star
970            .table
971            .as_ref()
972            .map(|identifier| identifier.name.clone()),
973        Expression::Column(column) if column.name.name == "*" => column
974            .table
975            .as_ref()
976            .map(|identifier| identifier.name.clone()),
977        _ => None,
978    }
979}
980
981fn transform_kind(expression: &Expression) -> TransformKind {
982    if projection_is_star(expression) {
983        TransformKind::Star
984    } else if is_cast_expression(expression) {
985        TransformKind::Cast
986    } else if contains_aggregate(expression) {
987        TransformKind::Aggregation
988    } else if matches!(
989        expression,
990        Expression::Column(_) | Expression::Identifier(_)
991    ) {
992        TransformKind::Direct
993    } else if is_simple_constant(expression) {
994        TransformKind::Constant
995    } else {
996        TransformKind::Expression
997    }
998}
999
1000fn is_cast_expression(expression: &Expression) -> bool {
1001    matches!(
1002        expression,
1003        Expression::Cast(_) | Expression::TryCast(_) | Expression::SafeCast(_)
1004    )
1005}
1006
1007fn cast_type(expression: &Expression, dialect: DialectType) -> Option<String> {
1008    match expression {
1009        Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1010            render_data_type(&cast.to, dialect)
1011        }
1012        _ => None,
1013    }
1014}
1015
1016fn render_data_type(data_type: &DataType, dialect: DialectType) -> Option<String> {
1017    Dialect::get(dialect)
1018        .generate(&Expression::DataType(data_type.clone()))
1019        .ok()
1020}
1021
1022fn is_simple_constant(expression: &Expression) -> bool {
1023    match expression {
1024        Expression::Literal(_) | Expression::Boolean(_) | Expression::Null(_) => true,
1025        Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1026            is_simple_constant(&cast.this)
1027        }
1028        Expression::Neg(unary) | Expression::BitwiseNot(unary) => is_simple_constant(&unary.this),
1029        _ => false,
1030    }
1031}
1032
1033fn projection_nullability(
1034    expression: &Expression,
1035    scope: &Scope,
1036    context: &NullabilityContext<'_>,
1037) -> ProjectionNullability {
1038    match expression {
1039        Expression::Alias(alias) => projection_nullability(&alias.this, scope, context),
1040        Expression::Annotated(annotated) => projection_nullability(&annotated.this, scope, context),
1041        Expression::Paren(paren) => projection_nullability(&paren.this, scope, context),
1042        Expression::Literal(_) | Expression::Boolean(_) => ProjectionNullability::NonNull,
1043        Expression::Null(_) => ProjectionNullability::Nullable,
1044        Expression::Count(_) | Expression::CountIf(_) => ProjectionNullability::NonNull,
1045        Expression::Cast(cast) => projection_nullability(&cast.this, scope, context),
1046        Expression::TryCast(_) | Expression::SafeCast(_) => ProjectionNullability::Unknown,
1047        Expression::Column(column) => column_nullability(
1048            &column.name.name,
1049            column.table.as_ref().map(|table| table.name.as_str()),
1050            scope,
1051            context,
1052        ),
1053        Expression::Identifier(identifier) => {
1054            column_nullability(&identifier.name, None, scope, context)
1055        }
1056        Expression::Coalesce(func) => coalesce_nullability(&func.expressions, scope, context),
1057        _ => ProjectionNullability::Unknown,
1058    }
1059}
1060
1061fn column_nullability(
1062    column_name: &str,
1063    source_name: Option<&str>,
1064    scope: &Scope,
1065    context: &NullabilityContext<'_>,
1066) -> ProjectionNullability {
1067    let resolved_source_name = source_name
1068        .map(str::to_string)
1069        .or_else(|| single_scope_source_name(scope));
1070
1071    if let Some(source_name) = &resolved_source_name {
1072        if context
1073            .nullable_sources
1074            .contains(&normalize_lookup_name(source_name))
1075        {
1076            return ProjectionNullability::Nullable;
1077        }
1078    }
1079
1080    let Some(schema) = context.schema else {
1081        return ProjectionNullability::Unknown;
1082    };
1083
1084    let table_name = resolved_source_name
1085        .as_ref()
1086        .and_then(|name| scope.sources.get(name).and_then(source_table_name))
1087        .or(resolved_source_name);
1088
1089    let Some(table_name) = table_name else {
1090        return ProjectionNullability::Unknown;
1091    };
1092
1093    match schema.column(&table_name, column_name) {
1094        Some(info) if info.primary_key || info.nullable == Some(false) => {
1095            ProjectionNullability::NonNull
1096        }
1097        Some(info) if info.nullable == Some(true) => ProjectionNullability::Nullable,
1098        Some(_) | None => ProjectionNullability::Unknown,
1099    }
1100}
1101
1102fn single_scope_source_name(scope: &Scope) -> Option<String> {
1103    if scope.sources.len() == 1 {
1104        scope.sources.keys().next().cloned()
1105    } else {
1106        None
1107    }
1108}
1109
1110fn coalesce_nullability(
1111    expressions: &[Expression],
1112    scope: &Scope,
1113    context: &NullabilityContext<'_>,
1114) -> ProjectionNullability {
1115    if expressions.is_empty() {
1116        return ProjectionNullability::Unknown;
1117    }
1118
1119    let mut all_nullable = true;
1120
1121    for expression in expressions {
1122        match projection_nullability(unwrap_projection_alias(expression), scope, context) {
1123            ProjectionNullability::NonNull => return ProjectionNullability::NonNull,
1124            ProjectionNullability::Nullable => {}
1125            ProjectionNullability::Unknown => all_nullable = false,
1126        }
1127    }
1128
1129    if all_nullable {
1130        ProjectionNullability::Nullable
1131    } else {
1132        ProjectionNullability::Unknown
1133    }
1134}
1135
1136fn terminal_references_from_lineage(node: &LineageNode) -> Vec<ColumnReferenceFact> {
1137    let mut refs = Vec::new();
1138    collect_terminal_references(node, &mut refs);
1139    dedupe_column_refs(refs)
1140}
1141
1142fn collect_terminal_references(node: &LineageNode, refs: &mut Vec<ColumnReferenceFact>) {
1143    if node.downstream.is_empty() {
1144        if let Some(reference) = column_reference_from_lineage_node(node) {
1145            refs.push(reference);
1146        }
1147        return;
1148    }
1149
1150    for child in &node.downstream {
1151        collect_terminal_references(child, refs);
1152    }
1153}
1154
1155fn column_reference_from_lineage_node(node: &LineageNode) -> Option<ColumnReferenceFact> {
1156    match &node.expression {
1157        Expression::Column(column) => {
1158            let source_name = non_empty_string(node.source_name.clone());
1159            let table =
1160                lineage_node_table(node).or_else(|| column.table.as_ref().map(|t| t.name.clone()));
1161            let confidence = if node.source_kind == SourceKind::Unknown && source_name.is_none() {
1162                ReferenceConfidence::Unknown
1163            } else {
1164                ReferenceConfidence::Resolved
1165            };
1166            Some(ColumnReferenceFact {
1167                source_name,
1168                source_alias: node.source_alias.clone(),
1169                source_kind: node.source_kind,
1170                table,
1171                column: column.name.name.clone(),
1172                unqualified: column.table.is_none(),
1173                confidence,
1174            })
1175        }
1176        Expression::Star(_) => Some(ColumnReferenceFact {
1177            source_name: non_empty_string(node.source_name.clone()),
1178            source_alias: node.source_alias.clone(),
1179            source_kind: node.source_kind,
1180            table: lineage_node_table(node),
1181            column: "*".to_string(),
1182            unqualified: true,
1183            confidence: if node.source_kind == SourceKind::Unknown {
1184                ReferenceConfidence::Unknown
1185            } else {
1186                ReferenceConfidence::Resolved
1187            },
1188        }),
1189        _ => None,
1190    }
1191}
1192
1193fn lineage_node_table(node: &LineageNode) -> Option<String> {
1194    match &node.source {
1195        Expression::Table(table) => Some(table_name(table)),
1196        _ => None,
1197    }
1198}
1199
1200fn fallback_column_references(expression: &Expression, scope: &Scope) -> Vec<ColumnReferenceFact> {
1201    let mut refs = Vec::new();
1202    let source_count = scope.sources.len();
1203    let single_source = if source_count == 1 {
1204        scope.sources.iter().next()
1205    } else {
1206        None
1207    };
1208
1209    for column_expr in expression.find_all(|candidate| matches!(candidate, Expression::Column(_))) {
1210        if let Expression::Column(column) = column_expr {
1211            if column.name.name == "*" {
1212                continue;
1213            }
1214            let source = column
1215                .table
1216                .as_ref()
1217                .and_then(|table| scope.sources.get(&table.name));
1218            let (source_name, source_alias, source_kind, table, confidence) =
1219                if let Some(table_identifier) = &column.table {
1220                    if let Some(source) = source {
1221                        (
1222                            Some(table_identifier.name.clone()),
1223                            source.alias.clone(),
1224                            source.kind,
1225                            source_table_name(source)
1226                                .or_else(|| Some(table_identifier.name.clone())),
1227                            ReferenceConfidence::Resolved,
1228                        )
1229                    } else {
1230                        (
1231                            Some(table_identifier.name.clone()),
1232                            None,
1233                            SourceKind::Unknown,
1234                            Some(table_identifier.name.clone()),
1235                            ReferenceConfidence::Unknown,
1236                        )
1237                    }
1238                } else if let Some((name, source)) = single_source {
1239                    (
1240                        Some(name.clone()),
1241                        source.alias.clone(),
1242                        source.kind,
1243                        source_table_name(source).or_else(|| Some(name.clone())),
1244                        ReferenceConfidence::Resolved,
1245                    )
1246                } else if source_count > 1 {
1247                    (
1248                        None,
1249                        None,
1250                        SourceKind::Unknown,
1251                        None,
1252                        ReferenceConfidence::Ambiguous,
1253                    )
1254                } else {
1255                    (
1256                        None,
1257                        None,
1258                        SourceKind::Unknown,
1259                        None,
1260                        ReferenceConfidence::Unknown,
1261                    )
1262                };
1263
1264            refs.push(ColumnReferenceFact {
1265                source_name,
1266                source_alias,
1267                source_kind,
1268                table,
1269                column: column.name.name.clone(),
1270                unqualified: column.table.is_none(),
1271                confidence,
1272            });
1273        }
1274    }
1275
1276    dedupe_column_refs(refs)
1277}
1278
1279fn dedupe_column_refs(refs: Vec<ColumnReferenceFact>) -> Vec<ColumnReferenceFact> {
1280    let mut seen = HashSet::new();
1281    let mut deduped = Vec::new();
1282
1283    for reference in refs {
1284        let key = (
1285            reference.source_name.clone(),
1286            reference.source_alias.clone(),
1287            reference.table.clone(),
1288            reference.column.clone(),
1289            format!("{:?}", reference.source_kind),
1290            reference.unqualified,
1291            format!("{:?}", reference.confidence),
1292        );
1293        if seen.insert(key) {
1294            deduped.push(reference);
1295        }
1296    }
1297
1298    deduped
1299}
1300
1301fn relation_facts(
1302    scope: &Scope,
1303    mapping_schema: Option<&crate::schema::MappingSchema>,
1304) -> Vec<RelationFact> {
1305    let mut relations = Vec::new();
1306    let mut seen = HashSet::new();
1307    collect_relation_facts(scope, mapping_schema, &mut seen, &mut relations);
1308
1309    relations.sort_by(|left, right| {
1310        left.name
1311            .cmp(&right.name)
1312            .then_with(|| left.alias.cmp(&right.alias))
1313    });
1314    relations
1315}
1316
1317fn collect_relation_facts(
1318    scope: &Scope,
1319    mapping_schema: Option<&crate::schema::MappingSchema>,
1320    seen: &mut HashSet<String>,
1321    relations: &mut Vec<RelationFact>,
1322) {
1323    for relation in scope.sources.iter().map(|(source_name, source)| {
1324        let identity = source_table_identity(source);
1325        RelationFact {
1326            name: source
1327                .lineage_name
1328                .clone()
1329                .or_else(|| identity.as_ref().map(|identity| identity.name.clone()))
1330                .unwrap_or_else(|| source_name.clone()),
1331            alias: source.alias.clone().or_else(|| source_alias(source)),
1332            kind: source.kind,
1333            columns: source_columns(source, mapping_schema),
1334            catalog: identity
1335                .as_ref()
1336                .and_then(|identity| identity.catalog.clone()),
1337            schema: identity
1338                .as_ref()
1339                .and_then(|identity| identity.schema.clone()),
1340            table: identity
1341                .as_ref()
1342                .and_then(|identity| identity.table.clone()),
1343        }
1344    }) {
1345        let key = format!("{:?}|{}|{:?}", relation.kind, relation.name, relation.alias);
1346        if seen.insert(key) {
1347            relations.push(relation);
1348        }
1349    }
1350
1351    for branch_scope in &scope.union_scopes {
1352        collect_relation_facts(branch_scope, mapping_schema, seen, relations);
1353    }
1354}
1355
1356fn base_table_facts(
1357    scope: &Scope,
1358    mapping_schema: Option<&crate::schema::MappingSchema>,
1359) -> Vec<RelationFact> {
1360    let mut relations = Vec::new();
1361    let mut seen = HashSet::new();
1362
1363    collect_base_table_facts(scope, mapping_schema, &mut seen, &mut relations);
1364
1365    relations.sort_by(|left, right| left.name.cmp(&right.name));
1366    relations
1367}
1368
1369fn collect_base_table_facts(
1370    scope: &Scope,
1371    mapping_schema: Option<&crate::schema::MappingSchema>,
1372    seen: &mut HashSet<String>,
1373    relations: &mut Vec<RelationFact>,
1374) {
1375    for source in scope.sources.values() {
1376        if source.kind != SourceKind::Table {
1377            continue;
1378        }
1379
1380        let Some(identity) = source_table_identity(source) else {
1381            continue;
1382        };
1383
1384        if seen.insert(identity.name.clone()) {
1385            relations.push(RelationFact {
1386                name: identity.name,
1387                alias: source.alias.clone().or_else(|| source_alias(source)),
1388                kind: SourceKind::Table,
1389                columns: source_columns(source, mapping_schema),
1390                catalog: identity.catalog,
1391                schema: identity.schema,
1392                table: identity.table,
1393            });
1394        }
1395    }
1396
1397    for child_scope in scope
1398        .cte_scopes
1399        .iter()
1400        .chain(scope.union_scopes.iter())
1401        .chain(scope.table_scopes.iter())
1402        .chain(scope.derived_table_scopes.iter())
1403        .chain(scope.subquery_scopes.iter())
1404    {
1405        collect_base_table_facts(child_scope, mapping_schema, seen, relations);
1406    }
1407}
1408
1409fn source_columns(
1410    source: &SourceInfo,
1411    mapping_schema: Option<&crate::schema::MappingSchema>,
1412) -> Vec<String> {
1413    match &source.expression {
1414        Expression::Table(table) => mapping_schema
1415            .and_then(|schema| schema.column_names(&table_name(table)).ok())
1416            .unwrap_or_default(),
1417        Expression::Select(_)
1418        | Expression::Union(_)
1419        | Expression::Intersect(_)
1420        | Expression::Except(_) => get_output_column_names(&source.expression),
1421        Expression::Subquery(subquery) => get_output_column_names(&subquery.this),
1422        Expression::Cte(cte) if !cte.columns.is_empty() => cte
1423            .columns
1424            .iter()
1425            .map(|column| column.name.clone())
1426            .collect(),
1427        Expression::Cte(cte) => get_output_column_names(&cte.this),
1428        _ => Vec::new(),
1429    }
1430}
1431
1432fn source_table_name(source: &SourceInfo) -> Option<String> {
1433    source_table_identity(source).map(|identity| identity.name)
1434}
1435
1436fn source_alias(source: &SourceInfo) -> Option<String> {
1437    match &source.expression {
1438        Expression::Table(table) => table.alias.as_ref().map(|alias| alias.name.clone()),
1439        Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
1440        _ => None,
1441    }
1442}
1443
1444fn table_name(table: &TableRef) -> String {
1445    let mut parts = Vec::new();
1446    if let Some(catalog) = &table.catalog {
1447        parts.push(catalog.name.clone());
1448    }
1449    if let Some(schema) = &table.schema {
1450        parts.push(schema.name.clone());
1451    }
1452    parts.push(table.name.name.clone());
1453    parts.join(".")
1454}
1455
/// Identifying parts of a table relation, as produced by `table_identity`.
#[derive(Debug, Clone)]
struct RelationIdentity {
    /// Dotted name built by `table_name` (catalog and schema when present,
    /// then the table name).
    name: String,
    /// Catalog qualifier, when the table reference has one.
    catalog: Option<String>,
    /// Schema qualifier, when the table reference has one.
    schema: Option<String>,
    /// Bare table name without qualifiers.
    table: Option<String>,
}
1463
1464fn source_table_identity(source: &SourceInfo) -> Option<RelationIdentity> {
1465    match &source.expression {
1466        Expression::Table(table) => Some(table_identity(table)),
1467        _ => None,
1468    }
1469}
1470
1471fn table_identity(table: &TableRef) -> RelationIdentity {
1472    RelationIdentity {
1473        name: table_name(table),
1474        catalog: table.catalog.as_ref().map(|catalog| catalog.name.clone()),
1475        schema: table.schema.as_ref().map(|schema| schema.name.clone()),
1476        table: Some(table.name.name.clone()),
1477    }
1478}
1479
1480fn set_operation_facts(
1481    expression: &Expression,
1482    scope: &Scope,
1483    dialect: DialectType,
1484) -> Vec<SetOperationFact> {
1485    let mut facts = Vec::new();
1486    collect_set_operation_facts(expression, scope, dialect, &mut facts);
1487    facts
1488}
1489
1490fn collect_set_operation_facts(
1491    expression: &Expression,
1492    scope: &Scope,
1493    dialect: DialectType,
1494    facts: &mut Vec<SetOperationFact>,
1495) {
1496    match expression {
1497        Expression::Union(union) => {
1498            facts.push(SetOperationFact {
1499                kind: "union".to_string(),
1500                all: union.all,
1501                distinct: union.distinct,
1502                output_columns: get_output_column_names(expression),
1503                branches: set_operation_branches(&union.left, &union.right, scope, dialect),
1504            });
1505            collect_set_operation_facts(&union.left, scope, dialect, facts);
1506            collect_set_operation_facts(&union.right, scope, dialect, facts);
1507        }
1508        Expression::Intersect(intersect) => {
1509            facts.push(SetOperationFact {
1510                kind: "intersect".to_string(),
1511                all: intersect.all,
1512                distinct: intersect.distinct,
1513                output_columns: get_output_column_names(expression),
1514                branches: set_operation_branches(&intersect.left, &intersect.right, scope, dialect),
1515            });
1516            collect_set_operation_facts(&intersect.left, scope, dialect, facts);
1517            collect_set_operation_facts(&intersect.right, scope, dialect, facts);
1518        }
1519        Expression::Except(except) => {
1520            facts.push(SetOperationFact {
1521                kind: "except".to_string(),
1522                all: except.all,
1523                distinct: except.distinct,
1524                output_columns: get_output_column_names(expression),
1525                branches: set_operation_branches(&except.left, &except.right, scope, dialect),
1526            });
1527            collect_set_operation_facts(&except.left, scope, dialect, facts);
1528            collect_set_operation_facts(&except.right, scope, dialect, facts);
1529        }
1530        Expression::Subquery(subquery) => {
1531            collect_set_operation_facts(&subquery.this, scope, dialect, facts);
1532        }
1533        _ => {}
1534    }
1535}
1536
1537fn set_operation_branches(
1538    left: &Expression,
1539    right: &Expression,
1540    scope: &Scope,
1541    dialect: DialectType,
1542) -> Vec<SetOperationBranchFact> {
1543    vec![
1544        SetOperationBranchFact {
1545            index: 0,
1546            projections: projection_facts_for_branch(left, scope, dialect),
1547        },
1548        SetOperationBranchFact {
1549            index: 1,
1550            projections: projection_facts_for_branch(right, scope, dialect),
1551        },
1552    ]
1553}
1554
1555fn projection_facts_for_branch(
1556    expression: &Expression,
1557    root_scope: &Scope,
1558    dialect: DialectType,
1559) -> Vec<ProjectionFact> {
1560    let branch_scope = build_scope(expression);
1561    let scope = if branch_scope.sources.is_empty() {
1562        root_scope
1563    } else {
1564        &branch_scope
1565    };
1566    let nullability_context = NullabilityContext {
1567        schema: None,
1568        nullable_sources: nullable_source_names(expression),
1569    };
1570    projection_facts_for_query(expression, scope, dialect, &nullability_context)
1571}
1572
/// Wraps a string in `Some` unless it is empty, in which case `None` is
/// returned. Whitespace-only strings count as non-empty.
fn non_empty_string(value: String) -> Option<String> {
    Some(value).filter(|text| !text.is_empty())
}