1use crate::ast_transforms::get_output_column_names;
9use crate::dialects::{Dialect, DialectType};
10use crate::expressions::{DataType, Expression, JoinKind, TableRef, With};
11use crate::lineage::{lineage_by_index_from_expression, LineageNode};
12use crate::optimizer::annotate_types::annotate_types;
13use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
14use crate::schema::{MappingSchema, Schema};
15use crate::scope::{build_scope, Scope, SourceInfo, SourceKind};
16use crate::traversal::{contains_aggregate, ExpressionWalk};
17use crate::validation::{mapping_schema_from_validation_schema, ValidationSchema};
18use crate::{parse_data_type, parse_one, Error, Result};
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21
/// Options accepted by [`analyze_query`].
///
/// Serialized with camelCase field names; fields missing during
/// deserialization fall back to their `Default` values.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct AnalyzeQueryOptions {
    /// Dialect used to parse the SQL text and to render SQL and type strings.
    pub dialect: DialectType,
    /// Optional schema. When present, `analyze_query` qualifies columns
    /// against it and uses it for type and nullability information.
    pub schema: Option<ValidationSchema>,
}
31
/// Result of [`analyze_query`]: structural facts about a single query.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryAnalysis {
    /// Whether the top-level statement is a plain SELECT or a set operation.
    pub shape: QueryShape,
    /// CTE names in first-seen order, without duplicates (see `collect_cte_names`).
    pub ctes: Vec<String>,
    /// Facts about the CTEs of the top-level WITH clause only.
    pub cte_facts: Vec<CteFact>,
    /// One fact per output projection; for set operations these come from
    /// the left-most branch.
    pub projections: Vec<ProjectionFact>,
    /// Relations produced by `relation_facts` for the query scope.
    pub relations: Vec<RelationFact>,
    /// Relations produced by `base_table_facts` for the query scope.
    pub base_tables: Vec<RelationFact>,
    /// Star projections found before column qualification, with their expansion.
    pub star_projections: Vec<StarProjectionFact>,
    /// Facts produced by `set_operation_facts` for the analyzed expression.
    pub set_operations: Vec<SetOperationFact>,
}
45
/// Top-level shape of an analyzed query (serialized in snake_case).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueryShape {
    /// The top-level expression is not a UNION / INTERSECT / EXCEPT.
    Select,
    /// The top-level expression is a UNION, INTERSECT or EXCEPT.
    SetOperation,
}
53
/// Facts about one output projection of a query (camelCase when serialized).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProjectionFact {
    /// Zero-based position of the projection in the select list.
    pub index: usize,
    /// Output column name, when one could be determined.
    pub name: Option<String>,
    /// True when the projection is `*` or `table.*`.
    pub is_star: bool,
    /// Qualifier of a `table.*` projection, if any.
    pub star_table: Option<String>,
    /// Coarse classification of how the projection is computed.
    pub transform_kind: TransformKind,
    /// The single recognised function call inside the projection, if exactly
    /// one exists. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub transform_function: Option<TransformFunctionFact>,
    /// Rendered target type when the projection is a cast.
    pub cast_type: Option<String>,
    /// Rendered inferred type of the projection, when type annotation found one.
    pub type_hint: Option<String>,
    /// Best-effort nullability of the projection.
    pub nullability: ProjectionNullability,
    /// Column references the projection is derived from.
    pub upstream: Vec<ColumnReferenceFact>,
}
70
/// Description of a function call found inside a projection
/// (camelCase when serialized).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransformFunctionFact {
    /// Function name (e.g. the parsed function's name, or a fixed name such
    /// as `DATE_TRUNC` for dedicated expression nodes).
    pub name: String,
    /// Literal-like arguments rendered as strings (units, constants, ...).
    pub literal_args: Vec<String>,
    /// De-duplicated column references found in the arguments.
    pub column_args: Vec<ColumnReferenceFact>,
}
79
/// Facts about one CTE of the top-level WITH clause (camelCase when serialized).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CteFact {
    /// CTE alias name.
    pub name: String,
    /// Column names declared on the CTE itself.
    pub columns: Vec<String>,
    /// CTE body rendered as SQL in the analysis dialect.
    pub body_sql: String,
    /// Output column names computed from the CTE body.
    pub output_columns: Vec<String>,
}
89
/// Expansion of a `*` / `table.*` projection (camelCase when serialized).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StarProjectionFact {
    /// Zero-based position of the star in the select list.
    pub index: usize,
    /// Qualifier of a `table.*` star; `None` for a bare `*`.
    pub table: Option<String>,
    /// Column names the star expands to; may be empty when nothing is known.
    pub expanded_columns: Vec<String>,
}
98
/// A column reference attributed to a source (camelCase when serialized).
///
/// NOTE(review): the constructors for this type are outside this excerpt;
/// the field descriptions below follow the field names and should be
/// confirmed against them.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnReferenceFact {
    /// Name of the source the column comes from, if known.
    pub source_name: Option<String>,
    /// Alias of that source, if any.
    pub source_alias: Option<String>,
    /// Kind of the source (table, CTE, ...).
    pub source_kind: SourceKind,
    /// Table name associated with the reference, if known.
    pub table: Option<String>,
    /// Column name.
    pub column: String,
    /// Presumably true when the reference carried no table qualifier.
    pub unqualified: bool,
    /// How reliably the reference was attributed to its source.
    pub confidence: ReferenceConfidence,
}
111
/// A relation (source) visible to the query (camelCase when serialized).
///
/// NOTE(review): populated by `relation_facts` / `base_table_facts`, which
/// are outside this excerpt; confirm field semantics there.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelationFact {
    /// Name of the relation.
    pub name: String,
    /// Alias used in the query, if any.
    pub alias: Option<String>,
    /// Kind of source this relation is.
    pub kind: SourceKind,
    /// Known column names of the relation.
    pub columns: Vec<String>,
    /// Catalog part of the relation name, if any.
    pub catalog: Option<String>,
    /// Schema part of the relation name, if any.
    pub schema: Option<String>,
    /// Table part of the relation name, if any.
    pub table: Option<String>,
}
124
/// Facts about one set operation (camelCase when serialized).
///
/// NOTE(review): populated by `set_operation_facts`, which is outside this
/// excerpt; confirm field semantics there.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationFact {
    /// Operation kind as a string (presumably union / intersect / except).
    pub kind: String,
    /// Whether the `ALL` modifier applies.
    pub all: bool,
    /// Whether the `DISTINCT` modifier applies.
    pub distinct: bool,
    /// Output column names of the set operation.
    pub output_columns: Vec<String>,
    /// Per-branch projection facts.
    pub branches: Vec<SetOperationBranchFact>,
}
135
/// Projections of one branch of a set operation (camelCase when serialized).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationBranchFact {
    /// Position of the branch within its set operation.
    pub index: usize,
    /// Projection facts for this branch.
    pub projections: Vec<ProjectionFact>,
}
143
/// Coarse classification of a projection, assigned by `transform_kind`
/// (serialized in snake_case). Checks run in the order: star, cast,
/// aggregation, direct, constant, expression.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransformKind {
    /// A bare column or identifier.
    Direct,
    /// The outermost expression is a CAST / TRY_CAST / SAFE_CAST.
    Cast,
    /// The expression contains an aggregate.
    Aggregation,
    /// A literal, boolean, NULL, or a negation / bitwise-not of one.
    Constant,
    /// Anything not covered by the other variants.
    Expression,
    /// `*` or `table.*`.
    Star,
}
155
/// How reliably a column reference was attributed to a source
/// (serialized in snake_case).
///
/// NOTE(review): the code assigning these values is outside this excerpt;
/// the variant descriptions follow the variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReferenceConfidence {
    /// The reference was attributed to a single source.
    Resolved,
    /// More than one source could own the reference.
    Ambiguous,
    /// The owning source could not be determined.
    Unknown,
}
164
/// Best-effort nullability of a projection, computed by
/// `projection_nullability` (serialized in snake_case).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectionNullability {
    /// The projection cannot be NULL (e.g. a literal, COUNT, or a
    /// primary-key / non-nullable column of a non-outer-joined source).
    NonNull,
    /// The projection may be NULL (e.g. NULL literal, nullable column, or a
    /// column from the NULL-extended side of an outer join).
    Nullable,
    /// Nothing could be concluded.
    Unknown,
}
173
174pub fn analyze_query(sql: &str, options: AnalyzeQueryOptions) -> Result<QueryAnalysis> {
176 let mut expression = parse_one(sql, options.dialect)?;
177 expression = effective_query(expression);
178 ensure_query(&expression)?;
179 let original_expression = expression.clone();
180
181 let mapping_schema = options
182 .schema
183 .as_ref()
184 .map(|schema| analysis_mapping_schema(schema, options.dialect));
185 let schema_info = options.schema.as_ref().map(AnalysisSchemaInfo::from_schema);
186 let cte_facts = top_level_cte_facts(&original_expression, options.dialect)?;
187 let star_projections = star_projection_facts(&original_expression, mapping_schema.as_ref());
188
189 if let Some(schema) = mapping_schema.as_ref() {
190 let qualify_options = QualifyColumnsOptions::new().with_dialect(options.dialect);
191 expression = qualify_columns(expression, schema, &qualify_options)
192 .map_err(|e| Error::internal(format!("query analysis qualification failed: {e}")))?;
193 }
194
195 let annotation_schema = mapping_schema.as_ref().map(|schema| {
196 let mut alias_schema = schema.clone();
197 add_scope_aliases_to_schema(
198 &build_scope(&expression),
199 schema,
200 &mut alias_schema,
201 options.dialect,
202 );
203 alias_schema
204 });
205
206 annotate_types(
207 &mut expression,
208 annotation_schema
209 .as_ref()
210 .map(|schema| schema as &dyn Schema),
211 Some(options.dialect),
212 );
213 crate::lineage::expand_cte_stars(
214 &mut expression,
215 annotation_schema
216 .as_ref()
217 .or(mapping_schema.as_ref())
218 .map(|schema| schema as &dyn Schema),
219 );
220
221 let scope = build_scope(&expression);
222 let nullability_context = NullabilityContext {
223 schema: schema_info.as_ref(),
224 nullable_sources: nullable_source_names(&expression),
225 };
226 let shape = if is_set_operation(&expression) {
227 QueryShape::SetOperation
228 } else {
229 QueryShape::Select
230 };
231
232 Ok(QueryAnalysis {
233 shape,
234 ctes: collect_cte_names(&expression),
235 cte_facts,
236 projections: projection_facts_for_query(
237 &expression,
238 &scope,
239 options.dialect,
240 &nullability_context,
241 ),
242 relations: relation_facts(&scope, mapping_schema.as_ref()),
243 base_tables: base_table_facts(&scope, mapping_schema.as_ref()),
244 star_projections,
245 set_operations: set_operation_facts(&expression, &scope, options.dialect),
246 })
247}
248
249fn analysis_mapping_schema(schema: &ValidationSchema, dialect: DialectType) -> MappingSchema {
250 let broad_schema = mapping_schema_from_validation_schema(schema);
251 let mut mapping_schema = MappingSchema::with_dialect(dialect);
252
253 for table in &schema.tables {
254 let table_names = validation_table_names(table);
255 if table_names.is_empty() {
256 continue;
257 }
258
259 let fallback_table = table_names[0].as_str();
260 let columns: Vec<(String, DataType)> = table
261 .columns
262 .iter()
263 .map(|column| {
264 let data_type = parse_analysis_data_type(&column.data_type, dialect)
265 .unwrap_or_else(|| {
266 broad_schema
267 .get_column_type(fallback_table, &column.name)
268 .unwrap_or(DataType::Unknown)
269 });
270 (column.name.to_ascii_lowercase(), data_type)
271 })
272 .collect();
273
274 for table_name in table_names {
275 let _ = mapping_schema.add_table(&table_name, &columns, Some(dialect));
276 }
277 }
278
279 mapping_schema
280}
281
282fn validation_table_names(table: &crate::validation::SchemaTable) -> Vec<String> {
283 let mut names = Vec::new();
284
285 names.push(table.name.to_ascii_lowercase());
286 if let Some(schema_name) = &table.schema {
287 names.push(format!(
288 "{}.{}",
289 schema_name.to_ascii_lowercase(),
290 table.name.to_ascii_lowercase()
291 ));
292 }
293 for alias in &table.aliases {
294 names.push(alias.to_ascii_lowercase());
295 }
296
297 names.sort();
298 names.dedup();
299 names
300}
301
302fn parse_analysis_data_type(data_type: &str, dialect: DialectType) -> Option<DataType> {
303 let trimmed = data_type.trim();
304 if trimmed.is_empty() {
305 return None;
306 }
307 parse_data_type(trimmed, dialect).ok()
308}
309
310fn add_scope_aliases_to_schema(
311 scope: &Scope,
312 source_schema: &MappingSchema,
313 target_schema: &mut MappingSchema,
314 dialect: DialectType,
315) {
316 for child_scope in scope.traverse() {
317 for (source_name, source) in &child_scope.sources {
318 if source.kind != SourceKind::Table {
319 continue;
320 }
321 if let Some(table_name) = source_table_name(source) {
322 if source_name == &table_name {
323 continue;
324 }
325 if let Ok(column_names) = source_schema.column_names(&table_name) {
326 let columns: Vec<(String, DataType)> = column_names
327 .iter()
328 .map(|column| {
329 (
330 column.clone(),
331 source_schema
332 .get_column_type(&table_name, column)
333 .unwrap_or(DataType::Unknown),
334 )
335 })
336 .collect();
337 let _ = target_schema.add_table(source_name, &columns, Some(dialect));
338 }
339 }
340 }
341 }
342}
343
/// Nullability-related metadata of one schema column.
#[derive(Debug, Clone)]
struct AnalysisColumnInfo {
    /// Declared nullability; `None` when the schema does not say.
    nullable: Option<bool>,
    /// True when the column is flagged as a primary key, either on the
    /// column itself or through the table's primary-key list.
    primary_key: bool,
}
349
/// Lookup table of column metadata used for nullability inference.
#[derive(Debug, Clone)]
struct AnalysisSchemaInfo {
    /// Keyed by `(table name, column name)`, both normalized with
    /// `normalize_lookup_name`.
    columns: HashMap<(String, String), AnalysisColumnInfo>,
}
354
355impl AnalysisSchemaInfo {
356 fn from_schema(schema: &ValidationSchema) -> Self {
357 let mut columns = HashMap::new();
358
359 for table in &schema.tables {
360 let table_names = validation_table_names(table);
361 let primary_keys: HashSet<String> = table
362 .primary_key
363 .iter()
364 .map(|column| column.to_ascii_lowercase())
365 .collect();
366
367 for column in &table.columns {
368 let info = AnalysisColumnInfo {
369 nullable: column.nullable,
370 primary_key: column.primary_key
371 || primary_keys.contains(&column.name.to_ascii_lowercase()),
372 };
373
374 for table_name in &table_names {
375 columns.insert(
376 (
377 normalize_lookup_name(table_name),
378 normalize_lookup_name(&column.name),
379 ),
380 info.clone(),
381 );
382 }
383 }
384 }
385
386 Self { columns }
387 }
388
389 fn column(&self, table: &str, column: &str) -> Option<&AnalysisColumnInfo> {
390 self.columns
391 .get(&(normalize_lookup_name(table), normalize_lookup_name(column)))
392 }
393}
394
/// Inputs shared by the nullability helpers.
struct NullabilityContext<'a> {
    /// Column metadata from the supplied schema, if any.
    schema: Option<&'a AnalysisSchemaInfo>,
    /// Normalized names of sources that an outer join may NULL-extend
    /// (see `nullable_source_names`).
    nullable_sources: HashSet<String>,
}
399
400fn top_level_cte_facts(expression: &Expression, dialect: DialectType) -> Result<Vec<CteFact>> {
401 let Some(with_clause) = with_clause(expression) else {
402 return Ok(Vec::new());
403 };
404
405 with_clause
406 .ctes
407 .iter()
408 .map(|cte| {
409 Ok(CteFact {
410 name: cte.alias.name.clone(),
411 columns: cte
412 .columns
413 .iter()
414 .map(|column| column.name.clone())
415 .collect(),
416 body_sql: Dialect::get(dialect).generate(&cte.this)?,
417 output_columns: get_output_column_names(&cte.this),
418 })
419 })
420 .collect()
421}
422
423fn star_projection_facts(
424 expression: &Expression,
425 mapping_schema: Option<&MappingSchema>,
426) -> Vec<StarProjectionFact> {
427 let scope = build_scope(expression);
428 let ordered_sources = ordered_source_names_for_query(expression);
429
430 select_expressions_for_query(expression)
431 .iter()
432 .enumerate()
433 .filter_map(|(index, projection)| {
434 let inner = unwrap_projection_alias(projection);
435 if !projection_is_star(inner) {
436 return None;
437 }
438
439 let table = projection_star_table(inner);
440 let expanded_columns =
441 expanded_star_columns(table.as_deref(), &scope, &ordered_sources, mapping_schema);
442
443 Some(StarProjectionFact {
444 index,
445 table,
446 expanded_columns,
447 })
448 })
449 .collect()
450}
451
452fn expanded_star_columns(
453 star_table: Option<&str>,
454 scope: &Scope,
455 ordered_sources: &[String],
456 mapping_schema: Option<&MappingSchema>,
457) -> Vec<String> {
458 let mut columns = Vec::new();
459 let mut source_names: Vec<String> = if ordered_sources.is_empty() {
460 let mut names: Vec<_> = scope.sources.keys().cloned().collect();
461 names.sort();
462 names
463 } else {
464 ordered_sources.to_vec()
465 };
466
467 source_names.dedup();
468
469 for source_name in source_names {
470 let Some(source) = scope.sources.get(&source_name) else {
471 continue;
472 };
473
474 if let Some(star_table) = star_table {
475 let matches = source_name.eq_ignore_ascii_case(star_table)
476 || source
477 .alias
478 .as_deref()
479 .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
480 || source_table_name(source)
481 .is_some_and(|table| table.eq_ignore_ascii_case(star_table));
482
483 if !matches {
484 continue;
485 }
486 }
487
488 columns.extend(source_columns(source, mapping_schema));
489 }
490
491 columns
492}
493
494fn ordered_source_names_for_query(expression: &Expression) -> Vec<String> {
495 match expression {
496 Expression::Select(select) => ordered_source_names_for_select(select),
497 Expression::Union(union) => ordered_source_names_for_query(&union.left),
498 Expression::Intersect(intersect) => ordered_source_names_for_query(&intersect.left),
499 Expression::Except(except) => ordered_source_names_for_query(&except.left),
500 Expression::Subquery(subquery) => ordered_source_names_for_query(&subquery.this),
501 _ => Vec::new(),
502 }
503}
504
505fn ordered_source_names_for_select(select: &crate::expressions::Select) -> Vec<String> {
506 let mut sources = Vec::new();
507
508 if let Some(from) = &select.from {
509 for expression in &from.expressions {
510 if let Some(source_name) = expression_source_name(expression) {
511 sources.push(source_name);
512 }
513 }
514 }
515
516 for join in &select.joins {
517 if let Some(source_name) = expression_source_name(&join.this) {
518 sources.push(source_name);
519 }
520 }
521
522 sources
523}
524
525fn nullable_source_names(expression: &Expression) -> HashSet<String> {
526 match expression {
527 Expression::Select(select) => nullable_source_names_for_select(select),
528 Expression::Union(union) => nullable_source_names(&union.left),
529 Expression::Intersect(intersect) => nullable_source_names(&intersect.left),
530 Expression::Except(except) => nullable_source_names(&except.left),
531 Expression::Subquery(subquery) => nullable_source_names(&subquery.this),
532 _ => HashSet::new(),
533 }
534}
535
536fn nullable_source_names_for_select(select: &crate::expressions::Select) -> HashSet<String> {
537 let mut nullable = HashSet::new();
538 let mut left_sources = Vec::new();
539
540 if let Some(from) = &select.from {
541 for expression in &from.expressions {
542 if let Some(source_name) = expression_source_name(expression) {
543 left_sources.push(source_name);
544 }
545 }
546 }
547
548 for join in &select.joins {
549 let right_source = expression_source_name(&join.this);
550
551 if join_nullable_left(join.kind) {
552 for source_name in &left_sources {
553 nullable.insert(normalize_lookup_name(source_name));
554 }
555 }
556
557 if join_nullable_right(join.kind) {
558 if let Some(source_name) = &right_source {
559 nullable.insert(normalize_lookup_name(source_name));
560 }
561 }
562
563 if let Some(source_name) = right_source {
564 left_sources.push(source_name);
565 }
566 }
567
568 nullable
569}
570
571fn join_nullable_left(kind: JoinKind) -> bool {
572 matches!(
573 kind,
574 JoinKind::Right
575 | JoinKind::NaturalRight
576 | JoinKind::AsOfRight
577 | JoinKind::Full
578 | JoinKind::NaturalFull
579 | JoinKind::Outer
580 )
581}
582
583fn join_nullable_right(kind: JoinKind) -> bool {
584 matches!(
585 kind,
586 JoinKind::Left
587 | JoinKind::NaturalLeft
588 | JoinKind::AsOfLeft
589 | JoinKind::LeftLateral
590 | JoinKind::OuterApply
591 | JoinKind::LeftArray
592 | JoinKind::Full
593 | JoinKind::NaturalFull
594 | JoinKind::Outer
595 )
596}
597
598fn expression_source_name(expression: &Expression) -> Option<String> {
599 match expression {
600 Expression::Table(table) => table
601 .alias
602 .as_ref()
603 .map(|alias| alias.name.clone())
604 .or_else(|| Some(table.name.name.clone())),
605 Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
606 Expression::Alias(alias) => Some(alias.alias.name.clone()),
607 Expression::Cte(cte) => Some(cte.alias.name.clone()),
608 _ => None,
609 }
610}
611
/// Normalizes a table or column name for case-insensitive lookup by
/// lower-casing its ASCII letters; all other characters are left untouched.
fn normalize_lookup_name(name: &str) -> String {
    let mut normalized = name.to_owned();
    normalized.make_ascii_lowercase();
    normalized
}
615
616fn effective_query(expression: Expression) -> Expression {
617 match expression {
618 Expression::Prepare(prepare) => prepare.statement,
619 Expression::Subquery(subquery) if subquery.alias.is_none() => subquery.this,
620 other => other,
621 }
622}
623
624fn ensure_query(expression: &Expression) -> Result<()> {
625 if matches!(
626 expression,
627 Expression::Select(_)
628 | Expression::Union(_)
629 | Expression::Intersect(_)
630 | Expression::Except(_)
631 ) {
632 Ok(())
633 } else {
634 Err(Error::internal(
635 "analyze_query requires a SELECT or set operation query",
636 ))
637 }
638}
639
640fn is_set_operation(expression: &Expression) -> bool {
641 matches!(
642 expression,
643 Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
644 )
645}
646
647fn collect_cte_names(expression: &Expression) -> Vec<String> {
648 let mut names = Vec::new();
649 let mut seen = HashSet::new();
650 collect_cte_names_inner(expression, &mut names, &mut seen);
651 names
652}
653
654fn collect_cte_names_inner(
655 expression: &Expression,
656 names: &mut Vec<String>,
657 seen: &mut HashSet<String>,
658) {
659 if let Some(with_clause) = with_clause(expression) {
660 collect_with_names(with_clause, names, seen);
661 }
662
663 match expression {
664 Expression::Union(union) => {
665 collect_cte_names_inner(&union.left, names, seen);
666 collect_cte_names_inner(&union.right, names, seen);
667 }
668 Expression::Intersect(intersect) => {
669 collect_cte_names_inner(&intersect.left, names, seen);
670 collect_cte_names_inner(&intersect.right, names, seen);
671 }
672 Expression::Except(except) => {
673 collect_cte_names_inner(&except.left, names, seen);
674 collect_cte_names_inner(&except.right, names, seen);
675 }
676 Expression::Subquery(subquery) => collect_cte_names_inner(&subquery.this, names, seen),
677 _ => {}
678 }
679}
680
681fn collect_with_names(with_clause: &With, names: &mut Vec<String>, seen: &mut HashSet<String>) {
682 for cte in &with_clause.ctes {
683 if seen.insert(cte.alias.name.clone()) {
684 names.push(cte.alias.name.clone());
685 }
686 collect_cte_names_inner(&cte.this, names, seen);
687 }
688}
689
690fn with_clause(expression: &Expression) -> Option<&With> {
691 match expression {
692 Expression::Select(select) => select.with.as_ref(),
693 Expression::Union(union) => union.with.as_ref(),
694 Expression::Intersect(intersect) => intersect.with.as_ref(),
695 Expression::Except(except) => except.with.as_ref(),
696 _ => None,
697 }
698}
699
700fn projection_facts_for_query(
701 expression: &Expression,
702 scope: &Scope,
703 dialect: DialectType,
704 nullability_context: &NullabilityContext<'_>,
705) -> Vec<ProjectionFact> {
706 let expressions = select_expressions_for_query(expression);
707 let names = get_output_column_names(expression);
708
709 expressions
710 .iter()
711 .enumerate()
712 .map(|(index, projection)| {
713 projection_fact(
714 index,
715 names
716 .get(index)
717 .cloned()
718 .or_else(|| projection_name(projection)),
719 projection,
720 expression,
721 scope,
722 dialect,
723 nullability_context,
724 )
725 })
726 .collect()
727}
728
729fn select_expressions_for_query(expression: &Expression) -> Vec<&Expression> {
730 match expression {
731 Expression::Select(select) => select.expressions.iter().collect(),
732 Expression::Union(union) => select_expressions_for_query(&union.left),
733 Expression::Intersect(intersect) => select_expressions_for_query(&intersect.left),
734 Expression::Except(except) => select_expressions_for_query(&except.left),
735 Expression::Subquery(subquery) => select_expressions_for_query(&subquery.this),
736 _ => Vec::new(),
737 }
738}
739
740fn projection_fact(
741 index: usize,
742 name: Option<String>,
743 projection: &Expression,
744 query: &Expression,
745 scope: &Scope,
746 dialect: DialectType,
747 nullability_context: &NullabilityContext<'_>,
748) -> ProjectionFact {
749 let inner = unwrap_projection_alias(projection);
750 let is_star = projection_is_star(inner);
751 let upstream = lineage_by_index_from_expression(index, query, Some(dialect), false)
752 .map(|node| terminal_references_from_lineage(&node))
753 .ok()
754 .filter(|refs| !refs.is_empty())
755 .unwrap_or_else(|| fallback_column_references(inner, scope));
756
757 ProjectionFact {
758 index,
759 name,
760 is_star,
761 star_table: projection_star_table(inner),
762 transform_kind: transform_kind(inner),
763 transform_function: transform_function_fact(inner, scope, dialect),
764 cast_type: cast_type(inner, dialect),
765 type_hint: projection
766 .inferred_type()
767 .or_else(|| inner.inferred_type())
768 .and_then(|data_type| render_data_type(data_type, dialect)),
769 nullability: projection_nullability(inner, scope, nullability_context),
770 upstream,
771 }
772}
773
774fn transform_function_fact(
775 expression: &Expression,
776 scope: &Scope,
777 dialect: DialectType,
778) -> Option<TransformFunctionFact> {
779 let mut matches = expression
780 .find_all(|candidate| transform_function_fact_for_node(candidate, scope, dialect).is_some())
781 .into_iter();
782
783 let first = matches.next()?;
784 if matches.next().is_some() {
785 return None;
786 }
787
788 transform_function_fact_for_node(first, scope, dialect)
789}
790
791fn transform_function_fact_for_node(
792 expression: &Expression,
793 scope: &Scope,
794 dialect: DialectType,
795) -> Option<TransformFunctionFact> {
796 match expression {
797 Expression::Function(function) => Some(transform_function_from_args(
798 &function.name,
799 &function.args,
800 scope,
801 dialect,
802 )),
803 Expression::AggregateFunction(function) => Some(transform_function_from_args(
804 &function.name,
805 &function.args,
806 scope,
807 dialect,
808 )),
809 Expression::DateTrunc(function) => Some(transform_function_from_parts(
810 "DATE_TRUNC",
811 vec![datetime_field_name(&function.unit)],
812 vec![&function.this],
813 scope,
814 dialect,
815 )),
816 Expression::TimestampTrunc(function) => Some(transform_function_from_parts(
817 "TIMESTAMP_TRUNC",
818 vec![datetime_field_name(&function.unit)],
819 vec![&function.this],
820 scope,
821 dialect,
822 )),
823 Expression::TimeTrunc(function) => {
824 let mut args = vec![function.this.as_ref()];
825 if let Some(zone) = function.zone.as_deref() {
826 args.push(zone);
827 }
828 Some(transform_function_from_parts(
829 "TIME_TRUNC",
830 vec![function.unit.clone()],
831 args,
832 scope,
833 dialect,
834 ))
835 }
836 Expression::Extract(function) => Some(transform_function_from_parts(
837 "EXTRACT",
838 vec![datetime_field_name(&function.field)],
839 vec![&function.this],
840 scope,
841 dialect,
842 )),
843 Expression::DateAdd(function) => Some(transform_function_from_parts(
844 "DATE_ADD",
845 Vec::new(),
846 vec![&function.this, &function.interval],
847 scope,
848 dialect,
849 )),
850 Expression::DateSub(function) => Some(transform_function_from_parts(
851 "DATE_SUB",
852 Vec::new(),
853 vec![&function.this, &function.interval],
854 scope,
855 dialect,
856 )),
857 Expression::DateDiff(function) => Some(transform_function_from_parts(
858 "DATE_DIFF",
859 Vec::new(),
860 vec![&function.this, &function.expression],
861 scope,
862 dialect,
863 )),
864 _ => None,
865 }
866}
867
868fn transform_function_from_args(
869 name: &str,
870 args: &[Expression],
871 scope: &Scope,
872 dialect: DialectType,
873) -> TransformFunctionFact {
874 let literal_args = args
875 .iter()
876 .filter_map(|arg| literal_argument(arg, dialect))
877 .collect();
878 transform_function_from_parts(name, literal_args, args.iter().collect(), scope, dialect)
879}
880
881fn transform_function_from_parts(
882 name: &str,
883 literal_args: Vec<String>,
884 args: Vec<&Expression>,
885 scope: &Scope,
886 _dialect: DialectType,
887) -> TransformFunctionFact {
888 let column_args = dedupe_column_refs(
889 args.into_iter()
890 .flat_map(|arg| fallback_column_references(arg, scope))
891 .collect(),
892 );
893
894 TransformFunctionFact {
895 name: name.to_string(),
896 literal_args,
897 column_args,
898 }
899}
900
901fn literal_argument(expression: &Expression, dialect: DialectType) -> Option<String> {
902 match expression {
903 Expression::Literal(literal) => Some(literal.value_str().to_string()),
904 Expression::Boolean(boolean) => Some(boolean.value.to_string()),
905 Expression::Null(_) => Some("NULL".to_string()),
906 Expression::Identifier(identifier) => Some(identifier.name.clone()),
907 Expression::Var(var) => Some(var.this.clone()),
908 Expression::DataType(data_type) => render_data_type(data_type, dialect),
909 _ => None,
910 }
911}
912
913fn datetime_field_name(field: &crate::expressions::DateTimeField) -> String {
914 match field {
915 crate::expressions::DateTimeField::Year => "year".to_string(),
916 crate::expressions::DateTimeField::Month => "month".to_string(),
917 crate::expressions::DateTimeField::Day => "day".to_string(),
918 crate::expressions::DateTimeField::Hour => "hour".to_string(),
919 crate::expressions::DateTimeField::Minute => "minute".to_string(),
920 crate::expressions::DateTimeField::Second => "second".to_string(),
921 crate::expressions::DateTimeField::Millisecond => "millisecond".to_string(),
922 crate::expressions::DateTimeField::Microsecond => "microsecond".to_string(),
923 crate::expressions::DateTimeField::DayOfWeek => "day_of_week".to_string(),
924 crate::expressions::DateTimeField::DayOfYear => "day_of_year".to_string(),
925 crate::expressions::DateTimeField::Week => "week".to_string(),
926 crate::expressions::DateTimeField::WeekWithModifier(modifier) => {
927 format!("week({modifier})")
928 }
929 crate::expressions::DateTimeField::Quarter => "quarter".to_string(),
930 crate::expressions::DateTimeField::Epoch => "epoch".to_string(),
931 crate::expressions::DateTimeField::Timezone => "timezone".to_string(),
932 crate::expressions::DateTimeField::TimezoneHour => "timezone_hour".to_string(),
933 crate::expressions::DateTimeField::TimezoneMinute => "timezone_minute".to_string(),
934 crate::expressions::DateTimeField::Date => "date".to_string(),
935 crate::expressions::DateTimeField::Time => "time".to_string(),
936 crate::expressions::DateTimeField::Custom(name) => name.clone(),
937 }
938}
939
940fn unwrap_projection_alias(expression: &Expression) -> &Expression {
941 match expression {
942 Expression::Alias(alias) => unwrap_projection_alias(&alias.this),
943 Expression::Annotated(annotated) => unwrap_projection_alias(&annotated.this),
944 Expression::Paren(paren) => unwrap_projection_alias(&paren.this),
945 _ => expression,
946 }
947}
948
949fn projection_name(expression: &Expression) -> Option<String> {
950 match expression {
951 Expression::Alias(alias) => Some(alias.alias.name.clone()),
952 Expression::Column(column) => Some(column.name.name.clone()),
953 Expression::Identifier(identifier) => Some(identifier.name.clone()),
954 Expression::Star(_) => Some("*".to_string()),
955 Expression::Annotated(annotated) => projection_name(&annotated.this),
956 _ => None,
957 }
958}
959
960fn projection_is_star(expression: &Expression) -> bool {
961 matches!(expression, Expression::Star(_))
962 || matches!(expression, Expression::Column(column) if column.name.name == "*")
963}
964
965fn projection_star_table(expression: &Expression) -> Option<String> {
966 match expression {
967 Expression::Star(star) => star
968 .table
969 .as_ref()
970 .map(|identifier| identifier.name.clone()),
971 Expression::Column(column) if column.name.name == "*" => column
972 .table
973 .as_ref()
974 .map(|identifier| identifier.name.clone()),
975 _ => None,
976 }
977}
978
979fn transform_kind(expression: &Expression) -> TransformKind {
980 if projection_is_star(expression) {
981 TransformKind::Star
982 } else if is_cast_expression(expression) {
983 TransformKind::Cast
984 } else if contains_aggregate(expression) {
985 TransformKind::Aggregation
986 } else if matches!(
987 expression,
988 Expression::Column(_) | Expression::Identifier(_)
989 ) {
990 TransformKind::Direct
991 } else if is_simple_constant(expression) {
992 TransformKind::Constant
993 } else {
994 TransformKind::Expression
995 }
996}
997
998fn is_cast_expression(expression: &Expression) -> bool {
999 matches!(
1000 expression,
1001 Expression::Cast(_) | Expression::TryCast(_) | Expression::SafeCast(_)
1002 )
1003}
1004
1005fn cast_type(expression: &Expression, dialect: DialectType) -> Option<String> {
1006 match expression {
1007 Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1008 render_data_type(&cast.to, dialect)
1009 }
1010 _ => None,
1011 }
1012}
1013
1014fn render_data_type(data_type: &DataType, dialect: DialectType) -> Option<String> {
1015 Dialect::get(dialect)
1016 .generate(&Expression::DataType(data_type.clone()))
1017 .ok()
1018}
1019
1020fn is_simple_constant(expression: &Expression) -> bool {
1021 match expression {
1022 Expression::Literal(_) | Expression::Boolean(_) | Expression::Null(_) => true,
1023 Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1024 is_simple_constant(&cast.this)
1025 }
1026 Expression::Neg(unary) | Expression::BitwiseNot(unary) => is_simple_constant(&unary.this),
1027 _ => false,
1028 }
1029}
1030
1031fn projection_nullability(
1032 expression: &Expression,
1033 scope: &Scope,
1034 context: &NullabilityContext<'_>,
1035) -> ProjectionNullability {
1036 match expression {
1037 Expression::Alias(alias) => projection_nullability(&alias.this, scope, context),
1038 Expression::Annotated(annotated) => projection_nullability(&annotated.this, scope, context),
1039 Expression::Paren(paren) => projection_nullability(&paren.this, scope, context),
1040 Expression::Literal(_) | Expression::Boolean(_) => ProjectionNullability::NonNull,
1041 Expression::Null(_) => ProjectionNullability::Nullable,
1042 Expression::Count(_) | Expression::CountIf(_) => ProjectionNullability::NonNull,
1043 Expression::Cast(cast) => projection_nullability(&cast.this, scope, context),
1044 Expression::TryCast(_) | Expression::SafeCast(_) => ProjectionNullability::Unknown,
1045 Expression::Column(column) => column_nullability(
1046 &column.name.name,
1047 column.table.as_ref().map(|table| table.name.as_str()),
1048 scope,
1049 context,
1050 ),
1051 Expression::Identifier(identifier) => {
1052 column_nullability(&identifier.name, None, scope, context)
1053 }
1054 Expression::Coalesce(func) => coalesce_nullability(&func.expressions, scope, context),
1055 _ => ProjectionNullability::Unknown,
1056 }
1057}
1058
1059fn column_nullability(
1060 column_name: &str,
1061 source_name: Option<&str>,
1062 scope: &Scope,
1063 context: &NullabilityContext<'_>,
1064) -> ProjectionNullability {
1065 let resolved_source_name = source_name
1066 .map(str::to_string)
1067 .or_else(|| single_scope_source_name(scope));
1068
1069 if let Some(source_name) = &resolved_source_name {
1070 if context
1071 .nullable_sources
1072 .contains(&normalize_lookup_name(source_name))
1073 {
1074 return ProjectionNullability::Nullable;
1075 }
1076 }
1077
1078 let Some(schema) = context.schema else {
1079 return ProjectionNullability::Unknown;
1080 };
1081
1082 let table_name = resolved_source_name
1083 .as_ref()
1084 .and_then(|name| scope.sources.get(name).and_then(source_table_name))
1085 .or(resolved_source_name);
1086
1087 let Some(table_name) = table_name else {
1088 return ProjectionNullability::Unknown;
1089 };
1090
1091 match schema.column(&table_name, column_name) {
1092 Some(info) if info.primary_key || info.nullable == Some(false) => {
1093 ProjectionNullability::NonNull
1094 }
1095 Some(info) if info.nullable == Some(true) => ProjectionNullability::Nullable,
1096 Some(_) | None => ProjectionNullability::Unknown,
1097 }
1098}
1099
1100fn single_scope_source_name(scope: &Scope) -> Option<String> {
1101 if scope.sources.len() == 1 {
1102 scope.sources.keys().next().cloned()
1103 } else {
1104 None
1105 }
1106}
1107
1108fn coalesce_nullability(
1109 expressions: &[Expression],
1110 scope: &Scope,
1111 context: &NullabilityContext<'_>,
1112) -> ProjectionNullability {
1113 if expressions.is_empty() {
1114 return ProjectionNullability::Unknown;
1115 }
1116
1117 let mut all_nullable = true;
1118
1119 for expression in expressions {
1120 match projection_nullability(unwrap_projection_alias(expression), scope, context) {
1121 ProjectionNullability::NonNull => return ProjectionNullability::NonNull,
1122 ProjectionNullability::Nullable => {}
1123 ProjectionNullability::Unknown => all_nullable = false,
1124 }
1125 }
1126
1127 if all_nullable {
1128 ProjectionNullability::Nullable
1129 } else {
1130 ProjectionNullability::Unknown
1131 }
1132}
1133
1134fn terminal_references_from_lineage(node: &LineageNode) -> Vec<ColumnReferenceFact> {
1135 let mut refs = Vec::new();
1136 collect_terminal_references(node, &mut refs);
1137 dedupe_column_refs(refs)
1138}
1139
1140fn collect_terminal_references(node: &LineageNode, refs: &mut Vec<ColumnReferenceFact>) {
1141 if node.downstream.is_empty() {
1142 if let Some(reference) = column_reference_from_lineage_node(node) {
1143 refs.push(reference);
1144 }
1145 return;
1146 }
1147
1148 for child in &node.downstream {
1149 collect_terminal_references(child, refs);
1150 }
1151}
1152
1153fn column_reference_from_lineage_node(node: &LineageNode) -> Option<ColumnReferenceFact> {
1154 match &node.expression {
1155 Expression::Column(column) => {
1156 let source_name = non_empty_string(node.source_name.clone());
1157 let table =
1158 lineage_node_table(node).or_else(|| column.table.as_ref().map(|t| t.name.clone()));
1159 let confidence = if node.source_kind == SourceKind::Unknown && source_name.is_none() {
1160 ReferenceConfidence::Unknown
1161 } else {
1162 ReferenceConfidence::Resolved
1163 };
1164 Some(ColumnReferenceFact {
1165 source_name,
1166 source_alias: node.source_alias.clone(),
1167 source_kind: node.source_kind,
1168 table,
1169 column: column.name.name.clone(),
1170 unqualified: column.table.is_none(),
1171 confidence,
1172 })
1173 }
1174 Expression::Star(_) => Some(ColumnReferenceFact {
1175 source_name: non_empty_string(node.source_name.clone()),
1176 source_alias: node.source_alias.clone(),
1177 source_kind: node.source_kind,
1178 table: lineage_node_table(node),
1179 column: "*".to_string(),
1180 unqualified: true,
1181 confidence: if node.source_kind == SourceKind::Unknown {
1182 ReferenceConfidence::Unknown
1183 } else {
1184 ReferenceConfidence::Resolved
1185 },
1186 }),
1187 _ => None,
1188 }
1189}
1190
1191fn lineage_node_table(node: &LineageNode) -> Option<String> {
1192 match &node.source {
1193 Expression::Table(table) => Some(table_name(table)),
1194 _ => None,
1195 }
1196}
1197
1198fn fallback_column_references(expression: &Expression, scope: &Scope) -> Vec<ColumnReferenceFact> {
1199 let mut refs = Vec::new();
1200 let source_count = scope.sources.len();
1201 let single_source = if source_count == 1 {
1202 scope.sources.iter().next()
1203 } else {
1204 None
1205 };
1206
1207 for column_expr in expression.find_all(|candidate| matches!(candidate, Expression::Column(_))) {
1208 if let Expression::Column(column) = column_expr {
1209 if column.name.name == "*" {
1210 continue;
1211 }
1212 let source = column
1213 .table
1214 .as_ref()
1215 .and_then(|table| scope.sources.get(&table.name));
1216 let (source_name, source_alias, source_kind, table, confidence) =
1217 if let Some(table_identifier) = &column.table {
1218 if let Some(source) = source {
1219 (
1220 Some(table_identifier.name.clone()),
1221 source.alias.clone(),
1222 source.kind,
1223 source_table_name(source)
1224 .or_else(|| Some(table_identifier.name.clone())),
1225 ReferenceConfidence::Resolved,
1226 )
1227 } else {
1228 (
1229 Some(table_identifier.name.clone()),
1230 None,
1231 SourceKind::Unknown,
1232 Some(table_identifier.name.clone()),
1233 ReferenceConfidence::Unknown,
1234 )
1235 }
1236 } else if let Some((name, source)) = single_source {
1237 (
1238 Some(name.clone()),
1239 source.alias.clone(),
1240 source.kind,
1241 source_table_name(source).or_else(|| Some(name.clone())),
1242 ReferenceConfidence::Resolved,
1243 )
1244 } else if source_count > 1 {
1245 (
1246 None,
1247 None,
1248 SourceKind::Unknown,
1249 None,
1250 ReferenceConfidence::Ambiguous,
1251 )
1252 } else {
1253 (
1254 None,
1255 None,
1256 SourceKind::Unknown,
1257 None,
1258 ReferenceConfidence::Unknown,
1259 )
1260 };
1261
1262 refs.push(ColumnReferenceFact {
1263 source_name,
1264 source_alias,
1265 source_kind,
1266 table,
1267 column: column.name.name.clone(),
1268 unqualified: column.table.is_none(),
1269 confidence,
1270 });
1271 }
1272 }
1273
1274 dedupe_column_refs(refs)
1275}
1276
1277fn dedupe_column_refs(refs: Vec<ColumnReferenceFact>) -> Vec<ColumnReferenceFact> {
1278 let mut seen = HashSet::new();
1279 let mut deduped = Vec::new();
1280
1281 for reference in refs {
1282 let key = (
1283 reference.source_name.clone(),
1284 reference.source_alias.clone(),
1285 reference.table.clone(),
1286 reference.column.clone(),
1287 format!("{:?}", reference.source_kind),
1288 reference.unqualified,
1289 format!("{:?}", reference.confidence),
1290 );
1291 if seen.insert(key) {
1292 deduped.push(reference);
1293 }
1294 }
1295
1296 deduped
1297}
1298
1299fn relation_facts(
1300 scope: &Scope,
1301 mapping_schema: Option<&crate::schema::MappingSchema>,
1302) -> Vec<RelationFact> {
1303 let mut relations = Vec::new();
1304 let mut seen = HashSet::new();
1305 collect_relation_facts(scope, mapping_schema, &mut seen, &mut relations);
1306
1307 relations.sort_by(|left, right| {
1308 left.name
1309 .cmp(&right.name)
1310 .then_with(|| left.alias.cmp(&right.alias))
1311 });
1312 relations
1313}
1314
1315fn collect_relation_facts(
1316 scope: &Scope,
1317 mapping_schema: Option<&crate::schema::MappingSchema>,
1318 seen: &mut HashSet<String>,
1319 relations: &mut Vec<RelationFact>,
1320) {
1321 for relation in scope.sources.iter().map(|(source_name, source)| {
1322 let identity = source_table_identity(source);
1323 RelationFact {
1324 name: source
1325 .lineage_name
1326 .clone()
1327 .or_else(|| identity.as_ref().map(|identity| identity.name.clone()))
1328 .unwrap_or_else(|| source_name.clone()),
1329 alias: source.alias.clone().or_else(|| source_alias(source)),
1330 kind: source.kind,
1331 columns: source_columns(source, mapping_schema),
1332 catalog: identity
1333 .as_ref()
1334 .and_then(|identity| identity.catalog.clone()),
1335 schema: identity
1336 .as_ref()
1337 .and_then(|identity| identity.schema.clone()),
1338 table: identity
1339 .as_ref()
1340 .and_then(|identity| identity.table.clone()),
1341 }
1342 }) {
1343 let key = format!("{:?}|{}|{:?}", relation.kind, relation.name, relation.alias);
1344 if seen.insert(key) {
1345 relations.push(relation);
1346 }
1347 }
1348
1349 for branch_scope in &scope.union_scopes {
1350 collect_relation_facts(branch_scope, mapping_schema, seen, relations);
1351 }
1352}
1353
1354fn base_table_facts(
1355 scope: &Scope,
1356 mapping_schema: Option<&crate::schema::MappingSchema>,
1357) -> Vec<RelationFact> {
1358 let mut relations = Vec::new();
1359 let mut seen = HashSet::new();
1360
1361 collect_base_table_facts(scope, mapping_schema, &mut seen, &mut relations);
1362
1363 relations.sort_by(|left, right| left.name.cmp(&right.name));
1364 relations
1365}
1366
1367fn collect_base_table_facts(
1368 scope: &Scope,
1369 mapping_schema: Option<&crate::schema::MappingSchema>,
1370 seen: &mut HashSet<String>,
1371 relations: &mut Vec<RelationFact>,
1372) {
1373 for source in scope.sources.values() {
1374 if source.kind != SourceKind::Table {
1375 continue;
1376 }
1377
1378 let Some(identity) = source_table_identity(source) else {
1379 continue;
1380 };
1381
1382 if seen.insert(identity.name.clone()) {
1383 relations.push(RelationFact {
1384 name: identity.name,
1385 alias: source.alias.clone().or_else(|| source_alias(source)),
1386 kind: SourceKind::Table,
1387 columns: source_columns(source, mapping_schema),
1388 catalog: identity.catalog,
1389 schema: identity.schema,
1390 table: identity.table,
1391 });
1392 }
1393 }
1394
1395 for child_scope in scope
1396 .cte_scopes
1397 .iter()
1398 .chain(scope.union_scopes.iter())
1399 .chain(scope.table_scopes.iter())
1400 .chain(scope.derived_table_scopes.iter())
1401 .chain(scope.subquery_scopes.iter())
1402 {
1403 collect_base_table_facts(child_scope, mapping_schema, seen, relations);
1404 }
1405}
1406
1407fn source_columns(
1408 source: &SourceInfo,
1409 mapping_schema: Option<&crate::schema::MappingSchema>,
1410) -> Vec<String> {
1411 match &source.expression {
1412 Expression::Table(table) => mapping_schema
1413 .and_then(|schema| schema.column_names(&table_name(table)).ok())
1414 .unwrap_or_default(),
1415 Expression::Select(_)
1416 | Expression::Union(_)
1417 | Expression::Intersect(_)
1418 | Expression::Except(_) => get_output_column_names(&source.expression),
1419 Expression::Subquery(subquery) => get_output_column_names(&subquery.this),
1420 Expression::Cte(cte) if !cte.columns.is_empty() => cte
1421 .columns
1422 .iter()
1423 .map(|column| column.name.clone())
1424 .collect(),
1425 Expression::Cte(cte) => get_output_column_names(&cte.this),
1426 _ => Vec::new(),
1427 }
1428}
1429
1430fn source_table_name(source: &SourceInfo) -> Option<String> {
1431 source_table_identity(source).map(|identity| identity.name)
1432}
1433
1434fn source_alias(source: &SourceInfo) -> Option<String> {
1435 match &source.expression {
1436 Expression::Table(table) => table.alias.as_ref().map(|alias| alias.name.clone()),
1437 Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
1438 _ => None,
1439 }
1440}
1441
1442fn table_name(table: &TableRef) -> String {
1443 let mut parts = Vec::new();
1444 if let Some(catalog) = &table.catalog {
1445 parts.push(catalog.name.clone());
1446 }
1447 if let Some(schema) = &table.schema {
1448 parts.push(schema.name.clone());
1449 }
1450 parts.push(table.name.name.clone());
1451 parts.join(".")
1452}
1453
/// The name parts of a table-backed source, as produced by `table_identity`.
#[derive(Clone, Debug)]
struct RelationIdentity {
    /// Dot-joined name of the table (catalog and schema included when set).
    name: String,
    /// Catalog qualifier, when the table reference has one.
    catalog: Option<String>,
    /// Schema qualifier, when the table reference has one.
    schema: Option<String>,
    /// Bare table name without qualifiers.
    table: Option<String>,
}
1461
1462fn source_table_identity(source: &SourceInfo) -> Option<RelationIdentity> {
1463 match &source.expression {
1464 Expression::Table(table) => Some(table_identity(table)),
1465 _ => None,
1466 }
1467}
1468
1469fn table_identity(table: &TableRef) -> RelationIdentity {
1470 RelationIdentity {
1471 name: table_name(table),
1472 catalog: table.catalog.as_ref().map(|catalog| catalog.name.clone()),
1473 schema: table.schema.as_ref().map(|schema| schema.name.clone()),
1474 table: Some(table.name.name.clone()),
1475 }
1476}
1477
1478fn set_operation_facts(
1479 expression: &Expression,
1480 scope: &Scope,
1481 dialect: DialectType,
1482) -> Vec<SetOperationFact> {
1483 let mut facts = Vec::new();
1484 collect_set_operation_facts(expression, scope, dialect, &mut facts);
1485 facts
1486}
1487
1488fn collect_set_operation_facts(
1489 expression: &Expression,
1490 scope: &Scope,
1491 dialect: DialectType,
1492 facts: &mut Vec<SetOperationFact>,
1493) {
1494 match expression {
1495 Expression::Union(union) => {
1496 facts.push(SetOperationFact {
1497 kind: "union".to_string(),
1498 all: union.all,
1499 distinct: union.distinct,
1500 output_columns: get_output_column_names(expression),
1501 branches: set_operation_branches(&union.left, &union.right, scope, dialect),
1502 });
1503 collect_set_operation_facts(&union.left, scope, dialect, facts);
1504 collect_set_operation_facts(&union.right, scope, dialect, facts);
1505 }
1506 Expression::Intersect(intersect) => {
1507 facts.push(SetOperationFact {
1508 kind: "intersect".to_string(),
1509 all: intersect.all,
1510 distinct: intersect.distinct,
1511 output_columns: get_output_column_names(expression),
1512 branches: set_operation_branches(&intersect.left, &intersect.right, scope, dialect),
1513 });
1514 collect_set_operation_facts(&intersect.left, scope, dialect, facts);
1515 collect_set_operation_facts(&intersect.right, scope, dialect, facts);
1516 }
1517 Expression::Except(except) => {
1518 facts.push(SetOperationFact {
1519 kind: "except".to_string(),
1520 all: except.all,
1521 distinct: except.distinct,
1522 output_columns: get_output_column_names(expression),
1523 branches: set_operation_branches(&except.left, &except.right, scope, dialect),
1524 });
1525 collect_set_operation_facts(&except.left, scope, dialect, facts);
1526 collect_set_operation_facts(&except.right, scope, dialect, facts);
1527 }
1528 Expression::Subquery(subquery) => {
1529 collect_set_operation_facts(&subquery.this, scope, dialect, facts);
1530 }
1531 _ => {}
1532 }
1533}
1534
1535fn set_operation_branches(
1536 left: &Expression,
1537 right: &Expression,
1538 scope: &Scope,
1539 dialect: DialectType,
1540) -> Vec<SetOperationBranchFact> {
1541 vec![
1542 SetOperationBranchFact {
1543 index: 0,
1544 projections: projection_facts_for_branch(left, scope, dialect),
1545 },
1546 SetOperationBranchFact {
1547 index: 1,
1548 projections: projection_facts_for_branch(right, scope, dialect),
1549 },
1550 ]
1551}
1552
1553fn projection_facts_for_branch(
1554 expression: &Expression,
1555 root_scope: &Scope,
1556 dialect: DialectType,
1557) -> Vec<ProjectionFact> {
1558 let branch_scope = build_scope(expression);
1559 let scope = if branch_scope.sources.is_empty() {
1560 root_scope
1561 } else {
1562 &branch_scope
1563 };
1564 let nullability_context = NullabilityContext {
1565 schema: None,
1566 nullable_sources: nullable_source_names(expression),
1567 };
1568 projection_facts_for_query(expression, scope, dialect, &nullability_context)
1569}
1570
/// Wraps `value` in `Some` unless it is the empty string, in which case `None` is returned.
fn non_empty_string(value: String) -> Option<String> {
    Some(value).filter(|text| !text.is_empty())
}