1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// A node in the column-lineage tree produced by `lineage` / `lineage_with_schema`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Display name: the traced column name, or `table.column` for table leaves.
    pub name: String,
    /// Expression this node stands for (e.g. the matching SELECT projection or a column reference).
    pub expression: Expression,
    /// Expression the node was derived from (e.g. the enclosing SELECT, possibly trimmed, or a table).
    pub source: Expression,
    /// Child nodes this node's value is traced to (one step closer to the source tables).
    pub downstream: Vec<LineageNode>,
    /// Name of the source this node was reached through (CTE/derived-table alias or table name); may be empty.
    pub source_name: String,
    /// Name of the referencing parent column when reached through a CTE or derived table; may be empty.
    pub reference_node_name: String,
}
35
36impl LineageNode {
37 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
39 Self {
40 name: name.into(),
41 expression,
42 source,
43 downstream: Vec::new(),
44 source_name: String::new(),
45 reference_node_name: String::new(),
46 }
47 }
48
49 pub fn walk(&self) -> LineageWalker<'_> {
51 LineageWalker { stack: vec![self] }
52 }
53
54 pub fn downstream_names(&self) -> Vec<String> {
56 self.downstream.iter().map(|n| n.name.clone()).collect()
57 }
58}
59
/// Depth-first, pre-order iterator over a `LineageNode` and all of its
/// descendants; created by `LineageNode::walk`.
pub struct LineageWalker<'a> {
    /// Nodes still to visit; the next node to yield is at the top (end).
    stack: Vec<&'a LineageNode>,
}
64
65impl<'a> Iterator for LineageWalker<'a> {
66 type Item = &'a LineageNode;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 if let Some(node) = self.stack.pop() {
70 for child in node.downstream.iter().rev() {
72 self.stack.push(child);
73 }
74 Some(node)
75 } else {
76 None
77 }
78 }
79}
80
/// How the column to trace is identified inside a scope's SELECT list.
enum ColumnRef<'a> {
    /// By output name (alias or column name), compared after dialect normalization.
    Name(&'a str),
    /// By zero-based position; used for scalar subqueries and set-operation branches.
    Index(usize),
}
90
91pub fn lineage(
117 column: &str,
118 sql: &Expression,
119 dialect: Option<DialectType>,
120 trim_selects: bool,
121) -> Result<LineageNode> {
122 let effective_sql = lineage_effective_expression(sql);
124 let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
125 if !has_with {
126 return lineage_from_expression(column, sql, dialect, trim_selects);
127 }
128 let mut owned = sql.clone();
129 expand_cte_stars(&mut owned, None);
130 lineage_from_expression(column, &owned, dialect, trim_selects)
131}
132
133pub fn lineage_with_schema(
149 column: &str,
150 sql: &Expression,
151 schema: Option<&dyn Schema>,
152 dialect: Option<DialectType>,
153 trim_selects: bool,
154) -> Result<LineageNode> {
155 let mut qualified_expression = if let Some(schema) = schema {
156 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
157 QualifyColumnsOptions::new().with_dialect(dialect_type)
158 } else {
159 QualifyColumnsOptions::new()
160 };
161
162 qualify_columns(sql.clone(), schema, &options).map_err(|e| {
163 Error::internal(format!("Lineage qualification failed with schema: {}", e))
164 })?
165 } else {
166 sql.clone()
167 };
168
169 annotate_types(&mut qualified_expression, schema, dialect);
171
172 expand_cte_stars(&mut qualified_expression, schema);
175
176 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
177}
178
179fn lineage_from_expression(
180 column: &str,
181 sql: &Expression,
182 dialect: Option<DialectType>,
183 trim_selects: bool,
184) -> Result<LineageNode> {
185 let sql = lineage_effective_expression(sql);
186 let scope = build_scope(sql);
187 to_node(
188 ColumnRef::Name(column),
189 &scope,
190 dialect,
191 "",
192 "",
193 "",
194 trim_selects,
195 )
196}
197
198fn lineage_effective_expression(sql: &Expression) -> &Expression {
199 match sql {
200 Expression::Prepare(prepare) => &prepare.statement,
201 _ => sql,
202 }
203}
204
205fn normalize_cte_name(ident: &Identifier) -> String {
215 if ident.quoted {
216 ident.name.clone()
217 } else {
218 ident.name.to_lowercase()
219 }
220}
221
222pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
234 if let Expression::Prepare(prepare) = expr {
235 expand_cte_stars(&mut prepare.statement, schema);
236 return;
237 }
238
239 let select = match expr {
240 Expression::Select(s) => s,
241 _ => return,
242 };
243
244 let with = match &mut select.with {
245 Some(w) => w,
246 None => return,
247 };
248
249 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
250
251 for cte in &mut with.ctes {
252 let cte_name = normalize_cte_name(&cte.alias);
253
254 if !cte.columns.is_empty() {
256 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
257 resolved_cte_columns.insert(cte_name, cols);
258 continue;
259 }
260
261 if with.recursive {
267 let is_self_referencing =
268 if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
269 let body_sources = get_select_sources(body_select);
270 body_sources.iter().any(|s| s.normalized == cte_name)
271 } else {
272 false
273 };
274 if is_self_referencing {
275 continue;
276 }
277 }
278
279 let body_select = match get_leftmost_select_mut(&mut cte.this) {
281 Some(s) => s,
282 None => continue,
283 };
284
285 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
286 resolved_cte_columns.insert(cte_name, columns);
287 }
288
289 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
291}
292
293fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
298 let mut current = expr;
299 for _ in 0..MAX_LINEAGE_DEPTH {
300 match current {
301 Expression::Select(s) => return Some(s),
302 Expression::Union(u) => current = &mut u.left,
303 Expression::Intersect(i) => current = &mut i.left,
304 Expression::Except(e) => current = &mut e.left,
305 Expression::Paren(p) => current = &mut p.this,
306 _ => return None,
307 }
308 }
309 None
310}
311
312fn rewrite_stars_in_select(
316 select: &mut Select,
317 resolved_ctes: &HashMap<String, Vec<String>>,
318 schema: Option<&dyn Schema>,
319) -> Vec<String> {
320 let has_star = select
325 .expressions
326 .iter()
327 .any(|e| matches!(e, Expression::Star(_)));
328 let has_qualified_star = select
329 .expressions
330 .iter()
331 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
332
333 if !has_star && !has_qualified_star {
334 return select
336 .expressions
337 .iter()
338 .filter_map(get_expression_output_name)
339 .collect();
340 }
341
342 let sources = get_select_sources(select);
343 let mut new_expressions = Vec::new();
344 let mut result_columns = Vec::new();
345
346 for expr in &select.expressions {
347 match expr {
348 Expression::Star(star) => {
349 let qual = star.table.as_ref();
350 if let Some(expanded) =
351 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
352 {
353 for (src_alias, col_name) in &expanded {
354 let table_id = Identifier::new(src_alias);
355 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
356 result_columns.push(col_name.clone());
357 }
358 } else {
359 new_expressions.push(expr.clone());
360 result_columns.push("*".to_string());
361 }
362 }
363 Expression::Column(c) if c.name.name == "*" => {
364 let qual = c.table.as_ref();
365 if let Some(expanded) =
366 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
367 {
368 for (_src_alias, col_name) in &expanded {
369 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
371 result_columns.push(col_name.clone());
372 }
373 } else {
374 new_expressions.push(expr.clone());
375 result_columns.push("*".to_string());
376 }
377 }
378 _ => {
379 new_expressions.push(expr.clone());
380 if let Some(name) = get_expression_output_name(expr) {
381 result_columns.push(name);
382 }
383 }
384 }
385 }
386
387 select.expressions = new_expressions;
388 result_columns
389}
390
391fn expand_star_from_sources(
396 qualifier: Option<&Identifier>,
397 sources: &[SourceInfo],
398 resolved_ctes: &HashMap<String, Vec<String>>,
399 schema: Option<&dyn Schema>,
400) -> Option<Vec<(String, String)>> {
401 let mut expanded = Vec::new();
402
403 if let Some(qual) = qualifier {
404 let qual_normalized = normalize_cte_name(qual);
406 for src in sources {
407 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
408 if let Some(cols) = resolved_ctes.get(&src.normalized) {
410 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
411 return Some(expanded);
412 }
413 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
415 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
416 return Some(expanded);
417 }
418 }
419 }
420 None
421 } else {
422 let mut any_expanded = false;
428 for src in sources {
429 if let Some(cols) = resolved_ctes.get(&src.normalized) {
430 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
431 any_expanded = true;
432 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
433 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
434 any_expanded = true;
435 } else {
436 return None;
437 }
438 }
439 if any_expanded {
440 Some(expanded)
441 } else {
442 None
443 }
444 }
445}
446
447fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
449 let schema = schema?;
450 if fq_name.is_empty() {
451 return None;
452 }
453 schema
454 .column_names(fq_name)
455 .ok()
456 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
457}
458
459fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
461 Expression::Column(Box::new(crate::expressions::Column {
462 name: Identifier::new(name),
463 table: table.cloned(),
464 join_mark: false,
465 trailing_comments: Vec::new(),
466 span: None,
467 inferred_type: None,
468 }))
469}
470
471fn get_expression_output_name(expr: &Expression) -> Option<String> {
473 match expr {
474 Expression::Alias(a) => Some(a.alias.name.clone()),
475 Expression::Column(c) => Some(c.name.name.clone()),
476 Expression::Identifier(id) => Some(id.name.clone()),
477 Expression::Star(_) => Some("*".to_string()),
478 _ => None,
479 }
480}
481
/// A FROM/JOIN source as seen by star expansion (built by `get_select_sources`).
struct SourceInfo {
    /// Name the query refers to the source by (its alias, or the table name when unaliased).
    alias: String,
    /// Normalized name compared against resolved-CTE keys and star qualifiers.
    normalized: String,
    /// Qualified name passed to schema lookups (`catalog.schema.table` for tables).
    fq_name: String,
}
490
491fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
494 let mut sources = Vec::new();
495
496 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
497 fn virtual_source_info(alias: &Identifier) -> SourceInfo {
498 SourceInfo {
499 alias: alias.name.clone(),
500 normalized: normalize_cte_name(alias),
501 fq_name: alias.name.clone(),
502 }
503 }
504
505 match expr {
506 Expression::Table(t) => {
507 let normalized = normalize_cte_name(&t.name);
508 let alias = t
509 .alias
510 .as_ref()
511 .map(|a| a.name.clone())
512 .unwrap_or_else(|| t.name.name.clone());
513 let mut parts = Vec::new();
514 if let Some(catalog) = &t.catalog {
515 parts.push(catalog.name.clone());
516 }
517 if let Some(schema) = &t.schema {
518 parts.push(schema.name.clone());
519 }
520 parts.push(t.name.name.clone());
521 let fq_name = parts.join(".");
522 Some(SourceInfo {
523 alias,
524 normalized,
525 fq_name,
526 })
527 }
528 Expression::Subquery(s) => {
529 let alias = s.alias.as_ref()?.name.clone();
530 let normalized = alias.to_lowercase();
531 let fq_name = alias.clone();
532 Some(SourceInfo {
533 alias,
534 normalized,
535 fq_name,
536 })
537 }
538 Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
539 Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
540 Some(virtual_source_info(&a.alias))
541 }
542 Expression::Paren(p) => extract_source(&p.this),
543 _ => None,
544 }
545 }
546
547 if let Some(from) = &select.from {
548 for expr in &from.expressions {
549 if let Some(info) = extract_source(expr) {
550 sources.push(info);
551 }
552 }
553 }
554 for join in &select.joins {
555 if let Some(info) = extract_source(&join.this) {
556 sources.push(info);
557 }
558 }
559 sources
560}
561
562pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
564 let mut tables = HashSet::new();
565 collect_source_tables(node, &mut tables);
566 tables
567}
568
569pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
571 if let Expression::Table(table) = &node.source {
572 tables.insert(table.name.name.clone());
573 }
574 for child in &node.downstream {
575 collect_source_tables(child, tables);
576 }
577}
578
/// Nesting limit for lineage traversal: `to_node_inner` returns an error once its
/// recursion depth exceeds this, and `get_leftmost_select_mut` stops descending
/// after this many steps.
const MAX_LINEAGE_DEPTH: usize = 64;
586
587fn to_node(
589 column: ColumnRef<'_>,
590 scope: &Scope,
591 dialect: Option<DialectType>,
592 scope_name: &str,
593 source_name: &str,
594 reference_node_name: &str,
595 trim_selects: bool,
596) -> Result<LineageNode> {
597 to_node_inner(
598 column,
599 scope,
600 dialect,
601 scope_name,
602 source_name,
603 reference_node_name,
604 trim_selects,
605 &[],
606 0,
607 )
608}
609
610fn to_node_inner(
611 column: ColumnRef<'_>,
612 scope: &Scope,
613 dialect: Option<DialectType>,
614 scope_name: &str,
615 source_name: &str,
616 reference_node_name: &str,
617 trim_selects: bool,
618 ancestor_cte_scopes: &[Scope],
619 depth: usize,
620) -> Result<LineageNode> {
621 if depth > MAX_LINEAGE_DEPTH {
622 return Err(Error::internal(format!(
623 "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
624 )));
625 }
626 let scope_expr = &scope.expression;
627
628 let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
630 for s in ancestor_cte_scopes {
631 all_cte_scopes.push(s);
632 }
633
634 let effective_expr = match scope_expr {
637 Expression::Cte(cte) => &cte.this,
638 other => other,
639 };
640
641 if matches!(
643 effective_expr,
644 Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
645 ) {
646 if matches!(scope_expr, Expression::Cte(_)) {
648 let mut inner_scope = Scope::new(effective_expr.clone());
649 inner_scope.union_scopes = scope.union_scopes.clone();
650 inner_scope.sources = scope.sources.clone();
651 inner_scope.cte_sources = scope.cte_sources.clone();
652 inner_scope.cte_scopes = scope.cte_scopes.clone();
653 inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
654 inner_scope.subquery_scopes = scope.subquery_scopes.clone();
655 return handle_set_operation(
656 &column,
657 &inner_scope,
658 dialect,
659 scope_name,
660 source_name,
661 reference_node_name,
662 trim_selects,
663 ancestor_cte_scopes,
664 depth,
665 );
666 }
667 return handle_set_operation(
668 &column,
669 scope,
670 dialect,
671 scope_name,
672 source_name,
673 reference_node_name,
674 trim_selects,
675 ancestor_cte_scopes,
676 depth,
677 );
678 }
679
680 let select_expr = find_select_expr(effective_expr, &column, dialect)?;
682 let column_name = resolve_column_name(&column, &select_expr);
683
684 let node_source = if trim_selects {
686 trim_source(effective_expr, &select_expr)
687 } else {
688 effective_expr.clone()
689 };
690
691 let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
693 node.source_name = source_name.to_string();
694 node.reference_node_name = reference_node_name.to_string();
695
696 if matches!(&select_expr, Expression::Star(_)) {
698 for (name, source_info) in &scope.sources {
699 let child = LineageNode::new(
700 format!("{}.*", name),
701 Expression::Star(crate::expressions::Star {
702 table: None,
703 except: None,
704 replace: None,
705 rename: None,
706 trailing_comments: vec![],
707 span: None,
708 }),
709 source_info.expression.clone(),
710 );
711 node.downstream.push(child);
712 }
713 return Ok(node);
714 }
715
716 let subqueries: Vec<&Expression> =
718 select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
719 for sq_expr in subqueries {
720 if let Expression::Subquery(sq) = sq_expr {
721 for sq_scope in &scope.subquery_scopes {
722 if sq_scope.expression == sq.this {
723 if let Ok(child) = to_node_inner(
724 ColumnRef::Index(0),
725 sq_scope,
726 dialect,
727 &column_name,
728 "",
729 "",
730 trim_selects,
731 ancestor_cte_scopes,
732 depth + 1,
733 ) {
734 node.downstream.push(child);
735 }
736 break;
737 }
738 }
739 }
740 }
741
742 let col_refs = find_column_refs_in_expr(&select_expr, dialect);
744 for col_ref in col_refs {
745 let col_name = &col_ref.column;
746 if let Some(ref table_id) = col_ref.table {
747 let tbl = &table_id.name;
748 resolve_qualified_column(
749 &mut node,
750 scope,
751 dialect,
752 tbl,
753 col_name,
754 &column_name,
755 trim_selects,
756 &all_cte_scopes,
757 depth,
758 );
759 } else {
760 resolve_unqualified_column(
761 &mut node,
762 scope,
763 dialect,
764 col_name,
765 &column_name,
766 trim_selects,
767 &all_cte_scopes,
768 depth,
769 );
770 }
771 }
772
773 Ok(node)
774}
775
776fn handle_set_operation(
781 column: &ColumnRef<'_>,
782 scope: &Scope,
783 dialect: Option<DialectType>,
784 scope_name: &str,
785 source_name: &str,
786 reference_node_name: &str,
787 trim_selects: bool,
788 ancestor_cte_scopes: &[Scope],
789 depth: usize,
790) -> Result<LineageNode> {
791 let scope_expr = &scope.expression;
792
793 let col_index = match column {
795 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
796 ColumnRef::Index(i) => *i,
797 };
798
799 let col_name = match column {
800 ColumnRef::Name(name) => name.to_string(),
801 ColumnRef::Index(_) => format!("_{col_index}"),
802 };
803
804 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
805 node.source_name = source_name.to_string();
806 node.reference_node_name = reference_node_name.to_string();
807
808 for branch_scope in &scope.union_scopes {
810 if let Ok(child) = to_node_inner(
811 ColumnRef::Index(col_index),
812 branch_scope,
813 dialect,
814 scope_name,
815 "",
816 "",
817 trim_selects,
818 ancestor_cte_scopes,
819 depth + 1,
820 ) {
821 node.downstream.push(child);
822 }
823 }
824
825 Ok(node)
826}
827
828fn resolve_qualified_column(
833 node: &mut LineageNode,
834 scope: &Scope,
835 dialect: Option<DialectType>,
836 table: &str,
837 col_name: &str,
838 parent_name: &str,
839 trim_selects: bool,
840 all_cte_scopes: &[&Scope],
841 depth: usize,
842) {
843 let resolved_cte_name = resolve_cte_alias(scope, table);
846 let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
847
848 let is_cte = scope.cte_sources.contains_key(effective_table)
851 || all_cte_scopes.iter().any(
852 |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
853 );
854 if is_cte {
855 if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
856 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
858 if let Ok(child) = to_node_inner(
859 ColumnRef::Name(col_name),
860 child_scope,
861 dialect,
862 parent_name,
863 effective_table,
864 parent_name,
865 trim_selects,
866 &ancestors,
867 depth + 1,
868 ) {
869 node.downstream.push(child);
870 return;
871 }
872 }
873 }
874
875 if let Some(source_info) = scope.sources.get(table) {
877 if source_info.is_scope {
878 if let Some(child_scope) = find_child_scope(scope, table) {
879 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
880 if let Ok(child) = to_node_inner(
881 ColumnRef::Name(col_name),
882 child_scope,
883 dialect,
884 parent_name,
885 table,
886 parent_name,
887 trim_selects,
888 &ancestors,
889 depth + 1,
890 ) {
891 node.downstream.push(child);
892 return;
893 }
894 }
895 }
896 }
897
898 if let Some(source_info) = scope.sources.get(table) {
901 if !source_info.is_scope {
902 node.downstream.push(make_table_column_node_from_source(
903 table,
904 col_name,
905 &source_info.expression,
906 ));
907 return;
908 }
909 }
910
911 node.downstream
913 .push(make_table_column_node(table, col_name));
914}
915
916fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
922 if scope.cte_sources.contains_key(name) {
924 return None;
925 }
926 if let Some(source_info) = scope.sources.get(name) {
928 if source_info.is_scope {
929 if let Expression::Cte(cte) = &source_info.expression {
930 let cte_name = &cte.alias.name;
931 if scope.cte_sources.contains_key(cte_name) {
932 return Some(cte_name.clone());
933 }
934 }
935 }
936 }
937 None
938}
939
940fn resolve_unqualified_column(
941 node: &mut LineageNode,
942 scope: &Scope,
943 dialect: Option<DialectType>,
944 col_name: &str,
945 parent_name: &str,
946 trim_selects: bool,
947 all_cte_scopes: &[&Scope],
948 depth: usize,
949) {
950 let from_source_names = source_names_from_from_join(scope);
954
955 if from_source_names.len() == 1 {
956 let tbl = &from_source_names[0];
957 resolve_qualified_column(
958 node,
959 scope,
960 dialect,
961 tbl,
962 col_name,
963 parent_name,
964 trim_selects,
965 all_cte_scopes,
966 depth,
967 );
968 return;
969 }
970
971 let child = LineageNode::new(
973 col_name.to_string(),
974 Expression::Column(Box::new(crate::expressions::Column {
975 name: crate::expressions::Identifier::new(col_name.to_string()),
976 table: None,
977 join_mark: false,
978 trailing_comments: vec![],
979 span: None,
980 inferred_type: None,
981 })),
982 node.source.clone(),
983 );
984 node.downstream.push(child);
985}
986
987fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
988 fn source_name(expr: &Expression) -> Option<String> {
989 match expr {
990 Expression::Table(table) => Some(
991 table
992 .alias
993 .as_ref()
994 .map(|a| a.name.clone())
995 .unwrap_or_else(|| table.name.name.clone()),
996 ),
997 Expression::Subquery(subquery) => {
998 subquery.alias.as_ref().map(|alias| alias.name.clone())
999 }
1000 Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1001 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1002 Some(alias.alias.name.clone())
1003 }
1004 Expression::Paren(paren) => source_name(&paren.this),
1005 _ => None,
1006 }
1007 }
1008
1009 let effective_expr = match &scope.expression {
1010 Expression::Cte(cte) => &cte.this,
1011 expr => expr,
1012 };
1013
1014 let mut names = Vec::new();
1015 let mut seen = std::collections::HashSet::new();
1016
1017 if let Expression::Select(select) = effective_expr {
1018 if let Some(from) = &select.from {
1019 for expr in &from.expressions {
1020 if let Some(name) = source_name(expr) {
1021 if !name.is_empty() && seen.insert(name.clone()) {
1022 names.push(name);
1023 }
1024 }
1025 }
1026 }
1027 for join in &select.joins {
1028 if let Some(name) = source_name(&join.this) {
1029 if !name.is_empty() && seen.insert(name.clone()) {
1030 names.push(name);
1031 }
1032 }
1033 }
1034 }
1035
1036 names
1037}
1038
1039fn get_alias_or_name(expr: &Expression) -> Option<String> {
1045 match expr {
1046 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1047 Expression::Column(col) => Some(col.name.name.clone()),
1048 Expression::Identifier(id) => Some(id.name.clone()),
1049 Expression::Star(_) => Some("*".to_string()),
1050 Expression::Annotated(a) => get_alias_or_name(&a.this),
1053 _ => None,
1054 }
1055}
1056
1057fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1059 match column {
1060 ColumnRef::Name(n) => n.to_string(),
1061 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1062 }
1063}
1064
1065fn find_select_expr(
1067 scope_expr: &Expression,
1068 column: &ColumnRef<'_>,
1069 dialect: Option<DialectType>,
1070) -> Result<Expression> {
1071 if let Expression::Select(ref select) = scope_expr {
1072 match column {
1073 ColumnRef::Name(name) => {
1074 let normalized_name = normalize_column_name(name, dialect);
1075 for expr in &select.expressions {
1076 if let Some(alias_or_name) = get_alias_or_name(expr) {
1077 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1078 return Ok(expr.clone());
1079 }
1080 }
1081 }
1082 Err(crate::error::Error::parse(
1083 format!("Cannot find column '{}' in query", name),
1084 0,
1085 0,
1086 0,
1087 0,
1088 ))
1089 }
1090 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1091 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1092 }),
1093 }
1094 } else {
1095 Err(crate::error::Error::parse(
1096 "Expected SELECT expression for column lookup",
1097 0,
1098 0,
1099 0,
1100 0,
1101 ))
1102 }
1103}
1104
1105fn column_to_index(
1107 set_op_expr: &Expression,
1108 name: &str,
1109 dialect: Option<DialectType>,
1110) -> Result<usize> {
1111 let normalized_name = normalize_column_name(name, dialect);
1112 let mut expr = set_op_expr;
1113 loop {
1114 match expr {
1115 Expression::Union(u) => expr = &u.left,
1116 Expression::Intersect(i) => expr = &i.left,
1117 Expression::Except(e) => expr = &e.left,
1118 Expression::Select(select) => {
1119 for (i, e) in select.expressions.iter().enumerate() {
1120 if let Some(alias_or_name) = get_alias_or_name(e) {
1121 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1122 return Ok(i);
1123 }
1124 }
1125 }
1126 return Err(crate::error::Error::parse(
1127 format!("Cannot find column '{}' in set operation", name),
1128 0,
1129 0,
1130 0,
1131 0,
1132 ));
1133 }
1134 _ => {
1135 return Err(crate::error::Error::parse(
1136 "Expected SELECT or set operation",
1137 0,
1138 0,
1139 0,
1140 0,
1141 ))
1142 }
1143 }
1144 }
1145}
1146
1147fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1148 normalize_name(name, dialect, false, true)
1149}
1150
1151fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1153 if let Expression::Select(select) = select_expr {
1154 let mut trimmed = select.as_ref().clone();
1155 trimmed.expressions = vec![target_expr.clone()];
1156 Expression::Select(Box::new(trimmed))
1157 } else {
1158 select_expr.clone()
1159 }
1160}
1161
1162fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1164 if scope.cte_sources.contains_key(source_name) {
1166 for cte_scope in &scope.cte_scopes {
1167 if let Expression::Cte(cte) = &cte_scope.expression {
1168 if cte.alias.name == source_name {
1169 return Some(cte_scope);
1170 }
1171 }
1172 }
1173 }
1174
1175 if let Some(source_info) = scope.sources.get(source_name) {
1177 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1178 if let Expression::Subquery(sq) = &source_info.expression {
1179 for dt_scope in &scope.derived_table_scopes {
1180 if dt_scope.expression == sq.this {
1181 return Some(dt_scope);
1182 }
1183 }
1184 }
1185 }
1186 }
1187
1188 None
1189}
1190
1191fn find_child_scope_in<'a>(
1195 all_cte_scopes: &[&'a Scope],
1196 scope: &'a Scope,
1197 source_name: &str,
1198) -> Option<&'a Scope> {
1199 for cte_scope in &scope.cte_scopes {
1201 if let Expression::Cte(cte) = &cte_scope.expression {
1202 if cte.alias.name == source_name {
1203 return Some(cte_scope);
1204 }
1205 }
1206 }
1207
1208 for cte_scope in all_cte_scopes {
1210 if let Expression::Cte(cte) = &cte_scope.expression {
1211 if cte.alias.name == source_name {
1212 return Some(cte_scope);
1213 }
1214 }
1215 }
1216
1217 if let Some(source_info) = scope.sources.get(source_name) {
1219 if source_info.is_scope {
1220 if let Expression::Subquery(sq) = &source_info.expression {
1221 for dt_scope in &scope.derived_table_scopes {
1222 if dt_scope.expression == sq.this {
1223 return Some(dt_scope);
1224 }
1225 }
1226 }
1227 }
1228 }
1229
1230 None
1231}
1232
1233fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1235 let mut node = LineageNode::new(
1236 format!("{}.{}", table, column),
1237 Expression::Column(Box::new(crate::expressions::Column {
1238 name: crate::expressions::Identifier::new(column.to_string()),
1239 table: Some(crate::expressions::Identifier::new(table.to_string())),
1240 join_mark: false,
1241 trailing_comments: vec![],
1242 span: None,
1243 inferred_type: None,
1244 })),
1245 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1246 );
1247 node.source_name = table.to_string();
1248 node
1249}
1250
1251fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1252 let mut parts: Vec<String> = Vec::new();
1253 if let Some(catalog) = &table_ref.catalog {
1254 parts.push(catalog.name.clone());
1255 }
1256 if let Some(schema) = &table_ref.schema {
1257 parts.push(schema.name.clone());
1258 }
1259 parts.push(table_ref.name.name.clone());
1260 parts.join(".")
1261}
1262
1263fn make_table_column_node_from_source(
1264 table_alias: &str,
1265 column: &str,
1266 source: &Expression,
1267) -> LineageNode {
1268 let mut node = LineageNode::new(
1269 format!("{}.{}", table_alias, column),
1270 Expression::Column(Box::new(crate::expressions::Column {
1271 name: crate::expressions::Identifier::new(column.to_string()),
1272 table: Some(crate::expressions::Identifier::new(table_alias.to_string())),
1273 join_mark: false,
1274 trailing_comments: vec![],
1275 span: None,
1276 inferred_type: None,
1277 })),
1278 source.clone(),
1279 );
1280
1281 if let Expression::Table(table_ref) = source {
1282 node.source_name = table_name_from_table_ref(table_ref);
1283 } else {
1284 node.source_name = table_alias.to_string();
1285 }
1286
1287 node
1288}
1289
/// A column reference found in a projection: optional qualifier plus column name.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table/alias qualifier, when written as `qualifier.column`.
    table: Option<crate::expressions::Identifier>,
    /// The column's own name.
    column: String,
}
1296
1297fn find_column_refs_in_expr(
1299 expr: &Expression,
1300 dialect: Option<DialectType>,
1301) -> Vec<SimpleColumnRef> {
1302 let mut refs = Vec::new();
1303 collect_column_refs(expr, dialect, &mut refs);
1304 refs
1305}
1306
1307fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1308 match expr {
1309 Expression::Column(col) => {
1310 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1311 }
1312 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1313 _ => false,
1314 }
1315}
1316
1317fn collect_column_refs(
1318 expr: &Expression,
1319 dialect: Option<DialectType>,
1320 refs: &mut Vec<SimpleColumnRef>,
1321) {
1322 let mut stack: Vec<&Expression> = vec![expr];
1323
1324 while let Some(current) = stack.pop() {
1325 match current {
1326 Expression::Column(col) => {
1328 refs.push(SimpleColumnRef {
1329 table: col.table.clone(),
1330 column: col.name.name.clone(),
1331 });
1332 }
1333
1334 Expression::Subquery(_) | Expression::Exists(_) => {}
1336
1337 Expression::And(op)
1339 | Expression::Or(op)
1340 | Expression::Eq(op)
1341 | Expression::Neq(op)
1342 | Expression::Lt(op)
1343 | Expression::Lte(op)
1344 | Expression::Gt(op)
1345 | Expression::Gte(op)
1346 | Expression::Add(op)
1347 | Expression::Sub(op)
1348 | Expression::Mul(op)
1349 | Expression::Div(op)
1350 | Expression::Mod(op)
1351 | Expression::BitwiseAnd(op)
1352 | Expression::BitwiseOr(op)
1353 | Expression::BitwiseXor(op)
1354 | Expression::BitwiseLeftShift(op)
1355 | Expression::BitwiseRightShift(op)
1356 | Expression::Concat(op)
1357 | Expression::Adjacent(op)
1358 | Expression::TsMatch(op)
1359 | Expression::PropertyEQ(op)
1360 | Expression::ArrayContainsAll(op)
1361 | Expression::ArrayContainedBy(op)
1362 | Expression::ArrayOverlaps(op)
1363 | Expression::JSONBContainsAllTopKeys(op)
1364 | Expression::JSONBContainsAnyTopKeys(op)
1365 | Expression::JSONBDeleteAtPath(op)
1366 | Expression::ExtendsLeft(op)
1367 | Expression::ExtendsRight(op)
1368 | Expression::Is(op)
1369 | Expression::MemberOf(op)
1370 | Expression::NullSafeEq(op)
1371 | Expression::NullSafeNeq(op)
1372 | Expression::Glob(op)
1373 | Expression::Match(op) => {
1374 stack.push(&op.left);
1375 stack.push(&op.right);
1376 }
1377
1378 Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1380 stack.push(&u.this);
1381 }
1382
1383 Expression::Upper(f)
1385 | Expression::Lower(f)
1386 | Expression::Length(f)
1387 | Expression::LTrim(f)
1388 | Expression::RTrim(f)
1389 | Expression::Reverse(f)
1390 | Expression::Abs(f)
1391 | Expression::Sqrt(f)
1392 | Expression::Cbrt(f)
1393 | Expression::Ln(f)
1394 | Expression::Exp(f)
1395 | Expression::Sign(f)
1396 | Expression::Date(f)
1397 | Expression::Time(f)
1398 | Expression::DateFromUnixDate(f)
1399 | Expression::UnixDate(f)
1400 | Expression::UnixSeconds(f)
1401 | Expression::UnixMillis(f)
1402 | Expression::UnixMicros(f)
1403 | Expression::TimeStrToDate(f)
1404 | Expression::DateToDi(f)
1405 | Expression::DiToDate(f)
1406 | Expression::TsOrDiToDi(f)
1407 | Expression::TsOrDsToDatetime(f)
1408 | Expression::TsOrDsToTimestamp(f)
1409 | Expression::YearOfWeek(f)
1410 | Expression::YearOfWeekIso(f)
1411 | Expression::Initcap(f)
1412 | Expression::Ascii(f)
1413 | Expression::Chr(f)
1414 | Expression::Soundex(f)
1415 | Expression::ByteLength(f)
1416 | Expression::Hex(f)
1417 | Expression::LowerHex(f)
1418 | Expression::Unicode(f)
1419 | Expression::Radians(f)
1420 | Expression::Degrees(f)
1421 | Expression::Sin(f)
1422 | Expression::Cos(f)
1423 | Expression::Tan(f)
1424 | Expression::Asin(f)
1425 | Expression::Acos(f)
1426 | Expression::Atan(f)
1427 | Expression::IsNan(f)
1428 | Expression::IsInf(f)
1429 | Expression::ArrayLength(f)
1430 | Expression::ArraySize(f)
1431 | Expression::Cardinality(f)
1432 | Expression::ArrayReverse(f)
1433 | Expression::ArrayDistinct(f)
1434 | Expression::ArrayFlatten(f)
1435 | Expression::ArrayCompact(f)
1436 | Expression::Explode(f)
1437 | Expression::ExplodeOuter(f)
1438 | Expression::ToArray(f)
1439 | Expression::MapFromEntries(f)
1440 | Expression::MapKeys(f)
1441 | Expression::MapValues(f)
1442 | Expression::JsonArrayLength(f)
1443 | Expression::JsonKeys(f)
1444 | Expression::JsonType(f)
1445 | Expression::ParseJson(f)
1446 | Expression::ToJson(f)
1447 | Expression::Typeof(f)
1448 | Expression::BitwiseCount(f)
1449 | Expression::Year(f)
1450 | Expression::Month(f)
1451 | Expression::Day(f)
1452 | Expression::Hour(f)
1453 | Expression::Minute(f)
1454 | Expression::Second(f)
1455 | Expression::DayOfWeek(f)
1456 | Expression::DayOfWeekIso(f)
1457 | Expression::DayOfMonth(f)
1458 | Expression::DayOfYear(f)
1459 | Expression::WeekOfYear(f)
1460 | Expression::Quarter(f)
1461 | Expression::Epoch(f)
1462 | Expression::EpochMs(f)
1463 | Expression::TimeStrToUnix(f)
1464 | Expression::SHA(f)
1465 | Expression::SHA1Digest(f)
1466 | Expression::TimeToUnix(f)
1467 | Expression::JSONBool(f)
1468 | Expression::Int64(f)
1469 | Expression::MD5NumberLower64(f)
1470 | Expression::MD5NumberUpper64(f)
1471 | Expression::DateStrToDate(f)
1472 | Expression::DateToDateStr(f) => {
1473 stack.push(&f.this);
1474 }
1475
1476 Expression::Power(f)
1478 | Expression::NullIf(f)
1479 | Expression::IfNull(f)
1480 | Expression::Nvl(f)
1481 | Expression::UnixToTimeStr(f)
1482 | Expression::Contains(f)
1483 | Expression::StartsWith(f)
1484 | Expression::EndsWith(f)
1485 | Expression::Levenshtein(f)
1486 | Expression::ModFunc(f)
1487 | Expression::Atan2(f)
1488 | Expression::IntDiv(f)
1489 | Expression::AddMonths(f)
1490 | Expression::MonthsBetween(f)
1491 | Expression::NextDay(f)
1492 | Expression::ArrayContains(f)
1493 | Expression::ArrayPosition(f)
1494 | Expression::ArrayAppend(f)
1495 | Expression::ArrayPrepend(f)
1496 | Expression::ArrayUnion(f)
1497 | Expression::ArrayExcept(f)
1498 | Expression::ArrayRemove(f)
1499 | Expression::StarMap(f)
1500 | Expression::MapFromArrays(f)
1501 | Expression::MapContainsKey(f)
1502 | Expression::ElementAt(f)
1503 | Expression::JsonMergePatch(f)
1504 | Expression::JSONBContains(f)
1505 | Expression::JSONBExtract(f) => {
1506 stack.push(&f.this);
1507 stack.push(&f.expression);
1508 }
1509
1510 Expression::Greatest(f)
1512 | Expression::Least(f)
1513 | Expression::Coalesce(f)
1514 | Expression::ArrayConcat(f)
1515 | Expression::ArrayIntersect(f)
1516 | Expression::ArrayZip(f)
1517 | Expression::MapConcat(f)
1518 | Expression::JsonArray(f) => {
1519 for e in &f.expressions {
1520 stack.push(e);
1521 }
1522 }
1523
1524 Expression::Sum(f)
1526 | Expression::Avg(f)
1527 | Expression::Min(f)
1528 | Expression::Max(f)
1529 | Expression::ArrayAgg(f)
1530 | Expression::CountIf(f)
1531 | Expression::Stddev(f)
1532 | Expression::StddevPop(f)
1533 | Expression::StddevSamp(f)
1534 | Expression::Variance(f)
1535 | Expression::VarPop(f)
1536 | Expression::VarSamp(f)
1537 | Expression::Median(f)
1538 | Expression::Mode(f)
1539 | Expression::First(f)
1540 | Expression::Last(f)
1541 | Expression::AnyValue(f)
1542 | Expression::ApproxDistinct(f)
1543 | Expression::ApproxCountDistinct(f)
1544 | Expression::LogicalAnd(f)
1545 | Expression::LogicalOr(f)
1546 | Expression::Skewness(f)
1547 | Expression::ArrayConcatAgg(f)
1548 | Expression::ArrayUniqueAgg(f)
1549 | Expression::BoolXorAgg(f)
1550 | Expression::BitwiseAndAgg(f)
1551 | Expression::BitwiseOrAgg(f)
1552 | Expression::BitwiseXorAgg(f) => {
1553 stack.push(&f.this);
1554 if let Some(ref filter) = f.filter {
1555 stack.push(filter);
1556 }
1557 if let Some((ref expr, _)) = f.having_max {
1558 stack.push(expr);
1559 }
1560 if let Some(ref limit) = f.limit {
1561 stack.push(limit);
1562 }
1563 }
1564
1565 Expression::Function(func) => {
1567 for arg in &func.args {
1568 stack.push(arg);
1569 }
1570 }
1571 Expression::AggregateFunction(func) => {
1572 for arg in &func.args {
1573 stack.push(arg);
1574 }
1575 if let Some(ref filter) = func.filter {
1576 stack.push(filter);
1577 }
1578 if let Some(ref limit) = func.limit {
1579 stack.push(limit);
1580 }
1581 }
1582
1583 Expression::WindowFunction(wf) => {
1585 stack.push(&wf.this);
1586 }
1587
1588 Expression::Alias(a) => {
1590 stack.push(&a.this);
1591 }
1592 Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1593 stack.push(&c.this);
1594 if let Some(ref fmt) = c.format {
1595 stack.push(fmt);
1596 }
1597 if let Some(ref def) = c.default {
1598 stack.push(def);
1599 }
1600 }
1601 Expression::Paren(p) => {
1602 stack.push(&p.this);
1603 }
1604 Expression::Annotated(a) => {
1605 stack.push(&a.this);
1606 }
1607 Expression::Case(case) => {
1608 if let Some(ref operand) = case.operand {
1609 stack.push(operand);
1610 }
1611 for (cond, result) in &case.whens {
1612 stack.push(cond);
1613 stack.push(result);
1614 }
1615 if let Some(ref else_expr) = case.else_ {
1616 stack.push(else_expr);
1617 }
1618 }
1619 Expression::Collation(c) => {
1620 stack.push(&c.this);
1621 }
1622 Expression::In(i) => {
1623 stack.push(&i.this);
1624 for e in &i.expressions {
1625 stack.push(e);
1626 }
1627 if let Some(ref q) = i.query {
1628 stack.push(q);
1629 }
1630 if let Some(ref u) = i.unnest {
1631 stack.push(u);
1632 }
1633 }
1634 Expression::Between(b) => {
1635 stack.push(&b.this);
1636 stack.push(&b.low);
1637 stack.push(&b.high);
1638 }
1639 Expression::IsNull(n) => {
1640 stack.push(&n.this);
1641 }
1642 Expression::IsTrue(t) | Expression::IsFalse(t) => {
1643 stack.push(&t.this);
1644 }
1645 Expression::IsJson(j) => {
1646 stack.push(&j.this);
1647 }
1648 Expression::Like(l) | Expression::ILike(l) => {
1649 stack.push(&l.left);
1650 stack.push(&l.right);
1651 if let Some(ref esc) = l.escape {
1652 stack.push(esc);
1653 }
1654 }
1655 Expression::SimilarTo(s) => {
1656 stack.push(&s.this);
1657 stack.push(&s.pattern);
1658 if let Some(ref esc) = s.escape {
1659 stack.push(esc);
1660 }
1661 }
1662 Expression::Ordered(o) => {
1663 stack.push(&o.this);
1664 }
1665 Expression::Array(a) => {
1666 for e in &a.expressions {
1667 stack.push(e);
1668 }
1669 }
1670 Expression::Tuple(t) => {
1671 for e in &t.expressions {
1672 stack.push(e);
1673 }
1674 }
1675 Expression::Struct(s) => {
1676 for (_, e) in &s.fields {
1677 stack.push(e);
1678 }
1679 }
1680 Expression::Subscript(s) => {
1681 stack.push(&s.this);
1682 stack.push(&s.index);
1683 }
1684 Expression::Dot(d) => {
1685 stack.push(&d.this);
1686 }
1687 Expression::MethodCall(m) => {
1688 if !matches!(dialect, Some(DialectType::BigQuery))
1689 || !is_bigquery_safe_namespace_receiver(&m.this)
1690 {
1691 stack.push(&m.this);
1692 }
1693 for arg in &m.args {
1694 stack.push(arg);
1695 }
1696 }
1697 Expression::ArraySlice(s) => {
1698 stack.push(&s.this);
1699 if let Some(ref start) = s.start {
1700 stack.push(start);
1701 }
1702 if let Some(ref end) = s.end {
1703 stack.push(end);
1704 }
1705 }
1706 Expression::Lambda(l) => {
1707 stack.push(&l.body);
1708 }
1709 Expression::NamedArgument(n) => {
1710 stack.push(&n.value);
1711 }
1712 Expression::TryCatch(t) => {
1713 for stmt in &t.try_body {
1714 stack.push(stmt);
1715 }
1716 if let Some(catch_body) = &t.catch_body {
1717 for stmt in catch_body {
1718 stack.push(stmt);
1719 }
1720 }
1721 }
1722 Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
1723 stack.push(e);
1724 }
1725
1726 Expression::Substring(f) => {
1728 stack.push(&f.this);
1729 stack.push(&f.start);
1730 if let Some(ref len) = f.length {
1731 stack.push(len);
1732 }
1733 }
1734 Expression::Trim(f) => {
1735 stack.push(&f.this);
1736 if let Some(ref chars) = f.characters {
1737 stack.push(chars);
1738 }
1739 }
1740 Expression::Replace(f) => {
1741 stack.push(&f.this);
1742 stack.push(&f.old);
1743 stack.push(&f.new);
1744 }
1745 Expression::IfFunc(f) => {
1746 stack.push(&f.condition);
1747 stack.push(&f.true_value);
1748 if let Some(ref fv) = f.false_value {
1749 stack.push(fv);
1750 }
1751 }
1752 Expression::Nvl2(f) => {
1753 stack.push(&f.this);
1754 stack.push(&f.true_value);
1755 stack.push(&f.false_value);
1756 }
1757 Expression::ConcatWs(f) => {
1758 stack.push(&f.separator);
1759 for e in &f.expressions {
1760 stack.push(e);
1761 }
1762 }
1763 Expression::Count(f) => {
1764 if let Some(ref this) = f.this {
1765 stack.push(this);
1766 }
1767 if let Some(ref filter) = f.filter {
1768 stack.push(filter);
1769 }
1770 }
1771 Expression::GroupConcat(f) => {
1772 stack.push(&f.this);
1773 if let Some(ref sep) = f.separator {
1774 stack.push(sep);
1775 }
1776 if let Some(ref filter) = f.filter {
1777 stack.push(filter);
1778 }
1779 }
1780 Expression::StringAgg(f) => {
1781 stack.push(&f.this);
1782 if let Some(ref sep) = f.separator {
1783 stack.push(sep);
1784 }
1785 if let Some(ref filter) = f.filter {
1786 stack.push(filter);
1787 }
1788 if let Some(ref limit) = f.limit {
1789 stack.push(limit);
1790 }
1791 }
1792 Expression::ListAgg(f) => {
1793 stack.push(&f.this);
1794 if let Some(ref sep) = f.separator {
1795 stack.push(sep);
1796 }
1797 if let Some(ref filter) = f.filter {
1798 stack.push(filter);
1799 }
1800 }
1801 Expression::SumIf(f) => {
1802 stack.push(&f.this);
1803 stack.push(&f.condition);
1804 if let Some(ref filter) = f.filter {
1805 stack.push(filter);
1806 }
1807 }
1808 Expression::DateAdd(f) | Expression::DateSub(f) => {
1809 stack.push(&f.this);
1810 stack.push(&f.interval);
1811 }
1812 Expression::DateDiff(f) => {
1813 stack.push(&f.this);
1814 stack.push(&f.expression);
1815 }
1816 Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
1817 stack.push(&f.this);
1818 }
1819 Expression::Extract(f) => {
1820 stack.push(&f.this);
1821 }
1822 Expression::Round(f) => {
1823 stack.push(&f.this);
1824 if let Some(ref d) = f.decimals {
1825 stack.push(d);
1826 }
1827 }
1828 Expression::Floor(f) => {
1829 stack.push(&f.this);
1830 if let Some(ref s) = f.scale {
1831 stack.push(s);
1832 }
1833 if let Some(ref t) = f.to {
1834 stack.push(t);
1835 }
1836 }
1837 Expression::Ceil(f) => {
1838 stack.push(&f.this);
1839 if let Some(ref d) = f.decimals {
1840 stack.push(d);
1841 }
1842 if let Some(ref t) = f.to {
1843 stack.push(t);
1844 }
1845 }
1846 Expression::Log(f) => {
1847 stack.push(&f.this);
1848 if let Some(ref b) = f.base {
1849 stack.push(b);
1850 }
1851 }
1852 Expression::AtTimeZone(f) => {
1853 stack.push(&f.this);
1854 stack.push(&f.zone);
1855 }
1856 Expression::Lead(f) | Expression::Lag(f) => {
1857 stack.push(&f.this);
1858 if let Some(ref off) = f.offset {
1859 stack.push(off);
1860 }
1861 if let Some(ref def) = f.default {
1862 stack.push(def);
1863 }
1864 }
1865 Expression::FirstValue(f) | Expression::LastValue(f) => {
1866 stack.push(&f.this);
1867 }
1868 Expression::NthValue(f) => {
1869 stack.push(&f.this);
1870 stack.push(&f.offset);
1871 }
1872 Expression::Position(f) => {
1873 stack.push(&f.substring);
1874 stack.push(&f.string);
1875 if let Some(ref start) = f.start {
1876 stack.push(start);
1877 }
1878 }
1879 Expression::Decode(f) => {
1880 stack.push(&f.this);
1881 for (search, result) in &f.search_results {
1882 stack.push(search);
1883 stack.push(result);
1884 }
1885 if let Some(ref def) = f.default {
1886 stack.push(def);
1887 }
1888 }
1889 Expression::CharFunc(f) => {
1890 for arg in &f.args {
1891 stack.push(arg);
1892 }
1893 }
1894 Expression::ArraySort(f) => {
1895 stack.push(&f.this);
1896 if let Some(ref cmp) = f.comparator {
1897 stack.push(cmp);
1898 }
1899 }
1900 Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
1901 stack.push(&f.this);
1902 stack.push(&f.separator);
1903 if let Some(ref nr) = f.null_replacement {
1904 stack.push(nr);
1905 }
1906 }
1907 Expression::ArrayFilter(f) => {
1908 stack.push(&f.this);
1909 stack.push(&f.filter);
1910 }
1911 Expression::ArrayTransform(f) => {
1912 stack.push(&f.this);
1913 stack.push(&f.transform);
1914 }
1915 Expression::Sequence(f)
1916 | Expression::Generate(f)
1917 | Expression::ExplodingGenerateSeries(f) => {
1918 stack.push(&f.start);
1919 stack.push(&f.stop);
1920 if let Some(ref step) = f.step {
1921 stack.push(step);
1922 }
1923 }
1924 Expression::JsonExtract(f)
1925 | Expression::JsonExtractScalar(f)
1926 | Expression::JsonQuery(f)
1927 | Expression::JsonValue(f) => {
1928 stack.push(&f.this);
1929 stack.push(&f.path);
1930 }
1931 Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
1932 stack.push(&f.this);
1933 for p in &f.paths {
1934 stack.push(p);
1935 }
1936 }
1937 Expression::JsonObject(f) => {
1938 for (k, v) in &f.pairs {
1939 stack.push(k);
1940 stack.push(v);
1941 }
1942 }
1943 Expression::JsonSet(f) | Expression::JsonInsert(f) => {
1944 stack.push(&f.this);
1945 for (path, val) in &f.path_values {
1946 stack.push(path);
1947 stack.push(val);
1948 }
1949 }
1950 Expression::Overlay(f) => {
1951 stack.push(&f.this);
1952 stack.push(&f.replacement);
1953 stack.push(&f.from);
1954 if let Some(ref len) = f.length {
1955 stack.push(len);
1956 }
1957 }
1958 Expression::Convert(f) => {
1959 stack.push(&f.this);
1960 if let Some(ref style) = f.style {
1961 stack.push(style);
1962 }
1963 }
1964 Expression::ApproxPercentile(f) => {
1965 stack.push(&f.this);
1966 stack.push(&f.percentile);
1967 if let Some(ref acc) = f.accuracy {
1968 stack.push(acc);
1969 }
1970 if let Some(ref filter) = f.filter {
1971 stack.push(filter);
1972 }
1973 }
1974 Expression::Percentile(f)
1975 | Expression::PercentileCont(f)
1976 | Expression::PercentileDisc(f) => {
1977 stack.push(&f.this);
1978 stack.push(&f.percentile);
1979 if let Some(ref filter) = f.filter {
1980 stack.push(filter);
1981 }
1982 }
1983 Expression::WithinGroup(f) => {
1984 stack.push(&f.this);
1985 }
1986 Expression::Left(f) | Expression::Right(f) => {
1987 stack.push(&f.this);
1988 stack.push(&f.length);
1989 }
1990 Expression::Repeat(f) => {
1991 stack.push(&f.this);
1992 stack.push(&f.times);
1993 }
1994 Expression::Lpad(f) | Expression::Rpad(f) => {
1995 stack.push(&f.this);
1996 stack.push(&f.length);
1997 if let Some(ref fill) = f.fill {
1998 stack.push(fill);
1999 }
2000 }
2001 Expression::Split(f) => {
2002 stack.push(&f.this);
2003 stack.push(&f.delimiter);
2004 }
2005 Expression::RegexpLike(f) => {
2006 stack.push(&f.this);
2007 stack.push(&f.pattern);
2008 if let Some(ref flags) = f.flags {
2009 stack.push(flags);
2010 }
2011 }
2012 Expression::RegexpReplace(f) => {
2013 stack.push(&f.this);
2014 stack.push(&f.pattern);
2015 stack.push(&f.replacement);
2016 if let Some(ref flags) = f.flags {
2017 stack.push(flags);
2018 }
2019 }
2020 Expression::RegexpExtract(f) => {
2021 stack.push(&f.this);
2022 stack.push(&f.pattern);
2023 if let Some(ref group) = f.group {
2024 stack.push(group);
2025 }
2026 }
2027 Expression::ToDate(f) => {
2028 stack.push(&f.this);
2029 if let Some(ref fmt) = f.format {
2030 stack.push(fmt);
2031 }
2032 }
2033 Expression::ToTimestamp(f) => {
2034 stack.push(&f.this);
2035 if let Some(ref fmt) = f.format {
2036 stack.push(fmt);
2037 }
2038 }
2039 Expression::DateFormat(f) | Expression::FormatDate(f) => {
2040 stack.push(&f.this);
2041 stack.push(&f.format);
2042 }
2043 Expression::LastDay(f) => {
2044 stack.push(&f.this);
2045 }
2046 Expression::FromUnixtime(f) => {
2047 stack.push(&f.this);
2048 if let Some(ref fmt) = f.format {
2049 stack.push(fmt);
2050 }
2051 }
2052 Expression::UnixTimestamp(f) => {
2053 if let Some(ref this) = f.this {
2054 stack.push(this);
2055 }
2056 if let Some(ref fmt) = f.format {
2057 stack.push(fmt);
2058 }
2059 }
2060 Expression::MakeDate(f) => {
2061 stack.push(&f.year);
2062 stack.push(&f.month);
2063 stack.push(&f.day);
2064 }
2065 Expression::MakeTimestamp(f) => {
2066 stack.push(&f.year);
2067 stack.push(&f.month);
2068 stack.push(&f.day);
2069 stack.push(&f.hour);
2070 stack.push(&f.minute);
2071 stack.push(&f.second);
2072 if let Some(ref tz) = f.timezone {
2073 stack.push(tz);
2074 }
2075 }
2076 Expression::TruncFunc(f) => {
2077 stack.push(&f.this);
2078 if let Some(ref d) = f.decimals {
2079 stack.push(d);
2080 }
2081 }
2082 Expression::ArrayFunc(f) => {
2083 for e in &f.expressions {
2084 stack.push(e);
2085 }
2086 }
2087 Expression::Unnest(f) => {
2088 stack.push(&f.this);
2089 for e in &f.expressions {
2090 stack.push(e);
2091 }
2092 }
2093 Expression::StructFunc(f) => {
2094 for (_, e) in &f.fields {
2095 stack.push(e);
2096 }
2097 }
2098 Expression::StructExtract(f) => {
2099 stack.push(&f.this);
2100 }
2101 Expression::NamedStruct(f) => {
2102 for (k, v) in &f.pairs {
2103 stack.push(k);
2104 stack.push(v);
2105 }
2106 }
2107 Expression::MapFunc(f) => {
2108 for k in &f.keys {
2109 stack.push(k);
2110 }
2111 for v in &f.values {
2112 stack.push(v);
2113 }
2114 }
2115 Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2116 stack.push(&f.this);
2117 stack.push(&f.transform);
2118 }
2119 Expression::JsonArrayAgg(f) => {
2120 stack.push(&f.this);
2121 if let Some(ref filter) = f.filter {
2122 stack.push(filter);
2123 }
2124 }
2125 Expression::JsonObjectAgg(f) => {
2126 stack.push(&f.key);
2127 stack.push(&f.value);
2128 if let Some(ref filter) = f.filter {
2129 stack.push(filter);
2130 }
2131 }
2132 Expression::NTile(f) => {
2133 if let Some(ref n) = f.num_buckets {
2134 stack.push(n);
2135 }
2136 }
2137 Expression::Rand(f) => {
2138 if let Some(ref s) = f.seed {
2139 stack.push(s);
2140 }
2141 if let Some(ref lo) = f.lower {
2142 stack.push(lo);
2143 }
2144 if let Some(ref hi) = f.upper {
2145 stack.push(hi);
2146 }
2147 }
2148 Expression::Any(q) | Expression::All(q) => {
2149 stack.push(&q.this);
2150 stack.push(&q.subquery);
2151 }
2152 Expression::Overlaps(o) => {
2153 if let Some(ref this) = o.this {
2154 stack.push(this);
2155 }
2156 if let Some(ref expr) = o.expression {
2157 stack.push(expr);
2158 }
2159 if let Some(ref ls) = o.left_start {
2160 stack.push(ls);
2161 }
2162 if let Some(ref le) = o.left_end {
2163 stack.push(le);
2164 }
2165 if let Some(ref rs) = o.right_start {
2166 stack.push(rs);
2167 }
2168 if let Some(ref re) = o.right_end {
2169 stack.push(re);
2170 }
2171 }
2172 Expression::Interval(i) => {
2173 if let Some(ref this) = i.this {
2174 stack.push(this);
2175 }
2176 }
2177 Expression::TimeStrToTime(f) => {
2178 stack.push(&f.this);
2179 if let Some(ref zone) = f.zone {
2180 stack.push(zone);
2181 }
2182 }
2183 Expression::JSONBExtractScalar(f) => {
2184 stack.push(&f.this);
2185 stack.push(&f.expression);
2186 if let Some(ref jt) = f.json_type {
2187 stack.push(jt);
2188 }
2189 }
2190
2191 _ => {}
2196 }
2197 }
2198}
2199
2200#[cfg(test)]
2205mod tests {
2206 use super::*;
2207 use crate::dialects::{Dialect, DialectType};
2208 use crate::expressions::DataType;
2209 use crate::optimizer::annotate_types::annotate_types;
2210 use crate::parse_one;
2211 use crate::schema::{MappingSchema, Schema};
2212
2213 fn parse(sql: &str) -> Expression {
2214 let dialect = Dialect::get(DialectType::Generic);
2215 let ast = dialect.parse(sql).unwrap();
2216 ast.into_iter().next().unwrap()
2217 }
2218
2219 #[test]
2220 fn test_simple_lineage() {
2221 let expr = parse("SELECT a FROM t");
2222 let node = lineage("a", &expr, None, false).unwrap();
2223
2224 assert_eq!(node.name, "a");
2225 assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2226 let names = node.downstream_names();
2228 assert!(
2229 names.iter().any(|n| n == "t.a"),
2230 "Expected t.a in downstream, got: {:?}",
2231 names
2232 );
2233 }
2234
2235 #[test]
2236 fn test_lineage_walk() {
2237 let root = LineageNode {
2238 name: "col_a".to_string(),
2239 expression: Expression::Null(crate::expressions::Null),
2240 source: Expression::Null(crate::expressions::Null),
2241 downstream: vec![LineageNode::new(
2242 "t.a",
2243 Expression::Null(crate::expressions::Null),
2244 Expression::Null(crate::expressions::Null),
2245 )],
2246 source_name: String::new(),
2247 reference_node_name: String::new(),
2248 };
2249
2250 let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2251 assert_eq!(names.len(), 2);
2252 assert_eq!(names[0], "col_a");
2253 assert_eq!(names[1], "t.a");
2254 }
2255
2256 #[test]
2257 fn test_aliased_column() {
2258 let expr = parse("SELECT a + 1 AS b FROM t");
2259 let node = lineage("b", &expr, None, false).unwrap();
2260
2261 assert_eq!(node.name, "b");
2262 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2264 assert!(
2265 all_names.iter().any(|n| n.contains("a")),
2266 "Expected to trace to column a, got: {:?}",
2267 all_names
2268 );
2269 }
2270
2271 #[test]
2272 fn test_qualified_column() {
2273 let expr = parse("SELECT t.a FROM t");
2274 let node = lineage("a", &expr, None, false).unwrap();
2275
2276 assert_eq!(node.name, "a");
2277 let names = node.downstream_names();
2278 assert!(
2279 names.iter().any(|n| n == "t.a"),
2280 "Expected t.a, got: {:?}",
2281 names
2282 );
2283 }
2284
2285 #[test]
2286 fn test_unqualified_column() {
2287 let expr = parse("SELECT a FROM t");
2288 let node = lineage("a", &expr, None, false).unwrap();
2289
2290 let names = node.downstream_names();
2292 assert!(
2293 names.iter().any(|n| n == "t.a"),
2294 "Expected t.a, got: {:?}",
2295 names
2296 );
2297 }
2298
2299 #[test]
2300 fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2301 let query = "SELECT name FROM users";
2302 let dialect = Dialect::get(DialectType::BigQuery);
2303 let expr = dialect
2304 .parse(query)
2305 .unwrap()
2306 .into_iter()
2307 .next()
2308 .expect("expected one expression");
2309
2310 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2311 schema
2312 .add_table("users", &[("name".into(), DataType::Text)], None)
2313 .expect("schema setup");
2314
2315 let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2316 .expect("lineage without schema");
2317 let mut expr_without = node_without_schema.expression.clone();
2318 annotate_types(
2319 &mut expr_without,
2320 Some(&schema),
2321 Some(DialectType::BigQuery),
2322 );
2323 assert_eq!(
2324 expr_without.inferred_type(),
2325 None,
2326 "Expected unresolved root type without schema-aware lineage qualification"
2327 );
2328
2329 let node_with_schema = lineage_with_schema(
2330 "name",
2331 &expr,
2332 Some(&schema),
2333 Some(DialectType::BigQuery),
2334 false,
2335 )
2336 .expect("lineage with schema");
2337 let mut expr_with = node_with_schema.expression.clone();
2338 annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2339
2340 assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2341 }
2342
2343 #[test]
2344 fn test_lineage_with_schema_correlated_scalar_subquery() {
2345 let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2346 let dialect = Dialect::get(DialectType::BigQuery);
2347 let expr = dialect
2348 .parse(query)
2349 .unwrap()
2350 .into_iter()
2351 .next()
2352 .expect("expected one expression");
2353
2354 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2355 schema
2356 .add_table(
2357 "t1",
2358 &[("id".into(), DataType::BigInt { length: None })],
2359 None,
2360 )
2361 .expect("schema setup");
2362 schema
2363 .add_table(
2364 "t2",
2365 &[
2366 ("id".into(), DataType::BigInt { length: None }),
2367 ("val".into(), DataType::BigInt { length: None }),
2368 ],
2369 None,
2370 )
2371 .expect("schema setup");
2372
2373 let node = lineage_with_schema(
2374 "id",
2375 &expr,
2376 Some(&schema),
2377 Some(DialectType::BigQuery),
2378 false,
2379 )
2380 .expect("lineage_with_schema should handle correlated scalar subqueries");
2381
2382 assert_eq!(node.name, "id");
2383 }
2384
2385 #[test]
2386 fn test_lineage_with_schema_join_using() {
2387 let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2388 let dialect = Dialect::get(DialectType::BigQuery);
2389 let expr = dialect
2390 .parse(query)
2391 .unwrap()
2392 .into_iter()
2393 .next()
2394 .expect("expected one expression");
2395
2396 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2397 schema
2398 .add_table(
2399 "t1",
2400 &[("a".into(), DataType::BigInt { length: None })],
2401 None,
2402 )
2403 .expect("schema setup");
2404 schema
2405 .add_table(
2406 "t2",
2407 &[("a".into(), DataType::BigInt { length: None })],
2408 None,
2409 )
2410 .expect("schema setup");
2411
2412 let node = lineage_with_schema(
2413 "a",
2414 &expr,
2415 Some(&schema),
2416 Some(DialectType::BigQuery),
2417 false,
2418 )
2419 .expect("lineage_with_schema should handle JOIN USING");
2420
2421 assert_eq!(node.name, "a");
2422 }
2423
2424 #[test]
2425 fn test_lineage_with_schema_qualified_table_name() {
2426 let query = "SELECT a FROM raw.t1";
2427 let dialect = Dialect::get(DialectType::BigQuery);
2428 let expr = dialect
2429 .parse(query)
2430 .unwrap()
2431 .into_iter()
2432 .next()
2433 .expect("expected one expression");
2434
2435 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2436 schema
2437 .add_table(
2438 "raw.t1",
2439 &[("a".into(), DataType::BigInt { length: None })],
2440 None,
2441 )
2442 .expect("schema setup");
2443
2444 let node = lineage_with_schema(
2445 "a",
2446 &expr,
2447 Some(&schema),
2448 Some(DialectType::BigQuery),
2449 false,
2450 )
2451 .expect("lineage_with_schema should handle dotted schema.table names");
2452
2453 assert_eq!(node.name, "a");
2454 }
2455
2456 #[test]
2457 fn test_lineage_with_schema_none_matches_lineage() {
2458 let expr = parse("SELECT a FROM t");
2459 let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2460 let with_none =
2461 lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2462
2463 assert_eq!(with_none.name, baseline.name);
2464 assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2465 }
2466
2467 #[test]
2468 fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2469 let dialect = Dialect::get(DialectType::BigQuery);
2470 let expr = dialect
2471 .parse("SELECT Name AS name FROM teams")
2472 .unwrap()
2473 .into_iter()
2474 .next()
2475 .expect("expected one expression");
2476
2477 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2478 schema
2479 .add_table(
2480 "teams",
2481 &[("Name".into(), DataType::String { length: None })],
2482 None,
2483 )
2484 .expect("schema setup");
2485
2486 let node = lineage_with_schema(
2487 "name",
2488 &expr,
2489 Some(&schema),
2490 Some(DialectType::BigQuery),
2491 false,
2492 )
2493 .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2494
2495 let names = node.downstream_names();
2496 assert!(
2497 names.iter().any(|n| n == "teams.Name"),
2498 "Expected teams.Name in downstream, got: {:?}",
2499 names
2500 );
2501 }
2502
2503 #[test]
2504 fn test_lineage_bigquery_mixed_case_alias_lookup() {
2505 let dialect = Dialect::get(DialectType::BigQuery);
2506 let expr = dialect
2507 .parse("SELECT Name AS Name FROM teams")
2508 .unwrap()
2509 .into_iter()
2510 .next()
2511 .expect("expected one expression");
2512
2513 let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2514 .expect("lineage should resolve mixed-case aliases in BigQuery");
2515
2516 assert_eq!(node.name, "name");
2517 }
2518
2519 #[test]
2520 fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2521 let expr = parse_one(
2522 r#"
2523SELECT date_val AS week_start
2524FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2525"#,
2526 DialectType::BigQuery,
2527 )
2528 .expect("parse");
2529
2530 let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2531 .expect("lineage should resolve UNNEST alias as a source");
2532 let child = node
2533 .downstream
2534 .first()
2535 .expect("week_start should have downstream lineage");
2536
2537 assert_eq!(child.name, "date_val.date_val");
2538 assert_eq!(child.source_name, "date_val");
2539
2540 let Expression::Column(column) = &child.expression else {
2541 panic!(
2542 "expected downstream column expression, got {:?}",
2543 child.expression
2544 );
2545 };
2546 assert_eq!(column.name.name, "date_val");
2547 assert_eq!(
2548 column.table.as_ref().map(|table| table.name.as_str()),
2549 Some("date_val")
2550 );
2551 assert!(
2552 matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
2553 "expected UNNEST source expression, got {:?}",
2554 child.source
2555 );
2556 }
2557
2558 #[test]
2559 fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
2560 let expr = parse_one(
2561 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2562 DialectType::Snowflake,
2563 )
2564 .expect("parse");
2565
2566 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2567 schema
2568 .add_table(
2569 "fact.some_daily_metrics",
2570 &[("date_utc".to_string(), DataType::Date)],
2571 None,
2572 )
2573 .expect("schema setup");
2574
2575 let node = lineage_with_schema(
2576 "recency",
2577 &expr,
2578 Some(&schema),
2579 Some(DialectType::Snowflake),
2580 false,
2581 )
2582 .expect("lineage_with_schema should not treat date part as a column");
2583
2584 let names = node.downstream_names();
2585 assert!(
2586 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2587 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2588 names
2589 );
2590 assert!(
2591 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2592 "Did not expect date part to appear as lineage column, got: {:?}",
2593 names
2594 );
2595 }
2596
2597 #[test]
2598 fn test_snowflake_datediff_parses_to_typed_ast() {
2599 let expr = parse_one(
2600 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2601 DialectType::Snowflake,
2602 )
2603 .expect("parse");
2604
2605 match expr {
2606 Expression::Select(select) => match &select.expressions[0] {
2607 Expression::Alias(alias) => match &alias.this {
2608 Expression::DateDiff(f) => {
2609 assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
2610 }
2611 other => panic!("expected DateDiff, got {other:?}"),
2612 },
2613 other => panic!("expected Alias, got {other:?}"),
2614 },
2615 other => panic!("expected Select, got {other:?}"),
2616 }
2617 }
2618
2619 #[test]
2620 fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
2621 let expr = parse_one(
2622 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2623 DialectType::Snowflake,
2624 )
2625 .expect("parse");
2626
2627 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2628 schema
2629 .add_table(
2630 "fact.some_daily_metrics",
2631 &[("date_utc".to_string(), DataType::Date)],
2632 None,
2633 )
2634 .expect("schema setup");
2635
2636 let node = lineage_with_schema(
2637 "next_day",
2638 &expr,
2639 Some(&schema),
2640 Some(DialectType::Snowflake),
2641 false,
2642 )
2643 .expect("lineage_with_schema should not treat DATEADD date part as a column");
2644
2645 let names = node.downstream_names();
2646 assert!(
2647 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2648 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2649 names
2650 );
2651 assert!(
2652 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2653 "Did not expect date part to appear as lineage column, got: {:?}",
2654 names
2655 );
2656 }
2657
2658 #[test]
2659 fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
2660 let expr = parse_one(
2661 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2662 DialectType::Snowflake,
2663 )
2664 .expect("parse");
2665
2666 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2667 schema
2668 .add_table(
2669 "fact.some_daily_metrics",
2670 &[("date_utc".to_string(), DataType::Date)],
2671 None,
2672 )
2673 .expect("schema setup");
2674
2675 let node = lineage_with_schema(
2676 "day_part",
2677 &expr,
2678 Some(&schema),
2679 Some(DialectType::Snowflake),
2680 false,
2681 )
2682 .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
2683
2684 let names = node.downstream_names();
2685 assert!(
2686 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2687 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2688 names
2689 );
2690 assert!(
2691 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2692 "Did not expect date part to appear as lineage column, got: {:?}",
2693 names
2694 );
2695 }
2696
2697 #[test]
2698 fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
2699 let expr = parse_one(
2700 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2701 DialectType::Snowflake,
2702 )
2703 .expect("parse");
2704
2705 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2706 schema
2707 .add_table(
2708 "fact.some_daily_metrics",
2709 &[("date_utc".to_string(), DataType::Date)],
2710 None,
2711 )
2712 .expect("schema setup");
2713
2714 let node = lineage_with_schema(
2715 "day_part",
2716 &expr,
2717 Some(&schema),
2718 Some(DialectType::Snowflake),
2719 false,
2720 )
2721 .expect("quoted DATE_PART should continue to work");
2722
2723 let names = node.downstream_names();
2724 assert!(
2725 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2726 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2727 names
2728 );
2729 }
2730
2731 #[test]
2732 fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
2733 let expr = parse_one(
2734 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2735 DialectType::Snowflake,
2736 )
2737 .expect("parse");
2738
2739 match expr {
2740 Expression::Select(select) => match &select.expressions[0] {
2741 Expression::Alias(alias) => match &alias.this {
2742 Expression::Function(f) => {
2743 assert_eq!(f.name.to_uppercase(), "DATEADD");
2744 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2745 }
2746 other => panic!("expected generic DATEADD function, got {other:?}"),
2747 },
2748 other => panic!("expected Alias, got {other:?}"),
2749 },
2750 other => panic!("expected Select, got {other:?}"),
2751 }
2752 }
2753
2754 #[test]
2755 fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2756 let expr = parse_one(
2757 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2758 DialectType::Snowflake,
2759 )
2760 .expect("parse");
2761
2762 match expr {
2763 Expression::Select(select) => match &select.expressions[0] {
2764 Expression::Alias(alias) => match &alias.this {
2765 Expression::Function(f) => {
2766 assert_eq!(f.name.to_uppercase(), "DATE_PART");
2767 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2768 }
2769 other => panic!("expected generic DATE_PART function, got {other:?}"),
2770 },
2771 other => panic!("expected Alias, got {other:?}"),
2772 },
2773 other => panic!("expected Select, got {other:?}"),
2774 }
2775 }
2776
2777 #[test]
2778 fn test_snowflake_date_part_string_literal_stays_generic_function() {
2779 let expr = parse_one(
2780 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2781 DialectType::Snowflake,
2782 )
2783 .expect("parse");
2784
2785 match expr {
2786 Expression::Select(select) => match &select.expressions[0] {
2787 Expression::Alias(alias) => match &alias.this {
2788 Expression::Function(f) => {
2789 assert_eq!(f.name.to_uppercase(), "DATE_PART");
2790 }
2791 other => panic!("expected generic DATE_PART function, got {other:?}"),
2792 },
2793 other => panic!("expected Alias, got {other:?}"),
2794 },
2795 other => panic!("expected Select, got {other:?}"),
2796 }
2797 }
2798
2799 #[test]
2800 fn test_lineage_join() {
2801 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2802
2803 let node_a = lineage("a", &expr, None, false).unwrap();
2804 let names_a = node_a.downstream_names();
2805 assert!(
2806 names_a.iter().any(|n| n == "t.a"),
2807 "Expected t.a, got: {:?}",
2808 names_a
2809 );
2810
2811 let node_b = lineage("b", &expr, None, false).unwrap();
2812 let names_b = node_b.downstream_names();
2813 assert!(
2814 names_b.iter().any(|n| n == "s.b"),
2815 "Expected s.b, got: {:?}",
2816 names_b
2817 );
2818 }
2819
2820 #[test]
2821 fn test_lineage_alias_leaf_has_resolved_source_name() {
2822 let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
2823 let node = lineage("col1", &expr, None, false).unwrap();
2824
2825 let names = node.downstream_names();
2827 assert!(
2828 names.iter().any(|n| n == "t1.col1"),
2829 "Expected aliased column edge t1.col1, got: {:?}",
2830 names
2831 );
2832
2833 let leaf = node
2835 .downstream
2836 .iter()
2837 .find(|n| n.name == "t1.col1")
2838 .expect("Expected t1.col1 leaf");
2839 assert_eq!(leaf.source_name, "table1");
2840 match &leaf.source {
2841 Expression::Table(table) => assert_eq!(table.name.name, "table1"),
2842 _ => panic!("Expected leaf source to be a table expression"),
2843 }
2844 }
2845
2846 #[test]
2847 fn test_lineage_derived_table() {
2848 let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
2849 let node = lineage("a", &expr, None, false).unwrap();
2850
2851 assert_eq!(node.name, "a");
2852 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2854 assert!(
2855 all_names.iter().any(|n| n == "t.a"),
2856 "Expected to trace through derived table to t.a, got: {:?}",
2857 all_names
2858 );
2859 }
2860
2861 #[test]
2862 fn test_lineage_cte() {
2863 let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
2864 let node = lineage("a", &expr, None, false).unwrap();
2865
2866 assert_eq!(node.name, "a");
2867 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2868 assert!(
2869 all_names.iter().any(|n| n == "t.a"),
2870 "Expected to trace through CTE to t.a, got: {:?}",
2871 all_names
2872 );
2873 }
2874
2875 #[test]
2876 fn test_lineage_union() {
2877 let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
2878 let node = lineage("a", &expr, None, false).unwrap();
2879
2880 assert_eq!(node.name, "a");
2881 assert_eq!(
2883 node.downstream.len(),
2884 2,
2885 "Expected 2 branches for UNION, got {}",
2886 node.downstream.len()
2887 );
2888 }
2889
2890 #[test]
2891 fn test_lineage_cte_union() {
2892 let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
2893 let node = lineage("a", &expr, None, false).unwrap();
2894
2895 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2897 assert!(
2898 all_names.len() >= 3,
2899 "Expected at least 3 nodes for CTE with UNION, got: {:?}",
2900 all_names
2901 );
2902 }
2903
2904 #[test]
2905 fn test_lineage_star() {
2906 let expr = parse("SELECT * FROM t");
2907 let node = lineage("*", &expr, None, false).unwrap();
2908
2909 assert_eq!(node.name, "*");
2910 assert!(
2912 !node.downstream.is_empty(),
2913 "Star should produce downstream nodes"
2914 );
2915 }
2916
2917 #[test]
2918 fn test_lineage_subquery_in_select() {
2919 let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
2920 let node = lineage("x", &expr, None, false).unwrap();
2921
2922 assert_eq!(node.name, "x");
2923 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2925 assert!(
2926 all_names.len() >= 2,
2927 "Expected tracing into scalar subquery, got: {:?}",
2928 all_names
2929 );
2930 }
2931
2932 #[test]
2933 fn test_lineage_multiple_columns() {
2934 let expr = parse("SELECT a, b FROM t");
2935
2936 let node_a = lineage("a", &expr, None, false).unwrap();
2937 let node_b = lineage("b", &expr, None, false).unwrap();
2938
2939 assert_eq!(node_a.name, "a");
2940 assert_eq!(node_b.name, "b");
2941
2942 let names_a = node_a.downstream_names();
2944 let names_b = node_b.downstream_names();
2945 assert!(names_a.iter().any(|n| n == "t.a"));
2946 assert!(names_b.iter().any(|n| n == "t.b"));
2947 }
2948
2949 #[test]
2950 fn test_get_source_tables() {
2951 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2952 let node = lineage("a", &expr, None, false).unwrap();
2953
2954 let tables = get_source_tables(&node);
2955 assert!(
2956 tables.contains("t"),
2957 "Expected source table 't', got: {:?}",
2958 tables
2959 );
2960 }
2961
2962 #[test]
2963 fn test_lineage_column_not_found() {
2964 let expr = parse("SELECT a FROM t");
2965 let result = lineage("nonexistent", &expr, None, false);
2966 assert!(result.is_err());
2967 }
2968
2969 #[test]
2970 fn test_lineage_nested_cte() {
2971 let expr = parse(
2972 "WITH cte1 AS (SELECT a FROM t), \
2973 cte2 AS (SELECT a FROM cte1) \
2974 SELECT a FROM cte2",
2975 );
2976 let node = lineage("a", &expr, None, false).unwrap();
2977
2978 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2980 assert!(
2981 all_names.len() >= 3,
2982 "Expected to trace through nested CTEs, got: {:?}",
2983 all_names
2984 );
2985 }
2986
2987 #[test]
2988 fn test_trim_selects_true() {
2989 let expr = parse("SELECT a, b, c FROM t");
2990 let node = lineage("a", &expr, None, true).unwrap();
2991
2992 if let Expression::Select(select) = &node.source {
2994 assert_eq!(
2995 select.expressions.len(),
2996 1,
2997 "Trimmed source should have 1 expression, got {}",
2998 select.expressions.len()
2999 );
3000 } else {
3001 panic!("Expected Select source");
3002 }
3003 }
3004
3005 #[test]
3006 fn test_trim_selects_false() {
3007 let expr = parse("SELECT a, b, c FROM t");
3008 let node = lineage("a", &expr, None, false).unwrap();
3009
3010 if let Expression::Select(select) = &node.source {
3012 assert_eq!(
3013 select.expressions.len(),
3014 3,
3015 "Untrimmed source should have 3 expressions"
3016 );
3017 } else {
3018 panic!("Expected Select source");
3019 }
3020 }
3021
3022 #[test]
3023 fn test_lineage_expression_in_select() {
3024 let expr = parse("SELECT a + b AS c FROM t");
3025 let node = lineage("c", &expr, None, false).unwrap();
3026
3027 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3029 assert!(
3030 all_names.len() >= 3,
3031 "Expected to trace a + b to both columns, got: {:?}",
3032 all_names
3033 );
3034 }
3035
3036 #[test]
3037 fn test_set_operation_by_index() {
3038 let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3039
3040 let node = lineage("a", &expr, None, false).unwrap();
3042
3043 assert_eq!(node.downstream.len(), 2);
3045 }
3046
3047 fn print_node(node: &LineageNode, indent: usize) {
3050 let pad = " ".repeat(indent);
3051 println!(
3052 "{pad}name={:?} source_name={:?}",
3053 node.name, node.source_name
3054 );
3055 for child in &node.downstream {
3056 print_node(child, indent + 1);
3057 }
3058 }
3059
3060 #[test]
3061 fn test_issue18_repro() {
3062 let query = "SELECT UPPER(name) as upper_name FROM users";
3064 println!("Query: {query}\n");
3065
3066 let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3067 let exprs = dialect.parse(query).unwrap();
3068 let expr = &exprs[0];
3069
3070 let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3071 println!("lineage(\"upper_name\"):");
3072 print_node(&node, 1);
3073
3074 let names = node.downstream_names();
3075 assert!(
3076 names.iter().any(|n| n == "users.name"),
3077 "Expected users.name in downstream, got: {:?}",
3078 names
3079 );
3080 }
3081
3082 #[test]
3083 fn test_lineage_bigquery_safe_namespace_issue207() {
3084 let query = r#"
3085WITH import_cte AS (
3086 SELECT timestamp, data, operation
3087 FROM `project`.`dataset`.`source_table`
3088),
3089transform_cte AS (
3090 SELECT
3091 timestamp,
3092 SAFE.PARSE_JSON(data) AS json_data
3093 FROM import_cte
3094)
3095SELECT json_data FROM transform_cte
3096"#;
3097 let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3098 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3099 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3100 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3101
3102 assert!(
3103 names.iter().any(|name| name == "source_table.data"),
3104 "expected source_table.data in lineage, got {names:?}"
3105 );
3106 assert!(
3107 !names
3108 .iter()
3109 .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3110 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3111 );
3112 }
3113
3114 #[test]
3115 fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3116 let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3117 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3118 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3119 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3120
3121 assert!(
3122 names.iter().any(|name| name == "t.data"),
3123 "expected t.data in lineage, got {names:?}"
3124 );
3125 assert!(
3126 !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3127 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3128 );
3129 }
3130
3131 #[test]
3132 fn test_lineage_method_call_receiver_control() {
3133 let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3134 let node = lineage("out", &expr, None, false).expect("lineage");
3135 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3136
3137 assert!(
3138 names.iter().any(|name| name == "t.obj"),
3139 "expected ordinary method receiver to remain in lineage, got {names:?}"
3140 );
3141 assert!(
3142 names.iter().any(|name| name == "t.arg"),
3143 "expected method argument in lineage, got {names:?}"
3144 );
3145 }
3146
3147 #[test]
3148 fn test_lineage_upper_function() {
3149 let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3150 let node = lineage("upper_name", &expr, None, false).unwrap();
3151
3152 let names = node.downstream_names();
3153 assert!(
3154 names.iter().any(|n| n == "users.name"),
3155 "Expected users.name in downstream, got: {:?}",
3156 names
3157 );
3158 }
3159
3160 #[test]
3161 fn test_lineage_round_function() {
3162 let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3163 let node = lineage("rounded", &expr, None, false).unwrap();
3164
3165 let names = node.downstream_names();
3166 assert!(
3167 names.iter().any(|n| n == "products.price"),
3168 "Expected products.price in downstream, got: {:?}",
3169 names
3170 );
3171 }
3172
3173 #[test]
3174 fn test_lineage_coalesce_function() {
3175 let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3176 let node = lineage("val", &expr, None, false).unwrap();
3177
3178 let names = node.downstream_names();
3179 assert!(
3180 names.iter().any(|n| n == "t.a"),
3181 "Expected t.a in downstream, got: {:?}",
3182 names
3183 );
3184 assert!(
3185 names.iter().any(|n| n == "t.b"),
3186 "Expected t.b in downstream, got: {:?}",
3187 names
3188 );
3189 }
3190
3191 #[test]
3192 fn test_lineage_count_function() {
3193 let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3194 let node = lineage("cnt", &expr, None, false).unwrap();
3195
3196 let names = node.downstream_names();
3197 assert!(
3198 names.iter().any(|n| n == "t.id"),
3199 "Expected t.id in downstream, got: {:?}",
3200 names
3201 );
3202 }
3203
3204 #[test]
3205 fn test_lineage_sum_function() {
3206 let expr = parse("SELECT SUM(amount) AS total FROM t");
3207 let node = lineage("total", &expr, None, false).unwrap();
3208
3209 let names = node.downstream_names();
3210 assert!(
3211 names.iter().any(|n| n == "t.amount"),
3212 "Expected t.amount in downstream, got: {:?}",
3213 names
3214 );
3215 }
3216
3217 #[test]
3218 fn test_lineage_case_with_nested_functions() {
3219 let expr =
3220 parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3221 let node = lineage("result", &expr, None, false).unwrap();
3222
3223 let names = node.downstream_names();
3224 assert!(
3225 names.iter().any(|n| n == "t.x"),
3226 "Expected t.x in downstream, got: {:?}",
3227 names
3228 );
3229 assert!(
3230 names.iter().any(|n| n == "t.name"),
3231 "Expected t.name in downstream, got: {:?}",
3232 names
3233 );
3234 }
3235
3236 #[test]
3237 fn test_lineage_substring_function() {
3238 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3239 let node = lineage("short", &expr, None, false).unwrap();
3240
3241 let names = node.downstream_names();
3242 assert!(
3243 names.iter().any(|n| n == "t.name"),
3244 "Expected t.name in downstream, got: {:?}",
3245 names
3246 );
3247 }
3248
3249 #[test]
3252 fn test_lineage_cte_select_star() {
3253 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3257 let node = lineage("a", &expr, None, false).unwrap();
3258
3259 assert_eq!(node.name, "a");
3260 assert!(
3263 !node.downstream.is_empty(),
3264 "Expected downstream nodes tracing through CTE, got none"
3265 );
3266 }
3267
3268 #[test]
3269 fn test_lineage_cte_select_star_renamed_column() {
3270 let expr =
3273 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3274 let node = lineage("customer_id", &expr, None, false).unwrap();
3275
3276 assert_eq!(node.name, "customer_id");
3277 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3279 assert!(
3280 all_names.len() >= 2,
3281 "Expected at least 2 nodes (customer_id → source), got: {:?}",
3282 all_names
3283 );
3284 }
3285
3286 #[test]
3287 fn test_lineage_cte_select_star_multiple_columns() {
3288 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3290
3291 for col in &["a", "b", "c"] {
3292 let node = lineage(col, &expr, None, false).unwrap();
3293 assert_eq!(node.name, *col);
3294 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3296 assert!(
3297 all_names.len() >= 2,
3298 "Expected at least 2 nodes for column {}, got: {:?}",
3299 col,
3300 all_names
3301 );
3302 }
3303 }
3304
3305 #[test]
3306 fn test_lineage_nested_cte_select_star() {
3307 let expr = parse(
3309 "WITH cte1 AS (SELECT a FROM t), \
3310 cte2 AS (SELECT * FROM cte1) \
3311 SELECT * FROM cte2",
3312 );
3313 let node = lineage("a", &expr, None, false).unwrap();
3314
3315 assert_eq!(node.name, "a");
3316 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3317 assert!(
3318 all_names.len() >= 3,
3319 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3320 all_names
3321 );
3322 }
3323
3324 #[test]
3325 fn test_lineage_three_level_nested_cte_star() {
3326 let expr = parse(
3328 "WITH cte1 AS (SELECT x FROM t), \
3329 cte2 AS (SELECT * FROM cte1), \
3330 cte3 AS (SELECT * FROM cte2) \
3331 SELECT * FROM cte3",
3332 );
3333 let node = lineage("x", &expr, None, false).unwrap();
3334
3335 assert_eq!(node.name, "x");
3336 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3337 assert!(
3338 all_names.len() >= 4,
3339 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3340 all_names
3341 );
3342 }
3343
3344 #[test]
3345 fn test_lineage_cte_union_star() {
3346 let expr = parse(
3348 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3349 SELECT * FROM cte",
3350 );
3351 let node = lineage("a", &expr, None, false).unwrap();
3352
3353 assert_eq!(node.name, "a");
3354 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3355 assert!(
3356 all_names.len() >= 2,
3357 "Expected at least 2 nodes for CTE union star, got: {:?}",
3358 all_names
3359 );
3360 }
3361
3362 #[test]
3363 fn test_lineage_cte_star_unknown_table() {
3364 let expr = parse(
3367 "WITH cte AS (SELECT * FROM unknown_table) \
3368 SELECT * FROM cte",
3369 );
3370 let _result = lineage("x", &expr, None, false);
3373 }
3374
3375 #[test]
3376 fn test_lineage_cte_explicit_columns() {
3377 let expr = parse(
3379 "WITH cte(x, y) AS (SELECT a, b FROM t) \
3380 SELECT * FROM cte",
3381 );
3382 let node = lineage("x", &expr, None, false).unwrap();
3383 assert_eq!(node.name, "x");
3384 }
3385
3386 #[test]
3387 fn test_lineage_cte_qualified_star() {
3388 let expr = parse(
3390 "WITH cte AS (SELECT a, b FROM t) \
3391 SELECT cte.* FROM cte",
3392 );
3393 for col in &["a", "b"] {
3394 let node = lineage(col, &expr, None, false).unwrap();
3395 assert_eq!(node.name, *col);
3396 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3397 assert!(
3398 all_names.len() >= 2,
3399 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3400 col,
3401 all_names
3402 );
3403 }
3404 }
3405
3406 #[test]
3407 fn test_lineage_subquery_select_star() {
3408 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3411 let node = lineage("x", &expr, None, false).unwrap();
3412
3413 assert_eq!(node.name, "x");
3414 assert!(
3415 !node.downstream.is_empty(),
3416 "Expected downstream nodes for subquery with SELECT *, got none"
3417 );
3418 }
3419
3420 #[test]
3421 fn test_lineage_cte_star_with_schema_external_table() {
3422 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3424SELECT * FROM orders"#;
3425 let expr = parse(sql);
3426
3427 let mut schema = MappingSchema::new();
3428 let cols = vec![
3429 ("order_id".to_string(), DataType::Unknown),
3430 ("customer_id".to_string(), DataType::Unknown),
3431 ("amount".to_string(), DataType::Unknown),
3432 ];
3433 schema.add_table("stg_orders", &cols, None).unwrap();
3434
3435 let node =
3436 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3437 .unwrap();
3438 assert_eq!(node.name, "order_id");
3439 }
3440
3441 #[test]
3442 fn test_lineage_cte_star_with_schema_three_part_name() {
3443 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3445SELECT * FROM orders"#;
3446 let expr = parse(sql);
3447
3448 let mut schema = MappingSchema::new();
3449 let cols = vec![
3450 ("order_id".to_string(), DataType::Unknown),
3451 ("customer_id".to_string(), DataType::Unknown),
3452 ];
3453 schema
3454 .add_table("db.schema.stg_orders", &cols, None)
3455 .unwrap();
3456
3457 let node = lineage_with_schema(
3458 "customer_id",
3459 &expr,
3460 Some(&schema as &dyn Schema),
3461 None,
3462 false,
3463 )
3464 .unwrap();
3465 assert_eq!(node.name, "customer_id");
3466 }
3467
3468 #[test]
3469 fn test_lineage_cte_star_with_schema_nested() {
3470 let sql = r#"WITH
3473 raw AS (SELECT * FROM external_table),
3474 enriched AS (SELECT * FROM raw)
3475 SELECT * FROM enriched"#;
3476 let expr = parse(sql);
3477
3478 let mut schema = MappingSchema::new();
3479 let cols = vec![
3480 ("id".to_string(), DataType::Unknown),
3481 ("name".to_string(), DataType::Unknown),
3482 ];
3483 schema.add_table("external_table", &cols, None).unwrap();
3484
3485 let node =
3486 lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3487 assert_eq!(node.name, "name");
3488 }
3489
3490 #[test]
3491 fn test_lineage_cte_qualified_star_with_schema() {
3492 let sql = r#"WITH
3495 orders AS (SELECT * FROM stg_orders),
3496 enriched AS (
3497 SELECT orders.*, 'extra' AS extra
3498 FROM orders
3499 )
3500 SELECT * FROM enriched"#;
3501 let expr = parse(sql);
3502
3503 let mut schema = MappingSchema::new();
3504 let cols = vec![
3505 ("order_id".to_string(), DataType::Unknown),
3506 ("total".to_string(), DataType::Unknown),
3507 ];
3508 schema.add_table("stg_orders", &cols, None).unwrap();
3509
3510 let node =
3511 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3512 .unwrap();
3513 assert_eq!(node.name, "order_id");
3514
3515 let extra =
3517 lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3518 assert_eq!(extra.name, "extra");
3519 }
3520
3521 #[test]
3522 fn test_lineage_cte_star_without_schema_still_works() {
3523 let sql = r#"WITH
3525 cte1 AS (SELECT id, name FROM raw_table),
3526 cte2 AS (SELECT * FROM cte1)
3527 SELECT * FROM cte2"#;
3528 let expr = parse(sql);
3529
3530 let node = lineage("id", &expr, None, false).unwrap();
3532 assert_eq!(node.name, "id");
3533 }
3534
3535 #[test]
3536 fn test_lineage_nested_cte_star_with_join_and_schema() {
3537 let sql = r#"WITH
3540base_orders AS (
3541 SELECT * FROM stg_orders
3542),
3543with_payments AS (
3544 SELECT
3545 base_orders.*,
3546 p.amount
3547 FROM base_orders
3548 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
3549),
3550final_cte AS (
3551 SELECT * FROM with_payments
3552)
3553SELECT * FROM final_cte"#;
3554 let expr = parse(sql);
3555
3556 let mut schema = MappingSchema::new();
3557 let order_cols = vec![
3558 (
3559 "order_id".to_string(),
3560 crate::expressions::DataType::Unknown,
3561 ),
3562 (
3563 "customer_id".to_string(),
3564 crate::expressions::DataType::Unknown,
3565 ),
3566 ("status".to_string(), crate::expressions::DataType::Unknown),
3567 ];
3568 let pay_cols = vec![
3569 (
3570 "payment_id".to_string(),
3571 crate::expressions::DataType::Unknown,
3572 ),
3573 (
3574 "order_id".to_string(),
3575 crate::expressions::DataType::Unknown,
3576 ),
3577 ("amount".to_string(), crate::expressions::DataType::Unknown),
3578 ];
3579 schema.add_table("stg_orders", &order_cols, None).unwrap();
3580 schema.add_table("stg_payments", &pay_cols, None).unwrap();
3581
3582 let node =
3584 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3585 .unwrap();
3586 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3587
3588 let has_table_qualified = all_names
3590 .iter()
3591 .any(|n| n.contains('.') && n.contains("order_id"));
3592 assert!(
3593 has_table_qualified,
3594 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
3595 all_names
3596 );
3597
3598 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
3600 .unwrap();
3601 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3602
3603 let has_table_qualified = all_names
3604 .iter()
3605 .any(|n| n.contains('.') && n.contains("amount"));
3606 assert!(
3607 has_table_qualified,
3608 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
3609 all_names
3610 );
3611 }
3612
3613 #[test]
3614 fn test_lineage_cte_alias_resolution() {
3615 let sql = r#"WITH import_stg_items AS (
3617 SELECT item_id, name, status FROM stg_items
3618)
3619SELECT base.item_id, base.status
3620FROM import_stg_items AS base"#;
3621 let expr = parse(sql);
3622
3623 let node = lineage("item_id", &expr, None, false).unwrap();
3624 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3625 assert!(
3627 all_names.iter().any(|n| n == "stg_items.item_id"),
3628 "Expected leaf 'stg_items.item_id', got: {:?}",
3629 all_names
3630 );
3631 }
3632
3633 #[test]
3634 fn test_lineage_cte_alias_with_schema_and_star() {
3635 let sql = r#"WITH import_stg AS (
3637 SELECT * FROM stg_items
3638)
3639SELECT base.item_id, base.status
3640FROM import_stg AS base"#;
3641 let expr = parse(sql);
3642
3643 let mut schema = MappingSchema::new();
3644 schema
3645 .add_table(
3646 "stg_items",
3647 &[
3648 ("item_id".to_string(), DataType::Unknown),
3649 ("name".to_string(), DataType::Unknown),
3650 ("status".to_string(), DataType::Unknown),
3651 ],
3652 None,
3653 )
3654 .unwrap();
3655
3656 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
3657 .unwrap();
3658 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3659 assert!(
3660 all_names.iter().any(|n| n == "stg_items.item_id"),
3661 "Expected leaf 'stg_items.item_id', got: {:?}",
3662 all_names
3663 );
3664 }
3665
3666 #[test]
3667 fn test_lineage_cte_alias_with_join() {
3668 let sql = r#"WITH
3670 import_users AS (SELECT id, name FROM users),
3671 import_orders AS (SELECT id, user_id, amount FROM orders)
3672SELECT u.name, o.amount
3673FROM import_users AS u
3674LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3675 let expr = parse(sql);
3676
3677 let node = lineage("name", &expr, None, false).unwrap();
3678 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3679 assert!(
3680 all_names.iter().any(|n| n == "users.name"),
3681 "Expected leaf 'users.name', got: {:?}",
3682 all_names
3683 );
3684
3685 let node = lineage("amount", &expr, None, false).unwrap();
3686 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3687 assert!(
3688 all_names.iter().any(|n| n == "orders.amount"),
3689 "Expected leaf 'orders.amount', got: {:?}",
3690 all_names
3691 );
3692 }
3693
3694 #[test]
3699 fn test_lineage_unquoted_cte_case_insensitive() {
3700 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
3703 let node = lineage("col", &expr, None, false).unwrap();
3704 assert_eq!(node.name, "col");
3705 assert!(
3706 !node.downstream.is_empty(),
3707 "Unquoted CTE should resolve case-insensitively"
3708 );
3709 }
3710
3711 #[test]
3712 fn test_lineage_quoted_cte_case_preserved() {
3713 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
3715 let node = lineage("col", &expr, None, false).unwrap();
3716 assert_eq!(node.name, "col");
3717 assert!(
3718 !node.downstream.is_empty(),
3719 "Quoted CTE with matching case should resolve"
3720 );
3721 }
3722
3723 #[test]
3724 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3725 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
3729 let result = lineage("col", &expr, None, false);
3732 assert!(
3733 result.is_err(),
3734 "Quoted CTE with case mismatch should not expand star: {:?}",
3735 result
3736 );
3737 }
3738
3739 #[test]
3740 fn test_lineage_mixed_quoted_unquoted_cte() {
3741 let expr = parse(
3743 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3744 );
3745 let node = lineage("a", &expr, None, false).unwrap();
3746 assert_eq!(node.name, "a");
3747 assert!(
3748 !node.downstream.is_empty(),
3749 "Mixed quoted/unquoted CTE chain should resolve"
3750 );
3751 }
3752
3753 #[test]
3769 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3770 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
3781 let node = lineage("col", &expr, None, false).unwrap();
3782 assert!(!node.downstream.is_empty());
3783 let child = &node.downstream[0];
3784 assert_eq!(
3786 child.source_name, "MyCte",
3787 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3788 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3789 );
3790 }
3791
3792 #[test]
3793 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
3794 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
3801 let node = lineage("col", &expr, None, false).unwrap();
3802 assert!(!node.downstream.is_empty());
3803 let child = &node.downstream[0];
3804 assert_eq!(
3806 child.source_name, "MyCte",
3807 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3808 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3809 );
3810 }
3811
3812 #[test]
3819 #[ignore = "requires derived table star expansion (separate issue)"]
3820 fn test_node_name_doesnt_contain_comment() {
3821 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
3822 let node = lineage("x", &expr, None, false).unwrap();
3823
3824 assert_eq!(node.name, "x");
3825 assert!(!node.downstream.is_empty());
3826 }
3827
3828 #[test]
3832 fn test_comment_before_first_column_in_cte() {
3833 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
3834 let sql_without_comment = "with t as (select 1 as a) select a from t";
3835
3836 let expr_ok = parse(sql_without_comment);
3838 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
3839
3840 let expr_comment = parse(sql_with_comment);
3842 let node_comment = lineage("a", &expr_comment, None, false)
3843 .expect("with comment before first column should succeed");
3844
3845 assert_eq!(node_ok.name, node_comment.name, "node names should match");
3846 assert_eq!(
3847 node_ok.downstream_names(),
3848 node_comment.downstream_names(),
3849 "downstream lineage should be identical with or without comment"
3850 );
3851 }
3852
3853 #[test]
3855 fn test_block_comment_before_first_column() {
3856 let sql = "with t as (select 1 as a) select /* section */ a from t";
3857 let expr = parse(sql);
3858 let node = lineage("a", &expr, None, false)
3859 .expect("block comment before first column should succeed");
3860 assert_eq!(node.name, "a");
3861 assert!(
3862 !node.downstream.is_empty(),
3863 "should have downstream lineage"
3864 );
3865 }
3866
3867 #[test]
3869 fn test_comment_before_first_column_second_col_ok() {
3870 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
3871 let expr = parse(sql);
3872
3873 let node_a =
3874 lineage("a", &expr, None, false).expect("column a with comment should succeed");
3875 assert_eq!(node_a.name, "a");
3876
3877 let node_b =
3878 lineage("b", &expr, None, false).expect("column b with comment should succeed");
3879 assert_eq!(node_b.name, "b");
3880 }
3881
3882 #[test]
3884 fn test_comment_before_aliased_column() {
3885 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
3886 let expr = parse(sql);
3887 let node =
3888 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
3889 assert_eq!(node.name, "y");
3890 assert!(
3891 !node.downstream.is_empty(),
3892 "aliased column should have downstream lineage"
3893 );
3894 }
3895}