1use crate::ast_transforms::get_output_column_names;
9use crate::dialects::{Dialect, DialectType};
10use crate::expressions::{DataType, Expression, JoinKind, TableRef, With};
11use crate::lineage::{lineage_by_index_from_expression, LineageNode};
12use crate::optimizer::annotate_types::annotate_types;
13use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
14use crate::schema::{MappingSchema, Schema};
15use crate::scope::{build_scope, Scope, SourceInfo, SourceKind};
16use crate::traversal::{contains_aggregate, ExpressionWalk};
17use crate::validation::{mapping_schema_from_validation_schema, ValidationSchema};
18use crate::{parse_data_type, parse_one, Error, Result};
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21
/// Caller-supplied settings for [`analyze_query`].
///
/// (De)serialized with camelCase keys; fields missing from the input fall
/// back to their `Default` values.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct AnalyzeQueryOptions {
    /// Dialect used to parse the query and to render SQL and type strings.
    pub dialect: DialectType,
    /// Optional schema. When present, `analyze_query` qualifies columns and
    /// annotates types against it and uses it for nullability lookups.
    pub schema: Option<ValidationSchema>,
}
31
/// Result of [`analyze_query`]: structural facts about one query.
///
/// Serialized with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryAnalysis {
    /// Whether the top-level statement is a plain `SELECT` or a set operation.
    pub shape: QueryShape,
    /// CTE names found in the query's `WITH` clauses, de-duplicated in
    /// first-seen order.
    pub ctes: Vec<String>,
    /// Details of the CTEs declared in the top-level `WITH` clause, captured
    /// before the query is qualified.
    pub cte_facts: Vec<CteFact>,
    /// One fact per select-list entry (taken from the left-most branch when
    /// the query is a set operation).
    pub projections: Vec<ProjectionFact>,
    /// Relations derived from the query scope.
    // NOTE(review): produced by `relation_facts`, which is outside this
    // excerpt — confirm exactly which sources are included.
    pub relations: Vec<RelationFact>,
    /// Base tables derived from the query scope.
    // NOTE(review): produced by `base_table_facts`, which is outside this
    // excerpt — confirm how it differs from `relations`.
    pub base_tables: Vec<RelationFact>,
    /// Star projections (`*` / `t.*`) of the query as written, with the
    /// columns they expand to.
    pub star_projections: Vec<StarProjectionFact>,
    /// Set-operation details.
    // NOTE(review): produced by `set_operation_facts`, which is outside this
    // excerpt.
    pub set_operations: Vec<SetOperationFact>,
}
45
/// Top-level form of an analyzed query (serialized in snake_case).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueryShape {
    /// The statement is not a `UNION`, `INTERSECT` or `EXCEPT`.
    Select,
    /// The statement is a `UNION`, `INTERSECT` or `EXCEPT`.
    SetOperation,
}
53
/// Facts about a single select-list entry (serialized with camelCase keys).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProjectionFact {
    /// Zero-based position of the projection in the select list.
    pub index: usize,
    /// Output column name, when one could be determined.
    pub name: Option<String>,
    /// `true` when the projection is `*` or `qualifier.*`.
    pub is_star: bool,
    /// Qualifier of a qualified star (`t` in `t.*`), if any.
    pub star_table: Option<String>,
    /// Coarse classification of how the projection is computed.
    pub transform_kind: TransformKind,
    /// Function details when the projection is one of the recognized function
    /// forms; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub transform_function: Option<TransformFunctionFact>,
    /// Rendered target type when the projection is a cast.
    pub cast_type: Option<String>,
    /// Rendered inferred type of the projection, when type annotation
    /// produced one.
    pub type_hint: Option<String>,
    /// Whether the projection can yield `NULL`, as far as could be determined.
    pub nullability: ProjectionNullability,
    /// Column references the projection is derived from.
    pub upstream: Vec<ColumnReferenceFact>,
}
70
/// Description of the function applied by a projection
/// (serialized with camelCase keys).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransformFunctionFact {
    /// Function name (for dedicated AST nodes, a fixed name such as
    /// `DATE_TRUNC`).
    pub name: String,
    /// Textual form of the literal-like arguments (or date/time unit).
    pub literal_args: Vec<String>,
    /// De-duplicated column references found in the arguments.
    pub column_args: Vec<ColumnReferenceFact>,
}
79
/// Facts about one CTE of the top-level `WITH` clause
/// (serialized with camelCase keys).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CteFact {
    /// CTE alias.
    pub name: String,
    /// Column names listed explicitly on the CTE definition.
    pub columns: Vec<String>,
    /// CTE body rendered as SQL in the requested dialect.
    pub body_sql: String,
    /// Output column names of the CTE body.
    pub output_columns: Vec<String>,
}
89
/// A `*` / `qualifier.*` projection and the columns it stands for
/// (serialized with camelCase keys).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StarProjectionFact {
    /// Zero-based position of the star in the select list.
    pub index: usize,
    /// Qualifier of the star, if it has one.
    pub table: Option<String>,
    /// Columns contributed by the sources the star matches.
    pub expanded_columns: Vec<String>,
}
98
/// A reference to a column of some query source
/// (serialized with camelCase keys).
// NOTE(review): the code that fills these fields is only partly visible in
// this excerpt; the field notes below are deliberately brief — confirm
// against the constructors.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnReferenceFact {
    /// Name of the source the column comes from, when known.
    pub source_name: Option<String>,
    /// Alias of that source, if any.
    pub source_alias: Option<String>,
    /// Kind of the source.
    pub source_kind: SourceKind,
    /// Table associated with the reference, when known.
    pub table: Option<String>,
    /// Column name.
    pub column: String,
    /// Presumably `true` when the column was written without a qualifier —
    /// TODO confirm.
    pub unqualified: bool,
    /// How reliably the reference was resolved.
    pub confidence: ReferenceConfidence,
}
111
/// A relation referenced by the query (serialized with camelCase keys).
// NOTE(review): values are produced by `relation_facts` / `base_table_facts`,
// which are outside this excerpt — confirm field semantics there.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelationFact {
    /// Relation name.
    pub name: String,
    /// Alias given to the relation, if any.
    pub alias: Option<String>,
    /// Kind of source this relation is.
    pub kind: SourceKind,
    /// Column names associated with the relation.
    pub columns: Vec<String>,
    /// Catalog part of the relation's name, if any.
    pub catalog: Option<String>,
    /// Schema part of the relation's name, if any.
    pub schema: Option<String>,
    /// Table part of the relation's name, if any.
    pub table: Option<String>,
}
124
/// Facts about a set operation (serialized with camelCase keys).
// NOTE(review): values are produced by `set_operation_facts`, which is
// outside this excerpt — confirm field semantics there.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationFact {
    /// Operation kind as a string (presumably union/intersect/except —
    /// TODO confirm exact spelling).
    pub kind: String,
    /// `ALL` flag of the operation.
    pub all: bool,
    /// `DISTINCT` flag of the operation.
    pub distinct: bool,
    /// Output column names of the operation.
    pub output_columns: Vec<String>,
    /// Per-branch projection facts.
    pub branches: Vec<SetOperationBranchFact>,
}
135
/// One branch of a set operation (serialized with camelCase keys).
// NOTE(review): constructed outside this excerpt.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SetOperationBranchFact {
    /// Position of the branch (presumably zero-based — TODO confirm).
    pub index: usize,
    /// Projection facts of the branch.
    pub projections: Vec<ProjectionFact>,
}
143
/// Coarse classification of a projection expression (serialized in
/// snake_case). When several categories apply, the first matching one in the
/// order star, cast, aggregation, direct, constant wins.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransformKind {
    /// A bare column or identifier.
    Direct,
    /// A `CAST`, `TRY_CAST` or `SAFE_CAST` at the top of the projection.
    Cast,
    /// The projection contains an aggregate.
    Aggregation,
    /// A literal, boolean or `NULL`, optionally negated / bitwise-negated.
    Constant,
    /// Anything not covered by the other variants.
    Expression,
    /// `*` or `qualifier.*`.
    Star,
}
155
/// Confidence attached to a [`ColumnReferenceFact`] (serialized in
/// snake_case).
// NOTE(review): the rules that pick a variant live in the reference builders,
// which are not fully visible in this excerpt.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReferenceConfidence {
    /// The reference was resolved to a source.
    Resolved,
    /// The reference could belong to more than one source.
    Ambiguous,
    /// Nothing is known about where the reference comes from.
    Unknown,
}
164
/// Nullability verdict for a projection (serialized in snake_case).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectionNullability {
    /// The projection is known not to produce `NULL`.
    NonNull,
    /// The projection may produce `NULL`.
    Nullable,
    /// Nullability could not be determined.
    Unknown,
}
173
174pub fn analyze_query(sql: &str, options: AnalyzeQueryOptions) -> Result<QueryAnalysis> {
176 let mut expression = parse_one(sql, options.dialect)?;
177 expression = effective_query(expression);
178 ensure_query(&expression)?;
179 let original_expression = expression.clone();
180
181 let mapping_schema = options
182 .schema
183 .as_ref()
184 .map(|schema| analysis_mapping_schema(schema, options.dialect));
185 let schema_info = options.schema.as_ref().map(AnalysisSchemaInfo::from_schema);
186 let cte_facts = top_level_cte_facts(&original_expression, options.dialect)?;
187 let star_projections = star_projection_facts(&original_expression, mapping_schema.as_ref());
188
189 if let Some(schema) = mapping_schema.as_ref() {
190 let qualify_options = QualifyColumnsOptions::new().with_dialect(options.dialect);
191 expression = qualify_columns(expression, schema, &qualify_options)
192 .map_err(|e| Error::internal(format!("query analysis qualification failed: {e}")))?;
193 }
194
195 let annotation_schema = mapping_schema.as_ref().map(|schema| {
196 let mut alias_schema = schema.clone();
197 add_scope_aliases_to_schema(
198 &build_scope(&expression),
199 schema,
200 &mut alias_schema,
201 options.dialect,
202 );
203 alias_schema
204 });
205
206 annotate_types(
207 &mut expression,
208 annotation_schema
209 .as_ref()
210 .map(|schema| schema as &dyn Schema),
211 Some(options.dialect),
212 );
213 crate::lineage::expand_cte_stars(
214 &mut expression,
215 annotation_schema
216 .as_ref()
217 .or(mapping_schema.as_ref())
218 .map(|schema| schema as &dyn Schema),
219 );
220
221 let scope = build_scope(&expression);
222 let nullability_context = NullabilityContext {
223 schema: schema_info.as_ref(),
224 nullable_sources: nullable_source_names(&expression),
225 };
226 let shape = if is_set_operation(&expression) {
227 QueryShape::SetOperation
228 } else {
229 QueryShape::Select
230 };
231
232 Ok(QueryAnalysis {
233 shape,
234 ctes: collect_cte_names(&expression),
235 cte_facts,
236 projections: projection_facts_for_query(
237 &expression,
238 &scope,
239 options.dialect,
240 &nullability_context,
241 ),
242 relations: relation_facts(&scope, mapping_schema.as_ref()),
243 base_tables: base_table_facts(&scope, mapping_schema.as_ref()),
244 star_projections,
245 set_operations: set_operation_facts(&expression, &scope, options.dialect),
246 })
247}
248
249fn analysis_mapping_schema(schema: &ValidationSchema, dialect: DialectType) -> MappingSchema {
250 let broad_schema = mapping_schema_from_validation_schema(schema);
251 let mut mapping_schema = MappingSchema::with_dialect(dialect);
252
253 for table in &schema.tables {
254 let table_names = validation_table_names(table);
255 if table_names.is_empty() {
256 continue;
257 }
258
259 let fallback_table = table_names[0].as_str();
260 let columns: Vec<(String, DataType)> = table
261 .columns
262 .iter()
263 .map(|column| {
264 let data_type = parse_analysis_data_type(&column.data_type, dialect)
265 .unwrap_or_else(|| {
266 broad_schema
267 .get_column_type(fallback_table, &column.name)
268 .unwrap_or(DataType::Unknown)
269 });
270 (column.name.to_ascii_lowercase(), data_type)
271 })
272 .collect();
273
274 for table_name in table_names {
275 let _ = mapping_schema.add_table(&table_name, &columns, Some(dialect));
276 }
277 }
278
279 mapping_schema
280}
281
282fn validation_table_names(table: &crate::validation::SchemaTable) -> Vec<String> {
283 let mut names = Vec::new();
284
285 names.push(table.name.to_ascii_lowercase());
286 if let Some(schema_name) = &table.schema {
287 names.push(format!(
288 "{}.{}",
289 schema_name.to_ascii_lowercase(),
290 table.name.to_ascii_lowercase()
291 ));
292 }
293 for alias in &table.aliases {
294 names.push(alias.to_ascii_lowercase());
295 }
296
297 names.sort();
298 names.dedup();
299 names
300}
301
302fn parse_analysis_data_type(data_type: &str, dialect: DialectType) -> Option<DataType> {
303 let trimmed = data_type.trim();
304 if trimmed.is_empty() {
305 return None;
306 }
307 parse_data_type(trimmed, dialect).ok()
308}
309
310fn add_scope_aliases_to_schema(
311 scope: &Scope,
312 source_schema: &MappingSchema,
313 target_schema: &mut MappingSchema,
314 dialect: DialectType,
315) {
316 for child_scope in scope.traverse() {
317 for (source_name, source) in &child_scope.sources {
318 if source.kind != SourceKind::Table {
319 continue;
320 }
321 if let Some(table_name) = source_table_name(source) {
322 if source_name == &table_name {
323 continue;
324 }
325 if let Ok(column_names) = source_schema.column_names(&table_name) {
326 let columns: Vec<(String, DataType)> = column_names
327 .iter()
328 .map(|column| {
329 (
330 column.clone(),
331 source_schema
332 .get_column_type(&table_name, column)
333 .unwrap_or(DataType::Unknown),
334 )
335 })
336 .collect();
337 let _ = target_schema.add_table(source_name, &columns, Some(dialect));
338 }
339 }
340 }
341 }
342}
343
/// Per-column schema metadata used for nullability inference.
#[derive(Debug, Clone)]
struct AnalysisColumnInfo {
    /// Declared nullability; `None` when the schema does not state it.
    nullable: Option<bool>,
    /// `true` when the column is flagged as a primary key or appears in its
    /// table's primary-key list.
    primary_key: bool,
}
349
/// Lookup table from `(table name, column name)` to column metadata; both
/// key parts are stored normalized with `normalize_lookup_name`.
#[derive(Debug, Clone)]
struct AnalysisSchemaInfo {
    /// Column metadata keyed by normalized `(table, column)` names.
    columns: HashMap<(String, String), AnalysisColumnInfo>,
}
354
355impl AnalysisSchemaInfo {
356 fn from_schema(schema: &ValidationSchema) -> Self {
357 let mut columns = HashMap::new();
358
359 for table in &schema.tables {
360 let table_names = validation_table_names(table);
361 let primary_keys: HashSet<String> = table
362 .primary_key
363 .iter()
364 .map(|column| column.to_ascii_lowercase())
365 .collect();
366
367 for column in &table.columns {
368 let info = AnalysisColumnInfo {
369 nullable: column.nullable,
370 primary_key: column.primary_key
371 || primary_keys.contains(&column.name.to_ascii_lowercase()),
372 };
373
374 for table_name in &table_names {
375 columns.insert(
376 (
377 normalize_lookup_name(table_name),
378 normalize_lookup_name(&column.name),
379 ),
380 info.clone(),
381 );
382 }
383 }
384 }
385
386 Self { columns }
387 }
388
389 fn column(&self, table: &str, column: &str) -> Option<&AnalysisColumnInfo> {
390 self.columns
391 .get(&(normalize_lookup_name(table), normalize_lookup_name(column)))
392 }
393}
394
/// Inputs shared by the nullability helpers.
struct NullabilityContext<'a> {
    /// Column metadata from the caller's schema, if one was supplied.
    schema: Option<&'a AnalysisSchemaInfo>,
    /// Normalized names of sources on the null-supplying side of an outer
    /// join; any column taken from them is treated as nullable.
    nullable_sources: HashSet<String>,
}
399
400fn top_level_cte_facts(expression: &Expression, dialect: DialectType) -> Result<Vec<CteFact>> {
401 let Some(with_clause) = with_clause(expression) else {
402 return Ok(Vec::new());
403 };
404
405 with_clause
406 .ctes
407 .iter()
408 .map(|cte| {
409 Ok(CteFact {
410 name: cte.alias.name.clone(),
411 columns: cte
412 .columns
413 .iter()
414 .map(|column| column.name.clone())
415 .collect(),
416 body_sql: Dialect::get(dialect).generate(&cte.this)?,
417 output_columns: get_output_column_names(&cte.this),
418 })
419 })
420 .collect()
421}
422
423fn star_projection_facts(
424 expression: &Expression,
425 mapping_schema: Option<&MappingSchema>,
426) -> Vec<StarProjectionFact> {
427 let scope = build_scope(expression);
428 let ordered_sources = ordered_source_names_for_query(expression);
429
430 select_expressions_for_query(expression)
431 .iter()
432 .enumerate()
433 .filter_map(|(index, projection)| {
434 let inner = unwrap_projection_alias(projection);
435 if !projection_is_star(inner) {
436 return None;
437 }
438
439 let table = projection_star_table(inner);
440 let expanded_columns =
441 expanded_star_columns(table.as_deref(), &scope, &ordered_sources, mapping_schema);
442
443 Some(StarProjectionFact {
444 index,
445 table,
446 expanded_columns,
447 })
448 })
449 .collect()
450}
451
452fn expanded_star_columns(
453 star_table: Option<&str>,
454 scope: &Scope,
455 ordered_sources: &[String],
456 mapping_schema: Option<&MappingSchema>,
457) -> Vec<String> {
458 let mut columns = Vec::new();
459 let mut source_names: Vec<String> = if ordered_sources.is_empty() {
460 let mut names: Vec<_> = scope.sources.keys().cloned().collect();
461 names.sort();
462 names
463 } else {
464 ordered_sources.to_vec()
465 };
466
467 source_names.dedup();
468
469 for source_name in source_names {
470 let Some(source) = scope.sources.get(&source_name) else {
471 continue;
472 };
473
474 if let Some(star_table) = star_table {
475 let matches = source_name.eq_ignore_ascii_case(star_table)
476 || source
477 .alias
478 .as_deref()
479 .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
480 || source_table_name(source)
481 .is_some_and(|table| table.eq_ignore_ascii_case(star_table));
482
483 if !matches {
484 continue;
485 }
486 }
487
488 columns.extend(source_columns(source, mapping_schema));
489 }
490
491 columns
492}
493
494fn ordered_source_names_for_query(expression: &Expression) -> Vec<String> {
495 match expression {
496 Expression::Select(select) => ordered_source_names_for_select(select),
497 Expression::Union(union) => ordered_source_names_for_query(&union.left),
498 Expression::Intersect(intersect) => ordered_source_names_for_query(&intersect.left),
499 Expression::Except(except) => ordered_source_names_for_query(&except.left),
500 Expression::Subquery(subquery) => ordered_source_names_for_query(&subquery.this),
501 _ => Vec::new(),
502 }
503}
504
505fn ordered_source_names_for_select(select: &crate::expressions::Select) -> Vec<String> {
506 let mut sources = Vec::new();
507
508 if let Some(from) = &select.from {
509 for expression in &from.expressions {
510 if let Some(source_name) = expression_source_name(expression) {
511 sources.push(source_name);
512 }
513 }
514 }
515
516 for join in &select.joins {
517 if let Some(source_name) = expression_source_name(&join.this) {
518 sources.push(source_name);
519 }
520 }
521
522 sources
523}
524
525fn nullable_source_names(expression: &Expression) -> HashSet<String> {
526 match expression {
527 Expression::Select(select) => nullable_source_names_for_select(select),
528 Expression::Union(union) => nullable_source_names(&union.left),
529 Expression::Intersect(intersect) => nullable_source_names(&intersect.left),
530 Expression::Except(except) => nullable_source_names(&except.left),
531 Expression::Subquery(subquery) => nullable_source_names(&subquery.this),
532 _ => HashSet::new(),
533 }
534}
535
536fn nullable_source_names_for_select(select: &crate::expressions::Select) -> HashSet<String> {
537 let mut nullable = HashSet::new();
538 let mut left_sources = Vec::new();
539
540 if let Some(from) = &select.from {
541 for expression in &from.expressions {
542 if let Some(source_name) = expression_source_name(expression) {
543 left_sources.push(source_name);
544 }
545 }
546 }
547
548 for join in &select.joins {
549 let right_source = expression_source_name(&join.this);
550
551 if join_nullable_left(join.kind) {
552 for source_name in &left_sources {
553 nullable.insert(normalize_lookup_name(source_name));
554 }
555 }
556
557 if join_nullable_right(join.kind) {
558 if let Some(source_name) = &right_source {
559 nullable.insert(normalize_lookup_name(source_name));
560 }
561 }
562
563 if let Some(source_name) = right_source {
564 left_sources.push(source_name);
565 }
566 }
567
568 nullable
569}
570
571fn join_nullable_left(kind: JoinKind) -> bool {
572 matches!(
573 kind,
574 JoinKind::Right
575 | JoinKind::NaturalRight
576 | JoinKind::AsOfRight
577 | JoinKind::Full
578 | JoinKind::NaturalFull
579 | JoinKind::Outer
580 )
581}
582
583fn join_nullable_right(kind: JoinKind) -> bool {
584 matches!(
585 kind,
586 JoinKind::Left
587 | JoinKind::NaturalLeft
588 | JoinKind::AsOfLeft
589 | JoinKind::LeftLateral
590 | JoinKind::OuterApply
591 | JoinKind::LeftArray
592 | JoinKind::Full
593 | JoinKind::NaturalFull
594 | JoinKind::Outer
595 )
596}
597
598fn expression_source_name(expression: &Expression) -> Option<String> {
599 match expression {
600 Expression::Table(table) => table
601 .alias
602 .as_ref()
603 .map(|alias| alias.name.clone())
604 .or_else(|| Some(table.name.name.clone())),
605 Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
606 Expression::Alias(alias) => Some(alias.alias.name.clone()),
607 Expression::Cte(cte) => Some(cte.alias.name.clone()),
608 _ => None,
609 }
610}
611
/// Normalizes a table or column name for lookups by lower-casing its ASCII
/// letters; all other characters are left untouched.
fn normalize_lookup_name(name: &str) -> String {
    let mut lowered = name.to_owned();
    lowered.make_ascii_lowercase();
    lowered
}
615
616fn effective_query(expression: Expression) -> Expression {
617 match expression {
618 Expression::Prepare(prepare) => prepare.statement,
619 Expression::Subquery(subquery) if subquery.alias.is_none() => subquery.this,
620 other => other,
621 }
622}
623
624fn ensure_query(expression: &Expression) -> Result<()> {
625 if matches!(
626 expression,
627 Expression::Select(_)
628 | Expression::Union(_)
629 | Expression::Intersect(_)
630 | Expression::Except(_)
631 ) {
632 Ok(())
633 } else {
634 Err(Error::internal(
635 "analyze_query requires a SELECT or set operation query",
636 ))
637 }
638}
639
640fn is_set_operation(expression: &Expression) -> bool {
641 matches!(
642 expression,
643 Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
644 )
645}
646
647fn collect_cte_names(expression: &Expression) -> Vec<String> {
648 let mut names = Vec::new();
649 let mut seen = HashSet::new();
650 collect_cte_names_inner(expression, &mut names, &mut seen);
651 names
652}
653
654fn collect_cte_names_inner(
655 expression: &Expression,
656 names: &mut Vec<String>,
657 seen: &mut HashSet<String>,
658) {
659 if let Some(with_clause) = with_clause(expression) {
660 collect_with_names(with_clause, names, seen);
661 }
662
663 match expression {
664 Expression::Union(union) => {
665 collect_cte_names_inner(&union.left, names, seen);
666 collect_cte_names_inner(&union.right, names, seen);
667 }
668 Expression::Intersect(intersect) => {
669 collect_cte_names_inner(&intersect.left, names, seen);
670 collect_cte_names_inner(&intersect.right, names, seen);
671 }
672 Expression::Except(except) => {
673 collect_cte_names_inner(&except.left, names, seen);
674 collect_cte_names_inner(&except.right, names, seen);
675 }
676 Expression::Subquery(subquery) => collect_cte_names_inner(&subquery.this, names, seen),
677 _ => {}
678 }
679}
680
681fn collect_with_names(with_clause: &With, names: &mut Vec<String>, seen: &mut HashSet<String>) {
682 for cte in &with_clause.ctes {
683 if seen.insert(cte.alias.name.clone()) {
684 names.push(cte.alias.name.clone());
685 }
686 collect_cte_names_inner(&cte.this, names, seen);
687 }
688}
689
690fn with_clause(expression: &Expression) -> Option<&With> {
691 match expression {
692 Expression::Select(select) => select.with.as_ref(),
693 Expression::Union(union) => union.with.as_ref(),
694 Expression::Intersect(intersect) => intersect.with.as_ref(),
695 Expression::Except(except) => except.with.as_ref(),
696 _ => None,
697 }
698}
699
700fn projection_facts_for_query(
701 expression: &Expression,
702 scope: &Scope,
703 dialect: DialectType,
704 nullability_context: &NullabilityContext<'_>,
705) -> Vec<ProjectionFact> {
706 let expressions = select_expressions_for_query(expression);
707 let names = get_output_column_names(expression);
708
709 expressions
710 .iter()
711 .enumerate()
712 .map(|(index, projection)| {
713 projection_fact(
714 index,
715 names
716 .get(index)
717 .cloned()
718 .or_else(|| projection_name(projection)),
719 projection,
720 expression,
721 scope,
722 dialect,
723 nullability_context,
724 )
725 })
726 .collect()
727}
728
729fn select_expressions_for_query(expression: &Expression) -> Vec<&Expression> {
730 match expression {
731 Expression::Select(select) => select.expressions.iter().collect(),
732 Expression::Union(union) => select_expressions_for_query(&union.left),
733 Expression::Intersect(intersect) => select_expressions_for_query(&intersect.left),
734 Expression::Except(except) => select_expressions_for_query(&except.left),
735 Expression::Subquery(subquery) => select_expressions_for_query(&subquery.this),
736 _ => Vec::new(),
737 }
738}
739
740fn projection_fact(
741 index: usize,
742 name: Option<String>,
743 projection: &Expression,
744 query: &Expression,
745 scope: &Scope,
746 dialect: DialectType,
747 nullability_context: &NullabilityContext<'_>,
748) -> ProjectionFact {
749 let inner = unwrap_projection_alias(projection);
750 let is_star = projection_is_star(inner);
751 let upstream = lineage_by_index_from_expression(index, query, Some(dialect), false)
752 .map(|node| terminal_references_from_lineage(&node))
753 .ok()
754 .filter(|refs| !refs.is_empty())
755 .unwrap_or_else(|| fallback_column_references(inner, scope));
756
757 ProjectionFact {
758 index,
759 name,
760 is_star,
761 star_table: projection_star_table(inner),
762 transform_kind: transform_kind(inner),
763 transform_function: transform_function_fact(inner, scope, dialect),
764 cast_type: cast_type(inner, dialect),
765 type_hint: projection
766 .inferred_type()
767 .or_else(|| inner.inferred_type())
768 .and_then(|data_type| render_data_type(data_type, dialect)),
769 nullability: projection_nullability(inner, scope, nullability_context),
770 upstream,
771 }
772}
773
774fn transform_function_fact(
775 expression: &Expression,
776 scope: &Scope,
777 dialect: DialectType,
778) -> Option<TransformFunctionFact> {
779 match expression {
780 Expression::Function(function) => Some(transform_function_from_args(
781 &function.name,
782 &function.args,
783 scope,
784 dialect,
785 )),
786 Expression::AggregateFunction(function) => Some(transform_function_from_args(
787 &function.name,
788 &function.args,
789 scope,
790 dialect,
791 )),
792 Expression::WindowFunction(function) => {
793 transform_function_fact(&function.this, scope, dialect)
794 }
795 Expression::DateTrunc(function) => Some(transform_function_from_parts(
796 "DATE_TRUNC",
797 vec![datetime_field_name(&function.unit)],
798 vec![&function.this],
799 scope,
800 dialect,
801 )),
802 Expression::TimestampTrunc(function) => Some(transform_function_from_parts(
803 "TIMESTAMP_TRUNC",
804 vec![datetime_field_name(&function.unit)],
805 vec![&function.this],
806 scope,
807 dialect,
808 )),
809 Expression::TimeTrunc(function) => {
810 let mut args = vec![function.this.as_ref()];
811 if let Some(zone) = function.zone.as_deref() {
812 args.push(zone);
813 }
814 Some(transform_function_from_parts(
815 "TIME_TRUNC",
816 vec![function.unit.clone()],
817 args,
818 scope,
819 dialect,
820 ))
821 }
822 Expression::Extract(function) => Some(transform_function_from_parts(
823 "EXTRACT",
824 vec![datetime_field_name(&function.field)],
825 vec![&function.this],
826 scope,
827 dialect,
828 )),
829 Expression::DateAdd(function) => Some(transform_function_from_parts(
830 "DATE_ADD",
831 Vec::new(),
832 vec![&function.this, &function.interval],
833 scope,
834 dialect,
835 )),
836 Expression::DateSub(function) => Some(transform_function_from_parts(
837 "DATE_SUB",
838 Vec::new(),
839 vec![&function.this, &function.interval],
840 scope,
841 dialect,
842 )),
843 Expression::DateDiff(function) => Some(transform_function_from_parts(
844 "DATE_DIFF",
845 Vec::new(),
846 vec![&function.this, &function.expression],
847 scope,
848 dialect,
849 )),
850 _ => None,
851 }
852}
853
854fn transform_function_from_args(
855 name: &str,
856 args: &[Expression],
857 scope: &Scope,
858 dialect: DialectType,
859) -> TransformFunctionFact {
860 let literal_args = args
861 .iter()
862 .filter_map(|arg| literal_argument(arg, dialect))
863 .collect();
864 transform_function_from_parts(name, literal_args, args.iter().collect(), scope, dialect)
865}
866
867fn transform_function_from_parts(
868 name: &str,
869 literal_args: Vec<String>,
870 args: Vec<&Expression>,
871 scope: &Scope,
872 _dialect: DialectType,
873) -> TransformFunctionFact {
874 let column_args = dedupe_column_refs(
875 args.into_iter()
876 .flat_map(|arg| fallback_column_references(arg, scope))
877 .collect(),
878 );
879
880 TransformFunctionFact {
881 name: name.to_string(),
882 literal_args,
883 column_args,
884 }
885}
886
887fn literal_argument(expression: &Expression, dialect: DialectType) -> Option<String> {
888 match expression {
889 Expression::Literal(literal) => Some(literal.value_str().to_string()),
890 Expression::Boolean(boolean) => Some(boolean.value.to_string()),
891 Expression::Null(_) => Some("NULL".to_string()),
892 Expression::Identifier(identifier) => Some(identifier.name.clone()),
893 Expression::Var(var) => Some(var.this.clone()),
894 Expression::DataType(data_type) => render_data_type(data_type, dialect),
895 _ => None,
896 }
897}
898
899fn datetime_field_name(field: &crate::expressions::DateTimeField) -> String {
900 match field {
901 crate::expressions::DateTimeField::Year => "year".to_string(),
902 crate::expressions::DateTimeField::Month => "month".to_string(),
903 crate::expressions::DateTimeField::Day => "day".to_string(),
904 crate::expressions::DateTimeField::Hour => "hour".to_string(),
905 crate::expressions::DateTimeField::Minute => "minute".to_string(),
906 crate::expressions::DateTimeField::Second => "second".to_string(),
907 crate::expressions::DateTimeField::Millisecond => "millisecond".to_string(),
908 crate::expressions::DateTimeField::Microsecond => "microsecond".to_string(),
909 crate::expressions::DateTimeField::DayOfWeek => "day_of_week".to_string(),
910 crate::expressions::DateTimeField::DayOfYear => "day_of_year".to_string(),
911 crate::expressions::DateTimeField::Week => "week".to_string(),
912 crate::expressions::DateTimeField::WeekWithModifier(modifier) => {
913 format!("week({modifier})")
914 }
915 crate::expressions::DateTimeField::Quarter => "quarter".to_string(),
916 crate::expressions::DateTimeField::Epoch => "epoch".to_string(),
917 crate::expressions::DateTimeField::Timezone => "timezone".to_string(),
918 crate::expressions::DateTimeField::TimezoneHour => "timezone_hour".to_string(),
919 crate::expressions::DateTimeField::TimezoneMinute => "timezone_minute".to_string(),
920 crate::expressions::DateTimeField::Date => "date".to_string(),
921 crate::expressions::DateTimeField::Time => "time".to_string(),
922 crate::expressions::DateTimeField::Custom(name) => name.clone(),
923 }
924}
925
926fn unwrap_projection_alias(expression: &Expression) -> &Expression {
927 match expression {
928 Expression::Alias(alias) => unwrap_projection_alias(&alias.this),
929 Expression::Annotated(annotated) => unwrap_projection_alias(&annotated.this),
930 Expression::Paren(paren) => unwrap_projection_alias(&paren.this),
931 _ => expression,
932 }
933}
934
935fn projection_name(expression: &Expression) -> Option<String> {
936 match expression {
937 Expression::Alias(alias) => Some(alias.alias.name.clone()),
938 Expression::Column(column) => Some(column.name.name.clone()),
939 Expression::Identifier(identifier) => Some(identifier.name.clone()),
940 Expression::Star(_) => Some("*".to_string()),
941 Expression::Annotated(annotated) => projection_name(&annotated.this),
942 _ => None,
943 }
944}
945
946fn projection_is_star(expression: &Expression) -> bool {
947 matches!(expression, Expression::Star(_))
948 || matches!(expression, Expression::Column(column) if column.name.name == "*")
949}
950
951fn projection_star_table(expression: &Expression) -> Option<String> {
952 match expression {
953 Expression::Star(star) => star
954 .table
955 .as_ref()
956 .map(|identifier| identifier.name.clone()),
957 Expression::Column(column) if column.name.name == "*" => column
958 .table
959 .as_ref()
960 .map(|identifier| identifier.name.clone()),
961 _ => None,
962 }
963}
964
965fn transform_kind(expression: &Expression) -> TransformKind {
966 if projection_is_star(expression) {
967 TransformKind::Star
968 } else if is_cast_expression(expression) {
969 TransformKind::Cast
970 } else if contains_aggregate(expression) {
971 TransformKind::Aggregation
972 } else if matches!(
973 expression,
974 Expression::Column(_) | Expression::Identifier(_)
975 ) {
976 TransformKind::Direct
977 } else if is_simple_constant(expression) {
978 TransformKind::Constant
979 } else {
980 TransformKind::Expression
981 }
982}
983
984fn is_cast_expression(expression: &Expression) -> bool {
985 matches!(
986 expression,
987 Expression::Cast(_) | Expression::TryCast(_) | Expression::SafeCast(_)
988 )
989}
990
991fn cast_type(expression: &Expression, dialect: DialectType) -> Option<String> {
992 match expression {
993 Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
994 render_data_type(&cast.to, dialect)
995 }
996 _ => None,
997 }
998}
999
1000fn render_data_type(data_type: &DataType, dialect: DialectType) -> Option<String> {
1001 Dialect::get(dialect)
1002 .generate(&Expression::DataType(data_type.clone()))
1003 .ok()
1004}
1005
1006fn is_simple_constant(expression: &Expression) -> bool {
1007 match expression {
1008 Expression::Literal(_) | Expression::Boolean(_) | Expression::Null(_) => true,
1009 Expression::Cast(cast) | Expression::TryCast(cast) | Expression::SafeCast(cast) => {
1010 is_simple_constant(&cast.this)
1011 }
1012 Expression::Neg(unary) | Expression::BitwiseNot(unary) => is_simple_constant(&unary.this),
1013 _ => false,
1014 }
1015}
1016
1017fn projection_nullability(
1018 expression: &Expression,
1019 scope: &Scope,
1020 context: &NullabilityContext<'_>,
1021) -> ProjectionNullability {
1022 match expression {
1023 Expression::Alias(alias) => projection_nullability(&alias.this, scope, context),
1024 Expression::Annotated(annotated) => projection_nullability(&annotated.this, scope, context),
1025 Expression::Paren(paren) => projection_nullability(&paren.this, scope, context),
1026 Expression::Literal(_) | Expression::Boolean(_) => ProjectionNullability::NonNull,
1027 Expression::Null(_) => ProjectionNullability::Nullable,
1028 Expression::Count(_) | Expression::CountIf(_) => ProjectionNullability::NonNull,
1029 Expression::Cast(cast) => projection_nullability(&cast.this, scope, context),
1030 Expression::TryCast(_) | Expression::SafeCast(_) => ProjectionNullability::Unknown,
1031 Expression::Column(column) => column_nullability(
1032 &column.name.name,
1033 column.table.as_ref().map(|table| table.name.as_str()),
1034 scope,
1035 context,
1036 ),
1037 Expression::Identifier(identifier) => {
1038 column_nullability(&identifier.name, None, scope, context)
1039 }
1040 Expression::Coalesce(func) => coalesce_nullability(&func.expressions, scope, context),
1041 _ => ProjectionNullability::Unknown,
1042 }
1043}
1044
1045fn column_nullability(
1046 column_name: &str,
1047 source_name: Option<&str>,
1048 scope: &Scope,
1049 context: &NullabilityContext<'_>,
1050) -> ProjectionNullability {
1051 let resolved_source_name = source_name
1052 .map(str::to_string)
1053 .or_else(|| single_scope_source_name(scope));
1054
1055 if let Some(source_name) = &resolved_source_name {
1056 if context
1057 .nullable_sources
1058 .contains(&normalize_lookup_name(source_name))
1059 {
1060 return ProjectionNullability::Nullable;
1061 }
1062 }
1063
1064 let Some(schema) = context.schema else {
1065 return ProjectionNullability::Unknown;
1066 };
1067
1068 let table_name = resolved_source_name
1069 .as_ref()
1070 .and_then(|name| scope.sources.get(name).and_then(source_table_name))
1071 .or(resolved_source_name);
1072
1073 let Some(table_name) = table_name else {
1074 return ProjectionNullability::Unknown;
1075 };
1076
1077 match schema.column(&table_name, column_name) {
1078 Some(info) if info.primary_key || info.nullable == Some(false) => {
1079 ProjectionNullability::NonNull
1080 }
1081 Some(info) if info.nullable == Some(true) => ProjectionNullability::Nullable,
1082 Some(_) | None => ProjectionNullability::Unknown,
1083 }
1084}
1085
1086fn single_scope_source_name(scope: &Scope) -> Option<String> {
1087 if scope.sources.len() == 1 {
1088 scope.sources.keys().next().cloned()
1089 } else {
1090 None
1091 }
1092}
1093
1094fn coalesce_nullability(
1095 expressions: &[Expression],
1096 scope: &Scope,
1097 context: &NullabilityContext<'_>,
1098) -> ProjectionNullability {
1099 if expressions.is_empty() {
1100 return ProjectionNullability::Unknown;
1101 }
1102
1103 let mut all_nullable = true;
1104
1105 for expression in expressions {
1106 match projection_nullability(unwrap_projection_alias(expression), scope, context) {
1107 ProjectionNullability::NonNull => return ProjectionNullability::NonNull,
1108 ProjectionNullability::Nullable => {}
1109 ProjectionNullability::Unknown => all_nullable = false,
1110 }
1111 }
1112
1113 if all_nullable {
1114 ProjectionNullability::Nullable
1115 } else {
1116 ProjectionNullability::Unknown
1117 }
1118}
1119
1120fn terminal_references_from_lineage(node: &LineageNode) -> Vec<ColumnReferenceFact> {
1121 let mut refs = Vec::new();
1122 collect_terminal_references(node, &mut refs);
1123 dedupe_column_refs(refs)
1124}
1125
1126fn collect_terminal_references(node: &LineageNode, refs: &mut Vec<ColumnReferenceFact>) {
1127 if node.downstream.is_empty() {
1128 if let Some(reference) = column_reference_from_lineage_node(node) {
1129 refs.push(reference);
1130 }
1131 return;
1132 }
1133
1134 for child in &node.downstream {
1135 collect_terminal_references(child, refs);
1136 }
1137}
1138
1139fn column_reference_from_lineage_node(node: &LineageNode) -> Option<ColumnReferenceFact> {
1140 match &node.expression {
1141 Expression::Column(column) => {
1142 let source_name = non_empty_string(node.source_name.clone());
1143 let table =
1144 lineage_node_table(node).or_else(|| column.table.as_ref().map(|t| t.name.clone()));
1145 let confidence = if node.source_kind == SourceKind::Unknown && source_name.is_none() {
1146 ReferenceConfidence::Unknown
1147 } else {
1148 ReferenceConfidence::Resolved
1149 };
1150 Some(ColumnReferenceFact {
1151 source_name,
1152 source_alias: node.source_alias.clone(),
1153 source_kind: node.source_kind,
1154 table,
1155 column: column.name.name.clone(),
1156 unqualified: column.table.is_none(),
1157 confidence,
1158 })
1159 }
1160 Expression::Star(_) => Some(ColumnReferenceFact {
1161 source_name: non_empty_string(node.source_name.clone()),
1162 source_alias: node.source_alias.clone(),
1163 source_kind: node.source_kind,
1164 table: lineage_node_table(node),
1165 column: "*".to_string(),
1166 unqualified: true,
1167 confidence: if node.source_kind == SourceKind::Unknown {
1168 ReferenceConfidence::Unknown
1169 } else {
1170 ReferenceConfidence::Resolved
1171 },
1172 }),
1173 _ => None,
1174 }
1175}
1176
1177fn lineage_node_table(node: &LineageNode) -> Option<String> {
1178 match &node.source {
1179 Expression::Table(table) => Some(table_name(table)),
1180 _ => None,
1181 }
1182}
1183
1184fn fallback_column_references(expression: &Expression, scope: &Scope) -> Vec<ColumnReferenceFact> {
1185 let mut refs = Vec::new();
1186 let source_count = scope.sources.len();
1187 let single_source = if source_count == 1 {
1188 scope.sources.iter().next()
1189 } else {
1190 None
1191 };
1192
1193 for column_expr in expression.find_all(|candidate| matches!(candidate, Expression::Column(_))) {
1194 if let Expression::Column(column) = column_expr {
1195 if column.name.name == "*" {
1196 continue;
1197 }
1198 let source = column
1199 .table
1200 .as_ref()
1201 .and_then(|table| scope.sources.get(&table.name));
1202 let (source_name, source_alias, source_kind, table, confidence) =
1203 if let Some(table_identifier) = &column.table {
1204 if let Some(source) = source {
1205 (
1206 Some(table_identifier.name.clone()),
1207 source.alias.clone(),
1208 source.kind,
1209 source_table_name(source)
1210 .or_else(|| Some(table_identifier.name.clone())),
1211 ReferenceConfidence::Resolved,
1212 )
1213 } else {
1214 (
1215 Some(table_identifier.name.clone()),
1216 None,
1217 SourceKind::Unknown,
1218 Some(table_identifier.name.clone()),
1219 ReferenceConfidence::Unknown,
1220 )
1221 }
1222 } else if let Some((name, source)) = single_source {
1223 (
1224 Some(name.clone()),
1225 source.alias.clone(),
1226 source.kind,
1227 source_table_name(source).or_else(|| Some(name.clone())),
1228 ReferenceConfidence::Resolved,
1229 )
1230 } else if source_count > 1 {
1231 (
1232 None,
1233 None,
1234 SourceKind::Unknown,
1235 None,
1236 ReferenceConfidence::Ambiguous,
1237 )
1238 } else {
1239 (
1240 None,
1241 None,
1242 SourceKind::Unknown,
1243 None,
1244 ReferenceConfidence::Unknown,
1245 )
1246 };
1247
1248 refs.push(ColumnReferenceFact {
1249 source_name,
1250 source_alias,
1251 source_kind,
1252 table,
1253 column: column.name.name.clone(),
1254 unqualified: column.table.is_none(),
1255 confidence,
1256 });
1257 }
1258 }
1259
1260 dedupe_column_refs(refs)
1261}
1262
1263fn dedupe_column_refs(refs: Vec<ColumnReferenceFact>) -> Vec<ColumnReferenceFact> {
1264 let mut seen = HashSet::new();
1265 let mut deduped = Vec::new();
1266
1267 for reference in refs {
1268 let key = (
1269 reference.source_name.clone(),
1270 reference.source_alias.clone(),
1271 reference.table.clone(),
1272 reference.column.clone(),
1273 format!("{:?}", reference.source_kind),
1274 reference.unqualified,
1275 format!("{:?}", reference.confidence),
1276 );
1277 if seen.insert(key) {
1278 deduped.push(reference);
1279 }
1280 }
1281
1282 deduped
1283}
1284
1285fn relation_facts(
1286 scope: &Scope,
1287 mapping_schema: Option<&crate::schema::MappingSchema>,
1288) -> Vec<RelationFact> {
1289 let mut relations = Vec::new();
1290 let mut seen = HashSet::new();
1291 collect_relation_facts(scope, mapping_schema, &mut seen, &mut relations);
1292
1293 relations.sort_by(|left, right| {
1294 left.name
1295 .cmp(&right.name)
1296 .then_with(|| left.alias.cmp(&right.alias))
1297 });
1298 relations
1299}
1300
1301fn collect_relation_facts(
1302 scope: &Scope,
1303 mapping_schema: Option<&crate::schema::MappingSchema>,
1304 seen: &mut HashSet<String>,
1305 relations: &mut Vec<RelationFact>,
1306) {
1307 for relation in scope.sources.iter().map(|(source_name, source)| {
1308 let identity = source_table_identity(source);
1309 RelationFact {
1310 name: source
1311 .lineage_name
1312 .clone()
1313 .or_else(|| identity.as_ref().map(|identity| identity.name.clone()))
1314 .unwrap_or_else(|| source_name.clone()),
1315 alias: source.alias.clone().or_else(|| source_alias(source)),
1316 kind: source.kind,
1317 columns: source_columns(source, mapping_schema),
1318 catalog: identity
1319 .as_ref()
1320 .and_then(|identity| identity.catalog.clone()),
1321 schema: identity
1322 .as_ref()
1323 .and_then(|identity| identity.schema.clone()),
1324 table: identity
1325 .as_ref()
1326 .and_then(|identity| identity.table.clone()),
1327 }
1328 }) {
1329 let key = format!("{:?}|{}|{:?}", relation.kind, relation.name, relation.alias);
1330 if seen.insert(key) {
1331 relations.push(relation);
1332 }
1333 }
1334
1335 for branch_scope in &scope.union_scopes {
1336 collect_relation_facts(branch_scope, mapping_schema, seen, relations);
1337 }
1338}
1339
1340fn base_table_facts(
1341 scope: &Scope,
1342 mapping_schema: Option<&crate::schema::MappingSchema>,
1343) -> Vec<RelationFact> {
1344 let mut relations = Vec::new();
1345 let mut seen = HashSet::new();
1346
1347 collect_base_table_facts(scope, mapping_schema, &mut seen, &mut relations);
1348
1349 relations.sort_by(|left, right| left.name.cmp(&right.name));
1350 relations
1351}
1352
1353fn collect_base_table_facts(
1354 scope: &Scope,
1355 mapping_schema: Option<&crate::schema::MappingSchema>,
1356 seen: &mut HashSet<String>,
1357 relations: &mut Vec<RelationFact>,
1358) {
1359 for source in scope.sources.values() {
1360 if source.kind != SourceKind::Table {
1361 continue;
1362 }
1363
1364 let Some(identity) = source_table_identity(source) else {
1365 continue;
1366 };
1367
1368 if seen.insert(identity.name.clone()) {
1369 relations.push(RelationFact {
1370 name: identity.name,
1371 alias: source.alias.clone().or_else(|| source_alias(source)),
1372 kind: SourceKind::Table,
1373 columns: source_columns(source, mapping_schema),
1374 catalog: identity.catalog,
1375 schema: identity.schema,
1376 table: identity.table,
1377 });
1378 }
1379 }
1380
1381 for child_scope in scope
1382 .cte_scopes
1383 .iter()
1384 .chain(scope.union_scopes.iter())
1385 .chain(scope.table_scopes.iter())
1386 .chain(scope.derived_table_scopes.iter())
1387 .chain(scope.subquery_scopes.iter())
1388 {
1389 collect_base_table_facts(child_scope, mapping_schema, seen, relations);
1390 }
1391}
1392
1393fn source_columns(
1394 source: &SourceInfo,
1395 mapping_schema: Option<&crate::schema::MappingSchema>,
1396) -> Vec<String> {
1397 match &source.expression {
1398 Expression::Table(table) => mapping_schema
1399 .and_then(|schema| schema.column_names(&table_name(table)).ok())
1400 .unwrap_or_default(),
1401 Expression::Select(_)
1402 | Expression::Union(_)
1403 | Expression::Intersect(_)
1404 | Expression::Except(_) => get_output_column_names(&source.expression),
1405 Expression::Subquery(subquery) => get_output_column_names(&subquery.this),
1406 Expression::Cte(cte) if !cte.columns.is_empty() => cte
1407 .columns
1408 .iter()
1409 .map(|column| column.name.clone())
1410 .collect(),
1411 Expression::Cte(cte) => get_output_column_names(&cte.this),
1412 _ => Vec::new(),
1413 }
1414}
1415
1416fn source_table_name(source: &SourceInfo) -> Option<String> {
1417 source_table_identity(source).map(|identity| identity.name)
1418}
1419
1420fn source_alias(source: &SourceInfo) -> Option<String> {
1421 match &source.expression {
1422 Expression::Table(table) => table.alias.as_ref().map(|alias| alias.name.clone()),
1423 Expression::Subquery(subquery) => subquery.alias.as_ref().map(|alias| alias.name.clone()),
1424 _ => None,
1425 }
1426}
1427
1428fn table_name(table: &TableRef) -> String {
1429 let mut parts = Vec::new();
1430 if let Some(catalog) = &table.catalog {
1431 parts.push(catalog.name.clone());
1432 }
1433 if let Some(schema) = &table.schema {
1434 parts.push(schema.name.clone());
1435 }
1436 parts.push(table.name.name.clone());
1437 parts.join(".")
1438}
1439
/// The name parts of a table reference, kept both joined and separately.
#[derive(Debug, Clone)]
struct RelationIdentity {
    /// Dotted name made of whichever of catalog, schema and table are present.
    name: String,
    /// Catalog segment, when the reference carries one.
    catalog: Option<String>,
    /// Schema segment, when the reference carries one.
    schema: Option<String>,
    /// Bare table name.
    table: Option<String>,
}
1447
1448fn source_table_identity(source: &SourceInfo) -> Option<RelationIdentity> {
1449 match &source.expression {
1450 Expression::Table(table) => Some(table_identity(table)),
1451 _ => None,
1452 }
1453}
1454
1455fn table_identity(table: &TableRef) -> RelationIdentity {
1456 RelationIdentity {
1457 name: table_name(table),
1458 catalog: table.catalog.as_ref().map(|catalog| catalog.name.clone()),
1459 schema: table.schema.as_ref().map(|schema| schema.name.clone()),
1460 table: Some(table.name.name.clone()),
1461 }
1462}
1463
1464fn set_operation_facts(
1465 expression: &Expression,
1466 scope: &Scope,
1467 dialect: DialectType,
1468) -> Vec<SetOperationFact> {
1469 let mut facts = Vec::new();
1470 collect_set_operation_facts(expression, scope, dialect, &mut facts);
1471 facts
1472}
1473
1474fn collect_set_operation_facts(
1475 expression: &Expression,
1476 scope: &Scope,
1477 dialect: DialectType,
1478 facts: &mut Vec<SetOperationFact>,
1479) {
1480 match expression {
1481 Expression::Union(union) => {
1482 facts.push(SetOperationFact {
1483 kind: "union".to_string(),
1484 all: union.all,
1485 distinct: union.distinct,
1486 output_columns: get_output_column_names(expression),
1487 branches: set_operation_branches(&union.left, &union.right, scope, dialect),
1488 });
1489 collect_set_operation_facts(&union.left, scope, dialect, facts);
1490 collect_set_operation_facts(&union.right, scope, dialect, facts);
1491 }
1492 Expression::Intersect(intersect) => {
1493 facts.push(SetOperationFact {
1494 kind: "intersect".to_string(),
1495 all: intersect.all,
1496 distinct: intersect.distinct,
1497 output_columns: get_output_column_names(expression),
1498 branches: set_operation_branches(&intersect.left, &intersect.right, scope, dialect),
1499 });
1500 collect_set_operation_facts(&intersect.left, scope, dialect, facts);
1501 collect_set_operation_facts(&intersect.right, scope, dialect, facts);
1502 }
1503 Expression::Except(except) => {
1504 facts.push(SetOperationFact {
1505 kind: "except".to_string(),
1506 all: except.all,
1507 distinct: except.distinct,
1508 output_columns: get_output_column_names(expression),
1509 branches: set_operation_branches(&except.left, &except.right, scope, dialect),
1510 });
1511 collect_set_operation_facts(&except.left, scope, dialect, facts);
1512 collect_set_operation_facts(&except.right, scope, dialect, facts);
1513 }
1514 Expression::Subquery(subquery) => {
1515 collect_set_operation_facts(&subquery.this, scope, dialect, facts);
1516 }
1517 _ => {}
1518 }
1519}
1520
1521fn set_operation_branches(
1522 left: &Expression,
1523 right: &Expression,
1524 scope: &Scope,
1525 dialect: DialectType,
1526) -> Vec<SetOperationBranchFact> {
1527 vec![
1528 SetOperationBranchFact {
1529 index: 0,
1530 projections: projection_facts_for_branch(left, scope, dialect),
1531 },
1532 SetOperationBranchFact {
1533 index: 1,
1534 projections: projection_facts_for_branch(right, scope, dialect),
1535 },
1536 ]
1537}
1538
1539fn projection_facts_for_branch(
1540 expression: &Expression,
1541 root_scope: &Scope,
1542 dialect: DialectType,
1543) -> Vec<ProjectionFact> {
1544 let branch_scope = build_scope(expression);
1545 let scope = if branch_scope.sources.is_empty() {
1546 root_scope
1547 } else {
1548 &branch_scope
1549 };
1550 let nullability_context = NullabilityContext {
1551 schema: None,
1552 nullable_sources: nullable_source_names(expression),
1553 };
1554 projection_facts_for_query(expression, scope, dialect, &nullability_context)
1555}
1556
/// Maps an empty string to `None` and passes any other string through as
/// `Some`. Whitespace is not trimmed, so `" "` counts as non-empty.
fn non_empty_string(value: String) -> Option<String> {
    Some(value).filter(|text| !text.is_empty())
}