1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, NamedWindow, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// A node in a column-lineage tree.
///
/// Each node describes one column (or star) at one level of a query and links
/// to the nodes it was derived from through `downstream`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Name of the column this node represents (for star children: `"<source>.*"`).
    pub name: String,
    /// The expression producing this column (e.g. the matching select-list item).
    pub expression: Expression,
    /// The expression this node was resolved against (e.g. the enclosing query,
    /// possibly trimmed, or the source a star child points at).
    pub source: Expression,
    /// Nodes this column depends on, one step further from the final output.
    pub downstream: Vec<LineageNode>,
    /// Name of the source this node was reached through; empty for the root node.
    pub source_name: String,
    /// Classification of the source (root, CTE, derived table, virtual, ...).
    pub source_kind: SourceKind,
    /// Alias of the source when known; omitted from serialized output when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_alias: Option<String>,
    /// Name of the parent node that referenced this one; empty for the root node.
    pub reference_node_name: String,
}
40
41impl LineageNode {
42 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44 Self {
45 name: name.into(),
46 expression,
47 source,
48 downstream: Vec::new(),
49 source_name: String::new(),
50 source_kind: SourceKind::Unknown,
51 source_alias: None,
52 reference_node_name: String::new(),
53 }
54 }
55
56 pub fn walk(&self) -> LineageWalker<'_> {
58 LineageWalker { stack: vec![self] }
59 }
60
61 pub fn downstream_names(&self) -> Vec<String> {
63 self.downstream.iter().map(|n| n.name.clone()).collect()
64 }
65}
66
67fn source_kind_for_scope_context(
68 scope: &Scope,
69 source_name: &str,
70 reference_node_name: &str,
71) -> SourceKind {
72 if source_name.is_empty() && reference_node_name.is_empty() {
73 return SourceKind::Root;
74 }
75 if let Some(source_info) = scope.sources.get(source_name) {
76 return source_info.kind;
77 }
78 if scope.cte_sources.contains_key(source_name) {
79 return SourceKind::Cte;
80 }
81 match scope.scope_type {
82 ScopeType::Cte => SourceKind::Cte,
83 ScopeType::DerivedTable => SourceKind::DerivedTable,
84 ScopeType::Udtf => SourceKind::Virtual,
85 _ => SourceKind::Unknown,
86 }
87}
88
89fn apply_scope_context(
90 node: &mut LineageNode,
91 scope: &Scope,
92 source_name: &str,
93 reference_node_name: &str,
94) {
95 node.source_name = source_name.to_string();
96 node.reference_node_name = reference_node_name.to_string();
97 node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
/// Depth-first, pre-order iterator over a `LineageNode` tree; created by `LineageNode::walk`.
pub struct LineageWalker<'a> {
    /// Nodes still to be visited; the next node to yield is at the end.
    stack: Vec<&'a LineageNode>,
}
104
105impl<'a> Iterator for LineageWalker<'a> {
106 type Item = &'a LineageNode;
107
108 fn next(&mut self) -> Option<Self::Item> {
109 if let Some(node) = self.stack.pop() {
110 for child in node.downstream.iter().rev() {
112 self.stack.push(child);
113 }
114 Some(node)
115 } else {
116 None
117 }
118 }
119}
120
/// How the target output column of a query is identified.
enum ColumnRef<'a> {
    /// By output column name.
    Name(&'a str),
    /// By position in the projection (scalar subqueries are traced with `Index(0)`).
    Index(usize),
}
130
131pub fn lineage(
157 column: &str,
158 sql: &Expression,
159 dialect: Option<DialectType>,
160 trim_selects: bool,
161) -> Result<LineageNode> {
162 let mut owned = lineage_normalized_expression(sql);
163 if has_lineage_with_clause(&owned) {
165 expand_cte_stars(&mut owned, None);
166 }
167 lineage_from_expression(column, &owned, dialect, trim_selects)
168}
169
170pub fn lineage_with_schema(
186 column: &str,
187 sql: &Expression,
188 schema: Option<&dyn Schema>,
189 dialect: Option<DialectType>,
190 trim_selects: bool,
191) -> Result<LineageNode> {
192 let normalized_expression = lineage_normalized_expression(sql);
193 let mut qualified_expression = if let Some(schema) = schema {
194 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
195 QualifyColumnsOptions::new().with_dialect(dialect_type)
196 } else {
197 QualifyColumnsOptions::new()
198 };
199
200 qualify_columns(normalized_expression.clone(), schema, &options).map_err(|e| {
201 Error::internal(format!("Lineage qualification failed with schema: {}", e))
202 })?
203 } else {
204 normalized_expression
205 };
206
207 annotate_types(&mut qualified_expression, schema, dialect);
209
210 expand_cte_stars(&mut qualified_expression, schema);
213
214 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
215}
216
217fn lineage_from_expression(
218 column: &str,
219 sql: &Expression,
220 dialect: Option<DialectType>,
221 trim_selects: bool,
222) -> Result<LineageNode> {
223 let scope = build_scope(sql);
224 to_node(
225 ColumnRef::Name(column),
226 &scope,
227 dialect,
228 "",
229 "",
230 "",
231 trim_selects,
232 )
233}
234
235pub(crate) fn lineage_by_index_from_expression(
236 column_index: usize,
237 sql: &Expression,
238 dialect: Option<DialectType>,
239 trim_selects: bool,
240) -> Result<LineageNode> {
241 let normalized = lineage_normalized_expression(sql);
242 let scope = build_scope(&normalized);
243 to_node(
244 ColumnRef::Index(column_index),
245 &scope,
246 dialect,
247 "",
248 "",
249 "",
250 trim_selects,
251 )
252}
253
254fn lineage_normalized_expression(sql: &Expression) -> Expression {
255 match sql {
256 Expression::Prepare(prepare) => lineage_normalized_expression(&prepare.statement),
257 Expression::CreateTable(create) => create
258 .as_select
259 .as_ref()
260 .map(|query| attach_with_to_query(query.clone(), create.with_cte.clone()))
261 .unwrap_or_else(|| sql.clone()),
262 Expression::CreateView(create) => lineage_normalized_expression(&create.query),
263 Expression::Insert(insert) => insert
264 .query
265 .as_ref()
266 .map(|query| attach_with_to_query(query.clone(), insert.with.clone()))
267 .unwrap_or_else(|| sql.clone()),
268 _ => sql.clone(),
269 }
270}
271
272fn attach_with_to_query(
273 mut query: Expression,
274 with: Option<crate::expressions::With>,
275) -> Expression {
276 if let Some(with) = with {
277 attach_with_to_query_mut(&mut query, with);
278 }
279 query
280}
281
282fn attach_with_to_query_mut(query: &mut Expression, with: crate::expressions::With) {
283 match query {
284 Expression::Select(select) => {
285 if select.with.is_none() {
286 select.with = Some(with);
287 }
288 }
289 Expression::Union(union) => {
290 if union.with.is_none() {
291 union.with = Some(with);
292 }
293 }
294 Expression::Intersect(intersect) => {
295 if intersect.with.is_none() {
296 intersect.with = Some(with);
297 }
298 }
299 Expression::Except(except) => {
300 if except.with.is_none() {
301 except.with = Some(with);
302 }
303 }
304 Expression::Paren(paren) => attach_with_to_query_mut(&mut paren.this, with),
305 _ => {}
306 }
307}
308
309fn has_lineage_with_clause(expr: &Expression) -> bool {
310 match expr {
311 Expression::Select(select) => select.with.is_some(),
312 Expression::Union(union) => {
313 union.with.is_some()
314 || has_lineage_with_clause(&union.left)
315 || has_lineage_with_clause(&union.right)
316 }
317 Expression::Intersect(intersect) => {
318 intersect.with.is_some()
319 || has_lineage_with_clause(&intersect.left)
320 || has_lineage_with_clause(&intersect.right)
321 }
322 Expression::Except(except) => {
323 except.with.is_some()
324 || has_lineage_with_clause(&except.left)
325 || has_lineage_with_clause(&except.right)
326 }
327 Expression::Paren(paren) => has_lineage_with_clause(&paren.this),
328 _ => false,
329 }
330}
331
332fn normalize_cte_name(ident: &Identifier) -> String {
342 if ident.quoted {
343 ident.name.clone()
344 } else {
345 ident.name.to_lowercase()
346 }
347}
348
/// Rewrites star projections (`*` and `t.*`) in the CTE bodies of a top-level
/// `SELECT ... WITH`, and in that outer SELECT, into explicit column references
/// wherever the columns can be determined.
///
/// CTEs are processed in declaration order, so a star over an earlier CTE can
/// use that CTE's already-resolved output columns. Table columns come from
/// `schema` when one is given. Stars that cannot be resolved stay in place.
///
/// Only `Prepare` (unwrapped recursively) and `Select` roots are handled; any
/// other root, including set operations, is returned unchanged. Within a CTE
/// body that is a set operation, only its leftmost SELECT is rewritten.
pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
    if let Expression::Prepare(prepare) = expr {
        expand_cte_stars(&mut prepare.statement, schema);
        return;
    }

    let select = match expr {
        Expression::Select(s) => s,
        _ => return,
    };

    let with = match &mut select.with {
        Some(w) => w,
        None => return,
    };

    // Normalized CTE name -> output column names, filled in as CTEs are processed.
    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();

    for cte in &mut with.ctes {
        let cte_name = normalize_cte_name(&cte.alias);

        if !cte.columns.is_empty() {
            // An explicit column list (`name(a, b) AS ...`) defines the output names directly.
            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
            resolved_cte_columns.insert(cte_name, cols);
            continue;
        }

        if with.recursive {
            // Leave a CTE unresolved when its leftmost SELECT reads from the CTE itself.
            // NOTE(review): only the leftmost SELECT is inspected; a self-reference
            // in a later set-operation branch is not detected here.
            let is_self_referencing =
                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
                    let body_sources = get_select_sources(body_select);
                    body_sources.iter().any(|s| s.normalized == cte_name)
                } else {
                    false
                };
            if is_self_referencing {
                continue;
            }
        }

        let body_select = match get_leftmost_select_mut(&mut cte.this) {
            Some(s) => s,
            // No SELECT reachable (e.g. a non-query CTE body): leave it untouched.
            None => continue,
        };

        // Rewriting returns the body's output column names, which later CTEs
        // and the outer SELECT can use to expand their own stars.
        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
        resolved_cte_columns.insert(cte_name, columns);
    }

    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
}
419
420fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
425 let mut current = expr;
426 for _ in 0..MAX_LINEAGE_DEPTH {
427 match current {
428 Expression::Select(s) => return Some(s),
429 Expression::Union(u) => current = &mut u.left,
430 Expression::Intersect(i) => current = &mut i.left,
431 Expression::Except(e) => current = &mut e.left,
432 Expression::Paren(p) => current = &mut p.this,
433 _ => return None,
434 }
435 }
436 None
437}
438
439fn rewrite_stars_in_select(
443 select: &mut Select,
444 resolved_ctes: &HashMap<String, Vec<String>>,
445 schema: Option<&dyn Schema>,
446) -> Vec<String> {
447 let has_star = select
452 .expressions
453 .iter()
454 .any(|e| matches!(e, Expression::Star(_)));
455 let has_qualified_star = select
456 .expressions
457 .iter()
458 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
459
460 if !has_star && !has_qualified_star {
461 return select
463 .expressions
464 .iter()
465 .filter_map(get_expression_output_name)
466 .collect();
467 }
468
469 let sources = get_select_sources(select);
470 let mut new_expressions = Vec::new();
471 let mut result_columns = Vec::new();
472
473 for expr in &select.expressions {
474 match expr {
475 Expression::Star(star) => {
476 let qual = star.table.as_ref();
477 if let Some(expanded) =
478 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
479 {
480 for (src_alias, col_name) in &expanded {
481 let table_id = Identifier::new(src_alias);
482 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
483 result_columns.push(col_name.clone());
484 }
485 } else {
486 new_expressions.push(expr.clone());
487 result_columns.push("*".to_string());
488 }
489 }
490 Expression::Column(c) if c.name.name == "*" => {
491 let qual = c.table.as_ref();
492 if let Some(expanded) =
493 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
494 {
495 for (_src_alias, col_name) in &expanded {
496 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
498 result_columns.push(col_name.clone());
499 }
500 } else {
501 new_expressions.push(expr.clone());
502 result_columns.push("*".to_string());
503 }
504 }
505 _ => {
506 new_expressions.push(expr.clone());
507 if let Some(name) = get_expression_output_name(expr) {
508 result_columns.push(name);
509 }
510 }
511 }
512 }
513
514 select.expressions = new_expressions;
515 result_columns
516}
517
518fn expand_star_from_sources(
523 qualifier: Option<&Identifier>,
524 sources: &[SourceInfo],
525 resolved_ctes: &HashMap<String, Vec<String>>,
526 schema: Option<&dyn Schema>,
527) -> Option<Vec<(String, String)>> {
528 let mut expanded = Vec::new();
529
530 if let Some(qual) = qualifier {
531 let qual_normalized = normalize_cte_name(qual);
533 for src in sources {
534 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
535 if let Some(cols) = resolved_ctes.get(&src.normalized) {
537 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
538 return Some(expanded);
539 }
540 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
542 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
543 return Some(expanded);
544 }
545 }
546 }
547 None
548 } else {
549 let mut any_expanded = false;
555 for src in sources {
556 if let Some(cols) = resolved_ctes.get(&src.normalized) {
557 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
558 any_expanded = true;
559 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
560 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
561 any_expanded = true;
562 } else {
563 return None;
564 }
565 }
566 if any_expanded {
567 Some(expanded)
568 } else {
569 None
570 }
571 }
572}
573
574fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
576 let schema = schema?;
577 if fq_name.is_empty() {
578 return None;
579 }
580 schema
581 .column_names(fq_name)
582 .ok()
583 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
584}
585
586fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
588 Expression::Column(Box::new(crate::expressions::Column {
589 name: Identifier::new(name),
590 table: table.cloned(),
591 join_mark: false,
592 trailing_comments: Vec::new(),
593 span: None,
594 inferred_type: None,
595 }))
596}
597
598fn get_expression_output_name(expr: &Expression) -> Option<String> {
600 match expr {
601 Expression::Alias(a) => Some(a.alias.name.clone()),
602 Expression::Column(c) => Some(c.name.name.clone()),
603 Expression::Identifier(id) => Some(id.name.clone()),
604 Expression::Star(_) => Some("*".to_string()),
605 _ => None,
606 }
607}
608
/// Lightweight description of one FROM / JOIN / LATERAL VIEW source, used while
/// expanding star projections.
struct SourceInfo {
    /// Name the source is referenced by: its alias if present, otherwise its name.
    alias: String,
    /// Whether the naming identifier was quoted (the table name for tables, the alias otherwise).
    quoted: bool,
    /// Case-normalized key used to match CTE names and qualifiers.
    /// For tables this is the table name (not the alias).
    normalized: String,
    /// Dotted `catalog.schema.name` for tables, used for schema lookups;
    /// the alias for other sources.
    fq_name: String,
}
624
625fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
628 let mut sources = Vec::new();
629
630 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
631 fn virtual_source_info(alias: &Identifier) -> SourceInfo {
632 SourceInfo {
633 alias: alias.name.clone(),
634 quoted: alias.quoted,
635 normalized: normalize_cte_name(alias),
636 fq_name: alias.name.clone(),
637 }
638 }
639
640 fn named_virtual_source_info(alias: &str) -> SourceInfo {
641 SourceInfo {
642 alias: alias.to_string(),
643 quoted: false,
644 normalized: alias.to_lowercase(),
645 fq_name: alias.to_string(),
646 }
647 }
648
649 match expr {
650 Expression::Table(t) => {
651 let normalized = normalize_cte_name(&t.name);
652 let alias = t
653 .alias
654 .as_ref()
655 .map(|a| a.name.clone())
656 .unwrap_or_else(|| t.name.name.clone());
657 let mut parts = Vec::new();
658 if let Some(catalog) = &t.catalog {
659 parts.push(catalog.name.clone());
660 }
661 if let Some(schema) = &t.schema {
662 parts.push(schema.name.clone());
663 }
664 parts.push(t.name.name.clone());
665 let fq_name = parts.join(".");
666 Some(SourceInfo {
667 alias,
668 quoted: t.name.quoted,
669 normalized,
670 fq_name,
671 })
672 }
673 Expression::Subquery(s) => {
674 let alias_identifier = s.alias.as_ref()?;
675 let alias = alias_identifier.name.clone();
676 let normalized = alias.to_lowercase();
677 let fq_name = alias.clone();
678 Some(SourceInfo {
679 alias,
680 quoted: alias_identifier.quoted,
681 normalized,
682 fq_name,
683 })
684 }
685 Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
686 Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
687 Some(virtual_source_info(&a.alias))
688 }
689 Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
690 Expression::LateralView(lateral_view) => lateral_view
691 .table_alias
692 .as_ref()
693 .or_else(|| lateral_view.column_aliases.first())
694 .map(virtual_source_info),
695 Expression::Pivot(pivot) => {
696 let alias = pivot_lineage_source_name(
697 &pivot.this,
698 pivot.alias.as_ref().map(|alias| alias.name.as_str()),
699 );
700 Some(SourceInfo {
701 alias: alias.clone(),
702 quoted: false,
703 normalized: alias.to_lowercase(),
704 fq_name: alias,
705 })
706 }
707 Expression::Unpivot(unpivot) => {
708 let alias = pivot_lineage_source_name(
709 &unpivot.this,
710 unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
711 );
712 Some(SourceInfo {
713 alias: alias.clone(),
714 quoted: false,
715 normalized: alias.to_lowercase(),
716 fq_name: alias,
717 })
718 }
719 Expression::Paren(p) => extract_source(&p.this),
720 _ => None,
721 }
722 }
723
724 if let Some(from) = &select.from {
725 for expr in &from.expressions {
726 if let Some(info) = extract_source(expr) {
727 sources.push(info);
728 }
729 }
730 }
731 for join in &select.joins {
732 if let Some(info) = extract_source(&join.this) {
733 sources.push(info);
734 }
735 }
736 for lateral_view in &select.lateral_views {
737 if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
738 {
739 sources.push(info);
740 }
741 }
742 sources
743}
744
745fn pivot_lineage_source_name(source: &Expression, explicit_alias: Option<&str>) -> String {
746 if let Some(alias) = explicit_alias {
747 return alias.to_string();
748 }
749
750 match source {
751 Expression::Table(table) => table
752 .alias
753 .as_ref()
754 .map(|alias| alias.name.clone())
755 .unwrap_or_else(|| table.name.name.clone()),
756 Expression::Subquery(subquery) => subquery
757 .alias
758 .as_ref()
759 .map(|alias| alias.name.clone())
760 .unwrap_or_else(|| "_0".to_string()),
761 Expression::Paren(paren) => pivot_lineage_source_name(&paren.this, explicit_alias),
762 _ => "_0".to_string(),
763 }
764}
765
766pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
768 let mut tables = HashSet::new();
769 collect_source_tables(node, &mut tables);
770 tables
771}
772
773pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
775 if let Expression::Table(table) = &node.source {
776 tables.insert(table.name.name.clone());
777 }
778 for child in &node.downstream {
779 collect_source_tables(child, tables);
780 }
781}
782
/// Upper bound on lineage recursion depth (also caps the descent when searching
/// for a leftmost SELECT).
///
/// `to_node_inner` returns an error instead of recursing further, which guards
/// against runaway recursion such as circular CTE references.
const MAX_LINEAGE_DEPTH: usize = 64;
790
791fn to_node(
793 column: ColumnRef<'_>,
794 scope: &Scope,
795 dialect: Option<DialectType>,
796 scope_name: &str,
797 source_name: &str,
798 reference_node_name: &str,
799 trim_selects: bool,
800) -> Result<LineageNode> {
801 to_node_inner(
802 column,
803 scope,
804 dialect,
805 scope_name,
806 source_name,
807 reference_node_name,
808 trim_selects,
809 &[],
810 0,
811 )
812}
813
/// Core recursive step of lineage construction: builds the node for `column`
/// within `scope` and resolves its dependencies.
///
/// Set operations (UNION / INTERSECT / EXCEPT), including those forming a CTE
/// body, are delegated to `handle_set_operation`.
///
/// For a star projection, one `"<source>.*"` child is added per source of
/// `scope` (only matching sources when the star is qualified).
///
/// For any other projection:
/// * unaliased subqueries in it are traced through `scope.subquery_scopes`
///   using their first output column;
/// * every column it references is resolved as qualified or unqualified.
///
/// `ancestor_cte_scopes` carries CTE scopes visible from enclosing queries.
/// `depth` counts recursion steps.
///
/// # Errors
/// Returns an error when `depth` exceeds `MAX_LINEAGE_DEPTH`. Errors from
/// `find_select_expr` (presumably when the column cannot be located) and from
/// `handle_set_operation` are propagated. Failures while tracing scalar
/// subqueries are ignored.
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // CTE scopes visible here: this scope's own, followed by inherited ones.
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }
    // Owned copies (excluding the current scope) handed to recursive calls.
    // NOTE(review): this clones every visible CTE scope on each call.
    let descendant_cte_scopes = descendant_cte_scope_clones(&all_cte_scopes, scope);

    // A CTE scope wraps its body; work with the body itself.
    let effective_expr = match scope_expr {
        Expression::Cte(cte) => &cte.this,
        other => other,
    };

    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        if matches!(scope_expr, Expression::Cte(_)) {
            // Rebuild a scope whose expression is the set operation itself,
            // carrying over the CTE scope's child scopes and sources.
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                &descendant_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            &descendant_cte_scopes,
            depth,
        );
    }

    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    // With `trim_selects`, the node's source is reduced via `trim_source`;
    // otherwise it is the full query.
    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    apply_scope_context(&mut node, scope, source_name, reference_node_name);

    if let Expression::Star(star) = &select_expr {
        let star_table = star
            .table
            .as_ref()
            .map(|identifier| identifier.name.as_str());
        // NOTE(review): child order follows the iteration order of
        // `scope.sources`; if that is a HashMap, the order is unspecified.
        for (name, source_info) in &scope.sources {
            if let Some(star_table) = star_table {
                // A qualified star matches by source name, alias, or table name
                // (ASCII case-insensitive).
                let table_matches = name.eq_ignore_ascii_case(star_table)
                    || source_info
                        .alias
                        .as_deref()
                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
                    || matches!(
                        &source_info.expression,
                        Expression::Table(table_ref)
                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
                    );
                if !table_matches {
                    continue;
                }
            }

            let mut child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: star.table.clone(),
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            apply_source_info_context(&mut child, name, source_info);
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // Trace unaliased (scalar) subqueries through their first output column.
    let subqueries: Vec<&Expression> =
        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
    for sq_expr in subqueries {
        if let Expression::Subquery(sq) = sq_expr {
            for sq_scope in &scope.subquery_scopes {
                if sq_scope.expression == sq.this {
                    // Best effort: a subquery that cannot be traced is skipped.
                    if let Ok(child) = to_node_inner(
                        ColumnRef::Index(0),
                        sq_scope,
                        dialect,
                        &column_name,
                        "",
                        "",
                        trim_selects,
                        &descendant_cte_scopes,
                        depth + 1,
                    ) {
                        node.downstream.push(child);
                    }
                    break;
                }
            }
        }
    }

    // Resolve each column referenced by the projection.
    let col_refs = find_column_refs_in_expr_with_select(&select_expr, effective_expr, dialect);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
1000
1001fn descendant_cte_scope_clones(all_cte_scopes: &[&Scope], current_scope: &Scope) -> Vec<Scope> {
1002 all_cte_scopes
1003 .iter()
1004 .filter(|scope| scope.expression != current_scope.expression)
1005 .map(|scope| (*scope).clone())
1006 .collect()
1007}
1008
1009fn handle_set_operation(
1014 column: &ColumnRef<'_>,
1015 scope: &Scope,
1016 dialect: Option<DialectType>,
1017 scope_name: &str,
1018 source_name: &str,
1019 reference_node_name: &str,
1020 trim_selects: bool,
1021 ancestor_cte_scopes: &[Scope],
1022 depth: usize,
1023) -> Result<LineageNode> {
1024 let scope_expr = &scope.expression;
1025
1026 let col_index = match column {
1028 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
1029 ColumnRef::Index(i) => *i,
1030 };
1031
1032 let col_name = match column {
1033 ColumnRef::Name(name) => name.to_string(),
1034 ColumnRef::Index(_) => format!("_{col_index}"),
1035 };
1036
1037 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
1038 apply_scope_context(&mut node, scope, source_name, reference_node_name);
1039
1040 for branch_scope in &scope.union_scopes {
1042 if let Ok(child) = to_node_inner(
1043 ColumnRef::Index(col_index),
1044 branch_scope,
1045 dialect,
1046 scope_name,
1047 "",
1048 "",
1049 trim_selects,
1050 ancestor_cte_scopes,
1051 depth + 1,
1052 ) {
1053 node.downstream.push(child);
1054 }
1055 }
1056
1057 Ok(node)
1058}
1059
/// Resolves a qualified column reference `table.col_name` found in a projection
/// of `scope`, and attaches the resulting dependency node(s) to `node.downstream`.
///
/// `table` may be an alias; it is also mapped through `resolve_cte_alias`
/// (presumably alias -> CTE name). Strategies are tried in order, stopping at the
/// first that succeeds:
/// 1. PIVOT / UNPIVOT sources: map the output column to its input columns.
/// 2. CTEs: recurse into the CTE's scope. If that is not possible, add a leaf
///    node for a source of kind `Cte`.
/// 3. Other scope-backed sources: recurse into the child scope.
/// 4. Non-scope sources: add a leaf column node, plus extra dependencies for
///    virtual sources.
/// 5. Otherwise: add a bare leaf node built from `table` and `col_name`.
///
/// Failed recursive resolutions are swallowed and fall through to the next strategy.
fn resolve_qualified_column(
    node: &mut LineageNode,
    scope: &Scope,
    dialect: Option<DialectType>,
    table: &str,
    col_name: &str,
    parent_name: &str,
    trim_selects: bool,
    all_cte_scopes: &[&Scope],
    depth: usize,
) {
    let resolved_cte_name = resolve_cte_alias(scope, table);
    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);

    // 1. PIVOT / UNPIVOT sources.
    if let Some(source_info) = scope
        .sources
        .get(table)
        .or_else(|| scope.sources.get(effective_table))
    {
        match &source_info.expression {
            Expression::Pivot(pivot) => {
                if attach_pivot_dependencies(
                    node,
                    scope,
                    dialect,
                    pivot,
                    col_name,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                ) {
                    return;
                }
            }
            Expression::Unpivot(unpivot) => {
                if attach_unpivot_dependencies(
                    node,
                    scope,
                    dialect,
                    unpivot,
                    col_name,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                ) {
                    return;
                }
            }
            _ => {}
        }
    }

    // 2. CTEs, known either to this scope or to any visible CTE scope.
    // NOTE(review): the CTE-scope name comparison below is case-sensitive.
    let is_cte = scope.cte_sources.contains_key(effective_table)
        || all_cte_scopes.iter().any(
            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
        );
    if is_cte {
        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
            // NOTE(review): clones every visible CTE scope for the recursive call.
            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
            if let Ok(child) = to_node_inner(
                ColumnRef::Name(col_name),
                child_scope,
                dialect,
                parent_name,
                effective_table,
                parent_name,
                trim_selects,
                &ancestors,
                depth + 1,
            ) {
                node.downstream.push(child);
                return;
            }
        }

        // Fallback: no traceable CTE scope, but the source is known to be a CTE.
        if let Some(source_info) = scope
            .sources
            .get(table)
            .or_else(|| scope.sources.get(effective_table))
            .filter(|source_info| source_info.kind == SourceKind::Cte)
        {
            node.downstream.push(make_table_column_node_from_source(
                effective_table,
                col_name,
                source_info,
            ));
            return;
        }
    }

    // 3. Other sources that have their own scope.
    if let Some(source_info) = scope.sources.get(table) {
        if source_info.is_scope {
            if let Some(child_scope) = find_child_scope(scope, table) {
                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
                if let Ok(child) = to_node_inner(
                    ColumnRef::Name(col_name),
                    child_scope,
                    dialect,
                    parent_name,
                    table,
                    parent_name,
                    trim_selects,
                    &ancestors,
                    depth + 1,
                ) {
                    node.downstream.push(child);
                    return;
                }
            }
        }
    }

    // 4. Sources without their own scope: leaf node (plus virtual-source dependencies).
    if let Some(source_info) = scope.sources.get(table) {
        if !source_info.is_scope {
            let mut child = make_table_column_node_from_source(table, col_name, source_info);
            if source_info.kind == SourceKind::Virtual {
                attach_virtual_source_dependencies(
                    &mut child,
                    scope,
                    dialect,
                    table,
                    &source_info.expression,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                );
            }
            node.downstream.push(child);
            return;
        }
    }

    // 5. Unknown qualifier: record a bare `table.col_name` leaf.
    node.downstream
        .push(make_table_column_node(table, col_name));
}
1208
1209fn attach_pivot_dependencies(
1210 node: &mut LineageNode,
1211 scope: &Scope,
1212 dialect: Option<DialectType>,
1213 pivot: &crate::expressions::Pivot,
1214 col_name: &str,
1215 trim_selects: bool,
1216 all_cte_scopes: &[&Scope],
1217 depth: usize,
1218) -> bool {
1219 if pivot.unpivot {
1220 return false;
1221 }
1222
1223 let mapping = pivot_column_mapping(pivot, dialect);
1224 let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1225 if pivot_implicit_source_column(pivot, col_name) {
1226 let col_ref = SimpleColumnRef {
1227 table: None,
1228 column: col_name.to_string(),
1229 };
1230 attach_pivot_input_column(
1231 node,
1232 scope,
1233 dialect,
1234 &pivot.this,
1235 &col_ref,
1236 trim_selects,
1237 all_cte_scopes,
1238 depth,
1239 );
1240 return true;
1241 }
1242 return false;
1243 };
1244
1245 for col_ref in input_columns {
1246 attach_pivot_input_column(
1247 node,
1248 scope,
1249 dialect,
1250 &pivot.this,
1251 col_ref,
1252 trim_selects,
1253 all_cte_scopes,
1254 depth,
1255 );
1256 }
1257 true
1258}
1259
1260fn attach_unpivot_dependencies(
1261 node: &mut LineageNode,
1262 scope: &Scope,
1263 dialect: Option<DialectType>,
1264 unpivot: &crate::expressions::Unpivot,
1265 col_name: &str,
1266 trim_selects: bool,
1267 all_cte_scopes: &[&Scope],
1268 depth: usize,
1269) -> bool {
1270 let mapping = unpivot_column_mapping(unpivot, dialect);
1271 let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1272 return false;
1273 };
1274
1275 for col_ref in input_columns {
1276 attach_pivot_input_column(
1277 node,
1278 scope,
1279 dialect,
1280 &unpivot.this,
1281 col_ref,
1282 trim_selects,
1283 all_cte_scopes,
1284 depth,
1285 );
1286 }
1287 true
1288}
1289
/// Maps each PIVOT output column name (normalized for `dialect`) to the input
/// columns referenced by the aggregation that produces it.
///
/// For every (aggregation, pivot value) pair, several candidate output names are
/// registered, presumably because dialects name pivot outputs differently:
/// * the bare value, when there is a single aggregation;
/// * `value_agg` and `agg_value`, when the aggregation has a name;
/// * `value_<index>`, for unnamed aggregations when there are several.
///
/// Aggregations that reference no columns are skipped. A later entry overwrites
/// an earlier one with the same normalized name. The result is empty when no
/// pivot values can be named.
fn pivot_column_mapping(
    pivot: &crate::expressions::Pivot,
    dialect: Option<DialectType>,
) -> HashMap<String, Vec<SimpleColumnRef>> {
    let fields = pivot_field_output_names(pivot);
    if fields.is_empty() {
        return HashMap::new();
    }

    let mut mapping = HashMap::new();
    for (agg_index, agg) in pivot.expressions.iter().enumerate() {
        let input_columns = find_column_refs_in_expr(agg, dialect);
        if input_columns.is_empty() {
            continue;
        }
        let agg_name = pivot_aggregation_name(agg);
        for field in &fields {
            let mut output_names = Vec::new();
            if pivot.expressions.len() == 1 {
                output_names.push(field.clone());
            }
            if let Some(agg_name) = &agg_name {
                output_names.push(format!("{field}_{agg_name}"));
                output_names.push(format!("{agg_name}_{field}"));
            }
            if pivot.expressions.len() > 1 && agg_name.is_none() {
                output_names.push(format!("{}_{}", field, agg_index));
            }

            for output_name in output_names {
                mapping.insert(
                    normalize_column_name(&output_name, dialect),
                    input_columns.clone(),
                );
            }
        }
    }
    mapping
}
1329
1330fn pivot_field_output_names(pivot: &crate::expressions::Pivot) -> Vec<String> {
1331 let mut names = Vec::new();
1332 for field in &pivot.fields {
1333 if let Expression::In(in_expr) = field {
1334 for expr in &in_expr.expressions {
1335 if let Some(name) = pivot_expr_output_name(expr) {
1336 names.push(name);
1337 }
1338 }
1339 }
1340 }
1341 names
1342}
1343
1344fn pivot_aggregation_name(expr: &Expression) -> Option<String> {
1345 match expr {
1346 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1347 _ => get_alias_or_name(expr),
1348 }
1349}
1350
1351fn pivot_expr_output_name(expr: &Expression) -> Option<String> {
1352 match expr {
1353 Expression::PivotAlias(alias) => pivot_expr_output_name(&alias.alias),
1354 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1355 Expression::Identifier(identifier) => Some(identifier.name.clone()),
1356 Expression::Column(column) => Some(column.name.name.clone()),
1357 Expression::Literal(literal) => Some(literal.value_str().to_string()),
1358 Expression::Var(var) => Some(var.this.clone()),
1359 Expression::Tuple(tuple) => tuple.expressions.first().and_then(pivot_expr_output_name),
1360 _ => None,
1361 }
1362}
1363
1364fn pivot_implicit_source_column(pivot: &crate::expressions::Pivot, col_name: &str) -> bool {
1365 let pivot_columns: HashSet<String> = pivot
1366 .fields
1367 .iter()
1368 .filter_map(|field| match field {
1369 Expression::In(in_expr) => Some(&in_expr.this),
1370 _ => None,
1371 })
1372 .flat_map(|expr| find_column_refs_in_expr(expr, None))
1373 .map(|col| col.column.to_lowercase())
1374 .collect();
1375 let aggregation_columns: HashSet<String> = pivot
1376 .expressions
1377 .iter()
1378 .flat_map(|expr| find_column_refs_in_expr(expr, None))
1379 .map(|col| col.column.to_lowercase())
1380 .collect();
1381
1382 let normalized = col_name.to_lowercase();
1383 !pivot_columns.contains(&normalized) && !aggregation_columns.contains(&normalized)
1384}
1385
1386fn unpivot_column_mapping(
1387 unpivot: &crate::expressions::Unpivot,
1388 dialect: Option<DialectType>,
1389) -> HashMap<String, Vec<SimpleColumnRef>> {
1390 let value_columns: Vec<String> = std::iter::once(unpivot.value_column.name.clone())
1391 .chain(
1392 unpivot
1393 .extra_value_columns
1394 .iter()
1395 .map(|column| column.name.clone()),
1396 )
1397 .collect();
1398 let mut all_input_columns = Vec::new();
1399 let mut value_input_columns: Vec<Vec<SimpleColumnRef>> = vec![Vec::new(); value_columns.len()];
1400
1401 for entry in &unpivot.columns {
1402 let columns = unpivot_entry_columns(entry);
1403 all_input_columns.extend(columns.clone());
1404 if columns.len() == value_columns.len() {
1405 for (idx, col_ref) in columns.into_iter().enumerate() {
1406 value_input_columns[idx].push(col_ref);
1407 }
1408 } else {
1409 for inputs in &mut value_input_columns {
1410 inputs.extend(columns.clone());
1411 }
1412 }
1413 }
1414
1415 let mut mapping = HashMap::new();
1416 mapping.insert(
1417 normalize_column_name(&unpivot.name_column.name, dialect),
1418 all_input_columns.clone(),
1419 );
1420 for (idx, value_column) in value_columns.iter().enumerate() {
1421 mapping.insert(
1422 normalize_column_name(value_column, dialect),
1423 value_input_columns.get(idx).cloned().unwrap_or_default(),
1424 );
1425 }
1426 mapping
1427}
1428
1429fn unpivot_entry_columns(expr: &Expression) -> Vec<SimpleColumnRef> {
1430 match expr {
1431 Expression::PivotAlias(alias) => unpivot_entry_columns(&alias.this),
1432 Expression::Tuple(tuple) => tuple
1433 .expressions
1434 .iter()
1435 .flat_map(unpivot_entry_columns)
1436 .collect(),
1437 Expression::Column(column) => vec![SimpleColumnRef {
1438 table: column.table.clone(),
1439 column: column.name.name.clone(),
1440 }],
1441 Expression::Identifier(identifier) => vec![SimpleColumnRef {
1442 table: None,
1443 column: identifier.name.clone(),
1444 }],
1445 _ => find_column_refs_in_expr(expr, None),
1446 }
1447}
1448
1449fn attach_pivot_input_column(
1450 node: &mut LineageNode,
1451 scope: &Scope,
1452 dialect: Option<DialectType>,
1453 source_expr: &Expression,
1454 col_ref: &SimpleColumnRef,
1455 trim_selects: bool,
1456 all_cte_scopes: &[&Scope],
1457 depth: usize,
1458) {
1459 match source_expr {
1460 Expression::Table(table) => {
1461 let table_name = col_ref
1462 .table
1463 .as_ref()
1464 .map(|identifier| identifier.name.as_str())
1465 .unwrap_or(table.name.name.as_str());
1466 if scope.cte_sources.contains_key(table_name) {
1467 resolve_qualified_column(
1468 node,
1469 scope,
1470 dialect,
1471 table_name,
1472 &col_ref.column,
1473 &node.name.clone(),
1474 trim_selects,
1475 all_cte_scopes,
1476 depth + 1,
1477 );
1478 } else {
1479 let mut source = ScopeSourceInfo::new(
1480 Expression::Table(Box::new(table.as_ref().clone())),
1481 false,
1482 SourceKind::Table,
1483 );
1484 if let Some(alias) = &table.alias {
1485 source = source.with_alias(alias.name.clone());
1486 }
1487 let source_key = table
1488 .alias
1489 .as_ref()
1490 .map(|alias| alias.name.as_str())
1491 .unwrap_or(table.name.name.as_str());
1492 node.downstream.push(make_table_column_node_from_source(
1493 source_key,
1494 &col_ref.column,
1495 &source,
1496 ));
1497 }
1498 }
1499 Expression::Subquery(subquery) => {
1500 let source_scope = build_scope(&subquery.this);
1501 let child = if let Some(table) = &col_ref.table {
1502 let mut child_node = LineageNode::new(
1503 &col_ref.column,
1504 subquery.this.clone(),
1505 subquery.this.clone(),
1506 );
1507 resolve_qualified_column(
1508 &mut child_node,
1509 &source_scope,
1510 dialect,
1511 &table.name,
1512 &col_ref.column,
1513 &node.name.clone(),
1514 trim_selects,
1515 all_cte_scopes,
1516 depth + 1,
1517 );
1518 Ok(child_node)
1519 } else {
1520 to_node_inner(
1521 ColumnRef::Name(&col_ref.column),
1522 &source_scope,
1523 dialect,
1524 "",
1525 "",
1526 "",
1527 trim_selects,
1528 &all_cte_scopes
1529 .iter()
1530 .map(|scope| (*scope).clone())
1531 .collect::<Vec<_>>(),
1532 depth + 1,
1533 )
1534 };
1535 if let Ok(child) = child {
1536 node.downstream.push(child);
1537 }
1538 }
1539 Expression::Paren(paren) => attach_pivot_input_column(
1540 node,
1541 scope,
1542 dialect,
1543 &paren.this,
1544 col_ref,
1545 trim_selects,
1546 all_cte_scopes,
1547 depth,
1548 ),
1549 _ => {
1550 if let Some(table) = &col_ref.table {
1551 resolve_qualified_column(
1552 node,
1553 scope,
1554 dialect,
1555 &table.name,
1556 &col_ref.column,
1557 &node.name.clone(),
1558 trim_selects,
1559 all_cte_scopes,
1560 depth + 1,
1561 );
1562 } else {
1563 node.downstream
1564 .push(make_table_column_node("_", &col_ref.column));
1565 }
1566 }
1567 }
1568}
1569
1570fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1576 if scope.cte_sources.contains_key(name) {
1578 return None;
1579 }
1580 if let Some(source_info) = scope.sources.get(name) {
1582 if source_info.is_scope {
1583 if let Expression::Cte(cte) = &source_info.expression {
1584 let cte_name = &cte.alias.name;
1585 if scope.cte_sources.contains_key(cte_name) {
1586 return Some(cte_name.clone());
1587 }
1588 }
1589 }
1590 }
1591 None
1592}
1593
1594fn resolve_unqualified_column(
1595 node: &mut LineageNode,
1596 scope: &Scope,
1597 dialect: Option<DialectType>,
1598 col_name: &str,
1599 parent_name: &str,
1600 trim_selects: bool,
1601 all_cte_scopes: &[&Scope],
1602 depth: usize,
1603) {
1604 let from_source_names = source_names_from_from_join(scope);
1608
1609 if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1610 resolve_qualified_column(
1611 node,
1612 scope,
1613 dialect,
1614 &tbl,
1615 col_name,
1616 parent_name,
1617 trim_selects,
1618 all_cte_scopes,
1619 depth,
1620 );
1621 return;
1622 }
1623
1624 if from_source_names.len() == 1 {
1625 let tbl = &from_source_names[0];
1626 resolve_qualified_column(
1627 node,
1628 scope,
1629 dialect,
1630 tbl,
1631 col_name,
1632 parent_name,
1633 trim_selects,
1634 all_cte_scopes,
1635 depth,
1636 );
1637 return;
1638 }
1639
1640 let child = LineageNode::new(
1642 col_name.to_string(),
1643 Expression::Column(Box::new(crate::expressions::Column {
1644 name: crate::expressions::Identifier::new(col_name.to_string()),
1645 table: None,
1646 join_mark: false,
1647 trailing_comments: vec![],
1648 span: None,
1649 inferred_type: None,
1650 })),
1651 node.source.clone(),
1652 );
1653 node.downstream.push(child);
1654}
1655
1656fn unique_virtual_source_for_column(
1657 scope: &Scope,
1658 source_names: &[String],
1659 col_name: &str,
1660) -> Option<String> {
1661 let mut matches = source_names.iter().filter_map(|source_name| {
1662 let source = scope.sources.get(source_name)?;
1663 if source.kind == SourceKind::Virtual
1664 && virtual_source_output_columns(source)
1665 .any(|column| column.eq_ignore_ascii_case(col_name))
1666 {
1667 Some(source_name.clone())
1668 } else {
1669 None
1670 }
1671 });
1672
1673 let first = matches.next()?;
1674 if matches.next().is_none() {
1675 Some(first)
1676 } else {
1677 None
1678 }
1679}
1680
1681fn virtual_source_output_columns(
1682 source_info: &ScopeSourceInfo,
1683) -> Box<dyn Iterator<Item = String> + '_> {
1684 match &source_info.expression {
1685 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1686 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1687 Box::new(alias_output_columns(alias))
1688 }
1689 Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1690 Expression::LateralView(lateral_view) => {
1691 Box::new(lateral_view_output_columns(lateral_view))
1692 }
1693 _ => Box::new(source_info.alias.clone().into_iter()),
1694 }
1695}
1696
1697fn unnest_output_columns(
1698 unnest: &crate::expressions::UnnestFunc,
1699) -> impl Iterator<Item = String> + '_ {
1700 unnest
1701 .alias
1702 .iter()
1703 .map(|alias| alias.name.clone())
1704 .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1705}
1706
1707fn alias_output_columns(
1708 alias: &crate::expressions::Alias,
1709) -> Box<dyn Iterator<Item = String> + '_> {
1710 if alias.column_aliases.is_empty() {
1711 Box::new(std::iter::once(alias.alias.name.clone()))
1712 } else {
1713 Box::new(
1714 alias
1715 .column_aliases
1716 .iter()
1717 .map(|column| column.name.clone()),
1718 )
1719 }
1720}
1721
1722fn lateral_output_columns(
1723 lateral: &crate::expressions::Lateral,
1724) -> Box<dyn Iterator<Item = String> + '_> {
1725 if lateral.column_aliases.is_empty() {
1726 default_virtual_output_columns(&lateral.this)
1727 } else {
1728 Box::new(lateral.column_aliases.iter().cloned())
1729 }
1730}
1731
1732fn lateral_view_output_columns(
1733 lateral_view: &crate::expressions::LateralView,
1734) -> Box<dyn Iterator<Item = String> + '_> {
1735 Box::new(
1736 lateral_view
1737 .column_aliases
1738 .iter()
1739 .map(|column| column.name.clone()),
1740 )
1741}
1742
1743fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1744 match expr {
1745 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1746 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1747 alias_output_columns(alias)
1748 }
1749 Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1750 Box::new(
1751 ["seq", "key", "path", "index", "value", "this"]
1752 .into_iter()
1753 .map(String::from),
1754 )
1755 }
1756 _ => Box::new(std::iter::empty()),
1757 }
1758}
1759
1760fn attach_virtual_source_dependencies(
1761 node: &mut LineageNode,
1762 scope: &Scope,
1763 dialect: Option<DialectType>,
1764 source_alias: &str,
1765 source_expr: &Expression,
1766 trim_selects: bool,
1767 all_cte_scopes: &[&Scope],
1768 depth: usize,
1769) {
1770 let parent_name = node.name.clone();
1771 let mut seen = HashSet::new();
1772 for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1773 let key = (
1774 col_ref.table.as_ref().map(|t| t.name.clone()),
1775 col_ref.column.clone(),
1776 );
1777 if !seen.insert(key) {
1778 continue;
1779 }
1780
1781 if let Some(table_id) = col_ref.table {
1782 let table = table_id.name;
1783 if table == source_alias {
1784 continue;
1785 }
1786 resolve_qualified_column(
1787 node,
1788 scope,
1789 dialect,
1790 &table,
1791 &col_ref.column,
1792 &parent_name,
1793 trim_selects,
1794 all_cte_scopes,
1795 depth + 1,
1796 );
1797 } else {
1798 let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1799 if non_virtual_sources.len() == 1 {
1800 resolve_qualified_column(
1801 node,
1802 scope,
1803 dialect,
1804 &non_virtual_sources[0],
1805 &col_ref.column,
1806 &parent_name,
1807 trim_selects,
1808 all_cte_scopes,
1809 depth + 1,
1810 );
1811 }
1812 }
1813 }
1814}
1815
1816fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1817 fn source_name(expr: &Expression) -> Option<String> {
1818 match expr {
1819 Expression::Table(table) => Some(
1820 table
1821 .alias
1822 .as_ref()
1823 .map(|a| a.name.clone())
1824 .unwrap_or_else(|| table.name.name.clone()),
1825 ),
1826 Expression::Subquery(subquery) => {
1827 subquery.alias.as_ref().map(|alias| alias.name.clone())
1828 }
1829 Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1830 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1831 Some(alias.alias.name.clone())
1832 }
1833 Expression::Lateral(lateral) => lateral.alias.clone(),
1834 Expression::LateralView(lateral_view) => lateral_view
1835 .table_alias
1836 .as_ref()
1837 .or_else(|| lateral_view.column_aliases.first())
1838 .map(|alias| alias.name.clone()),
1839 Expression::Pivot(pivot) => Some(pivot_lineage_source_name(
1840 &pivot.this,
1841 pivot.alias.as_ref().map(|alias| alias.name.as_str()),
1842 )),
1843 Expression::Unpivot(unpivot) => Some(pivot_lineage_source_name(
1844 &unpivot.this,
1845 unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
1846 )),
1847 Expression::Paren(paren) => source_name(&paren.this),
1848 _ => None,
1849 }
1850 }
1851
1852 let effective_expr = match &scope.expression {
1853 Expression::Cte(cte) => &cte.this,
1854 expr => expr,
1855 };
1856
1857 let mut names = Vec::new();
1858 let mut seen = std::collections::HashSet::new();
1859
1860 if let Expression::Select(select) = effective_expr {
1861 if let Some(from) = &select.from {
1862 for expr in &from.expressions {
1863 if let Some(name) = source_name(expr) {
1864 if !name.is_empty() && seen.insert(name.clone()) {
1865 names.push(name);
1866 }
1867 }
1868 }
1869 }
1870 for join in &select.joins {
1871 if let Some(name) = source_name(&join.this) {
1872 if !name.is_empty() && seen.insert(name.clone()) {
1873 names.push(name);
1874 }
1875 }
1876 }
1877 for lateral_view in &select.lateral_views {
1878 if let Some(name) =
1879 source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
1880 {
1881 if !name.is_empty() && seen.insert(name.clone()) {
1882 names.push(name);
1883 }
1884 }
1885 }
1886 }
1887
1888 names
1889}
1890
1891fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1892 source_names_from_from_join(scope)
1893 .into_iter()
1894 .filter(|name| {
1895 !matches!(
1896 scope.sources.get(name).map(|source| source.kind),
1897 Some(SourceKind::Virtual)
1898 )
1899 })
1900 .collect()
1901}
1902
1903fn get_alias_or_name(expr: &Expression) -> Option<String> {
1909 match expr {
1910 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1911 Expression::Column(col) => Some(col.name.name.clone()),
1912 Expression::Identifier(id) => Some(id.name.clone()),
1913 Expression::Star(_) => Some("*".to_string()),
1914 Expression::Annotated(a) => get_alias_or_name(&a.this),
1917 _ => None,
1918 }
1919}
1920
1921fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1923 match column {
1924 ColumnRef::Name(n) => n.to_string(),
1925 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1926 }
1927}
1928
1929fn find_select_expr(
1931 scope_expr: &Expression,
1932 column: &ColumnRef<'_>,
1933 dialect: Option<DialectType>,
1934) -> Result<Expression> {
1935 if let Expression::Select(ref select) = scope_expr {
1936 match column {
1937 ColumnRef::Name(name) => {
1938 let normalized_name = normalize_column_name(name, dialect);
1939 for expr in &select.expressions {
1940 if let Some(alias_or_name) = get_alias_or_name(expr) {
1941 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1942 return Ok(expr.clone());
1943 }
1944 }
1945 }
1946 if let Some(expr) = synthesize_star_passthrough_expr(select, name) {
1947 return Ok(expr);
1948 }
1949 Err(crate::error::Error::parse(
1950 format!("Cannot find column '{}' in query", name),
1951 0,
1952 0,
1953 0,
1954 0,
1955 ))
1956 }
1957 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1958 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1959 }),
1960 }
1961 } else {
1962 Err(crate::error::Error::parse(
1963 "Expected SELECT expression for column lookup",
1964 0,
1965 0,
1966 0,
1967 0,
1968 ))
1969 }
1970}
1971
1972fn synthesize_star_passthrough_expr(select: &Select, name: &str) -> Option<Expression> {
1973 let sources = get_select_sources(select);
1974 if sources.is_empty() {
1975 return None;
1976 }
1977
1978 let mut candidate_aliases = Vec::new();
1979 let mut seen = HashSet::new();
1980
1981 for expr in &select.expressions {
1982 let aliases = match star_passthrough_source_aliases(expr, &sources) {
1983 StarPassthroughSources::None => continue,
1984 StarPassthroughSources::Ambiguous => return None,
1985 StarPassthroughSources::Aliases(aliases) => aliases,
1986 };
1987
1988 for alias in aliases {
1989 if seen.insert(alias.clone()) {
1990 candidate_aliases.push(alias);
1991 }
1992 }
1993 }
1994
1995 match candidate_aliases.as_slice() {
1996 [alias] => {
1997 let table = Identifier::new(alias.clone());
1998 Some(make_column_expr(name, Some(&table)))
1999 }
2000 _ => None,
2001 }
2002}
2003
/// Result of attributing a `*` / `t.*` projection to the SELECT's sources.
enum StarPassthroughSources {
    /// No attribution: the expression is not a star projection, or no usable source
    /// matched it.
    None,
    /// The star could expand from more than one source, so it cannot be attributed.
    Ambiguous,
    /// Aliases of the source(s) the star expands from.
    Aliases(Vec<String>),
}
2009
2010fn star_passthrough_source_aliases(
2011 expr: &Expression,
2012 sources: &[SourceInfo],
2013) -> StarPassthroughSources {
2014 match expr {
2015 Expression::Star(star) => star_source_aliases(star.table.as_ref(), sources),
2016 Expression::Column(column) if column.name.name == "*" => {
2017 star_source_aliases(column.table.as_ref(), sources)
2018 }
2019 Expression::Annotated(annotated) => {
2020 star_passthrough_source_aliases(&annotated.this, sources)
2021 }
2022 _ => StarPassthroughSources::None,
2023 }
2024}
2025
2026fn star_source_aliases(
2027 qualifier: Option<&Identifier>,
2028 sources: &[SourceInfo],
2029) -> StarPassthroughSources {
2030 if let Some(qualifier) = qualifier {
2031 let mut aliases = Vec::new();
2032
2033 for source in sources {
2034 if source_matches_star_qualifier(source, qualifier) {
2035 aliases.push(source.alias.clone());
2036 }
2037 }
2038
2039 return match aliases.len() {
2040 0 => StarPassthroughSources::None,
2041 1 => StarPassthroughSources::Aliases(aliases),
2042 _ => StarPassthroughSources::Ambiguous,
2043 };
2044 }
2045
2046 match sources {
2047 [source] if source.quoted => StarPassthroughSources::None,
2051 [source] => StarPassthroughSources::Aliases(vec![source.alias.clone()]),
2052 [] => StarPassthroughSources::None,
2053 _ => StarPassthroughSources::Ambiguous,
2054 }
2055}
2056
2057fn source_matches_star_qualifier(source: &SourceInfo, qualifier: &Identifier) -> bool {
2058 if source.normalized == normalize_cte_name(qualifier) {
2059 return true;
2060 }
2061
2062 if qualifier.quoted {
2063 source.alias == qualifier.name
2064 } else {
2065 source.alias.eq_ignore_ascii_case(&qualifier.name)
2066 }
2067}
2068
2069fn column_to_index(
2071 set_op_expr: &Expression,
2072 name: &str,
2073 dialect: Option<DialectType>,
2074) -> Result<usize> {
2075 let normalized_name = normalize_column_name(name, dialect);
2076 let mut expr = set_op_expr;
2077 loop {
2078 match expr {
2079 Expression::Union(u) => expr = &u.left,
2080 Expression::Intersect(i) => expr = &i.left,
2081 Expression::Except(e) => expr = &e.left,
2082 Expression::Select(select) => {
2083 for (i, e) in select.expressions.iter().enumerate() {
2084 if let Some(alias_or_name) = get_alias_or_name(e) {
2085 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2086 return Ok(i);
2087 }
2088 }
2089 }
2090 return Err(crate::error::Error::parse(
2091 format!("Cannot find column '{}' in set operation", name),
2092 0,
2093 0,
2094 0,
2095 0,
2096 ));
2097 }
2098 _ => {
2099 return Err(crate::error::Error::parse(
2100 "Expected SELECT or set operation",
2101 0,
2102 0,
2103 0,
2104 0,
2105 ))
2106 }
2107 }
2108 }
2109}
2110
/// Normalizes a column name for `dialect` so that two names can be compared for equality.
///
/// This is a thin wrapper over `normalize_name(name, dialect, false, true)`. The meaning
/// of the two boolean flags is defined by `crate::schema::normalize_name`, which is not
/// visible here.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    normalize_name(name, dialect, false, true)
}
2114
2115fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
2117 if let Expression::Select(select) = select_expr {
2118 let mut trimmed = select.as_ref().clone();
2119 trimmed.expressions = vec![target_expr.clone()];
2120 Expression::Select(Box::new(trimmed))
2121 } else {
2122 select_expr.clone()
2123 }
2124}
2125
2126fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
2128 if scope.cte_sources.contains_key(source_name) {
2130 for cte_scope in &scope.cte_scopes {
2131 if let Expression::Cte(cte) = &cte_scope.expression {
2132 if cte.alias.name == source_name {
2133 return Some(cte_scope);
2134 }
2135 }
2136 }
2137 }
2138
2139 if let Some(source_info) = scope.sources.get(source_name) {
2141 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
2142 if let Expression::Subquery(sq) = &source_info.expression {
2143 for dt_scope in &scope.derived_table_scopes {
2144 if dt_scope.expression == sq.this {
2145 return Some(dt_scope);
2146 }
2147 }
2148 }
2149 }
2150 }
2151
2152 None
2153}
2154
2155fn find_child_scope_in<'a>(
2159 all_cte_scopes: &[&'a Scope],
2160 scope: &'a Scope,
2161 source_name: &str,
2162) -> Option<&'a Scope> {
2163 for cte_scope in &scope.cte_scopes {
2165 if let Expression::Cte(cte) = &cte_scope.expression {
2166 if cte.alias.name == source_name {
2167 return Some(cte_scope);
2168 }
2169 }
2170 }
2171
2172 for cte_scope in all_cte_scopes {
2174 if let Expression::Cte(cte) = &cte_scope.expression {
2175 if cte.alias.name == source_name {
2176 return Some(cte_scope);
2177 }
2178 }
2179 }
2180
2181 if let Some(source_info) = scope.sources.get(source_name) {
2183 if source_info.is_scope {
2184 if let Expression::Subquery(sq) = &source_info.expression {
2185 for dt_scope in &scope.derived_table_scopes {
2186 if dt_scope.expression == sq.this {
2187 return Some(dt_scope);
2188 }
2189 }
2190 }
2191 }
2192 }
2193
2194 None
2195}
2196
2197fn make_table_column_node(table: &str, column: &str) -> LineageNode {
2199 let mut node = LineageNode::new(
2200 format!("{}.{}", table, column),
2201 Expression::Column(Box::new(crate::expressions::Column {
2202 name: crate::expressions::Identifier::new(column.to_string()),
2203 table: Some(crate::expressions::Identifier::new(table.to_string())),
2204 join_mark: false,
2205 trailing_comments: vec![],
2206 span: None,
2207 inferred_type: None,
2208 })),
2209 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
2210 );
2211 node.source_name = table.to_string();
2212 node.source_kind = SourceKind::Table;
2213 node
2214}
2215
2216fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
2217 let mut parts: Vec<String> = Vec::new();
2218 if let Some(catalog) = &table_ref.catalog {
2219 parts.push(catalog.name.clone());
2220 }
2221 if let Some(schema) = &table_ref.schema {
2222 parts.push(schema.name.clone());
2223 }
2224 parts.push(table_ref.name.name.clone());
2225 parts.join(".")
2226}
2227
2228fn apply_source_info_context(
2229 node: &mut LineageNode,
2230 source_key: &str,
2231 source_info: &ScopeSourceInfo,
2232) {
2233 node.source_kind = source_info.kind;
2234 node.source_name =
2235 source_info
2236 .lineage_name
2237 .clone()
2238 .unwrap_or_else(|| match &source_info.expression {
2239 Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
2240 _ => source_key.to_string(),
2241 });
2242 node.source_alias = source_info.alias.clone();
2243}
2244
2245fn make_table_column_node_from_source(
2246 source_key: &str,
2247 column: &str,
2248 source_info: &ScopeSourceInfo,
2249) -> LineageNode {
2250 let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
2251 let mut node = LineageNode::new(
2252 format!("{}.{}", lineage_name, column),
2253 Expression::Column(Box::new(crate::expressions::Column {
2254 name: crate::expressions::Identifier::new(column.to_string()),
2255 table: Some(crate::expressions::Identifier::new(
2256 lineage_name.to_string(),
2257 )),
2258 join_mark: false,
2259 trailing_comments: vec![],
2260 span: None,
2261 inferred_type: None,
2262 })),
2263 source_info.expression.clone(),
2264 );
2265
2266 apply_source_info_context(&mut node, source_key, source_info);
2267
2268 node
2269}
2270
/// A lightweight column reference: an optional table qualifier plus a column name.
///
/// Produced while scanning expressions for column references (e.g. by
/// `find_column_refs_in_expr`).
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Qualifier as written on the column (e.g. `t` in `t.col`), if any.
    table: Option<crate::expressions::Identifier>,
    /// Bare column name.
    column: String,
}
2277
2278fn find_column_refs_in_expr(
2280 expr: &Expression,
2281 dialect: Option<DialectType>,
2282) -> Vec<SimpleColumnRef> {
2283 let mut refs = Vec::new();
2284 collect_column_refs(expr, dialect, &mut refs, None);
2285 refs
2286}
2287
2288fn find_column_refs_in_expr_with_select(
2289 expr: &Expression,
2290 select_expr: &Expression,
2291 dialect: Option<DialectType>,
2292) -> Vec<SimpleColumnRef> {
2293 let named_windows = match select_expr {
2294 Expression::Select(select) => select.windows.as_deref(),
2295 _ => None,
2296 };
2297 let mut refs = Vec::new();
2298 collect_column_refs(expr, dialect, &mut refs, named_windows);
2299 refs
2300}
2301
2302fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
2303 match expr {
2304 Expression::Column(col) => {
2305 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
2306 }
2307 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
2308 _ => false,
2309 }
2310}
2311
2312fn collect_column_refs(
2313 expr: &Expression,
2314 dialect: Option<DialectType>,
2315 refs: &mut Vec<SimpleColumnRef>,
2316 named_windows: Option<&[NamedWindow]>,
2317) {
2318 let mut stack: Vec<&Expression> = vec![expr];
2319
2320 while let Some(current) = stack.pop() {
2321 match current {
2322 Expression::Column(col) => {
2324 refs.push(SimpleColumnRef {
2325 table: col.table.clone(),
2326 column: col.name.name.clone(),
2327 });
2328 }
2329
2330 Expression::Subquery(_) | Expression::Exists(_) => {}
2332
2333 Expression::And(op)
2335 | Expression::Or(op)
2336 | Expression::Eq(op)
2337 | Expression::Neq(op)
2338 | Expression::Lt(op)
2339 | Expression::Lte(op)
2340 | Expression::Gt(op)
2341 | Expression::Gte(op)
2342 | Expression::Add(op)
2343 | Expression::Sub(op)
2344 | Expression::Mul(op)
2345 | Expression::Div(op)
2346 | Expression::Mod(op)
2347 | Expression::BitwiseAnd(op)
2348 | Expression::BitwiseOr(op)
2349 | Expression::BitwiseXor(op)
2350 | Expression::BitwiseLeftShift(op)
2351 | Expression::BitwiseRightShift(op)
2352 | Expression::Concat(op)
2353 | Expression::Adjacent(op)
2354 | Expression::TsMatch(op)
2355 | Expression::PropertyEQ(op)
2356 | Expression::ArrayContainsAll(op)
2357 | Expression::ArrayContainedBy(op)
2358 | Expression::ArrayOverlaps(op)
2359 | Expression::JSONBContainsAllTopKeys(op)
2360 | Expression::JSONBContainsAnyTopKeys(op)
2361 | Expression::JSONBDeleteAtPath(op)
2362 | Expression::ExtendsLeft(op)
2363 | Expression::ExtendsRight(op)
2364 | Expression::Is(op)
2365 | Expression::MemberOf(op)
2366 | Expression::NullSafeEq(op)
2367 | Expression::NullSafeNeq(op)
2368 | Expression::Glob(op)
2369 | Expression::Match(op) => {
2370 stack.push(&op.left);
2371 stack.push(&op.right);
2372 }
2373
2374 Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
2376 stack.push(&u.this);
2377 }
2378
2379 Expression::Upper(f)
2381 | Expression::Lower(f)
2382 | Expression::Length(f)
2383 | Expression::LTrim(f)
2384 | Expression::RTrim(f)
2385 | Expression::Reverse(f)
2386 | Expression::Abs(f)
2387 | Expression::Sqrt(f)
2388 | Expression::Cbrt(f)
2389 | Expression::Ln(f)
2390 | Expression::Exp(f)
2391 | Expression::Sign(f)
2392 | Expression::Date(f)
2393 | Expression::Time(f)
2394 | Expression::DateFromUnixDate(f)
2395 | Expression::UnixDate(f)
2396 | Expression::UnixSeconds(f)
2397 | Expression::UnixMillis(f)
2398 | Expression::UnixMicros(f)
2399 | Expression::TimeStrToDate(f)
2400 | Expression::DateToDi(f)
2401 | Expression::DiToDate(f)
2402 | Expression::TsOrDiToDi(f)
2403 | Expression::TsOrDsToDatetime(f)
2404 | Expression::TsOrDsToTimestamp(f)
2405 | Expression::YearOfWeek(f)
2406 | Expression::YearOfWeekIso(f)
2407 | Expression::Initcap(f)
2408 | Expression::Ascii(f)
2409 | Expression::Chr(f)
2410 | Expression::Soundex(f)
2411 | Expression::ByteLength(f)
2412 | Expression::Hex(f)
2413 | Expression::LowerHex(f)
2414 | Expression::Unicode(f)
2415 | Expression::Radians(f)
2416 | Expression::Degrees(f)
2417 | Expression::Sin(f)
2418 | Expression::Cos(f)
2419 | Expression::Tan(f)
2420 | Expression::Asin(f)
2421 | Expression::Acos(f)
2422 | Expression::Atan(f)
2423 | Expression::IsNan(f)
2424 | Expression::IsInf(f)
2425 | Expression::ArrayLength(f)
2426 | Expression::ArraySize(f)
2427 | Expression::Cardinality(f)
2428 | Expression::ArrayReverse(f)
2429 | Expression::ArrayDistinct(f)
2430 | Expression::ArrayFlatten(f)
2431 | Expression::ArrayCompact(f)
2432 | Expression::Explode(f)
2433 | Expression::ExplodeOuter(f)
2434 | Expression::ToArray(f)
2435 | Expression::MapFromEntries(f)
2436 | Expression::MapKeys(f)
2437 | Expression::MapValues(f)
2438 | Expression::JsonArrayLength(f)
2439 | Expression::JsonKeys(f)
2440 | Expression::JsonType(f)
2441 | Expression::ParseJson(f)
2442 | Expression::ToJson(f)
2443 | Expression::Typeof(f)
2444 | Expression::BitwiseCount(f)
2445 | Expression::Year(f)
2446 | Expression::Month(f)
2447 | Expression::Day(f)
2448 | Expression::Hour(f)
2449 | Expression::Minute(f)
2450 | Expression::Second(f)
2451 | Expression::DayOfWeek(f)
2452 | Expression::DayOfWeekIso(f)
2453 | Expression::DayOfMonth(f)
2454 | Expression::DayOfYear(f)
2455 | Expression::WeekOfYear(f)
2456 | Expression::Quarter(f)
2457 | Expression::Epoch(f)
2458 | Expression::EpochMs(f)
2459 | Expression::TimeStrToUnix(f)
2460 | Expression::SHA(f)
2461 | Expression::SHA1Digest(f)
2462 | Expression::TimeToUnix(f)
2463 | Expression::JSONBool(f)
2464 | Expression::Int64(f)
2465 | Expression::MD5NumberLower64(f)
2466 | Expression::MD5NumberUpper64(f)
2467 | Expression::DateStrToDate(f)
2468 | Expression::DateToDateStr(f) => {
2469 stack.push(&f.this);
2470 }
2471
2472 Expression::Power(f)
2474 | Expression::NullIf(f)
2475 | Expression::IfNull(f)
2476 | Expression::Nvl(f)
2477 | Expression::UnixToTimeStr(f)
2478 | Expression::Contains(f)
2479 | Expression::StartsWith(f)
2480 | Expression::EndsWith(f)
2481 | Expression::Levenshtein(f)
2482 | Expression::ModFunc(f)
2483 | Expression::Atan2(f)
2484 | Expression::IntDiv(f)
2485 | Expression::AddMonths(f)
2486 | Expression::MonthsBetween(f)
2487 | Expression::NextDay(f)
2488 | Expression::ArrayContains(f)
2489 | Expression::ArrayPosition(f)
2490 | Expression::ArrayAppend(f)
2491 | Expression::ArrayPrepend(f)
2492 | Expression::ArrayUnion(f)
2493 | Expression::ArrayExcept(f)
2494 | Expression::ArrayRemove(f)
2495 | Expression::StarMap(f)
2496 | Expression::MapFromArrays(f)
2497 | Expression::MapContainsKey(f)
2498 | Expression::ElementAt(f)
2499 | Expression::JsonMergePatch(f)
2500 | Expression::JSONBContains(f)
2501 | Expression::JSONBExtract(f) => {
2502 stack.push(&f.this);
2503 stack.push(&f.expression);
2504 }
2505
2506 Expression::Greatest(f)
2508 | Expression::Least(f)
2509 | Expression::Coalesce(f)
2510 | Expression::ArrayConcat(f)
2511 | Expression::ArrayIntersect(f)
2512 | Expression::ArrayZip(f)
2513 | Expression::MapConcat(f)
2514 | Expression::JsonArray(f) => {
2515 for e in &f.expressions {
2516 stack.push(e);
2517 }
2518 }
2519
2520 Expression::Sum(f)
2522 | Expression::Avg(f)
2523 | Expression::Min(f)
2524 | Expression::Max(f)
2525 | Expression::ArrayAgg(f)
2526 | Expression::CountIf(f)
2527 | Expression::Stddev(f)
2528 | Expression::StddevPop(f)
2529 | Expression::StddevSamp(f)
2530 | Expression::Variance(f)
2531 | Expression::VarPop(f)
2532 | Expression::VarSamp(f)
2533 | Expression::Median(f)
2534 | Expression::Mode(f)
2535 | Expression::First(f)
2536 | Expression::Last(f)
2537 | Expression::AnyValue(f)
2538 | Expression::ApproxDistinct(f)
2539 | Expression::ApproxCountDistinct(f)
2540 | Expression::LogicalAnd(f)
2541 | Expression::LogicalOr(f)
2542 | Expression::Skewness(f)
2543 | Expression::ArrayConcatAgg(f)
2544 | Expression::ArrayUniqueAgg(f)
2545 | Expression::BoolXorAgg(f)
2546 | Expression::BitwiseAndAgg(f)
2547 | Expression::BitwiseOrAgg(f)
2548 | Expression::BitwiseXorAgg(f) => {
2549 stack.push(&f.this);
2550 if let Some(ref filter) = f.filter {
2551 stack.push(filter);
2552 }
2553 if let Some((ref expr, _)) = f.having_max {
2554 stack.push(expr);
2555 }
2556 if let Some(ref limit) = f.limit {
2557 stack.push(limit);
2558 }
2559 }
2560
2561 Expression::Function(func) => {
2563 for arg in &func.args {
2564 stack.push(arg);
2565 }
2566 }
2567 Expression::AggregateFunction(func) => {
2568 for arg in &func.args {
2569 stack.push(arg);
2570 }
2571 if let Some(ref filter) = func.filter {
2572 stack.push(filter);
2573 }
2574 if let Some(ref limit) = func.limit {
2575 stack.push(limit);
2576 }
2577 }
2578
2579 Expression::WindowFunction(wf) => {
2581 stack.push(&wf.this);
2582 for e in &wf.over.partition_by {
2583 stack.push(e);
2584 }
2585 for e in &wf.over.order_by {
2586 stack.push(&e.this);
2587 }
2588 if let Some(keep) = &wf.keep {
2589 for e in &keep.order_by {
2590 stack.push(&e.this);
2591 }
2592 }
2593 if let (Some(window_name), Some(named_windows)) =
2594 (&wf.over.window_name, named_windows)
2595 {
2596 for named_window in named_windows {
2597 if named_window
2598 .name
2599 .name
2600 .eq_ignore_ascii_case(&window_name.name)
2601 {
2602 for e in &named_window.spec.partition_by {
2603 stack.push(e);
2604 }
2605 for e in &named_window.spec.order_by {
2606 stack.push(&e.this);
2607 }
2608 }
2609 }
2610 }
2611 }
2612
2613 Expression::Alias(a) => {
2615 stack.push(&a.this);
2616 }
2617 Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
2618 stack.push(&c.this);
2619 if let Some(ref fmt) = c.format {
2620 stack.push(fmt);
2621 }
2622 if let Some(ref def) = c.default {
2623 stack.push(def);
2624 }
2625 }
2626 Expression::Paren(p) => {
2627 stack.push(&p.this);
2628 }
2629 Expression::Annotated(a) => {
2630 stack.push(&a.this);
2631 }
2632 Expression::Case(case) => {
2633 if let Some(ref operand) = case.operand {
2634 stack.push(operand);
2635 }
2636 for (cond, result) in &case.whens {
2637 stack.push(cond);
2638 stack.push(result);
2639 }
2640 if let Some(ref else_expr) = case.else_ {
2641 stack.push(else_expr);
2642 }
2643 }
2644 Expression::Collation(c) => {
2645 stack.push(&c.this);
2646 }
2647 Expression::In(i) => {
2648 stack.push(&i.this);
2649 for e in &i.expressions {
2650 stack.push(e);
2651 }
2652 if let Some(ref q) = i.query {
2653 stack.push(q);
2654 }
2655 if let Some(ref u) = i.unnest {
2656 stack.push(u);
2657 }
2658 }
2659 Expression::Between(b) => {
2660 stack.push(&b.this);
2661 stack.push(&b.low);
2662 stack.push(&b.high);
2663 }
2664 Expression::IsNull(n) => {
2665 stack.push(&n.this);
2666 }
2667 Expression::IsTrue(t) | Expression::IsFalse(t) => {
2668 stack.push(&t.this);
2669 }
2670 Expression::IsJson(j) => {
2671 stack.push(&j.this);
2672 }
2673 Expression::Like(l) | Expression::ILike(l) => {
2674 stack.push(&l.left);
2675 stack.push(&l.right);
2676 if let Some(ref esc) = l.escape {
2677 stack.push(esc);
2678 }
2679 }
2680 Expression::SimilarTo(s) => {
2681 stack.push(&s.this);
2682 stack.push(&s.pattern);
2683 if let Some(ref esc) = s.escape {
2684 stack.push(esc);
2685 }
2686 }
2687 Expression::Ordered(o) => {
2688 stack.push(&o.this);
2689 }
2690 Expression::Array(a) => {
2691 for e in &a.expressions {
2692 stack.push(e);
2693 }
2694 }
2695 Expression::Tuple(t) => {
2696 for e in &t.expressions {
2697 stack.push(e);
2698 }
2699 }
2700 Expression::Struct(s) => {
2701 for (_, e) in &s.fields {
2702 stack.push(e);
2703 }
2704 }
2705 Expression::Subscript(s) => {
2706 stack.push(&s.this);
2707 stack.push(&s.index);
2708 }
2709 Expression::Dot(d) => {
2710 stack.push(&d.this);
2711 }
2712 Expression::MethodCall(m) => {
2713 if !matches!(dialect, Some(DialectType::BigQuery))
2714 || !is_bigquery_safe_namespace_receiver(&m.this)
2715 {
2716 stack.push(&m.this);
2717 }
2718 for arg in &m.args {
2719 stack.push(arg);
2720 }
2721 }
2722 Expression::ArraySlice(s) => {
2723 stack.push(&s.this);
2724 if let Some(ref start) = s.start {
2725 stack.push(start);
2726 }
2727 if let Some(ref end) = s.end {
2728 stack.push(end);
2729 }
2730 }
2731 Expression::Lambda(l) => {
2732 stack.push(&l.body);
2733 }
2734 Expression::NamedArgument(n) => {
2735 stack.push(&n.value);
2736 }
2737 Expression::Lateral(l) => {
2738 stack.push(&l.this);
2739 if let Some(ref view) = l.view {
2740 stack.push(view);
2741 }
2742 if let Some(ref outer) = l.outer {
2743 stack.push(outer);
2744 }
2745 if let Some(ref ordinality) = l.ordinality {
2746 stack.push(ordinality);
2747 }
2748 }
2749 Expression::LateralView(lv) => {
2750 stack.push(&lv.this);
2751 }
2752 Expression::TryCatch(t) => {
2753 for stmt in &t.try_body {
2754 stack.push(stmt);
2755 }
2756 if let Some(catch_body) = &t.catch_body {
2757 for stmt in catch_body {
2758 stack.push(stmt);
2759 }
2760 }
2761 }
2762 Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
2763 stack.push(e);
2764 }
2765
2766 Expression::Substring(f) => {
2768 stack.push(&f.this);
2769 stack.push(&f.start);
2770 if let Some(ref len) = f.length {
2771 stack.push(len);
2772 }
2773 }
2774 Expression::Trim(f) => {
2775 stack.push(&f.this);
2776 if let Some(ref chars) = f.characters {
2777 stack.push(chars);
2778 }
2779 }
2780 Expression::Replace(f) => {
2781 stack.push(&f.this);
2782 stack.push(&f.old);
2783 stack.push(&f.new);
2784 }
2785 Expression::IfFunc(f) => {
2786 stack.push(&f.condition);
2787 stack.push(&f.true_value);
2788 if let Some(ref fv) = f.false_value {
2789 stack.push(fv);
2790 }
2791 }
2792 Expression::Nvl2(f) => {
2793 stack.push(&f.this);
2794 stack.push(&f.true_value);
2795 stack.push(&f.false_value);
2796 }
2797 Expression::ConcatWs(f) => {
2798 stack.push(&f.separator);
2799 for e in &f.expressions {
2800 stack.push(e);
2801 }
2802 }
2803 Expression::Count(f) => {
2804 if let Some(ref this) = f.this {
2805 stack.push(this);
2806 }
2807 if let Some(ref filter) = f.filter {
2808 stack.push(filter);
2809 }
2810 }
2811 Expression::GroupConcat(f) => {
2812 stack.push(&f.this);
2813 if let Some(ref sep) = f.separator {
2814 stack.push(sep);
2815 }
2816 if let Some(ref filter) = f.filter {
2817 stack.push(filter);
2818 }
2819 }
2820 Expression::StringAgg(f) => {
2821 stack.push(&f.this);
2822 if let Some(ref sep) = f.separator {
2823 stack.push(sep);
2824 }
2825 if let Some(ref filter) = f.filter {
2826 stack.push(filter);
2827 }
2828 if let Some(ref limit) = f.limit {
2829 stack.push(limit);
2830 }
2831 }
2832 Expression::ListAgg(f) => {
2833 stack.push(&f.this);
2834 if let Some(ref sep) = f.separator {
2835 stack.push(sep);
2836 }
2837 if let Some(ref filter) = f.filter {
2838 stack.push(filter);
2839 }
2840 }
2841 Expression::SumIf(f) => {
2842 stack.push(&f.this);
2843 stack.push(&f.condition);
2844 if let Some(ref filter) = f.filter {
2845 stack.push(filter);
2846 }
2847 }
2848 Expression::DateAdd(f) | Expression::DateSub(f) => {
2849 stack.push(&f.this);
2850 stack.push(&f.interval);
2851 }
2852 Expression::DateDiff(f) => {
2853 stack.push(&f.this);
2854 stack.push(&f.expression);
2855 }
2856 Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
2857 stack.push(&f.this);
2858 }
2859 Expression::Extract(f) => {
2860 stack.push(&f.this);
2861 }
2862 Expression::Round(f) => {
2863 stack.push(&f.this);
2864 if let Some(ref d) = f.decimals {
2865 stack.push(d);
2866 }
2867 }
2868 Expression::Floor(f) => {
2869 stack.push(&f.this);
2870 if let Some(ref s) = f.scale {
2871 stack.push(s);
2872 }
2873 if let Some(ref t) = f.to {
2874 stack.push(t);
2875 }
2876 }
2877 Expression::Ceil(f) => {
2878 stack.push(&f.this);
2879 if let Some(ref d) = f.decimals {
2880 stack.push(d);
2881 }
2882 if let Some(ref t) = f.to {
2883 stack.push(t);
2884 }
2885 }
2886 Expression::Log(f) => {
2887 stack.push(&f.this);
2888 if let Some(ref b) = f.base {
2889 stack.push(b);
2890 }
2891 }
2892 Expression::AtTimeZone(f) => {
2893 stack.push(&f.this);
2894 stack.push(&f.zone);
2895 }
2896 Expression::Lead(f) | Expression::Lag(f) => {
2897 stack.push(&f.this);
2898 if let Some(ref off) = f.offset {
2899 stack.push(off);
2900 }
2901 if let Some(ref def) = f.default {
2902 stack.push(def);
2903 }
2904 }
2905 Expression::FirstValue(f) | Expression::LastValue(f) => {
2906 stack.push(&f.this);
2907 }
2908 Expression::NthValue(f) => {
2909 stack.push(&f.this);
2910 stack.push(&f.offset);
2911 }
2912 Expression::Position(f) => {
2913 stack.push(&f.substring);
2914 stack.push(&f.string);
2915 if let Some(ref start) = f.start {
2916 stack.push(start);
2917 }
2918 }
2919 Expression::Decode(f) => {
2920 stack.push(&f.this);
2921 for (search, result) in &f.search_results {
2922 stack.push(search);
2923 stack.push(result);
2924 }
2925 if let Some(ref def) = f.default {
2926 stack.push(def);
2927 }
2928 }
2929 Expression::CharFunc(f) => {
2930 for arg in &f.args {
2931 stack.push(arg);
2932 }
2933 }
2934 Expression::ArraySort(f) => {
2935 stack.push(&f.this);
2936 if let Some(ref cmp) = f.comparator {
2937 stack.push(cmp);
2938 }
2939 }
2940 Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
2941 stack.push(&f.this);
2942 stack.push(&f.separator);
2943 if let Some(ref nr) = f.null_replacement {
2944 stack.push(nr);
2945 }
2946 }
2947 Expression::ArrayFilter(f) => {
2948 stack.push(&f.this);
2949 stack.push(&f.filter);
2950 }
2951 Expression::ArrayTransform(f) => {
2952 stack.push(&f.this);
2953 stack.push(&f.transform);
2954 }
2955 Expression::Sequence(f)
2956 | Expression::Generate(f)
2957 | Expression::ExplodingGenerateSeries(f) => {
2958 stack.push(&f.start);
2959 stack.push(&f.stop);
2960 if let Some(ref step) = f.step {
2961 stack.push(step);
2962 }
2963 }
2964 Expression::JsonExtract(f)
2965 | Expression::JsonExtractScalar(f)
2966 | Expression::JsonQuery(f)
2967 | Expression::JsonValue(f) => {
2968 stack.push(&f.this);
2969 stack.push(&f.path);
2970 }
2971 Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
2972 stack.push(&f.this);
2973 for p in &f.paths {
2974 stack.push(p);
2975 }
2976 }
2977 Expression::JsonObject(f) => {
2978 for (k, v) in &f.pairs {
2979 stack.push(k);
2980 stack.push(v);
2981 }
2982 }
2983 Expression::JsonSet(f) | Expression::JsonInsert(f) => {
2984 stack.push(&f.this);
2985 for (path, val) in &f.path_values {
2986 stack.push(path);
2987 stack.push(val);
2988 }
2989 }
2990 Expression::Overlay(f) => {
2991 stack.push(&f.this);
2992 stack.push(&f.replacement);
2993 stack.push(&f.from);
2994 if let Some(ref len) = f.length {
2995 stack.push(len);
2996 }
2997 }
2998 Expression::Convert(f) => {
2999 stack.push(&f.this);
3000 if let Some(ref style) = f.style {
3001 stack.push(style);
3002 }
3003 }
3004 Expression::ApproxPercentile(f) => {
3005 stack.push(&f.this);
3006 stack.push(&f.percentile);
3007 if let Some(ref acc) = f.accuracy {
3008 stack.push(acc);
3009 }
3010 if let Some(ref filter) = f.filter {
3011 stack.push(filter);
3012 }
3013 }
3014 Expression::Percentile(f)
3015 | Expression::PercentileCont(f)
3016 | Expression::PercentileDisc(f) => {
3017 stack.push(&f.this);
3018 stack.push(&f.percentile);
3019 if let Some(ref filter) = f.filter {
3020 stack.push(filter);
3021 }
3022 }
3023 Expression::WithinGroup(f) => {
3024 stack.push(&f.this);
3025 for e in &f.order_by {
3026 stack.push(&e.this);
3027 }
3028 }
3029 Expression::Left(f) | Expression::Right(f) => {
3030 stack.push(&f.this);
3031 stack.push(&f.length);
3032 }
3033 Expression::Repeat(f) => {
3034 stack.push(&f.this);
3035 stack.push(&f.times);
3036 }
3037 Expression::Lpad(f) | Expression::Rpad(f) => {
3038 stack.push(&f.this);
3039 stack.push(&f.length);
3040 if let Some(ref fill) = f.fill {
3041 stack.push(fill);
3042 }
3043 }
3044 Expression::Split(f) => {
3045 stack.push(&f.this);
3046 stack.push(&f.delimiter);
3047 }
3048 Expression::RegexpLike(f) => {
3049 stack.push(&f.this);
3050 stack.push(&f.pattern);
3051 if let Some(ref flags) = f.flags {
3052 stack.push(flags);
3053 }
3054 }
3055 Expression::RegexpReplace(f) => {
3056 stack.push(&f.this);
3057 stack.push(&f.pattern);
3058 stack.push(&f.replacement);
3059 if let Some(ref flags) = f.flags {
3060 stack.push(flags);
3061 }
3062 }
3063 Expression::RegexpExtract(f) => {
3064 stack.push(&f.this);
3065 stack.push(&f.pattern);
3066 if let Some(ref group) = f.group {
3067 stack.push(group);
3068 }
3069 }
3070 Expression::ToDate(f) => {
3071 stack.push(&f.this);
3072 if let Some(ref fmt) = f.format {
3073 stack.push(fmt);
3074 }
3075 }
3076 Expression::ToTimestamp(f) => {
3077 stack.push(&f.this);
3078 if let Some(ref fmt) = f.format {
3079 stack.push(fmt);
3080 }
3081 }
3082 Expression::DateFormat(f) | Expression::FormatDate(f) => {
3083 stack.push(&f.this);
3084 stack.push(&f.format);
3085 }
3086 Expression::LastDay(f) => {
3087 stack.push(&f.this);
3088 }
3089 Expression::FromUnixtime(f) => {
3090 stack.push(&f.this);
3091 if let Some(ref fmt) = f.format {
3092 stack.push(fmt);
3093 }
3094 }
3095 Expression::UnixTimestamp(f) => {
3096 if let Some(ref this) = f.this {
3097 stack.push(this);
3098 }
3099 if let Some(ref fmt) = f.format {
3100 stack.push(fmt);
3101 }
3102 }
3103 Expression::MakeDate(f) => {
3104 stack.push(&f.year);
3105 stack.push(&f.month);
3106 stack.push(&f.day);
3107 }
3108 Expression::MakeTimestamp(f) => {
3109 stack.push(&f.year);
3110 stack.push(&f.month);
3111 stack.push(&f.day);
3112 stack.push(&f.hour);
3113 stack.push(&f.minute);
3114 stack.push(&f.second);
3115 if let Some(ref tz) = f.timezone {
3116 stack.push(tz);
3117 }
3118 }
3119 Expression::TruncFunc(f) => {
3120 stack.push(&f.this);
3121 if let Some(ref d) = f.decimals {
3122 stack.push(d);
3123 }
3124 }
3125 Expression::ArrayFunc(f) => {
3126 for e in &f.expressions {
3127 stack.push(e);
3128 }
3129 }
3130 Expression::Unnest(f) => {
3131 stack.push(&f.this);
3132 for e in &f.expressions {
3133 stack.push(e);
3134 }
3135 }
3136 Expression::StructFunc(f) => {
3137 for (_, e) in &f.fields {
3138 stack.push(e);
3139 }
3140 }
3141 Expression::StructExtract(f) => {
3142 stack.push(&f.this);
3143 }
3144 Expression::NamedStruct(f) => {
3145 for (k, v) in &f.pairs {
3146 stack.push(k);
3147 stack.push(v);
3148 }
3149 }
3150 Expression::MapFunc(f) => {
3151 for k in &f.keys {
3152 stack.push(k);
3153 }
3154 for v in &f.values {
3155 stack.push(v);
3156 }
3157 }
3158 Expression::TransformKeys(f) | Expression::TransformValues(f) => {
3159 stack.push(&f.this);
3160 stack.push(&f.transform);
3161 }
3162 Expression::JsonArrayAgg(f) => {
3163 stack.push(&f.this);
3164 if let Some(ref filter) = f.filter {
3165 stack.push(filter);
3166 }
3167 }
3168 Expression::JsonObjectAgg(f) => {
3169 stack.push(&f.key);
3170 stack.push(&f.value);
3171 if let Some(ref filter) = f.filter {
3172 stack.push(filter);
3173 }
3174 }
3175 Expression::NTile(f) => {
3176 if let Some(ref n) = f.num_buckets {
3177 stack.push(n);
3178 }
3179 }
3180 Expression::Rand(f) => {
3181 if let Some(ref s) = f.seed {
3182 stack.push(s);
3183 }
3184 if let Some(ref lo) = f.lower {
3185 stack.push(lo);
3186 }
3187 if let Some(ref hi) = f.upper {
3188 stack.push(hi);
3189 }
3190 }
3191 Expression::Any(q) | Expression::All(q) => {
3192 stack.push(&q.this);
3193 stack.push(&q.subquery);
3194 }
3195 Expression::Overlaps(o) => {
3196 if let Some(ref this) = o.this {
3197 stack.push(this);
3198 }
3199 if let Some(ref expr) = o.expression {
3200 stack.push(expr);
3201 }
3202 if let Some(ref ls) = o.left_start {
3203 stack.push(ls);
3204 }
3205 if let Some(ref le) = o.left_end {
3206 stack.push(le);
3207 }
3208 if let Some(ref rs) = o.right_start {
3209 stack.push(rs);
3210 }
3211 if let Some(ref re) = o.right_end {
3212 stack.push(re);
3213 }
3214 }
3215 Expression::Interval(i) => {
3216 if let Some(ref this) = i.this {
3217 stack.push(this);
3218 }
3219 }
3220 Expression::TimeStrToTime(f) => {
3221 stack.push(&f.this);
3222 if let Some(ref zone) = f.zone {
3223 stack.push(zone);
3224 }
3225 }
3226 Expression::JSONBExtractScalar(f) => {
3227 stack.push(&f.this);
3228 stack.push(&f.expression);
3229 if let Some(ref jt) = f.json_type {
3230 stack.push(jt);
3231 }
3232 }
3233 Expression::JSONExtract(f) => {
3234 stack.push(&f.this);
3235 stack.push(&f.expression);
3236 for e in &f.expressions {
3237 stack.push(e);
3238 }
3239 if let Some(ref option) = f.option {
3240 stack.push(option);
3241 }
3242 if let Some(ref on_condition) = f.on_condition {
3243 stack.push(on_condition);
3244 }
3245 }
3246
3247 _ => {}
3252 }
3253 }
3254}
3255
3256#[cfg(test)]
3261mod tests {
3262 use super::*;
3263 use crate::dialects::{Dialect, DialectType};
3264 use crate::expressions::DataType;
3265 use crate::optimizer::annotate_types::annotate_types;
3266 use crate::parse_one;
3267 use crate::schema::{MappingSchema, Schema};
3268
3269 fn parse(sql: &str) -> Expression {
3270 let dialect = Dialect::get(DialectType::Generic);
3271 let ast = dialect.parse(sql).unwrap();
3272 ast.into_iter().next().unwrap()
3273 }
3274
3275 fn parse_dialect(sql: &str, dialect_type: DialectType) -> Expression {
3276 let dialect = Dialect::get(dialect_type);
3277 let ast = dialect.parse(sql).unwrap();
3278 ast.into_iter().next().unwrap()
3279 }
3280
3281 fn lineage_names(node: &LineageNode) -> Vec<String> {
3282 node.walk().map(|n| n.name.clone()).collect()
3283 }
3284
3285 fn assert_lineage_contains(node: &LineageNode, expected: &str) {
3286 let names = lineage_names(node);
3287 assert!(
3288 names.iter().any(|name| name == expected),
3289 "expected {expected} in lineage, got {names:?}"
3290 );
3291 }
3292
3293 #[test]
3294 fn test_simple_lineage() {
3295 let expr = parse("SELECT a FROM t");
3296 let node = lineage("a", &expr, None, false).unwrap();
3297
3298 assert_eq!(node.name, "a");
3299 assert!(!node.downstream.is_empty(), "Should have downstream nodes");
3300 let names = node.downstream_names();
3302 assert!(
3303 names.iter().any(|n| n == "t.a"),
3304 "Expected t.a in downstream, got: {:?}",
3305 names
3306 );
3307 }
3308
3309 #[test]
3310 fn test_lineage_walk() {
3311 let root = LineageNode {
3312 name: "col_a".to_string(),
3313 expression: Expression::Null(crate::expressions::Null),
3314 source: Expression::Null(crate::expressions::Null),
3315 downstream: vec![LineageNode::new(
3316 "t.a",
3317 Expression::Null(crate::expressions::Null),
3318 Expression::Null(crate::expressions::Null),
3319 )],
3320 source_name: String::new(),
3321 source_kind: SourceKind::Unknown,
3322 source_alias: None,
3323 reference_node_name: String::new(),
3324 };
3325
3326 let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
3327 assert_eq!(names.len(), 2);
3328 assert_eq!(names[0], "col_a");
3329 assert_eq!(names[1], "t.a");
3330 }
3331
3332 #[test]
3333 fn test_aliased_column() {
3334 let expr = parse("SELECT a + 1 AS b FROM t");
3335 let node = lineage("b", &expr, None, false).unwrap();
3336
3337 assert_eq!(node.name, "b");
3338 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3340 assert!(
3341 all_names.iter().any(|n| n.contains("a")),
3342 "Expected to trace to column a, got: {:?}",
3343 all_names
3344 );
3345 }
3346
3347 #[test]
3348 fn test_qualified_column() {
3349 let expr = parse("SELECT t.a FROM t");
3350 let node = lineage("a", &expr, None, false).unwrap();
3351
3352 assert_eq!(node.name, "a");
3353 let names = node.downstream_names();
3354 assert!(
3355 names.iter().any(|n| n == "t.a"),
3356 "Expected t.a, got: {:?}",
3357 names
3358 );
3359 }
3360
3361 #[test]
3362 fn test_unqualified_column() {
3363 let expr = parse("SELECT a FROM t");
3364 let node = lineage("a", &expr, None, false).unwrap();
3365
3366 let names = node.downstream_names();
3368 assert!(
3369 names.iter().any(|n| n == "t.a"),
3370 "Expected t.a, got: {:?}",
3371 names
3372 );
3373 }
3374
3375 #[test]
3376 fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
3377 let query = "SELECT name FROM users";
3378 let dialect = Dialect::get(DialectType::BigQuery);
3379 let expr = dialect
3380 .parse(query)
3381 .unwrap()
3382 .into_iter()
3383 .next()
3384 .expect("expected one expression");
3385
3386 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3387 schema
3388 .add_table("users", &[("name".into(), DataType::Text)], None)
3389 .expect("schema setup");
3390
3391 let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
3392 .expect("lineage without schema");
3393 let mut expr_without = node_without_schema.expression.clone();
3394 annotate_types(
3395 &mut expr_without,
3396 Some(&schema),
3397 Some(DialectType::BigQuery),
3398 );
3399 assert_eq!(
3400 expr_without.inferred_type(),
3401 None,
3402 "Expected unresolved root type without schema-aware lineage qualification"
3403 );
3404
3405 let node_with_schema = lineage_with_schema(
3406 "name",
3407 &expr,
3408 Some(&schema),
3409 Some(DialectType::BigQuery),
3410 false,
3411 )
3412 .expect("lineage with schema");
3413 let mut expr_with = node_with_schema.expression.clone();
3414 annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
3415
3416 assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
3417 }
3418
3419 #[test]
3420 fn test_lineage_with_schema_correlated_scalar_subquery() {
3421 let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
3422 let dialect = Dialect::get(DialectType::BigQuery);
3423 let expr = dialect
3424 .parse(query)
3425 .unwrap()
3426 .into_iter()
3427 .next()
3428 .expect("expected one expression");
3429
3430 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3431 schema
3432 .add_table(
3433 "t1",
3434 &[("id".into(), DataType::BigInt { length: None })],
3435 None,
3436 )
3437 .expect("schema setup");
3438 schema
3439 .add_table(
3440 "t2",
3441 &[
3442 ("id".into(), DataType::BigInt { length: None }),
3443 ("val".into(), DataType::BigInt { length: None }),
3444 ],
3445 None,
3446 )
3447 .expect("schema setup");
3448
3449 let node = lineage_with_schema(
3450 "id",
3451 &expr,
3452 Some(&schema),
3453 Some(DialectType::BigQuery),
3454 false,
3455 )
3456 .expect("lineage_with_schema should handle correlated scalar subqueries");
3457
3458 assert_eq!(node.name, "id");
3459 }
3460
3461 #[test]
3462 fn test_lineage_with_schema_join_using() {
3463 let query = "SELECT a FROM t1 JOIN t2 USING(a)";
3464 let dialect = Dialect::get(DialectType::BigQuery);
3465 let expr = dialect
3466 .parse(query)
3467 .unwrap()
3468 .into_iter()
3469 .next()
3470 .expect("expected one expression");
3471
3472 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3473 schema
3474 .add_table(
3475 "t1",
3476 &[("a".into(), DataType::BigInt { length: None })],
3477 None,
3478 )
3479 .expect("schema setup");
3480 schema
3481 .add_table(
3482 "t2",
3483 &[("a".into(), DataType::BigInt { length: None })],
3484 None,
3485 )
3486 .expect("schema setup");
3487
3488 let node = lineage_with_schema(
3489 "a",
3490 &expr,
3491 Some(&schema),
3492 Some(DialectType::BigQuery),
3493 false,
3494 )
3495 .expect("lineage_with_schema should handle JOIN USING");
3496
3497 assert_eq!(node.name, "a");
3498 }
3499
3500 #[test]
3501 fn test_lineage_with_schema_qualified_table_name() {
3502 let query = "SELECT a FROM raw.t1";
3503 let dialect = Dialect::get(DialectType::BigQuery);
3504 let expr = dialect
3505 .parse(query)
3506 .unwrap()
3507 .into_iter()
3508 .next()
3509 .expect("expected one expression");
3510
3511 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3512 schema
3513 .add_table(
3514 "raw.t1",
3515 &[("a".into(), DataType::BigInt { length: None })],
3516 None,
3517 )
3518 .expect("schema setup");
3519
3520 let node = lineage_with_schema(
3521 "a",
3522 &expr,
3523 Some(&schema),
3524 Some(DialectType::BigQuery),
3525 false,
3526 )
3527 .expect("lineage_with_schema should handle dotted schema.table names");
3528
3529 assert_eq!(node.name, "a");
3530 }
3531
3532 #[test]
3533 fn test_lineage_with_schema_none_matches_lineage() {
3534 let expr = parse("SELECT a FROM t");
3535 let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
3536 let with_none =
3537 lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
3538
3539 assert_eq!(with_none.name, baseline.name);
3540 assert_eq!(with_none.downstream_names(), baseline.downstream_names());
3541 }
3542
3543 #[test]
3544 fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
3545 let dialect = Dialect::get(DialectType::BigQuery);
3546 let expr = dialect
3547 .parse("SELECT Name AS name FROM teams")
3548 .unwrap()
3549 .into_iter()
3550 .next()
3551 .expect("expected one expression");
3552
3553 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3554 schema
3555 .add_table(
3556 "teams",
3557 &[("Name".into(), DataType::String { length: None })],
3558 None,
3559 )
3560 .expect("schema setup");
3561
3562 let node = lineage_with_schema(
3563 "name",
3564 &expr,
3565 Some(&schema),
3566 Some(DialectType::BigQuery),
3567 false,
3568 )
3569 .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
3570
3571 let names = node.downstream_names();
3572 assert!(
3573 names.iter().any(|n| n == "teams.Name"),
3574 "Expected teams.Name in downstream, got: {:?}",
3575 names
3576 );
3577 }
3578
3579 #[test]
3580 fn test_lineage_bigquery_mixed_case_alias_lookup() {
3581 let dialect = Dialect::get(DialectType::BigQuery);
3582 let expr = dialect
3583 .parse("SELECT Name AS Name FROM teams")
3584 .unwrap()
3585 .into_iter()
3586 .next()
3587 .expect("expected one expression");
3588
3589 let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
3590 .expect("lineage should resolve mixed-case aliases in BigQuery");
3591
3592 assert_eq!(node.name, "name");
3593 }
3594
3595 #[test]
3596 fn test_lineage_bigquery_unnest_alias_source_issue_209() {
3597 let expr = parse_one(
3598 r#"
3599SELECT date_val AS week_start
3600FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
3601"#,
3602 DialectType::BigQuery,
3603 )
3604 .expect("parse");
3605
3606 let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
3607 .expect("lineage should resolve UNNEST alias as a source");
3608 let child = node
3609 .downstream
3610 .first()
3611 .expect("week_start should have downstream lineage");
3612
3613 assert_eq!(child.name, "_0.date_val");
3614 assert_eq!(child.source_name, "_0");
3615 assert_eq!(child.source_kind, SourceKind::Virtual);
3616 assert_eq!(child.source_alias.as_deref(), Some("date_val"));
3617
3618 let Expression::Column(column) = &child.expression else {
3619 panic!(
3620 "expected downstream column expression, got {:?}",
3621 child.expression
3622 );
3623 };
3624 assert_eq!(column.name.name, "date_val");
3625 assert_eq!(
3626 column.table.as_ref().map(|table| table.name.as_str()),
3627 Some("_0")
3628 );
3629 assert!(
3630 matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
3631 "expected UNNEST source expression, got {:?}",
3632 child.source
3633 );
3634 }
3635
3636 #[test]
3637 fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
3638 let expr =
3639 parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
3640
3641 let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3642 let child = node.downstream.first().expect("id should have lineage");
3643
3644 assert_eq!(child.name, "date_val.id");
3645 assert_eq!(child.source_name, "date_val");
3646 assert_eq!(child.source_kind, SourceKind::Table);
3647 assert_eq!(child.source_alias, None);
3648 }
3649
3650 #[test]
3651 fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
3652 let expr = parse_one(
3653 r#"
3654SELECT a.a AS first_value, b.b AS second_value
3655FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
3656JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
3657"#,
3658 DialectType::BigQuery,
3659 )
3660 .expect("parse");
3661
3662 let first =
3663 lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3664 let second =
3665 lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3666
3667 let first_child = first.downstream.first().expect("first source");
3668 let second_child = second.downstream.first().expect("second source");
3669
3670 assert_eq!(first_child.name, "_0.a");
3671 assert_eq!(first_child.source_name, "_0");
3672 assert_eq!(first_child.source_alias.as_deref(), Some("a"));
3673 assert_eq!(first_child.source_kind, SourceKind::Virtual);
3674
3675 assert_eq!(second_child.name, "_1.b");
3676 assert_eq!(second_child.source_name, "_1");
3677 assert_eq!(second_child.source_alias.as_deref(), Some("b"));
3678 assert_eq!(second_child.source_kind, SourceKind::Virtual);
3679 }
3680
3681 #[test]
3682 fn test_lineage_table_backed_unnest_points_to_real_source_column() {
3683 let expr = parse_one(
3684 r#"
3685SELECT item.item AS item
3686FROM t JOIN UNNEST(t.items) AS item ON TRUE
3687"#,
3688 DialectType::BigQuery,
3689 )
3690 .expect("parse");
3691
3692 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3693 let virtual_child = node.downstream.first().expect("virtual item source");
3694 assert_eq!(virtual_child.name, "_0.item");
3695 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3696
3697 let real_child = virtual_child
3698 .downstream
3699 .first()
3700 .expect("UNNEST(t.items) should depend on t.items");
3701 assert_eq!(real_child.name, "t.items");
3702 assert_eq!(real_child.source_name, "t");
3703 assert_eq!(real_child.source_kind, SourceKind::Table);
3704 }
3705
3706 #[test]
3707 fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
3708 let expr = parse_one(
3709 r#"
3710SELECT item AS item
3711FROM t JOIN UNNEST(t.items) AS item ON TRUE
3712"#,
3713 DialectType::BigQuery,
3714 )
3715 .expect("parse");
3716
3717 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3718 let virtual_child = node.downstream.first().expect("virtual item source");
3719 assert_eq!(virtual_child.name, "_0.item");
3720 assert_eq!(virtual_child.source_name, "_0");
3721 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3722 assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
3723
3724 let real_child = virtual_child
3725 .downstream
3726 .first()
3727 .expect("UNNEST(t.items) should depend on t.items");
3728 assert_eq!(real_child.name, "t.items");
3729 assert_eq!(real_child.source_name, "t");
3730 assert_eq!(real_child.source_kind, SourceKind::Table);
3731 }
3732
3733 #[test]
3734 fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
3735 let cases = [
3736 (
3737 DialectType::PostgreSQL,
3738 "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
3739 ),
3740 (
3741 DialectType::Presto,
3742 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3743 ),
3744 (
3745 DialectType::Trino,
3746 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3747 ),
3748 ];
3749
3750 for (dialect, sql) in cases {
3751 let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3752 let node = lineage("out", &expr, Some(dialect), false)
3753 .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3754 let virtual_child = node
3755 .downstream
3756 .first()
3757 .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3758
3759 assert_eq!(
3760 virtual_child.name, "_0.x",
3761 "unexpected virtual child for {dialect:?}"
3762 );
3763 assert_eq!(virtual_child.source_name, "_0");
3764 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3765 assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3766
3767 let real_child = virtual_child
3768 .downstream
3769 .first()
3770 .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3771 assert_eq!(real_child.name, "t.items");
3772 assert_eq!(real_child.source_kind, SourceKind::Table);
3773 }
3774 }
3775
/// Spark/Hive `LATERAL VIEW EXPLODE(items) u AS x` columns resolve through the virtual source
/// `_0` (aliased `u`) down to the exploded table column `t.items`.
#[test]
fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
    const SQL: &str = "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x";
    let cases = [(DialectType::Spark, SQL), (DialectType::Hive, SQL)];

    for case in cases {
        let (dialect, sql) = case;
        let parsed = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
        let root = lineage("out", &parsed, Some(dialect), false)
            .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));

        let explode_hop = match root.downstream.first() {
            Some(child) => child,
            None => panic!("expected virtual child for {dialect:?}"),
        };
        assert_eq!(explode_hop.name, "_0.x");
        assert_eq!(explode_hop.source_name, "_0");
        assert_eq!(explode_hop.source_kind, SourceKind::Virtual);
        assert_eq!(explode_hop.source_alias.as_deref(), Some("u"));

        let table_hop = match explode_hop.downstream.first() {
            Some(child) => child,
            None => panic!("expected table dependency for {dialect:?}"),
        };
        assert_eq!(table_hop.name, "t.items");
        assert_eq!(table_hop.source_kind, SourceKind::Table);
    }
}
3811
/// Snowflake `LATERAL FLATTEN(INPUT => ...) AS f` is modelled as the virtual source `_0`
/// (aliased `f`) whose `value` column depends on the flattened input `raw_events.payload`.
#[test]
fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
    let sql =
        "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f";
    let parsed = parse_one(sql, DialectType::Snowflake).expect("parse");

    let root = lineage("value", &parsed, Some(DialectType::Snowflake), false).expect("lineage");

    let flatten_hop = root.downstream.first().expect("virtual flatten source");
    assert_eq!(flatten_hop.name, "_0.value");
    assert_eq!(flatten_hop.source_name, "_0");
    assert_eq!(flatten_hop.source_kind, SourceKind::Virtual);
    assert_eq!(flatten_hop.source_alias.as_deref(), Some("f"));

    let input_hop = flatten_hop
        .downstream
        .first()
        .expect("FLATTEN input should depend on raw_events.payload");
    assert_eq!(input_hop.name, "raw_events.payload");
    assert_eq!(input_hop.source_kind, SourceKind::Table);
}
3834
/// Issue 61: the bare `day` unit in Snowflake `DATEDIFF` is a date part, so schema-aware
/// lineage must trace only `date_utc` and never report a `day` column.
#[test]
fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
    let sql =
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let columns = [("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &columns, None)
        .expect("schema setup");

    let node = lineage_with_schema(
        "recency",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat date part as a column");

    let names = node.downstream_names();
    let traces_date_utc = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        traces_date_utc,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );
    // Every edge must be something other than the date part.
    assert!(
        names.iter().all(|n| !n.ends_with(".day") && n != "day"),
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
3873
/// Snowflake `DATEDIFF(day, ...)` parses into the typed `DateDiff` node with a `Day` unit.
#[test]
fn test_snowflake_datediff_parses_to_typed_ast() {
    let sql =
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics";
    let parsed = parse_one(sql, DialectType::Snowflake).expect("parse");

    // Peel the AST one layer at a time: Select -> first projection (Alias) -> DateDiff.
    let select = match parsed {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::DateDiff(f) => {
            assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
        }
        other => panic!("expected DateDiff, got {other:?}"),
    }
}
3895
/// Follow-up to issue 61 for `DATEADD`: the `day` date part must not be resolved as a column
/// when a schema is supplied.
#[test]
fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
    let sql = "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let columns = [("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &columns, None)
        .expect("schema setup");

    let node = lineage_with_schema(
        "next_day",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat DATEADD date part as a column");

    let names = node.downstream_names();
    let traces_date_utc = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        traces_date_utc,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );
    assert!(
        names.iter().all(|n| !n.ends_with(".day") && n != "day"),
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
3934
/// `DATE_PART(day, ...)` with an unquoted identifier unit must not surface `day` as a column
/// in schema-aware lineage.
#[test]
fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
    let sql = "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let columns = [("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &columns, None)
        .expect("schema setup");

    let node = lineage_with_schema(
        "day_part",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat DATE_PART identifier as a column");

    let names = node.downstream_names();
    let traces_date_utc = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        traces_date_utc,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );
    assert!(
        names.iter().all(|n| !n.ends_with(".day") && n != "day"),
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
3973
/// Control case: a quoted `'day'` unit in `DATE_PART` must keep resolving `date_utc`.
#[test]
fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
    let sql = "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let columns = [("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &columns, None)
        .expect("schema setup");

    let node = lineage_with_schema(
        "day_part",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("quoted DATE_PART should continue to work");

    let names = node.downstream_names();
    let traces_date_utc = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        traces_date_utc,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );
}
4007
/// Snowflake `DATEADD(day, ...)` stays a generic `Function` whose first argument is the
/// `Var` `day`, not a column reference.
#[test]
fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
    let sql = "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics";
    let parsed = parse_one(sql, DialectType::Snowflake).expect("parse");

    let select = match parsed {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::Function(f) => {
            assert_eq!(f.name.to_uppercase(), "DATEADD");
            assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
        }
        other => panic!("expected generic DATEADD function, got {other:?}"),
    }
}
4030
/// Snowflake `DATE_PART(day, ...)` stays a generic `Function` with the unit held as `Var`.
#[test]
fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
    let sql = "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics";
    let parsed = parse_one(sql, DialectType::Snowflake).expect("parse");

    let select = match parsed {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::Function(f) => {
            assert_eq!(f.name.to_uppercase(), "DATE_PART");
            assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
        }
        other => panic!("expected generic DATE_PART function, got {other:?}"),
    }
}
4053
/// Snowflake `DATE_PART('day', ...)` with a quoted unit also stays a generic `Function`.
#[test]
fn test_snowflake_date_part_string_literal_stays_generic_function() {
    let sql = "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics";
    let parsed = parse_one(sql, DialectType::Snowflake).expect("parse");

    let select = match parsed {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::Function(f) => {
            assert_eq!(f.name.to_uppercase(), "DATE_PART");
        }
        other => panic!("expected generic DATE_PART function, got {other:?}"),
    }
}
4075
/// Qualified columns in a two-table join each trace back to their own table.
#[test]
fn test_lineage_join() {
    let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");

    let from_t = lineage("a", &expr, None, false)
        .unwrap()
        .downstream_names();
    assert!(
        from_t.iter().any(|name| name == "t.a"),
        "Expected t.a, got: {:?}",
        from_t
    );

    let from_s = lineage("b", &expr, None, false)
        .unwrap()
        .downstream_names();
    assert!(
        from_s.iter().any(|name| name == "s.b"),
        "Expected s.b, got: {:?}",
        from_s
    );
}
4096
/// A column referenced through a table alias keeps the alias in its edge name (`t1.col1`).
/// Its `source_name` and `source` expression must resolve to the underlying table `table1`.
#[test]
fn test_lineage_alias_leaf_has_resolved_source_name() {
    let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
    let node = lineage("col1", &expr, None, false).unwrap();

    let names = node.downstream_names();
    assert!(
        names.iter().any(|n| n == "t1.col1"),
        "Expected aliased column edge t1.col1, got: {:?}",
        names
    );

    let leaf = node
        .downstream
        .iter()
        .find(|n| n.name == "t1.col1")
        .expect("Expected t1.col1 leaf");
    assert_eq!(leaf.source_name, "table1");
    match &leaf.source {
        Expression::Table(table) => assert_eq!(table.name.name, "table1"),
        // Include the unexpected expression so a failure is diagnosable without a debugger.
        other => panic!("Expected leaf source to be a table expression, got {other:?}"),
    }
}
4122
/// Lineage follows a column through a derived table `(SELECT a FROM t) AS x` to `t.a`.
#[test]
fn test_lineage_derived_table() {
    let sql = "SELECT x.a FROM (SELECT a FROM t) AS x";
    let expr = parse(sql);
    let root = lineage("a", &expr, None, false).unwrap();

    assert_eq!(root.name, "a");
    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.iter().any(|name| name == "t.a"),
        "Expected to trace through derived table to t.a, got: {:?}",
        visited
    );
}
4137
/// Lineage follows a column through a single CTE to its base table column `t.a`.
#[test]
fn test_lineage_cte() {
    let sql = "WITH cte AS (SELECT a FROM t) SELECT a FROM cte";
    let expr = parse(sql);
    let root = lineage("a", &expr, None, false).unwrap();

    assert_eq!(root.name, "a");
    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.iter().any(|name| name == "t.a"),
        "Expected to trace through CTE to t.a, got: {:?}",
        visited
    );
}
4151
/// A two-branch `UNION` yields one downstream node per branch.
#[test]
fn test_lineage_union() {
    let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
    let root = lineage("a", &expr, None, false).unwrap();

    assert_eq!(root.name, "a");
    let branch_count = root.downstream.len();
    assert_eq!(
        branch_count, 2,
        "Expected 2 branches for UNION, got {}",
        branch_count
    );
}
4166
/// A CTE whose body is a `UNION` is expanded into at least three lineage nodes in total.
#[test]
fn test_lineage_cte_union() {
    let sql = "WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte";
    let expr = parse(sql);
    let root = lineage("a", &expr, None, false).unwrap();

    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.len() >= 3,
        "Expected at least 3 nodes for CTE with UNION, got: {:?}",
        visited
    );
}
4180
/// Requesting lineage for `*` keeps the name `*` and produces at least one downstream node.
#[test]
fn test_lineage_star() {
    let expr = parse("SELECT * FROM t");
    let root = lineage("*", &expr, None, false).unwrap();

    assert_eq!(root.name, "*");
    let has_children = !root.downstream.is_empty();
    assert!(has_children, "Star should produce downstream nodes");
}
4193
/// A scalar subquery in the projection is traced into, producing at least two nodes in total.
#[test]
fn test_lineage_subquery_in_select() {
    let sql = "SELECT (SELECT MAX(b) FROM s) AS x FROM t";
    let expr = parse(sql);
    let root = lineage("x", &expr, None, false).unwrap();

    assert_eq!(root.name, "x");
    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.len() >= 2,
        "Expected tracing into scalar subquery, got: {:?}",
        visited
    );
}
4208
/// Each projected column of a simple `SELECT a, b FROM t` gets its own lineage root,
/// tracing to its own table column.
#[test]
fn test_lineage_multiple_columns() {
    let expr = parse("SELECT a, b FROM t");

    let node_a = lineage("a", &expr, None, false).unwrap();
    let node_b = lineage("b", &expr, None, false).unwrap();

    assert_eq!(node_a.name, "a");
    assert_eq!(node_b.name, "b");

    let names_a = node_a.downstream_names();
    let names_b = node_b.downstream_names();
    // Report the names actually produced on failure, as the other lineage tests do.
    assert!(
        names_a.iter().any(|n| n == "t.a"),
        "Expected t.a in downstream of a, got: {:?}",
        names_a
    );
    assert!(
        names_b.iter().any(|n| n == "t.b"),
        "Expected t.b in downstream of b, got: {:?}",
        names_b
    );
}
4225
/// `get_source_tables` collects the base table `t` reached from `t.a` in a join query.
#[test]
fn test_get_source_tables() {
    let sql = "SELECT t.a, s.b FROM t JOIN s ON t.id = s.id";
    let expr = parse(sql);
    let root = lineage("a", &expr, None, false).unwrap();

    let source_tables = get_source_tables(&root);
    let found_t = source_tables.contains("t");
    assert!(
        found_t,
        "Expected source table 't', got: {:?}",
        source_tables
    );
}
4238
/// Asking for a column that is not projected by the query is an error.
#[test]
fn test_lineage_column_not_found() {
    let expr = parse("SELECT a FROM t");
    let result = lineage("nonexistent", &expr, None, false);
    // Show the unexpected `Ok` node if lineage wrongly succeeds.
    assert!(
        result.is_err(),
        "Expected an error for a column absent from the projection, got: {:?}",
        result
    );
}
4245
/// Lineage passes through two chained CTEs (`cte2` reads `cte1`), producing at least
/// three nodes.
#[test]
fn test_lineage_nested_cte() {
    let sql = "WITH cte1 AS (SELECT a FROM t), \
               cte2 AS (SELECT a FROM cte1) \
               SELECT a FROM cte2";
    let expr = parse(sql);
    let root = lineage("a", &expr, None, false).unwrap();

    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.len() >= 3,
        "Expected to trace through nested CTEs, got: {:?}",
        visited
    );
}
4263
/// With `trim_selects = true`, the root's source `SELECT` keeps only the traced projection.
#[test]
fn test_trim_selects_true() {
    let expr = parse("SELECT a, b, c FROM t");
    let node = lineage("a", &expr, None, true).unwrap();

    match &node.source {
        Expression::Select(select) => assert_eq!(
            select.expressions.len(),
            1,
            "Trimmed source should have 1 expression, got {}",
            select.expressions.len()
        ),
        // Include the unexpected expression so a failure is diagnosable.
        other => panic!("Expected Select source, got {other:?}"),
    }
}
4281
/// With `trim_selects = false`, the root's source `SELECT` keeps every original projection.
#[test]
fn test_trim_selects_false() {
    let expr = parse("SELECT a, b, c FROM t");
    let node = lineage("a", &expr, None, false).unwrap();

    match &node.source {
        Expression::Select(select) => assert_eq!(
            select.expressions.len(),
            3,
            "Untrimmed source should have 3 expressions"
        ),
        // Include the unexpected expression so a failure is diagnosable.
        other => panic!("Expected Select source, got {other:?}"),
    }
}
4298
/// A computed projection `a + b AS c` traces to both operand columns (3+ nodes in total).
#[test]
fn test_lineage_expression_in_select() {
    let sql = "SELECT a + b AS c FROM t";
    let expr = parse(sql);
    let root = lineage("c", &expr, None, false).unwrap();

    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.len() >= 3,
        "Expected to trace a + b to both columns, got: {:?}",
        visited
    );
}
4312
/// UNION branches are matched by projection position, not by column name. `a` in the first
/// branch lines up with `b` in the second, so both branches appear downstream.
#[test]
fn test_set_operation_by_index() {
    let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");

    let node = lineage("a", &expr, None, false).unwrap();

    assert_eq!(
        node.downstream.len(),
        2,
        "Expected both UNION branches downstream, got: {:?}",
        node.downstream_names()
    );
}
4323
4324 fn print_node(node: &LineageNode, indent: usize) {
4327 let pad = " ".repeat(indent);
4328 println!(
4329 "{pad}name={:?} source_name={:?}",
4330 node.name, node.source_name
4331 );
4332 for child in &node.downstream {
4333 print_node(child, indent + 1);
4334 }
4335 }
4336
/// Reproduction for issue 18. A function-wrapped column (`UPPER(name)`) parsed with the
/// BigQuery dialect must still list `users.name` as a direct downstream dependency.
#[test]
fn test_issue18_repro() {
    let query = "SELECT UPPER(name) as upper_name FROM users";
    println!("Query: {query}\n");

    let bigquery = crate::dialects::Dialect::get(DialectType::BigQuery);
    let statements = bigquery.parse(query).unwrap();
    let first_statement = &statements[0];

    let root = lineage(
        "upper_name",
        first_statement,
        Some(DialectType::BigQuery),
        false,
    )
    .unwrap();
    println!("lineage(\"upper_name\"):");
    print_node(&root, 1);

    let downstream = root.downstream_names();
    assert!(
        downstream.iter().any(|name| name == "users.name"),
        "Expected users.name in downstream, got: {:?}",
        downstream
    );
}
4358
/// Issue 207: in BigQuery, `SAFE.PARSE_JSON(data)` is a function namespace, not a column
/// `safe` on the CTE. Lineage must reach `source_table.data` through both CTEs and must never
/// report an `import_cte.safe` receiver.
#[test]
fn test_lineage_bigquery_safe_namespace_issue207() {
    let query = r#"
WITH import_cte AS (
  SELECT timestamp, data, operation
  FROM `project`.`dataset`.`source_table`
),
transform_cte AS (
  SELECT
    timestamp,
    SAFE.PARSE_JSON(data) AS json_data
  FROM import_cte
)
SELECT json_data FROM transform_cte
"#;
    let expr = parse_one(query, DialectType::BigQuery).expect("parse");
    let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage should resolve SAFE.PARSE_JSON arguments");
    let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();

    let reaches_source = names.iter().any(|name| name == "source_table.data");
    assert!(
        reaches_source,
        "expected source_table.data in lineage, got {names:?}"
    );

    let has_safe_receiver = names
        .iter()
        .any(|name| name.eq_ignore_ascii_case("import_cte.safe"));
    assert!(
        !has_safe_receiver,
        "did not expect SAFE namespace receiver in lineage, got {names:?}"
    );
}
4390
/// The query is parsed with the shared `parse` test helper and traced with the BigQuery
/// dialect. Under that dialect the `SAFE.` prefix must be treated as a namespace: `t.data` is
/// traced, and no `t.safe` receiver column appears.
#[test]
fn test_lineage_bigquery_safe_namespace_method_call_guard() {
    let sql = "SELECT SAFE.PARSE_JSON(data) AS json_data FROM t";
    let expr = parse(sql);
    let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage should resolve SAFE.PARSE_JSON arguments");
    let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();

    let reaches_data = names.iter().any(|name| name == "t.data");
    assert!(reaches_data, "expected t.data in lineage, got {names:?}");

    let has_safe_receiver = names.iter().any(|name| name.eq_ignore_ascii_case("t.safe"));
    assert!(
        !has_safe_receiver,
        "did not expect SAFE namespace receiver in lineage, got {names:?}"
    );
}
4407
/// Control case for the SAFE-namespace guard: an ordinary `obj.METHOD(arg)` keeps both the
/// receiver `t.obj` and the argument `t.arg` in the lineage.
#[test]
fn test_lineage_method_call_receiver_control() {
    let sql = "SELECT obj.METHOD(arg) AS out FROM t";
    let expr = parse(sql);
    let node = lineage("out", &expr, None, false).expect("lineage");
    let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();

    let has_receiver = names.iter().any(|name| name == "t.obj");
    assert!(
        has_receiver,
        "expected ordinary method receiver to remain in lineage, got {names:?}"
    );
    let has_argument = names.iter().any(|name| name == "t.arg");
    assert!(
        has_argument,
        "expected method argument in lineage, got {names:?}"
    );
}
4423
/// `UPPER(name)` traces directly to `users.name`.
#[test]
fn test_lineage_upper_function() {
    let sql = "SELECT UPPER(name) AS upper_name FROM users";
    let root = lineage("upper_name", &parse(sql), None, false).unwrap();

    let downstream = root.downstream_names();
    assert!(
        downstream.iter().any(|name| name == "users.name"),
        "Expected users.name in downstream, got: {:?}",
        downstream
    );
}
4436
/// `ROUND(price, 2)` traces to the column argument `products.price`.
#[test]
fn test_lineage_round_function() {
    let sql = "SELECT ROUND(price, 2) AS rounded FROM products";
    let root = lineage("rounded", &parse(sql), None, false).unwrap();

    let downstream = root.downstream_names();
    assert!(
        downstream.iter().any(|name| name == "products.price"),
        "Expected products.price in downstream, got: {:?}",
        downstream
    );
}
4449
/// `COALESCE(a, b)` traces to both of its column arguments.
#[test]
fn test_lineage_coalesce_function() {
    let sql = "SELECT COALESCE(a, b) AS val FROM t";
    let root = lineage("val", &parse(sql), None, false).unwrap();

    let downstream = root.downstream_names();
    let contains = |target: &str| downstream.iter().any(|name| name == target);
    assert!(
        contains("t.a"),
        "Expected t.a in downstream, got: {:?}",
        downstream
    );
    assert!(
        contains("t.b"),
        "Expected t.b in downstream, got: {:?}",
        downstream
    );
}
4467
/// The aggregate `COUNT(id)` traces to `t.id`.
#[test]
fn test_lineage_count_function() {
    let sql = "SELECT COUNT(id) AS cnt FROM t";
    let root = lineage("cnt", &parse(sql), None, false).unwrap();

    let downstream = root.downstream_names();
    assert!(
        downstream.iter().any(|name| name == "t.id"),
        "Expected t.id in downstream, got: {:?}",
        downstream
    );
}
4480
/// The aggregate `SUM(amount)` traces to `t.amount`.
#[test]
fn test_lineage_sum_function() {
    let sql = "SELECT SUM(amount) AS total FROM t";
    let root = lineage("total", &parse(sql), None, false).unwrap();

    let downstream = root.downstream_names();
    assert!(
        downstream.iter().any(|name| name == "t.amount"),
        "Expected t.amount in downstream, got: {:?}",
        downstream
    );
}
4493
/// A `CASE` expression contributes both its condition column (`t.x`) and the columns inside
/// the nested branch functions (`t.name`).
#[test]
fn test_lineage_case_with_nested_functions() {
    let sql = "SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t";
    let root = lineage("result", &parse(sql), None, false).unwrap();

    let downstream = root.downstream_names();
    let contains = |target: &str| downstream.iter().any(|name| name == target);
    assert!(
        contains("t.x"),
        "Expected t.x in downstream, got: {:?}",
        downstream
    );
    assert!(
        contains("t.name"),
        "Expected t.name in downstream, got: {:?}",
        downstream
    );
}
4512
/// `SUBSTRING(name, 1, 3)` traces to its column argument `t.name`.
#[test]
fn test_lineage_substring_function() {
    let sql = "SELECT SUBSTRING(name, 1, 3) AS short FROM t";
    let root = lineage("short", &parse(sql), None, false).unwrap();

    let downstream = root.downstream_names();
    assert!(
        downstream.iter().any(|name| name == "t.name"),
        "Expected t.name in downstream, got: {:?}",
        downstream
    );
}
4525
/// A column selected from a `SELECT *` CTE still produces downstream nodes, even without
/// a schema.
#[test]
fn test_lineage_cte_select_star() {
    let sql = "WITH y AS (SELECT * FROM x) SELECT a FROM y";
    let expr = parse(sql);
    let root = lineage("a", &expr, None, false).unwrap();

    assert_eq!(root.name, "a");
    let has_children = !root.downstream.is_empty();
    assert!(
        has_children,
        "Expected downstream nodes tracing through CTE, got none"
    );
}
4544
/// Without a schema, `c.x` from a `SELECT *` CTE passes through the CTE and reaches `t.x`.
/// The path must also include a hop attributed to the CTE `c` itself.
#[test]
fn test_lineage_schema_less_cte_star_passthrough_resolves_base_column() {
    let expr = parse("WITH c AS (SELECT * FROM t) SELECT c.x FROM c");
    let node = lineage("x", &expr, None, false).unwrap();

    let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
    assert!(
        all_names.iter().any(|name| name == "t.x"),
        "Expected schema-less CTE star passthrough to reach t.x, got: {:?}",
        all_names
    );

    // The search predicate is the assertion. Re-checking the fields of the node it found would
    // be tautological, so assert existence directly and print what was visited on failure.
    let has_cte_hop = node
        .walk()
        .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "c");
    assert!(
        has_cte_hop,
        "expected CTE hop with source_name c, got: {:?}",
        all_names
    );
}
4564
/// An aggregate over a column of a schema-less `SELECT *` CTE still reaches the base `t.x`.
#[test]
fn test_lineage_schema_less_cte_star_passthrough_with_aggregation() {
    let sql = "WITH c AS (SELECT * FROM t) SELECT SUM(c.x) AS s FROM c GROUP BY 1";
    let expr = parse(sql);
    let root = lineage("s", &expr, None, false).unwrap();

    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    let reaches_base = visited.iter().any(|name| name == "t.x");
    assert!(
        reaches_base,
        "Expected aggregate over CTE star passthrough to reach t.x, got: {:?}",
        visited
    );
}
4580
/// With two schema-less `SELECT *` CTEs joined together, `SUM(b.x)` resolves through `b`
/// to `t2.x`.
#[test]
fn test_lineage_schema_less_cte_star_passthrough_with_join_and_alias() {
    let sql = "WITH a AS (SELECT * FROM t1), b AS (SELECT * FROM t2) \
               SELECT SUM(b.x) AS s FROM a LEFT JOIN b ON b.id = a.id GROUP BY a.k";
    let expr = parse(sql);
    let root = lineage("s", &expr, None, false).unwrap();

    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    let reaches_base = visited.iter().any(|name| name == "t2.x");
    assert!(
        reaches_base,
        "Expected joined CTE star passthrough to reach t2.x, got: {:?}",
        visited
    );
}
4596
/// A chain of three schema-less `SELECT *` CTEs is passed through all the way to `t.x`.
#[test]
fn test_lineage_schema_less_chained_cte_star_passthrough() {
    let sql = "WITH c1 AS (SELECT * FROM t), \
               c2 AS (SELECT * FROM c1), \
               c3 AS (SELECT * FROM c2) \
               SELECT c3.x FROM c3";
    let expr = parse(sql);
    let root = lineage("x", &expr, None, false).unwrap();

    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    let reaches_base = visited.iter().any(|name| name == "t.x");
    assert!(
        reaches_base,
        "Expected chained CTE star passthrough to reach t.x, got: {:?}",
        visited
    );
}
4614
/// Without a schema, an unqualified `*` over two joined tables is ambiguous. Lineage for `x`
/// must fail rather than guess which table owns it.
#[test]
fn test_lineage_schema_less_unqualified_star_with_multiple_sources_does_not_guess() {
    let sql = "SELECT * FROM t1 JOIN t2 ON t1.id = t2.id";
    let expr = parse(sql);
    let result = lineage("x", &expr, None, false);

    let refused = result.is_err();
    assert!(
        refused,
        "Unqualified star over multiple sources should remain ambiguous, got: {:?}",
        result
    );
}
4626
/// A column renamed inside a CTE (`id AS customer_id`) and exposed through `SELECT *` can be
/// traced by its new name.
#[test]
fn test_lineage_cte_select_star_renamed_column() {
    let sql = "WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed";
    let expr = parse(sql);
    let root = lineage("customer_id", &expr, None, false).unwrap();

    assert_eq!(root.name, "customer_id");
    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.len() >= 2,
        "Expected at least 2 nodes (customer_id → source), got: {:?}",
        visited
    );
}
4644
/// Every explicit column of a CTE is traceable through an outer `SELECT *`.
#[test]
fn test_lineage_cte_select_star_multiple_columns() {
    let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");

    for column in ["a", "b", "c"].iter().copied() {
        let root = lineage(column, &expr, None, false).unwrap();
        assert_eq!(root.name, column);

        let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
        assert!(
            visited.len() >= 2,
            "Expected at least 2 nodes for column {}, got: {:?}",
            column,
            visited
        );
    }
}
4663
/// A star-forwarding CTE (`cte2 AS (SELECT * FROM cte1)`) stacked on an explicit CTE still
/// yields a multi-hop path (3+ nodes).
#[test]
fn test_lineage_nested_cte_select_star() {
    let sql = "WITH cte1 AS (SELECT a FROM t), \
               cte2 AS (SELECT * FROM cte1) \
               SELECT * FROM cte2";
    let expr = parse(sql);
    let root = lineage("a", &expr, None, false).unwrap();

    assert_eq!(root.name, "a");
    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.len() >= 3,
        "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
        visited
    );
}
4682
/// Two star-forwarding CTEs stacked on an explicit one give a path of at least four nodes.
#[test]
fn test_lineage_three_level_nested_cte_star() {
    let sql = "WITH cte1 AS (SELECT x FROM t), \
               cte2 AS (SELECT * FROM cte1), \
               cte3 AS (SELECT * FROM cte2) \
               SELECT * FROM cte3";
    let expr = parse(sql);
    let root = lineage("x", &expr, None, false).unwrap();

    assert_eq!(root.name, "x");
    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.len() >= 4,
        "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
        visited
    );
}
4702
/// `SELECT *` over a CTE whose body is a `UNION ALL` still traces the requested column.
#[test]
fn test_lineage_cte_union_star() {
    let sql = "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
               SELECT * FROM cte";
    let expr = parse(sql);
    let root = lineage("a", &expr, None, false).unwrap();

    assert_eq!(root.name, "a");
    let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
    assert!(
        visited.len() >= 2,
        "Expected at least 2 nodes for CTE union star, got: {:?}",
        visited
    );
}
4720
/// Smoke test for a `SELECT *` CTE over a table with no schema. The result is not
/// constrained: an error is acceptable, and lineage must not panic. If it does succeed, the
/// root node must still carry the requested column name.
#[test]
fn test_lineage_cte_star_unknown_table() {
    let expr = parse(
        "WITH cte AS (SELECT * FROM unknown_table) \
         SELECT * FROM cte",
    );
    if let Ok(node) = lineage("x", &expr, None, false) {
        assert_eq!(node.name, "x");
    }
}
4733
/// A CTE with an explicit column list (`cte(x, y)`) exposes its renamed columns to an outer
/// `SELECT *`.
#[test]
fn test_lineage_cte_explicit_columns() {
    let sql = "WITH cte(x, y) AS (SELECT a, b FROM t) \
               SELECT * FROM cte";
    let root = lineage("x", &parse(sql), None, false).unwrap();
    assert_eq!(root.name, "x");
}
4744
/// A qualified star (`cte.*`) expands to each CTE column, and each one is traceable.
#[test]
fn test_lineage_cte_qualified_star() {
    let expr = parse(
        "WITH cte AS (SELECT a, b FROM t) \
         SELECT cte.* FROM cte",
    );
    for column in ["a", "b"].iter().copied() {
        let root = lineage(column, &expr, None, false).unwrap();
        assert_eq!(root.name, column);

        let visited: Vec<String> = root.walk().map(|hop| hop.name.clone()).collect();
        assert!(
            visited.len() >= 2,
            "Expected at least 2 nodes for qualified star column {}, got: {:?}",
            column,
            visited
        );
    }
}
4764
/// Selecting a column from an unaliased `SELECT *` subquery still produces downstream nodes.
#[test]
fn test_lineage_subquery_select_star() {
    let sql = "SELECT x FROM (SELECT * FROM table_a)";
    let expr = parse(sql);
    let root = lineage("x", &expr, None, false).unwrap();

    assert_eq!(root.name, "x");
    let has_children = !root.downstream.is_empty();
    assert!(
        has_children,
        "Expected downstream nodes for subquery with SELECT *, got none"
    );
}
4778
/// With a schema for the external table, a `SELECT *` CTE can expand to named columns, so
/// `order_id` is resolvable.
#[test]
fn test_lineage_cte_star_with_schema_external_table() {
    let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
SELECT * FROM orders"#;
    let expr = parse(sql);

    let columns: Vec<(String, DataType)> = ["order_id", "customer_id", "amount"]
        .iter()
        .map(|name| (name.to_string(), DataType::Unknown))
        .collect();
    let mut schema = MappingSchema::new();
    schema.add_table("stg_orders", &columns, None).unwrap();

    let root = lineage_with_schema(
        "order_id",
        &expr,
        Some(&schema as &dyn Schema),
        None,
        false,
    )
    .unwrap();
    assert_eq!(root.name, "order_id");
}
4799
/// Schema lookup also works when the CTE reads a quoted three-part table name
/// (`"db"."schema"."stg_orders"`).
#[test]
fn test_lineage_cte_star_with_schema_three_part_name() {
    let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
SELECT * FROM orders"#;
    let expr = parse(sql);

    let columns: Vec<(String, DataType)> = ["order_id", "customer_id"]
        .iter()
        .map(|name| (name.to_string(), DataType::Unknown))
        .collect();
    let mut schema = MappingSchema::new();
    schema
        .add_table("db.schema.stg_orders", &columns, None)
        .unwrap();

    let root = lineage_with_schema(
        "customer_id",
        &expr,
        Some(&schema as &dyn Schema),
        None,
        false,
    )
    .unwrap();
    assert_eq!(root.name, "customer_id");
}
4826
/// Two stacked `SELECT *` CTEs over a schema-backed external table resolve `name`.
#[test]
fn test_lineage_cte_star_with_schema_nested() {
    let sql = r#"WITH
    raw AS (SELECT * FROM external_table),
    enriched AS (SELECT * FROM raw)
    SELECT * FROM enriched"#;
    let expr = parse(sql);

    let columns: Vec<(String, DataType)> = ["id", "name"]
        .iter()
        .map(|name| (name.to_string(), DataType::Unknown))
        .collect();
    let mut schema = MappingSchema::new();
    schema.add_table("external_table", &columns, None).unwrap();

    let root =
        lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
    assert_eq!(root.name, "name");
}
4848
/// A qualified star (`orders.*`) next to a literal projection works with a schema. Both the
/// star-expanded `order_id` and the literal `extra` column are resolvable.
#[test]
fn test_lineage_cte_qualified_star_with_schema() {
    let sql = r#"WITH
    orders AS (SELECT * FROM stg_orders),
    enriched AS (
        SELECT orders.*, 'extra' AS extra
        FROM orders
    )
    SELECT * FROM enriched"#;
    let expr = parse(sql);

    let columns: Vec<(String, DataType)> = ["order_id", "total"]
        .iter()
        .map(|name| (name.to_string(), DataType::Unknown))
        .collect();
    let mut schema = MappingSchema::new();
    schema.add_table("stg_orders", &columns, None).unwrap();
    let schema_ref = Some(&schema as &dyn Schema);

    let order_id = lineage_with_schema("order_id", &expr, schema_ref, None, false).unwrap();
    assert_eq!(order_id.name, "order_id");

    let extra = lineage_with_schema("extra", &expr, schema_ref, None, false).unwrap();
    assert_eq!(extra.name, "extra");
}
4879
/// Without a schema, star forwarding still works when the innermost CTE lists its columns
/// explicitly.
#[test]
fn test_lineage_cte_star_without_schema_still_works() {
    let sql = r#"WITH
    cte1 AS (SELECT id, name FROM raw_table),
    cte2 AS (SELECT * FROM cte1)
    SELECT * FROM cte2"#;
    let root = lineage("id", &parse(sql), None, false).unwrap();
    assert_eq!(root.name, "id");
}
4893
4894 #[test]
4895 fn test_lineage_nested_cte_star_with_join_and_schema() {
4896 let sql = r#"WITH
4899base_orders AS (
4900 SELECT * FROM stg_orders
4901),
4902with_payments AS (
4903 SELECT
4904 base_orders.*,
4905 p.amount
4906 FROM base_orders
4907 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4908),
4909final_cte AS (
4910 SELECT * FROM with_payments
4911)
4912SELECT * FROM final_cte"#;
4913 let expr = parse(sql);
4914
4915 let mut schema = MappingSchema::new();
4916 let order_cols = vec![
4917 (
4918 "order_id".to_string(),
4919 crate::expressions::DataType::Unknown,
4920 ),
4921 (
4922 "customer_id".to_string(),
4923 crate::expressions::DataType::Unknown,
4924 ),
4925 ("status".to_string(), crate::expressions::DataType::Unknown),
4926 ];
4927 let pay_cols = vec![
4928 (
4929 "payment_id".to_string(),
4930 crate::expressions::DataType::Unknown,
4931 ),
4932 (
4933 "order_id".to_string(),
4934 crate::expressions::DataType::Unknown,
4935 ),
4936 ("amount".to_string(), crate::expressions::DataType::Unknown),
4937 ];
4938 schema.add_table("stg_orders", &order_cols, None).unwrap();
4939 schema.add_table("stg_payments", &pay_cols, None).unwrap();
4940
4941 let node =
4943 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4944 .unwrap();
4945 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4946
4947 let has_table_qualified = all_names
4949 .iter()
4950 .any(|n| n.contains('.') && n.contains("order_id"));
4951 assert!(
4952 has_table_qualified,
4953 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4954 all_names
4955 );
4956
4957 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4959 .unwrap();
4960 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4961
4962 let has_table_qualified = all_names
4963 .iter()
4964 .any(|n| n.contains('.') && n.contains("amount"));
4965 assert!(
4966 has_table_qualified,
4967 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4968 all_names
4969 );
4970 }
4971
4972 #[test]
4973 fn test_lineage_cte_alias_resolution() {
4974 let sql = r#"WITH import_stg_items AS (
4976 SELECT item_id, name, status FROM stg_items
4977)
4978SELECT base.item_id, base.status
4979FROM import_stg_items AS base"#;
4980 let expr = parse(sql);
4981
4982 let node = lineage("item_id", &expr, None, false).unwrap();
4983 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4984 assert!(
4986 all_names.iter().any(|n| n == "stg_items.item_id"),
4987 "Expected leaf 'stg_items.item_id', got: {:?}",
4988 all_names
4989 );
4990 }
4991
4992 #[test]
4993 fn test_lineage_cte_alias_with_schema_and_star() {
4994 let sql = r#"WITH import_stg AS (
4996 SELECT * FROM stg_items
4997)
4998SELECT base.item_id, base.status
4999FROM import_stg AS base"#;
5000 let expr = parse(sql);
5001
5002 let mut schema = MappingSchema::new();
5003 schema
5004 .add_table(
5005 "stg_items",
5006 &[
5007 ("item_id".to_string(), DataType::Unknown),
5008 ("name".to_string(), DataType::Unknown),
5009 ("status".to_string(), DataType::Unknown),
5010 ],
5011 None,
5012 )
5013 .unwrap();
5014
5015 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
5016 .unwrap();
5017 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5018 assert!(
5019 all_names.iter().any(|n| n == "stg_items.item_id"),
5020 "Expected leaf 'stg_items.item_id', got: {:?}",
5021 all_names
5022 );
5023 }
5024
5025 #[test]
5026 fn test_lineage_cte_alias_with_join() {
5027 let sql = r#"WITH
5029 import_users AS (SELECT id, name FROM users),
5030 import_orders AS (SELECT id, user_id, amount FROM orders)
5031SELECT u.name, o.amount
5032FROM import_users AS u
5033LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
5034 let expr = parse(sql);
5035
5036 let node = lineage("name", &expr, None, false).unwrap();
5037 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5038 assert!(
5039 all_names.iter().any(|n| n == "users.name"),
5040 "Expected leaf 'users.name', got: {:?}",
5041 all_names
5042 );
5043
5044 let node = lineage("amount", &expr, None, false).unwrap();
5045 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5046 assert!(
5047 all_names.iter().any(|n| n == "orders.amount"),
5048 "Expected leaf 'orders.amount', got: {:?}",
5049 all_names
5050 );
5051 }
5052
5053 #[test]
5058 fn test_lineage_unquoted_cte_case_insensitive() {
5059 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
5062 let node = lineage("col", &expr, None, false).unwrap();
5063 assert_eq!(node.name, "col");
5064 assert!(
5065 !node.downstream.is_empty(),
5066 "Unquoted CTE should resolve case-insensitively"
5067 );
5068 }
5069
5070 #[test]
5071 fn test_lineage_quoted_cte_case_preserved() {
5072 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
5074 let node = lineage("col", &expr, None, false).unwrap();
5075 assert_eq!(node.name, "col");
5076 assert!(
5077 !node.downstream.is_empty(),
5078 "Quoted CTE with matching case should resolve"
5079 );
5080 }
5081
5082 #[test]
5083 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
5084 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
5088 let result = lineage("col", &expr, None, false);
5091 assert!(
5092 result.is_err(),
5093 "Quoted CTE with case mismatch should not expand star: {:?}",
5094 result
5095 );
5096 }
5097
5098 #[test]
5099 fn test_lineage_mixed_quoted_unquoted_cte() {
5100 let expr = parse(
5102 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
5103 );
5104 let node = lineage("a", &expr, None, false).unwrap();
5105 assert_eq!(node.name, "a");
5106 assert!(
5107 !node.downstream.is_empty(),
5108 "Mixed quoted/unquoted CTE chain should resolve"
5109 );
5110 }
5111
5112 #[test]
5128 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
5129 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
5140 let node = lineage("col", &expr, None, false).unwrap();
5141 assert!(!node.downstream.is_empty());
5142 let child = &node.downstream[0];
5143 assert_eq!(
5145 child.source_name, "MyCte",
5146 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5147 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5148 );
5149 }
5150
5151 #[test]
5152 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
5153 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
5160 let node = lineage("col", &expr, None, false).unwrap();
5161 assert!(!node.downstream.is_empty());
5162 let child = &node.downstream[0];
5163 assert_eq!(
5165 child.source_name, "MyCte",
5166 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5167 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5168 );
5169 }
5170
5171 #[test]
5172 fn test_lineage_recursive_cte_terminates_at_base_case() {
5173 let expr = parse_dialect(
5174 "WITH RECURSIVE nums AS (\
5175 SELECT 1 AS n \
5176 UNION ALL \
5177 SELECT n + 1 FROM nums WHERE n < 5\
5178 ) SELECT n FROM nums",
5179 DialectType::DuckDB,
5180 );
5181 let node = lineage("n", &expr, Some(DialectType::DuckDB), false).unwrap();
5182 let names = lineage_names(&node);
5183
5184 assert!(
5185 names.len() <= 12,
5186 "recursive CTE lineage should not unroll repeatedly, got {names:?}"
5187 );
5188 assert!(
5189 node.walk()
5190 .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "nums"),
5191 "expected recursive source to be marked as a CTE, got {names:?}"
5192 );
5193 }
5194
5195 #[test]
5196 fn test_lineage_window_partition_and_order_columns() {
5197 let expr = parse(
5198 "WITH c AS (SELECT user_id, ts FROM events) \
5199 SELECT ROW_NUMBER() OVER (PARTITION BY c.user_id ORDER BY c.ts) AS out FROM c",
5200 );
5201 let node = lineage("out", &expr, None, false).unwrap();
5202
5203 assert_lineage_contains(&node, "events.user_id");
5204 assert_lineage_contains(&node, "events.ts");
5205 }
5206
5207 #[test]
5208 fn test_lineage_window_aggregate_order_column() {
5209 let expr = parse(
5210 "WITH c AS (SELECT amount, d FROM txns) \
5211 SELECT SUM(c.amount) OVER (ORDER BY c.d) AS running FROM c",
5212 );
5213 let node = lineage("running", &expr, None, false).unwrap();
5214
5215 assert_lineage_contains(&node, "txns.amount");
5216 assert_lineage_contains(&node, "txns.d");
5217 }
5218
5219 #[test]
5220 fn test_lineage_named_window_columns() {
5221 let expr = parse(
5222 "SELECT ROW_NUMBER() OVER w AS out \
5223 FROM events \
5224 WINDOW w AS (PARTITION BY user_id ORDER BY ts)",
5225 );
5226 let node = lineage("out", &expr, None, false).unwrap();
5227
5228 assert_lineage_contains(&node, "events.user_id");
5229 assert_lineage_contains(&node, "events.ts");
5230 }
5231
5232 #[test]
5233 fn test_lineage_within_group_order_column() {
5234 let expr =
5235 parse("SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS p FROM txns");
5236 let node = lineage("p", &expr, None, false).unwrap();
5237
5238 assert_lineage_contains(&node, "txns.amount");
5239 }
5240
5241 #[test]
5242 fn test_lineage_query_wrappers_resolve_inner_select() {
5243 for sql in [
5244 "CREATE TABLE tgt AS SELECT x FROM src",
5245 "CREATE VIEW v AS SELECT x FROM src",
5246 "INSERT INTO tgt SELECT x FROM src",
5247 ] {
5248 let expr = parse(sql);
5249 let node = lineage("x", &expr, None, false).unwrap();
5250 assert_lineage_contains(&node, "src.x");
5251 }
5252 }
5253
5254 #[test]
5255 fn test_lineage_scalar_subquery_through_cte_reaches_base_table() {
5256 let expr = parse(
5257 "WITH c AS (SELECT x FROM t) \
5258 SELECT (SELECT SUM(x) FROM c) AS s FROM c LIMIT 1",
5259 );
5260 let node = lineage("s", &expr, None, false).unwrap();
5261
5262 assert_lineage_contains(&node, "t.x");
5263 assert!(
5264 node.walk()
5265 .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "c"),
5266 "expected scalar subquery CTE hop in lineage, got {:?}",
5267 lineage_names(&node)
5268 );
5269 }
5270
5271 #[test]
5272 fn test_lineage_pivot_output_resolves_aggregation_input() {
5273 let expr = parse_dialect(
5274 "SELECT * FROM (SELECT region, q, amt FROM sales) \
5275 PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5276 DialectType::DuckDB,
5277 );
5278 let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5279
5280 assert_lineage_contains(&node, "sales.amt");
5281 }
5282
5283 #[test]
5284 fn test_lineage_pivot_through_cte_resolves_aggregation_input() {
5285 let expr = parse_dialect(
5286 "WITH src AS (SELECT region, q, amt FROM sales) \
5287 SELECT q1 FROM src PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5288 DialectType::DuckDB,
5289 );
5290 let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5291
5292 assert_lineage_contains(&node, "sales.amt");
5293 }
5294
5295 #[test]
5296 fn test_lineage_unpivot_value_resolves_input_columns() {
5297 let expr = parse_dialect(
5298 "SELECT name, val FROM t UNPIVOT(val FOR col IN (a, b, c))",
5299 DialectType::DuckDB,
5300 );
5301 let node = lineage("val", &expr, Some(DialectType::DuckDB), false).unwrap();
5302
5303 assert_lineage_contains(&node, "t.a");
5304 assert_lineage_contains(&node, "t.b");
5305 assert_lineage_contains(&node, "t.c");
5306 }
5307
5308 #[test]
5309 fn test_lineage_unpivot_multi_value_columns_resolve_positionally() {
5310 let expr = parse_dialect(
5311 "SELECT first_half_sales, second_half_sales, semester \
5312 FROM produce \
5313 UNPIVOT((first_half_sales, second_half_sales) \
5314 FOR semester IN ((q1, q2) AS 'semester_1', (q3, q4) AS 'semester_2'))",
5315 DialectType::BigQuery,
5316 );
5317
5318 let first = lineage(
5319 "first_half_sales",
5320 &expr,
5321 Some(DialectType::BigQuery),
5322 false,
5323 )
5324 .unwrap();
5325 assert_lineage_contains(&first, "produce.q1");
5326 assert_lineage_contains(&first, "produce.q3");
5327
5328 let second = lineage(
5329 "second_half_sales",
5330 &expr,
5331 Some(DialectType::BigQuery),
5332 false,
5333 )
5334 .unwrap();
5335 assert_lineage_contains(&second, "produce.q2");
5336 assert_lineage_contains(&second, "produce.q4");
5337 }
5338
5339 #[test]
5340 fn test_lineage_top_level_union_over_ctes_reaches_base_tables() {
5341 let expr = parse(
5342 "WITH a AS (SELECT x FROM t1), b AS (SELECT x FROM t2) \
5343 SELECT x FROM a UNION SELECT x FROM b",
5344 );
5345 let node = lineage("x", &expr, None, false).unwrap();
5346
5347 assert_lineage_contains(&node, "t1.x");
5348 assert_lineage_contains(&node, "t2.x");
5349 }
5350
5351 #[test]
5358 #[ignore = "requires derived table star expansion (separate issue)"]
5359 fn test_node_name_doesnt_contain_comment() {
5360 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
5361 let node = lineage("x", &expr, None, false).unwrap();
5362
5363 assert_eq!(node.name, "x");
5364 assert!(!node.downstream.is_empty());
5365 }
5366
5367 #[test]
5371 fn test_comment_before_first_column_in_cte() {
5372 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
5373 let sql_without_comment = "with t as (select 1 as a) select a from t";
5374
5375 let expr_ok = parse(sql_without_comment);
5377 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
5378
5379 let expr_comment = parse(sql_with_comment);
5381 let node_comment = lineage("a", &expr_comment, None, false)
5382 .expect("with comment before first column should succeed");
5383
5384 assert_eq!(node_ok.name, node_comment.name, "node names should match");
5385 assert_eq!(
5386 node_ok.downstream_names(),
5387 node_comment.downstream_names(),
5388 "downstream lineage should be identical with or without comment"
5389 );
5390 }
5391
5392 #[test]
5394 fn test_block_comment_before_first_column() {
5395 let sql = "with t as (select 1 as a) select /* section */ a from t";
5396 let expr = parse(sql);
5397 let node = lineage("a", &expr, None, false)
5398 .expect("block comment before first column should succeed");
5399 assert_eq!(node.name, "a");
5400 assert!(
5401 !node.downstream.is_empty(),
5402 "should have downstream lineage"
5403 );
5404 }
5405
5406 #[test]
5408 fn test_comment_before_first_column_second_col_ok() {
5409 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
5410 let expr = parse(sql);
5411
5412 let node_a =
5413 lineage("a", &expr, None, false).expect("column a with comment should succeed");
5414 assert_eq!(node_a.name, "a");
5415
5416 let node_b =
5417 lineage("b", &expr, None, false).expect("column b with comment should succeed");
5418 assert_eq!(node_b.name, "b");
5419 }
5420
5421 #[test]
5423 fn test_comment_before_aliased_column() {
5424 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
5425 let expr = parse(sql);
5426 let node =
5427 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
5428 assert_eq!(node.name, "y");
5429 assert!(
5430 !node.downstream.is_empty(),
5431 "aliased column should have downstream lineage"
5432 );
5433 }
5434}