1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, JoinKind, NamedWindow, Select};
10use crate::generator::Generator;
11use crate::optimizer::annotate_types::annotate_types;
12use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
13use crate::schema::{normalize_name, Schema};
14use crate::scope::{
15 build_scope, find_all_in_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind,
16};
17use crate::{Error, Result};
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, HashSet};
20
/// One node in a column-lineage tree.
///
/// A node records the expression that produces a column, the query it was
/// found in, and the nodes it depends on (`downstream`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Name of the column this node describes. Star dependencies are named
    /// `"<source>.*"` and positional set-operation columns `"_<index>"`.
    pub name: String,
    /// The expression that produces the column (for set operations, the
    /// whole set-operation expression).
    pub expression: Expression,
    /// The query or source expression the column was found in.
    pub source: Expression,
    /// Nodes this column depends on.
    pub downstream: Vec<LineageNode>,
    /// Name of the source this node was reached through; empty by default.
    pub source_name: String,
    /// Classification of the source; `SourceKind::Unknown` by default.
    pub source_kind: SourceKind,
    /// Optional alias of the source. Omitted from serialized output when
    /// `None` and defaulted when missing on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_alias: Option<String>,
    /// Name of the node that referenced this one; empty by default.
    pub reference_node_name: String,
}
42
43impl LineageNode {
44 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
46 Self {
47 name: name.into(),
48 expression,
49 source,
50 downstream: Vec::new(),
51 source_name: String::new(),
52 source_kind: SourceKind::Unknown,
53 source_alias: None,
54 reference_node_name: String::new(),
55 }
56 }
57
58 pub fn walk(&self) -> LineageWalker<'_> {
60 LineageWalker { stack: vec![self] }
61 }
62
63 pub fn downstream_names(&self) -> Vec<String> {
65 self.downstream.iter().map(|n| n.name.clone()).collect()
66 }
67}
68
69fn source_kind_for_scope_context(
70 scope: &Scope,
71 source_name: &str,
72 reference_node_name: &str,
73) -> SourceKind {
74 if source_name.is_empty() && reference_node_name.is_empty() {
75 return SourceKind::Root;
76 }
77 if let Some(source_info) = scope.sources.get(source_name) {
78 return source_info.kind;
79 }
80 if scope.cte_sources.contains_key(source_name) {
81 return SourceKind::Cte;
82 }
83 match scope.scope_type {
84 ScopeType::Cte => SourceKind::Cte,
85 ScopeType::DerivedTable => SourceKind::DerivedTable,
86 ScopeType::Udtf => SourceKind::Virtual,
87 _ => SourceKind::Unknown,
88 }
89}
90
91fn apply_scope_context(
92 node: &mut LineageNode,
93 scope: &Scope,
94 source_name: &str,
95 reference_node_name: &str,
96) {
97 node.source_name = source_name.to_string();
98 node.reference_node_name = reference_node_name.to_string();
99 node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
100}
101
/// Iterator over a [`LineageNode`] tree, created by [`LineageNode::walk`].
pub struct LineageWalker<'a> {
    /// Nodes still to be yielded; the next node is popped from the end.
    stack: Vec<&'a LineageNode>,
}
106
107impl<'a> Iterator for LineageWalker<'a> {
108 type Item = &'a LineageNode;
109
110 fn next(&mut self) -> Option<Self::Item> {
111 if let Some(node) = self.stack.pop() {
112 for child in node.downstream.iter().rev() {
114 self.stack.push(child);
115 }
116 Some(node)
117 } else {
118 None
119 }
120 }
121}
122
/// Identifies the output column whose lineage is being traced.
enum ColumnRef<'a> {
    /// Select the column by its name.
    Name(&'a str),
    /// Select the column by its position (used for set-operation branches and
    /// for subqueries, which are traced at index 0).
    Index(usize),
}
132
133pub fn lineage(
159 column: &str,
160 sql: &Expression,
161 dialect: Option<DialectType>,
162 trim_selects: bool,
163) -> Result<LineageNode> {
164 let mut owned = lineage_normalized_expression(sql);
165 if has_lineage_with_clause(&owned) {
167 expand_cte_stars(&mut owned, None);
168 }
169 lineage_from_expression(column, &owned, dialect, trim_selects)
170}
171
172pub fn lineage_with_schema(
188 column: &str,
189 sql: &Expression,
190 schema: Option<&dyn Schema>,
191 dialect: Option<DialectType>,
192 trim_selects: bool,
193) -> Result<LineageNode> {
194 let normalized_expression = lineage_normalized_expression(sql);
195 let mut qualified_expression = if let Some(schema) = schema {
196 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197 QualifyColumnsOptions::new().with_dialect(dialect_type)
198 } else {
199 QualifyColumnsOptions::new()
200 };
201
202 qualify_columns(normalized_expression.clone(), schema, &options).map_err(|e| {
203 Error::internal(format!("Lineage qualification failed with schema: {}", e))
204 })?
205 } else {
206 normalized_expression
207 };
208
209 annotate_types(&mut qualified_expression, schema, dialect);
211
212 expand_cte_stars(&mut qualified_expression, schema);
215
216 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220 column: &str,
221 sql: &Expression,
222 dialect: Option<DialectType>,
223 trim_selects: bool,
224) -> Result<LineageNode> {
225 let scope = build_scope(sql);
226 to_node(
227 ColumnRef::Name(column),
228 &scope,
229 dialect,
230 "",
231 "",
232 "",
233 trim_selects,
234 )
235}
236
237pub(crate) fn lineage_by_index_from_expression(
238 column_index: usize,
239 sql: &Expression,
240 dialect: Option<DialectType>,
241 trim_selects: bool,
242) -> Result<LineageNode> {
243 let normalized = lineage_normalized_expression(sql);
244 let scope = build_scope(&normalized);
245 to_node(
246 ColumnRef::Index(column_index),
247 &scope,
248 dialect,
249 "",
250 "",
251 "",
252 trim_selects,
253 )
254}
255
256fn lineage_normalized_expression(sql: &Expression) -> Expression {
257 match sql {
258 Expression::Prepare(prepare) => lineage_normalized_expression(&prepare.statement),
259 Expression::CreateTable(create) => create
260 .as_select
261 .as_ref()
262 .map(|query| attach_with_to_query(query.clone(), create.with_cte.clone()))
263 .unwrap_or_else(|| sql.clone()),
264 Expression::CreateView(create) => lineage_normalized_expression(&create.query),
265 Expression::Insert(insert) => insert
266 .query
267 .as_ref()
268 .map(|query| attach_with_to_query(query.clone(), insert.with.clone()))
269 .unwrap_or_else(|| sql.clone()),
270 _ => sql.clone(),
271 }
272}
273
274fn attach_with_to_query(
275 mut query: Expression,
276 with: Option<crate::expressions::With>,
277) -> Expression {
278 if let Some(with) = with {
279 attach_with_to_query_mut(&mut query, with);
280 }
281 query
282}
283
284fn attach_with_to_query_mut(query: &mut Expression, with: crate::expressions::With) {
285 match query {
286 Expression::Select(select) => {
287 if select.with.is_none() {
288 select.with = Some(with);
289 }
290 }
291 Expression::Union(union) => {
292 if union.with.is_none() {
293 union.with = Some(with);
294 }
295 }
296 Expression::Intersect(intersect) => {
297 if intersect.with.is_none() {
298 intersect.with = Some(with);
299 }
300 }
301 Expression::Except(except) => {
302 if except.with.is_none() {
303 except.with = Some(with);
304 }
305 }
306 Expression::Paren(paren) => attach_with_to_query_mut(&mut paren.this, with),
307 _ => {}
308 }
309}
310
311fn has_lineage_with_clause(expr: &Expression) -> bool {
312 match expr {
313 Expression::Select(select) => select.with.is_some(),
314 Expression::Union(union) => {
315 union.with.is_some()
316 || has_lineage_with_clause(&union.left)
317 || has_lineage_with_clause(&union.right)
318 }
319 Expression::Intersect(intersect) => {
320 intersect.with.is_some()
321 || has_lineage_with_clause(&intersect.left)
322 || has_lineage_with_clause(&intersect.right)
323 }
324 Expression::Except(except) => {
325 except.with.is_some()
326 || has_lineage_with_clause(&except.left)
327 || has_lineage_with_clause(&except.right)
328 }
329 Expression::Paren(paren) => has_lineage_with_clause(&paren.this),
330 _ => false,
331 }
332}
333
334fn normalize_cte_name(ident: &Identifier) -> String {
344 if ident.quoted {
345 ident.name.clone()
346 } else {
347 ident.name.to_lowercase()
348 }
349}
350
351pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
363 if let Expression::Prepare(prepare) = expr {
364 expand_cte_stars(&mut prepare.statement, schema);
365 return;
366 }
367
368 let select = match expr {
369 Expression::Select(s) => s,
370 _ => return,
371 };
372
373 let with = match &mut select.with {
374 Some(w) => w,
375 None => return,
376 };
377
378 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
379
380 for cte in &mut with.ctes {
381 let cte_name = normalize_cte_name(&cte.alias);
382
383 if !cte.columns.is_empty() {
385 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
386 resolved_cte_columns.insert(cte_name, cols);
387 continue;
388 }
389
390 if with.recursive {
396 let is_self_referencing =
397 if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
398 let body_sources = get_select_sources(body_select);
399 body_sources.iter().any(|s| s.normalized == cte_name)
400 } else {
401 false
402 };
403 if is_self_referencing {
404 continue;
405 }
406 }
407
408 let body_select = match get_leftmost_select_mut(&mut cte.this) {
410 Some(s) => s,
411 None => continue,
412 };
413
414 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
415 resolved_cte_columns.insert(cte_name, columns);
416 }
417
418 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
420}
421
422fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
427 let mut current = expr;
428 for _ in 0..MAX_LINEAGE_DEPTH {
429 match current {
430 Expression::Select(s) => return Some(s),
431 Expression::Union(u) => current = &mut u.left,
432 Expression::Intersect(i) => current = &mut i.left,
433 Expression::Except(e) => current = &mut e.left,
434 Expression::Paren(p) => current = &mut p.this,
435 _ => return None,
436 }
437 }
438 None
439}
440
441fn rewrite_stars_in_select(
445 select: &mut Select,
446 resolved_ctes: &HashMap<String, Vec<String>>,
447 schema: Option<&dyn Schema>,
448) -> Vec<String> {
449 let has_star = select
454 .expressions
455 .iter()
456 .any(|e| matches!(e, Expression::Star(_)));
457 let has_qualified_star = select
458 .expressions
459 .iter()
460 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
461
462 if !has_star && !has_qualified_star {
463 return select
465 .expressions
466 .iter()
467 .filter_map(get_expression_output_name)
468 .collect();
469 }
470
471 let sources = get_select_sources(select);
472 let mut new_expressions = Vec::new();
473 let mut result_columns = Vec::new();
474
475 for expr in &select.expressions {
476 match expr {
477 Expression::Star(star) => {
478 let qual = star.table.as_ref();
479 if let Some(expanded) =
480 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
481 {
482 for (src_alias, col_name) in &expanded {
483 let table_id = Identifier::new(src_alias);
484 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
485 result_columns.push(col_name.clone());
486 }
487 } else {
488 new_expressions.push(expr.clone());
489 result_columns.push("*".to_string());
490 }
491 }
492 Expression::Column(c) if c.name.name == "*" => {
493 let qual = c.table.as_ref();
494 if let Some(expanded) =
495 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
496 {
497 for (_src_alias, col_name) in &expanded {
498 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
500 result_columns.push(col_name.clone());
501 }
502 } else {
503 new_expressions.push(expr.clone());
504 result_columns.push("*".to_string());
505 }
506 }
507 _ => {
508 new_expressions.push(expr.clone());
509 if let Some(name) = get_expression_output_name(expr) {
510 result_columns.push(name);
511 }
512 }
513 }
514 }
515
516 select.expressions = new_expressions;
517 result_columns
518}
519
520fn expand_star_from_sources(
525 qualifier: Option<&Identifier>,
526 sources: &[SourceInfo],
527 resolved_ctes: &HashMap<String, Vec<String>>,
528 schema: Option<&dyn Schema>,
529) -> Option<Vec<(String, String)>> {
530 let mut expanded = Vec::new();
531
532 if let Some(qual) = qualifier {
533 let qual_normalized = normalize_cte_name(qual);
535 for src in sources {
536 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
537 if let Some(cols) = resolved_ctes.get(&src.normalized) {
539 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
540 return Some(expanded);
541 }
542 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
544 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
545 return Some(expanded);
546 }
547 }
548 }
549 None
550 } else {
551 let mut any_expanded = false;
557 for src in sources {
558 if let Some(cols) = resolved_ctes.get(&src.normalized) {
559 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
560 any_expanded = true;
561 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
562 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
563 any_expanded = true;
564 } else {
565 return None;
566 }
567 }
568 if any_expanded {
569 Some(expanded)
570 } else {
571 None
572 }
573 }
574}
575
576fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
578 let schema = schema?;
579 if fq_name.is_empty() {
580 return None;
581 }
582 schema
583 .column_names(fq_name)
584 .ok()
585 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
586}
587
588fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
590 Expression::Column(Box::new(crate::expressions::Column {
591 name: Identifier::new(name),
592 table: table.cloned(),
593 join_mark: false,
594 trailing_comments: Vec::new(),
595 span: None,
596 inferred_type: None,
597 }))
598}
599
600fn get_expression_output_name(expr: &Expression) -> Option<String> {
602 match expr {
603 Expression::Alias(a) => Some(a.alias.name.clone()),
604 Expression::Column(c) => Some(c.name.name.clone()),
605 Expression::Identifier(id) => Some(id.name.clone()),
606 Expression::Star(_) => Some("*".to_string()),
607 _ => None,
608 }
609}
610
/// A relation referenced by a `SELECT` (from its FROM clause, joins, or
/// lateral views), as collected by `get_select_sources`.
struct SourceInfo {
    /// Name the source is referred to by: its alias, or the table name when
    /// a table has no alias.
    alias: String,
    /// Whether the underlying identifier was quoted (always `false` for
    /// sources built from a plain string alias).
    quoted: bool,
    /// Key used to match the source against CTE names and star qualifiers.
    normalized: String,
    /// Name used for schema lookups: `catalog.schema.table` parts joined
    /// with `.` for tables, the alias for every other kind of source.
    fq_name: String,
}
626
627fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
630 let mut sources = Vec::new();
631
632 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
633 fn virtual_source_info(alias: &Identifier) -> SourceInfo {
634 SourceInfo {
635 alias: alias.name.clone(),
636 quoted: alias.quoted,
637 normalized: normalize_cte_name(alias),
638 fq_name: alias.name.clone(),
639 }
640 }
641
642 fn named_virtual_source_info(alias: &str) -> SourceInfo {
643 SourceInfo {
644 alias: alias.to_string(),
645 quoted: false,
646 normalized: alias.to_lowercase(),
647 fq_name: alias.to_string(),
648 }
649 }
650
651 match expr {
652 Expression::Table(t) => {
653 let normalized = normalize_cte_name(&t.name);
654 let alias = t
655 .alias
656 .as_ref()
657 .map(|a| a.name.clone())
658 .unwrap_or_else(|| t.name.name.clone());
659 let mut parts = Vec::new();
660 if let Some(catalog) = &t.catalog {
661 parts.push(catalog.name.clone());
662 }
663 if let Some(schema) = &t.schema {
664 parts.push(schema.name.clone());
665 }
666 parts.push(t.name.name.clone());
667 let fq_name = parts.join(".");
668 Some(SourceInfo {
669 alias,
670 quoted: t.name.quoted,
671 normalized,
672 fq_name,
673 })
674 }
675 Expression::Subquery(s) => {
676 let alias_identifier = s.alias.as_ref()?;
677 let alias = alias_identifier.name.clone();
678 let normalized = alias.to_lowercase();
679 let fq_name = alias.clone();
680 Some(SourceInfo {
681 alias,
682 quoted: alias_identifier.quoted,
683 normalized,
684 fq_name,
685 })
686 }
687 Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
688 Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
689 Some(virtual_source_info(&a.alias))
690 }
691 Expression::Alias(a) if is_query_like_relation(&a.this) => {
692 Some(virtual_source_info(&a.alias))
693 }
694 Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
695 Expression::LateralView(lateral_view) => lateral_view
696 .table_alias
697 .as_ref()
698 .or_else(|| lateral_view.column_aliases.first())
699 .map(virtual_source_info),
700 Expression::Pivot(pivot) => {
701 let alias = pivot_lineage_source_name(
702 &pivot.this,
703 pivot.alias.as_ref().map(|alias| alias.name.as_str()),
704 );
705 Some(SourceInfo {
706 alias: alias.clone(),
707 quoted: false,
708 normalized: alias.to_lowercase(),
709 fq_name: alias,
710 })
711 }
712 Expression::Unpivot(unpivot) => {
713 let alias = pivot_lineage_source_name(
714 &unpivot.this,
715 unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
716 );
717 Some(SourceInfo {
718 alias: alias.clone(),
719 quoted: false,
720 normalized: alias.to_lowercase(),
721 fq_name: alias,
722 })
723 }
724 Expression::Paren(p) => extract_source(&p.this),
725 _ => None,
726 }
727 }
728
729 if let Some(from) = &select.from {
730 for expr in &from.expressions {
731 if let Some(info) = extract_source(expr) {
732 sources.push(info);
733 }
734 }
735 }
736 for join in &select.joins {
737 if is_semi_or_anti_join_kind(join.kind) {
738 continue;
739 }
740 if let Some(info) = extract_source(&join.this) {
741 sources.push(info);
742 }
743 }
744 for lateral_view in &select.lateral_views {
745 if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
746 {
747 sources.push(info);
748 }
749 }
750 sources
751}
752
753fn pivot_lineage_source_name(source: &Expression, explicit_alias: Option<&str>) -> String {
754 if let Some(alias) = explicit_alias {
755 return alias.to_string();
756 }
757
758 match source {
759 Expression::Table(table) => table
760 .alias
761 .as_ref()
762 .map(|alias| alias.name.clone())
763 .unwrap_or_else(|| table.name.name.clone()),
764 Expression::Subquery(subquery) => subquery
765 .alias
766 .as_ref()
767 .map(|alias| alias.name.clone())
768 .unwrap_or_else(|| "_0".to_string()),
769 Expression::Paren(paren) => pivot_lineage_source_name(&paren.this, explicit_alias),
770 _ => "_0".to_string(),
771 }
772}
773
774pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
776 let mut tables = HashSet::new();
777 collect_source_tables(node, &mut tables);
778 tables
779}
780
781pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
783 if let Expression::Table(table) = &node.source {
784 tables.insert(table.name.name.clone());
785 }
786 for child in &node.downstream {
787 collect_source_tables(child, tables);
788 }
789}
790
/// Upper bound on lineage recursion depth; exceeding it is reported as an
/// error (a possible circular CTE reference). Also bounds the number of steps
/// taken when searching for the leftmost `SELECT` of a set operation.
const MAX_LINEAGE_DEPTH: usize = 64;
798
799fn to_node(
801 column: ColumnRef<'_>,
802 scope: &Scope,
803 dialect: Option<DialectType>,
804 scope_name: &str,
805 source_name: &str,
806 reference_node_name: &str,
807 trim_selects: bool,
808) -> Result<LineageNode> {
809 to_node_inner(
810 column,
811 scope,
812 dialect,
813 scope_name,
814 source_name,
815 reference_node_name,
816 trim_selects,
817 &[],
818 0,
819 )
820}
821
/// Recursive worker behind `to_node`: builds the lineage node for `column`
/// in `scope` and attaches a child node for every dependency it can trace.
///
/// * `column` - the output column to trace, by name or by position.
/// * `scope` - the scope whose expression produces the column.
/// * `scope_name` - used in the depth-exceeded error message and forwarded
///   to set-operation handling.
/// * `source_name`, `reference_node_name` - recorded on the resulting node
///   and used to derive its source kind.
/// * `trim_selects` - when true, the node's `source` is the result of
///   `trim_source`; otherwise it is a clone of the whole query.
/// * `ancestor_cte_scopes` - CTE scopes inherited from callers; they are
///   considered after the scope's own CTE scopes.
/// * `depth` - current recursion depth.
///
/// # Errors
///
/// Fails when `depth` exceeds `MAX_LINEAGE_DEPTH`, and propagates errors from
/// `find_select_expr` and from set-operation handling. Errors raised while
/// tracing scalar subqueries are ignored (that dependency is simply omitted).
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    // Guard against runaway recursion.
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // CTE scopes visible from here: this scope's own first, then inherited ones.
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }
    // Owned copies of those scopes, minus any whose expression equals this
    // scope's, for handing down to set-operation branches and subqueries.
    let descendant_cte_scopes = descendant_cte_scope_clones(&all_cte_scopes, scope);

    // Look through CTE / subquery / parenthesis wrappers.
    let effective_expr = effective_scope_expression(scope_expr);

    // Set operations are traced branch by branch.
    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        if matches!(scope_expr, Expression::Cte(_)) {
            // The scope expression is the CTE wrapper; build an equivalent
            // scope around the unwrapped set operation, carrying over the
            // child scopes and sources.
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                &descendant_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            &descendant_cte_scopes,
            depth,
        );
    }

    // Locate the projection that produces the requested column.
    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    apply_scope_context(&mut node, scope, source_name, reference_node_name);

    // A star projection depends on whole sources rather than on columns.
    if let Expression::Star(star) = &select_expr {
        let star_table = star
            .table
            .as_ref()
            .map(|identifier| identifier.name.as_str());
        for (name, source_info) in &scope.sources {
            // For `t.*`, keep only sources matching `t` (case-insensitively)
            // by registered name, alias, or underlying table name.
            if let Some(star_table) = star_table {
                let table_matches = name.eq_ignore_ascii_case(star_table)
                    || source_info
                        .alias
                        .as_deref()
                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
                    || matches!(
                        &source_info.expression,
                        Expression::Table(table_ref)
                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
                    );
                if !table_matches {
                    continue;
                }
            }

            // One `<source>.*` child per matching source.
            let mut child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: star.table.clone(),
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            apply_source_info_context(&mut child, name, source_info);
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // Subqueries embedded in the projection: trace the first output column of
    // the matching subquery scope.
    for query in query_expressions_in_scope(&select_expr) {
        for sq_scope in &scope.subquery_scopes {
            if sq_scope.expression == *query {
                if let Ok(child) = to_node_inner(
                    ColumnRef::Index(0),
                    sq_scope,
                    dialect,
                    &column_name,
                    "",
                    "",
                    trim_selects,
                    &descendant_cte_scopes,
                    depth + 1,
                ) {
                    node.downstream.push(child);
                }
                // Only the first scope with an equal expression is used.
                break;
            }
        }
    }

    // Column references inside the projection.
    let col_refs = find_column_refs_in_expr_with_select(&select_expr, effective_expr, dialect);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            // NOTE(review): presumably handles an unqualified name that
            // refers to an earlier select-list alias — confirm against
            // `find_prior_select_alias_expr`. In that case the columns of the
            // aliased expression are resolved instead of the name itself.
            if let Some(alias_expr) =
                find_prior_select_alias_expr(effective_expr, &select_expr, col_name, dialect)
            {
                for alias_ref in
                    find_column_refs_in_expr_with_select(&alias_expr, effective_expr, dialect)
                {
                    if let Some(ref table_id) = alias_ref.table {
                        resolve_qualified_column(
                            &mut node,
                            scope,
                            dialect,
                            &table_id.name,
                            &alias_ref.column,
                            &column_name,
                            trim_selects,
                            &all_cte_scopes,
                            depth,
                        );
                    } else {
                        resolve_unqualified_column(
                            &mut node,
                            scope,
                            dialect,
                            &alias_ref.column,
                            &column_name,
                            trim_selects,
                            &all_cte_scopes,
                            depth,
                        );
                    }
                }
                continue;
            }

            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
1035
1036fn descendant_cte_scope_clones(all_cte_scopes: &[&Scope], current_scope: &Scope) -> Vec<Scope> {
1037 all_cte_scopes
1038 .iter()
1039 .filter(|scope| scope.expression != current_scope.expression)
1040 .map(|scope| (*scope).clone())
1041 .collect()
1042}
1043
1044fn effective_scope_expression(expr: &Expression) -> &Expression {
1045 match expr {
1046 Expression::Cte(cte) => effective_scope_expression(&cte.this),
1047 Expression::Subquery(subquery) => effective_scope_expression(&subquery.this),
1048 Expression::Paren(paren) => effective_scope_expression(&paren.this),
1049 other => other,
1050 }
1051}
1052
1053fn query_expressions_in_scope(expr: &Expression) -> Vec<&Expression> {
1054 let mut queries = Vec::new();
1055 let mut seen = HashSet::new();
1056
1057 for node in find_all_in_scope(
1058 expr,
1059 |node| {
1060 matches!(
1061 node,
1062 Expression::Subquery(subquery) if subquery.alias.is_none()
1063 ) || matches!(
1064 node,
1065 Expression::Exists(_) | Expression::In(_) | Expression::Any(_) | Expression::All(_)
1066 )
1067 },
1068 false,
1069 ) {
1070 let query = match node {
1071 Expression::Subquery(subquery) if subquery.alias.is_none() => Some(&subquery.this),
1072 Expression::Exists(exists) => Some(&exists.this),
1073 Expression::In(in_expr) => in_expr.query.as_ref(),
1074 Expression::Any(quantified) | Expression::All(quantified) => Some(&quantified.subquery),
1075 _ => None,
1076 };
1077
1078 if let Some(query) = query {
1079 let key = query as *const Expression as usize;
1080 if seen.insert(key) {
1081 queries.push(query);
1082 }
1083 }
1084 }
1085
1086 queries
1087}
1088
1089fn handle_set_operation(
1094 column: &ColumnRef<'_>,
1095 scope: &Scope,
1096 dialect: Option<DialectType>,
1097 scope_name: &str,
1098 source_name: &str,
1099 reference_node_name: &str,
1100 trim_selects: bool,
1101 ancestor_cte_scopes: &[Scope],
1102 depth: usize,
1103) -> Result<LineageNode> {
1104 let scope_expr = &scope.expression;
1105
1106 let col_index = match column {
1108 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
1109 ColumnRef::Index(i) => *i,
1110 };
1111
1112 let col_name = match column {
1113 ColumnRef::Name(name) => name.to_string(),
1114 ColumnRef::Index(_) => format!("_{col_index}"),
1115 };
1116
1117 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
1118 apply_scope_context(&mut node, scope, source_name, reference_node_name);
1119
1120 for branch_scope in &scope.union_scopes {
1122 if let Ok(child) = to_node_inner(
1123 ColumnRef::Index(col_index),
1124 branch_scope,
1125 dialect,
1126 scope_name,
1127 "",
1128 "",
1129 trim_selects,
1130 ancestor_cte_scopes,
1131 depth + 1,
1132 ) {
1133 node.downstream.push(child);
1134 }
1135 }
1136
1137 Ok(node)
1138}
1139
/// Resolves the dependency `table.col_name` and appends the resulting child
/// node to `node.downstream`.
///
/// The strategies below are tried in order; the first that produces a child
/// ends the function:
/// 1. the source is a PIVOT / UNPIVOT and its dedicated handler succeeds;
/// 2. the table is a CTE: recurse into its scope, or, if that fails, emit a
///    leaf from the registered source when that source is of kind CTE;
/// 3. the source is backed by a child scope: recurse into it;
/// 4. the source is a plain (non-scope) source: emit a leaf, with extra
///    dependencies attached for virtual sources;
/// 5. otherwise emit a bare table-column leaf.
///
/// `parent_name` is passed to recursive calls as both the scope name and the
/// referencing node name.
fn resolve_qualified_column(
    node: &mut LineageNode,
    scope: &Scope,
    dialect: Option<DialectType>,
    table: &str,
    col_name: &str,
    parent_name: &str,
    trim_selects: bool,
    all_cte_scopes: &[&Scope],
    depth: usize,
) {
    // NOTE(review): presumably maps a FROM-clause alias to the CTE it refers
    // to — confirm against `resolve_cte_alias`. Falls back to `table` itself.
    let resolved_cte_name = resolve_cte_alias(scope, table);
    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);

    // 1. PIVOT / UNPIVOT sources get dedicated handling; if the handler
    //    reports failure, resolution continues below.
    if let Some(source_info) = scope
        .sources
        .get(table)
        .or_else(|| scope.sources.get(effective_table))
    {
        match &source_info.expression {
            Expression::Pivot(pivot) => {
                if attach_pivot_dependencies(
                    node,
                    scope,
                    dialect,
                    pivot,
                    col_name,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                ) {
                    return;
                }
            }
            Expression::Unpivot(unpivot) => {
                if attach_unpivot_dependencies(
                    node,
                    scope,
                    dialect,
                    unpivot,
                    col_name,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                ) {
                    return;
                }
            }
            _ => {}
        }
    }

    // 2. CTE: known either to this scope or among the visible CTE scopes
    //    (matched by exact alias name).
    let is_cte = scope.cte_sources.contains_key(effective_table)
        || all_cte_scopes.iter().any(
            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
        );
    if is_cte {
        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
            if let Ok(child) = to_node_inner(
                ColumnRef::Name(col_name),
                child_scope,
                dialect,
                parent_name,
                effective_table,
                parent_name,
                trim_selects,
                &ancestors,
                depth + 1,
            ) {
                node.downstream.push(child);
                return;
            }
        }

        // The CTE scope was missing or could not be traced: fall back to a
        // leaf built from the registered source, if it is a CTE source.
        if let Some(source_info) = scope
            .sources
            .get(table)
            .or_else(|| scope.sources.get(effective_table))
            .filter(|source_info| source_info.kind == SourceKind::Cte)
        {
            node.downstream.push(make_table_column_node_from_source(
                effective_table,
                col_name,
                source_info,
            ));
            return;
        }
    }

    // 3. Source backed by a child scope: trace the column inside that scope.
    if let Some(source_info) = scope.sources.get(table) {
        if source_info.is_scope {
            if let Some(child_scope) = find_child_scope(scope, table) {
                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
                if let Ok(child) = to_node_inner(
                    ColumnRef::Name(col_name),
                    child_scope,
                    dialect,
                    parent_name,
                    table,
                    parent_name,
                    trim_selects,
                    &ancestors,
                    depth + 1,
                ) {
                    node.downstream.push(child);
                    return;
                }
            }
        }
    }

    // 4. Plain source: emit a leaf node for `table.col_name`.
    if let Some(source_info) = scope.sources.get(table) {
        if !source_info.is_scope {
            let mut child = make_table_column_node_from_source(table, col_name, source_info);
            if source_info.kind == SourceKind::Virtual {
                // Virtual sources get their own dependencies attached to the leaf.
                attach_virtual_source_dependencies(
                    &mut child,
                    scope,
                    dialect,
                    table,
                    &source_info.expression,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                );
            }
            node.downstream.push(child);
            return;
        }
    }

    // 5. Nothing else applied: record a bare reference.
    node.downstream
        .push(make_table_column_node(table, col_name));
}
1288
1289fn attach_pivot_dependencies(
1290 node: &mut LineageNode,
1291 scope: &Scope,
1292 dialect: Option<DialectType>,
1293 pivot: &crate::expressions::Pivot,
1294 col_name: &str,
1295 trim_selects: bool,
1296 all_cte_scopes: &[&Scope],
1297 depth: usize,
1298) -> bool {
1299 if pivot.unpivot {
1300 return false;
1301 }
1302
1303 let mapping = pivot_lineage_column_mapping(pivot, scope, dialect);
1304 let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1305 if pivot_implicit_source_column(pivot, col_name) {
1306 let col_ref = SimpleColumnRef {
1307 table: None,
1308 column: col_name.to_string(),
1309 };
1310 attach_pivot_input_column(
1311 node,
1312 scope,
1313 dialect,
1314 &pivot.this,
1315 &col_ref,
1316 trim_selects,
1317 all_cte_scopes,
1318 depth,
1319 );
1320 return true;
1321 }
1322 return false;
1323 };
1324
1325 for col_ref in input_columns {
1326 attach_pivot_input_column(
1327 node,
1328 scope,
1329 dialect,
1330 &pivot.this,
1331 col_ref,
1332 trim_selects,
1333 all_cte_scopes,
1334 depth,
1335 );
1336 }
1337 true
1338}
1339
1340fn attach_unpivot_dependencies(
1341 node: &mut LineageNode,
1342 scope: &Scope,
1343 dialect: Option<DialectType>,
1344 unpivot: &crate::expressions::Unpivot,
1345 col_name: &str,
1346 trim_selects: bool,
1347 all_cte_scopes: &[&Scope],
1348 depth: usize,
1349) -> bool {
1350 let mapping = unpivot_column_mapping(unpivot, dialect);
1351 let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1352 return false;
1353 };
1354
1355 for col_ref in input_columns {
1356 attach_pivot_input_column(
1357 node,
1358 scope,
1359 dialect,
1360 &unpivot.this,
1361 col_ref,
1362 trim_selects,
1363 all_cte_scopes,
1364 depth,
1365 );
1366 }
1367 true
1368}
1369
1370fn pivot_column_mapping(
1371 pivot: &crate::expressions::Pivot,
1372 dialect: Option<DialectType>,
1373) -> HashMap<String, Vec<SimpleColumnRef>> {
1374 let aggregations = pivot_aggregation_expressions(pivot);
1375 let output_columns = pivot_generated_output_columns(pivot, dialect);
1376 if aggregations.is_empty() || output_columns.is_empty() {
1377 return HashMap::new();
1378 }
1379
1380 let mut mapping = HashMap::new();
1381 for (agg_index, agg) in aggregations.iter().enumerate() {
1382 let input_columns = find_column_refs_in_expr(agg, dialect);
1383 if input_columns.is_empty() {
1384 continue;
1385 }
1386 for col_index in (agg_index..output_columns.len()).step_by(aggregations.len()) {
1387 mapping.insert(
1388 normalize_column_name(&output_columns[col_index], dialect),
1389 input_columns.clone(),
1390 );
1391 }
1392 }
1393 mapping
1394}
1395
1396fn pivot_lineage_column_mapping(
1397 pivot: &crate::expressions::Pivot,
1398 scope: &Scope,
1399 dialect: Option<DialectType>,
1400) -> HashMap<String, Vec<SimpleColumnRef>> {
1401 let mut mapping = pivot_column_mapping(pivot, dialect);
1402 let Some(pre_pivot_columns) = pre_pivot_output_columns(&pivot.this, scope) else {
1403 return mapping;
1404 };
1405
1406 let output_columns = pivot_output_columns(pivot, &pre_pivot_columns, dialect);
1407 if output_columns.is_empty() {
1408 return mapping;
1409 }
1410
1411 let base_mapping = mapping.clone();
1412 for (post_name, pre_name) in output_columns {
1413 let normalized_pre = normalize_column_name(&pre_name, dialect);
1414 let normalized_post = normalize_column_name(&post_name, dialect);
1415
1416 if let Some(input_columns) = base_mapping.get(&normalized_pre) {
1417 mapping.insert(normalized_post, input_columns.clone());
1418 } else {
1419 mapping.insert(
1420 normalized_post,
1421 vec![SimpleColumnRef {
1422 table: None,
1423 column: pre_name,
1424 }],
1425 );
1426 }
1427 }
1428
1429 mapping
1430}
1431
1432fn pre_pivot_output_columns(source: &Expression, scope: &Scope) -> Option<Vec<String>> {
1433 match source {
1434 Expression::Subquery(subquery) => known_output_columns(&subquery.this),
1435 Expression::Table(table) if table.schema.is_none() && table.catalog.is_none() => scope
1436 .cte_sources
1437 .get(&table.name.name)
1438 .and_then(|source| known_output_columns(&source.expression)),
1439 Expression::Paren(paren) => pre_pivot_output_columns(&paren.this, scope),
1440 _ => None,
1441 }
1442}
1443
1444fn known_output_columns(expression: &Expression) -> Option<Vec<String>> {
1445 let expression = match expression {
1446 Expression::Cte(cte) => &cte.this,
1447 Expression::Subquery(subquery) => &subquery.this,
1448 other => other,
1449 };
1450 let columns = crate::ast_transforms::get_output_column_names(expression);
1451 if columns.is_empty() || columns.iter().any(|column| column == "*") {
1452 None
1453 } else {
1454 Some(columns)
1455 }
1456}
1457
1458fn pivot_output_columns(
1459 pivot: &crate::expressions::Pivot,
1460 pre_pivot_columns: &[String],
1461 dialect: Option<DialectType>,
1462) -> Vec<(String, String)> {
1463 let generated_outputs = pivot_generated_output_columns(pivot, dialect);
1464 let excluded = pivot_excluded_source_columns(pivot, dialect);
1465
1466 if excluded.is_empty() || generated_outputs.is_empty() {
1467 return Vec::new();
1468 }
1469
1470 let mut pre_rename: Vec<String> = pre_pivot_columns
1471 .iter()
1472 .filter(|column| !excluded.contains(&normalize_column_name(column, dialect)))
1473 .cloned()
1474 .collect();
1475 pre_rename.extend(generated_outputs);
1476
1477 let post_rename = if pivot.alias_columns.is_empty() {
1478 pre_rename.clone()
1479 } else {
1480 let mut names: Vec<String> = pivot
1481 .alias_columns
1482 .iter()
1483 .map(|column| column.name.clone())
1484 .collect();
1485 names.extend(pre_rename.iter().skip(names.len()).cloned());
1486 names
1487 };
1488
1489 post_rename.into_iter().zip(pre_rename).collect()
1490}
1491
1492fn pivot_excluded_source_columns(
1493 pivot: &crate::expressions::Pivot,
1494 dialect: Option<DialectType>,
1495) -> HashSet<String> {
1496 pivot
1497 .fields
1498 .iter()
1499 .chain(pivot.expressions.iter())
1500 .chain(pivot.using.iter())
1501 .flat_map(|expr| find_column_refs_in_expr(expr, dialect))
1502 .map(|column| normalize_column_name(&column.column, dialect))
1503 .collect()
1504}
1505
1506fn pivot_generated_output_columns(
1507 pivot: &crate::expressions::Pivot,
1508 _dialect: Option<DialectType>,
1509) -> Vec<String> {
1510 let fields = pivot_field_output_names(pivot);
1511 if fields.is_empty() {
1512 return Vec::new();
1513 }
1514
1515 let aggregations = pivot_aggregation_expressions(pivot);
1516 if aggregations.is_empty() {
1517 return Vec::new();
1518 }
1519
1520 let needs_suffix = aggregations.len() > 1;
1521 let mut outputs = Vec::new();
1522 for field in fields {
1523 for aggregation in aggregations {
1524 if let Some(suffix) = pivot_aggregation_output_suffix(aggregation, needs_suffix) {
1525 outputs.push(format!("{field}_{suffix}"));
1526 } else {
1527 outputs.push(field.clone());
1528 }
1529 }
1530 }
1531 outputs
1532}
1533
1534fn pivot_aggregation_expressions(pivot: &crate::expressions::Pivot) -> &[Expression] {
1535 if pivot.using.is_empty() {
1536 &pivot.expressions
1537 } else {
1538 &pivot.using
1539 }
1540}
1541
1542fn pivot_aggregation_output_suffix(expr: &Expression, needs_suffix: bool) -> Option<String> {
1543 match expr {
1544 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1545 _ if needs_suffix => Generator::sql(expr).ok().map(|sql| sql.to_lowercase()),
1546 _ => None,
1547 }
1548}
1549
1550fn pivot_field_output_names(pivot: &crate::expressions::Pivot) -> Vec<String> {
1551 let mut names = Vec::new();
1552 for field in &pivot.fields {
1553 if let Expression::In(in_expr) = field {
1554 for expr in &in_expr.expressions {
1555 if let Some(name) = pivot_expr_output_name(expr) {
1556 names.push(name);
1557 }
1558 }
1559 }
1560 }
1561 names
1562}
1563
1564fn pivot_expr_output_name(expr: &Expression) -> Option<String> {
1565 match expr {
1566 Expression::PivotAlias(alias) => pivot_expr_output_name(&alias.alias),
1567 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1568 Expression::Identifier(identifier) => Some(identifier.name.clone()),
1569 Expression::Column(column) => Some(column.name.name.clone()),
1570 Expression::Literal(literal) => Some(literal.value_str().to_string()),
1571 Expression::Var(var) => Some(var.this.clone()),
1572 Expression::Tuple(tuple) => tuple.expressions.first().and_then(pivot_expr_output_name),
1573 _ => None,
1574 }
1575}
1576
1577fn pivot_implicit_source_column(pivot: &crate::expressions::Pivot, col_name: &str) -> bool {
1578 let pivot_columns: HashSet<String> = pivot
1579 .fields
1580 .iter()
1581 .filter_map(|field| match field {
1582 Expression::In(in_expr) => Some(&in_expr.this),
1583 _ => None,
1584 })
1585 .flat_map(|expr| find_column_refs_in_expr(expr, None))
1586 .map(|col| col.column.to_lowercase())
1587 .collect();
1588 let aggregation_columns: HashSet<String> = pivot
1589 .expressions
1590 .iter()
1591 .flat_map(|expr| find_column_refs_in_expr(expr, None))
1592 .map(|col| col.column.to_lowercase())
1593 .collect();
1594
1595 let normalized = col_name.to_lowercase();
1596 !pivot_columns.contains(&normalized) && !aggregation_columns.contains(&normalized)
1597}
1598
1599fn unpivot_column_mapping(
1600 unpivot: &crate::expressions::Unpivot,
1601 dialect: Option<DialectType>,
1602) -> HashMap<String, Vec<SimpleColumnRef>> {
1603 let value_columns: Vec<String> = std::iter::once(unpivot.value_column.name.clone())
1604 .chain(
1605 unpivot
1606 .extra_value_columns
1607 .iter()
1608 .map(|column| column.name.clone()),
1609 )
1610 .collect();
1611 let mut all_input_columns = Vec::new();
1612 let mut value_input_columns: Vec<Vec<SimpleColumnRef>> = vec![Vec::new(); value_columns.len()];
1613
1614 for entry in &unpivot.columns {
1615 let columns = unpivot_entry_columns(entry);
1616 all_input_columns.extend(columns.clone());
1617 if columns.len() == value_columns.len() {
1618 for (idx, col_ref) in columns.into_iter().enumerate() {
1619 value_input_columns[idx].push(col_ref);
1620 }
1621 } else {
1622 for inputs in &mut value_input_columns {
1623 inputs.extend(columns.clone());
1624 }
1625 }
1626 }
1627
1628 let mut mapping = HashMap::new();
1629 mapping.insert(
1630 normalize_column_name(&unpivot.name_column.name, dialect),
1631 all_input_columns.clone(),
1632 );
1633 for (idx, value_column) in value_columns.iter().enumerate() {
1634 mapping.insert(
1635 normalize_column_name(value_column, dialect),
1636 value_input_columns.get(idx).cloned().unwrap_or_default(),
1637 );
1638 }
1639 mapping
1640}
1641
1642fn unpivot_entry_columns(expr: &Expression) -> Vec<SimpleColumnRef> {
1643 match expr {
1644 Expression::PivotAlias(alias) => unpivot_entry_columns(&alias.this),
1645 Expression::Tuple(tuple) => tuple
1646 .expressions
1647 .iter()
1648 .flat_map(unpivot_entry_columns)
1649 .collect(),
1650 Expression::Column(column) => vec![SimpleColumnRef {
1651 table: column.table.clone(),
1652 column: column.name.name.clone(),
1653 }],
1654 Expression::Identifier(identifier) => vec![SimpleColumnRef {
1655 table: None,
1656 column: identifier.name.clone(),
1657 }],
1658 _ => find_column_refs_in_expr(expr, None),
1659 }
1660}
1661
1662fn attach_pivot_input_column(
1663 node: &mut LineageNode,
1664 scope: &Scope,
1665 dialect: Option<DialectType>,
1666 source_expr: &Expression,
1667 col_ref: &SimpleColumnRef,
1668 trim_selects: bool,
1669 all_cte_scopes: &[&Scope],
1670 depth: usize,
1671) {
1672 match source_expr {
1673 Expression::Table(table) => {
1674 let table_name = col_ref
1675 .table
1676 .as_ref()
1677 .map(|identifier| identifier.name.as_str())
1678 .unwrap_or(table.name.name.as_str());
1679 if scope.cte_sources.contains_key(table_name) {
1680 resolve_qualified_column(
1681 node,
1682 scope,
1683 dialect,
1684 table_name,
1685 &col_ref.column,
1686 &node.name.clone(),
1687 trim_selects,
1688 all_cte_scopes,
1689 depth + 1,
1690 );
1691 } else {
1692 let mut source = ScopeSourceInfo::new(
1693 Expression::Table(Box::new(table.as_ref().clone())),
1694 false,
1695 SourceKind::Table,
1696 );
1697 if let Some(alias) = &table.alias {
1698 source = source.with_alias(alias.name.clone());
1699 }
1700 let source_key = table
1701 .alias
1702 .as_ref()
1703 .map(|alias| alias.name.as_str())
1704 .unwrap_or(table.name.name.as_str());
1705 node.downstream.push(make_table_column_node_from_source(
1706 source_key,
1707 &col_ref.column,
1708 &source,
1709 ));
1710 }
1711 }
1712 Expression::Subquery(subquery) => {
1713 let source_scope = build_scope(&subquery.this);
1714 let child = if let Some(table) = &col_ref.table {
1715 let mut child_node = LineageNode::new(
1716 &col_ref.column,
1717 subquery.this.clone(),
1718 subquery.this.clone(),
1719 );
1720 resolve_qualified_column(
1721 &mut child_node,
1722 &source_scope,
1723 dialect,
1724 &table.name,
1725 &col_ref.column,
1726 &node.name.clone(),
1727 trim_selects,
1728 all_cte_scopes,
1729 depth + 1,
1730 );
1731 Ok(child_node)
1732 } else {
1733 to_node_inner(
1734 ColumnRef::Name(&col_ref.column),
1735 &source_scope,
1736 dialect,
1737 "",
1738 "",
1739 "",
1740 trim_selects,
1741 &all_cte_scopes
1742 .iter()
1743 .map(|scope| (*scope).clone())
1744 .collect::<Vec<_>>(),
1745 depth + 1,
1746 )
1747 };
1748 if let Ok(child) = child {
1749 node.downstream.push(child);
1750 }
1751 }
1752 Expression::Paren(paren) => attach_pivot_input_column(
1753 node,
1754 scope,
1755 dialect,
1756 &paren.this,
1757 col_ref,
1758 trim_selects,
1759 all_cte_scopes,
1760 depth,
1761 ),
1762 _ => {
1763 if let Some(table) = &col_ref.table {
1764 resolve_qualified_column(
1765 node,
1766 scope,
1767 dialect,
1768 &table.name,
1769 &col_ref.column,
1770 &node.name.clone(),
1771 trim_selects,
1772 all_cte_scopes,
1773 depth + 1,
1774 );
1775 } else {
1776 node.downstream
1777 .push(make_table_column_node("_", &col_ref.column));
1778 }
1779 }
1780 }
1781}
1782
1783fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1789 if scope.cte_sources.contains_key(name) {
1791 return None;
1792 }
1793 if let Some(source_info) = scope.sources.get(name) {
1795 if source_info.is_scope {
1796 if let Expression::Cte(cte) = &source_info.expression {
1797 let cte_name = &cte.alias.name;
1798 if scope.cte_sources.contains_key(cte_name) {
1799 return Some(cte_name.clone());
1800 }
1801 }
1802 }
1803 }
1804 None
1805}
1806
1807fn resolve_unqualified_column(
1808 node: &mut LineageNode,
1809 scope: &Scope,
1810 dialect: Option<DialectType>,
1811 col_name: &str,
1812 parent_name: &str,
1813 trim_selects: bool,
1814 all_cte_scopes: &[&Scope],
1815 depth: usize,
1816) {
1817 let from_source_names = source_names_from_from_join(scope);
1821
1822 if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1823 resolve_qualified_column(
1824 node,
1825 scope,
1826 dialect,
1827 &tbl,
1828 col_name,
1829 parent_name,
1830 trim_selects,
1831 all_cte_scopes,
1832 depth,
1833 );
1834 return;
1835 }
1836
1837 if from_source_names.len() == 1 {
1838 let tbl = &from_source_names[0];
1839 resolve_qualified_column(
1840 node,
1841 scope,
1842 dialect,
1843 tbl,
1844 col_name,
1845 parent_name,
1846 trim_selects,
1847 all_cte_scopes,
1848 depth,
1849 );
1850 return;
1851 }
1852
1853 let child = LineageNode::new(
1855 col_name.to_string(),
1856 Expression::Column(Box::new(crate::expressions::Column {
1857 name: crate::expressions::Identifier::new(col_name.to_string()),
1858 table: None,
1859 join_mark: false,
1860 trailing_comments: vec![],
1861 span: None,
1862 inferred_type: None,
1863 })),
1864 node.source.clone(),
1865 );
1866 node.downstream.push(child);
1867}
1868
1869fn unique_virtual_source_for_column(
1870 scope: &Scope,
1871 source_names: &[String],
1872 col_name: &str,
1873) -> Option<String> {
1874 let mut matches = source_names.iter().filter_map(|source_name| {
1875 let source = scope.sources.get(source_name)?;
1876 if source.kind == SourceKind::Virtual
1877 && virtual_source_output_columns(source)
1878 .any(|column| column.eq_ignore_ascii_case(col_name))
1879 {
1880 Some(source_name.clone())
1881 } else {
1882 None
1883 }
1884 });
1885
1886 let first = matches.next()?;
1887 if matches.next().is_none() {
1888 Some(first)
1889 } else {
1890 None
1891 }
1892}
1893
1894fn virtual_source_output_columns(
1895 source_info: &ScopeSourceInfo,
1896) -> Box<dyn Iterator<Item = String> + '_> {
1897 match &source_info.expression {
1898 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1899 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1900 Box::new(alias_output_columns(alias))
1901 }
1902 Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1903 Expression::LateralView(lateral_view) => {
1904 Box::new(lateral_view_output_columns(lateral_view))
1905 }
1906 _ => Box::new(source_info.alias.clone().into_iter()),
1907 }
1908}
1909
1910fn unnest_output_columns(
1911 unnest: &crate::expressions::UnnestFunc,
1912) -> impl Iterator<Item = String> + '_ {
1913 unnest
1914 .alias
1915 .iter()
1916 .map(|alias| alias.name.clone())
1917 .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1918}
1919
1920fn alias_output_columns(
1921 alias: &crate::expressions::Alias,
1922) -> Box<dyn Iterator<Item = String> + '_> {
1923 if alias.column_aliases.is_empty() {
1924 Box::new(std::iter::once(alias.alias.name.clone()))
1925 } else {
1926 Box::new(
1927 alias
1928 .column_aliases
1929 .iter()
1930 .map(|column| column.name.clone()),
1931 )
1932 }
1933}
1934
1935fn lateral_output_columns(
1936 lateral: &crate::expressions::Lateral,
1937) -> Box<dyn Iterator<Item = String> + '_> {
1938 if lateral.column_aliases.is_empty() {
1939 default_virtual_output_columns(&lateral.this)
1940 } else {
1941 Box::new(lateral.column_aliases.iter().cloned())
1942 }
1943}
1944
1945fn lateral_view_output_columns(
1946 lateral_view: &crate::expressions::LateralView,
1947) -> Box<dyn Iterator<Item = String> + '_> {
1948 Box::new(
1949 lateral_view
1950 .column_aliases
1951 .iter()
1952 .map(|column| column.name.clone()),
1953 )
1954}
1955
1956fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1957 match expr {
1958 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1959 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1960 alias_output_columns(alias)
1961 }
1962 Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1963 Box::new(
1964 ["seq", "key", "path", "index", "value", "this"]
1965 .into_iter()
1966 .map(String::from),
1967 )
1968 }
1969 _ => Box::new(std::iter::empty()),
1970 }
1971}
1972
1973fn attach_virtual_source_dependencies(
1974 node: &mut LineageNode,
1975 scope: &Scope,
1976 dialect: Option<DialectType>,
1977 source_alias: &str,
1978 source_expr: &Expression,
1979 trim_selects: bool,
1980 all_cte_scopes: &[&Scope],
1981 depth: usize,
1982) {
1983 let parent_name = node.name.clone();
1984 let mut seen = HashSet::new();
1985 for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1986 let key = (
1987 col_ref.table.as_ref().map(|t| t.name.clone()),
1988 col_ref.column.clone(),
1989 );
1990 if !seen.insert(key) {
1991 continue;
1992 }
1993
1994 if let Some(table_id) = col_ref.table {
1995 let table = table_id.name;
1996 if table == source_alias {
1997 continue;
1998 }
1999 resolve_qualified_column(
2000 node,
2001 scope,
2002 dialect,
2003 &table,
2004 &col_ref.column,
2005 &parent_name,
2006 trim_selects,
2007 all_cte_scopes,
2008 depth + 1,
2009 );
2010 } else {
2011 let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
2012 if non_virtual_sources.len() == 1 {
2013 resolve_qualified_column(
2014 node,
2015 scope,
2016 dialect,
2017 &non_virtual_sources[0],
2018 &col_ref.column,
2019 &parent_name,
2020 trim_selects,
2021 all_cte_scopes,
2022 depth + 1,
2023 );
2024 }
2025 }
2026 }
2027}
2028
2029fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
2030 fn source_name(expr: &Expression) -> Option<String> {
2031 match expr {
2032 Expression::Table(table) => Some(
2033 table
2034 .alias
2035 .as_ref()
2036 .map(|a| a.name.clone())
2037 .unwrap_or_else(|| table.name.name.clone()),
2038 ),
2039 Expression::Subquery(subquery) => {
2040 subquery.alias.as_ref().map(|alias| alias.name.clone())
2041 }
2042 Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
2043 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
2044 Some(alias.alias.name.clone())
2045 }
2046 Expression::Alias(alias) if is_query_like_relation(&alias.this) => {
2047 Some(alias.alias.name.clone())
2048 }
2049 Expression::Lateral(lateral) => lateral.alias.clone(),
2050 Expression::LateralView(lateral_view) => lateral_view
2051 .table_alias
2052 .as_ref()
2053 .or_else(|| lateral_view.column_aliases.first())
2054 .map(|alias| alias.name.clone()),
2055 Expression::Pivot(pivot) => Some(pivot_lineage_source_name(
2056 &pivot.this,
2057 pivot.alias.as_ref().map(|alias| alias.name.as_str()),
2058 )),
2059 Expression::Unpivot(unpivot) => Some(pivot_lineage_source_name(
2060 &unpivot.this,
2061 unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
2062 )),
2063 Expression::Paren(paren) => source_name(&paren.this),
2064 _ => None,
2065 }
2066 }
2067
2068 let effective_expr = match &scope.expression {
2069 Expression::Cte(cte) => &cte.this,
2070 expr => expr,
2071 };
2072
2073 let mut names = Vec::new();
2074 let mut seen = std::collections::HashSet::new();
2075
2076 if let Expression::Select(select) = effective_expr {
2077 if let Some(from) = &select.from {
2078 for expr in &from.expressions {
2079 if let Some(name) = source_name(expr) {
2080 if !name.is_empty() && seen.insert(name.clone()) {
2081 names.push(name);
2082 }
2083 }
2084 }
2085 }
2086 for join in &select.joins {
2087 if is_semi_or_anti_join_kind(join.kind) {
2088 continue;
2089 }
2090 if let Some(name) = source_name(&join.this) {
2091 if !name.is_empty() && seen.insert(name.clone()) {
2092 names.push(name);
2093 }
2094 }
2095 }
2096 for lateral_view in &select.lateral_views {
2097 if let Some(name) =
2098 source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
2099 {
2100 if !name.is_empty() && seen.insert(name.clone()) {
2101 names.push(name);
2102 }
2103 }
2104 }
2105 }
2106
2107 names
2108}
2109
2110fn is_semi_or_anti_join_kind(kind: JoinKind) -> bool {
2111 matches!(
2112 kind,
2113 JoinKind::Semi
2114 | JoinKind::Anti
2115 | JoinKind::LeftSemi
2116 | JoinKind::LeftAnti
2117 | JoinKind::RightSemi
2118 | JoinKind::RightAnti
2119 )
2120}
2121
2122fn is_query_like_relation(expr: &Expression) -> bool {
2123 match expr {
2124 Expression::Select(_)
2125 | Expression::Subquery(_)
2126 | Expression::Union(_)
2127 | Expression::Intersect(_)
2128 | Expression::Except(_) => true,
2129 Expression::Paren(paren) => is_query_like_relation(&paren.this),
2130 _ => false,
2131 }
2132}
2133
2134fn derived_source_query(expr: &Expression) -> Option<&Expression> {
2135 match expr {
2136 Expression::Subquery(subquery) => Some(&subquery.this),
2137 Expression::Alias(alias) if is_query_like_relation(&alias.this) => Some(&alias.this),
2138 Expression::Select(_)
2139 | Expression::Union(_)
2140 | Expression::Intersect(_)
2141 | Expression::Except(_) => Some(expr),
2142 Expression::Paren(paren) => derived_source_query(&paren.this),
2143 _ => None,
2144 }
2145}
2146
2147fn expressions_equivalent_after_wrappers(left: &Expression, right: &Expression) -> bool {
2148 left == right || effective_scope_expression(left) == effective_scope_expression(right)
2149}
2150
2151fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
2152 source_names_from_from_join(scope)
2153 .into_iter()
2154 .filter(|name| {
2155 !matches!(
2156 scope.sources.get(name).map(|source| source.kind),
2157 Some(SourceKind::Virtual)
2158 )
2159 })
2160 .collect()
2161}
2162
2163fn get_alias_or_name(expr: &Expression) -> Option<String> {
2169 match expr {
2170 Expression::Alias(alias) => Some(alias.alias.name.clone()),
2171 Expression::Column(col) => Some(col.name.name.clone()),
2172 Expression::Identifier(id) => Some(id.name.clone()),
2173 Expression::Star(_) => Some("*".to_string()),
2174 Expression::Annotated(a) => get_alias_or_name(&a.this),
2177 _ => None,
2178 }
2179}
2180
2181fn find_prior_select_alias_expr(
2182 scope_expr: &Expression,
2183 target_expr: &Expression,
2184 alias_name: &str,
2185 dialect: Option<DialectType>,
2186) -> Option<Expression> {
2187 let Expression::Select(select) = scope_expr else {
2188 return None;
2189 };
2190
2191 let normalized_alias = normalize_column_name(alias_name, dialect);
2192 for expr in &select.expressions {
2193 if expr == target_expr {
2194 return None;
2195 }
2196
2197 if let Expression::Alias(alias) = expr {
2198 if normalize_column_name(&alias.alias.name, dialect) == normalized_alias {
2199 return Some(alias.this.clone());
2200 }
2201 }
2202 }
2203
2204 None
2205}
2206
2207fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
2209 match column {
2210 ColumnRef::Name(n) => n.to_string(),
2211 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
2212 }
2213}
2214
2215fn find_select_expr(
2217 scope_expr: &Expression,
2218 column: &ColumnRef<'_>,
2219 dialect: Option<DialectType>,
2220) -> Result<Expression> {
2221 if let Expression::Select(ref select) = scope_expr {
2222 match column {
2223 ColumnRef::Name(name) => {
2224 let normalized_name = normalize_column_name(name, dialect);
2225 for expr in &select.expressions {
2226 if let Some(alias_or_name) = get_alias_or_name(expr) {
2227 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2228 return Ok(expr.clone());
2229 }
2230 }
2231 }
2232 if let Some(expr) = synthesize_star_passthrough_expr(select, name) {
2233 return Ok(expr);
2234 }
2235 Err(crate::error::Error::parse(
2236 format!("Cannot find column '{}' in query", name),
2237 0,
2238 0,
2239 0,
2240 0,
2241 ))
2242 }
2243 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
2244 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
2245 }),
2246 }
2247 } else {
2248 Err(crate::error::Error::parse(
2249 "Expected SELECT expression for column lookup",
2250 0,
2251 0,
2252 0,
2253 0,
2254 ))
2255 }
2256}
2257
2258fn synthesize_star_passthrough_expr(select: &Select, name: &str) -> Option<Expression> {
2259 let sources = get_select_sources(select);
2260 if sources.is_empty() {
2261 return None;
2262 }
2263
2264 let mut candidate_aliases = Vec::new();
2265 let mut seen = HashSet::new();
2266
2267 for expr in &select.expressions {
2268 let aliases = match star_passthrough_source_aliases(expr, &sources) {
2269 StarPassthroughSources::None => continue,
2270 StarPassthroughSources::Ambiguous => return None,
2271 StarPassthroughSources::Aliases(aliases) => aliases,
2272 };
2273
2274 for alias in aliases {
2275 if seen.insert(alias.clone()) {
2276 candidate_aliases.push(alias);
2277 }
2278 }
2279 }
2280
2281 match candidate_aliases.as_slice() {
2282 [alias] => {
2283 let table = Identifier::new(alias.clone());
2284 Some(make_column_expr(name, Some(&table)))
2285 }
2286 _ => None,
2287 }
2288}
2289
/// Result of working out which sources a select-list expression passes
/// through as a star projection.
enum StarPassthroughSources {
    /// The expression is not a usable star projection, or no source applies.
    None,
    /// More than one source could supply the columns, so no single owner exists.
    Ambiguous,
    /// Aliases of the sources the star projection expands.
    Aliases(Vec<String>),
}
2295
2296fn star_passthrough_source_aliases(
2297 expr: &Expression,
2298 sources: &[SourceInfo],
2299) -> StarPassthroughSources {
2300 match expr {
2301 Expression::Star(star) => star_source_aliases(star.table.as_ref(), sources),
2302 Expression::Column(column) if column.name.name == "*" => {
2303 star_source_aliases(column.table.as_ref(), sources)
2304 }
2305 Expression::Annotated(annotated) => {
2306 star_passthrough_source_aliases(&annotated.this, sources)
2307 }
2308 _ => StarPassthroughSources::None,
2309 }
2310}
2311
2312fn star_source_aliases(
2313 qualifier: Option<&Identifier>,
2314 sources: &[SourceInfo],
2315) -> StarPassthroughSources {
2316 if let Some(qualifier) = qualifier {
2317 let mut aliases = Vec::new();
2318
2319 for source in sources {
2320 if source_matches_star_qualifier(source, qualifier) {
2321 aliases.push(source.alias.clone());
2322 }
2323 }
2324
2325 return match aliases.len() {
2326 0 => StarPassthroughSources::None,
2327 1 => StarPassthroughSources::Aliases(aliases),
2328 _ => StarPassthroughSources::Ambiguous,
2329 };
2330 }
2331
2332 match sources {
2333 [source] if source.quoted => StarPassthroughSources::None,
2337 [source] => StarPassthroughSources::Aliases(vec![source.alias.clone()]),
2338 [] => StarPassthroughSources::None,
2339 _ => StarPassthroughSources::Ambiguous,
2340 }
2341}
2342
2343fn source_matches_star_qualifier(source: &SourceInfo, qualifier: &Identifier) -> bool {
2344 if source.normalized == normalize_cte_name(qualifier) {
2345 return true;
2346 }
2347
2348 if qualifier.quoted {
2349 source.alias == qualifier.name
2350 } else {
2351 source.alias.eq_ignore_ascii_case(&qualifier.name)
2352 }
2353}
2354
2355fn column_to_index(
2357 set_op_expr: &Expression,
2358 name: &str,
2359 dialect: Option<DialectType>,
2360) -> Result<usize> {
2361 let normalized_name = normalize_column_name(name, dialect);
2362 let mut expr = set_op_expr;
2363 loop {
2364 match expr {
2365 Expression::Union(u) => expr = &u.left,
2366 Expression::Intersect(i) => expr = &i.left,
2367 Expression::Except(e) => expr = &e.left,
2368 Expression::Subquery(subquery) => expr = &subquery.this,
2369 Expression::Cte(cte) => expr = &cte.this,
2370 Expression::Paren(paren) => expr = &paren.this,
2371 Expression::Select(select) => {
2372 for (i, e) in select.expressions.iter().enumerate() {
2373 if let Some(alias_or_name) = get_alias_or_name(e) {
2374 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2375 return Ok(i);
2376 }
2377 }
2378 }
2379 return Err(crate::error::Error::parse(
2380 format!("Cannot find column '{}' in set operation", name),
2381 0,
2382 0,
2383 0,
2384 0,
2385 ));
2386 }
2387 _ => {
2388 return Err(crate::error::Error::parse(
2389 "Expected SELECT or set operation",
2390 0,
2391 0,
2392 0,
2393 0,
2394 ))
2395 }
2396 }
2397 }
2398}
2399
/// Normalizes a column name for comparison by delegating to `normalize_name`
/// with the given dialect and the fixed flag arguments `false` and `true`.
// NOTE(review): the meaning of the two boolean flags is defined by
// `crate::schema::normalize_name`; confirm there before changing them.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    normalize_name(name, dialect, false, true)
}
2403
2404fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
2406 if let Expression::Select(select) = select_expr {
2407 let mut trimmed = select.as_ref().clone();
2408 trimmed.expressions = vec![target_expr.clone()];
2409 Expression::Select(Box::new(trimmed))
2410 } else {
2411 select_expr.clone()
2412 }
2413}
2414
2415fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
2417 if scope.cte_sources.contains_key(source_name) {
2419 for cte_scope in &scope.cte_scopes {
2420 if let Expression::Cte(cte) = &cte_scope.expression {
2421 if cte.alias.name == source_name {
2422 return Some(cte_scope);
2423 }
2424 }
2425 }
2426 }
2427
2428 if let Some(source_info) = scope.sources.get(source_name) {
2430 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
2431 if let Some(query) = derived_source_query(&source_info.expression) {
2432 for dt_scope in &scope.derived_table_scopes {
2433 if expressions_equivalent_after_wrappers(&dt_scope.expression, query) {
2434 return Some(dt_scope);
2435 }
2436 }
2437 }
2438 }
2439 }
2440
2441 None
2442}
2443
2444fn find_child_scope_in<'a>(
2448 all_cte_scopes: &[&'a Scope],
2449 scope: &'a Scope,
2450 source_name: &str,
2451) -> Option<&'a Scope> {
2452 for cte_scope in &scope.cte_scopes {
2454 if let Expression::Cte(cte) = &cte_scope.expression {
2455 if cte.alias.name == source_name {
2456 return Some(cte_scope);
2457 }
2458 }
2459 }
2460
2461 for cte_scope in all_cte_scopes {
2463 if let Expression::Cte(cte) = &cte_scope.expression {
2464 if cte.alias.name == source_name {
2465 return Some(cte_scope);
2466 }
2467 }
2468 }
2469
2470 if let Some(source_info) = scope.sources.get(source_name) {
2472 if source_info.is_scope {
2473 if let Some(query) = derived_source_query(&source_info.expression) {
2474 for dt_scope in &scope.derived_table_scopes {
2475 if expressions_equivalent_after_wrappers(&dt_scope.expression, query) {
2476 return Some(dt_scope);
2477 }
2478 }
2479 }
2480 }
2481 }
2482
2483 None
2484}
2485
2486fn make_table_column_node(table: &str, column: &str) -> LineageNode {
2488 let mut node = LineageNode::new(
2489 format!("{}.{}", table, column),
2490 Expression::Column(Box::new(crate::expressions::Column {
2491 name: crate::expressions::Identifier::new(column.to_string()),
2492 table: Some(crate::expressions::Identifier::new(table.to_string())),
2493 join_mark: false,
2494 trailing_comments: vec![],
2495 span: None,
2496 inferred_type: None,
2497 })),
2498 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
2499 );
2500 node.source_name = table.to_string();
2501 node.source_kind = SourceKind::Table;
2502 node
2503}
2504
2505fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
2506 let mut parts: Vec<String> = Vec::new();
2507 if let Some(catalog) = &table_ref.catalog {
2508 parts.push(catalog.name.clone());
2509 }
2510 if let Some(schema) = &table_ref.schema {
2511 parts.push(schema.name.clone());
2512 }
2513 parts.push(table_ref.name.name.clone());
2514 parts.join(".")
2515}
2516
2517fn apply_source_info_context(
2518 node: &mut LineageNode,
2519 source_key: &str,
2520 source_info: &ScopeSourceInfo,
2521) {
2522 node.source_kind = source_info.kind;
2523 node.source_name =
2524 source_info
2525 .lineage_name
2526 .clone()
2527 .unwrap_or_else(|| match &source_info.expression {
2528 Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
2529 _ => source_key.to_string(),
2530 });
2531 node.source_alias = source_info.alias.clone();
2532}
2533
2534fn make_table_column_node_from_source(
2535 source_key: &str,
2536 column: &str,
2537 source_info: &ScopeSourceInfo,
2538) -> LineageNode {
2539 let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
2540 let mut node = LineageNode::new(
2541 format!("{}.{}", lineage_name, column),
2542 Expression::Column(Box::new(crate::expressions::Column {
2543 name: crate::expressions::Identifier::new(column.to_string()),
2544 table: Some(crate::expressions::Identifier::new(
2545 lineage_name.to_string(),
2546 )),
2547 join_mark: false,
2548 trailing_comments: vec![],
2549 span: None,
2550 inferred_type: None,
2551 })),
2552 source_info.expression.clone(),
2553 );
2554
2555 apply_source_info_context(&mut node, source_key, source_info);
2556
2557 node
2558}
2559
/// A column reference reduced to the two pieces recorded while walking an expression:
/// the optional table qualifier and the column name.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table qualifier of the referenced column, if the reference had one.
    table: Option<crate::expressions::Identifier>,
    /// Name of the referenced column.
    column: String,
}
2566
2567fn find_column_refs_in_expr(
2569 expr: &Expression,
2570 dialect: Option<DialectType>,
2571) -> Vec<SimpleColumnRef> {
2572 let mut refs = Vec::new();
2573 collect_column_refs(expr, dialect, &mut refs, None);
2574 refs
2575}
2576
2577fn find_column_refs_in_expr_with_select(
2578 expr: &Expression,
2579 select_expr: &Expression,
2580 dialect: Option<DialectType>,
2581) -> Vec<SimpleColumnRef> {
2582 let named_windows = match select_expr {
2583 Expression::Select(select) => select.windows.as_deref(),
2584 _ => None,
2585 };
2586 let mut refs = Vec::new();
2587 collect_column_refs(expr, dialect, &mut refs, named_windows);
2588 refs
2589}
2590
2591fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
2592 match expr {
2593 Expression::Column(col) => {
2594 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
2595 }
2596 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
2597 _ => false,
2598 }
2599}
2600
/// Appends a `SimpleColumnRef` to `refs` for every `Expression::Column` reachable from `expr`.
///
/// The walk is iterative: an explicit stack holds the expressions still to be visited, and
/// each handled variant pushes the child expressions it wants explored. Because the stack is
/// popped from the end, children pushed later are visited first, so the order of `refs`
/// follows the push order used in each arm.
///
/// * `dialect` is only consulted for `Expression::MethodCall`: under BigQuery, a receiver
///   that is the bare `SAFE` namespace is not visited.
/// * `named_windows`, when provided, lets a window function whose `OVER` clause names a
///   window also contribute the `partition_by` / `order_by` expressions of every definition
///   with a matching (ASCII case-insensitive) name.
///
/// `Expression::Subquery` and `Expression::Exists` are deliberately not descended into, and
/// any variant without an explicit arm is ignored together with its children.
fn collect_column_refs(
    expr: &Expression,
    dialect: Option<DialectType>,
    refs: &mut Vec<SimpleColumnRef>,
    named_windows: Option<&[NamedWindow]>,
) {
    let mut stack: Vec<&Expression> = vec![expr];

    while let Some(current) = stack.pop() {
        match current {
            // Leaf of interest: record the qualifier and the column name.
            Expression::Column(col) => {
                refs.push(SimpleColumnRef {
                    table: col.table.clone(),
                    column: col.name.name.clone(),
                });
            }

            // Nested queries are not walked here.
            Expression::Subquery(_) | Expression::Exists(_) => {}

            // Binary operators: visit both operands.
            Expression::And(op)
            | Expression::Or(op)
            | Expression::Eq(op)
            | Expression::Neq(op)
            | Expression::Lt(op)
            | Expression::Lte(op)
            | Expression::Gt(op)
            | Expression::Gte(op)
            | Expression::Add(op)
            | Expression::Sub(op)
            | Expression::Mul(op)
            | Expression::Div(op)
            | Expression::Mod(op)
            | Expression::BitwiseAnd(op)
            | Expression::BitwiseOr(op)
            | Expression::BitwiseXor(op)
            | Expression::BitwiseLeftShift(op)
            | Expression::BitwiseRightShift(op)
            | Expression::Concat(op)
            | Expression::Adjacent(op)
            | Expression::TsMatch(op)
            | Expression::PropertyEQ(op)
            | Expression::ArrayContainsAll(op)
            | Expression::ArrayContainedBy(op)
            | Expression::ArrayOverlaps(op)
            | Expression::JSONBContainsAllTopKeys(op)
            | Expression::JSONBContainsAnyTopKeys(op)
            | Expression::JSONBDeleteAtPath(op)
            | Expression::ExtendsLeft(op)
            | Expression::ExtendsRight(op)
            | Expression::Is(op)
            | Expression::MemberOf(op)
            | Expression::NullSafeEq(op)
            | Expression::NullSafeNeq(op)
            | Expression::Glob(op)
            | Expression::Match(op) => {
                stack.push(&op.left);
                stack.push(&op.right);
            }

            // Unary operators: visit the single operand.
            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
                stack.push(&u.this);
            }

            // Single-argument functions: visit `this`.
            Expression::Upper(f)
            | Expression::Lower(f)
            | Expression::Length(f)
            | Expression::LTrim(f)
            | Expression::RTrim(f)
            | Expression::Reverse(f)
            | Expression::Abs(f)
            | Expression::Sqrt(f)
            | Expression::Cbrt(f)
            | Expression::Ln(f)
            | Expression::Exp(f)
            | Expression::Sign(f)
            | Expression::Date(f)
            | Expression::Time(f)
            | Expression::DateFromUnixDate(f)
            | Expression::UnixDate(f)
            | Expression::UnixSeconds(f)
            | Expression::UnixMillis(f)
            | Expression::UnixMicros(f)
            | Expression::TimeStrToDate(f)
            | Expression::DateToDi(f)
            | Expression::DiToDate(f)
            | Expression::TsOrDiToDi(f)
            | Expression::TsOrDsToDatetime(f)
            | Expression::TsOrDsToTimestamp(f)
            | Expression::YearOfWeek(f)
            | Expression::YearOfWeekIso(f)
            | Expression::Initcap(f)
            | Expression::Ascii(f)
            | Expression::Chr(f)
            | Expression::Soundex(f)
            | Expression::ByteLength(f)
            | Expression::Hex(f)
            | Expression::LowerHex(f)
            | Expression::Unicode(f)
            | Expression::Radians(f)
            | Expression::Degrees(f)
            | Expression::Sin(f)
            | Expression::Cos(f)
            | Expression::Tan(f)
            | Expression::Asin(f)
            | Expression::Acos(f)
            | Expression::Atan(f)
            | Expression::IsNan(f)
            | Expression::IsInf(f)
            | Expression::ArrayLength(f)
            | Expression::ArraySize(f)
            | Expression::Cardinality(f)
            | Expression::ArrayReverse(f)
            | Expression::ArrayDistinct(f)
            | Expression::ArrayFlatten(f)
            | Expression::ArrayCompact(f)
            | Expression::Explode(f)
            | Expression::ExplodeOuter(f)
            | Expression::ToArray(f)
            | Expression::MapFromEntries(f)
            | Expression::MapKeys(f)
            | Expression::MapValues(f)
            | Expression::JsonArrayLength(f)
            | Expression::JsonKeys(f)
            | Expression::JsonType(f)
            | Expression::ParseJson(f)
            | Expression::ToJson(f)
            | Expression::Typeof(f)
            | Expression::BitwiseCount(f)
            | Expression::Year(f)
            | Expression::Month(f)
            | Expression::Day(f)
            | Expression::Hour(f)
            | Expression::Minute(f)
            | Expression::Second(f)
            | Expression::DayOfWeek(f)
            | Expression::DayOfWeekIso(f)
            | Expression::DayOfMonth(f)
            | Expression::DayOfYear(f)
            | Expression::WeekOfYear(f)
            | Expression::Quarter(f)
            | Expression::Epoch(f)
            | Expression::EpochMs(f)
            | Expression::TimeStrToUnix(f)
            | Expression::SHA(f)
            | Expression::SHA1Digest(f)
            | Expression::TimeToUnix(f)
            | Expression::JSONBool(f)
            | Expression::Int64(f)
            | Expression::MD5NumberLower64(f)
            | Expression::MD5NumberUpper64(f)
            | Expression::DateStrToDate(f)
            | Expression::DateToDateStr(f) => {
                stack.push(&f.this);
            }

            // Two-argument functions: visit `this` and `expression`.
            Expression::Power(f)
            | Expression::NullIf(f)
            | Expression::IfNull(f)
            | Expression::Nvl(f)
            | Expression::UnixToTimeStr(f)
            | Expression::Contains(f)
            | Expression::StartsWith(f)
            | Expression::EndsWith(f)
            | Expression::Levenshtein(f)
            | Expression::ModFunc(f)
            | Expression::Atan2(f)
            | Expression::IntDiv(f)
            | Expression::AddMonths(f)
            | Expression::MonthsBetween(f)
            | Expression::NextDay(f)
            | Expression::ArrayContains(f)
            | Expression::ArrayPosition(f)
            | Expression::ArrayAppend(f)
            | Expression::ArrayPrepend(f)
            | Expression::ArrayUnion(f)
            | Expression::ArrayExcept(f)
            | Expression::ArrayRemove(f)
            | Expression::StarMap(f)
            | Expression::MapFromArrays(f)
            | Expression::MapContainsKey(f)
            | Expression::ElementAt(f)
            | Expression::JsonMergePatch(f)
            | Expression::JSONBContains(f)
            | Expression::JSONBExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }

            // Variadic functions: visit every entry of `expressions`.
            Expression::Greatest(f)
            | Expression::Least(f)
            | Expression::Coalesce(f)
            | Expression::ArrayConcat(f)
            | Expression::ArrayIntersect(f)
            | Expression::ArrayZip(f)
            | Expression::MapConcat(f)
            | Expression::JsonArray(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }

            // Nodes sharing the `this` / `filter` / `having_max` / `limit` layout: visit the
            // argument plus whichever optional parts are present.
            Expression::Sum(f)
            | Expression::Avg(f)
            | Expression::Min(f)
            | Expression::Max(f)
            | Expression::ArrayAgg(f)
            | Expression::CountIf(f)
            | Expression::Stddev(f)
            | Expression::StddevPop(f)
            | Expression::StddevSamp(f)
            | Expression::Variance(f)
            | Expression::VarPop(f)
            | Expression::VarSamp(f)
            | Expression::Median(f)
            | Expression::Mode(f)
            | Expression::First(f)
            | Expression::Last(f)
            | Expression::AnyValue(f)
            | Expression::ApproxDistinct(f)
            | Expression::ApproxCountDistinct(f)
            | Expression::LogicalAnd(f)
            | Expression::LogicalOr(f)
            | Expression::Skewness(f)
            | Expression::ArrayConcatAgg(f)
            | Expression::ArrayUniqueAgg(f)
            | Expression::BoolXorAgg(f)
            | Expression::BitwiseAndAgg(f)
            | Expression::BitwiseOrAgg(f)
            | Expression::BitwiseXorAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some((ref expr, _)) = f.having_max {
                    stack.push(expr);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }

            // Generic function calls: visit every argument.
            Expression::Function(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
            }
            Expression::AggregateFunction(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
                if let Some(ref filter) = func.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = func.limit {
                    stack.push(limit);
                }
            }

            // Window functions: the wrapped function, the inline PARTITION BY / ORDER BY
            // expressions, the `keep` ordering, and the contents of a referenced named window.
            Expression::WindowFunction(wf) => {
                stack.push(&wf.this);
                for e in &wf.over.partition_by {
                    stack.push(e);
                }
                for e in &wf.over.order_by {
                    stack.push(&e.this);
                }
                if let Some(keep) = &wf.keep {
                    for e in &keep.order_by {
                        stack.push(&e.this);
                    }
                }
                // Only possible when both a window name and the definitions are available.
                if let (Some(window_name), Some(named_windows)) =
                    (&wf.over.window_name, named_windows)
                {
                    for named_window in named_windows {
                        if named_window
                            .name
                            .name
                            .eq_ignore_ascii_case(&window_name.name)
                        {
                            for e in &named_window.spec.partition_by {
                                stack.push(e);
                            }
                            for e in &named_window.spec.order_by {
                                stack.push(&e.this);
                            }
                        }
                    }
                }
            }

            // Wrappers and structural nodes: visit the wrapped / contained expressions.
            Expression::Alias(a) => {
                stack.push(&a.this);
            }
            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
                stack.push(&c.this);
                if let Some(ref fmt) = c.format {
                    stack.push(fmt);
                }
                if let Some(ref def) = c.default {
                    stack.push(def);
                }
            }
            Expression::Paren(p) => {
                stack.push(&p.this);
            }
            Expression::Annotated(a) => {
                stack.push(&a.this);
            }
            Expression::Case(case) => {
                if let Some(ref operand) = case.operand {
                    stack.push(operand);
                }
                for (cond, result) in &case.whens {
                    stack.push(cond);
                    stack.push(result);
                }
                if let Some(ref else_expr) = case.else_ {
                    stack.push(else_expr);
                }
            }
            Expression::Collation(c) => {
                stack.push(&c.this);
            }
            Expression::In(i) => {
                stack.push(&i.this);
                for e in &i.expressions {
                    stack.push(e);
                }
                if let Some(ref q) = i.query {
                    stack.push(q);
                }
                if let Some(ref u) = i.unnest {
                    stack.push(u);
                }
            }
            Expression::Between(b) => {
                stack.push(&b.this);
                stack.push(&b.low);
                stack.push(&b.high);
            }
            Expression::IsNull(n) => {
                stack.push(&n.this);
            }
            Expression::IsTrue(t) | Expression::IsFalse(t) => {
                stack.push(&t.this);
            }
            Expression::IsJson(j) => {
                stack.push(&j.this);
            }
            Expression::Like(l) | Expression::ILike(l) => {
                stack.push(&l.left);
                stack.push(&l.right);
                if let Some(ref esc) = l.escape {
                    stack.push(esc);
                }
            }
            Expression::SimilarTo(s) => {
                stack.push(&s.this);
                stack.push(&s.pattern);
                if let Some(ref esc) = s.escape {
                    stack.push(esc);
                }
            }
            Expression::Ordered(o) => {
                stack.push(&o.this);
            }
            Expression::Array(a) => {
                for e in &a.expressions {
                    stack.push(e);
                }
            }
            Expression::Tuple(t) => {
                for e in &t.expressions {
                    stack.push(e);
                }
            }
            Expression::Struct(s) => {
                for (_, e) in &s.fields {
                    stack.push(e);
                }
            }
            Expression::Subscript(s) => {
                stack.push(&s.this);
                stack.push(&s.index);
            }
            // Only the receiver of a dot access is visited.
            Expression::Dot(d) => {
                stack.push(&d.this);
            }
            Expression::MethodCall(m) => {
                // Under BigQuery a bare `SAFE` receiver is skipped so it is not recorded as a
                // column; in every other case the receiver is visited.
                if !matches!(dialect, Some(DialectType::BigQuery))
                    || !is_bigquery_safe_namespace_receiver(&m.this)
                {
                    stack.push(&m.this);
                }
                for arg in &m.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySlice(s) => {
                stack.push(&s.this);
                if let Some(ref start) = s.start {
                    stack.push(start);
                }
                if let Some(ref end) = s.end {
                    stack.push(end);
                }
            }
            // Only the lambda body is visited.
            Expression::Lambda(l) => {
                stack.push(&l.body);
            }
            Expression::NamedArgument(n) => {
                stack.push(&n.value);
            }
            Expression::Lateral(l) => {
                stack.push(&l.this);
                if let Some(ref view) = l.view {
                    stack.push(view);
                }
                if let Some(ref outer) = l.outer {
                    stack.push(outer);
                }
                if let Some(ref ordinality) = l.ordinality {
                    stack.push(ordinality);
                }
            }
            Expression::LateralView(lv) => {
                stack.push(&lv.this);
            }
            Expression::TryCatch(t) => {
                for stmt in &t.try_body {
                    stack.push(stmt);
                }
                if let Some(catch_body) = &t.catch_body {
                    for stmt in catch_body {
                        stack.push(stmt);
                    }
                }
            }
            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
                stack.push(e);
            }

            // Functions with their own field layouts: visit each expression-valued field,
            // optional ones only when present.
            Expression::Substring(f) => {
                stack.push(&f.this);
                stack.push(&f.start);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Trim(f) => {
                stack.push(&f.this);
                if let Some(ref chars) = f.characters {
                    stack.push(chars);
                }
            }
            Expression::Replace(f) => {
                stack.push(&f.this);
                stack.push(&f.old);
                stack.push(&f.new);
            }
            Expression::IfFunc(f) => {
                stack.push(&f.condition);
                stack.push(&f.true_value);
                if let Some(ref fv) = f.false_value {
                    stack.push(fv);
                }
            }
            Expression::Nvl2(f) => {
                stack.push(&f.this);
                stack.push(&f.true_value);
                stack.push(&f.false_value);
            }
            Expression::ConcatWs(f) => {
                stack.push(&f.separator);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Count(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::GroupConcat(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::StringAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }
            Expression::ListAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::SumIf(f) => {
                stack.push(&f.this);
                stack.push(&f.condition);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::DateAdd(f) | Expression::DateSub(f) => {
                stack.push(&f.this);
                stack.push(&f.interval);
            }
            Expression::DateDiff(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }
            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
                stack.push(&f.this);
            }
            Expression::Extract(f) => {
                stack.push(&f.this);
            }
            Expression::Round(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::Floor(f) => {
                stack.push(&f.this);
                if let Some(ref s) = f.scale {
                    stack.push(s);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Ceil(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Log(f) => {
                stack.push(&f.this);
                if let Some(ref b) = f.base {
                    stack.push(b);
                }
            }
            Expression::AtTimeZone(f) => {
                stack.push(&f.this);
                stack.push(&f.zone);
            }
            Expression::Lead(f) | Expression::Lag(f) => {
                stack.push(&f.this);
                if let Some(ref off) = f.offset {
                    stack.push(off);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::FirstValue(f) | Expression::LastValue(f) => {
                stack.push(&f.this);
            }
            Expression::NthValue(f) => {
                stack.push(&f.this);
                stack.push(&f.offset);
            }
            Expression::Position(f) => {
                stack.push(&f.substring);
                stack.push(&f.string);
                if let Some(ref start) = f.start {
                    stack.push(start);
                }
            }
            Expression::Decode(f) => {
                stack.push(&f.this);
                for (search, result) in &f.search_results {
                    stack.push(search);
                    stack.push(result);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::CharFunc(f) => {
                for arg in &f.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySort(f) => {
                stack.push(&f.this);
                if let Some(ref cmp) = f.comparator {
                    stack.push(cmp);
                }
            }
            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
                stack.push(&f.this);
                stack.push(&f.separator);
                if let Some(ref nr) = f.null_replacement {
                    stack.push(nr);
                }
            }
            Expression::ArrayFilter(f) => {
                stack.push(&f.this);
                stack.push(&f.filter);
            }
            Expression::ArrayTransform(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::Sequence(f)
            | Expression::Generate(f)
            | Expression::ExplodingGenerateSeries(f) => {
                stack.push(&f.start);
                stack.push(&f.stop);
                if let Some(ref step) = f.step {
                    stack.push(step);
                }
            }
            Expression::JsonExtract(f)
            | Expression::JsonExtractScalar(f)
            | Expression::JsonQuery(f)
            | Expression::JsonValue(f) => {
                stack.push(&f.this);
                stack.push(&f.path);
            }
            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
                stack.push(&f.this);
                for p in &f.paths {
                    stack.push(p);
                }
            }
            Expression::JsonObject(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
                stack.push(&f.this);
                for (path, val) in &f.path_values {
                    stack.push(path);
                    stack.push(val);
                }
            }
            Expression::Overlay(f) => {
                stack.push(&f.this);
                stack.push(&f.replacement);
                stack.push(&f.from);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Convert(f) => {
                stack.push(&f.this);
                if let Some(ref style) = f.style {
                    stack.push(style);
                }
            }
            Expression::ApproxPercentile(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref acc) = f.accuracy {
                    stack.push(acc);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::Percentile(f)
            | Expression::PercentileCont(f)
            | Expression::PercentileDisc(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::WithinGroup(f) => {
                stack.push(&f.this);
                for e in &f.order_by {
                    stack.push(&e.this);
                }
            }
            Expression::Left(f) | Expression::Right(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
            }
            Expression::Repeat(f) => {
                stack.push(&f.this);
                stack.push(&f.times);
            }
            Expression::Lpad(f) | Expression::Rpad(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
                if let Some(ref fill) = f.fill {
                    stack.push(fill);
                }
            }
            Expression::Split(f) => {
                stack.push(&f.this);
                stack.push(&f.delimiter);
            }
            Expression::RegexpLike(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpReplace(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                stack.push(&f.replacement);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref group) = f.group {
                    stack.push(group);
                }
            }
            Expression::ToDate(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::ToTimestamp(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::DateFormat(f) | Expression::FormatDate(f) => {
                stack.push(&f.this);
                stack.push(&f.format);
            }
            Expression::LastDay(f) => {
                stack.push(&f.this);
            }
            Expression::FromUnixtime(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::UnixTimestamp(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::MakeDate(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
            }
            Expression::MakeTimestamp(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
                stack.push(&f.hour);
                stack.push(&f.minute);
                stack.push(&f.second);
                if let Some(ref tz) = f.timezone {
                    stack.push(tz);
                }
            }
            Expression::TruncFunc(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::ArrayFunc(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Unnest(f) => {
                stack.push(&f.this);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::StructFunc(f) => {
                for (_, e) in &f.fields {
                    stack.push(e);
                }
            }
            Expression::StructExtract(f) => {
                stack.push(&f.this);
            }
            Expression::NamedStruct(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::MapFunc(f) => {
                for k in &f.keys {
                    stack.push(k);
                }
                for v in &f.values {
                    stack.push(v);
                }
            }
            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::JsonArrayAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::JsonObjectAgg(f) => {
                stack.push(&f.key);
                stack.push(&f.value);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::NTile(f) => {
                if let Some(ref n) = f.num_buckets {
                    stack.push(n);
                }
            }
            Expression::Rand(f) => {
                if let Some(ref s) = f.seed {
                    stack.push(s);
                }
                if let Some(ref lo) = f.lower {
                    stack.push(lo);
                }
                if let Some(ref hi) = f.upper {
                    stack.push(hi);
                }
            }
            Expression::Any(q) | Expression::All(q) => {
                stack.push(&q.this);
                stack.push(&q.subquery);
            }
            Expression::Overlaps(o) => {
                if let Some(ref this) = o.this {
                    stack.push(this);
                }
                if let Some(ref expr) = o.expression {
                    stack.push(expr);
                }
                if let Some(ref ls) = o.left_start {
                    stack.push(ls);
                }
                if let Some(ref le) = o.left_end {
                    stack.push(le);
                }
                if let Some(ref rs) = o.right_start {
                    stack.push(rs);
                }
                if let Some(ref re) = o.right_end {
                    stack.push(re);
                }
            }
            Expression::Interval(i) => {
                if let Some(ref this) = i.this {
                    stack.push(this);
                }
            }
            Expression::TimeStrToTime(f) => {
                stack.push(&f.this);
                if let Some(ref zone) = f.zone {
                    stack.push(zone);
                }
            }
            Expression::JSONBExtractScalar(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                if let Some(ref jt) = f.json_type {
                    stack.push(jt);
                }
            }
            Expression::JSONExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                for e in &f.expressions {
                    stack.push(e);
                }
                if let Some(ref option) = f.option {
                    stack.push(option);
                }
                if let Some(ref on_condition) = f.on_condition {
                    stack.push(on_condition);
                }
            }

            // Every variant without an arm above contributes nothing and is not descended into.
            _ => {}
        }
    }
}
3544
3545#[cfg(test)]
3550mod tests {
3551 use super::*;
3552 use crate::dialects::{Dialect, DialectType};
3553 use crate::expressions::DataType;
3554 use crate::optimizer::annotate_types::annotate_types;
3555 use crate::parse_one;
3556 use crate::schema::{MappingSchema, Schema};
3557
3558 fn parse(sql: &str) -> Expression {
3559 let dialect = Dialect::get(DialectType::Generic);
3560 let ast = dialect.parse(sql).unwrap();
3561 ast.into_iter().next().unwrap()
3562 }
3563
3564 fn parse_dialect(sql: &str, dialect_type: DialectType) -> Expression {
3565 let dialect = Dialect::get(dialect_type);
3566 let ast = dialect.parse(sql).unwrap();
3567 ast.into_iter().next().unwrap()
3568 }
3569
3570 fn lineage_names(node: &LineageNode) -> Vec<String> {
3571 node.walk().map(|n| n.name.clone()).collect()
3572 }
3573
3574 fn assert_lineage_contains(node: &LineageNode, expected: &str) {
3575 let names = lineage_names(node);
3576 assert!(
3577 names.iter().any(|name| name == expected),
3578 "expected {expected} in lineage, got {names:?}"
3579 );
3580 }
3581
3582 #[test]
3583 fn test_simple_lineage() {
3584 let expr = parse("SELECT a FROM t");
3585 let node = lineage("a", &expr, None, false).unwrap();
3586
3587 assert_eq!(node.name, "a");
3588 assert!(!node.downstream.is_empty(), "Should have downstream nodes");
3589 let names = node.downstream_names();
3591 assert!(
3592 names.iter().any(|n| n == "t.a"),
3593 "Expected t.a in downstream, got: {:?}",
3594 names
3595 );
3596 }
3597
3598 #[test]
3599 fn test_lineage_walk() {
3600 let root = LineageNode {
3601 name: "col_a".to_string(),
3602 expression: Expression::Null(crate::expressions::Null),
3603 source: Expression::Null(crate::expressions::Null),
3604 downstream: vec![LineageNode::new(
3605 "t.a",
3606 Expression::Null(crate::expressions::Null),
3607 Expression::Null(crate::expressions::Null),
3608 )],
3609 source_name: String::new(),
3610 source_kind: SourceKind::Unknown,
3611 source_alias: None,
3612 reference_node_name: String::new(),
3613 };
3614
3615 let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
3616 assert_eq!(names.len(), 2);
3617 assert_eq!(names[0], "col_a");
3618 assert_eq!(names[1], "t.a");
3619 }
3620
3621 #[test]
3622 fn test_aliased_column() {
3623 let expr = parse("SELECT a + 1 AS b FROM t");
3624 let node = lineage("b", &expr, None, false).unwrap();
3625
3626 assert_eq!(node.name, "b");
3627 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3629 assert!(
3630 all_names.iter().any(|n| n.contains("a")),
3631 "Expected to trace to column a, got: {:?}",
3632 all_names
3633 );
3634 }
3635
3636 #[test]
3637 fn test_qualified_column() {
3638 let expr = parse("SELECT t.a FROM t");
3639 let node = lineage("a", &expr, None, false).unwrap();
3640
3641 assert_eq!(node.name, "a");
3642 let names = node.downstream_names();
3643 assert!(
3644 names.iter().any(|n| n == "t.a"),
3645 "Expected t.a, got: {:?}",
3646 names
3647 );
3648 }
3649
3650 #[test]
3651 fn test_unqualified_column() {
3652 let expr = parse("SELECT a FROM t");
3653 let node = lineage("a", &expr, None, false).unwrap();
3654
3655 let names = node.downstream_names();
3657 assert!(
3658 names.iter().any(|n| n == "t.a"),
3659 "Expected t.a, got: {:?}",
3660 names
3661 );
3662 }
3663
3664 #[test]
3665 fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
3666 let query = "SELECT name FROM users";
3667 let dialect = Dialect::get(DialectType::BigQuery);
3668 let expr = dialect
3669 .parse(query)
3670 .unwrap()
3671 .into_iter()
3672 .next()
3673 .expect("expected one expression");
3674
3675 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3676 schema
3677 .add_table("users", &[("name".into(), DataType::Text)], None)
3678 .expect("schema setup");
3679
3680 let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
3681 .expect("lineage without schema");
3682 let mut expr_without = node_without_schema.expression.clone();
3683 annotate_types(
3684 &mut expr_without,
3685 Some(&schema),
3686 Some(DialectType::BigQuery),
3687 );
3688 assert_eq!(
3689 expr_without.inferred_type(),
3690 None,
3691 "Expected unresolved root type without schema-aware lineage qualification"
3692 );
3693
3694 let node_with_schema = lineage_with_schema(
3695 "name",
3696 &expr,
3697 Some(&schema),
3698 Some(DialectType::BigQuery),
3699 false,
3700 )
3701 .expect("lineage with schema");
3702 let mut expr_with = node_with_schema.expression.clone();
3703 annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
3704
3705 assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
3706 }
3707
3708 #[test]
3709 fn test_lineage_with_schema_correlated_scalar_subquery() {
3710 let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
3711 let dialect = Dialect::get(DialectType::BigQuery);
3712 let expr = dialect
3713 .parse(query)
3714 .unwrap()
3715 .into_iter()
3716 .next()
3717 .expect("expected one expression");
3718
3719 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3720 schema
3721 .add_table(
3722 "t1",
3723 &[("id".into(), DataType::BigInt { length: None })],
3724 None,
3725 )
3726 .expect("schema setup");
3727 schema
3728 .add_table(
3729 "t2",
3730 &[
3731 ("id".into(), DataType::BigInt { length: None }),
3732 ("val".into(), DataType::BigInt { length: None }),
3733 ],
3734 None,
3735 )
3736 .expect("schema setup");
3737
3738 let node = lineage_with_schema(
3739 "id",
3740 &expr,
3741 Some(&schema),
3742 Some(DialectType::BigQuery),
3743 false,
3744 )
3745 .expect("lineage_with_schema should handle correlated scalar subqueries");
3746
3747 assert_eq!(node.name, "id");
3748 }
3749
3750 #[test]
3751 fn test_lineage_with_schema_join_using() {
3752 let query = "SELECT a FROM t1 JOIN t2 USING(a)";
3753 let dialect = Dialect::get(DialectType::BigQuery);
3754 let expr = dialect
3755 .parse(query)
3756 .unwrap()
3757 .into_iter()
3758 .next()
3759 .expect("expected one expression");
3760
3761 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3762 schema
3763 .add_table(
3764 "t1",
3765 &[("a".into(), DataType::BigInt { length: None })],
3766 None,
3767 )
3768 .expect("schema setup");
3769 schema
3770 .add_table(
3771 "t2",
3772 &[("a".into(), DataType::BigInt { length: None })],
3773 None,
3774 )
3775 .expect("schema setup");
3776
3777 let node = lineage_with_schema(
3778 "a",
3779 &expr,
3780 Some(&schema),
3781 Some(DialectType::BigQuery),
3782 false,
3783 )
3784 .expect("lineage_with_schema should handle JOIN USING");
3785
3786 assert_eq!(node.name, "a");
3787 }
3788
3789 #[test]
3790 fn test_lineage_with_schema_qualified_table_name() {
3791 let query = "SELECT a FROM raw.t1";
3792 let dialect = Dialect::get(DialectType::BigQuery);
3793 let expr = dialect
3794 .parse(query)
3795 .unwrap()
3796 .into_iter()
3797 .next()
3798 .expect("expected one expression");
3799
3800 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3801 schema
3802 .add_table(
3803 "raw.t1",
3804 &[("a".into(), DataType::BigInt { length: None })],
3805 None,
3806 )
3807 .expect("schema setup");
3808
3809 let node = lineage_with_schema(
3810 "a",
3811 &expr,
3812 Some(&schema),
3813 Some(DialectType::BigQuery),
3814 false,
3815 )
3816 .expect("lineage_with_schema should handle dotted schema.table names");
3817
3818 assert_eq!(node.name, "a");
3819 }
3820
3821 #[test]
3822 fn test_lineage_with_schema_none_matches_lineage() {
3823 let expr = parse("SELECT a FROM t");
3824 let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
3825 let with_none =
3826 lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
3827
3828 assert_eq!(with_none.name, baseline.name);
3829 assert_eq!(with_none.downstream_names(), baseline.downstream_names());
3830 }
3831
3832 #[test]
3833 fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
3834 let dialect = Dialect::get(DialectType::BigQuery);
3835 let expr = dialect
3836 .parse("SELECT Name AS name FROM teams")
3837 .unwrap()
3838 .into_iter()
3839 .next()
3840 .expect("expected one expression");
3841
3842 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3843 schema
3844 .add_table(
3845 "teams",
3846 &[("Name".into(), DataType::String { length: None })],
3847 None,
3848 )
3849 .expect("schema setup");
3850
3851 let node = lineage_with_schema(
3852 "name",
3853 &expr,
3854 Some(&schema),
3855 Some(DialectType::BigQuery),
3856 false,
3857 )
3858 .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
3859
3860 let names = node.downstream_names();
3861 assert!(
3862 names.iter().any(|n| n == "teams.Name"),
3863 "Expected teams.Name in downstream, got: {:?}",
3864 names
3865 );
3866 }
3867
3868 #[test]
3869 fn test_lineage_bigquery_mixed_case_alias_lookup() {
3870 let dialect = Dialect::get(DialectType::BigQuery);
3871 let expr = dialect
3872 .parse("SELECT Name AS Name FROM teams")
3873 .unwrap()
3874 .into_iter()
3875 .next()
3876 .expect("expected one expression");
3877
3878 let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
3879 .expect("lineage should resolve mixed-case aliases in BigQuery");
3880
3881 assert_eq!(node.name, "name");
3882 }
3883
3884 #[test]
3885 fn test_lineage_bigquery_unnest_alias_source_issue_209() {
3886 let expr = parse_one(
3887 r#"
3888SELECT date_val AS week_start
3889FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
3890"#,
3891 DialectType::BigQuery,
3892 )
3893 .expect("parse");
3894
3895 let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
3896 .expect("lineage should resolve UNNEST alias as a source");
3897 let child = node
3898 .downstream
3899 .first()
3900 .expect("week_start should have downstream lineage");
3901
3902 assert_eq!(child.name, "_0.date_val");
3903 assert_eq!(child.source_name, "_0");
3904 assert_eq!(child.source_kind, SourceKind::Virtual);
3905 assert_eq!(child.source_alias.as_deref(), Some("date_val"));
3906
3907 let Expression::Column(column) = &child.expression else {
3908 panic!(
3909 "expected downstream column expression, got {:?}",
3910 child.expression
3911 );
3912 };
3913 assert_eq!(column.name.name, "date_val");
3914 assert_eq!(
3915 column.table.as_ref().map(|table| table.name.as_str()),
3916 Some("_0")
3917 );
3918 assert!(
3919 matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
3920 "expected UNNEST source expression, got {:?}",
3921 child.source
3922 );
3923 }
3924
3925 #[test]
3926 fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
3927 let expr =
3928 parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
3929
3930 let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3931 let child = node.downstream.first().expect("id should have lineage");
3932
3933 assert_eq!(child.name, "date_val.id");
3934 assert_eq!(child.source_name, "date_val");
3935 assert_eq!(child.source_kind, SourceKind::Table);
3936 assert_eq!(child.source_alias, None);
3937 }
3938
3939 #[test]
3940 fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
3941 let expr = parse_one(
3942 r#"
3943SELECT a.a AS first_value, b.b AS second_value
3944FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
3945JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
3946"#,
3947 DialectType::BigQuery,
3948 )
3949 .expect("parse");
3950
3951 let first =
3952 lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3953 let second =
3954 lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3955
3956 let first_child = first.downstream.first().expect("first source");
3957 let second_child = second.downstream.first().expect("second source");
3958
3959 assert_eq!(first_child.name, "_0.a");
3960 assert_eq!(first_child.source_name, "_0");
3961 assert_eq!(first_child.source_alias.as_deref(), Some("a"));
3962 assert_eq!(first_child.source_kind, SourceKind::Virtual);
3963
3964 assert_eq!(second_child.name, "_1.b");
3965 assert_eq!(second_child.source_name, "_1");
3966 assert_eq!(second_child.source_alias.as_deref(), Some("b"));
3967 assert_eq!(second_child.source_kind, SourceKind::Virtual);
3968 }
3969
3970 #[test]
3971 fn test_lineage_table_backed_unnest_points_to_real_source_column() {
3972 let expr = parse_one(
3973 r#"
3974SELECT item.item AS item
3975FROM t JOIN UNNEST(t.items) AS item ON TRUE
3976"#,
3977 DialectType::BigQuery,
3978 )
3979 .expect("parse");
3980
3981 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3982 let virtual_child = node.downstream.first().expect("virtual item source");
3983 assert_eq!(virtual_child.name, "_0.item");
3984 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3985
3986 let real_child = virtual_child
3987 .downstream
3988 .first()
3989 .expect("UNNEST(t.items) should depend on t.items");
3990 assert_eq!(real_child.name, "t.items");
3991 assert_eq!(real_child.source_name, "t");
3992 assert_eq!(real_child.source_kind, SourceKind::Table);
3993 }
3994
3995 #[test]
3996 fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
3997 let expr = parse_one(
3998 r#"
3999SELECT item AS item
4000FROM t JOIN UNNEST(t.items) AS item ON TRUE
4001"#,
4002 DialectType::BigQuery,
4003 )
4004 .expect("parse");
4005
4006 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
4007 let virtual_child = node.downstream.first().expect("virtual item source");
4008 assert_eq!(virtual_child.name, "_0.item");
4009 assert_eq!(virtual_child.source_name, "_0");
4010 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4011 assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
4012
4013 let real_child = virtual_child
4014 .downstream
4015 .first()
4016 .expect("UNNEST(t.items) should depend on t.items");
4017 assert_eq!(real_child.name, "t.items");
4018 assert_eq!(real_child.source_name, "t");
4019 assert_eq!(real_child.source_kind, SourceKind::Table);
4020 }
4021
4022 #[test]
4023 fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
4024 let cases = [
4025 (
4026 DialectType::PostgreSQL,
4027 "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
4028 ),
4029 (
4030 DialectType::Presto,
4031 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
4032 ),
4033 (
4034 DialectType::Trino,
4035 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
4036 ),
4037 ];
4038
4039 for (dialect, sql) in cases {
4040 let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
4041 let node = lineage("out", &expr, Some(dialect), false)
4042 .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
4043 let virtual_child = node
4044 .downstream
4045 .first()
4046 .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
4047
4048 assert_eq!(
4049 virtual_child.name, "_0.x",
4050 "unexpected virtual child for {dialect:?}"
4051 );
4052 assert_eq!(virtual_child.source_name, "_0");
4053 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4054 assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
4055
4056 let real_child = virtual_child
4057 .downstream
4058 .first()
4059 .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
4060 assert_eq!(real_child.name, "t.items");
4061 assert_eq!(real_child.source_kind, SourceKind::Table);
4062 }
4063 }
4064
4065 #[test]
4066 fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
4067 let cases = [
4068 (
4069 DialectType::Spark,
4070 "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
4071 ),
4072 (
4073 DialectType::Hive,
4074 "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
4075 ),
4076 ];
4077
4078 for (dialect, sql) in cases {
4079 let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
4080 let node = lineage("out", &expr, Some(dialect), false)
4081 .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
4082 let virtual_child = node
4083 .downstream
4084 .first()
4085 .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
4086
4087 assert_eq!(virtual_child.name, "_0.x");
4088 assert_eq!(virtual_child.source_name, "_0");
4089 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4090 assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
4091
4092 let real_child = virtual_child
4093 .downstream
4094 .first()
4095 .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
4096 assert_eq!(real_child.name, "t.items");
4097 assert_eq!(real_child.source_kind, SourceKind::Table);
4098 }
4099 }
4100
4101 #[test]
4102 fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
4103 let expr = parse_one(
4104 "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
4105 DialectType::Snowflake,
4106 )
4107 .expect("parse");
4108
4109 let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
4110 let virtual_child = node.downstream.first().expect("virtual flatten source");
4111 assert_eq!(virtual_child.name, "_0.value");
4112 assert_eq!(virtual_child.source_name, "_0");
4113 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4114 assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
4115
4116 let real_child = virtual_child
4117 .downstream
4118 .first()
4119 .expect("FLATTEN input should depend on raw_events.payload");
4120 assert_eq!(real_child.name, "raw_events.payload");
4121 assert_eq!(real_child.source_kind, SourceKind::Table);
4122 }
4123
4124 #[test]
4125 fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
4126 let expr = parse_one(
4127 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
4128 DialectType::Snowflake,
4129 )
4130 .expect("parse");
4131
4132 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4133 schema
4134 .add_table(
4135 "fact.some_daily_metrics",
4136 &[("date_utc".to_string(), DataType::Date)],
4137 None,
4138 )
4139 .expect("schema setup");
4140
4141 let node = lineage_with_schema(
4142 "recency",
4143 &expr,
4144 Some(&schema),
4145 Some(DialectType::Snowflake),
4146 false,
4147 )
4148 .expect("lineage_with_schema should not treat date part as a column");
4149
4150 let names = node.downstream_names();
4151 assert!(
4152 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4153 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4154 names
4155 );
4156 assert!(
4157 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
4158 "Did not expect date part to appear as lineage column, got: {:?}",
4159 names
4160 );
4161 }
4162
4163 #[test]
4164 fn test_snowflake_datediff_parses_to_typed_ast() {
4165 let expr = parse_one(
4166 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
4167 DialectType::Snowflake,
4168 )
4169 .expect("parse");
4170
4171 match expr {
4172 Expression::Select(select) => match &select.expressions[0] {
4173 Expression::Alias(alias) => match &alias.this {
4174 Expression::DateDiff(f) => {
4175 assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
4176 }
4177 other => panic!("expected DateDiff, got {other:?}"),
4178 },
4179 other => panic!("expected Alias, got {other:?}"),
4180 },
4181 other => panic!("expected Select, got {other:?}"),
4182 }
4183 }
4184
4185 #[test]
4186 fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
4187 let expr = parse_one(
4188 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
4189 DialectType::Snowflake,
4190 )
4191 .expect("parse");
4192
4193 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4194 schema
4195 .add_table(
4196 "fact.some_daily_metrics",
4197 &[("date_utc".to_string(), DataType::Date)],
4198 None,
4199 )
4200 .expect("schema setup");
4201
4202 let node = lineage_with_schema(
4203 "next_day",
4204 &expr,
4205 Some(&schema),
4206 Some(DialectType::Snowflake),
4207 false,
4208 )
4209 .expect("lineage_with_schema should not treat DATEADD date part as a column");
4210
4211 let names = node.downstream_names();
4212 assert!(
4213 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4214 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4215 names
4216 );
4217 assert!(
4218 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
4219 "Did not expect date part to appear as lineage column, got: {:?}",
4220 names
4221 );
4222 }
4223
4224 #[test]
4225 fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
4226 let expr = parse_one(
4227 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
4228 DialectType::Snowflake,
4229 )
4230 .expect("parse");
4231
4232 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4233 schema
4234 .add_table(
4235 "fact.some_daily_metrics",
4236 &[("date_utc".to_string(), DataType::Date)],
4237 None,
4238 )
4239 .expect("schema setup");
4240
4241 let node = lineage_with_schema(
4242 "day_part",
4243 &expr,
4244 Some(&schema),
4245 Some(DialectType::Snowflake),
4246 false,
4247 )
4248 .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
4249
4250 let names = node.downstream_names();
4251 assert!(
4252 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4253 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4254 names
4255 );
4256 assert!(
4257 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
4258 "Did not expect date part to appear as lineage column, got: {:?}",
4259 names
4260 );
4261 }
4262
4263 #[test]
4264 fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
4265 let expr = parse_one(
4266 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
4267 DialectType::Snowflake,
4268 )
4269 .expect("parse");
4270
4271 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4272 schema
4273 .add_table(
4274 "fact.some_daily_metrics",
4275 &[("date_utc".to_string(), DataType::Date)],
4276 None,
4277 )
4278 .expect("schema setup");
4279
4280 let node = lineage_with_schema(
4281 "day_part",
4282 &expr,
4283 Some(&schema),
4284 Some(DialectType::Snowflake),
4285 false,
4286 )
4287 .expect("quoted DATE_PART should continue to work");
4288
4289 let names = node.downstream_names();
4290 assert!(
4291 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4292 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4293 names
4294 );
4295 }
4296
4297 #[test]
4298 fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
4299 let expr = parse_one(
4300 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
4301 DialectType::Snowflake,
4302 )
4303 .expect("parse");
4304
4305 match expr {
4306 Expression::Select(select) => match &select.expressions[0] {
4307 Expression::Alias(alias) => match &alias.this {
4308 Expression::Function(f) => {
4309 assert_eq!(f.name.to_uppercase(), "DATEADD");
4310 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
4311 }
4312 other => panic!("expected generic DATEADD function, got {other:?}"),
4313 },
4314 other => panic!("expected Alias, got {other:?}"),
4315 },
4316 other => panic!("expected Select, got {other:?}"),
4317 }
4318 }
4319
4320 #[test]
4321 fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
4322 let expr = parse_one(
4323 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
4324 DialectType::Snowflake,
4325 )
4326 .expect("parse");
4327
4328 match expr {
4329 Expression::Select(select) => match &select.expressions[0] {
4330 Expression::Alias(alias) => match &alias.this {
4331 Expression::Function(f) => {
4332 assert_eq!(f.name.to_uppercase(), "DATE_PART");
4333 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
4334 }
4335 other => panic!("expected generic DATE_PART function, got {other:?}"),
4336 },
4337 other => panic!("expected Alias, got {other:?}"),
4338 },
4339 other => panic!("expected Select, got {other:?}"),
4340 }
4341 }
4342
4343 #[test]
4344 fn test_snowflake_date_part_string_literal_stays_generic_function() {
4345 let expr = parse_one(
4346 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
4347 DialectType::Snowflake,
4348 )
4349 .expect("parse");
4350
4351 match expr {
4352 Expression::Select(select) => match &select.expressions[0] {
4353 Expression::Alias(alias) => match &alias.this {
4354 Expression::Function(f) => {
4355 assert_eq!(f.name.to_uppercase(), "DATE_PART");
4356 }
4357 other => panic!("expected generic DATE_PART function, got {other:?}"),
4358 },
4359 other => panic!("expected Alias, got {other:?}"),
4360 },
4361 other => panic!("expected Select, got {other:?}"),
4362 }
4363 }
4364
4365 #[test]
4366 fn test_lineage_join() {
4367 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
4368
4369 let node_a = lineage("a", &expr, None, false).unwrap();
4370 let names_a = node_a.downstream_names();
4371 assert!(
4372 names_a.iter().any(|n| n == "t.a"),
4373 "Expected t.a, got: {:?}",
4374 names_a
4375 );
4376
4377 let node_b = lineage("b", &expr, None, false).unwrap();
4378 let names_b = node_b.downstream_names();
4379 assert!(
4380 names_b.iter().any(|n| n == "s.b"),
4381 "Expected s.b, got: {:?}",
4382 names_b
4383 );
4384 }
4385
4386 #[test]
4387 fn test_lineage_alias_leaf_has_resolved_source_name() {
4388 let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
4389 let node = lineage("col1", &expr, None, false).unwrap();
4390
4391 let names = node.downstream_names();
4393 assert!(
4394 names.iter().any(|n| n == "t1.col1"),
4395 "Expected aliased column edge t1.col1, got: {:?}",
4396 names
4397 );
4398
4399 let leaf = node
4401 .downstream
4402 .iter()
4403 .find(|n| n.name == "t1.col1")
4404 .expect("Expected t1.col1 leaf");
4405 assert_eq!(leaf.source_name, "table1");
4406 match &leaf.source {
4407 Expression::Table(table) => assert_eq!(table.name.name, "table1"),
4408 _ => panic!("Expected leaf source to be a table expression"),
4409 }
4410 }
4411
4412 #[test]
4413 fn test_lineage_derived_table() {
4414 let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
4415 let node = lineage("a", &expr, None, false).unwrap();
4416
4417 assert_eq!(node.name, "a");
4418 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4420 assert!(
4421 all_names.iter().any(|n| n == "t.a"),
4422 "Expected to trace through derived table to t.a, got: {:?}",
4423 all_names
4424 );
4425 }
4426
4427 #[test]
4428 fn test_lineage_cte() {
4429 let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
4430 let node = lineage("a", &expr, None, false).unwrap();
4431
4432 assert_eq!(node.name, "a");
4433 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4434 assert!(
4435 all_names.iter().any(|n| n == "t.a"),
4436 "Expected to trace through CTE to t.a, got: {:?}",
4437 all_names
4438 );
4439 }
4440
4441 #[test]
4442 fn test_lineage_union() {
4443 let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
4444 let node = lineage("a", &expr, None, false).unwrap();
4445
4446 assert_eq!(node.name, "a");
4447 assert_eq!(
4449 node.downstream.len(),
4450 2,
4451 "Expected 2 branches for UNION, got {}",
4452 node.downstream.len()
4453 );
4454 }
4455
4456 #[test]
4457 fn test_lineage_cte_union() {
4458 let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
4459 let node = lineage("a", &expr, None, false).unwrap();
4460
4461 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4463 assert!(
4464 all_names.len() >= 3,
4465 "Expected at least 3 nodes for CTE with UNION, got: {:?}",
4466 all_names
4467 );
4468 }
4469
4470 #[test]
4471 fn test_lineage_star() {
4472 let expr = parse("SELECT * FROM t");
4473 let node = lineage("*", &expr, None, false).unwrap();
4474
4475 assert_eq!(node.name, "*");
4476 assert!(
4478 !node.downstream.is_empty(),
4479 "Star should produce downstream nodes"
4480 );
4481 }
4482
4483 #[test]
4484 fn test_lineage_subquery_in_select() {
4485 let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
4486 let node = lineage("x", &expr, None, false).unwrap();
4487
4488 assert_eq!(node.name, "x");
4489 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4491 assert!(
4492 all_names.len() >= 2,
4493 "Expected tracing into scalar subquery, got: {:?}",
4494 all_names
4495 );
4496 }
4497
4498 #[test]
4499 fn test_lineage_multiple_columns() {
4500 let expr = parse("SELECT a, b FROM t");
4501
4502 let node_a = lineage("a", &expr, None, false).unwrap();
4503 let node_b = lineage("b", &expr, None, false).unwrap();
4504
4505 assert_eq!(node_a.name, "a");
4506 assert_eq!(node_b.name, "b");
4507
4508 let names_a = node_a.downstream_names();
4510 let names_b = node_b.downstream_names();
4511 assert!(names_a.iter().any(|n| n == "t.a"));
4512 assert!(names_b.iter().any(|n| n == "t.b"));
4513 }
4514
4515 #[test]
4516 fn test_get_source_tables() {
4517 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
4518 let node = lineage("a", &expr, None, false).unwrap();
4519
4520 let tables = get_source_tables(&node);
4521 assert!(
4522 tables.contains("t"),
4523 "Expected source table 't', got: {:?}",
4524 tables
4525 );
4526 }
4527
4528 #[test]
4529 fn test_lineage_column_not_found() {
4530 let expr = parse("SELECT a FROM t");
4531 let result = lineage("nonexistent", &expr, None, false);
4532 assert!(result.is_err());
4533 }
4534
4535 #[test]
4536 fn test_lineage_nested_cte() {
4537 let expr = parse(
4538 "WITH cte1 AS (SELECT a FROM t), \
4539 cte2 AS (SELECT a FROM cte1) \
4540 SELECT a FROM cte2",
4541 );
4542 let node = lineage("a", &expr, None, false).unwrap();
4543
4544 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4546 assert!(
4547 all_names.len() >= 3,
4548 "Expected to trace through nested CTEs, got: {:?}",
4549 all_names
4550 );
4551 }
4552
4553 #[test]
4554 fn test_trim_selects_true() {
4555 let expr = parse("SELECT a, b, c FROM t");
4556 let node = lineage("a", &expr, None, true).unwrap();
4557
4558 if let Expression::Select(select) = &node.source {
4560 assert_eq!(
4561 select.expressions.len(),
4562 1,
4563 "Trimmed source should have 1 expression, got {}",
4564 select.expressions.len()
4565 );
4566 } else {
4567 panic!("Expected Select source");
4568 }
4569 }
4570
4571 #[test]
4572 fn test_trim_selects_false() {
4573 let expr = parse("SELECT a, b, c FROM t");
4574 let node = lineage("a", &expr, None, false).unwrap();
4575
4576 if let Expression::Select(select) = &node.source {
4578 assert_eq!(
4579 select.expressions.len(),
4580 3,
4581 "Untrimmed source should have 3 expressions"
4582 );
4583 } else {
4584 panic!("Expected Select source");
4585 }
4586 }
4587
4588 #[test]
4589 fn test_lineage_expression_in_select() {
4590 let expr = parse("SELECT a + b AS c FROM t");
4591 let node = lineage("c", &expr, None, false).unwrap();
4592
4593 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4595 assert!(
4596 all_names.len() >= 3,
4597 "Expected to trace a + b to both columns, got: {:?}",
4598 all_names
4599 );
4600 }
4601
4602 #[test]
4603 fn test_set_operation_by_index() {
4604 let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
4605
4606 let node = lineage("a", &expr, None, false).unwrap();
4608
4609 assert_eq!(node.downstream.len(), 2);
4611 }
4612
4613 fn print_node(node: &LineageNode, indent: usize) {
4616 let pad = " ".repeat(indent);
4617 println!(
4618 "{pad}name={:?} source_name={:?}",
4619 node.name, node.source_name
4620 );
4621 for child in &node.downstream {
4622 print_node(child, indent + 1);
4623 }
4624 }
4625
4626 #[test]
4627 fn test_issue18_repro() {
4628 let query = "SELECT UPPER(name) as upper_name FROM users";
4630 println!("Query: {query}\n");
4631
4632 let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
4633 let exprs = dialect.parse(query).unwrap();
4634 let expr = &exprs[0];
4635
4636 let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
4637 println!("lineage(\"upper_name\"):");
4638 print_node(&node, 1);
4639
4640 let names = node.downstream_names();
4641 assert!(
4642 names.iter().any(|n| n == "users.name"),
4643 "Expected users.name in downstream, got: {:?}",
4644 names
4645 );
4646 }
4647
4648 #[test]
4649 fn test_lineage_bigquery_safe_namespace_issue207() {
4650 let query = r#"
4651WITH import_cte AS (
4652 SELECT timestamp, data, operation
4653 FROM `project`.`dataset`.`source_table`
4654),
4655transform_cte AS (
4656 SELECT
4657 timestamp,
4658 SAFE.PARSE_JSON(data) AS json_data
4659 FROM import_cte
4660)
4661SELECT json_data FROM transform_cte
4662"#;
4663 let expr = parse_one(query, DialectType::BigQuery).expect("parse");
4664 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
4665 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
4666 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4667
4668 assert!(
4669 names.iter().any(|name| name == "source_table.data"),
4670 "expected source_table.data in lineage, got {names:?}"
4671 );
4672 assert!(
4673 !names
4674 .iter()
4675 .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
4676 "did not expect SAFE namespace receiver in lineage, got {names:?}"
4677 );
4678 }
4679
4680 #[test]
4681 fn test_lineage_bigquery_safe_namespace_method_call_guard() {
4682 let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
4683 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
4684 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
4685 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4686
4687 assert!(
4688 names.iter().any(|name| name == "t.data"),
4689 "expected t.data in lineage, got {names:?}"
4690 );
4691 assert!(
4692 !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
4693 "did not expect SAFE namespace receiver in lineage, got {names:?}"
4694 );
4695 }
4696
4697 #[test]
4698 fn test_lineage_method_call_receiver_control() {
4699 let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
4700 let node = lineage("out", &expr, None, false).expect("lineage");
4701 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4702
4703 assert!(
4704 names.iter().any(|name| name == "t.obj"),
4705 "expected ordinary method receiver to remain in lineage, got {names:?}"
4706 );
4707 assert!(
4708 names.iter().any(|name| name == "t.arg"),
4709 "expected method argument in lineage, got {names:?}"
4710 );
4711 }
4712
4713 #[test]
4714 fn test_lineage_upper_function() {
4715 let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
4716 let node = lineage("upper_name", &expr, None, false).unwrap();
4717
4718 let names = node.downstream_names();
4719 assert!(
4720 names.iter().any(|n| n == "users.name"),
4721 "Expected users.name in downstream, got: {:?}",
4722 names
4723 );
4724 }
4725
4726 #[test]
4727 fn test_lineage_round_function() {
4728 let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
4729 let node = lineage("rounded", &expr, None, false).unwrap();
4730
4731 let names = node.downstream_names();
4732 assert!(
4733 names.iter().any(|n| n == "products.price"),
4734 "Expected products.price in downstream, got: {:?}",
4735 names
4736 );
4737 }
4738
4739 #[test]
4740 fn test_lineage_coalesce_function() {
4741 let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
4742 let node = lineage("val", &expr, None, false).unwrap();
4743
4744 let names = node.downstream_names();
4745 assert!(
4746 names.iter().any(|n| n == "t.a"),
4747 "Expected t.a in downstream, got: {:?}",
4748 names
4749 );
4750 assert!(
4751 names.iter().any(|n| n == "t.b"),
4752 "Expected t.b in downstream, got: {:?}",
4753 names
4754 );
4755 }
4756
4757 #[test]
4758 fn test_lineage_count_function() {
4759 let expr = parse("SELECT COUNT(id) AS cnt FROM t");
4760 let node = lineage("cnt", &expr, None, false).unwrap();
4761
4762 let names = node.downstream_names();
4763 assert!(
4764 names.iter().any(|n| n == "t.id"),
4765 "Expected t.id in downstream, got: {:?}",
4766 names
4767 );
4768 }
4769
4770 #[test]
4771 fn test_lineage_sum_function() {
4772 let expr = parse("SELECT SUM(amount) AS total FROM t");
4773 let node = lineage("total", &expr, None, false).unwrap();
4774
4775 let names = node.downstream_names();
4776 assert!(
4777 names.iter().any(|n| n == "t.amount"),
4778 "Expected t.amount in downstream, got: {:?}",
4779 names
4780 );
4781 }
4782
4783 #[test]
4784 fn test_lineage_case_with_nested_functions() {
4785 let expr =
4786 parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
4787 let node = lineage("result", &expr, None, false).unwrap();
4788
4789 let names = node.downstream_names();
4790 assert!(
4791 names.iter().any(|n| n == "t.x"),
4792 "Expected t.x in downstream, got: {:?}",
4793 names
4794 );
4795 assert!(
4796 names.iter().any(|n| n == "t.name"),
4797 "Expected t.name in downstream, got: {:?}",
4798 names
4799 );
4800 }
4801
4802 #[test]
4803 fn test_lineage_substring_function() {
4804 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
4805 let node = lineage("short", &expr, None, false).unwrap();
4806
4807 let names = node.downstream_names();
4808 assert!(
4809 names.iter().any(|n| n == "t.name"),
4810 "Expected t.name in downstream, got: {:?}",
4811 names
4812 );
4813 }
4814
4815 #[test]
4818 fn test_lineage_cte_select_star() {
4819 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
4823 let node = lineage("a", &expr, None, false).unwrap();
4824
4825 assert_eq!(node.name, "a");
4826 assert!(
4829 !node.downstream.is_empty(),
4830 "Expected downstream nodes tracing through CTE, got none"
4831 );
4832 }
4833
4834 #[test]
4835 fn test_lineage_schema_less_cte_star_passthrough_resolves_base_column() {
4836 let expr = parse("WITH c AS (SELECT * FROM t) SELECT c.x FROM c");
4837 let node = lineage("x", &expr, None, false).unwrap();
4838
4839 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4840 assert!(
4841 all_names.iter().any(|name| name == "t.x"),
4842 "Expected schema-less CTE star passthrough to reach t.x, got: {:?}",
4843 all_names
4844 );
4845
4846 let cte_node = node
4847 .walk()
4848 .find(|child| child.source_kind == SourceKind::Cte && child.source_name == "c")
4849 .expect("expected CTE hop with source_name c");
4850 assert_eq!(cte_node.source_kind, SourceKind::Cte);
4851 assert_eq!(cte_node.source_name, "c");
4852 }
4853
4854 #[test]
4855 fn test_lineage_schema_less_cte_star_passthrough_with_aggregation() {
4856 let expr = parse(
4857 "WITH c AS (SELECT * FROM t) \
4858 SELECT SUM(c.x) AS s FROM c GROUP BY 1",
4859 );
4860 let node = lineage("s", &expr, None, false).unwrap();
4861
4862 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4863 assert!(
4864 all_names.iter().any(|name| name == "t.x"),
4865 "Expected aggregate over CTE star passthrough to reach t.x, got: {:?}",
4866 all_names
4867 );
4868 }
4869
4870 #[test]
4871 fn test_lineage_schema_less_cte_star_passthrough_with_join_and_alias() {
4872 let expr = parse(
4873 "WITH a AS (SELECT * FROM t1), b AS (SELECT * FROM t2) \
4874 SELECT SUM(b.x) AS s FROM a LEFT JOIN b ON b.id = a.id GROUP BY a.k",
4875 );
4876 let node = lineage("s", &expr, None, false).unwrap();
4877
4878 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4879 assert!(
4880 all_names.iter().any(|name| name == "t2.x"),
4881 "Expected joined CTE star passthrough to reach t2.x, got: {:?}",
4882 all_names
4883 );
4884 }
4885
4886 #[test]
4887 fn test_lineage_schema_less_chained_cte_star_passthrough() {
4888 let expr = parse(
4889 "WITH c1 AS (SELECT * FROM t), \
4890 c2 AS (SELECT * FROM c1), \
4891 c3 AS (SELECT * FROM c2) \
4892 SELECT c3.x FROM c3",
4893 );
4894 let node = lineage("x", &expr, None, false).unwrap();
4895
4896 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4897 assert!(
4898 all_names.iter().any(|name| name == "t.x"),
4899 "Expected chained CTE star passthrough to reach t.x, got: {:?}",
4900 all_names
4901 );
4902 }
4903
4904 #[test]
4905 fn test_lineage_schema_less_unqualified_star_with_multiple_sources_does_not_guess() {
4906 let expr = parse("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
4907 let result = lineage("x", &expr, None, false);
4908
4909 assert!(
4910 result.is_err(),
4911 "Unqualified star over multiple sources should remain ambiguous, got: {:?}",
4912 result
4913 );
4914 }
4915
4916 #[test]
4917 fn test_lineage_cte_select_star_renamed_column() {
4918 let expr =
4921 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
4922 let node = lineage("customer_id", &expr, None, false).unwrap();
4923
4924 assert_eq!(node.name, "customer_id");
4925 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4927 assert!(
4928 all_names.len() >= 2,
4929 "Expected at least 2 nodes (customer_id → source), got: {:?}",
4930 all_names
4931 );
4932 }
4933
4934 #[test]
4935 fn test_lineage_cte_select_star_multiple_columns() {
4936 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
4938
4939 for col in &["a", "b", "c"] {
4940 let node = lineage(col, &expr, None, false).unwrap();
4941 assert_eq!(node.name, *col);
4942 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4944 assert!(
4945 all_names.len() >= 2,
4946 "Expected at least 2 nodes for column {}, got: {:?}",
4947 col,
4948 all_names
4949 );
4950 }
4951 }
4952
4953 #[test]
4954 fn test_lineage_nested_cte_select_star() {
4955 let expr = parse(
4957 "WITH cte1 AS (SELECT a FROM t), \
4958 cte2 AS (SELECT * FROM cte1) \
4959 SELECT * FROM cte2",
4960 );
4961 let node = lineage("a", &expr, None, false).unwrap();
4962
4963 assert_eq!(node.name, "a");
4964 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4965 assert!(
4966 all_names.len() >= 3,
4967 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
4968 all_names
4969 );
4970 }
4971
4972 #[test]
4973 fn test_lineage_three_level_nested_cte_star() {
4974 let expr = parse(
4976 "WITH cte1 AS (SELECT x FROM t), \
4977 cte2 AS (SELECT * FROM cte1), \
4978 cte3 AS (SELECT * FROM cte2) \
4979 SELECT * FROM cte3",
4980 );
4981 let node = lineage("x", &expr, None, false).unwrap();
4982
4983 assert_eq!(node.name, "x");
4984 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4985 assert!(
4986 all_names.len() >= 4,
4987 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
4988 all_names
4989 );
4990 }
4991
4992 #[test]
4993 fn test_lineage_cte_union_star() {
4994 let expr = parse(
4996 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
4997 SELECT * FROM cte",
4998 );
4999 let node = lineage("a", &expr, None, false).unwrap();
5000
5001 assert_eq!(node.name, "a");
5002 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5003 assert!(
5004 all_names.len() >= 2,
5005 "Expected at least 2 nodes for CTE union star, got: {:?}",
5006 all_names
5007 );
5008 }
5009
5010 #[test]
5011 fn test_lineage_cte_star_unknown_table() {
5012 let expr = parse(
5015 "WITH cte AS (SELECT * FROM unknown_table) \
5016 SELECT * FROM cte",
5017 );
5018 let _result = lineage("x", &expr, None, false);
5021 }
5022
5023 #[test]
5024 fn test_lineage_cte_explicit_columns() {
5025 let expr = parse(
5027 "WITH cte(x, y) AS (SELECT a, b FROM t) \
5028 SELECT * FROM cte",
5029 );
5030 let node = lineage("x", &expr, None, false).unwrap();
5031 assert_eq!(node.name, "x");
5032 }
5033
5034 #[test]
5035 fn test_lineage_cte_qualified_star() {
5036 let expr = parse(
5038 "WITH cte AS (SELECT a, b FROM t) \
5039 SELECT cte.* FROM cte",
5040 );
5041 for col in &["a", "b"] {
5042 let node = lineage(col, &expr, None, false).unwrap();
5043 assert_eq!(node.name, *col);
5044 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5045 assert!(
5046 all_names.len() >= 2,
5047 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
5048 col,
5049 all_names
5050 );
5051 }
5052 }
5053
5054 #[test]
5055 fn test_lineage_subquery_select_star() {
5056 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
5059 let node = lineage("x", &expr, None, false).unwrap();
5060
5061 assert_eq!(node.name, "x");
5062 assert!(
5063 !node.downstream.is_empty(),
5064 "Expected downstream nodes for subquery with SELECT *, got none"
5065 );
5066 }
5067
5068 #[test]
5069 fn test_lineage_cte_star_with_schema_external_table() {
5070 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
5072SELECT * FROM orders"#;
5073 let expr = parse(sql);
5074
5075 let mut schema = MappingSchema::new();
5076 let cols = vec![
5077 ("order_id".to_string(), DataType::Unknown),
5078 ("customer_id".to_string(), DataType::Unknown),
5079 ("amount".to_string(), DataType::Unknown),
5080 ];
5081 schema.add_table("stg_orders", &cols, None).unwrap();
5082
5083 let node =
5084 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5085 .unwrap();
5086 assert_eq!(node.name, "order_id");
5087 }
5088
5089 #[test]
5090 fn test_lineage_cte_star_with_schema_three_part_name() {
5091 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
5093SELECT * FROM orders"#;
5094 let expr = parse(sql);
5095
5096 let mut schema = MappingSchema::new();
5097 let cols = vec![
5098 ("order_id".to_string(), DataType::Unknown),
5099 ("customer_id".to_string(), DataType::Unknown),
5100 ];
5101 schema
5102 .add_table("db.schema.stg_orders", &cols, None)
5103 .unwrap();
5104
5105 let node = lineage_with_schema(
5106 "customer_id",
5107 &expr,
5108 Some(&schema as &dyn Schema),
5109 None,
5110 false,
5111 )
5112 .unwrap();
5113 assert_eq!(node.name, "customer_id");
5114 }
5115
5116 #[test]
5117 fn test_lineage_cte_star_with_schema_nested() {
5118 let sql = r#"WITH
5121 raw AS (SELECT * FROM external_table),
5122 enriched AS (SELECT * FROM raw)
5123 SELECT * FROM enriched"#;
5124 let expr = parse(sql);
5125
5126 let mut schema = MappingSchema::new();
5127 let cols = vec![
5128 ("id".to_string(), DataType::Unknown),
5129 ("name".to_string(), DataType::Unknown),
5130 ];
5131 schema.add_table("external_table", &cols, None).unwrap();
5132
5133 let node =
5134 lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
5135 assert_eq!(node.name, "name");
5136 }
5137
5138 #[test]
5139 fn test_lineage_cte_qualified_star_with_schema() {
5140 let sql = r#"WITH
5143 orders AS (SELECT * FROM stg_orders),
5144 enriched AS (
5145 SELECT orders.*, 'extra' AS extra
5146 FROM orders
5147 )
5148 SELECT * FROM enriched"#;
5149 let expr = parse(sql);
5150
5151 let mut schema = MappingSchema::new();
5152 let cols = vec![
5153 ("order_id".to_string(), DataType::Unknown),
5154 ("total".to_string(), DataType::Unknown),
5155 ];
5156 schema.add_table("stg_orders", &cols, None).unwrap();
5157
5158 let node =
5159 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5160 .unwrap();
5161 assert_eq!(node.name, "order_id");
5162
5163 let extra =
5165 lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
5166 assert_eq!(extra.name, "extra");
5167 }
5168
5169 #[test]
5170 fn test_lineage_cte_star_without_schema_still_works() {
5171 let sql = r#"WITH
5173 cte1 AS (SELECT id, name FROM raw_table),
5174 cte2 AS (SELECT * FROM cte1)
5175 SELECT * FROM cte2"#;
5176 let expr = parse(sql);
5177
5178 let node = lineage("id", &expr, None, false).unwrap();
5180 assert_eq!(node.name, "id");
5181 }
5182
5183 #[test]
5184 fn test_lineage_nested_cte_star_with_join_and_schema() {
5185 let sql = r#"WITH
5188base_orders AS (
5189 SELECT * FROM stg_orders
5190),
5191with_payments AS (
5192 SELECT
5193 base_orders.*,
5194 p.amount
5195 FROM base_orders
5196 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
5197),
5198final_cte AS (
5199 SELECT * FROM with_payments
5200)
5201SELECT * FROM final_cte"#;
5202 let expr = parse(sql);
5203
5204 let mut schema = MappingSchema::new();
5205 let order_cols = vec![
5206 (
5207 "order_id".to_string(),
5208 crate::expressions::DataType::Unknown,
5209 ),
5210 (
5211 "customer_id".to_string(),
5212 crate::expressions::DataType::Unknown,
5213 ),
5214 ("status".to_string(), crate::expressions::DataType::Unknown),
5215 ];
5216 let pay_cols = vec![
5217 (
5218 "payment_id".to_string(),
5219 crate::expressions::DataType::Unknown,
5220 ),
5221 (
5222 "order_id".to_string(),
5223 crate::expressions::DataType::Unknown,
5224 ),
5225 ("amount".to_string(), crate::expressions::DataType::Unknown),
5226 ];
5227 schema.add_table("stg_orders", &order_cols, None).unwrap();
5228 schema.add_table("stg_payments", &pay_cols, None).unwrap();
5229
5230 let node =
5232 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5233 .unwrap();
5234 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5235
5236 let has_table_qualified = all_names
5238 .iter()
5239 .any(|n| n.contains('.') && n.contains("order_id"));
5240 assert!(
5241 has_table_qualified,
5242 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
5243 all_names
5244 );
5245
5246 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
5248 .unwrap();
5249 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5250
5251 let has_table_qualified = all_names
5252 .iter()
5253 .any(|n| n.contains('.') && n.contains("amount"));
5254 assert!(
5255 has_table_qualified,
5256 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
5257 all_names
5258 );
5259 }
5260
5261 #[test]
5262 fn test_lineage_cte_alias_resolution() {
5263 let sql = r#"WITH import_stg_items AS (
5265 SELECT item_id, name, status FROM stg_items
5266)
5267SELECT base.item_id, base.status
5268FROM import_stg_items AS base"#;
5269 let expr = parse(sql);
5270
5271 let node = lineage("item_id", &expr, None, false).unwrap();
5272 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5273 assert!(
5275 all_names.iter().any(|n| n == "stg_items.item_id"),
5276 "Expected leaf 'stg_items.item_id', got: {:?}",
5277 all_names
5278 );
5279 }
5280
5281 #[test]
5282 fn test_lineage_cte_alias_with_schema_and_star() {
5283 let sql = r#"WITH import_stg AS (
5285 SELECT * FROM stg_items
5286)
5287SELECT base.item_id, base.status
5288FROM import_stg AS base"#;
5289 let expr = parse(sql);
5290
5291 let mut schema = MappingSchema::new();
5292 schema
5293 .add_table(
5294 "stg_items",
5295 &[
5296 ("item_id".to_string(), DataType::Unknown),
5297 ("name".to_string(), DataType::Unknown),
5298 ("status".to_string(), DataType::Unknown),
5299 ],
5300 None,
5301 )
5302 .unwrap();
5303
5304 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
5305 .unwrap();
5306 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5307 assert!(
5308 all_names.iter().any(|n| n == "stg_items.item_id"),
5309 "Expected leaf 'stg_items.item_id', got: {:?}",
5310 all_names
5311 );
5312 }
5313
5314 #[test]
5315 fn test_lineage_cte_alias_with_join() {
5316 let sql = r#"WITH
5318 import_users AS (SELECT id, name FROM users),
5319 import_orders AS (SELECT id, user_id, amount FROM orders)
5320SELECT u.name, o.amount
5321FROM import_users AS u
5322LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
5323 let expr = parse(sql);
5324
5325 let node = lineage("name", &expr, None, false).unwrap();
5326 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5327 assert!(
5328 all_names.iter().any(|n| n == "users.name"),
5329 "Expected leaf 'users.name', got: {:?}",
5330 all_names
5331 );
5332
5333 let node = lineage("amount", &expr, None, false).unwrap();
5334 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5335 assert!(
5336 all_names.iter().any(|n| n == "orders.amount"),
5337 "Expected leaf 'orders.amount', got: {:?}",
5338 all_names
5339 );
5340 }
5341
5342 #[test]
5347 fn test_lineage_unquoted_cte_case_insensitive() {
5348 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
5351 let node = lineage("col", &expr, None, false).unwrap();
5352 assert_eq!(node.name, "col");
5353 assert!(
5354 !node.downstream.is_empty(),
5355 "Unquoted CTE should resolve case-insensitively"
5356 );
5357 }
5358
5359 #[test]
5360 fn test_lineage_quoted_cte_case_preserved() {
5361 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
5363 let node = lineage("col", &expr, None, false).unwrap();
5364 assert_eq!(node.name, "col");
5365 assert!(
5366 !node.downstream.is_empty(),
5367 "Quoted CTE with matching case should resolve"
5368 );
5369 }
5370
5371 #[test]
5372 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
5373 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
5377 let result = lineage("col", &expr, None, false);
5380 assert!(
5381 result.is_err(),
5382 "Quoted CTE with case mismatch should not expand star: {:?}",
5383 result
5384 );
5385 }
5386
5387 #[test]
5388 fn test_lineage_mixed_quoted_unquoted_cte() {
5389 let expr = parse(
5391 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
5392 );
5393 let node = lineage("a", &expr, None, false).unwrap();
5394 assert_eq!(node.name, "a");
5395 assert!(
5396 !node.downstream.is_empty(),
5397 "Mixed quoted/unquoted CTE chain should resolve"
5398 );
5399 }
5400
5401 #[test]
5417 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
5418 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
5429 let node = lineage("col", &expr, None, false).unwrap();
5430 assert!(!node.downstream.is_empty());
5431 let child = &node.downstream[0];
5432 assert_eq!(
5434 child.source_name, "MyCte",
5435 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5436 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5437 );
5438 }
5439
5440 #[test]
5441 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
5442 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
5449 let node = lineage("col", &expr, None, false).unwrap();
5450 assert!(!node.downstream.is_empty());
5451 let child = &node.downstream[0];
5452 assert_eq!(
5454 child.source_name, "MyCte",
5455 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5456 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5457 );
5458 }
5459
5460 #[test]
5461 fn test_lineage_recursive_cte_terminates_at_base_case() {
5462 let expr = parse_dialect(
5463 "WITH RECURSIVE nums AS (\
5464 SELECT 1 AS n \
5465 UNION ALL \
5466 SELECT n + 1 FROM nums WHERE n < 5\
5467 ) SELECT n FROM nums",
5468 DialectType::DuckDB,
5469 );
5470 let node = lineage("n", &expr, Some(DialectType::DuckDB), false).unwrap();
5471 let names = lineage_names(&node);
5472
5473 assert!(
5474 names.len() <= 12,
5475 "recursive CTE lineage should not unroll repeatedly, got {names:?}"
5476 );
5477 assert!(
5478 node.walk()
5479 .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "nums"),
5480 "expected recursive source to be marked as a CTE, got {names:?}"
5481 );
5482 }
5483
5484 #[test]
5485 fn test_lineage_window_partition_and_order_columns() {
5486 let expr = parse(
5487 "WITH c AS (SELECT user_id, ts FROM events) \
5488 SELECT ROW_NUMBER() OVER (PARTITION BY c.user_id ORDER BY c.ts) AS out FROM c",
5489 );
5490 let node = lineage("out", &expr, None, false).unwrap();
5491
5492 assert_lineage_contains(&node, "events.user_id");
5493 assert_lineage_contains(&node, "events.ts");
5494 }
5495
5496 #[test]
5497 fn test_lineage_window_aggregate_order_column() {
5498 let expr = parse(
5499 "WITH c AS (SELECT amount, d FROM txns) \
5500 SELECT SUM(c.amount) OVER (ORDER BY c.d) AS running FROM c",
5501 );
5502 let node = lineage("running", &expr, None, false).unwrap();
5503
5504 assert_lineage_contains(&node, "txns.amount");
5505 assert_lineage_contains(&node, "txns.d");
5506 }
5507
5508 #[test]
5509 fn test_lineage_named_window_columns() {
5510 let expr = parse(
5511 "SELECT ROW_NUMBER() OVER w AS out \
5512 FROM events \
5513 WINDOW w AS (PARTITION BY user_id ORDER BY ts)",
5514 );
5515 let node = lineage("out", &expr, None, false).unwrap();
5516
5517 assert_lineage_contains(&node, "events.user_id");
5518 assert_lineage_contains(&node, "events.ts");
5519 }
5520
5521 #[test]
5522 fn test_lineage_within_group_order_column() {
5523 let expr =
5524 parse("SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS p FROM txns");
5525 let node = lineage("p", &expr, None, false).unwrap();
5526
5527 assert_lineage_contains(&node, "txns.amount");
5528 }
5529
5530 #[test]
5531 fn test_lineage_query_wrappers_resolve_inner_select() {
5532 for sql in [
5533 "CREATE TABLE tgt AS SELECT x FROM src",
5534 "CREATE VIEW v AS SELECT x FROM src",
5535 "INSERT INTO tgt SELECT x FROM src",
5536 ] {
5537 let expr = parse(sql);
5538 let node = lineage("x", &expr, None, false).unwrap();
5539 assert_lineage_contains(&node, "src.x");
5540 }
5541 }
5542
5543 #[test]
5544 fn test_lineage_scalar_subquery_through_cte_reaches_base_table() {
5545 let expr = parse(
5546 "WITH c AS (SELECT x FROM t) \
5547 SELECT (SELECT SUM(x) FROM c) AS s FROM c LIMIT 1",
5548 );
5549 let node = lineage("s", &expr, None, false).unwrap();
5550
5551 assert_lineage_contains(&node, "t.x");
5552 assert!(
5553 node.walk()
5554 .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "c"),
5555 "expected scalar subquery CTE hop in lineage, got {:?}",
5556 lineage_names(&node)
5557 );
5558 }
5559
5560 #[test]
5561 fn test_lineage_scalar_subqueries_inside_expression_wrappers() {
5562 for sql in [
5563 "WITH c AS (SELECT a, b FROM t) \
5564 SELECT CASE WHEN c.a > 0 THEN c.b ELSE (SELECT MAX(z) FROM o) END AS r FROM c",
5565 "WITH c AS (SELECT a FROM t) \
5566 SELECT COALESCE(c.a, (SELECT MAX(z) FROM o)) AS r FROM c",
5567 "WITH c AS (SELECT a FROM t) \
5568 SELECT CAST((SELECT MAX(z) FROM o) AS INT) + c.a AS r FROM c",
5569 "WITH c AS (SELECT a FROM t) \
5570 SELECT CASE WHEN c.a BETWEEN 0 AND (SELECT MAX(z) FROM o) THEN c.a END AS r FROM c",
5571 ] {
5572 let expr = parse_dialect(sql, DialectType::DuckDB);
5573 let node = lineage("r", &expr, Some(DialectType::DuckDB), false)
5574 .unwrap_or_else(|error| panic!("lineage failed for {sql}: {error}"));
5575
5576 assert_lineage_contains(&node, "o.z");
5577 assert_lineage_contains(&node, "t.a");
5578 }
5579 }
5580
5581 #[test]
5582 fn test_lineage_nested_set_operation_inside_derived_table() {
5583 let expr = parse_dialect(
5584 "SELECT v FROM ((SELECT v FROM t1 UNION ALL SELECT v FROM t2) \
5585 UNION ALL SELECT v FROM t3) u",
5586 DialectType::DuckDB,
5587 );
5588 let node = lineage("v", &expr, Some(DialectType::DuckDB), false).unwrap();
5589
5590 assert_lineage_contains(&node, "t1.v");
5591 assert_lineage_contains(&node, "t2.v");
5592 assert_lineage_contains(&node, "t3.v");
5593 }
5594
5595 #[test]
5596 fn test_lineage_select_alias_reference_resolves_to_alias_source() {
5597 let expr = parse_dialect(
5598 "WITH c AS (SELECT x FROM t) SELECT c.x AS a, a + 1 AS b FROM c",
5599 DialectType::DuckDB,
5600 );
5601 let node = lineage("b", &expr, Some(DialectType::DuckDB), false).unwrap();
5602
5603 assert_lineage_contains(&node, "t.x");
5604 }
5605
5606 #[test]
5607 fn test_lineage_pivot_output_resolves_aggregation_input() {
5608 let expr = parse_dialect(
5609 "SELECT * FROM (SELECT region, q, amt FROM sales) \
5610 PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5611 DialectType::DuckDB,
5612 );
5613 let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5614
5615 assert_lineage_contains(&node, "sales.amt");
5616 }
5617
5618 #[test]
5619 fn test_lineage_pivot_multi_aggregate_and_alias_columns() {
5620 let multi = parse_dialect(
5621 "SELECT * FROM (SELECT category, value, price FROM t) \
5622 PIVOT(SUM(value) AS value_sum, MAX(price) FOR category IN ('a' AS cat_a, 'b'))",
5623 DialectType::DuckDB,
5624 );
5625 let value_sum =
5626 lineage("cat_a_value_sum", &multi, Some(DialectType::DuckDB), false).unwrap();
5627 assert_lineage_contains(&value_sum, "t.value");
5628
5629 let max_price =
5630 lineage("cat_a_max(price)", &multi, Some(DialectType::DuckDB), false).unwrap();
5631 assert_lineage_contains(&max_price, "t.price");
5632
5633 let aliased = parse_dialect(
5634 "SELECT * FROM (SELECT region, q, amt FROM sales) \
5635 PIVOT(SUM(amt) FOR q IN ('Q1')) AS p(region2, p1)",
5636 DialectType::DuckDB,
5637 );
5638 let region = lineage("region2", &aliased, Some(DialectType::DuckDB), false).unwrap();
5639 assert_lineage_contains(®ion, "sales.region");
5640
5641 let pivot_value = lineage("p1", &aliased, Some(DialectType::DuckDB), false).unwrap();
5642 assert_lineage_contains(&pivot_value, "sales.amt");
5643 }
5644
5645 #[test]
5646 fn test_lineage_pivot_through_cte_resolves_aggregation_input() {
5647 let expr = parse_dialect(
5648 "WITH src AS (SELECT region, q, amt FROM sales) \
5649 SELECT q1 FROM src PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5650 DialectType::DuckDB,
5651 );
5652 let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5653
5654 assert_lineage_contains(&node, "sales.amt");
5655 }
5656
5657 #[test]
5658 fn test_lineage_unpivot_value_resolves_input_columns() {
5659 let expr = parse_dialect(
5660 "SELECT name, val FROM t UNPIVOT(val FOR col IN (a, b, c))",
5661 DialectType::DuckDB,
5662 );
5663 let node = lineage("val", &expr, Some(DialectType::DuckDB), false).unwrap();
5664
5665 assert_lineage_contains(&node, "t.a");
5666 assert_lineage_contains(&node, "t.b");
5667 assert_lineage_contains(&node, "t.c");
5668 }
5669
5670 #[test]
5671 fn test_lineage_unpivot_multi_value_columns_resolve_positionally() {
5672 let expr = parse_dialect(
5673 "SELECT first_half_sales, second_half_sales, semester \
5674 FROM produce \
5675 UNPIVOT((first_half_sales, second_half_sales) \
5676 FOR semester IN ((q1, q2) AS 'semester_1', (q3, q4) AS 'semester_2'))",
5677 DialectType::BigQuery,
5678 );
5679
5680 let first = lineage(
5681 "first_half_sales",
5682 &expr,
5683 Some(DialectType::BigQuery),
5684 false,
5685 )
5686 .unwrap();
5687 assert_lineage_contains(&first, "produce.q1");
5688 assert_lineage_contains(&first, "produce.q3");
5689
5690 let second = lineage(
5691 "second_half_sales",
5692 &expr,
5693 Some(DialectType::BigQuery),
5694 false,
5695 )
5696 .unwrap();
5697 assert_lineage_contains(&second, "produce.q2");
5698 assert_lineage_contains(&second, "produce.q4");
5699 }
5700
5701 #[test]
5702 fn test_lineage_top_level_union_over_ctes_reaches_base_tables() {
5703 let expr = parse(
5704 "WITH a AS (SELECT x FROM t1), b AS (SELECT x FROM t2) \
5705 SELECT x FROM a UNION SELECT x FROM b",
5706 );
5707 let node = lineage("x", &expr, None, false).unwrap();
5708
5709 assert_lineage_contains(&node, "t1.x");
5710 assert_lineage_contains(&node, "t2.x");
5711 }
5712
5713 #[test]
5714 fn test_lineage_star_excludes_semi_join_rhs_source() {
5715 let expr = parse_dialect(
5716 "SELECT * FROM orders LEFT SEMI JOIN customers ON orders.customer_id = customers.id",
5717 DialectType::DuckDB,
5718 );
5719 let node = lineage("customer_id", &expr, Some(DialectType::DuckDB), false).unwrap();
5720
5721 assert_lineage_contains(&node, "orders.customer_id");
5722 }
5723
5724 #[test]
5731 #[ignore = "requires derived table star expansion (separate issue)"]
5732 fn test_node_name_doesnt_contain_comment() {
5733 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
5734 let node = lineage("x", &expr, None, false).unwrap();
5735
5736 assert_eq!(node.name, "x");
5737 assert!(!node.downstream.is_empty());
5738 }
5739
5740 #[test]
5744 fn test_comment_before_first_column_in_cte() {
5745 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
5746 let sql_without_comment = "with t as (select 1 as a) select a from t";
5747
5748 let expr_ok = parse(sql_without_comment);
5750 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
5751
5752 let expr_comment = parse(sql_with_comment);
5754 let node_comment = lineage("a", &expr_comment, None, false)
5755 .expect("with comment before first column should succeed");
5756
5757 assert_eq!(node_ok.name, node_comment.name, "node names should match");
5758 assert_eq!(
5759 node_ok.downstream_names(),
5760 node_comment.downstream_names(),
5761 "downstream lineage should be identical with or without comment"
5762 );
5763 }
5764
5765 #[test]
5767 fn test_block_comment_before_first_column() {
5768 let sql = "with t as (select 1 as a) select /* section */ a from t";
5769 let expr = parse(sql);
5770 let node = lineage("a", &expr, None, false)
5771 .expect("block comment before first column should succeed");
5772 assert_eq!(node.name, "a");
5773 assert!(
5774 !node.downstream.is_empty(),
5775 "should have downstream lineage"
5776 );
5777 }
5778
5779 #[test]
5781 fn test_comment_before_first_column_second_col_ok() {
5782 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
5783 let expr = parse(sql);
5784
5785 let node_a =
5786 lineage("a", &expr, None, false).expect("column a with comment should succeed");
5787 assert_eq!(node_a.name, "a");
5788
5789 let node_b =
5790 lineage("b", &expr, None, false).expect("column b with comment should succeed");
5791 assert_eq!(node_b.name, "b");
5792 }
5793
5794 #[test]
5796 fn test_comment_before_aliased_column() {
5797 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
5798 let expr = parse(sql);
5799 let node =
5800 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
5801 assert_eq!(node.name, "y");
5802 assert!(
5803 !node.downstream.is_empty(),
5804 "aliased column should have downstream lineage"
5805 );
5806 }
5807}