1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// One node of a column lineage tree.
///
/// Nodes are created through [`LineageNode::new`]; children are appended to
/// `downstream` by the resolution functions in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Label of the node. In this module it is either a column name, a
    /// `table.column` label for table-level leaves, or `source.*` for star children.
    pub name: String,
    /// Expression attached to the node (for example the matching projection
    /// of a SELECT, or a synthesized column reference).
    pub expression: Expression,
    /// Expression the node was resolved against (a query body, a trimmed
    /// SELECT, or a table expression).
    pub source: Expression,
    /// Child nodes this node was resolved to.
    pub downstream: Vec<LineageNode>,
    /// Name of the source associated with this node; empty unless a caller sets it.
    pub source_name: String,
    /// Name of the referencing node; empty unless a caller sets it.
    pub reference_node_name: String,
}
35
36impl LineageNode {
37 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
39 Self {
40 name: name.into(),
41 expression,
42 source,
43 downstream: Vec::new(),
44 source_name: String::new(),
45 reference_node_name: String::new(),
46 }
47 }
48
49 pub fn walk(&self) -> LineageWalker<'_> {
51 LineageWalker { stack: vec![self] }
52 }
53
54 pub fn downstream_names(&self) -> Vec<String> {
56 self.downstream.iter().map(|n| n.name.clone()).collect()
57 }
58}
59
/// Iterator over a [`LineageNode`] and its descendants, created by
/// [`LineageNode::walk`].
pub struct LineageWalker<'a> {
    /// Nodes still to be yielded; the next node is taken from the end.
    stack: Vec<&'a LineageNode>,
}
64
65impl<'a> Iterator for LineageWalker<'a> {
66 type Item = &'a LineageNode;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 if let Some(node) = self.stack.pop() {
70 for child in node.downstream.iter().rev() {
72 self.stack.push(child);
73 }
74 Some(node)
75 } else {
76 None
77 }
78 }
79}
80
/// How a target column is addressed while resolving lineage.
enum ColumnRef<'a> {
    /// Address the column by its (alias or) name.
    Name(&'a str),
    /// Address the column by its zero-based position in the projection list.
    Index(usize),
}
90
91pub fn lineage(
117 column: &str,
118 sql: &Expression,
119 dialect: Option<DialectType>,
120 trim_selects: bool,
121) -> Result<LineageNode> {
122 let has_with = matches!(sql, Expression::Select(s) if s.with.is_some());
124 if !has_with {
125 return lineage_from_expression(column, sql, dialect, trim_selects);
126 }
127 let mut owned = sql.clone();
128 expand_cte_stars(&mut owned, None);
129 lineage_from_expression(column, &owned, dialect, trim_selects)
130}
131
132pub fn lineage_with_schema(
148 column: &str,
149 sql: &Expression,
150 schema: Option<&dyn Schema>,
151 dialect: Option<DialectType>,
152 trim_selects: bool,
153) -> Result<LineageNode> {
154 let mut qualified_expression = if let Some(schema) = schema {
155 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
156 QualifyColumnsOptions::new().with_dialect(dialect_type)
157 } else {
158 QualifyColumnsOptions::new()
159 };
160
161 qualify_columns(sql.clone(), schema, &options).map_err(|e| {
162 Error::internal(format!("Lineage qualification failed with schema: {}", e))
163 })?
164 } else {
165 sql.clone()
166 };
167
168 annotate_types(&mut qualified_expression, schema, dialect);
170
171 expand_cte_stars(&mut qualified_expression, schema);
174
175 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
176}
177
178fn lineage_from_expression(
179 column: &str,
180 sql: &Expression,
181 dialect: Option<DialectType>,
182 trim_selects: bool,
183) -> Result<LineageNode> {
184 let scope = build_scope(sql);
185 to_node(
186 ColumnRef::Name(column),
187 &scope,
188 dialect,
189 "",
190 "",
191 "",
192 trim_selects,
193 )
194}
195
196fn normalize_cte_name(ident: &Identifier) -> String {
206 if ident.quoted {
207 ident.name.clone()
208 } else {
209 ident.name.to_lowercase()
210 }
211}
212
213pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
225 let select = match expr {
226 Expression::Select(s) => s,
227 _ => return,
228 };
229
230 let with = match &mut select.with {
231 Some(w) => w,
232 None => return,
233 };
234
235 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
236
237 for cte in &mut with.ctes {
238 let cte_name = normalize_cte_name(&cte.alias);
239
240 if !cte.columns.is_empty() {
242 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
243 resolved_cte_columns.insert(cte_name, cols);
244 continue;
245 }
246
247 if with.recursive {
253 let is_self_referencing =
254 if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
255 let body_sources = get_select_sources(body_select);
256 body_sources.iter().any(|s| s.normalized == cte_name)
257 } else {
258 false
259 };
260 if is_self_referencing {
261 continue;
262 }
263 }
264
265 let body_select = match get_leftmost_select_mut(&mut cte.this) {
267 Some(s) => s,
268 None => continue,
269 };
270
271 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
272 resolved_cte_columns.insert(cte_name, columns);
273 }
274
275 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
277}
278
/// Follows the left operand of set operations (and the inner expression of
/// parentheses) until a SELECT is reached, returning it mutably.
///
/// Returns `None` when any other expression kind is encountered, or when
/// no SELECT is found within `MAX_LINEAGE_DEPTH` steps.
fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
    let mut current = expr;
    // Bounded walk: gives up after MAX_LINEAGE_DEPTH hops.
    for _ in 0..MAX_LINEAGE_DEPTH {
        match current {
            Expression::Select(s) => return Some(s),
            Expression::Union(u) => current = &mut u.left,
            Expression::Intersect(i) => current = &mut i.left,
            Expression::Except(e) => current = &mut e.left,
            Expression::Paren(p) => current = &mut p.this,
            _ => return None,
        }
    }
    None
}
297
298fn rewrite_stars_in_select(
302 select: &mut Select,
303 resolved_ctes: &HashMap<String, Vec<String>>,
304 schema: Option<&dyn Schema>,
305) -> Vec<String> {
306 let has_star = select
311 .expressions
312 .iter()
313 .any(|e| matches!(e, Expression::Star(_)));
314 let has_qualified_star = select
315 .expressions
316 .iter()
317 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
318
319 if !has_star && !has_qualified_star {
320 return select
322 .expressions
323 .iter()
324 .filter_map(get_expression_output_name)
325 .collect();
326 }
327
328 let sources = get_select_sources(select);
329 let mut new_expressions = Vec::new();
330 let mut result_columns = Vec::new();
331
332 for expr in &select.expressions {
333 match expr {
334 Expression::Star(star) => {
335 let qual = star.table.as_ref();
336 if let Some(expanded) =
337 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
338 {
339 for (src_alias, col_name) in &expanded {
340 let table_id = Identifier::new(src_alias);
341 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
342 result_columns.push(col_name.clone());
343 }
344 } else {
345 new_expressions.push(expr.clone());
346 result_columns.push("*".to_string());
347 }
348 }
349 Expression::Column(c) if c.name.name == "*" => {
350 let qual = c.table.as_ref();
351 if let Some(expanded) =
352 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
353 {
354 for (_src_alias, col_name) in &expanded {
355 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
357 result_columns.push(col_name.clone());
358 }
359 } else {
360 new_expressions.push(expr.clone());
361 result_columns.push("*".to_string());
362 }
363 }
364 _ => {
365 new_expressions.push(expr.clone());
366 if let Some(name) = get_expression_output_name(expr) {
367 result_columns.push(name);
368 }
369 }
370 }
371 }
372
373 select.expressions = new_expressions;
374 result_columns
375}
376
377fn expand_star_from_sources(
382 qualifier: Option<&Identifier>,
383 sources: &[SourceInfo],
384 resolved_ctes: &HashMap<String, Vec<String>>,
385 schema: Option<&dyn Schema>,
386) -> Option<Vec<(String, String)>> {
387 let mut expanded = Vec::new();
388
389 if let Some(qual) = qualifier {
390 let qual_normalized = normalize_cte_name(qual);
392 for src in sources {
393 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
394 if let Some(cols) = resolved_ctes.get(&src.normalized) {
396 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
397 return Some(expanded);
398 }
399 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
401 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
402 return Some(expanded);
403 }
404 }
405 }
406 None
407 } else {
408 let mut any_expanded = false;
414 for src in sources {
415 if let Some(cols) = resolved_ctes.get(&src.normalized) {
416 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
417 any_expanded = true;
418 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
419 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
420 any_expanded = true;
421 } else {
422 return None;
423 }
424 }
425 if any_expanded {
426 Some(expanded)
427 } else {
428 None
429 }
430 }
431}
432
433fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
435 let schema = schema?;
436 if fq_name.is_empty() {
437 return None;
438 }
439 schema
440 .column_names(fq_name)
441 .ok()
442 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
443}
444
445fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
447 Expression::Column(Box::new(crate::expressions::Column {
448 name: Identifier::new(name),
449 table: table.cloned(),
450 join_mark: false,
451 trailing_comments: Vec::new(),
452 span: None,
453 inferred_type: None,
454 }))
455}
456
457fn get_expression_output_name(expr: &Expression) -> Option<String> {
459 match expr {
460 Expression::Alias(a) => Some(a.alias.name.clone()),
461 Expression::Column(c) => Some(c.name.name.clone()),
462 Expression::Identifier(id) => Some(id.name.clone()),
463 Expression::Star(_) => Some("*".to_string()),
464 _ => None,
465 }
466}
467
/// Description of one FROM/JOIN source of a SELECT, as produced by
/// `get_select_sources`.
struct SourceInfo {
    /// Alias of the source, or the table name when no alias is given (case preserved).
    alias: String,
    /// Key used to look the source up among resolved CTE columns.
    normalized: String,
    /// Dot-joined `catalog.schema.table` name (for subqueries: the alias).
    fq_name: String,
}
476
477fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
480 let mut sources = Vec::new();
481
482 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
483 match expr {
484 Expression::Table(t) => {
485 let normalized = normalize_cte_name(&t.name);
486 let alias = t
487 .alias
488 .as_ref()
489 .map(|a| a.name.clone())
490 .unwrap_or_else(|| t.name.name.clone());
491 let mut parts = Vec::new();
492 if let Some(catalog) = &t.catalog {
493 parts.push(catalog.name.clone());
494 }
495 if let Some(schema) = &t.schema {
496 parts.push(schema.name.clone());
497 }
498 parts.push(t.name.name.clone());
499 let fq_name = parts.join(".");
500 Some(SourceInfo {
501 alias,
502 normalized,
503 fq_name,
504 })
505 }
506 Expression::Subquery(s) => {
507 let alias = s.alias.as_ref()?.name.clone();
508 let normalized = alias.to_lowercase();
509 let fq_name = alias.clone();
510 Some(SourceInfo {
511 alias,
512 normalized,
513 fq_name,
514 })
515 }
516 Expression::Paren(p) => extract_source(&p.this),
517 _ => None,
518 }
519 }
520
521 if let Some(from) = &select.from {
522 for expr in &from.expressions {
523 if let Some(info) = extract_source(expr) {
524 sources.push(info);
525 }
526 }
527 }
528 for join in &select.joins {
529 if let Some(info) = extract_source(&join.this) {
530 sources.push(info);
531 }
532 }
533 sources
534}
535
536pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
538 let mut tables = HashSet::new();
539 collect_source_tables(node, &mut tables);
540 tables
541}
542
543pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
545 if let Expression::Table(table) = &node.source {
546 tables.insert(table.name.name.clone());
547 }
548 for child in &node.downstream {
549 collect_source_tables(child, tables);
550 }
551}
552
/// Upper bound used both for lineage recursion depth and for the number of
/// hops taken when searching for the leftmost SELECT of a set operation.
const MAX_LINEAGE_DEPTH: usize = 64;
560
561fn to_node(
563 column: ColumnRef<'_>,
564 scope: &Scope,
565 dialect: Option<DialectType>,
566 scope_name: &str,
567 source_name: &str,
568 reference_node_name: &str,
569 trim_selects: bool,
570) -> Result<LineageNode> {
571 to_node_inner(
572 column,
573 scope,
574 dialect,
575 scope_name,
576 source_name,
577 reference_node_name,
578 trim_selects,
579 &[],
580 0,
581 )
582}
583
/// Recursive worker behind `to_node`: resolves one column of `scope` into a
/// lineage node and attaches the nodes it depends on.
///
/// * `column` - target column, by name or by projection index.
/// * `scope_name` - only used in the depth-exceeded error message and
///   forwarded to set-operation handling.
/// * `source_name` / `reference_node_name` - copied onto the created node.
/// * `trim_selects` - when set, the node's `source` is the SELECT reduced
///   to the single matching projection.
/// * `ancestor_cte_scopes` - CTE scopes inherited from enclosing scopes.
/// * `depth` - current recursion depth.
///
/// # Errors
/// Fails when `depth` exceeds `MAX_LINEAGE_DEPTH`, or when the target
/// column cannot be located in the scope's expression. Errors raised while
/// resolving unaliased scalar subqueries are ignored (that child is dropped).
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    // Guard against runaway recursion.
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // CTE scopes visible from here: this scope's own first, then the inherited ones.
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }

    // For a CTE scope, work on the CTE body rather than on the CTE wrapper.
    let effective_expr = match scope_expr {
        Expression::Cte(cte) => &cte.this,
        other => other,
    };

    // Set operations are resolved branch by branch.
    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        if matches!(scope_expr, Expression::Cte(_)) {
            // Build a scope whose expression is the CTE body, carrying over
            // copies of the original scope's child-scope collections.
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                ancestor_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            ancestor_cte_scopes,
            depth,
        );
    }

    // Locate the projection that produces the target column.
    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    node.source_name = source_name.to_string();
    node.reference_node_name = reference_node_name.to_string();

    // A star projection gets one `<source>.*` child per source of the scope.
    if matches!(&select_expr, Expression::Star(_)) {
        for (name, source_info) in &scope.sources {
            let child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: None,
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // Unaliased subqueries inside the projection: resolve the first column
    // of the matching subquery scope (matched by expression equality).
    let subqueries: Vec<&Expression> =
        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
    for sq_expr in subqueries {
        if let Expression::Subquery(sq) = sq_expr {
            for sq_scope in &scope.subquery_scopes {
                if sq_scope.expression == sq.this {
                    // Failures here are deliberately ignored.
                    if let Ok(child) = to_node_inner(
                        ColumnRef::Index(0),
                        sq_scope,
                        dialect,
                        &column_name,
                        "",
                        "",
                        trim_selects,
                        ancestor_cte_scopes,
                        depth + 1,
                    ) {
                        node.downstream.push(child);
                    }
                    break;
                }
            }
        }
    }

    // Resolve every column referenced by the projection.
    let col_refs = find_column_refs_in_expr(&select_expr);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
749
750fn handle_set_operation(
755 column: &ColumnRef<'_>,
756 scope: &Scope,
757 dialect: Option<DialectType>,
758 scope_name: &str,
759 source_name: &str,
760 reference_node_name: &str,
761 trim_selects: bool,
762 ancestor_cte_scopes: &[Scope],
763 depth: usize,
764) -> Result<LineageNode> {
765 let scope_expr = &scope.expression;
766
767 let col_index = match column {
769 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
770 ColumnRef::Index(i) => *i,
771 };
772
773 let col_name = match column {
774 ColumnRef::Name(name) => name.to_string(),
775 ColumnRef::Index(_) => format!("_{col_index}"),
776 };
777
778 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
779 node.source_name = source_name.to_string();
780 node.reference_node_name = reference_node_name.to_string();
781
782 for branch_scope in &scope.union_scopes {
784 if let Ok(child) = to_node_inner(
785 ColumnRef::Index(col_index),
786 branch_scope,
787 dialect,
788 scope_name,
789 "",
790 "",
791 trim_selects,
792 ancestor_cte_scopes,
793 depth + 1,
794 ) {
795 node.downstream.push(child);
796 }
797 }
798
799 Ok(node)
800}
801
802fn resolve_qualified_column(
807 node: &mut LineageNode,
808 scope: &Scope,
809 dialect: Option<DialectType>,
810 table: &str,
811 col_name: &str,
812 parent_name: &str,
813 trim_selects: bool,
814 all_cte_scopes: &[&Scope],
815 depth: usize,
816) {
817 let resolved_cte_name = resolve_cte_alias(scope, table);
820 let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
821
822 let is_cte = scope.cte_sources.contains_key(effective_table)
825 || all_cte_scopes.iter().any(
826 |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
827 );
828 if is_cte {
829 if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
830 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
832 if let Ok(child) = to_node_inner(
833 ColumnRef::Name(col_name),
834 child_scope,
835 dialect,
836 parent_name,
837 effective_table,
838 parent_name,
839 trim_selects,
840 &ancestors,
841 depth + 1,
842 ) {
843 node.downstream.push(child);
844 return;
845 }
846 }
847 }
848
849 if let Some(source_info) = scope.sources.get(table) {
851 if source_info.is_scope {
852 if let Some(child_scope) = find_child_scope(scope, table) {
853 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
854 if let Ok(child) = to_node_inner(
855 ColumnRef::Name(col_name),
856 child_scope,
857 dialect,
858 parent_name,
859 table,
860 parent_name,
861 trim_selects,
862 &ancestors,
863 depth + 1,
864 ) {
865 node.downstream.push(child);
866 return;
867 }
868 }
869 }
870 }
871
872 if let Some(source_info) = scope.sources.get(table) {
875 if !source_info.is_scope {
876 node.downstream.push(make_table_column_node_from_source(
877 table,
878 col_name,
879 &source_info.expression,
880 ));
881 return;
882 }
883 }
884
885 node.downstream
887 .push(make_table_column_node(table, col_name));
888}
889
890fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
896 if scope.cte_sources.contains_key(name) {
898 return None;
899 }
900 if let Some(source_info) = scope.sources.get(name) {
902 if source_info.is_scope {
903 if let Expression::Cte(cte) = &source_info.expression {
904 let cte_name = &cte.alias.name;
905 if scope.cte_sources.contains_key(cte_name) {
906 return Some(cte_name.clone());
907 }
908 }
909 }
910 }
911 None
912}
913
914fn resolve_unqualified_column(
915 node: &mut LineageNode,
916 scope: &Scope,
917 dialect: Option<DialectType>,
918 col_name: &str,
919 parent_name: &str,
920 trim_selects: bool,
921 all_cte_scopes: &[&Scope],
922 depth: usize,
923) {
924 let from_source_names = source_names_from_from_join(scope);
928
929 if from_source_names.len() == 1 {
930 let tbl = &from_source_names[0];
931 resolve_qualified_column(
932 node,
933 scope,
934 dialect,
935 tbl,
936 col_name,
937 parent_name,
938 trim_selects,
939 all_cte_scopes,
940 depth,
941 );
942 return;
943 }
944
945 let child = LineageNode::new(
947 col_name.to_string(),
948 Expression::Column(Box::new(crate::expressions::Column {
949 name: crate::expressions::Identifier::new(col_name.to_string()),
950 table: None,
951 join_mark: false,
952 trailing_comments: vec![],
953 span: None,
954 inferred_type: None,
955 })),
956 node.source.clone(),
957 );
958 node.downstream.push(child);
959}
960
961fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
962 fn source_name(expr: &Expression) -> Option<String> {
963 match expr {
964 Expression::Table(table) => Some(
965 table
966 .alias
967 .as_ref()
968 .map(|a| a.name.clone())
969 .unwrap_or_else(|| table.name.name.clone()),
970 ),
971 Expression::Subquery(subquery) => {
972 subquery.alias.as_ref().map(|alias| alias.name.clone())
973 }
974 Expression::Paren(paren) => source_name(&paren.this),
975 _ => None,
976 }
977 }
978
979 let effective_expr = match &scope.expression {
980 Expression::Cte(cte) => &cte.this,
981 expr => expr,
982 };
983
984 let mut names = Vec::new();
985 let mut seen = std::collections::HashSet::new();
986
987 if let Expression::Select(select) = effective_expr {
988 if let Some(from) = &select.from {
989 for expr in &from.expressions {
990 if let Some(name) = source_name(expr) {
991 if !name.is_empty() && seen.insert(name.clone()) {
992 names.push(name);
993 }
994 }
995 }
996 }
997 for join in &select.joins {
998 if let Some(name) = source_name(&join.this) {
999 if !name.is_empty() && seen.insert(name.clone()) {
1000 names.push(name);
1001 }
1002 }
1003 }
1004 }
1005
1006 names
1007}
1008
1009fn get_alias_or_name(expr: &Expression) -> Option<String> {
1015 match expr {
1016 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1017 Expression::Column(col) => Some(col.name.name.clone()),
1018 Expression::Identifier(id) => Some(id.name.clone()),
1019 Expression::Star(_) => Some("*".to_string()),
1020 Expression::Annotated(a) => get_alias_or_name(&a.this),
1023 _ => None,
1024 }
1025}
1026
1027fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1029 match column {
1030 ColumnRef::Name(n) => n.to_string(),
1031 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1032 }
1033}
1034
1035fn find_select_expr(
1037 scope_expr: &Expression,
1038 column: &ColumnRef<'_>,
1039 dialect: Option<DialectType>,
1040) -> Result<Expression> {
1041 if let Expression::Select(ref select) = scope_expr {
1042 match column {
1043 ColumnRef::Name(name) => {
1044 let normalized_name = normalize_column_name(name, dialect);
1045 for expr in &select.expressions {
1046 if let Some(alias_or_name) = get_alias_or_name(expr) {
1047 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1048 return Ok(expr.clone());
1049 }
1050 }
1051 }
1052 Err(crate::error::Error::parse(
1053 format!("Cannot find column '{}' in query", name),
1054 0,
1055 0,
1056 0,
1057 0,
1058 ))
1059 }
1060 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1061 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1062 }),
1063 }
1064 } else {
1065 Err(crate::error::Error::parse(
1066 "Expected SELECT expression for column lookup",
1067 0,
1068 0,
1069 0,
1070 0,
1071 ))
1072 }
1073}
1074
1075fn column_to_index(
1077 set_op_expr: &Expression,
1078 name: &str,
1079 dialect: Option<DialectType>,
1080) -> Result<usize> {
1081 let normalized_name = normalize_column_name(name, dialect);
1082 let mut expr = set_op_expr;
1083 loop {
1084 match expr {
1085 Expression::Union(u) => expr = &u.left,
1086 Expression::Intersect(i) => expr = &i.left,
1087 Expression::Except(e) => expr = &e.left,
1088 Expression::Select(select) => {
1089 for (i, e) in select.expressions.iter().enumerate() {
1090 if let Some(alias_or_name) = get_alias_or_name(e) {
1091 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1092 return Ok(i);
1093 }
1094 }
1095 }
1096 return Err(crate::error::Error::parse(
1097 format!("Cannot find column '{}' in set operation", name),
1098 0,
1099 0,
1100 0,
1101 0,
1102 ));
1103 }
1104 _ => {
1105 return Err(crate::error::Error::parse(
1106 "Expected SELECT or set operation",
1107 0,
1108 0,
1109 0,
1110 0,
1111 ))
1112 }
1113 }
1114 }
1115}
1116
/// Normalizes a column name for comparison by delegating to
/// `normalize_name` with the given dialect and the fixed flags `false, true`.
// NOTE(review): the meaning of the two boolean flags is defined by
// `crate::schema::normalize_name` and is not visible here — confirm there.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    normalize_name(name, dialect, false, true)
}
1120
1121fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1123 if let Expression::Select(select) = select_expr {
1124 let mut trimmed = select.as_ref().clone();
1125 trimmed.expressions = vec![target_expr.clone()];
1126 Expression::Select(Box::new(trimmed))
1127 } else {
1128 select_expr.clone()
1129 }
1130}
1131
1132fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1134 if scope.cte_sources.contains_key(source_name) {
1136 for cte_scope in &scope.cte_scopes {
1137 if let Expression::Cte(cte) = &cte_scope.expression {
1138 if cte.alias.name == source_name {
1139 return Some(cte_scope);
1140 }
1141 }
1142 }
1143 }
1144
1145 if let Some(source_info) = scope.sources.get(source_name) {
1147 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1148 if let Expression::Subquery(sq) = &source_info.expression {
1149 for dt_scope in &scope.derived_table_scopes {
1150 if dt_scope.expression == sq.this {
1151 return Some(dt_scope);
1152 }
1153 }
1154 }
1155 }
1156 }
1157
1158 None
1159}
1160
1161fn find_child_scope_in<'a>(
1165 all_cte_scopes: &[&'a Scope],
1166 scope: &'a Scope,
1167 source_name: &str,
1168) -> Option<&'a Scope> {
1169 for cte_scope in &scope.cte_scopes {
1171 if let Expression::Cte(cte) = &cte_scope.expression {
1172 if cte.alias.name == source_name {
1173 return Some(cte_scope);
1174 }
1175 }
1176 }
1177
1178 for cte_scope in all_cte_scopes {
1180 if let Expression::Cte(cte) = &cte_scope.expression {
1181 if cte.alias.name == source_name {
1182 return Some(cte_scope);
1183 }
1184 }
1185 }
1186
1187 if let Some(source_info) = scope.sources.get(source_name) {
1189 if source_info.is_scope {
1190 if let Expression::Subquery(sq) = &source_info.expression {
1191 for dt_scope in &scope.derived_table_scopes {
1192 if dt_scope.expression == sq.this {
1193 return Some(dt_scope);
1194 }
1195 }
1196 }
1197 }
1198 }
1199
1200 None
1201}
1202
1203fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1205 let mut node = LineageNode::new(
1206 format!("{}.{}", table, column),
1207 Expression::Column(Box::new(crate::expressions::Column {
1208 name: crate::expressions::Identifier::new(column.to_string()),
1209 table: Some(crate::expressions::Identifier::new(table.to_string())),
1210 join_mark: false,
1211 trailing_comments: vec![],
1212 span: None,
1213 inferred_type: None,
1214 })),
1215 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1216 );
1217 node.source_name = table.to_string();
1218 node
1219}
1220
1221fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1222 let mut parts: Vec<String> = Vec::new();
1223 if let Some(catalog) = &table_ref.catalog {
1224 parts.push(catalog.name.clone());
1225 }
1226 if let Some(schema) = &table_ref.schema {
1227 parts.push(schema.name.clone());
1228 }
1229 parts.push(table_ref.name.name.clone());
1230 parts.join(".")
1231}
1232
1233fn make_table_column_node_from_source(
1234 table_alias: &str,
1235 column: &str,
1236 source: &Expression,
1237) -> LineageNode {
1238 let mut node = LineageNode::new(
1239 format!("{}.{}", table_alias, column),
1240 Expression::Column(Box::new(crate::expressions::Column {
1241 name: crate::expressions::Identifier::new(column.to_string()),
1242 table: Some(crate::expressions::Identifier::new(table_alias.to_string())),
1243 join_mark: false,
1244 trailing_comments: vec![],
1245 span: None,
1246 inferred_type: None,
1247 })),
1248 source.clone(),
1249 );
1250
1251 if let Expression::Table(table_ref) = source {
1252 node.source_name = table_name_from_table_ref(table_ref);
1253 } else {
1254 node.source_name = table_alias.to_string();
1255 }
1256
1257 node
1258}
1259
/// A column reference found inside an expression, reduced to the parts
/// needed for lineage resolution.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table qualifier of the reference, if one was written.
    table: Option<crate::expressions::Identifier>,
    /// Name of the referenced column.
    column: String,
}
1266
1267fn find_column_refs_in_expr(expr: &Expression) -> Vec<SimpleColumnRef> {
1269 let mut refs = Vec::new();
1270 collect_column_refs(expr, &mut refs);
1271 refs
1272}
1273
1274fn collect_column_refs(expr: &Expression, refs: &mut Vec<SimpleColumnRef>) {
1275 let mut stack: Vec<&Expression> = vec![expr];
1276
1277 while let Some(current) = stack.pop() {
1278 match current {
1279 Expression::Column(col) => {
1281 refs.push(SimpleColumnRef {
1282 table: col.table.clone(),
1283 column: col.name.name.clone(),
1284 });
1285 }
1286
1287 Expression::Subquery(_) | Expression::Exists(_) => {}
1289
1290 Expression::And(op)
1292 | Expression::Or(op)
1293 | Expression::Eq(op)
1294 | Expression::Neq(op)
1295 | Expression::Lt(op)
1296 | Expression::Lte(op)
1297 | Expression::Gt(op)
1298 | Expression::Gte(op)
1299 | Expression::Add(op)
1300 | Expression::Sub(op)
1301 | Expression::Mul(op)
1302 | Expression::Div(op)
1303 | Expression::Mod(op)
1304 | Expression::BitwiseAnd(op)
1305 | Expression::BitwiseOr(op)
1306 | Expression::BitwiseXor(op)
1307 | Expression::BitwiseLeftShift(op)
1308 | Expression::BitwiseRightShift(op)
1309 | Expression::Concat(op)
1310 | Expression::Adjacent(op)
1311 | Expression::TsMatch(op)
1312 | Expression::PropertyEQ(op)
1313 | Expression::ArrayContainsAll(op)
1314 | Expression::ArrayContainedBy(op)
1315 | Expression::ArrayOverlaps(op)
1316 | Expression::JSONBContainsAllTopKeys(op)
1317 | Expression::JSONBContainsAnyTopKeys(op)
1318 | Expression::JSONBDeleteAtPath(op)
1319 | Expression::ExtendsLeft(op)
1320 | Expression::ExtendsRight(op)
1321 | Expression::Is(op)
1322 | Expression::MemberOf(op)
1323 | Expression::NullSafeEq(op)
1324 | Expression::NullSafeNeq(op)
1325 | Expression::Glob(op)
1326 | Expression::Match(op) => {
1327 stack.push(&op.left);
1328 stack.push(&op.right);
1329 }
1330
1331 Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1333 stack.push(&u.this);
1334 }
1335
1336 Expression::Upper(f)
1338 | Expression::Lower(f)
1339 | Expression::Length(f)
1340 | Expression::LTrim(f)
1341 | Expression::RTrim(f)
1342 | Expression::Reverse(f)
1343 | Expression::Abs(f)
1344 | Expression::Sqrt(f)
1345 | Expression::Cbrt(f)
1346 | Expression::Ln(f)
1347 | Expression::Exp(f)
1348 | Expression::Sign(f)
1349 | Expression::Date(f)
1350 | Expression::Time(f)
1351 | Expression::DateFromUnixDate(f)
1352 | Expression::UnixDate(f)
1353 | Expression::UnixSeconds(f)
1354 | Expression::UnixMillis(f)
1355 | Expression::UnixMicros(f)
1356 | Expression::TimeStrToDate(f)
1357 | Expression::DateToDi(f)
1358 | Expression::DiToDate(f)
1359 | Expression::TsOrDiToDi(f)
1360 | Expression::TsOrDsToDatetime(f)
1361 | Expression::TsOrDsToTimestamp(f)
1362 | Expression::YearOfWeek(f)
1363 | Expression::YearOfWeekIso(f)
1364 | Expression::Initcap(f)
1365 | Expression::Ascii(f)
1366 | Expression::Chr(f)
1367 | Expression::Soundex(f)
1368 | Expression::ByteLength(f)
1369 | Expression::Hex(f)
1370 | Expression::LowerHex(f)
1371 | Expression::Unicode(f)
1372 | Expression::Radians(f)
1373 | Expression::Degrees(f)
1374 | Expression::Sin(f)
1375 | Expression::Cos(f)
1376 | Expression::Tan(f)
1377 | Expression::Asin(f)
1378 | Expression::Acos(f)
1379 | Expression::Atan(f)
1380 | Expression::IsNan(f)
1381 | Expression::IsInf(f)
1382 | Expression::ArrayLength(f)
1383 | Expression::ArraySize(f)
1384 | Expression::Cardinality(f)
1385 | Expression::ArrayReverse(f)
1386 | Expression::ArrayDistinct(f)
1387 | Expression::ArrayFlatten(f)
1388 | Expression::ArrayCompact(f)
1389 | Expression::Explode(f)
1390 | Expression::ExplodeOuter(f)
1391 | Expression::ToArray(f)
1392 | Expression::MapFromEntries(f)
1393 | Expression::MapKeys(f)
1394 | Expression::MapValues(f)
1395 | Expression::JsonArrayLength(f)
1396 | Expression::JsonKeys(f)
1397 | Expression::JsonType(f)
1398 | Expression::ParseJson(f)
1399 | Expression::ToJson(f)
1400 | Expression::Typeof(f)
1401 | Expression::BitwiseCount(f)
1402 | Expression::Year(f)
1403 | Expression::Month(f)
1404 | Expression::Day(f)
1405 | Expression::Hour(f)
1406 | Expression::Minute(f)
1407 | Expression::Second(f)
1408 | Expression::DayOfWeek(f)
1409 | Expression::DayOfWeekIso(f)
1410 | Expression::DayOfMonth(f)
1411 | Expression::DayOfYear(f)
1412 | Expression::WeekOfYear(f)
1413 | Expression::Quarter(f)
1414 | Expression::Epoch(f)
1415 | Expression::EpochMs(f)
1416 | Expression::TimeStrToUnix(f)
1417 | Expression::SHA(f)
1418 | Expression::SHA1Digest(f)
1419 | Expression::TimeToUnix(f)
1420 | Expression::JSONBool(f)
1421 | Expression::Int64(f)
1422 | Expression::MD5NumberLower64(f)
1423 | Expression::MD5NumberUpper64(f)
1424 | Expression::DateStrToDate(f)
1425 | Expression::DateToDateStr(f) => {
1426 stack.push(&f.this);
1427 }
1428
1429 Expression::Power(f)
1431 | Expression::NullIf(f)
1432 | Expression::IfNull(f)
1433 | Expression::Nvl(f)
1434 | Expression::UnixToTimeStr(f)
1435 | Expression::Contains(f)
1436 | Expression::StartsWith(f)
1437 | Expression::EndsWith(f)
1438 | Expression::Levenshtein(f)
1439 | Expression::ModFunc(f)
1440 | Expression::Atan2(f)
1441 | Expression::IntDiv(f)
1442 | Expression::AddMonths(f)
1443 | Expression::MonthsBetween(f)
1444 | Expression::NextDay(f)
1445 | Expression::ArrayContains(f)
1446 | Expression::ArrayPosition(f)
1447 | Expression::ArrayAppend(f)
1448 | Expression::ArrayPrepend(f)
1449 | Expression::ArrayUnion(f)
1450 | Expression::ArrayExcept(f)
1451 | Expression::ArrayRemove(f)
1452 | Expression::StarMap(f)
1453 | Expression::MapFromArrays(f)
1454 | Expression::MapContainsKey(f)
1455 | Expression::ElementAt(f)
1456 | Expression::JsonMergePatch(f)
1457 | Expression::JSONBContains(f)
1458 | Expression::JSONBExtract(f) => {
1459 stack.push(&f.this);
1460 stack.push(&f.expression);
1461 }
1462
1463 Expression::Greatest(f)
1465 | Expression::Least(f)
1466 | Expression::Coalesce(f)
1467 | Expression::ArrayConcat(f)
1468 | Expression::ArrayIntersect(f)
1469 | Expression::ArrayZip(f)
1470 | Expression::MapConcat(f)
1471 | Expression::JsonArray(f) => {
1472 for e in &f.expressions {
1473 stack.push(e);
1474 }
1475 }
1476
1477 Expression::Sum(f)
1479 | Expression::Avg(f)
1480 | Expression::Min(f)
1481 | Expression::Max(f)
1482 | Expression::ArrayAgg(f)
1483 | Expression::CountIf(f)
1484 | Expression::Stddev(f)
1485 | Expression::StddevPop(f)
1486 | Expression::StddevSamp(f)
1487 | Expression::Variance(f)
1488 | Expression::VarPop(f)
1489 | Expression::VarSamp(f)
1490 | Expression::Median(f)
1491 | Expression::Mode(f)
1492 | Expression::First(f)
1493 | Expression::Last(f)
1494 | Expression::AnyValue(f)
1495 | Expression::ApproxDistinct(f)
1496 | Expression::ApproxCountDistinct(f)
1497 | Expression::LogicalAnd(f)
1498 | Expression::LogicalOr(f)
1499 | Expression::Skewness(f)
1500 | Expression::ArrayConcatAgg(f)
1501 | Expression::ArrayUniqueAgg(f)
1502 | Expression::BoolXorAgg(f)
1503 | Expression::BitwiseAndAgg(f)
1504 | Expression::BitwiseOrAgg(f)
1505 | Expression::BitwiseXorAgg(f) => {
1506 stack.push(&f.this);
1507 if let Some(ref filter) = f.filter {
1508 stack.push(filter);
1509 }
1510 if let Some((ref expr, _)) = f.having_max {
1511 stack.push(expr);
1512 }
1513 if let Some(ref limit) = f.limit {
1514 stack.push(limit);
1515 }
1516 }
1517
1518 Expression::Function(func) => {
1520 for arg in &func.args {
1521 stack.push(arg);
1522 }
1523 }
1524 Expression::AggregateFunction(func) => {
1525 for arg in &func.args {
1526 stack.push(arg);
1527 }
1528 if let Some(ref filter) = func.filter {
1529 stack.push(filter);
1530 }
1531 if let Some(ref limit) = func.limit {
1532 stack.push(limit);
1533 }
1534 }
1535
1536 Expression::WindowFunction(wf) => {
1538 stack.push(&wf.this);
1539 }
1540
1541 Expression::Alias(a) => {
1543 stack.push(&a.this);
1544 }
1545 Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1546 stack.push(&c.this);
1547 if let Some(ref fmt) = c.format {
1548 stack.push(fmt);
1549 }
1550 if let Some(ref def) = c.default {
1551 stack.push(def);
1552 }
1553 }
1554 Expression::Paren(p) => {
1555 stack.push(&p.this);
1556 }
1557 Expression::Annotated(a) => {
1558 stack.push(&a.this);
1559 }
1560 Expression::Case(case) => {
1561 if let Some(ref operand) = case.operand {
1562 stack.push(operand);
1563 }
1564 for (cond, result) in &case.whens {
1565 stack.push(cond);
1566 stack.push(result);
1567 }
1568 if let Some(ref else_expr) = case.else_ {
1569 stack.push(else_expr);
1570 }
1571 }
1572 Expression::Collation(c) => {
1573 stack.push(&c.this);
1574 }
1575 Expression::In(i) => {
1576 stack.push(&i.this);
1577 for e in &i.expressions {
1578 stack.push(e);
1579 }
1580 if let Some(ref q) = i.query {
1581 stack.push(q);
1582 }
1583 if let Some(ref u) = i.unnest {
1584 stack.push(u);
1585 }
1586 }
1587 Expression::Between(b) => {
1588 stack.push(&b.this);
1589 stack.push(&b.low);
1590 stack.push(&b.high);
1591 }
1592 Expression::IsNull(n) => {
1593 stack.push(&n.this);
1594 }
1595 Expression::IsTrue(t) | Expression::IsFalse(t) => {
1596 stack.push(&t.this);
1597 }
1598 Expression::IsJson(j) => {
1599 stack.push(&j.this);
1600 }
1601 Expression::Like(l) | Expression::ILike(l) => {
1602 stack.push(&l.left);
1603 stack.push(&l.right);
1604 if let Some(ref esc) = l.escape {
1605 stack.push(esc);
1606 }
1607 }
1608 Expression::SimilarTo(s) => {
1609 stack.push(&s.this);
1610 stack.push(&s.pattern);
1611 if let Some(ref esc) = s.escape {
1612 stack.push(esc);
1613 }
1614 }
1615 Expression::Ordered(o) => {
1616 stack.push(&o.this);
1617 }
1618 Expression::Array(a) => {
1619 for e in &a.expressions {
1620 stack.push(e);
1621 }
1622 }
1623 Expression::Tuple(t) => {
1624 for e in &t.expressions {
1625 stack.push(e);
1626 }
1627 }
1628 Expression::Struct(s) => {
1629 for (_, e) in &s.fields {
1630 stack.push(e);
1631 }
1632 }
1633 Expression::Subscript(s) => {
1634 stack.push(&s.this);
1635 stack.push(&s.index);
1636 }
1637 Expression::Dot(d) => {
1638 stack.push(&d.this);
1639 }
1640 Expression::MethodCall(m) => {
1641 stack.push(&m.this);
1642 for arg in &m.args {
1643 stack.push(arg);
1644 }
1645 }
1646 Expression::ArraySlice(s) => {
1647 stack.push(&s.this);
1648 if let Some(ref start) = s.start {
1649 stack.push(start);
1650 }
1651 if let Some(ref end) = s.end {
1652 stack.push(end);
1653 }
1654 }
1655 Expression::Lambda(l) => {
1656 stack.push(&l.body);
1657 }
1658 Expression::NamedArgument(n) => {
1659 stack.push(&n.value);
1660 }
1661 Expression::TryCatch(t) => {
1662 for stmt in &t.try_body {
1663 stack.push(stmt);
1664 }
1665 if let Some(catch_body) = &t.catch_body {
1666 for stmt in catch_body {
1667 stack.push(stmt);
1668 }
1669 }
1670 }
1671 Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
1672 stack.push(e);
1673 }
1674
1675 Expression::Substring(f) => {
1677 stack.push(&f.this);
1678 stack.push(&f.start);
1679 if let Some(ref len) = f.length {
1680 stack.push(len);
1681 }
1682 }
1683 Expression::Trim(f) => {
1684 stack.push(&f.this);
1685 if let Some(ref chars) = f.characters {
1686 stack.push(chars);
1687 }
1688 }
1689 Expression::Replace(f) => {
1690 stack.push(&f.this);
1691 stack.push(&f.old);
1692 stack.push(&f.new);
1693 }
1694 Expression::IfFunc(f) => {
1695 stack.push(&f.condition);
1696 stack.push(&f.true_value);
1697 if let Some(ref fv) = f.false_value {
1698 stack.push(fv);
1699 }
1700 }
1701 Expression::Nvl2(f) => {
1702 stack.push(&f.this);
1703 stack.push(&f.true_value);
1704 stack.push(&f.false_value);
1705 }
1706 Expression::ConcatWs(f) => {
1707 stack.push(&f.separator);
1708 for e in &f.expressions {
1709 stack.push(e);
1710 }
1711 }
1712 Expression::Count(f) => {
1713 if let Some(ref this) = f.this {
1714 stack.push(this);
1715 }
1716 if let Some(ref filter) = f.filter {
1717 stack.push(filter);
1718 }
1719 }
1720 Expression::GroupConcat(f) => {
1721 stack.push(&f.this);
1722 if let Some(ref sep) = f.separator {
1723 stack.push(sep);
1724 }
1725 if let Some(ref filter) = f.filter {
1726 stack.push(filter);
1727 }
1728 }
1729 Expression::StringAgg(f) => {
1730 stack.push(&f.this);
1731 if let Some(ref sep) = f.separator {
1732 stack.push(sep);
1733 }
1734 if let Some(ref filter) = f.filter {
1735 stack.push(filter);
1736 }
1737 if let Some(ref limit) = f.limit {
1738 stack.push(limit);
1739 }
1740 }
1741 Expression::ListAgg(f) => {
1742 stack.push(&f.this);
1743 if let Some(ref sep) = f.separator {
1744 stack.push(sep);
1745 }
1746 if let Some(ref filter) = f.filter {
1747 stack.push(filter);
1748 }
1749 }
1750 Expression::SumIf(f) => {
1751 stack.push(&f.this);
1752 stack.push(&f.condition);
1753 if let Some(ref filter) = f.filter {
1754 stack.push(filter);
1755 }
1756 }
1757 Expression::DateAdd(f) | Expression::DateSub(f) => {
1758 stack.push(&f.this);
1759 stack.push(&f.interval);
1760 }
1761 Expression::DateDiff(f) => {
1762 stack.push(&f.this);
1763 stack.push(&f.expression);
1764 }
1765 Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
1766 stack.push(&f.this);
1767 }
1768 Expression::Extract(f) => {
1769 stack.push(&f.this);
1770 }
1771 Expression::Round(f) => {
1772 stack.push(&f.this);
1773 if let Some(ref d) = f.decimals {
1774 stack.push(d);
1775 }
1776 }
1777 Expression::Floor(f) => {
1778 stack.push(&f.this);
1779 if let Some(ref s) = f.scale {
1780 stack.push(s);
1781 }
1782 if let Some(ref t) = f.to {
1783 stack.push(t);
1784 }
1785 }
1786 Expression::Ceil(f) => {
1787 stack.push(&f.this);
1788 if let Some(ref d) = f.decimals {
1789 stack.push(d);
1790 }
1791 if let Some(ref t) = f.to {
1792 stack.push(t);
1793 }
1794 }
1795 Expression::Log(f) => {
1796 stack.push(&f.this);
1797 if let Some(ref b) = f.base {
1798 stack.push(b);
1799 }
1800 }
1801 Expression::AtTimeZone(f) => {
1802 stack.push(&f.this);
1803 stack.push(&f.zone);
1804 }
1805 Expression::Lead(f) | Expression::Lag(f) => {
1806 stack.push(&f.this);
1807 if let Some(ref off) = f.offset {
1808 stack.push(off);
1809 }
1810 if let Some(ref def) = f.default {
1811 stack.push(def);
1812 }
1813 }
1814 Expression::FirstValue(f) | Expression::LastValue(f) => {
1815 stack.push(&f.this);
1816 }
1817 Expression::NthValue(f) => {
1818 stack.push(&f.this);
1819 stack.push(&f.offset);
1820 }
1821 Expression::Position(f) => {
1822 stack.push(&f.substring);
1823 stack.push(&f.string);
1824 if let Some(ref start) = f.start {
1825 stack.push(start);
1826 }
1827 }
1828 Expression::Decode(f) => {
1829 stack.push(&f.this);
1830 for (search, result) in &f.search_results {
1831 stack.push(search);
1832 stack.push(result);
1833 }
1834 if let Some(ref def) = f.default {
1835 stack.push(def);
1836 }
1837 }
1838 Expression::CharFunc(f) => {
1839 for arg in &f.args {
1840 stack.push(arg);
1841 }
1842 }
1843 Expression::ArraySort(f) => {
1844 stack.push(&f.this);
1845 if let Some(ref cmp) = f.comparator {
1846 stack.push(cmp);
1847 }
1848 }
1849 Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
1850 stack.push(&f.this);
1851 stack.push(&f.separator);
1852 if let Some(ref nr) = f.null_replacement {
1853 stack.push(nr);
1854 }
1855 }
1856 Expression::ArrayFilter(f) => {
1857 stack.push(&f.this);
1858 stack.push(&f.filter);
1859 }
1860 Expression::ArrayTransform(f) => {
1861 stack.push(&f.this);
1862 stack.push(&f.transform);
1863 }
1864 Expression::Sequence(f)
1865 | Expression::Generate(f)
1866 | Expression::ExplodingGenerateSeries(f) => {
1867 stack.push(&f.start);
1868 stack.push(&f.stop);
1869 if let Some(ref step) = f.step {
1870 stack.push(step);
1871 }
1872 }
1873 Expression::JsonExtract(f)
1874 | Expression::JsonExtractScalar(f)
1875 | Expression::JsonQuery(f)
1876 | Expression::JsonValue(f) => {
1877 stack.push(&f.this);
1878 stack.push(&f.path);
1879 }
1880 Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
1881 stack.push(&f.this);
1882 for p in &f.paths {
1883 stack.push(p);
1884 }
1885 }
1886 Expression::JsonObject(f) => {
1887 for (k, v) in &f.pairs {
1888 stack.push(k);
1889 stack.push(v);
1890 }
1891 }
1892 Expression::JsonSet(f) | Expression::JsonInsert(f) => {
1893 stack.push(&f.this);
1894 for (path, val) in &f.path_values {
1895 stack.push(path);
1896 stack.push(val);
1897 }
1898 }
1899 Expression::Overlay(f) => {
1900 stack.push(&f.this);
1901 stack.push(&f.replacement);
1902 stack.push(&f.from);
1903 if let Some(ref len) = f.length {
1904 stack.push(len);
1905 }
1906 }
1907 Expression::Convert(f) => {
1908 stack.push(&f.this);
1909 if let Some(ref style) = f.style {
1910 stack.push(style);
1911 }
1912 }
1913 Expression::ApproxPercentile(f) => {
1914 stack.push(&f.this);
1915 stack.push(&f.percentile);
1916 if let Some(ref acc) = f.accuracy {
1917 stack.push(acc);
1918 }
1919 if let Some(ref filter) = f.filter {
1920 stack.push(filter);
1921 }
1922 }
1923 Expression::Percentile(f)
1924 | Expression::PercentileCont(f)
1925 | Expression::PercentileDisc(f) => {
1926 stack.push(&f.this);
1927 stack.push(&f.percentile);
1928 if let Some(ref filter) = f.filter {
1929 stack.push(filter);
1930 }
1931 }
1932 Expression::WithinGroup(f) => {
1933 stack.push(&f.this);
1934 }
1935 Expression::Left(f) | Expression::Right(f) => {
1936 stack.push(&f.this);
1937 stack.push(&f.length);
1938 }
1939 Expression::Repeat(f) => {
1940 stack.push(&f.this);
1941 stack.push(&f.times);
1942 }
1943 Expression::Lpad(f) | Expression::Rpad(f) => {
1944 stack.push(&f.this);
1945 stack.push(&f.length);
1946 if let Some(ref fill) = f.fill {
1947 stack.push(fill);
1948 }
1949 }
1950 Expression::Split(f) => {
1951 stack.push(&f.this);
1952 stack.push(&f.delimiter);
1953 }
1954 Expression::RegexpLike(f) => {
1955 stack.push(&f.this);
1956 stack.push(&f.pattern);
1957 if let Some(ref flags) = f.flags {
1958 stack.push(flags);
1959 }
1960 }
1961 Expression::RegexpReplace(f) => {
1962 stack.push(&f.this);
1963 stack.push(&f.pattern);
1964 stack.push(&f.replacement);
1965 if let Some(ref flags) = f.flags {
1966 stack.push(flags);
1967 }
1968 }
1969 Expression::RegexpExtract(f) => {
1970 stack.push(&f.this);
1971 stack.push(&f.pattern);
1972 if let Some(ref group) = f.group {
1973 stack.push(group);
1974 }
1975 }
1976 Expression::ToDate(f) => {
1977 stack.push(&f.this);
1978 if let Some(ref fmt) = f.format {
1979 stack.push(fmt);
1980 }
1981 }
1982 Expression::ToTimestamp(f) => {
1983 stack.push(&f.this);
1984 if let Some(ref fmt) = f.format {
1985 stack.push(fmt);
1986 }
1987 }
1988 Expression::DateFormat(f) | Expression::FormatDate(f) => {
1989 stack.push(&f.this);
1990 stack.push(&f.format);
1991 }
1992 Expression::LastDay(f) => {
1993 stack.push(&f.this);
1994 }
1995 Expression::FromUnixtime(f) => {
1996 stack.push(&f.this);
1997 if let Some(ref fmt) = f.format {
1998 stack.push(fmt);
1999 }
2000 }
2001 Expression::UnixTimestamp(f) => {
2002 if let Some(ref this) = f.this {
2003 stack.push(this);
2004 }
2005 if let Some(ref fmt) = f.format {
2006 stack.push(fmt);
2007 }
2008 }
2009 Expression::MakeDate(f) => {
2010 stack.push(&f.year);
2011 stack.push(&f.month);
2012 stack.push(&f.day);
2013 }
2014 Expression::MakeTimestamp(f) => {
2015 stack.push(&f.year);
2016 stack.push(&f.month);
2017 stack.push(&f.day);
2018 stack.push(&f.hour);
2019 stack.push(&f.minute);
2020 stack.push(&f.second);
2021 if let Some(ref tz) = f.timezone {
2022 stack.push(tz);
2023 }
2024 }
2025 Expression::TruncFunc(f) => {
2026 stack.push(&f.this);
2027 if let Some(ref d) = f.decimals {
2028 stack.push(d);
2029 }
2030 }
2031 Expression::ArrayFunc(f) => {
2032 for e in &f.expressions {
2033 stack.push(e);
2034 }
2035 }
2036 Expression::Unnest(f) => {
2037 stack.push(&f.this);
2038 for e in &f.expressions {
2039 stack.push(e);
2040 }
2041 }
2042 Expression::StructFunc(f) => {
2043 for (_, e) in &f.fields {
2044 stack.push(e);
2045 }
2046 }
2047 Expression::StructExtract(f) => {
2048 stack.push(&f.this);
2049 }
2050 Expression::NamedStruct(f) => {
2051 for (k, v) in &f.pairs {
2052 stack.push(k);
2053 stack.push(v);
2054 }
2055 }
2056 Expression::MapFunc(f) => {
2057 for k in &f.keys {
2058 stack.push(k);
2059 }
2060 for v in &f.values {
2061 stack.push(v);
2062 }
2063 }
2064 Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2065 stack.push(&f.this);
2066 stack.push(&f.transform);
2067 }
2068 Expression::JsonArrayAgg(f) => {
2069 stack.push(&f.this);
2070 if let Some(ref filter) = f.filter {
2071 stack.push(filter);
2072 }
2073 }
2074 Expression::JsonObjectAgg(f) => {
2075 stack.push(&f.key);
2076 stack.push(&f.value);
2077 if let Some(ref filter) = f.filter {
2078 stack.push(filter);
2079 }
2080 }
2081 Expression::NTile(f) => {
2082 if let Some(ref n) = f.num_buckets {
2083 stack.push(n);
2084 }
2085 }
2086 Expression::Rand(f) => {
2087 if let Some(ref s) = f.seed {
2088 stack.push(s);
2089 }
2090 if let Some(ref lo) = f.lower {
2091 stack.push(lo);
2092 }
2093 if let Some(ref hi) = f.upper {
2094 stack.push(hi);
2095 }
2096 }
2097 Expression::Any(q) | Expression::All(q) => {
2098 stack.push(&q.this);
2099 stack.push(&q.subquery);
2100 }
2101 Expression::Overlaps(o) => {
2102 if let Some(ref this) = o.this {
2103 stack.push(this);
2104 }
2105 if let Some(ref expr) = o.expression {
2106 stack.push(expr);
2107 }
2108 if let Some(ref ls) = o.left_start {
2109 stack.push(ls);
2110 }
2111 if let Some(ref le) = o.left_end {
2112 stack.push(le);
2113 }
2114 if let Some(ref rs) = o.right_start {
2115 stack.push(rs);
2116 }
2117 if let Some(ref re) = o.right_end {
2118 stack.push(re);
2119 }
2120 }
2121 Expression::Interval(i) => {
2122 if let Some(ref this) = i.this {
2123 stack.push(this);
2124 }
2125 }
2126 Expression::TimeStrToTime(f) => {
2127 stack.push(&f.this);
2128 if let Some(ref zone) = f.zone {
2129 stack.push(zone);
2130 }
2131 }
2132 Expression::JSONBExtractScalar(f) => {
2133 stack.push(&f.this);
2134 stack.push(&f.expression);
2135 if let Some(ref jt) = f.json_type {
2136 stack.push(jt);
2137 }
2138 }
2139
2140 _ => {}
2145 }
2146 }
2147}
2148
2149#[cfg(test)]
2154mod tests {
2155 use super::*;
2156 use crate::dialects::{Dialect, DialectType};
2157 use crate::expressions::DataType;
2158 use crate::optimizer::annotate_types::annotate_types;
2159 use crate::parse_one;
2160 use crate::schema::{MappingSchema, Schema};
2161
2162 fn parse(sql: &str) -> Expression {
2163 let dialect = Dialect::get(DialectType::Generic);
2164 let ast = dialect.parse(sql).unwrap();
2165 ast.into_iter().next().unwrap()
2166 }
2167
// Lineage of a bare column: the root is named after the column and its
// downstream list contains the table-qualified source `t.a`.
#[test]
fn test_simple_lineage() {
    let query = parse("SELECT a FROM t");
    let root = lineage("a", &query, None, false).unwrap();

    assert_eq!(root.name, "a");
    assert!(!root.downstream.is_empty(), "Should have downstream nodes");

    let names = root.downstream_names();
    let found_source = names.iter().any(|entry| entry.as_str() == "t.a");
    assert!(
        found_source,
        "Expected t.a in downstream, got: {:?}",
        names
    );
}
2183
// `walk()` visits the root first and then its single downstream child.
#[test]
fn test_lineage_walk() {
    // Placeholder expression used for every node field in this test.
    let null_expr = || Expression::Null(crate::expressions::Null);

    let mut root = LineageNode::new("col_a", null_expr(), null_expr());
    root.downstream
        .push(LineageNode::new("t.a", null_expr(), null_expr()));

    let mut names: Vec<String> = Vec::new();
    for visited in root.walk() {
        names.push(visited.name.clone());
    }

    assert_eq!(names.len(), 2);
    assert_eq!(names[0], "col_a");
    assert_eq!(names[1], "t.a");
}
2204
// Lineage of an aliased expression: the root carries the alias name and some
// node reachable from it has a name containing "a".
#[test]
fn test_aliased_column() {
    let query = parse("SELECT a + 1 AS b FROM t");
    let root = lineage("b", &query, None, false).unwrap();

    assert_eq!(root.name, "b");

    let mut all_names: Vec<String> = Vec::new();
    for visited in root.walk() {
        all_names.push(visited.name.clone());
    }
    let traces_to_a = all_names.iter().any(|entry| entry.contains("a"));
    assert!(
        traces_to_a,
        "Expected to trace to column a, got: {:?}",
        all_names
    );
}
2219
// A table-qualified select item (`t.a`) is looked up by its bare column name
// and reports `t.a` among its downstream names.
#[test]
fn test_qualified_column() {
    let query = parse("SELECT t.a FROM t");
    let root = lineage("a", &query, None, false).unwrap();

    assert_eq!(root.name, "a");

    let names = root.downstream_names();
    let found_source = names.iter().any(|entry| entry.as_str() == "t.a");
    assert!(found_source, "Expected t.a, got: {:?}", names);
}
2233
// An unqualified select item reports the table-qualified name `t.a` among its
// downstream names.
#[test]
fn test_unqualified_column() {
    let query = parse("SELECT a FROM t");
    let names = lineage("a", &query, None, false)
        .unwrap()
        .downstream_names();

    let found_source = names.iter().any(|entry| entry.as_str() == "t.a");
    assert!(found_source, "Expected t.a, got: {:?}", names);
}
2247
// Compares the root expression produced by `lineage` and by
// `lineage_with_schema` for the same query: after `annotate_types`, the former
// has no inferred type while the latter resolves to `DataType::Text`.
#[test]
fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
    let query = "SELECT name FROM users";
    let bigquery = Dialect::get(DialectType::BigQuery);
    let statements = bigquery.parse(query).unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    let user_columns = [(String::from("name"), DataType::Text)];
    schema
        .add_table("users", &user_columns, None)
        .expect("schema setup");

    // Plain lineage: annotating the root afterwards leaves it untyped.
    let plain_root = lineage("name", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage without schema");
    let mut plain_expr = plain_root.expression.clone();
    annotate_types(
        &mut plain_expr,
        Some(&schema),
        Some(DialectType::BigQuery),
    );
    assert_eq!(
        plain_expr.inferred_type(),
        None,
        "Expected unresolved root type without schema-aware lineage qualification"
    );

    // Schema-aware lineage: the same annotation step now yields a type.
    let schema_root = lineage_with_schema(
        "name",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage with schema");
    let mut schema_expr = schema_root.expression.clone();
    annotate_types(
        &mut schema_expr,
        Some(&schema),
        Some(DialectType::BigQuery),
    );

    assert_eq!(schema_expr.inferred_type(), Some(&DataType::Text));
}
2291
// Schema-aware lineage for `id` succeeds on a query whose select list also
// holds a correlated scalar subquery, and the root is named `id`.
#[test]
fn test_lineage_with_schema_correlated_scalar_subquery() {
    let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
    let bigquery = Dialect::get(DialectType::BigQuery);
    let statements = bigquery.parse(query).unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let t1_columns = [(String::from("id"), DataType::BigInt { length: None })];
    let t2_columns = [
        (String::from("id"), DataType::BigInt { length: None }),
        (String::from("val"), DataType::BigInt { length: None }),
    ];

    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table("t1", &t1_columns, None)
        .expect("schema setup");
    schema
        .add_table("t2", &t2_columns, None)
        .expect("schema setup");

    let root = lineage_with_schema(
        "id",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should handle correlated scalar subqueries");

    assert_eq!(root.name, "id");
}
2333
// Schema-aware lineage succeeds when the query joins two tables with
// `USING(a)` and both tables declare column `a`.
#[test]
fn test_lineage_with_schema_join_using() {
    let query = "SELECT a FROM t1 JOIN t2 USING(a)";
    let bigquery = Dialect::get(DialectType::BigQuery);
    let statements = bigquery.parse(query).unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let t1_columns = [(String::from("a"), DataType::BigInt { length: None })];
    let t2_columns = [(String::from("a"), DataType::BigInt { length: None })];

    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table("t1", &t1_columns, None)
        .expect("schema setup");
    schema
        .add_table("t2", &t2_columns, None)
        .expect("schema setup");

    let root = lineage_with_schema(
        "a",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should handle JOIN USING");

    assert_eq!(root.name, "a");
}
2372
// Schema-aware lineage succeeds when the table is referenced, and registered
// in the schema, under the dotted name `raw.t1`.
#[test]
fn test_lineage_with_schema_qualified_table_name() {
    let query = "SELECT a FROM raw.t1";
    let bigquery = Dialect::get(DialectType::BigQuery);
    let statements = bigquery.parse(query).unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let columns = [(String::from("a"), DataType::BigInt { length: None })];
    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table("raw.t1", &columns, None)
        .expect("schema setup");

    let root = lineage_with_schema(
        "a",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should handle dotted schema.table names");

    assert_eq!(root.name, "a");
}
2404
// With no schema and no dialect, `lineage_with_schema` yields the same root
// name and downstream names as `lineage`.
#[test]
fn test_lineage_with_schema_none_matches_lineage() {
    let query = parse("SELECT a FROM t");

    let expected = lineage("a", &query, None, false).expect("lineage baseline");
    let actual =
        lineage_with_schema("a", &query, None, None, false).expect("lineage_with_schema");

    assert_eq!(actual.name, expected.name);
    assert_eq!(actual.downstream_names(), expected.downstream_names());
}
2415
// A mixed-case column (`Name`) aliased to lower case is looked up as `name`
// and reports `teams.Name` among its downstream names.
#[test]
fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
    let bigquery = Dialect::get(DialectType::BigQuery);
    let statements = bigquery.parse("SELECT Name AS name FROM teams").unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let columns = [(String::from("Name"), DataType::String { length: None })];
    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table("teams", &columns, None)
        .expect("schema setup");

    let root = lineage_with_schema(
        "name",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should resolve mixed-case BigQuery columns");

    let names = root.downstream_names();
    let found_source = names.iter().any(|entry| entry.as_str() == "teams.Name");
    assert!(
        found_source,
        "Expected teams.Name in downstream, got: {:?}",
        names
    );
}
2451
// Looking up `name` against a select item aliased as `Name` under the BigQuery
// dialect succeeds and yields a root named `name`.
#[test]
fn test_lineage_bigquery_mixed_case_alias_lookup() {
    let bigquery = Dialect::get(DialectType::BigQuery);
    let statements = bigquery.parse("SELECT Name AS Name FROM teams").unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let root = lineage("name", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage should resolve mixed-case aliases in BigQuery");

    assert_eq!(root.name, "name");
}
2467
// For Snowflake `DATEDIFF(day, ...)`, the downstream names include the real
// source column and contain nothing that looks like a `day` column.
#[test]
fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
    let sql =
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let columns = [(String::from("date_utc"), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &columns, None)
        .expect("schema setup");

    let root = lineage_with_schema(
        "recency",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat date part as a column");

    let names = root.downstream_names();

    let has_source = names
        .iter()
        .any(|entry| entry.as_str() == "some_daily_metrics.date_utc");
    assert!(
        has_source,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );

    let no_date_part = names
        .iter()
        .all(|entry| !entry.ends_with(".day") && entry.as_str() != "day");
    assert!(
        no_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
2506
// Asserts the parsed shape Select -> Alias -> DateDiff for a Snowflake
// `DATEDIFF(day, ...)` call, with the unit recorded as `IntervalUnit::Day`.
#[test]
fn test_snowflake_datediff_parses_to_typed_ast() {
    let sql =
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    // Peel the AST one layer at a time; each layer panics with the unexpected
    // node when the shape differs.
    let select = match expr {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::DateDiff(f) => {
            assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
        }
        other => panic!("expected DateDiff, got {other:?}"),
    }
}
2528
// For Snowflake `DATEADD(day, ...)`, the downstream names include the real
// source column and contain nothing that looks like a `day` column.
#[test]
fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
    let sql = "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let columns = [(String::from("date_utc"), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &columns, None)
        .expect("schema setup");

    let root = lineage_with_schema(
        "next_day",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat DATEADD date part as a column");

    let names = root.downstream_names();

    let has_source = names
        .iter()
        .any(|entry| entry.as_str() == "some_daily_metrics.date_utc");
    assert!(
        has_source,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );

    let no_date_part = names
        .iter()
        .all(|entry| !entry.ends_with(".day") && entry.as_str() != "day");
    assert!(
        no_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
2567
// For Snowflake `DATE_PART(day, ...)` with an unquoted part, the downstream
// names include the real source column and nothing that looks like `day`.
#[test]
fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
    let sql = "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let columns = [(String::from("date_utc"), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &columns, None)
        .expect("schema setup");

    let root = lineage_with_schema(
        "day_part",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat DATE_PART identifier as a column");

    let names = root.downstream_names();

    let has_source = names
        .iter()
        .any(|entry| entry.as_str() == "some_daily_metrics.date_utc");
    assert!(
        has_source,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );

    let no_date_part = names
        .iter()
        .all(|entry| !entry.ends_with(".day") && entry.as_str() != "day");
    assert!(
        no_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
2606
// Control case: with the date part given as the string literal `'day'`, the
// downstream names still include the real source column.
#[test]
fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
    let sql = "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let columns = [(String::from("date_utc"), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &columns, None)
        .expect("schema setup");

    let root = lineage_with_schema(
        "day_part",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("quoted DATE_PART should continue to work");

    let names = root.downstream_names();
    let has_source = names
        .iter()
        .any(|entry| entry.as_str() == "some_daily_metrics.date_utc");
    assert!(
        has_source,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );
}
2640
// Asserts the parsed shape Select -> Alias -> Function for Snowflake
// `DATEADD(day, ...)`: a generic function named DATEADD whose first argument
// is a `Var` holding "day".
#[test]
fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
    let sql = "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    // Peel the AST one layer at a time; each layer panics with the unexpected
    // node when the shape differs.
    let select = match expr {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::Function(f) => {
            assert_eq!(f.name.to_uppercase(), "DATEADD");
            assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
        }
        other => panic!("expected generic DATEADD function, got {other:?}"),
    }
}
2663
2664 #[test]
2665 fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2666 let expr = parse_one(
2667 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2668 DialectType::Snowflake,
2669 )
2670 .expect("parse");
2671
2672 match expr {
2673 Expression::Select(select) => match &select.expressions[0] {
2674 Expression::Alias(alias) => match &alias.this {
2675 Expression::Function(f) => {
2676 assert_eq!(f.name.to_uppercase(), "DATE_PART");
2677 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2678 }
2679 other => panic!("expected generic DATE_PART function, got {other:?}"),
2680 },
2681 other => panic!("expected Alias, got {other:?}"),
2682 },
2683 other => panic!("expected Select, got {other:?}"),
2684 }
2685 }
2686
2687 #[test]
2688 fn test_snowflake_date_part_string_literal_stays_generic_function() {
2689 let expr = parse_one(
2690 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2691 DialectType::Snowflake,
2692 )
2693 .expect("parse");
2694
2695 match expr {
2696 Expression::Select(select) => match &select.expressions[0] {
2697 Expression::Alias(alias) => match &alias.this {
2698 Expression::Function(f) => {
2699 assert_eq!(f.name.to_uppercase(), "DATE_PART");
2700 }
2701 other => panic!("expected generic DATE_PART function, got {other:?}"),
2702 },
2703 other => panic!("expected Alias, got {other:?}"),
2704 },
2705 other => panic!("expected Select, got {other:?}"),
2706 }
2707 }
2708
2709 #[test]
2710 fn test_lineage_join() {
2711 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2712
2713 let node_a = lineage("a", &expr, None, false).unwrap();
2714 let names_a = node_a.downstream_names();
2715 assert!(
2716 names_a.iter().any(|n| n == "t.a"),
2717 "Expected t.a, got: {:?}",
2718 names_a
2719 );
2720
2721 let node_b = lineage("b", &expr, None, false).unwrap();
2722 let names_b = node_b.downstream_names();
2723 assert!(
2724 names_b.iter().any(|n| n == "s.b"),
2725 "Expected s.b, got: {:?}",
2726 names_b
2727 );
2728 }
2729
2730 #[test]
2731 fn test_lineage_alias_leaf_has_resolved_source_name() {
2732 let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
2733 let node = lineage("col1", &expr, None, false).unwrap();
2734
2735 let names = node.downstream_names();
2737 assert!(
2738 names.iter().any(|n| n == "t1.col1"),
2739 "Expected aliased column edge t1.col1, got: {:?}",
2740 names
2741 );
2742
2743 let leaf = node
2745 .downstream
2746 .iter()
2747 .find(|n| n.name == "t1.col1")
2748 .expect("Expected t1.col1 leaf");
2749 assert_eq!(leaf.source_name, "table1");
2750 match &leaf.source {
2751 Expression::Table(table) => assert_eq!(table.name.name, "table1"),
2752 _ => panic!("Expected leaf source to be a table expression"),
2753 }
2754 }
2755
2756 #[test]
2757 fn test_lineage_derived_table() {
2758 let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
2759 let node = lineage("a", &expr, None, false).unwrap();
2760
2761 assert_eq!(node.name, "a");
2762 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2764 assert!(
2765 all_names.iter().any(|n| n == "t.a"),
2766 "Expected to trace through derived table to t.a, got: {:?}",
2767 all_names
2768 );
2769 }
2770
2771 #[test]
2772 fn test_lineage_cte() {
2773 let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
2774 let node = lineage("a", &expr, None, false).unwrap();
2775
2776 assert_eq!(node.name, "a");
2777 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2778 assert!(
2779 all_names.iter().any(|n| n == "t.a"),
2780 "Expected to trace through CTE to t.a, got: {:?}",
2781 all_names
2782 );
2783 }
2784
2785 #[test]
2786 fn test_lineage_union() {
2787 let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
2788 let node = lineage("a", &expr, None, false).unwrap();
2789
2790 assert_eq!(node.name, "a");
2791 assert_eq!(
2793 node.downstream.len(),
2794 2,
2795 "Expected 2 branches for UNION, got {}",
2796 node.downstream.len()
2797 );
2798 }
2799
2800 #[test]
2801 fn test_lineage_cte_union() {
2802 let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
2803 let node = lineage("a", &expr, None, false).unwrap();
2804
2805 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2807 assert!(
2808 all_names.len() >= 3,
2809 "Expected at least 3 nodes for CTE with UNION, got: {:?}",
2810 all_names
2811 );
2812 }
2813
2814 #[test]
2815 fn test_lineage_star() {
2816 let expr = parse("SELECT * FROM t");
2817 let node = lineage("*", &expr, None, false).unwrap();
2818
2819 assert_eq!(node.name, "*");
2820 assert!(
2822 !node.downstream.is_empty(),
2823 "Star should produce downstream nodes"
2824 );
2825 }
2826
2827 #[test]
2828 fn test_lineage_subquery_in_select() {
2829 let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
2830 let node = lineage("x", &expr, None, false).unwrap();
2831
2832 assert_eq!(node.name, "x");
2833 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2835 assert!(
2836 all_names.len() >= 2,
2837 "Expected tracing into scalar subquery, got: {:?}",
2838 all_names
2839 );
2840 }
2841
2842 #[test]
2843 fn test_lineage_multiple_columns() {
2844 let expr = parse("SELECT a, b FROM t");
2845
2846 let node_a = lineage("a", &expr, None, false).unwrap();
2847 let node_b = lineage("b", &expr, None, false).unwrap();
2848
2849 assert_eq!(node_a.name, "a");
2850 assert_eq!(node_b.name, "b");
2851
2852 let names_a = node_a.downstream_names();
2854 let names_b = node_b.downstream_names();
2855 assert!(names_a.iter().any(|n| n == "t.a"));
2856 assert!(names_b.iter().any(|n| n == "t.b"));
2857 }
2858
2859 #[test]
2860 fn test_get_source_tables() {
2861 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2862 let node = lineage("a", &expr, None, false).unwrap();
2863
2864 let tables = get_source_tables(&node);
2865 assert!(
2866 tables.contains("t"),
2867 "Expected source table 't', got: {:?}",
2868 tables
2869 );
2870 }
2871
2872 #[test]
2873 fn test_lineage_column_not_found() {
2874 let expr = parse("SELECT a FROM t");
2875 let result = lineage("nonexistent", &expr, None, false);
2876 assert!(result.is_err());
2877 }
2878
2879 #[test]
2880 fn test_lineage_nested_cte() {
2881 let expr = parse(
2882 "WITH cte1 AS (SELECT a FROM t), \
2883 cte2 AS (SELECT a FROM cte1) \
2884 SELECT a FROM cte2",
2885 );
2886 let node = lineage("a", &expr, None, false).unwrap();
2887
2888 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2890 assert!(
2891 all_names.len() >= 3,
2892 "Expected to trace through nested CTEs, got: {:?}",
2893 all_names
2894 );
2895 }
2896
2897 #[test]
2898 fn test_trim_selects_true() {
2899 let expr = parse("SELECT a, b, c FROM t");
2900 let node = lineage("a", &expr, None, true).unwrap();
2901
2902 if let Expression::Select(select) = &node.source {
2904 assert_eq!(
2905 select.expressions.len(),
2906 1,
2907 "Trimmed source should have 1 expression, got {}",
2908 select.expressions.len()
2909 );
2910 } else {
2911 panic!("Expected Select source");
2912 }
2913 }
2914
2915 #[test]
2916 fn test_trim_selects_false() {
2917 let expr = parse("SELECT a, b, c FROM t");
2918 let node = lineage("a", &expr, None, false).unwrap();
2919
2920 if let Expression::Select(select) = &node.source {
2922 assert_eq!(
2923 select.expressions.len(),
2924 3,
2925 "Untrimmed source should have 3 expressions"
2926 );
2927 } else {
2928 panic!("Expected Select source");
2929 }
2930 }
2931
2932 #[test]
2933 fn test_lineage_expression_in_select() {
2934 let expr = parse("SELECT a + b AS c FROM t");
2935 let node = lineage("c", &expr, None, false).unwrap();
2936
2937 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2939 assert!(
2940 all_names.len() >= 3,
2941 "Expected to trace a + b to both columns, got: {:?}",
2942 all_names
2943 );
2944 }
2945
2946 #[test]
2947 fn test_set_operation_by_index() {
2948 let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
2949
2950 let node = lineage("a", &expr, None, false).unwrap();
2952
2953 assert_eq!(node.downstream.len(), 2);
2955 }
2956
2957 fn print_node(node: &LineageNode, indent: usize) {
2960 let pad = " ".repeat(indent);
2961 println!(
2962 "{pad}name={:?} source_name={:?}",
2963 node.name, node.source_name
2964 );
2965 for child in &node.downstream {
2966 print_node(child, indent + 1);
2967 }
2968 }
2969
2970 #[test]
2971 fn test_issue18_repro() {
2972 let query = "SELECT UPPER(name) as upper_name FROM users";
2974 println!("Query: {query}\n");
2975
2976 let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
2977 let exprs = dialect.parse(query).unwrap();
2978 let expr = &exprs[0];
2979
2980 let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
2981 println!("lineage(\"upper_name\"):");
2982 print_node(&node, 1);
2983
2984 let names = node.downstream_names();
2985 assert!(
2986 names.iter().any(|n| n == "users.name"),
2987 "Expected users.name in downstream, got: {:?}",
2988 names
2989 );
2990 }
2991
2992 #[test]
2993 fn test_lineage_upper_function() {
2994 let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
2995 let node = lineage("upper_name", &expr, None, false).unwrap();
2996
2997 let names = node.downstream_names();
2998 assert!(
2999 names.iter().any(|n| n == "users.name"),
3000 "Expected users.name in downstream, got: {:?}",
3001 names
3002 );
3003 }
3004
3005 #[test]
3006 fn test_lineage_round_function() {
3007 let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3008 let node = lineage("rounded", &expr, None, false).unwrap();
3009
3010 let names = node.downstream_names();
3011 assert!(
3012 names.iter().any(|n| n == "products.price"),
3013 "Expected products.price in downstream, got: {:?}",
3014 names
3015 );
3016 }
3017
3018 #[test]
3019 fn test_lineage_coalesce_function() {
3020 let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3021 let node = lineage("val", &expr, None, false).unwrap();
3022
3023 let names = node.downstream_names();
3024 assert!(
3025 names.iter().any(|n| n == "t.a"),
3026 "Expected t.a in downstream, got: {:?}",
3027 names
3028 );
3029 assert!(
3030 names.iter().any(|n| n == "t.b"),
3031 "Expected t.b in downstream, got: {:?}",
3032 names
3033 );
3034 }
3035
3036 #[test]
3037 fn test_lineage_count_function() {
3038 let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3039 let node = lineage("cnt", &expr, None, false).unwrap();
3040
3041 let names = node.downstream_names();
3042 assert!(
3043 names.iter().any(|n| n == "t.id"),
3044 "Expected t.id in downstream, got: {:?}",
3045 names
3046 );
3047 }
3048
3049 #[test]
3050 fn test_lineage_sum_function() {
3051 let expr = parse("SELECT SUM(amount) AS total FROM t");
3052 let node = lineage("total", &expr, None, false).unwrap();
3053
3054 let names = node.downstream_names();
3055 assert!(
3056 names.iter().any(|n| n == "t.amount"),
3057 "Expected t.amount in downstream, got: {:?}",
3058 names
3059 );
3060 }
3061
3062 #[test]
3063 fn test_lineage_case_with_nested_functions() {
3064 let expr =
3065 parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3066 let node = lineage("result", &expr, None, false).unwrap();
3067
3068 let names = node.downstream_names();
3069 assert!(
3070 names.iter().any(|n| n == "t.x"),
3071 "Expected t.x in downstream, got: {:?}",
3072 names
3073 );
3074 assert!(
3075 names.iter().any(|n| n == "t.name"),
3076 "Expected t.name in downstream, got: {:?}",
3077 names
3078 );
3079 }
3080
3081 #[test]
3082 fn test_lineage_substring_function() {
3083 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3084 let node = lineage("short", &expr, None, false).unwrap();
3085
3086 let names = node.downstream_names();
3087 assert!(
3088 names.iter().any(|n| n == "t.name"),
3089 "Expected t.name in downstream, got: {:?}",
3090 names
3091 );
3092 }
3093
3094 #[test]
3097 fn test_lineage_cte_select_star() {
3098 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3102 let node = lineage("a", &expr, None, false).unwrap();
3103
3104 assert_eq!(node.name, "a");
3105 assert!(
3108 !node.downstream.is_empty(),
3109 "Expected downstream nodes tracing through CTE, got none"
3110 );
3111 }
3112
3113 #[test]
3114 fn test_lineage_cte_select_star_renamed_column() {
3115 let expr =
3118 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3119 let node = lineage("customer_id", &expr, None, false).unwrap();
3120
3121 assert_eq!(node.name, "customer_id");
3122 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3124 assert!(
3125 all_names.len() >= 2,
3126 "Expected at least 2 nodes (customer_id → source), got: {:?}",
3127 all_names
3128 );
3129 }
3130
3131 #[test]
3132 fn test_lineage_cte_select_star_multiple_columns() {
3133 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3135
3136 for col in &["a", "b", "c"] {
3137 let node = lineage(col, &expr, None, false).unwrap();
3138 assert_eq!(node.name, *col);
3139 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3141 assert!(
3142 all_names.len() >= 2,
3143 "Expected at least 2 nodes for column {}, got: {:?}",
3144 col,
3145 all_names
3146 );
3147 }
3148 }
3149
3150 #[test]
3151 fn test_lineage_nested_cte_select_star() {
3152 let expr = parse(
3154 "WITH cte1 AS (SELECT a FROM t), \
3155 cte2 AS (SELECT * FROM cte1) \
3156 SELECT * FROM cte2",
3157 );
3158 let node = lineage("a", &expr, None, false).unwrap();
3159
3160 assert_eq!(node.name, "a");
3161 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3162 assert!(
3163 all_names.len() >= 3,
3164 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3165 all_names
3166 );
3167 }
3168
3169 #[test]
3170 fn test_lineage_three_level_nested_cte_star() {
3171 let expr = parse(
3173 "WITH cte1 AS (SELECT x FROM t), \
3174 cte2 AS (SELECT * FROM cte1), \
3175 cte3 AS (SELECT * FROM cte2) \
3176 SELECT * FROM cte3",
3177 );
3178 let node = lineage("x", &expr, None, false).unwrap();
3179
3180 assert_eq!(node.name, "x");
3181 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3182 assert!(
3183 all_names.len() >= 4,
3184 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3185 all_names
3186 );
3187 }
3188
3189 #[test]
3190 fn test_lineage_cte_union_star() {
3191 let expr = parse(
3193 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3194 SELECT * FROM cte",
3195 );
3196 let node = lineage("a", &expr, None, false).unwrap();
3197
3198 assert_eq!(node.name, "a");
3199 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3200 assert!(
3201 all_names.len() >= 2,
3202 "Expected at least 2 nodes for CTE union star, got: {:?}",
3203 all_names
3204 );
3205 }
3206
3207 #[test]
3208 fn test_lineage_cte_star_unknown_table() {
3209 let expr = parse(
3212 "WITH cte AS (SELECT * FROM unknown_table) \
3213 SELECT * FROM cte",
3214 );
3215 let _result = lineage("x", &expr, None, false);
3218 }
3219
3220 #[test]
3221 fn test_lineage_cte_explicit_columns() {
3222 let expr = parse(
3224 "WITH cte(x, y) AS (SELECT a, b FROM t) \
3225 SELECT * FROM cte",
3226 );
3227 let node = lineage("x", &expr, None, false).unwrap();
3228 assert_eq!(node.name, "x");
3229 }
3230
3231 #[test]
3232 fn test_lineage_cte_qualified_star() {
3233 let expr = parse(
3235 "WITH cte AS (SELECT a, b FROM t) \
3236 SELECT cte.* FROM cte",
3237 );
3238 for col in &["a", "b"] {
3239 let node = lineage(col, &expr, None, false).unwrap();
3240 assert_eq!(node.name, *col);
3241 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3242 assert!(
3243 all_names.len() >= 2,
3244 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3245 col,
3246 all_names
3247 );
3248 }
3249 }
3250
3251 #[test]
3252 fn test_lineage_subquery_select_star() {
3253 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3256 let node = lineage("x", &expr, None, false).unwrap();
3257
3258 assert_eq!(node.name, "x");
3259 assert!(
3260 !node.downstream.is_empty(),
3261 "Expected downstream nodes for subquery with SELECT *, got none"
3262 );
3263 }
3264
3265 #[test]
3266 fn test_lineage_cte_star_with_schema_external_table() {
3267 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3269SELECT * FROM orders"#;
3270 let expr = parse(sql);
3271
3272 let mut schema = MappingSchema::new();
3273 let cols = vec![
3274 ("order_id".to_string(), DataType::Unknown),
3275 ("customer_id".to_string(), DataType::Unknown),
3276 ("amount".to_string(), DataType::Unknown),
3277 ];
3278 schema.add_table("stg_orders", &cols, None).unwrap();
3279
3280 let node =
3281 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3282 .unwrap();
3283 assert_eq!(node.name, "order_id");
3284 }
3285
3286 #[test]
3287 fn test_lineage_cte_star_with_schema_three_part_name() {
3288 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3290SELECT * FROM orders"#;
3291 let expr = parse(sql);
3292
3293 let mut schema = MappingSchema::new();
3294 let cols = vec![
3295 ("order_id".to_string(), DataType::Unknown),
3296 ("customer_id".to_string(), DataType::Unknown),
3297 ];
3298 schema
3299 .add_table("db.schema.stg_orders", &cols, None)
3300 .unwrap();
3301
3302 let node = lineage_with_schema(
3303 "customer_id",
3304 &expr,
3305 Some(&schema as &dyn Schema),
3306 None,
3307 false,
3308 )
3309 .unwrap();
3310 assert_eq!(node.name, "customer_id");
3311 }
3312
3313 #[test]
3314 fn test_lineage_cte_star_with_schema_nested() {
3315 let sql = r#"WITH
3318 raw AS (SELECT * FROM external_table),
3319 enriched AS (SELECT * FROM raw)
3320 SELECT * FROM enriched"#;
3321 let expr = parse(sql);
3322
3323 let mut schema = MappingSchema::new();
3324 let cols = vec![
3325 ("id".to_string(), DataType::Unknown),
3326 ("name".to_string(), DataType::Unknown),
3327 ];
3328 schema.add_table("external_table", &cols, None).unwrap();
3329
3330 let node =
3331 lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3332 assert_eq!(node.name, "name");
3333 }
3334
3335 #[test]
3336 fn test_lineage_cte_qualified_star_with_schema() {
3337 let sql = r#"WITH
3340 orders AS (SELECT * FROM stg_orders),
3341 enriched AS (
3342 SELECT orders.*, 'extra' AS extra
3343 FROM orders
3344 )
3345 SELECT * FROM enriched"#;
3346 let expr = parse(sql);
3347
3348 let mut schema = MappingSchema::new();
3349 let cols = vec![
3350 ("order_id".to_string(), DataType::Unknown),
3351 ("total".to_string(), DataType::Unknown),
3352 ];
3353 schema.add_table("stg_orders", &cols, None).unwrap();
3354
3355 let node =
3356 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3357 .unwrap();
3358 assert_eq!(node.name, "order_id");
3359
3360 let extra =
3362 lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3363 assert_eq!(extra.name, "extra");
3364 }
3365
3366 #[test]
3367 fn test_lineage_cte_star_without_schema_still_works() {
3368 let sql = r#"WITH
3370 cte1 AS (SELECT id, name FROM raw_table),
3371 cte2 AS (SELECT * FROM cte1)
3372 SELECT * FROM cte2"#;
3373 let expr = parse(sql);
3374
3375 let node = lineage("id", &expr, None, false).unwrap();
3377 assert_eq!(node.name, "id");
3378 }
3379
3380 #[test]
3381 fn test_lineage_nested_cte_star_with_join_and_schema() {
3382 let sql = r#"WITH
3385base_orders AS (
3386 SELECT * FROM stg_orders
3387),
3388with_payments AS (
3389 SELECT
3390 base_orders.*,
3391 p.amount
3392 FROM base_orders
3393 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
3394),
3395final_cte AS (
3396 SELECT * FROM with_payments
3397)
3398SELECT * FROM final_cte"#;
3399 let expr = parse(sql);
3400
3401 let mut schema = MappingSchema::new();
3402 let order_cols = vec![
3403 (
3404 "order_id".to_string(),
3405 crate::expressions::DataType::Unknown,
3406 ),
3407 (
3408 "customer_id".to_string(),
3409 crate::expressions::DataType::Unknown,
3410 ),
3411 ("status".to_string(), crate::expressions::DataType::Unknown),
3412 ];
3413 let pay_cols = vec![
3414 (
3415 "payment_id".to_string(),
3416 crate::expressions::DataType::Unknown,
3417 ),
3418 (
3419 "order_id".to_string(),
3420 crate::expressions::DataType::Unknown,
3421 ),
3422 ("amount".to_string(), crate::expressions::DataType::Unknown),
3423 ];
3424 schema.add_table("stg_orders", &order_cols, None).unwrap();
3425 schema.add_table("stg_payments", &pay_cols, None).unwrap();
3426
3427 let node =
3429 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3430 .unwrap();
3431 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3432
3433 let has_table_qualified = all_names
3435 .iter()
3436 .any(|n| n.contains('.') && n.contains("order_id"));
3437 assert!(
3438 has_table_qualified,
3439 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
3440 all_names
3441 );
3442
3443 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
3445 .unwrap();
3446 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3447
3448 let has_table_qualified = all_names
3449 .iter()
3450 .any(|n| n.contains('.') && n.contains("amount"));
3451 assert!(
3452 has_table_qualified,
3453 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
3454 all_names
3455 );
3456 }
3457
3458 #[test]
3459 fn test_lineage_cte_alias_resolution() {
3460 let sql = r#"WITH import_stg_items AS (
3462 SELECT item_id, name, status FROM stg_items
3463)
3464SELECT base.item_id, base.status
3465FROM import_stg_items AS base"#;
3466 let expr = parse(sql);
3467
3468 let node = lineage("item_id", &expr, None, false).unwrap();
3469 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3470 assert!(
3472 all_names.iter().any(|n| n == "stg_items.item_id"),
3473 "Expected leaf 'stg_items.item_id', got: {:?}",
3474 all_names
3475 );
3476 }
3477
3478 #[test]
3479 fn test_lineage_cte_alias_with_schema_and_star() {
3480 let sql = r#"WITH import_stg AS (
3482 SELECT * FROM stg_items
3483)
3484SELECT base.item_id, base.status
3485FROM import_stg AS base"#;
3486 let expr = parse(sql);
3487
3488 let mut schema = MappingSchema::new();
3489 schema
3490 .add_table(
3491 "stg_items",
3492 &[
3493 ("item_id".to_string(), DataType::Unknown),
3494 ("name".to_string(), DataType::Unknown),
3495 ("status".to_string(), DataType::Unknown),
3496 ],
3497 None,
3498 )
3499 .unwrap();
3500
3501 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
3502 .unwrap();
3503 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3504 assert!(
3505 all_names.iter().any(|n| n == "stg_items.item_id"),
3506 "Expected leaf 'stg_items.item_id', got: {:?}",
3507 all_names
3508 );
3509 }
3510
3511 #[test]
3512 fn test_lineage_cte_alias_with_join() {
3513 let sql = r#"WITH
3515 import_users AS (SELECT id, name FROM users),
3516 import_orders AS (SELECT id, user_id, amount FROM orders)
3517SELECT u.name, o.amount
3518FROM import_users AS u
3519LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3520 let expr = parse(sql);
3521
3522 let node = lineage("name", &expr, None, false).unwrap();
3523 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3524 assert!(
3525 all_names.iter().any(|n| n == "users.name"),
3526 "Expected leaf 'users.name', got: {:?}",
3527 all_names
3528 );
3529
3530 let node = lineage("amount", &expr, None, false).unwrap();
3531 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3532 assert!(
3533 all_names.iter().any(|n| n == "orders.amount"),
3534 "Expected leaf 'orders.amount', got: {:?}",
3535 all_names
3536 );
3537 }
3538
3539 #[test]
3544 fn test_lineage_unquoted_cte_case_insensitive() {
3545 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
3548 let node = lineage("col", &expr, None, false).unwrap();
3549 assert_eq!(node.name, "col");
3550 assert!(
3551 !node.downstream.is_empty(),
3552 "Unquoted CTE should resolve case-insensitively"
3553 );
3554 }
3555
3556 #[test]
3557 fn test_lineage_quoted_cte_case_preserved() {
3558 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
3560 let node = lineage("col", &expr, None, false).unwrap();
3561 assert_eq!(node.name, "col");
3562 assert!(
3563 !node.downstream.is_empty(),
3564 "Quoted CTE with matching case should resolve"
3565 );
3566 }
3567
3568 #[test]
3569 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3570 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
3574 let result = lineage("col", &expr, None, false);
3577 assert!(
3578 result.is_err(),
3579 "Quoted CTE with case mismatch should not expand star: {:?}",
3580 result
3581 );
3582 }
3583
3584 #[test]
3585 fn test_lineage_mixed_quoted_unquoted_cte() {
3586 let expr = parse(
3588 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3589 );
3590 let node = lineage("a", &expr, None, false).unwrap();
3591 assert_eq!(node.name, "a");
3592 assert!(
3593 !node.downstream.is_empty(),
3594 "Mixed quoted/unquoted CTE chain should resolve"
3595 );
3596 }
3597
3598 #[test]
3614 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3615 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
3626 let node = lineage("col", &expr, None, false).unwrap();
3627 assert!(!node.downstream.is_empty());
3628 let child = &node.downstream[0];
3629 assert_eq!(
3631 child.source_name, "MyCte",
3632 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3633 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3634 );
3635 }
3636
3637 #[test]
3638 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
3639 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
3646 let node = lineage("col", &expr, None, false).unwrap();
3647 assert!(!node.downstream.is_empty());
3648 let child = &node.downstream[0];
3649 assert_eq!(
3651 child.source_name, "MyCte",
3652 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3653 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3654 );
3655 }
3656
3657 #[test]
3664 #[ignore = "requires derived table star expansion (separate issue)"]
3665 fn test_node_name_doesnt_contain_comment() {
3666 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
3667 let node = lineage("x", &expr, None, false).unwrap();
3668
3669 assert_eq!(node.name, "x");
3670 assert!(!node.downstream.is_empty());
3671 }
3672
3673 #[test]
3677 fn test_comment_before_first_column_in_cte() {
3678 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
3679 let sql_without_comment = "with t as (select 1 as a) select a from t";
3680
3681 let expr_ok = parse(sql_without_comment);
3683 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
3684
3685 let expr_comment = parse(sql_with_comment);
3687 let node_comment = lineage("a", &expr_comment, None, false)
3688 .expect("with comment before first column should succeed");
3689
3690 assert_eq!(node_ok.name, node_comment.name, "node names should match");
3691 assert_eq!(
3692 node_ok.downstream_names(),
3693 node_comment.downstream_names(),
3694 "downstream lineage should be identical with or without comment"
3695 );
3696 }
3697
3698 #[test]
3700 fn test_block_comment_before_first_column() {
3701 let sql = "with t as (select 1 as a) select /* section */ a from t";
3702 let expr = parse(sql);
3703 let node = lineage("a", &expr, None, false)
3704 .expect("block comment before first column should succeed");
3705 assert_eq!(node.name, "a");
3706 assert!(
3707 !node.downstream.is_empty(),
3708 "should have downstream lineage"
3709 );
3710 }
3711
3712 #[test]
3714 fn test_comment_before_first_column_second_col_ok() {
3715 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
3716 let expr = parse(sql);
3717
3718 let node_a =
3719 lineage("a", &expr, None, false).expect("column a with comment should succeed");
3720 assert_eq!(node_a.name, "a");
3721
3722 let node_b =
3723 lineage("b", &expr, None, false).expect("column b with comment should succeed");
3724 assert_eq!(node_b.name, "b");
3725 }
3726
3727 #[test]
3729 fn test_comment_before_aliased_column() {
3730 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
3731 let expr = parse(sql);
3732 let node =
3733 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
3734 assert_eq!(node.name, "y");
3735 assert!(
3736 !node.downstream.is_empty(),
3737 "aliased column should have downstream lineage"
3738 );
3739 }
3740}