1use crate::ast_transforms::get_output_column_names;
9use crate::dialects::{Dialect, DialectType};
10use crate::expressions::{DataType, Expression, JoinKind, TableRef, With};
11use crate::lineage::{lineage_by_index_from_expression, LineageNode};
12use crate::optimizer::annotate_types::annotate_types;
13use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
14use crate::schema::{MappingSchema, Schema};
15use crate::scope::{build_scope, Scope, SourceInfo, SourceKind};
16use crate::traversal::{contains_aggregate, ExpressionWalk};
17use crate::validation::{mapping_schema_from_validation_schema, ValidationSchema};
18use crate::{parse_data_type, parse_one, Error, Result};
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21
22#[derive(Debug, Clone, Serialize, Deserialize, Default)]
24#[serde(rename_all = "camelCase", default)]
25pub struct AnalyzeQueryOptions {
26 pub dialect: DialectType,
28 pub schema: Option<ValidationSchema>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
34#[serde(rename_all = "camelCase")]
35pub struct QueryAnalysis {
36 pub shape: QueryShape,
37 pub ctes: Vec<String>,
38 pub cte_facts: Vec<CteFact>,
39 pub projections: Vec<ProjectionFact>,
40 pub relations: Vec<RelationFact>,
41 pub base_tables: Vec<RelationFact>,
42 pub star_projections: Vec<StarProjectionFact>,
43 pub set_operations: Vec<SetOperationFact>,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub enum QueryShape {
50 Select,
51 SetOperation,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
56#[serde(rename_all = "camelCase")]
57pub struct ProjectionFact {
58 pub index: usize,
59 pub name: Option<String>,
60 pub is_star: bool,
61 pub star_table: Option<String>,
62 pub transform_kind: TransformKind,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub transform_function: Option<TransformFunctionFact>,
65 pub cast_type: Option<String>,
66 pub type_hint: Option<String>,
67 pub nullability: ProjectionNullability,
68 pub upstream: Vec<ColumnReferenceFact>,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(rename_all = "camelCase")]
74pub struct TransformFunctionFact {
75 pub name: String,
76 pub literal_args: Vec<String>,
77 pub column_args: Vec<ColumnReferenceFact>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(rename_all = "camelCase")]
83pub struct CteFact {
84 pub name: String,
85 pub columns: Vec<String>,
86 pub body_sql: String,
87 pub output_columns: Vec<String>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
92#[serde(rename_all = "camelCase")]
93pub struct StarProjectionFact {
94 pub index: usize,
95 pub table: Option<String>,
96 pub expanded_columns: Vec<String>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101#[serde(rename_all = "camelCase")]
102pub struct ColumnReferenceFact {
103 pub source_name: Option<String>,
104 pub source_alias: Option<String>,
105 pub source_kind: SourceKind,
106 pub table: Option<String>,
107 pub column: String,
108 pub unqualified: bool,
109 pub confidence: ReferenceConfidence,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114#[serde(rename_all = "camelCase")]
115pub struct RelationFact {
116 pub name: String,
117 pub alias: Option<String>,
118 pub kind: SourceKind,
119 pub columns: Vec<String>,
120 pub catalog: Option<String>,
121 pub schema: Option<String>,
122 pub table: Option<String>,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
127#[serde(rename_all = "camelCase")]
128pub struct SetOperationFact {
129 pub kind: String,
130 pub all: bool,
131 pub distinct: bool,
132 pub output_columns: Vec<String>,
133 pub branches: Vec<SetOperationBranchFact>,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
138#[serde(rename_all = "camelCase")]
139pub struct SetOperationBranchFact {
140 pub index: usize,
141 pub projections: Vec<ProjectionFact>,
142}
143
144#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
146#[serde(rename_all = "snake_case")]
147pub enum TransformKind {
148 Direct,
149 Cast,
150 Aggregation,
151 Constant,
152 Expression,
153 Star,
154}
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum ReferenceConfidence {
160 Resolved,
161 Ambiguous,
162 Unknown,
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
167#[serde(rename_all = "snake_case")]
168pub enum ProjectionNullability {
169 NonNull,
170 Nullable,
171 Unknown,
172}
173
174pub fn analyze_query(sql: &str, options: AnalyzeQueryOptions) -> Result<QueryAnalysis> {
176 let mut expression = parse_one(sql, options.dialect)?;
177 expression = effective_query(expression);
178 ensure_query(&expression)?;
179 let original_expression = expression.clone();
180
181 let mapping_schema = options
182 .schema
183 .as_ref()
184 .map(|schema| analysis_mapping_schema(schema, options.dialect));
185 let schema_info = options.schema.as_ref().map(AnalysisSchemaInfo::from_schema);
186 let cte_facts = top_level_cte_facts(&original_expression, options.dialect)?;
187 let star_projections = star_projection_facts(&original_expression, mapping_schema.as_ref());
188
189 if let Some(schema) = mapping_schema.as_ref() {
190 let qualify_options = QualifyColumnsOptions::new()
191 .with_dialect(options.dialect)
192 .with_allow_partial(true);
193 expression = qualify_columns(expression, schema, &qualify_options)
194 .map_err(|e| Error::internal(format!("query analysis qualification failed: {e}")))?;
195 }
196
197 let annotation_schema = mapping_schema.as_ref().map(|schema| {
198 let mut alias_schema = schema.clone();
199 add_scope_aliases_to_schema(
200 &build_scope(&expression),
201 schema,
202 &mut alias_schema,
203 options.dialect,
204 );
205 alias_schema
206 });
207
208 annotate_types(
209 &mut expression,
210 annotation_schema
211 .as_ref()
212 .map(|schema| schema as &dyn Schema),
213 Some(options.dialect),
214 );
215 crate::lineage::expand_cte_stars(
216 &mut expression,
217 annotation_schema
218 .as_ref()
219 .or(mapping_schema.as_ref())
220 .map(|schema| schema as &dyn Schema),
221 );
222
223 let scope = build_scope(&expression);
224 let nullability_context = NullabilityContext {
225 schema: schema_info.as_ref(),
226 nullable_sources: nullable_source_names(&expression),
227 };
228 let shape = if is_set_operation(&expression) {
229 QueryShape::SetOperation
230 } else {
231 QueryShape::Select
232 };
233
234 Ok(QueryAnalysis {
235 shape,
236 ctes: collect_cte_names(&expression),
237 cte_facts,
238 projections: projection_facts_for_query(
239 &expression,
240 &scope,
241 options.dialect,
242 &nullability_context,
243 ),
244 relations: relation_facts(&scope, mapping_schema.as_ref()),
245 base_tables: base_table_facts(&scope, mapping_schema.as_ref()),
246 star_projections,
247 set_operations: set_operation_facts(&expression, &scope, options.dialect),
248 })
249}
250
251fn analysis_mapping_schema(schema: &ValidationSchema, dialect: DialectType) -> MappingSchema {
252 let broad_schema = mapping_schema_from_validation_schema(schema);
253 let mut mapping_schema = MappingSchema::with_dialect(dialect);
254
255 for table in &schema.tables {
256 let table_names = validation_table_names(table);
257 if table_names.is_empty() {
258 continue;
259 }
260
261 let fallback_table = table_names[0].as_str();
262 let columns: Vec<(String, DataType)> = table
263 .columns
264 .iter()
265 .map(|column| {
266 let data_type = parse_analysis_data_type(&column.data_type, dialect)
267 .unwrap_or_else(|| {
268 broad_schema
269 .get_column_type(fallback_table, &column.name)
270 .unwrap_or(DataType::Unknown)
271 });
272 (column.name.to_ascii_lowercase(), data_type)
273 })
274 .collect();
275
276 for table_name in table_names {
277 let _ = mapping_schema.add_table(&table_name, &columns, Some(dialect));
278 }
279 }
280
281 mapping_schema
282}
283
284fn validation_table_names(table: &crate::validation::SchemaTable) -> Vec<String> {
285 let mut names = Vec::new();
286
287 names.push(table.name.to_ascii_lowercase());
288 if let Some(schema_name) = &table.schema {
289 names.push(format!(
290 "{}.{}",
291 schema_name.to_ascii_lowercase(),
292 table.name.to_ascii_lowercase()
293 ));
294 }
295 for alias in &table.aliases {
296 names.push(alias.to_ascii_lowercase());
297 }
298
299 names.sort();
300 names.dedup();
301 names
302}
303
304fn parse_analysis_data_type(data_type: &str, dialect: DialectType) -> Option<DataType> {
305 let trimmed = data_type.trim();
306 if trimmed.is_empty() {
307 return None;
308 }
309 parse_data_type(trimmed, dialect).ok()
310}
311
312fn add_scope_aliases_to_schema(
313 scope: &Scope,
314 source_schema: &MappingSchema,
315 target_schema: &mut MappingSchema,
316 dialect: DialectType,
317) {
318 for child_scope in scope.traverse() {
319 for (source_name, source) in &child_scope.sources {
320 if source.kind != SourceKind::Table {
321 continue;
322 }
323 if let Some(table_name) = source_table_name(source) {
324 if source_name == &table_name {
325 continue;
326 }
327 if let Ok(column_names) = source_schema.column_names(&table_name) {
328 let columns: Vec<(String, DataType)> = column_names
329 .iter()
330 .map(|column| {
331 (
332 column.clone(),
333 source_schema
334 .get_column_type(&table_name, column)
335 .unwrap_or(DataType::Unknown),
336 )
337 })
338 .collect();
339 let _ = target_schema.add_table(source_name, &columns, Some(dialect));
340 }
341 }
342 }
343 }
344}
345
/// Nullability-related facts about one schema column.
#[derive(Debug, Clone)]
struct AnalysisColumnInfo {
    /// Declared nullability, when the schema states one.
    nullable: Option<bool>,
    /// `true` when the column is flagged as a primary key or listed in the table's key.
    primary_key: bool,
}
351
352#[derive(Debug, Clone)]
353struct AnalysisSchemaInfo {
354 columns: HashMap<(String, String), AnalysisColumnInfo>,
355}
356
357impl AnalysisSchemaInfo {
358 fn from_schema(schema: &ValidationSchema) -> Self {
359 let mut columns = HashMap::new();
360
361 for table in &schema.tables {
362 let table_names = validation_table_names(table);
363 let primary_keys: HashSet<String> = table
364 .primary_key
365 .iter()
366 .map(|column| column.to_ascii_lowercase())
367 .collect();
368
369 for column in &table.columns {
370 let info = AnalysisColumnInfo {
371 nullable: column.nullable,
372 primary_key: column.primary_key
373 || primary_keys.contains(&column.name.to_ascii_lowercase()),
374 };
375
376 for table_name in &table_names {
377 columns.insert(
378 (
379 normalize_lookup_name(table_name),
380 normalize_lookup_name(&column.name),
381 ),
382 info.clone(),
383 );
384 }
385 }
386 }
387
388 Self { columns }
389 }
390
391 fn column(&self, table: &str, column: &str) -> Option<&AnalysisColumnInfo> {
392 self.columns
393 .get(&(normalize_lookup_name(table), normalize_lookup_name(column)))
394 }
395}
396
397struct NullabilityContext<'a> {
398 schema: Option<&'a AnalysisSchemaInfo>,
399 nullable_sources: HashSet<String>,
400}
401
402fn top_level_cte_facts(expression: &Expression, dialect: DialectType) -> Result<Vec<CteFact>> {
403 let Some(with_clause) = with_clause(expression) else {
404 return Ok(Vec::new());
405 };
406
407 with_clause
408 .ctes
409 .iter()
410 .map(|cte| {
411 Ok(CteFact {
412 name: cte.alias.name.clone(),
413 columns: cte
414 .columns
415 .iter()
416 .map(|column| column.name.clone())
417 .collect(),
418 body_sql: Dialect::get(dialect).generate(&cte.this)?,
419 output_columns: get_output_column_names(&cte.this),
420 })
421 })
422 .collect()
423}
424
425fn star_projection_facts(
426 expression: &Expression,
427 mapping_schema: Option<&MappingSchema>,
428) -> Vec<StarProjectionFact> {
429 let scope = build_scope(expression);
430 let ordered_sources = ordered_source_names_for_query(expression);
431
432 select_expressions_for_query(expression)
433 .iter()
434 .enumerate()
435 .filter_map(|(index, projection)| {
436 let inner = unwrap_projection_alias(projection);
437 if !projection_is_star(inner) {
438 return None;
439 }
440
441 let table = projection_star_table(inner);
442 let expanded_columns =
443 expanded_star_columns(table.as_deref(), &scope, &ordered_sources, mapping_schema);
444
445 Some(StarProjectionFact {
446 index,
447 table,
448 expanded_columns,
449 })
450 })
451 .collect()
452}
453
454fn expanded_star_columns(
455 star_table: Option<&str>,
456 scope: &Scope,
457 ordered_sources: &[String],
458 mapping_schema: Option<&MappingSchema>,
459) -> Vec<String> {
460 let mut columns = Vec::new();
461 let mut source_names: Vec<String> = if ordered_sources.is_empty() {
462 let mut names: Vec<_> = scope.sources.keys().cloned().collect();
463 names.sort();
464 names
465 } else {
466 ordered_sources.to_vec()
467 };
468
469 source_names.dedup();
470
471 for source_name in source_names {
472 let Some(source) = scope.sources.get(&source_name) else {
473 continue;
474 };
475
476 if let Some(star_table) = star_table {
477 let matches = source_name.eq_ignore_ascii_case(star_table)
478 || source
479 .alias
480 .as_deref()
481 .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
482 || source_table_name(source)
483 .is_some_and(|table| table.eq_ignore_ascii_case(star_table));
484
485 if !matches {
486 continue;
487 }
488 }
489
490 columns.extend(source_columns(source, mapping_schema));
491 }
492
493 columns
494}
495
496fn ordered_source_names_for_query(expression: &Expression) -> Vec<String> {
497 match expression {
498 Expression::Select(select) => ordered_source_names_for_select(select),
499 Expression::Union(union) => ordered_source_names_for_query(&union.left),
500 Expression::Intersect(intersect) => ordered_source_names_for_query(&intersect.left),
501 Expression::Except(except) => ordered_source_names_for_query(&except.left),
502 Expression::Subquery(subquery) => ordered_source_names_for_query(&subquery.this),
503 _ => Vec::new(),
504 }
505}
506
507fn ordered_source_names_for_select(select: &crate::expressions::Select) -> Vec<String> {
508 let mut sources = Vec::new();
509
510 if let Some(from) = &select.from {
511 for expression in &from.expressions {
512 if let Some(source_name) = expression_source_name(expression) {
513 sources.push(source_name);
514 }
515 }
516 }
517
518 for join in &select.joins {
519 if let Some(source_name) = expression_source_name(&join.this) {
520 sources.push(source_name);
521 }
522 }
523
524 sources
525}
526
527fn nullable_source_names(expression: &Expression) -> HashSet<String> {
528 match expression {
529 Expression::Select(select) => nullable_source_names_for_select(select),
530 Expression::Union(union) => nullable_source_names(&union.left),
531 Expression::Intersect(intersect) => nullable_source_names(&intersect.left),
532 Expression::Except(except) => nullable_source_names(&except.left),
533 Expression::Subquery(subquery) => nullable_source_names(&subquery.this),
534 _ => HashSet::new(),
535 }
536}
537
538fn nullable_source_names_for_select(select: &crate::expressions::Select) -> HashSet<String> {
539 let mut nullable = HashSet::new();
540 let mut left_sources = Vec::new();
541
542 if let Some(from) = &select.from {
543 for expression in &from.expressions {
544 if let Some(source_name) = expression_source_name(expression) {
545 left_sources.push(source_name);
546 }
547 }
548 }
549
550 for join in &select.joins {
551 let right_source = expression_source_name(&join.this);
552
553 if join_nullable_left(join.kind) {
554 for source_name in &left_sources {
555 nullable.insert(normalize_lookup_name(source_name));
556 }
557 }
558
559 if join_nullable_right(join.kind) {
560 if let Some(source_name) = &right_source {
561 nullable.insert(normalize_lookup_name(source_name));
562 }
563 }
564
565 if let Some(source_name) = right_source {
566 left_sources.push(source_name);
567 }
568 }
569
570 nullable
571}
572
573fn join_nullable_left(kind: JoinKind) -> bool {
574 matches!(
575 kind,
576 JoinKind::Right
577 | JoinKind::NaturalRight
578 | JoinKind::AsOfRight
579 | JoinKind::Full
580 | JoinKind::NaturalFull
581 | JoinKind::Outer
582 )
583}
584
585fn join_nullable_right(kind: JoinKind) -> bool {
586 matches!(
587 kind,
588 JoinKind::Left
589 | JoinKind::NaturalLeft
590 | JoinKind::AsOfLeft
591 | JoinKind::LeftLateral
592 | JoinKind::OuterApply
593 | JoinKind::LeftArray
594 | JoinKind::Full
595 | JoinKind::NaturalFull
596 | JoinKind::Outer
597 )
598}
599
600fn expression_source_name(expression: &Expression) -> Option<String> {
601 match expression {
602 Expression::Table(table) => table
603 .alias
604 .as_ref()
605 .map(|alias| alias.name.clone())
606 .or_else(|| Some(table.name.name.clone())),
607 Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
608 Expression::Alias(alias) => Some(alias.alias.name.clone()),
609 Expression::Cte(cte) => Some(cte.alias.name.clone()),
610 _ => None,
611 }
612}
613
/// Normalizes a table or column name for lookups by lower-casing its ASCII letters;
/// non-ASCII characters are left untouched.
fn normalize_lookup_name(name: &str) -> String {
    name.chars()
        .map(|character| character.to_ascii_lowercase())
        .collect()
}
617
618fn effective_query(expression: Expression) -> Expression {
619 match expression {
620 Expression::Prepare(prepare) => prepare.statement,
621 Expression::Subquery(subquery) if subquery.alias.is_none() => subquery.this,
622 other => other,
623 }
624}
625
626fn ensure_query(expression: &Expression) -> Result<()> {
627 if matches!(
628 expression,
629 Expression::Select(_)
630 | Expression::Union(_)
631 | Expression::Intersect(_)
632 | Expression::Except(_)
633 ) {
634 Ok(())
635 } else {
636 Err(Error::internal(
637 "analyze_query requires a SELECT or set operation query",
638 ))
639 }
640}
641
642fn is_set_operation(expression: &Expression) -> bool {
643 matches!(
644 expression,
645 Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
646 )
647}
648
649fn collect_cte_names(expression: &Expression) -> Vec<String> {
650 let mut names = Vec::new();
651 let mut seen = HashSet::new();
652 collect_cte_names_inner(expression, &mut names, &mut seen);
653 names
654}
655
656fn collect_cte_names_inner(
657 expression: &Expression,
658 names: &mut Vec<String>,
659 seen: &mut HashSet<String>,
660) {
661 if let Some(with_clause) = with_clause(expression) {
662 collect_with_names(with_clause, names, seen);
663 }
664
665 match expression {
666 Expression::Union(union) => {
667 collect_cte_names_inner(&union.left, names, seen);
668 collect_cte_names_inner(&union.right, names, seen);
669 }
670 Expression::Intersect(intersect) => {
671 collect_cte_names_inner(&intersect.left, names, seen);
672 collect_cte_names_inner(&intersect.right, names, seen);
673 }
674 Expression::Except(except) => {
675 collect_cte_names_inner(&except.left, names, seen);
676 collect_cte_names_inner(&except.right, names, seen);
677 }
678 Expression::Subquery(subquery) => collect_cte_names_inner(&subquery.this, names, seen),
679 _ => {}
680 }
681}
682
683fn collect_with_names(with_clause: &With, names: &mut Vec<String>, seen: &mut HashSet<String>) {
684 for cte in &with_clause.ctes {
685 if seen.insert(cte.alias.name.clone()) {
686 names.push(cte.alias.name.clone());
687 }
688 collect_cte_names_inner(&cte.this, names, seen);
689 }
690}
691
692fn with_clause(expression: &Expression) -> Option<&With> {
693 match expression {
694 Expression::Select(select) => select.with.as_ref(),
695 Expression::Union(union) => union.with.as_ref(),
696 Expression::Intersect(intersect) => intersect.with.as_ref(),
697 Expression::Except(except) => except.with.as_ref(),
698 _ => None,
699 }
700}
701
702fn projection_facts_for_query(
703 expression: &Expression,
704 scope: &Scope,
705 dialect: DialectType,
706 nullability_context: &NullabilityContext<'_>,
707) -> Vec<ProjectionFact> {
708 let expressions = select_expressions_for_query(expression);
709 let names = get_output_column_names(expression);
710
711 expressions
712 .iter()
713 .enumerate()
714 .map(|(index, projection)| {
715 projection_fact(
716 index,
717 names
718 .get(index)
719 .cloned()
720 .or_else(|| projection_name(projection)),
721 projection,
722 expression,
723 scope,
724 dialect,
725 nullability_context,
726 )
727 })
728 .collect()
729}
730
731fn select_expressions_for_query(expression: &Expression) -> Vec<&Expression> {
732 match expression {
733 Expression::Select(select) => select.expressions.iter().collect(),
734 Expression::Union(union) => select_expressions_for_query(&union.left),
735 Expression::Intersect(intersect) => select_expressions_for_query(&intersect.left),
736 Expression::Except(except) => select_expressions_for_query(&except.left),
737 Expression::Subquery(subquery) => select_expressions_for_query(&subquery.this),
738 _ => Vec::new(),
739 }
740}
741
742fn projection_fact(
743 index: usize,
744 name: Option<String>,
745 projection: &Expression,
746 query: &Expression,
747 scope: &Scope,
748 dialect: DialectType,
749 nullability_context: &NullabilityContext<'_>,
750) -> ProjectionFact {
751 let inner = unwrap_projection_alias(projection);
752 let is_star = projection_is_star(inner);
753 let upstream = lineage_by_index_from_expression(index, query, Some(dialect), false)
754 .map(|node| terminal_references_from_lineage(&node))
755 .ok()
756 .filter(|refs| !refs.is_empty())
757 .unwrap_or_else(|| fallback_column_references(inner, scope));
758
759 ProjectionFact {
760 index,
761 name,
762 is_star,
763 star_table: projection_star_table(inner),
764 transform_kind: transform_kind(inner),
765 transform_function: transform_function_fact(inner, scope, dialect),
766 cast_type: cast_type(inner, dialect),
767 type_hint: projection
768 .inferred_type()
769 .or_else(|| inner.inferred_type())
770 .and_then(|data_type| render_data_type(data_type, dialect)),
771 nullability: projection_nullability(inner, scope, nullability_context),
772 upstream,
773 }
774}
775
776fn transform_function_fact(
777 expression: &Expression,
778 scope: &Scope,
779 dialect: DialectType,
780) -> Option<TransformFunctionFact> {
781 let mut matches = expression
782 .find_all(|candidate| transform_function_fact_for_node(candidate, scope, dialect).is_some())
783 .into_iter();
784
785 let first = matches.next()?;
786 if matches.next().is_some() {
787 return None;
788 }
789
790 transform_function_fact_for_node(first, scope, dialect)
791}
792
793fn transform_function_fact_for_node(
794 expression: &Expression,
795 scope: &Scope,
796 dialect: DialectType,
797) -> Option<TransformFunctionFact> {
798 match expression {
799 Expression::Function(function) => Some(transform_function_from_args(
800 &function.name,
801 &function.args,
802 scope,
803 dialect,
804 )),
805 Expression::AggregateFunction(function) => Some(transform_function_from_args(
806 &function.name,
807 &function.args,
808 scope,
809 dialect,
810 )),
811 Expression::DateTrunc(function) => Some(transform_function_from_parts(
812 "DATE_TRUNC",
813 vec![datetime_field_name(&function.unit)],
814 vec![&function.this],
815 scope,
816 dialect,
817 )),
818 Expression::TimestampTrunc(function) => Some(transform_function_from_parts(
819 "TIMESTAMP_TRUNC",
820 vec![datetime_field_name(&function.unit)],
821 vec![&function.this],
822 scope,
823 dialect,
824 )),
825 Expression::TimeTrunc(function) => {
826 let mut args = vec![function.this.as_ref()];
827 if let Some(zone) = function.zone.as_deref() {
828 args.push(zone);
829 }
830 Some(transform_function_from_parts(
831 "TIME_TRUNC",
832 vec![function.unit.clone()],
833 args,
834 scope,
835 dialect,
836 ))
837 }
838 Expression::Extract(function) => Some(transform_function_from_parts(
839 "EXTRACT",
840 vec![datetime_field_name(&function.field)],
841 vec![&function.this],
842 scope,
843 dialect,
844 )),
845 Expression::DateAdd(function) => Some(transform_function_from_parts(
846 "DATE_ADD",
847 Vec::new(),
848 vec![&function.this, &function.interval],
849 scope,
850 dialect,
851 )),
852 Expression::DateSub(function) => Some(transform_function_from_parts(
853 "DATE_SUB",
854 Vec::new(),
855 vec![&function.this, &function.interval],
856 scope,
857 dialect,
858 )),
859 Expression::DateDiff(function) => Some(transform_function_from_parts(
860 "DATE_DIFF",
861 Vec::new(),
862 vec![&function.this, &function.expression],
863 scope,
864 dialect,
865 )),
866 _ => None,
867 }
868}
869
870fn transform_function_from_args(
871 name: &str,
872 args: &[Expression],
873 scope: &Scope,
874 dialect: DialectType,
875) -> TransformFunctionFact {
876 let literal_args = args
877 .iter()
878 .filter_map(|arg| literal_argument(arg, dialect))
879 .collect();
880 transform_function_from_parts(name, literal_args, args.iter().collect(), scope, dialect)
881}
882
883fn transform_function_from_parts(
884 name: &str,
885 literal_args: Vec<String>,
886 args: Vec<&Expression>,
887 scope: &Scope,
888 _dialect: DialectType,
889) -> TransformFunctionFact {
890 let column_args = dedupe_column_refs(
891 args.into_iter()
892 .flat_map(|arg| fallback_column_references(arg, scope))
893 .collect(),
894 );
895
896 TransformFunctionFact {
897 name: name.to_string(),
898 literal_args,
899 column_args,
900 }
901}
902
903fn literal_argument(expression: &Expression, dialect: DialectType) -> Option<String> {
904 match expression {
905 Expression::Literal(literal) => Some(literal.value_str().to_string()),
906 Expression::Boolean(boolean) => Some(boolean.value.to_string()),
907 Expression::Null(_) => Some("NULL".to_string()),
908 Expression::Identifier(identifier) => Some(identifier.name.clone()),
909 Expression::Var(var) => Some(var.this.clone()),
910 Expression::DataType(data_type) => render_data_type(data_type, dialect),
911 _ => None,
912 }
913}
914
915fn datetime_field_name(field: &crate::expressions::DateTimeField) -> String {
916 match field {
917 crate::expressions::DateTimeField::Year => "year".to_string(),
918 crate::expressions::DateTimeField::Month => "month".to_string(),
919 crate::expressions::DateTimeField::Day => "day".to_string(),
920 crate::expressions::DateTimeField::Hour => "hour".to_string(),
921 crate::expressions::DateTimeField::Minute => "minute".to_string(),
922 crate::expressions::DateTimeField::Second => "second".to_string(),
923 crate::expressions::DateTimeField::Millisecond => "millisecond".to_string(),
924 crate::expressions::DateTimeField::Microsecond => "microsecond".to_string(),
925 crate::expressions::DateTimeField::DayOfWeek => "day_of_week".to_string(),
926 crate::expressions::DateTimeField::DayOfYear => "day_of_year".to_string(),
927 crate::expressions::DateTimeField::Week => "week".to_string(),
928 crate::expressions::DateTimeField::WeekWithModifier(modifier) => {
929 format!("week({modifier})")
930 }
931 crate::expressions::DateTimeField::Quarter => "quarter".to_string(),
932 crate::expressions::DateTimeField::Epoch => "epoch".to_string(),
933 crate::expressions::DateTimeField::Timezone => "timezone".to_string(),
934 crate::expressions::DateTimeField::TimezoneHour => "timezone_hour".to_string(),
935 crate::expressions::DateTimeField::TimezoneMinute => "timezone_minute".to_string(),
936 crate::expressions::DateTimeField::Date => "date".to_string(),
937 crate::expressions::DateTimeField::Time => "time".to_string(),
938 crate::expressions::DateTimeField::Custom(name) => name.clone(),
939 }
940}
941
942fn unwrap_projection_alias(expression: &Expression) -> &Expression {
943 match expression {
944 Expression::Alias(alias) => unwrap_projection_alias(&alias.this),
945 Expression::Annotated(annotated) => unwrap_projection_alias(&annotated.this),
946 Expression::Paren(paren) => unwrap_projection_alias(&paren.this),
947 _ => expression,
948 }
949}
950
951fn projection_name(expression: &Expression) -> Option<String> {
952 match expression {
953 Expression::Alias(alias) => Some(alias.alias.name.clone()),
954 Expression::Column(column) => Some(column.name.name.clone()),
955 Expression::Identifier(identifier) => Some(identifier.name.clone()),
956 Expression::Star(_) => Some("*".to_string()),
957 Expression::Annotated(annotated) => projection_name(&annotated.this),
958 _ => None,
959 }
960}
961
962fn projection_is_star(expression: &Expression) -> bool {
963 matches!(expression, Expression::Star(_))
964 || matches!(expression, Expression::Column(column) if column.name.name == "*")
965}
966
967fn projection_star_table(expression: &Expression) -> Option<String> {
968 match expression {
969 Expression::Star(star) => star
970 .table
971 .as_ref()
972 .map(|identifier| identifier.name.clone()),
973 Expression::Column(column) if column.name.name == "*" => column
974 .table
975 .as_ref()
976 .map(|identifier| identifier.name.clone()),
977 _ => None,
978 }
979}
980
981fn transform_kind(expression: &Expression) -> TransformKind {
982 if projection_is_star(expression) {
983 TransformKind::Star
984 } else if is_cast_expression(expression) {
985 TransformKind::Cast
986 } else if contains_aggregate(expression) {
987 TransformKind::Aggregation
988 } else if matches!(
989 expression,
990 Expression::Column(_) | Expression::Identifier(_)
991 ) {
992 TransformKind::Direct
993 } else if is_simple_constant(expression) {
994 TransformKind::Constant
995 } else {
996 TransformKind::Expression
997 }
998}
999
1000fn is_cast_expression(expression: &Expression) -> bool {
1001 matches!(
1002 expression,
1003 Expression::Cast(_) | Expression::TryCast(_) | Expression::SafeCast(_)
1004 )
1005}
1006
1007fn cast_type(expression: &Expression, dialect: DialectType) -> Option<String> {
1008 match expression {
1009 Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1010 render_data_type(&cast.to, dialect)
1011 }
1012 _ => None,
1013 }
1014}
1015
1016fn render_data_type(data_type: &DataType, dialect: DialectType) -> Option<String> {
1017 Dialect::get(dialect)
1018 .generate(&Expression::DataType(data_type.clone()))
1019 .ok()
1020}
1021
1022fn is_simple_constant(expression: &Expression) -> bool {
1023 match expression {
1024 Expression::Literal(_) | Expression::Boolean(_) | Expression::Null(_) => true,
1025 Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1026 is_simple_constant(&cast.this)
1027 }
1028 Expression::Neg(unary) | Expression::BitwiseNot(unary) => is_simple_constant(&unary.this),
1029 _ => false,
1030 }
1031}
1032
1033fn projection_nullability(
1034 expression: &Expression,
1035 scope: &Scope,
1036 context: &NullabilityContext<'_>,
1037) -> ProjectionNullability {
1038 match expression {
1039 Expression::Alias(alias) => projection_nullability(&alias.this, scope, context),
1040 Expression::Annotated(annotated) => projection_nullability(&annotated.this, scope, context),
1041 Expression::Paren(paren) => projection_nullability(&paren.this, scope, context),
1042 Expression::Literal(_) | Expression::Boolean(_) => ProjectionNullability::NonNull,
1043 Expression::Null(_) => ProjectionNullability::Nullable,
1044 Expression::Count(_) | Expression::CountIf(_) => ProjectionNullability::NonNull,
1045 Expression::Cast(cast) => projection_nullability(&cast.this, scope, context),
1046 Expression::TryCast(_) | Expression::SafeCast(_) => ProjectionNullability::Unknown,
1047 Expression::Column(column) => column_nullability(
1048 &column.name.name,
1049 column.table.as_ref().map(|table| table.name.as_str()),
1050 scope,
1051 context,
1052 ),
1053 Expression::Identifier(identifier) => {
1054 column_nullability(&identifier.name, None, scope, context)
1055 }
1056 Expression::Coalesce(func) => coalesce_nullability(&func.expressions, scope, context),
1057 _ => ProjectionNullability::Unknown,
1058 }
1059}
1060
1061fn column_nullability(
1062 column_name: &str,
1063 source_name: Option<&str>,
1064 scope: &Scope,
1065 context: &NullabilityContext<'_>,
1066) -> ProjectionNullability {
1067 let resolved_source_name = source_name
1068 .map(str::to_string)
1069 .or_else(|| single_scope_source_name(scope));
1070
1071 if let Some(source_name) = &resolved_source_name {
1072 if context
1073 .nullable_sources
1074 .contains(&normalize_lookup_name(source_name))
1075 {
1076 return ProjectionNullability::Nullable;
1077 }
1078 }
1079
1080 let Some(schema) = context.schema else {
1081 return ProjectionNullability::Unknown;
1082 };
1083
1084 let table_name = resolved_source_name
1085 .as_ref()
1086 .and_then(|name| scope.sources.get(name).and_then(source_table_name))
1087 .or(resolved_source_name);
1088
1089 let Some(table_name) = table_name else {
1090 return ProjectionNullability::Unknown;
1091 };
1092
1093 match schema.column(&table_name, column_name) {
1094 Some(info) if info.primary_key || info.nullable == Some(false) => {
1095 ProjectionNullability::NonNull
1096 }
1097 Some(info) if info.nullable == Some(true) => ProjectionNullability::Nullable,
1098 Some(_) | None => ProjectionNullability::Unknown,
1099 }
1100}
1101
1102fn single_scope_source_name(scope: &Scope) -> Option<String> {
1103 if scope.sources.len() == 1 {
1104 scope.sources.keys().next().cloned()
1105 } else {
1106 None
1107 }
1108}
1109
1110fn coalesce_nullability(
1111 expressions: &[Expression],
1112 scope: &Scope,
1113 context: &NullabilityContext<'_>,
1114) -> ProjectionNullability {
1115 if expressions.is_empty() {
1116 return ProjectionNullability::Unknown;
1117 }
1118
1119 let mut all_nullable = true;
1120
1121 for expression in expressions {
1122 match projection_nullability(unwrap_projection_alias(expression), scope, context) {
1123 ProjectionNullability::NonNull => return ProjectionNullability::NonNull,
1124 ProjectionNullability::Nullable => {}
1125 ProjectionNullability::Unknown => all_nullable = false,
1126 }
1127 }
1128
1129 if all_nullable {
1130 ProjectionNullability::Nullable
1131 } else {
1132 ProjectionNullability::Unknown
1133 }
1134}
1135
1136fn terminal_references_from_lineage(node: &LineageNode) -> Vec<ColumnReferenceFact> {
1137 let mut refs = Vec::new();
1138 collect_terminal_references(node, &mut refs);
1139 dedupe_column_refs(refs)
1140}
1141
1142fn collect_terminal_references(node: &LineageNode, refs: &mut Vec<ColumnReferenceFact>) {
1143 if node.downstream.is_empty() {
1144 if let Some(reference) = column_reference_from_lineage_node(node) {
1145 refs.push(reference);
1146 }
1147 return;
1148 }
1149
1150 for child in &node.downstream {
1151 collect_terminal_references(child, refs);
1152 }
1153}
1154
1155fn column_reference_from_lineage_node(node: &LineageNode) -> Option<ColumnReferenceFact> {
1156 match &node.expression {
1157 Expression::Column(column) => {
1158 let source_name = non_empty_string(node.source_name.clone());
1159 let table =
1160 lineage_node_table(node).or_else(|| column.table.as_ref().map(|t| t.name.clone()));
1161 let confidence = if node.source_kind == SourceKind::Unknown && source_name.is_none() {
1162 ReferenceConfidence::Unknown
1163 } else {
1164 ReferenceConfidence::Resolved
1165 };
1166 Some(ColumnReferenceFact {
1167 source_name,
1168 source_alias: node.source_alias.clone(),
1169 source_kind: node.source_kind,
1170 table,
1171 column: column.name.name.clone(),
1172 unqualified: column.table.is_none(),
1173 confidence,
1174 })
1175 }
1176 Expression::Star(_) => Some(ColumnReferenceFact {
1177 source_name: non_empty_string(node.source_name.clone()),
1178 source_alias: node.source_alias.clone(),
1179 source_kind: node.source_kind,
1180 table: lineage_node_table(node),
1181 column: "*".to_string(),
1182 unqualified: true,
1183 confidence: if node.source_kind == SourceKind::Unknown {
1184 ReferenceConfidence::Unknown
1185 } else {
1186 ReferenceConfidence::Resolved
1187 },
1188 }),
1189 _ => None,
1190 }
1191}
1192
1193fn lineage_node_table(node: &LineageNode) -> Option<String> {
1194 match &node.source {
1195 Expression::Table(table) => Some(table_name(table)),
1196 _ => None,
1197 }
1198}
1199
1200fn fallback_column_references(expression: &Expression, scope: &Scope) -> Vec<ColumnReferenceFact> {
1201 let mut refs = Vec::new();
1202 let source_count = scope.sources.len();
1203 let single_source = if source_count == 1 {
1204 scope.sources.iter().next()
1205 } else {
1206 None
1207 };
1208
1209 for column_expr in expression.find_all(|candidate| matches!(candidate, Expression::Column(_))) {
1210 if let Expression::Column(column) = column_expr {
1211 if column.name.name == "*" {
1212 continue;
1213 }
1214 let source = column
1215 .table
1216 .as_ref()
1217 .and_then(|table| scope.sources.get(&table.name));
1218 let (source_name, source_alias, source_kind, table, confidence) =
1219 if let Some(table_identifier) = &column.table {
1220 if let Some(source) = source {
1221 (
1222 Some(table_identifier.name.clone()),
1223 source.alias.clone(),
1224 source.kind,
1225 source_table_name(source)
1226 .or_else(|| Some(table_identifier.name.clone())),
1227 ReferenceConfidence::Resolved,
1228 )
1229 } else {
1230 (
1231 Some(table_identifier.name.clone()),
1232 None,
1233 SourceKind::Unknown,
1234 Some(table_identifier.name.clone()),
1235 ReferenceConfidence::Unknown,
1236 )
1237 }
1238 } else if let Some((name, source)) = single_source {
1239 (
1240 Some(name.clone()),
1241 source.alias.clone(),
1242 source.kind,
1243 source_table_name(source).or_else(|| Some(name.clone())),
1244 ReferenceConfidence::Resolved,
1245 )
1246 } else if source_count > 1 {
1247 (
1248 None,
1249 None,
1250 SourceKind::Unknown,
1251 None,
1252 ReferenceConfidence::Ambiguous,
1253 )
1254 } else {
1255 (
1256 None,
1257 None,
1258 SourceKind::Unknown,
1259 None,
1260 ReferenceConfidence::Unknown,
1261 )
1262 };
1263
1264 refs.push(ColumnReferenceFact {
1265 source_name,
1266 source_alias,
1267 source_kind,
1268 table,
1269 column: column.name.name.clone(),
1270 unqualified: column.table.is_none(),
1271 confidence,
1272 });
1273 }
1274 }
1275
1276 dedupe_column_refs(refs)
1277}
1278
1279fn dedupe_column_refs(refs: Vec<ColumnReferenceFact>) -> Vec<ColumnReferenceFact> {
1280 let mut seen = HashSet::new();
1281 let mut deduped = Vec::new();
1282
1283 for reference in refs {
1284 let key = (
1285 reference.source_name.clone(),
1286 reference.source_alias.clone(),
1287 reference.table.clone(),
1288 reference.column.clone(),
1289 format!("{:?}", reference.source_kind),
1290 reference.unqualified,
1291 format!("{:?}", reference.confidence),
1292 );
1293 if seen.insert(key) {
1294 deduped.push(reference);
1295 }
1296 }
1297
1298 deduped
1299}
1300
1301fn relation_facts(
1302 scope: &Scope,
1303 mapping_schema: Option<&crate::schema::MappingSchema>,
1304) -> Vec<RelationFact> {
1305 let mut relations = Vec::new();
1306 let mut seen = HashSet::new();
1307 collect_relation_facts(scope, mapping_schema, &mut seen, &mut relations);
1308
1309 relations.sort_by(|left, right| {
1310 left.name
1311 .cmp(&right.name)
1312 .then_with(|| left.alias.cmp(&right.alias))
1313 });
1314 relations
1315}
1316
1317fn collect_relation_facts(
1318 scope: &Scope,
1319 mapping_schema: Option<&crate::schema::MappingSchema>,
1320 seen: &mut HashSet<String>,
1321 relations: &mut Vec<RelationFact>,
1322) {
1323 for relation in scope.sources.iter().map(|(source_name, source)| {
1324 let identity = source_table_identity(source);
1325 RelationFact {
1326 name: source
1327 .lineage_name
1328 .clone()
1329 .or_else(|| identity.as_ref().map(|identity| identity.name.clone()))
1330 .unwrap_or_else(|| source_name.clone()),
1331 alias: source.alias.clone().or_else(|| source_alias(source)),
1332 kind: source.kind,
1333 columns: source_columns(source, mapping_schema),
1334 catalog: identity
1335 .as_ref()
1336 .and_then(|identity| identity.catalog.clone()),
1337 schema: identity
1338 .as_ref()
1339 .and_then(|identity| identity.schema.clone()),
1340 table: identity
1341 .as_ref()
1342 .and_then(|identity| identity.table.clone()),
1343 }
1344 }) {
1345 let key = format!("{:?}|{}|{:?}", relation.kind, relation.name, relation.alias);
1346 if seen.insert(key) {
1347 relations.push(relation);
1348 }
1349 }
1350
1351 for branch_scope in &scope.union_scopes {
1352 collect_relation_facts(branch_scope, mapping_schema, seen, relations);
1353 }
1354}
1355
1356fn base_table_facts(
1357 scope: &Scope,
1358 mapping_schema: Option<&crate::schema::MappingSchema>,
1359) -> Vec<RelationFact> {
1360 let mut relations = Vec::new();
1361 let mut seen = HashSet::new();
1362
1363 collect_base_table_facts(scope, mapping_schema, &mut seen, &mut relations);
1364
1365 relations.sort_by(|left, right| left.name.cmp(&right.name));
1366 relations
1367}
1368
1369fn collect_base_table_facts(
1370 scope: &Scope,
1371 mapping_schema: Option<&crate::schema::MappingSchema>,
1372 seen: &mut HashSet<String>,
1373 relations: &mut Vec<RelationFact>,
1374) {
1375 for source in scope.sources.values() {
1376 if source.kind != SourceKind::Table {
1377 continue;
1378 }
1379
1380 let Some(identity) = source_table_identity(source) else {
1381 continue;
1382 };
1383
1384 if seen.insert(identity.name.clone()) {
1385 relations.push(RelationFact {
1386 name: identity.name,
1387 alias: source.alias.clone().or_else(|| source_alias(source)),
1388 kind: SourceKind::Table,
1389 columns: source_columns(source, mapping_schema),
1390 catalog: identity.catalog,
1391 schema: identity.schema,
1392 table: identity.table,
1393 });
1394 }
1395 }
1396
1397 for child_scope in scope
1398 .cte_scopes
1399 .iter()
1400 .chain(scope.union_scopes.iter())
1401 .chain(scope.table_scopes.iter())
1402 .chain(scope.derived_table_scopes.iter())
1403 .chain(scope.subquery_scopes.iter())
1404 {
1405 collect_base_table_facts(child_scope, mapping_schema, seen, relations);
1406 }
1407}
1408
1409fn source_columns(
1410 source: &SourceInfo,
1411 mapping_schema: Option<&crate::schema::MappingSchema>,
1412) -> Vec<String> {
1413 match &source.expression {
1414 Expression::Table(table) => mapping_schema
1415 .and_then(|schema| schema.column_names(&table_name(table)).ok())
1416 .unwrap_or_default(),
1417 Expression::Select(_)
1418 | Expression::Union(_)
1419 | Expression::Intersect(_)
1420 | Expression::Except(_) => get_output_column_names(&source.expression),
1421 Expression::Subquery(subquery) => get_output_column_names(&subquery.this),
1422 Expression::Cte(cte) if !cte.columns.is_empty() => cte
1423 .columns
1424 .iter()
1425 .map(|column| column.name.clone())
1426 .collect(),
1427 Expression::Cte(cte) => get_output_column_names(&cte.this),
1428 _ => Vec::new(),
1429 }
1430}
1431
1432fn source_table_name(source: &SourceInfo) -> Option<String> {
1433 source_table_identity(source).map(|identity| identity.name)
1434}
1435
1436fn source_alias(source: &SourceInfo) -> Option<String> {
1437 match &source.expression {
1438 Expression::Table(table) => table.alias.as_ref().map(|alias| alias.name.clone()),
1439 Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
1440 _ => None,
1441 }
1442}
1443
1444fn table_name(table: &TableRef) -> String {
1445 let mut parts = Vec::new();
1446 if let Some(catalog) = &table.catalog {
1447 parts.push(catalog.name.clone());
1448 }
1449 if let Some(schema) = &table.schema {
1450 parts.push(schema.name.clone());
1451 }
1452 parts.push(table.name.name.clone());
1453 parts.join(".")
1454}
1455
/// Name parts of a table reference, as filled in by `table_identity`.
#[derive(Clone, Debug)]
struct RelationIdentity {
    /// Dotted name produced by `table_name` (catalog and schema included when present).
    name: String,
    /// Catalog qualifier, if the reference has one.
    catalog: Option<String>,
    /// Schema qualifier, if the reference has one.
    schema: Option<String>,
    /// Bare table name without qualifiers.
    table: Option<String>,
}
1463
1464fn source_table_identity(source: &SourceInfo) -> Option<RelationIdentity> {
1465 match &source.expression {
1466 Expression::Table(table) => Some(table_identity(table)),
1467 _ => None,
1468 }
1469}
1470
1471fn table_identity(table: &TableRef) -> RelationIdentity {
1472 RelationIdentity {
1473 name: table_name(table),
1474 catalog: table.catalog.as_ref().map(|catalog| catalog.name.clone()),
1475 schema: table.schema.as_ref().map(|schema| schema.name.clone()),
1476 table: Some(table.name.name.clone()),
1477 }
1478}
1479
1480fn set_operation_facts(
1481 expression: &Expression,
1482 scope: &Scope,
1483 dialect: DialectType,
1484) -> Vec<SetOperationFact> {
1485 let mut facts = Vec::new();
1486 collect_set_operation_facts(expression, scope, dialect, &mut facts);
1487 facts
1488}
1489
1490fn collect_set_operation_facts(
1491 expression: &Expression,
1492 scope: &Scope,
1493 dialect: DialectType,
1494 facts: &mut Vec<SetOperationFact>,
1495) {
1496 match expression {
1497 Expression::Union(union) => {
1498 facts.push(SetOperationFact {
1499 kind: "union".to_string(),
1500 all: union.all,
1501 distinct: union.distinct,
1502 output_columns: get_output_column_names(expression),
1503 branches: set_operation_branches(&union.left, &union.right, scope, dialect),
1504 });
1505 collect_set_operation_facts(&union.left, scope, dialect, facts);
1506 collect_set_operation_facts(&union.right, scope, dialect, facts);
1507 }
1508 Expression::Intersect(intersect) => {
1509 facts.push(SetOperationFact {
1510 kind: "intersect".to_string(),
1511 all: intersect.all,
1512 distinct: intersect.distinct,
1513 output_columns: get_output_column_names(expression),
1514 branches: set_operation_branches(&intersect.left, &intersect.right, scope, dialect),
1515 });
1516 collect_set_operation_facts(&intersect.left, scope, dialect, facts);
1517 collect_set_operation_facts(&intersect.right, scope, dialect, facts);
1518 }
1519 Expression::Except(except) => {
1520 facts.push(SetOperationFact {
1521 kind: "except".to_string(),
1522 all: except.all,
1523 distinct: except.distinct,
1524 output_columns: get_output_column_names(expression),
1525 branches: set_operation_branches(&except.left, &except.right, scope, dialect),
1526 });
1527 collect_set_operation_facts(&except.left, scope, dialect, facts);
1528 collect_set_operation_facts(&except.right, scope, dialect, facts);
1529 }
1530 Expression::Subquery(subquery) => {
1531 collect_set_operation_facts(&subquery.this, scope, dialect, facts);
1532 }
1533 _ => {}
1534 }
1535}
1536
1537fn set_operation_branches(
1538 left: &Expression,
1539 right: &Expression,
1540 scope: &Scope,
1541 dialect: DialectType,
1542) -> Vec<SetOperationBranchFact> {
1543 vec![
1544 SetOperationBranchFact {
1545 index: 0,
1546 projections: projection_facts_for_branch(left, scope, dialect),
1547 },
1548 SetOperationBranchFact {
1549 index: 1,
1550 projections: projection_facts_for_branch(right, scope, dialect),
1551 },
1552 ]
1553}
1554
1555fn projection_facts_for_branch(
1556 expression: &Expression,
1557 root_scope: &Scope,
1558 dialect: DialectType,
1559) -> Vec<ProjectionFact> {
1560 let branch_scope = build_scope(expression);
1561 let scope = if branch_scope.sources.is_empty() {
1562 root_scope
1563 } else {
1564 &branch_scope
1565 };
1566 let nullability_context = NullabilityContext {
1567 schema: None,
1568 nullable_sources: nullable_source_names(expression),
1569 };
1570 projection_facts_for_query(expression, scope, dialect, &nullability_context)
1571}
1572
/// Maps the empty string to `None` and wraps any other string in `Some`.
///
/// No trimming is performed, so whitespace-only strings are kept.
fn non_empty_string(value: String) -> Option<String> {
    Some(value).filter(|text| !text.is_empty())
}