1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// One node of a column-lineage graph.
///
/// The root node describes the requested output column; `downstream` holds the nodes for the
/// columns and sources that value was derived from, recursively.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Display name: the column name, or `table.column` / `source.*` for leaf nodes built
    /// in this module.
    pub name: String,
    /// The expression this node stands for (a select-list item or a column reference).
    pub expression: Expression,
    /// The query or table the expression was taken from; for query nodes this is the
    /// SELECT, narrowed to just the selected item when `trim_selects` is requested.
    pub source: Expression,
    /// Nodes this node's value is derived from.
    pub downstream: Vec<LineageNode>,
    /// Name of the CTE, derived table, or table this node was resolved through; empty when
    /// not applicable.
    pub source_name: String,
    /// Name of the parent column that led into this node when it was resolved through a
    /// CTE or derived table; empty otherwise.
    pub reference_node_name: String,
}
35
36impl LineageNode {
37 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
39 Self {
40 name: name.into(),
41 expression,
42 source,
43 downstream: Vec::new(),
44 source_name: String::new(),
45 reference_node_name: String::new(),
46 }
47 }
48
49 pub fn walk(&self) -> LineageWalker<'_> {
51 LineageWalker { stack: vec![self] }
52 }
53
54 pub fn downstream_names(&self) -> Vec<String> {
56 self.downstream.iter().map(|n| n.name.clone()).collect()
57 }
58}
59
/// Depth-first, pre-order iterator over a [`LineageNode`] tree, created by
/// [`LineageNode::walk`].
pub struct LineageWalker<'a> {
    /// Nodes still to be yielded; the last element is yielded next.
    stack: Vec<&'a LineageNode>,
}
64
65impl<'a> Iterator for LineageWalker<'a> {
66 type Item = &'a LineageNode;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 if let Some(node) = self.stack.pop() {
70 for child in node.downstream.iter().rev() {
72 self.stack.push(child);
73 }
74 Some(node)
75 } else {
76 None
77 }
78 }
79}
80
/// How a column is identified when building lineage.
enum ColumnRef<'a> {
    /// By output name (alias or column name) in a select list.
    Name(&'a str),
    /// By zero-based position in a select list; used for set-operation branches and
    /// scalar subqueries.
    Index(usize),
}
90
91pub fn lineage(
117 column: &str,
118 sql: &Expression,
119 dialect: Option<DialectType>,
120 trim_selects: bool,
121) -> Result<LineageNode> {
122 let has_with = matches!(sql, Expression::Select(s) if s.with.is_some());
124 if !has_with {
125 return lineage_from_expression(column, sql, dialect, trim_selects);
126 }
127 let mut owned = sql.clone();
128 expand_cte_stars(&mut owned, None);
129 lineage_from_expression(column, &owned, dialect, trim_selects)
130}
131
132pub fn lineage_with_schema(
148 column: &str,
149 sql: &Expression,
150 schema: Option<&dyn Schema>,
151 dialect: Option<DialectType>,
152 trim_selects: bool,
153) -> Result<LineageNode> {
154 let mut qualified_expression = if let Some(schema) = schema {
155 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
156 QualifyColumnsOptions::new().with_dialect(dialect_type)
157 } else {
158 QualifyColumnsOptions::new()
159 };
160
161 qualify_columns(sql.clone(), schema, &options).map_err(|e| {
162 Error::internal(format!("Lineage qualification failed with schema: {}", e))
163 })?
164 } else {
165 sql.clone()
166 };
167
168 annotate_types(&mut qualified_expression, schema, dialect);
170
171 expand_cte_stars(&mut qualified_expression, schema);
174
175 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
176}
177
178fn lineage_from_expression(
179 column: &str,
180 sql: &Expression,
181 dialect: Option<DialectType>,
182 trim_selects: bool,
183) -> Result<LineageNode> {
184 let scope = build_scope(sql);
185 to_node(
186 ColumnRef::Name(column),
187 &scope,
188 dialect,
189 "",
190 "",
191 "",
192 trim_selects,
193 )
194}
195
196fn normalize_cte_name(ident: &Identifier) -> String {
206 if ident.quoted {
207 ident.name.clone()
208 } else {
209 ident.name.to_lowercase()
210 }
211}
212
213pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
225 let select = match expr {
226 Expression::Select(s) => s,
227 _ => return,
228 };
229
230 let with = match &mut select.with {
231 Some(w) => w,
232 None => return,
233 };
234
235 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
236
237 for cte in &mut with.ctes {
238 let cte_name = normalize_cte_name(&cte.alias);
239
240 if !cte.columns.is_empty() {
242 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
243 resolved_cte_columns.insert(cte_name, cols);
244 continue;
245 }
246
247 if with.recursive {
253 let is_self_referencing =
254 if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
255 let body_sources = get_select_sources(body_select);
256 body_sources.iter().any(|s| s.normalized == cte_name)
257 } else {
258 false
259 };
260 if is_self_referencing {
261 continue;
262 }
263 }
264
265 let body_select = match get_leftmost_select_mut(&mut cte.this) {
267 Some(s) => s,
268 None => continue,
269 };
270
271 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
272 resolved_cte_columns.insert(cte_name, columns);
273 }
274
275 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
277}
278
279fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
284 let mut current = expr;
285 for _ in 0..MAX_LINEAGE_DEPTH {
286 match current {
287 Expression::Select(s) => return Some(s),
288 Expression::Union(u) => current = &mut u.left,
289 Expression::Intersect(i) => current = &mut i.left,
290 Expression::Except(e) => current = &mut e.left,
291 Expression::Paren(p) => current = &mut p.this,
292 _ => return None,
293 }
294 }
295 None
296}
297
298fn rewrite_stars_in_select(
302 select: &mut Select,
303 resolved_ctes: &HashMap<String, Vec<String>>,
304 schema: Option<&dyn Schema>,
305) -> Vec<String> {
306 let has_star = select
311 .expressions
312 .iter()
313 .any(|e| matches!(e, Expression::Star(_)));
314 let has_qualified_star = select
315 .expressions
316 .iter()
317 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
318
319 if !has_star && !has_qualified_star {
320 return select
322 .expressions
323 .iter()
324 .filter_map(get_expression_output_name)
325 .collect();
326 }
327
328 let sources = get_select_sources(select);
329 let mut new_expressions = Vec::new();
330 let mut result_columns = Vec::new();
331
332 for expr in &select.expressions {
333 match expr {
334 Expression::Star(star) => {
335 let qual = star.table.as_ref();
336 if let Some(expanded) =
337 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
338 {
339 for (src_alias, col_name) in &expanded {
340 let table_id = Identifier::new(src_alias);
341 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
342 result_columns.push(col_name.clone());
343 }
344 } else {
345 new_expressions.push(expr.clone());
346 result_columns.push("*".to_string());
347 }
348 }
349 Expression::Column(c) if c.name.name == "*" => {
350 let qual = c.table.as_ref();
351 if let Some(expanded) =
352 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
353 {
354 for (_src_alias, col_name) in &expanded {
355 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
357 result_columns.push(col_name.clone());
358 }
359 } else {
360 new_expressions.push(expr.clone());
361 result_columns.push("*".to_string());
362 }
363 }
364 _ => {
365 new_expressions.push(expr.clone());
366 if let Some(name) = get_expression_output_name(expr) {
367 result_columns.push(name);
368 }
369 }
370 }
371 }
372
373 select.expressions = new_expressions;
374 result_columns
375}
376
377fn expand_star_from_sources(
382 qualifier: Option<&Identifier>,
383 sources: &[SourceInfo],
384 resolved_ctes: &HashMap<String, Vec<String>>,
385 schema: Option<&dyn Schema>,
386) -> Option<Vec<(String, String)>> {
387 let mut expanded = Vec::new();
388
389 if let Some(qual) = qualifier {
390 let qual_normalized = normalize_cte_name(qual);
392 for src in sources {
393 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
394 if let Some(cols) = resolved_ctes.get(&src.normalized) {
396 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
397 return Some(expanded);
398 }
399 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
401 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
402 return Some(expanded);
403 }
404 }
405 }
406 None
407 } else {
408 let mut any_expanded = false;
414 for src in sources {
415 if let Some(cols) = resolved_ctes.get(&src.normalized) {
416 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
417 any_expanded = true;
418 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
419 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
420 any_expanded = true;
421 } else {
422 return None;
423 }
424 }
425 if any_expanded {
426 Some(expanded)
427 } else {
428 None
429 }
430 }
431}
432
433fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
435 let schema = schema?;
436 if fq_name.is_empty() {
437 return None;
438 }
439 schema
440 .column_names(fq_name)
441 .ok()
442 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
443}
444
445fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
447 Expression::Column(Box::new(crate::expressions::Column {
448 name: Identifier::new(name),
449 table: table.cloned(),
450 join_mark: false,
451 trailing_comments: Vec::new(),
452 span: None,
453 inferred_type: None,
454 }))
455}
456
457fn get_expression_output_name(expr: &Expression) -> Option<String> {
459 match expr {
460 Expression::Alias(a) => Some(a.alias.name.clone()),
461 Expression::Column(c) => Some(c.name.name.clone()),
462 Expression::Identifier(id) => Some(id.name.clone()),
463 Expression::Star(_) => Some("*".to_string()),
464 _ => None,
465 }
466}
467
/// A FROM/JOIN source of a SELECT, as used by star expansion.
struct SourceInfo {
    /// Name the source is referenced by: its alias, else the bare table name; for
    /// subqueries, the subquery alias.
    alias: String,
    /// For tables, the table name normalized like a CTE name (lowercased unless quoted);
    /// for subqueries, the lowercased alias. Used as the key into resolved CTE columns.
    normalized: String,
    /// Dot-joined `catalog.schema.table` (present parts only) used for schema lookups;
    /// the alias for subqueries.
    fq_name: String,
}
476
477fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
480 let mut sources = Vec::new();
481
482 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
483 match expr {
484 Expression::Table(t) => {
485 let normalized = normalize_cte_name(&t.name);
486 let alias = t
487 .alias
488 .as_ref()
489 .map(|a| a.name.clone())
490 .unwrap_or_else(|| t.name.name.clone());
491 let mut parts = Vec::new();
492 if let Some(catalog) = &t.catalog {
493 parts.push(catalog.name.clone());
494 }
495 if let Some(schema) = &t.schema {
496 parts.push(schema.name.clone());
497 }
498 parts.push(t.name.name.clone());
499 let fq_name = parts.join(".");
500 Some(SourceInfo {
501 alias,
502 normalized,
503 fq_name,
504 })
505 }
506 Expression::Subquery(s) => {
507 let alias = s.alias.as_ref()?.name.clone();
508 let normalized = alias.to_lowercase();
509 let fq_name = alias.clone();
510 Some(SourceInfo {
511 alias,
512 normalized,
513 fq_name,
514 })
515 }
516 Expression::Paren(p) => extract_source(&p.this),
517 _ => None,
518 }
519 }
520
521 if let Some(from) = &select.from {
522 for expr in &from.expressions {
523 if let Some(info) = extract_source(expr) {
524 sources.push(info);
525 }
526 }
527 }
528 for join in &select.joins {
529 if let Some(info) = extract_source(&join.this) {
530 sources.push(info);
531 }
532 }
533 sources
534}
535
536pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
538 let mut tables = HashSet::new();
539 collect_source_tables(node, &mut tables);
540 tables
541}
542
543pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
545 if let Expression::Table(table) = &node.source {
546 tables.insert(table.name.name.clone());
547 }
548 for child in &node.downstream {
549 collect_source_tables(child, tables);
550 }
551}
552
/// Recursion limit for lineage tracing (`to_node_inner` errors beyond it). Also caps how
/// many set-operation / parenthesis levels `get_leftmost_select_mut` descends.
const MAX_LINEAGE_DEPTH: usize = 64;
560
561fn to_node(
563 column: ColumnRef<'_>,
564 scope: &Scope,
565 dialect: Option<DialectType>,
566 scope_name: &str,
567 source_name: &str,
568 reference_node_name: &str,
569 trim_selects: bool,
570) -> Result<LineageNode> {
571 to_node_inner(
572 column,
573 scope,
574 dialect,
575 scope_name,
576 source_name,
577 reference_node_name,
578 trim_selects,
579 &[],
580 0,
581 )
582}
583
/// Recursive worker behind `to_node`.
///
/// Builds the lineage node for `column` within `scope` and attaches downstream nodes for
/// everything the selected expression depends on.
///
/// * `ancestor_cte_scopes` are CTE scopes inherited from enclosing scopes. They are combined
///   with this scope's own `cte_scopes` when resolving column references.
/// * `depth` is the current recursion depth; exceeding `MAX_LINEAGE_DEPTH` is an error.
///
/// # Errors
/// Returns an error when the depth limit is exceeded. Also propagates errors from locating
/// the column (`find_select_expr`, or `handle_set_operation` for set operations). Errors
/// from child lineage lookups are ignored; those children are simply omitted or replaced
/// by fallbacks.
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    // Guard against runaway recursion (e.g. a CTE that ends up referencing itself).
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // CTE scopes visible from here: this scope's own first, then the inherited ones.
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }

    // A CTE scope wraps its body; work with the body itself.
    let effective_expr = match scope_expr {
        Expression::Cte(cte) => &cte.this,
        other => other,
    };

    // Set operations are resolved branch-by-branch by `handle_set_operation`.
    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        if matches!(scope_expr, Expression::Cte(_)) {
            // For a CTE whose body is a set operation, hand over a scope whose expression
            // is the body, carrying this scope's source and child-scope information.
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                ancestor_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            ancestor_cte_scopes,
            depth,
        );
    }

    // Locate the select-list item for the requested column.
    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    // Optionally narrow the recorded source query to just this item.
    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    node.source_name = source_name.to_string();
    node.reference_node_name = reference_node_name.to_string();

    // An unexpanded `*`: one `<source>.*` child per source in this scope, then stop.
    // NOTE(review): child order follows iteration order of `scope.sources`; if that is a
    // hash map, the order is not deterministic — confirm the type.
    if matches!(&select_expr, Expression::Star(_)) {
        for (name, source_info) in &scope.sources {
            let child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: None,
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // Unaliased (scalar) subqueries inside the item: trace each one's first output column
    // through the matching subquery scope. A failed trace is skipped.
    let subqueries: Vec<&Expression> =
        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
    for sq_expr in subqueries {
        if let Expression::Subquery(sq) = sq_expr {
            for sq_scope in &scope.subquery_scopes {
                if sq_scope.expression == sq.this {
                    if let Ok(child) = to_node_inner(
                        ColumnRef::Index(0),
                        sq_scope,
                        dialect,
                        &column_name,
                        "",
                        "",
                        trim_selects,
                        ancestor_cte_scopes,
                        depth + 1,
                    ) {
                        node.downstream.push(child);
                    }
                    break;
                }
            }
        }
    }

    // Plain column references: qualified ones are resolved through their table, and
    // unqualified ones through the FROM/JOIN sources.
    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
749
750fn handle_set_operation(
755 column: &ColumnRef<'_>,
756 scope: &Scope,
757 dialect: Option<DialectType>,
758 scope_name: &str,
759 source_name: &str,
760 reference_node_name: &str,
761 trim_selects: bool,
762 ancestor_cte_scopes: &[Scope],
763 depth: usize,
764) -> Result<LineageNode> {
765 let scope_expr = &scope.expression;
766
767 let col_index = match column {
769 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
770 ColumnRef::Index(i) => *i,
771 };
772
773 let col_name = match column {
774 ColumnRef::Name(name) => name.to_string(),
775 ColumnRef::Index(_) => format!("_{col_index}"),
776 };
777
778 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
779 node.source_name = source_name.to_string();
780 node.reference_node_name = reference_node_name.to_string();
781
782 for branch_scope in &scope.union_scopes {
784 if let Ok(child) = to_node_inner(
785 ColumnRef::Index(col_index),
786 branch_scope,
787 dialect,
788 scope_name,
789 "",
790 "",
791 trim_selects,
792 ancestor_cte_scopes,
793 depth + 1,
794 ) {
795 node.downstream.push(child);
796 }
797 }
798
799 Ok(node)
800}
801
802fn resolve_qualified_column(
807 node: &mut LineageNode,
808 scope: &Scope,
809 dialect: Option<DialectType>,
810 table: &str,
811 col_name: &str,
812 parent_name: &str,
813 trim_selects: bool,
814 all_cte_scopes: &[&Scope],
815 depth: usize,
816) {
817 let resolved_cte_name = resolve_cte_alias(scope, table);
820 let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
821
822 let is_cte = scope.cte_sources.contains_key(effective_table)
825 || all_cte_scopes.iter().any(
826 |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
827 );
828 if is_cte {
829 if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
830 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
832 if let Ok(child) = to_node_inner(
833 ColumnRef::Name(col_name),
834 child_scope,
835 dialect,
836 parent_name,
837 effective_table,
838 parent_name,
839 trim_selects,
840 &ancestors,
841 depth + 1,
842 ) {
843 node.downstream.push(child);
844 return;
845 }
846 }
847 }
848
849 if let Some(source_info) = scope.sources.get(table) {
851 if source_info.is_scope {
852 if let Some(child_scope) = find_child_scope(scope, table) {
853 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
854 if let Ok(child) = to_node_inner(
855 ColumnRef::Name(col_name),
856 child_scope,
857 dialect,
858 parent_name,
859 table,
860 parent_name,
861 trim_selects,
862 &ancestors,
863 depth + 1,
864 ) {
865 node.downstream.push(child);
866 return;
867 }
868 }
869 }
870 }
871
872 if let Some(source_info) = scope.sources.get(table) {
875 if !source_info.is_scope {
876 node.downstream.push(make_table_column_node_from_source(
877 table,
878 col_name,
879 &source_info.expression,
880 ));
881 return;
882 }
883 }
884
885 node.downstream
887 .push(make_table_column_node(table, col_name));
888}
889
890fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
896 if scope.cte_sources.contains_key(name) {
898 return None;
899 }
900 if let Some(source_info) = scope.sources.get(name) {
902 if source_info.is_scope {
903 if let Expression::Cte(cte) = &source_info.expression {
904 let cte_name = &cte.alias.name;
905 if scope.cte_sources.contains_key(cte_name) {
906 return Some(cte_name.clone());
907 }
908 }
909 }
910 }
911 None
912}
913
914fn resolve_unqualified_column(
915 node: &mut LineageNode,
916 scope: &Scope,
917 dialect: Option<DialectType>,
918 col_name: &str,
919 parent_name: &str,
920 trim_selects: bool,
921 all_cte_scopes: &[&Scope],
922 depth: usize,
923) {
924 let from_source_names = source_names_from_from_join(scope);
928
929 if from_source_names.len() == 1 {
930 let tbl = &from_source_names[0];
931 resolve_qualified_column(
932 node,
933 scope,
934 dialect,
935 tbl,
936 col_name,
937 parent_name,
938 trim_selects,
939 all_cte_scopes,
940 depth,
941 );
942 return;
943 }
944
945 let child = LineageNode::new(
947 col_name.to_string(),
948 Expression::Column(Box::new(crate::expressions::Column {
949 name: crate::expressions::Identifier::new(col_name.to_string()),
950 table: None,
951 join_mark: false,
952 trailing_comments: vec![],
953 span: None,
954 inferred_type: None,
955 })),
956 node.source.clone(),
957 );
958 node.downstream.push(child);
959}
960
961fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
962 fn source_name(expr: &Expression) -> Option<String> {
963 match expr {
964 Expression::Table(table) => Some(
965 table
966 .alias
967 .as_ref()
968 .map(|a| a.name.clone())
969 .unwrap_or_else(|| table.name.name.clone()),
970 ),
971 Expression::Subquery(subquery) => {
972 subquery.alias.as_ref().map(|alias| alias.name.clone())
973 }
974 Expression::Paren(paren) => source_name(&paren.this),
975 _ => None,
976 }
977 }
978
979 let effective_expr = match &scope.expression {
980 Expression::Cte(cte) => &cte.this,
981 expr => expr,
982 };
983
984 let mut names = Vec::new();
985 let mut seen = std::collections::HashSet::new();
986
987 if let Expression::Select(select) = effective_expr {
988 if let Some(from) = &select.from {
989 for expr in &from.expressions {
990 if let Some(name) = source_name(expr) {
991 if !name.is_empty() && seen.insert(name.clone()) {
992 names.push(name);
993 }
994 }
995 }
996 }
997 for join in &select.joins {
998 if let Some(name) = source_name(&join.this) {
999 if !name.is_empty() && seen.insert(name.clone()) {
1000 names.push(name);
1001 }
1002 }
1003 }
1004 }
1005
1006 names
1007}
1008
1009fn get_alias_or_name(expr: &Expression) -> Option<String> {
1015 match expr {
1016 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1017 Expression::Column(col) => Some(col.name.name.clone()),
1018 Expression::Identifier(id) => Some(id.name.clone()),
1019 Expression::Star(_) => Some("*".to_string()),
1020 Expression::Annotated(a) => get_alias_or_name(&a.this),
1023 _ => None,
1024 }
1025}
1026
1027fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1029 match column {
1030 ColumnRef::Name(n) => n.to_string(),
1031 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1032 }
1033}
1034
1035fn find_select_expr(
1037 scope_expr: &Expression,
1038 column: &ColumnRef<'_>,
1039 dialect: Option<DialectType>,
1040) -> Result<Expression> {
1041 if let Expression::Select(ref select) = scope_expr {
1042 match column {
1043 ColumnRef::Name(name) => {
1044 let normalized_name = normalize_column_name(name, dialect);
1045 for expr in &select.expressions {
1046 if let Some(alias_or_name) = get_alias_or_name(expr) {
1047 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1048 return Ok(expr.clone());
1049 }
1050 }
1051 }
1052 Err(crate::error::Error::parse(
1053 format!("Cannot find column '{}' in query", name),
1054 0,
1055 0,
1056 0,
1057 0,
1058 ))
1059 }
1060 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1061 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1062 }),
1063 }
1064 } else {
1065 Err(crate::error::Error::parse(
1066 "Expected SELECT expression for column lookup",
1067 0,
1068 0,
1069 0,
1070 0,
1071 ))
1072 }
1073}
1074
1075fn column_to_index(
1077 set_op_expr: &Expression,
1078 name: &str,
1079 dialect: Option<DialectType>,
1080) -> Result<usize> {
1081 let normalized_name = normalize_column_name(name, dialect);
1082 let mut expr = set_op_expr;
1083 loop {
1084 match expr {
1085 Expression::Union(u) => expr = &u.left,
1086 Expression::Intersect(i) => expr = &i.left,
1087 Expression::Except(e) => expr = &e.left,
1088 Expression::Select(select) => {
1089 for (i, e) in select.expressions.iter().enumerate() {
1090 if let Some(alias_or_name) = get_alias_or_name(e) {
1091 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1092 return Ok(i);
1093 }
1094 }
1095 }
1096 return Err(crate::error::Error::parse(
1097 format!("Cannot find column '{}' in set operation", name),
1098 0,
1099 0,
1100 0,
1101 0,
1102 ));
1103 }
1104 _ => {
1105 return Err(crate::error::Error::parse(
1106 "Expected SELECT or set operation",
1107 0,
1108 0,
1109 0,
1110 0,
1111 ))
1112 }
1113 }
1114 }
1115}
1116
/// Normalize a column name for comparison under `dialect`, via the schema module's
/// `normalize_name`.
// NOTE(review): the meaning of the two boolean flags (`false`, `true`) is defined by
// `normalize_name` — confirm there before changing them.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    normalize_name(name, dialect, false, true)
}
1120
1121fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1123 if let Expression::Select(select) = select_expr {
1124 let mut trimmed = select.as_ref().clone();
1125 trimmed.expressions = vec![target_expr.clone()];
1126 Expression::Select(Box::new(trimmed))
1127 } else {
1128 select_expr.clone()
1129 }
1130}
1131
1132fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1134 if scope.cte_sources.contains_key(source_name) {
1136 for cte_scope in &scope.cte_scopes {
1137 if let Expression::Cte(cte) = &cte_scope.expression {
1138 if cte.alias.name == source_name {
1139 return Some(cte_scope);
1140 }
1141 }
1142 }
1143 }
1144
1145 if let Some(source_info) = scope.sources.get(source_name) {
1147 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1148 if let Expression::Subquery(sq) = &source_info.expression {
1149 for dt_scope in &scope.derived_table_scopes {
1150 if dt_scope.expression == sq.this {
1151 return Some(dt_scope);
1152 }
1153 }
1154 }
1155 }
1156 }
1157
1158 None
1159}
1160
1161fn find_child_scope_in<'a>(
1165 all_cte_scopes: &[&'a Scope],
1166 scope: &'a Scope,
1167 source_name: &str,
1168) -> Option<&'a Scope> {
1169 for cte_scope in &scope.cte_scopes {
1171 if let Expression::Cte(cte) = &cte_scope.expression {
1172 if cte.alias.name == source_name {
1173 return Some(cte_scope);
1174 }
1175 }
1176 }
1177
1178 for cte_scope in all_cte_scopes {
1180 if let Expression::Cte(cte) = &cte_scope.expression {
1181 if cte.alias.name == source_name {
1182 return Some(cte_scope);
1183 }
1184 }
1185 }
1186
1187 if let Some(source_info) = scope.sources.get(source_name) {
1189 if source_info.is_scope {
1190 if let Expression::Subquery(sq) = &source_info.expression {
1191 for dt_scope in &scope.derived_table_scopes {
1192 if dt_scope.expression == sq.this {
1193 return Some(dt_scope);
1194 }
1195 }
1196 }
1197 }
1198 }
1199
1200 None
1201}
1202
1203fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1205 let mut node = LineageNode::new(
1206 format!("{}.{}", table, column),
1207 Expression::Column(Box::new(crate::expressions::Column {
1208 name: crate::expressions::Identifier::new(column.to_string()),
1209 table: Some(crate::expressions::Identifier::new(table.to_string())),
1210 join_mark: false,
1211 trailing_comments: vec![],
1212 span: None,
1213 inferred_type: None,
1214 })),
1215 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1216 );
1217 node.source_name = table.to_string();
1218 node
1219}
1220
1221fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1222 let mut parts: Vec<String> = Vec::new();
1223 if let Some(catalog) = &table_ref.catalog {
1224 parts.push(catalog.name.clone());
1225 }
1226 if let Some(schema) = &table_ref.schema {
1227 parts.push(schema.name.clone());
1228 }
1229 parts.push(table_ref.name.name.clone());
1230 parts.join(".")
1231}
1232
1233fn make_table_column_node_from_source(
1234 table_alias: &str,
1235 column: &str,
1236 source: &Expression,
1237) -> LineageNode {
1238 let mut node = LineageNode::new(
1239 format!("{}.{}", table_alias, column),
1240 Expression::Column(Box::new(crate::expressions::Column {
1241 name: crate::expressions::Identifier::new(column.to_string()),
1242 table: Some(crate::expressions::Identifier::new(table_alias.to_string())),
1243 join_mark: false,
1244 trailing_comments: vec![],
1245 span: None,
1246 inferred_type: None,
1247 })),
1248 source.clone(),
1249 );
1250
1251 if let Expression::Table(table_ref) = source {
1252 node.source_name = table_name_from_table_ref(table_ref);
1253 } else {
1254 node.source_name = table_alias.to_string();
1255 }
1256
1257 node
1258}
1259
/// A column reference found inside a select-list expression.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table qualifier, when the reference was written as `table.column`.
    table: Option<crate::expressions::Identifier>,
    /// The referenced column name.
    column: String,
}
1266
1267fn find_column_refs_in_expr(
1269 expr: &Expression,
1270 dialect: Option<DialectType>,
1271) -> Vec<SimpleColumnRef> {
1272 let mut refs = Vec::new();
1273 collect_column_refs(expr, dialect, &mut refs);
1274 refs
1275}
1276
1277fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1278 match expr {
1279 Expression::Column(col) => {
1280 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1281 }
1282 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1283 _ => false,
1284 }
1285}
1286
1287fn collect_column_refs(
1288 expr: &Expression,
1289 dialect: Option<DialectType>,
1290 refs: &mut Vec<SimpleColumnRef>,
1291) {
1292 let mut stack: Vec<&Expression> = vec![expr];
1293
1294 while let Some(current) = stack.pop() {
1295 match current {
1296 Expression::Column(col) => {
1298 refs.push(SimpleColumnRef {
1299 table: col.table.clone(),
1300 column: col.name.name.clone(),
1301 });
1302 }
1303
1304 Expression::Subquery(_) | Expression::Exists(_) => {}
1306
1307 Expression::And(op)
1309 | Expression::Or(op)
1310 | Expression::Eq(op)
1311 | Expression::Neq(op)
1312 | Expression::Lt(op)
1313 | Expression::Lte(op)
1314 | Expression::Gt(op)
1315 | Expression::Gte(op)
1316 | Expression::Add(op)
1317 | Expression::Sub(op)
1318 | Expression::Mul(op)
1319 | Expression::Div(op)
1320 | Expression::Mod(op)
1321 | Expression::BitwiseAnd(op)
1322 | Expression::BitwiseOr(op)
1323 | Expression::BitwiseXor(op)
1324 | Expression::BitwiseLeftShift(op)
1325 | Expression::BitwiseRightShift(op)
1326 | Expression::Concat(op)
1327 | Expression::Adjacent(op)
1328 | Expression::TsMatch(op)
1329 | Expression::PropertyEQ(op)
1330 | Expression::ArrayContainsAll(op)
1331 | Expression::ArrayContainedBy(op)
1332 | Expression::ArrayOverlaps(op)
1333 | Expression::JSONBContainsAllTopKeys(op)
1334 | Expression::JSONBContainsAnyTopKeys(op)
1335 | Expression::JSONBDeleteAtPath(op)
1336 | Expression::ExtendsLeft(op)
1337 | Expression::ExtendsRight(op)
1338 | Expression::Is(op)
1339 | Expression::MemberOf(op)
1340 | Expression::NullSafeEq(op)
1341 | Expression::NullSafeNeq(op)
1342 | Expression::Glob(op)
1343 | Expression::Match(op) => {
1344 stack.push(&op.left);
1345 stack.push(&op.right);
1346 }
1347
1348 Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1350 stack.push(&u.this);
1351 }
1352
1353 Expression::Upper(f)
1355 | Expression::Lower(f)
1356 | Expression::Length(f)
1357 | Expression::LTrim(f)
1358 | Expression::RTrim(f)
1359 | Expression::Reverse(f)
1360 | Expression::Abs(f)
1361 | Expression::Sqrt(f)
1362 | Expression::Cbrt(f)
1363 | Expression::Ln(f)
1364 | Expression::Exp(f)
1365 | Expression::Sign(f)
1366 | Expression::Date(f)
1367 | Expression::Time(f)
1368 | Expression::DateFromUnixDate(f)
1369 | Expression::UnixDate(f)
1370 | Expression::UnixSeconds(f)
1371 | Expression::UnixMillis(f)
1372 | Expression::UnixMicros(f)
1373 | Expression::TimeStrToDate(f)
1374 | Expression::DateToDi(f)
1375 | Expression::DiToDate(f)
1376 | Expression::TsOrDiToDi(f)
1377 | Expression::TsOrDsToDatetime(f)
1378 | Expression::TsOrDsToTimestamp(f)
1379 | Expression::YearOfWeek(f)
1380 | Expression::YearOfWeekIso(f)
1381 | Expression::Initcap(f)
1382 | Expression::Ascii(f)
1383 | Expression::Chr(f)
1384 | Expression::Soundex(f)
1385 | Expression::ByteLength(f)
1386 | Expression::Hex(f)
1387 | Expression::LowerHex(f)
1388 | Expression::Unicode(f)
1389 | Expression::Radians(f)
1390 | Expression::Degrees(f)
1391 | Expression::Sin(f)
1392 | Expression::Cos(f)
1393 | Expression::Tan(f)
1394 | Expression::Asin(f)
1395 | Expression::Acos(f)
1396 | Expression::Atan(f)
1397 | Expression::IsNan(f)
1398 | Expression::IsInf(f)
1399 | Expression::ArrayLength(f)
1400 | Expression::ArraySize(f)
1401 | Expression::Cardinality(f)
1402 | Expression::ArrayReverse(f)
1403 | Expression::ArrayDistinct(f)
1404 | Expression::ArrayFlatten(f)
1405 | Expression::ArrayCompact(f)
1406 | Expression::Explode(f)
1407 | Expression::ExplodeOuter(f)
1408 | Expression::ToArray(f)
1409 | Expression::MapFromEntries(f)
1410 | Expression::MapKeys(f)
1411 | Expression::MapValues(f)
1412 | Expression::JsonArrayLength(f)
1413 | Expression::JsonKeys(f)
1414 | Expression::JsonType(f)
1415 | Expression::ParseJson(f)
1416 | Expression::ToJson(f)
1417 | Expression::Typeof(f)
1418 | Expression::BitwiseCount(f)
1419 | Expression::Year(f)
1420 | Expression::Month(f)
1421 | Expression::Day(f)
1422 | Expression::Hour(f)
1423 | Expression::Minute(f)
1424 | Expression::Second(f)
1425 | Expression::DayOfWeek(f)
1426 | Expression::DayOfWeekIso(f)
1427 | Expression::DayOfMonth(f)
1428 | Expression::DayOfYear(f)
1429 | Expression::WeekOfYear(f)
1430 | Expression::Quarter(f)
1431 | Expression::Epoch(f)
1432 | Expression::EpochMs(f)
1433 | Expression::TimeStrToUnix(f)
1434 | Expression::SHA(f)
1435 | Expression::SHA1Digest(f)
1436 | Expression::TimeToUnix(f)
1437 | Expression::JSONBool(f)
1438 | Expression::Int64(f)
1439 | Expression::MD5NumberLower64(f)
1440 | Expression::MD5NumberUpper64(f)
1441 | Expression::DateStrToDate(f)
1442 | Expression::DateToDateStr(f) => {
1443 stack.push(&f.this);
1444 }
1445
1446 Expression::Power(f)
1448 | Expression::NullIf(f)
1449 | Expression::IfNull(f)
1450 | Expression::Nvl(f)
1451 | Expression::UnixToTimeStr(f)
1452 | Expression::Contains(f)
1453 | Expression::StartsWith(f)
1454 | Expression::EndsWith(f)
1455 | Expression::Levenshtein(f)
1456 | Expression::ModFunc(f)
1457 | Expression::Atan2(f)
1458 | Expression::IntDiv(f)
1459 | Expression::AddMonths(f)
1460 | Expression::MonthsBetween(f)
1461 | Expression::NextDay(f)
1462 | Expression::ArrayContains(f)
1463 | Expression::ArrayPosition(f)
1464 | Expression::ArrayAppend(f)
1465 | Expression::ArrayPrepend(f)
1466 | Expression::ArrayUnion(f)
1467 | Expression::ArrayExcept(f)
1468 | Expression::ArrayRemove(f)
1469 | Expression::StarMap(f)
1470 | Expression::MapFromArrays(f)
1471 | Expression::MapContainsKey(f)
1472 | Expression::ElementAt(f)
1473 | Expression::JsonMergePatch(f)
1474 | Expression::JSONBContains(f)
1475 | Expression::JSONBExtract(f) => {
1476 stack.push(&f.this);
1477 stack.push(&f.expression);
1478 }
1479
1480 Expression::Greatest(f)
1482 | Expression::Least(f)
1483 | Expression::Coalesce(f)
1484 | Expression::ArrayConcat(f)
1485 | Expression::ArrayIntersect(f)
1486 | Expression::ArrayZip(f)
1487 | Expression::MapConcat(f)
1488 | Expression::JsonArray(f) => {
1489 for e in &f.expressions {
1490 stack.push(e);
1491 }
1492 }
1493
1494 Expression::Sum(f)
1496 | Expression::Avg(f)
1497 | Expression::Min(f)
1498 | Expression::Max(f)
1499 | Expression::ArrayAgg(f)
1500 | Expression::CountIf(f)
1501 | Expression::Stddev(f)
1502 | Expression::StddevPop(f)
1503 | Expression::StddevSamp(f)
1504 | Expression::Variance(f)
1505 | Expression::VarPop(f)
1506 | Expression::VarSamp(f)
1507 | Expression::Median(f)
1508 | Expression::Mode(f)
1509 | Expression::First(f)
1510 | Expression::Last(f)
1511 | Expression::AnyValue(f)
1512 | Expression::ApproxDistinct(f)
1513 | Expression::ApproxCountDistinct(f)
1514 | Expression::LogicalAnd(f)
1515 | Expression::LogicalOr(f)
1516 | Expression::Skewness(f)
1517 | Expression::ArrayConcatAgg(f)
1518 | Expression::ArrayUniqueAgg(f)
1519 | Expression::BoolXorAgg(f)
1520 | Expression::BitwiseAndAgg(f)
1521 | Expression::BitwiseOrAgg(f)
1522 | Expression::BitwiseXorAgg(f) => {
1523 stack.push(&f.this);
1524 if let Some(ref filter) = f.filter {
1525 stack.push(filter);
1526 }
1527 if let Some((ref expr, _)) = f.having_max {
1528 stack.push(expr);
1529 }
1530 if let Some(ref limit) = f.limit {
1531 stack.push(limit);
1532 }
1533 }
1534
1535 Expression::Function(func) => {
1537 for arg in &func.args {
1538 stack.push(arg);
1539 }
1540 }
1541 Expression::AggregateFunction(func) => {
1542 for arg in &func.args {
1543 stack.push(arg);
1544 }
1545 if let Some(ref filter) = func.filter {
1546 stack.push(filter);
1547 }
1548 if let Some(ref limit) = func.limit {
1549 stack.push(limit);
1550 }
1551 }
1552
1553 Expression::WindowFunction(wf) => {
1555 stack.push(&wf.this);
1556 }
1557
1558 Expression::Alias(a) => {
1560 stack.push(&a.this);
1561 }
1562 Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1563 stack.push(&c.this);
1564 if let Some(ref fmt) = c.format {
1565 stack.push(fmt);
1566 }
1567 if let Some(ref def) = c.default {
1568 stack.push(def);
1569 }
1570 }
1571 Expression::Paren(p) => {
1572 stack.push(&p.this);
1573 }
1574 Expression::Annotated(a) => {
1575 stack.push(&a.this);
1576 }
1577 Expression::Case(case) => {
1578 if let Some(ref operand) = case.operand {
1579 stack.push(operand);
1580 }
1581 for (cond, result) in &case.whens {
1582 stack.push(cond);
1583 stack.push(result);
1584 }
1585 if let Some(ref else_expr) = case.else_ {
1586 stack.push(else_expr);
1587 }
1588 }
1589 Expression::Collation(c) => {
1590 stack.push(&c.this);
1591 }
1592 Expression::In(i) => {
1593 stack.push(&i.this);
1594 for e in &i.expressions {
1595 stack.push(e);
1596 }
1597 if let Some(ref q) = i.query {
1598 stack.push(q);
1599 }
1600 if let Some(ref u) = i.unnest {
1601 stack.push(u);
1602 }
1603 }
1604 Expression::Between(b) => {
1605 stack.push(&b.this);
1606 stack.push(&b.low);
1607 stack.push(&b.high);
1608 }
1609 Expression::IsNull(n) => {
1610 stack.push(&n.this);
1611 }
1612 Expression::IsTrue(t) | Expression::IsFalse(t) => {
1613 stack.push(&t.this);
1614 }
1615 Expression::IsJson(j) => {
1616 stack.push(&j.this);
1617 }
1618 Expression::Like(l) | Expression::ILike(l) => {
1619 stack.push(&l.left);
1620 stack.push(&l.right);
1621 if let Some(ref esc) = l.escape {
1622 stack.push(esc);
1623 }
1624 }
1625 Expression::SimilarTo(s) => {
1626 stack.push(&s.this);
1627 stack.push(&s.pattern);
1628 if let Some(ref esc) = s.escape {
1629 stack.push(esc);
1630 }
1631 }
1632 Expression::Ordered(o) => {
1633 stack.push(&o.this);
1634 }
1635 Expression::Array(a) => {
1636 for e in &a.expressions {
1637 stack.push(e);
1638 }
1639 }
1640 Expression::Tuple(t) => {
1641 for e in &t.expressions {
1642 stack.push(e);
1643 }
1644 }
1645 Expression::Struct(s) => {
1646 for (_, e) in &s.fields {
1647 stack.push(e);
1648 }
1649 }
1650 Expression::Subscript(s) => {
1651 stack.push(&s.this);
1652 stack.push(&s.index);
1653 }
1654 Expression::Dot(d) => {
1655 stack.push(&d.this);
1656 }
1657 Expression::MethodCall(m) => {
1658 if !matches!(dialect, Some(DialectType::BigQuery))
1659 || !is_bigquery_safe_namespace_receiver(&m.this)
1660 {
1661 stack.push(&m.this);
1662 }
1663 for arg in &m.args {
1664 stack.push(arg);
1665 }
1666 }
1667 Expression::ArraySlice(s) => {
1668 stack.push(&s.this);
1669 if let Some(ref start) = s.start {
1670 stack.push(start);
1671 }
1672 if let Some(ref end) = s.end {
1673 stack.push(end);
1674 }
1675 }
1676 Expression::Lambda(l) => {
1677 stack.push(&l.body);
1678 }
1679 Expression::NamedArgument(n) => {
1680 stack.push(&n.value);
1681 }
1682 Expression::TryCatch(t) => {
1683 for stmt in &t.try_body {
1684 stack.push(stmt);
1685 }
1686 if let Some(catch_body) = &t.catch_body {
1687 for stmt in catch_body {
1688 stack.push(stmt);
1689 }
1690 }
1691 }
1692 Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
1693 stack.push(e);
1694 }
1695
1696 Expression::Substring(f) => {
1698 stack.push(&f.this);
1699 stack.push(&f.start);
1700 if let Some(ref len) = f.length {
1701 stack.push(len);
1702 }
1703 }
1704 Expression::Trim(f) => {
1705 stack.push(&f.this);
1706 if let Some(ref chars) = f.characters {
1707 stack.push(chars);
1708 }
1709 }
1710 Expression::Replace(f) => {
1711 stack.push(&f.this);
1712 stack.push(&f.old);
1713 stack.push(&f.new);
1714 }
1715 Expression::IfFunc(f) => {
1716 stack.push(&f.condition);
1717 stack.push(&f.true_value);
1718 if let Some(ref fv) = f.false_value {
1719 stack.push(fv);
1720 }
1721 }
1722 Expression::Nvl2(f) => {
1723 stack.push(&f.this);
1724 stack.push(&f.true_value);
1725 stack.push(&f.false_value);
1726 }
1727 Expression::ConcatWs(f) => {
1728 stack.push(&f.separator);
1729 for e in &f.expressions {
1730 stack.push(e);
1731 }
1732 }
1733 Expression::Count(f) => {
1734 if let Some(ref this) = f.this {
1735 stack.push(this);
1736 }
1737 if let Some(ref filter) = f.filter {
1738 stack.push(filter);
1739 }
1740 }
1741 Expression::GroupConcat(f) => {
1742 stack.push(&f.this);
1743 if let Some(ref sep) = f.separator {
1744 stack.push(sep);
1745 }
1746 if let Some(ref filter) = f.filter {
1747 stack.push(filter);
1748 }
1749 }
1750 Expression::StringAgg(f) => {
1751 stack.push(&f.this);
1752 if let Some(ref sep) = f.separator {
1753 stack.push(sep);
1754 }
1755 if let Some(ref filter) = f.filter {
1756 stack.push(filter);
1757 }
1758 if let Some(ref limit) = f.limit {
1759 stack.push(limit);
1760 }
1761 }
1762 Expression::ListAgg(f) => {
1763 stack.push(&f.this);
1764 if let Some(ref sep) = f.separator {
1765 stack.push(sep);
1766 }
1767 if let Some(ref filter) = f.filter {
1768 stack.push(filter);
1769 }
1770 }
1771 Expression::SumIf(f) => {
1772 stack.push(&f.this);
1773 stack.push(&f.condition);
1774 if let Some(ref filter) = f.filter {
1775 stack.push(filter);
1776 }
1777 }
1778 Expression::DateAdd(f) | Expression::DateSub(f) => {
1779 stack.push(&f.this);
1780 stack.push(&f.interval);
1781 }
1782 Expression::DateDiff(f) => {
1783 stack.push(&f.this);
1784 stack.push(&f.expression);
1785 }
1786 Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
1787 stack.push(&f.this);
1788 }
1789 Expression::Extract(f) => {
1790 stack.push(&f.this);
1791 }
1792 Expression::Round(f) => {
1793 stack.push(&f.this);
1794 if let Some(ref d) = f.decimals {
1795 stack.push(d);
1796 }
1797 }
1798 Expression::Floor(f) => {
1799 stack.push(&f.this);
1800 if let Some(ref s) = f.scale {
1801 stack.push(s);
1802 }
1803 if let Some(ref t) = f.to {
1804 stack.push(t);
1805 }
1806 }
1807 Expression::Ceil(f) => {
1808 stack.push(&f.this);
1809 if let Some(ref d) = f.decimals {
1810 stack.push(d);
1811 }
1812 if let Some(ref t) = f.to {
1813 stack.push(t);
1814 }
1815 }
1816 Expression::Log(f) => {
1817 stack.push(&f.this);
1818 if let Some(ref b) = f.base {
1819 stack.push(b);
1820 }
1821 }
1822 Expression::AtTimeZone(f) => {
1823 stack.push(&f.this);
1824 stack.push(&f.zone);
1825 }
1826 Expression::Lead(f) | Expression::Lag(f) => {
1827 stack.push(&f.this);
1828 if let Some(ref off) = f.offset {
1829 stack.push(off);
1830 }
1831 if let Some(ref def) = f.default {
1832 stack.push(def);
1833 }
1834 }
1835 Expression::FirstValue(f) | Expression::LastValue(f) => {
1836 stack.push(&f.this);
1837 }
1838 Expression::NthValue(f) => {
1839 stack.push(&f.this);
1840 stack.push(&f.offset);
1841 }
1842 Expression::Position(f) => {
1843 stack.push(&f.substring);
1844 stack.push(&f.string);
1845 if let Some(ref start) = f.start {
1846 stack.push(start);
1847 }
1848 }
1849 Expression::Decode(f) => {
1850 stack.push(&f.this);
1851 for (search, result) in &f.search_results {
1852 stack.push(search);
1853 stack.push(result);
1854 }
1855 if let Some(ref def) = f.default {
1856 stack.push(def);
1857 }
1858 }
1859 Expression::CharFunc(f) => {
1860 for arg in &f.args {
1861 stack.push(arg);
1862 }
1863 }
1864 Expression::ArraySort(f) => {
1865 stack.push(&f.this);
1866 if let Some(ref cmp) = f.comparator {
1867 stack.push(cmp);
1868 }
1869 }
1870 Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
1871 stack.push(&f.this);
1872 stack.push(&f.separator);
1873 if let Some(ref nr) = f.null_replacement {
1874 stack.push(nr);
1875 }
1876 }
1877 Expression::ArrayFilter(f) => {
1878 stack.push(&f.this);
1879 stack.push(&f.filter);
1880 }
1881 Expression::ArrayTransform(f) => {
1882 stack.push(&f.this);
1883 stack.push(&f.transform);
1884 }
1885 Expression::Sequence(f)
1886 | Expression::Generate(f)
1887 | Expression::ExplodingGenerateSeries(f) => {
1888 stack.push(&f.start);
1889 stack.push(&f.stop);
1890 if let Some(ref step) = f.step {
1891 stack.push(step);
1892 }
1893 }
1894 Expression::JsonExtract(f)
1895 | Expression::JsonExtractScalar(f)
1896 | Expression::JsonQuery(f)
1897 | Expression::JsonValue(f) => {
1898 stack.push(&f.this);
1899 stack.push(&f.path);
1900 }
1901 Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
1902 stack.push(&f.this);
1903 for p in &f.paths {
1904 stack.push(p);
1905 }
1906 }
1907 Expression::JsonObject(f) => {
1908 for (k, v) in &f.pairs {
1909 stack.push(k);
1910 stack.push(v);
1911 }
1912 }
1913 Expression::JsonSet(f) | Expression::JsonInsert(f) => {
1914 stack.push(&f.this);
1915 for (path, val) in &f.path_values {
1916 stack.push(path);
1917 stack.push(val);
1918 }
1919 }
1920 Expression::Overlay(f) => {
1921 stack.push(&f.this);
1922 stack.push(&f.replacement);
1923 stack.push(&f.from);
1924 if let Some(ref len) = f.length {
1925 stack.push(len);
1926 }
1927 }
1928 Expression::Convert(f) => {
1929 stack.push(&f.this);
1930 if let Some(ref style) = f.style {
1931 stack.push(style);
1932 }
1933 }
1934 Expression::ApproxPercentile(f) => {
1935 stack.push(&f.this);
1936 stack.push(&f.percentile);
1937 if let Some(ref acc) = f.accuracy {
1938 stack.push(acc);
1939 }
1940 if let Some(ref filter) = f.filter {
1941 stack.push(filter);
1942 }
1943 }
1944 Expression::Percentile(f)
1945 | Expression::PercentileCont(f)
1946 | Expression::PercentileDisc(f) => {
1947 stack.push(&f.this);
1948 stack.push(&f.percentile);
1949 if let Some(ref filter) = f.filter {
1950 stack.push(filter);
1951 }
1952 }
1953 Expression::WithinGroup(f) => {
1954 stack.push(&f.this);
1955 }
1956 Expression::Left(f) | Expression::Right(f) => {
1957 stack.push(&f.this);
1958 stack.push(&f.length);
1959 }
1960 Expression::Repeat(f) => {
1961 stack.push(&f.this);
1962 stack.push(&f.times);
1963 }
1964 Expression::Lpad(f) | Expression::Rpad(f) => {
1965 stack.push(&f.this);
1966 stack.push(&f.length);
1967 if let Some(ref fill) = f.fill {
1968 stack.push(fill);
1969 }
1970 }
1971 Expression::Split(f) => {
1972 stack.push(&f.this);
1973 stack.push(&f.delimiter);
1974 }
1975 Expression::RegexpLike(f) => {
1976 stack.push(&f.this);
1977 stack.push(&f.pattern);
1978 if let Some(ref flags) = f.flags {
1979 stack.push(flags);
1980 }
1981 }
1982 Expression::RegexpReplace(f) => {
1983 stack.push(&f.this);
1984 stack.push(&f.pattern);
1985 stack.push(&f.replacement);
1986 if let Some(ref flags) = f.flags {
1987 stack.push(flags);
1988 }
1989 }
1990 Expression::RegexpExtract(f) => {
1991 stack.push(&f.this);
1992 stack.push(&f.pattern);
1993 if let Some(ref group) = f.group {
1994 stack.push(group);
1995 }
1996 }
1997 Expression::ToDate(f) => {
1998 stack.push(&f.this);
1999 if let Some(ref fmt) = f.format {
2000 stack.push(fmt);
2001 }
2002 }
2003 Expression::ToTimestamp(f) => {
2004 stack.push(&f.this);
2005 if let Some(ref fmt) = f.format {
2006 stack.push(fmt);
2007 }
2008 }
2009 Expression::DateFormat(f) | Expression::FormatDate(f) => {
2010 stack.push(&f.this);
2011 stack.push(&f.format);
2012 }
2013 Expression::LastDay(f) => {
2014 stack.push(&f.this);
2015 }
2016 Expression::FromUnixtime(f) => {
2017 stack.push(&f.this);
2018 if let Some(ref fmt) = f.format {
2019 stack.push(fmt);
2020 }
2021 }
2022 Expression::UnixTimestamp(f) => {
2023 if let Some(ref this) = f.this {
2024 stack.push(this);
2025 }
2026 if let Some(ref fmt) = f.format {
2027 stack.push(fmt);
2028 }
2029 }
2030 Expression::MakeDate(f) => {
2031 stack.push(&f.year);
2032 stack.push(&f.month);
2033 stack.push(&f.day);
2034 }
2035 Expression::MakeTimestamp(f) => {
2036 stack.push(&f.year);
2037 stack.push(&f.month);
2038 stack.push(&f.day);
2039 stack.push(&f.hour);
2040 stack.push(&f.minute);
2041 stack.push(&f.second);
2042 if let Some(ref tz) = f.timezone {
2043 stack.push(tz);
2044 }
2045 }
2046 Expression::TruncFunc(f) => {
2047 stack.push(&f.this);
2048 if let Some(ref d) = f.decimals {
2049 stack.push(d);
2050 }
2051 }
2052 Expression::ArrayFunc(f) => {
2053 for e in &f.expressions {
2054 stack.push(e);
2055 }
2056 }
2057 Expression::Unnest(f) => {
2058 stack.push(&f.this);
2059 for e in &f.expressions {
2060 stack.push(e);
2061 }
2062 }
2063 Expression::StructFunc(f) => {
2064 for (_, e) in &f.fields {
2065 stack.push(e);
2066 }
2067 }
2068 Expression::StructExtract(f) => {
2069 stack.push(&f.this);
2070 }
2071 Expression::NamedStruct(f) => {
2072 for (k, v) in &f.pairs {
2073 stack.push(k);
2074 stack.push(v);
2075 }
2076 }
2077 Expression::MapFunc(f) => {
2078 for k in &f.keys {
2079 stack.push(k);
2080 }
2081 for v in &f.values {
2082 stack.push(v);
2083 }
2084 }
2085 Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2086 stack.push(&f.this);
2087 stack.push(&f.transform);
2088 }
2089 Expression::JsonArrayAgg(f) => {
2090 stack.push(&f.this);
2091 if let Some(ref filter) = f.filter {
2092 stack.push(filter);
2093 }
2094 }
2095 Expression::JsonObjectAgg(f) => {
2096 stack.push(&f.key);
2097 stack.push(&f.value);
2098 if let Some(ref filter) = f.filter {
2099 stack.push(filter);
2100 }
2101 }
2102 Expression::NTile(f) => {
2103 if let Some(ref n) = f.num_buckets {
2104 stack.push(n);
2105 }
2106 }
2107 Expression::Rand(f) => {
2108 if let Some(ref s) = f.seed {
2109 stack.push(s);
2110 }
2111 if let Some(ref lo) = f.lower {
2112 stack.push(lo);
2113 }
2114 if let Some(ref hi) = f.upper {
2115 stack.push(hi);
2116 }
2117 }
2118 Expression::Any(q) | Expression::All(q) => {
2119 stack.push(&q.this);
2120 stack.push(&q.subquery);
2121 }
2122 Expression::Overlaps(o) => {
2123 if let Some(ref this) = o.this {
2124 stack.push(this);
2125 }
2126 if let Some(ref expr) = o.expression {
2127 stack.push(expr);
2128 }
2129 if let Some(ref ls) = o.left_start {
2130 stack.push(ls);
2131 }
2132 if let Some(ref le) = o.left_end {
2133 stack.push(le);
2134 }
2135 if let Some(ref rs) = o.right_start {
2136 stack.push(rs);
2137 }
2138 if let Some(ref re) = o.right_end {
2139 stack.push(re);
2140 }
2141 }
2142 Expression::Interval(i) => {
2143 if let Some(ref this) = i.this {
2144 stack.push(this);
2145 }
2146 }
2147 Expression::TimeStrToTime(f) => {
2148 stack.push(&f.this);
2149 if let Some(ref zone) = f.zone {
2150 stack.push(zone);
2151 }
2152 }
2153 Expression::JSONBExtractScalar(f) => {
2154 stack.push(&f.this);
2155 stack.push(&f.expression);
2156 if let Some(ref jt) = f.json_type {
2157 stack.push(jt);
2158 }
2159 }
2160
2161 _ => {}
2166 }
2167 }
2168}
2169
2170#[cfg(test)]
2175mod tests {
2176 use super::*;
2177 use crate::dialects::{Dialect, DialectType};
2178 use crate::expressions::DataType;
2179 use crate::optimizer::annotate_types::annotate_types;
2180 use crate::parse_one;
2181 use crate::schema::{MappingSchema, Schema};
2182
2183 fn parse(sql: &str) -> Expression {
2184 let dialect = Dialect::get(DialectType::Generic);
2185 let ast = dialect.parse(sql).unwrap();
2186 ast.into_iter().next().unwrap()
2187 }
2188
#[test]
fn test_simple_lineage() {
    // A bare column selected from a single table should trace to `t.a`.
    let root = lineage("a", &parse("SELECT a FROM t"), None, false).unwrap();

    assert_eq!(root.name, "a");
    assert!(!root.downstream.is_empty(), "Should have downstream nodes");

    let downstream = root.downstream_names();
    assert!(
        downstream.contains(&String::from("t.a")),
        "Expected t.a in downstream, got: {:?}",
        downstream
    );
}
2204
#[test]
fn test_lineage_walk() {
    // Placeholder payload; the walker only inspects the node hierarchy.
    let null = || Expression::Null(crate::expressions::Null);

    let child = LineageNode::new("t.a", null(), null());
    let root = LineageNode {
        name: "col_a".to_string(),
        expression: null(),
        source: null(),
        downstream: vec![child],
        source_name: String::new(),
        reference_node_name: String::new(),
    };

    // `walk` yields the root before its children.
    let order: Vec<&str> = root.walk().map(|node| node.name.as_str()).collect();
    assert_eq!(order.len(), 2);
    assert_eq!(order, ["col_a", "t.a"]);
}
2225
#[test]
fn test_aliased_column() {
    let query = parse("SELECT a + 1 AS b FROM t");
    let root = lineage("b", &query, None, false).unwrap();
    assert_eq!(root.name, "b");

    // `b` is computed from `a`, so some node anywhere in the tree must mention `a`.
    let visited: Vec<String> = root.walk().map(|node| node.name.clone()).collect();
    let mentions_a = visited.iter().any(|name| name.contains('a'));
    assert!(
        mentions_a,
        "Expected to trace to column a, got: {:?}",
        visited
    );
}
2240
#[test]
fn test_qualified_column() {
    // An explicitly qualified `t.a` is looked up by its bare output name `a`.
    let query = parse("SELECT t.a FROM t");
    let root = lineage("a", &query, None, false).unwrap();
    assert_eq!(root.name, "a");

    let downstream = root.downstream_names();
    let found = downstream.iter().any(|name| name.as_str() == "t.a");
    assert!(found, "Expected t.a, got: {:?}", downstream);
}
2254
#[test]
fn test_unqualified_column() {
    // Even without a qualifier in the query, the source should resolve to `t.a`.
    let root = lineage("a", &parse("SELECT a FROM t"), None, false).unwrap();
    let downstream = root.downstream_names();
    assert!(
        downstream.iter().any(|name| name == "t.a"),
        "Expected t.a, got: {:?}",
        downstream
    );
}
2268
#[test]
fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
    // The schema declares `users.name` as TEXT.
    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table("users", &[("name".into(), DataType::Text)], None)
        .expect("schema setup");

    let statements = Dialect::get(DialectType::BigQuery)
        .parse("SELECT name FROM users")
        .unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    // Baseline: plain lineage leaves the root unresolved for type annotation.
    let plain_node = lineage("name", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage without schema");
    let mut plain_root = plain_node.expression.clone();
    annotate_types(
        &mut plain_root,
        Some(&schema),
        Some(DialectType::BigQuery),
    );
    assert_eq!(
        plain_root.inferred_type(),
        None,
        "Expected unresolved root type without schema-aware lineage qualification"
    );

    // Schema-aware lineage should produce a root that annotates to TEXT.
    let schema_node = lineage_with_schema(
        "name",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage with schema");
    let mut schema_root = schema_node.expression.clone();
    annotate_types(&mut schema_root, Some(&schema), Some(DialectType::BigQuery));
    assert_eq!(schema_root.inferred_type(), Some(&DataType::Text));
}
2312
#[test]
fn test_lineage_with_schema_correlated_scalar_subquery() {
    // The scalar subquery refers to the outer `t1.id`. Schema-aware lineage of the
    // outer `id` column must still succeed.
    let sql = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
    let expr = Dialect::get(DialectType::BigQuery)
        .parse(sql)
        .unwrap()
        .into_iter()
        .next()
        .expect("expected one expression");

    // Every column in both tables is BIGINT.
    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    let tables: [(&str, &[&str]); 2] = [("t1", &["id"]), ("t2", &["id", "val"])];
    for (table, column_names) in tables {
        let columns: Vec<_> = column_names
            .iter()
            .map(|col| (col.to_string(), DataType::BigInt { length: None }))
            .collect();
        schema
            .add_table(table, columns.as_slice(), None)
            .expect("schema setup");
    }

    let root = lineage_with_schema(
        "id",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should handle correlated scalar subqueries");
    assert_eq!(root.name, "id");
}
2354
#[test]
fn test_lineage_with_schema_join_using() {
    // Both sides of the USING join expose a BIGINT column `a`.
    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    for table in ["t1", "t2"] {
        schema
            .add_table(
                table,
                &[("a".into(), DataType::BigInt { length: None })],
                None,
            )
            .expect("schema setup");
    }

    let statements = Dialect::get(DialectType::BigQuery)
        .parse("SELECT a FROM t1 JOIN t2 USING(a)")
        .unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let root = lineage_with_schema(
        "a",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should handle JOIN USING");
    assert_eq!(root.name, "a");
}
2393
#[test]
fn test_lineage_with_schema_qualified_table_name() {
    // The table is registered under its dotted `schema.table` name.
    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table(
            "raw.t1",
            &[("a".into(), DataType::BigInt { length: None })],
            None,
        )
        .expect("schema setup");

    let parsed = Dialect::get(DialectType::BigQuery)
        .parse("SELECT a FROM raw.t1")
        .unwrap();
    let expr = parsed
        .into_iter()
        .next()
        .expect("expected one expression");

    let root = lineage_with_schema(
        "a",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should handle dotted schema.table names");
    assert_eq!(root.name, "a");
}
2425
#[test]
fn test_lineage_with_schema_none_matches_lineage() {
    // With neither a schema nor a dialect, the schema-aware entry point must agree
    // with plain `lineage` on both the root name and its direct sources.
    let expr = parse("SELECT a FROM t");
    let expected = lineage("a", &expr, None, false).expect("lineage baseline");
    let actual =
        lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");

    assert_eq!(actual.name, expected.name);
    assert_eq!(actual.downstream_names(), expected.downstream_names());
}
2436
#[test]
fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
    // The schema spells the column `Name`; the query aliases it to lowercase `name`.
    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table(
            "teams",
            &[("Name".into(), DataType::String { length: None })],
            None,
        )
        .expect("schema setup");

    let parsed = Dialect::get(DialectType::BigQuery)
        .parse("SELECT Name AS name FROM teams")
        .unwrap();
    let expr = parsed
        .into_iter()
        .next()
        .expect("expected one expression");

    let downstream = lineage_with_schema(
        "name",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should resolve mixed-case BigQuery columns")
    .downstream_names();

    // The source column is reported with the schema's casing.
    assert!(
        downstream.iter().any(|name| name == "teams.Name"),
        "Expected teams.Name in downstream, got: {:?}",
        downstream
    );
}
2472
#[test]
fn test_lineage_bigquery_mixed_case_alias_lookup() {
    // Asking for lowercase `name` must find the output alias spelled `Name`.
    let expr = Dialect::get(DialectType::BigQuery)
        .parse("SELECT Name AS Name FROM teams")
        .unwrap()
        .into_iter()
        .next()
        .expect("expected one expression");

    let root = lineage("name", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage should resolve mixed-case aliases in BigQuery");
    // The root node carries the caller's spelling of the column.
    assert_eq!(root.name, "name");
}
2488
#[test]
fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
    let sql =
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table(
            "fact.some_daily_metrics",
            &[("date_utc".to_string(), DataType::Date)],
            None,
        )
        .expect("schema setup");

    let names = lineage_with_schema(
        "recency",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat date part as a column")
    .downstream_names();

    let has_date_utc = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        has_date_utc,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );

    // The `day` unit keyword must not show up as a source column.
    let no_date_part = names.iter().all(|n| !n.ends_with(".day") && n != "day");
    assert!(
        no_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
2527
#[test]
fn test_snowflake_datediff_parses_to_typed_ast() {
    let expr = parse_one(
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    // Walk down Select -> first projection (Alias) -> aliased expression.
    let select = match &expr {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    // DATEDIFF must be parsed into the typed node, with `day` captured as its unit.
    match &alias.this {
        Expression::DateDiff(f) => {
            assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
        }
        other => panic!("expected DateDiff, got {other:?}"),
    }
}
2549
#[test]
fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
    let sql = "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table(
            "fact.some_daily_metrics",
            &[("date_utc".to_string(), DataType::Date)],
            None,
        )
        .expect("schema setup");

    let names = lineage_with_schema(
        "next_day",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat DATEADD date part as a column")
    .downstream_names();

    let has_date_utc = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        has_date_utc,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );

    // The `day` unit keyword must not show up as a source column.
    let no_date_part = names.iter().all(|n| !n.ends_with(".day") && n != "day");
    assert!(
        no_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
2588
#[test]
fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
    let sql = "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table(
            "fact.some_daily_metrics",
            &[("date_utc".to_string(), DataType::Date)],
            None,
        )
        .expect("schema setup");

    let names = lineage_with_schema(
        "day_part",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat DATE_PART identifier as a column")
    .downstream_names();

    let has_date_utc = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        has_date_utc,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );

    // The unquoted `day` part identifier must not show up as a source column.
    let no_date_part = names.iter().all(|n| !n.ends_with(".day") && n != "day");
    assert!(
        no_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
2627
#[test]
fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
    // Control case: the date part is a quoted string literal, not an identifier.
    let sql = "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table(
            "fact.some_daily_metrics",
            &[("date_utc".to_string(), DataType::Date)],
            None,
        )
        .expect("schema setup");

    let names = lineage_with_schema(
        "day_part",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("quoted DATE_PART should continue to work")
    .downstream_names();

    let has_date_utc = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        has_date_utc,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );
}
2661
2662 #[test]
2663 fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
2664 let expr = parse_one(
2665 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2666 DialectType::Snowflake,
2667 )
2668 .expect("parse");
2669
2670 match expr {
2671 Expression::Select(select) => match &select.expressions[0] {
2672 Expression::Alias(alias) => match &alias.this {
2673 Expression::Function(f) => {
2674 assert_eq!(f.name.to_uppercase(), "DATEADD");
2675 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2676 }
2677 other => panic!("expected generic DATEADD function, got {other:?}"),
2678 },
2679 other => panic!("expected Alias, got {other:?}"),
2680 },
2681 other => panic!("expected Select, got {other:?}"),
2682 }
2683 }
2684
2685 #[test]
2686 fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2687 let expr = parse_one(
2688 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2689 DialectType::Snowflake,
2690 )
2691 .expect("parse");
2692
2693 match expr {
2694 Expression::Select(select) => match &select.expressions[0] {
2695 Expression::Alias(alias) => match &alias.this {
2696 Expression::Function(f) => {
2697 assert_eq!(f.name.to_uppercase(), "DATE_PART");
2698 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2699 }
2700 other => panic!("expected generic DATE_PART function, got {other:?}"),
2701 },
2702 other => panic!("expected Alias, got {other:?}"),
2703 },
2704 other => panic!("expected Select, got {other:?}"),
2705 }
2706 }
2707
2708 #[test]
2709 fn test_snowflake_date_part_string_literal_stays_generic_function() {
2710 let expr = parse_one(
2711 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2712 DialectType::Snowflake,
2713 )
2714 .expect("parse");
2715
2716 match expr {
2717 Expression::Select(select) => match &select.expressions[0] {
2718 Expression::Alias(alias) => match &alias.this {
2719 Expression::Function(f) => {
2720 assert_eq!(f.name.to_uppercase(), "DATE_PART");
2721 }
2722 other => panic!("expected generic DATE_PART function, got {other:?}"),
2723 },
2724 other => panic!("expected Alias, got {other:?}"),
2725 },
2726 other => panic!("expected Select, got {other:?}"),
2727 }
2728 }
2729
2730 #[test]
2731 fn test_lineage_join() {
2732 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2733
2734 let node_a = lineage("a", &expr, None, false).unwrap();
2735 let names_a = node_a.downstream_names();
2736 assert!(
2737 names_a.iter().any(|n| n == "t.a"),
2738 "Expected t.a, got: {:?}",
2739 names_a
2740 );
2741
2742 let node_b = lineage("b", &expr, None, false).unwrap();
2743 let names_b = node_b.downstream_names();
2744 assert!(
2745 names_b.iter().any(|n| n == "s.b"),
2746 "Expected s.b, got: {:?}",
2747 names_b
2748 );
2749 }
2750
2751 #[test]
2752 fn test_lineage_alias_leaf_has_resolved_source_name() {
2753 let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
2754 let node = lineage("col1", &expr, None, false).unwrap();
2755
2756 let names = node.downstream_names();
2758 assert!(
2759 names.iter().any(|n| n == "t1.col1"),
2760 "Expected aliased column edge t1.col1, got: {:?}",
2761 names
2762 );
2763
2764 let leaf = node
2766 .downstream
2767 .iter()
2768 .find(|n| n.name == "t1.col1")
2769 .expect("Expected t1.col1 leaf");
2770 assert_eq!(leaf.source_name, "table1");
2771 match &leaf.source {
2772 Expression::Table(table) => assert_eq!(table.name.name, "table1"),
2773 _ => panic!("Expected leaf source to be a table expression"),
2774 }
2775 }
2776
2777 #[test]
2778 fn test_lineage_derived_table() {
2779 let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
2780 let node = lineage("a", &expr, None, false).unwrap();
2781
2782 assert_eq!(node.name, "a");
2783 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2785 assert!(
2786 all_names.iter().any(|n| n == "t.a"),
2787 "Expected to trace through derived table to t.a, got: {:?}",
2788 all_names
2789 );
2790 }
2791
2792 #[test]
2793 fn test_lineage_cte() {
2794 let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
2795 let node = lineage("a", &expr, None, false).unwrap();
2796
2797 assert_eq!(node.name, "a");
2798 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2799 assert!(
2800 all_names.iter().any(|n| n == "t.a"),
2801 "Expected to trace through CTE to t.a, got: {:?}",
2802 all_names
2803 );
2804 }
2805
2806 #[test]
2807 fn test_lineage_union() {
2808 let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
2809 let node = lineage("a", &expr, None, false).unwrap();
2810
2811 assert_eq!(node.name, "a");
2812 assert_eq!(
2814 node.downstream.len(),
2815 2,
2816 "Expected 2 branches for UNION, got {}",
2817 node.downstream.len()
2818 );
2819 }
2820
2821 #[test]
2822 fn test_lineage_cte_union() {
2823 let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
2824 let node = lineage("a", &expr, None, false).unwrap();
2825
2826 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2828 assert!(
2829 all_names.len() >= 3,
2830 "Expected at least 3 nodes for CTE with UNION, got: {:?}",
2831 all_names
2832 );
2833 }
2834
2835 #[test]
2836 fn test_lineage_star() {
2837 let expr = parse("SELECT * FROM t");
2838 let node = lineage("*", &expr, None, false).unwrap();
2839
2840 assert_eq!(node.name, "*");
2841 assert!(
2843 !node.downstream.is_empty(),
2844 "Star should produce downstream nodes"
2845 );
2846 }
2847
2848 #[test]
2849 fn test_lineage_subquery_in_select() {
2850 let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
2851 let node = lineage("x", &expr, None, false).unwrap();
2852
2853 assert_eq!(node.name, "x");
2854 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2856 assert!(
2857 all_names.len() >= 2,
2858 "Expected tracing into scalar subquery, got: {:?}",
2859 all_names
2860 );
2861 }
2862
2863 #[test]
2864 fn test_lineage_multiple_columns() {
2865 let expr = parse("SELECT a, b FROM t");
2866
2867 let node_a = lineage("a", &expr, None, false).unwrap();
2868 let node_b = lineage("b", &expr, None, false).unwrap();
2869
2870 assert_eq!(node_a.name, "a");
2871 assert_eq!(node_b.name, "b");
2872
2873 let names_a = node_a.downstream_names();
2875 let names_b = node_b.downstream_names();
2876 assert!(names_a.iter().any(|n| n == "t.a"));
2877 assert!(names_b.iter().any(|n| n == "t.b"));
2878 }
2879
2880 #[test]
2881 fn test_get_source_tables() {
2882 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2883 let node = lineage("a", &expr, None, false).unwrap();
2884
2885 let tables = get_source_tables(&node);
2886 assert!(
2887 tables.contains("t"),
2888 "Expected source table 't', got: {:?}",
2889 tables
2890 );
2891 }
2892
2893 #[test]
2894 fn test_lineage_column_not_found() {
2895 let expr = parse("SELECT a FROM t");
2896 let result = lineage("nonexistent", &expr, None, false);
2897 assert!(result.is_err());
2898 }
2899
2900 #[test]
2901 fn test_lineage_nested_cte() {
2902 let expr = parse(
2903 "WITH cte1 AS (SELECT a FROM t), \
2904 cte2 AS (SELECT a FROM cte1) \
2905 SELECT a FROM cte2",
2906 );
2907 let node = lineage("a", &expr, None, false).unwrap();
2908
2909 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2911 assert!(
2912 all_names.len() >= 3,
2913 "Expected to trace through nested CTEs, got: {:?}",
2914 all_names
2915 );
2916 }
2917
2918 #[test]
2919 fn test_trim_selects_true() {
2920 let expr = parse("SELECT a, b, c FROM t");
2921 let node = lineage("a", &expr, None, true).unwrap();
2922
2923 if let Expression::Select(select) = &node.source {
2925 assert_eq!(
2926 select.expressions.len(),
2927 1,
2928 "Trimmed source should have 1 expression, got {}",
2929 select.expressions.len()
2930 );
2931 } else {
2932 panic!("Expected Select source");
2933 }
2934 }
2935
2936 #[test]
2937 fn test_trim_selects_false() {
2938 let expr = parse("SELECT a, b, c FROM t");
2939 let node = lineage("a", &expr, None, false).unwrap();
2940
2941 if let Expression::Select(select) = &node.source {
2943 assert_eq!(
2944 select.expressions.len(),
2945 3,
2946 "Untrimmed source should have 3 expressions"
2947 );
2948 } else {
2949 panic!("Expected Select source");
2950 }
2951 }
2952
2953 #[test]
2954 fn test_lineage_expression_in_select() {
2955 let expr = parse("SELECT a + b AS c FROM t");
2956 let node = lineage("c", &expr, None, false).unwrap();
2957
2958 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2960 assert!(
2961 all_names.len() >= 3,
2962 "Expected to trace a + b to both columns, got: {:?}",
2963 all_names
2964 );
2965 }
2966
2967 #[test]
2968 fn test_set_operation_by_index() {
2969 let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
2970
2971 let node = lineage("a", &expr, None, false).unwrap();
2973
2974 assert_eq!(node.downstream.len(), 2);
2976 }
2977
2978 fn print_node(node: &LineageNode, indent: usize) {
2981 let pad = " ".repeat(indent);
2982 println!(
2983 "{pad}name={:?} source_name={:?}",
2984 node.name, node.source_name
2985 );
2986 for child in &node.downstream {
2987 print_node(child, indent + 1);
2988 }
2989 }
2990
2991 #[test]
2992 fn test_issue18_repro() {
2993 let query = "SELECT UPPER(name) as upper_name FROM users";
2995 println!("Query: {query}\n");
2996
2997 let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
2998 let exprs = dialect.parse(query).unwrap();
2999 let expr = &exprs[0];
3000
3001 let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3002 println!("lineage(\"upper_name\"):");
3003 print_node(&node, 1);
3004
3005 let names = node.downstream_names();
3006 assert!(
3007 names.iter().any(|n| n == "users.name"),
3008 "Expected users.name in downstream, got: {:?}",
3009 names
3010 );
3011 }
3012
3013 #[test]
3014 fn test_lineage_bigquery_safe_namespace_issue207() {
3015 let query = r#"
3016WITH import_cte AS (
3017 SELECT timestamp, data, operation
3018 FROM `project`.`dataset`.`source_table`
3019),
3020transform_cte AS (
3021 SELECT
3022 timestamp,
3023 SAFE.PARSE_JSON(data) AS json_data
3024 FROM import_cte
3025)
3026SELECT json_data FROM transform_cte
3027"#;
3028 let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3029 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3030 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3031 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3032
3033 assert!(
3034 names.iter().any(|name| name == "source_table.data"),
3035 "expected source_table.data in lineage, got {names:?}"
3036 );
3037 assert!(
3038 !names
3039 .iter()
3040 .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3041 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3042 );
3043 }
3044
3045 #[test]
3046 fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3047 let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3048 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3049 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3050 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3051
3052 assert!(
3053 names.iter().any(|name| name == "t.data"),
3054 "expected t.data in lineage, got {names:?}"
3055 );
3056 assert!(
3057 !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3058 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3059 );
3060 }
3061
3062 #[test]
3063 fn test_lineage_method_call_receiver_control() {
3064 let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3065 let node = lineage("out", &expr, None, false).expect("lineage");
3066 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3067
3068 assert!(
3069 names.iter().any(|name| name == "t.obj"),
3070 "expected ordinary method receiver to remain in lineage, got {names:?}"
3071 );
3072 assert!(
3073 names.iter().any(|name| name == "t.arg"),
3074 "expected method argument in lineage, got {names:?}"
3075 );
3076 }
3077
3078 #[test]
3079 fn test_lineage_upper_function() {
3080 let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3081 let node = lineage("upper_name", &expr, None, false).unwrap();
3082
3083 let names = node.downstream_names();
3084 assert!(
3085 names.iter().any(|n| n == "users.name"),
3086 "Expected users.name in downstream, got: {:?}",
3087 names
3088 );
3089 }
3090
3091 #[test]
3092 fn test_lineage_round_function() {
3093 let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3094 let node = lineage("rounded", &expr, None, false).unwrap();
3095
3096 let names = node.downstream_names();
3097 assert!(
3098 names.iter().any(|n| n == "products.price"),
3099 "Expected products.price in downstream, got: {:?}",
3100 names
3101 );
3102 }
3103
3104 #[test]
3105 fn test_lineage_coalesce_function() {
3106 let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3107 let node = lineage("val", &expr, None, false).unwrap();
3108
3109 let names = node.downstream_names();
3110 assert!(
3111 names.iter().any(|n| n == "t.a"),
3112 "Expected t.a in downstream, got: {:?}",
3113 names
3114 );
3115 assert!(
3116 names.iter().any(|n| n == "t.b"),
3117 "Expected t.b in downstream, got: {:?}",
3118 names
3119 );
3120 }
3121
3122 #[test]
3123 fn test_lineage_count_function() {
3124 let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3125 let node = lineage("cnt", &expr, None, false).unwrap();
3126
3127 let names = node.downstream_names();
3128 assert!(
3129 names.iter().any(|n| n == "t.id"),
3130 "Expected t.id in downstream, got: {:?}",
3131 names
3132 );
3133 }
3134
3135 #[test]
3136 fn test_lineage_sum_function() {
3137 let expr = parse("SELECT SUM(amount) AS total FROM t");
3138 let node = lineage("total", &expr, None, false).unwrap();
3139
3140 let names = node.downstream_names();
3141 assert!(
3142 names.iter().any(|n| n == "t.amount"),
3143 "Expected t.amount in downstream, got: {:?}",
3144 names
3145 );
3146 }
3147
3148 #[test]
3149 fn test_lineage_case_with_nested_functions() {
3150 let expr =
3151 parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3152 let node = lineage("result", &expr, None, false).unwrap();
3153
3154 let names = node.downstream_names();
3155 assert!(
3156 names.iter().any(|n| n == "t.x"),
3157 "Expected t.x in downstream, got: {:?}",
3158 names
3159 );
3160 assert!(
3161 names.iter().any(|n| n == "t.name"),
3162 "Expected t.name in downstream, got: {:?}",
3163 names
3164 );
3165 }
3166
3167 #[test]
3168 fn test_lineage_substring_function() {
3169 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3170 let node = lineage("short", &expr, None, false).unwrap();
3171
3172 let names = node.downstream_names();
3173 assert!(
3174 names.iter().any(|n| n == "t.name"),
3175 "Expected t.name in downstream, got: {:?}",
3176 names
3177 );
3178 }
3179
3180 #[test]
3183 fn test_lineage_cte_select_star() {
3184 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3188 let node = lineage("a", &expr, None, false).unwrap();
3189
3190 assert_eq!(node.name, "a");
3191 assert!(
3194 !node.downstream.is_empty(),
3195 "Expected downstream nodes tracing through CTE, got none"
3196 );
3197 }
3198
3199 #[test]
3200 fn test_lineage_cte_select_star_renamed_column() {
3201 let expr =
3204 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3205 let node = lineage("customer_id", &expr, None, false).unwrap();
3206
3207 assert_eq!(node.name, "customer_id");
3208 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3210 assert!(
3211 all_names.len() >= 2,
3212 "Expected at least 2 nodes (customer_id → source), got: {:?}",
3213 all_names
3214 );
3215 }
3216
3217 #[test]
3218 fn test_lineage_cte_select_star_multiple_columns() {
3219 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3221
3222 for col in &["a", "b", "c"] {
3223 let node = lineage(col, &expr, None, false).unwrap();
3224 assert_eq!(node.name, *col);
3225 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3227 assert!(
3228 all_names.len() >= 2,
3229 "Expected at least 2 nodes for column {}, got: {:?}",
3230 col,
3231 all_names
3232 );
3233 }
3234 }
3235
3236 #[test]
3237 fn test_lineage_nested_cte_select_star() {
3238 let expr = parse(
3240 "WITH cte1 AS (SELECT a FROM t), \
3241 cte2 AS (SELECT * FROM cte1) \
3242 SELECT * FROM cte2",
3243 );
3244 let node = lineage("a", &expr, None, false).unwrap();
3245
3246 assert_eq!(node.name, "a");
3247 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3248 assert!(
3249 all_names.len() >= 3,
3250 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3251 all_names
3252 );
3253 }
3254
3255 #[test]
3256 fn test_lineage_three_level_nested_cte_star() {
3257 let expr = parse(
3259 "WITH cte1 AS (SELECT x FROM t), \
3260 cte2 AS (SELECT * FROM cte1), \
3261 cte3 AS (SELECT * FROM cte2) \
3262 SELECT * FROM cte3",
3263 );
3264 let node = lineage("x", &expr, None, false).unwrap();
3265
3266 assert_eq!(node.name, "x");
3267 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3268 assert!(
3269 all_names.len() >= 4,
3270 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3271 all_names
3272 );
3273 }
3274
3275 #[test]
3276 fn test_lineage_cte_union_star() {
3277 let expr = parse(
3279 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3280 SELECT * FROM cte",
3281 );
3282 let node = lineage("a", &expr, None, false).unwrap();
3283
3284 assert_eq!(node.name, "a");
3285 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3286 assert!(
3287 all_names.len() >= 2,
3288 "Expected at least 2 nodes for CTE union star, got: {:?}",
3289 all_names
3290 );
3291 }
3292
3293 #[test]
3294 fn test_lineage_cte_star_unknown_table() {
3295 let expr = parse(
3298 "WITH cte AS (SELECT * FROM unknown_table) \
3299 SELECT * FROM cte",
3300 );
3301 let _result = lineage("x", &expr, None, false);
3304 }
3305
3306 #[test]
3307 fn test_lineage_cte_explicit_columns() {
3308 let expr = parse(
3310 "WITH cte(x, y) AS (SELECT a, b FROM t) \
3311 SELECT * FROM cte",
3312 );
3313 let node = lineage("x", &expr, None, false).unwrap();
3314 assert_eq!(node.name, "x");
3315 }
3316
3317 #[test]
3318 fn test_lineage_cte_qualified_star() {
3319 let expr = parse(
3321 "WITH cte AS (SELECT a, b FROM t) \
3322 SELECT cte.* FROM cte",
3323 );
3324 for col in &["a", "b"] {
3325 let node = lineage(col, &expr, None, false).unwrap();
3326 assert_eq!(node.name, *col);
3327 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3328 assert!(
3329 all_names.len() >= 2,
3330 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3331 col,
3332 all_names
3333 );
3334 }
3335 }
3336
3337 #[test]
3338 fn test_lineage_subquery_select_star() {
3339 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3342 let node = lineage("x", &expr, None, false).unwrap();
3343
3344 assert_eq!(node.name, "x");
3345 assert!(
3346 !node.downstream.is_empty(),
3347 "Expected downstream nodes for subquery with SELECT *, got none"
3348 );
3349 }
3350
3351 #[test]
3352 fn test_lineage_cte_star_with_schema_external_table() {
3353 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3355SELECT * FROM orders"#;
3356 let expr = parse(sql);
3357
3358 let mut schema = MappingSchema::new();
3359 let cols = vec![
3360 ("order_id".to_string(), DataType::Unknown),
3361 ("customer_id".to_string(), DataType::Unknown),
3362 ("amount".to_string(), DataType::Unknown),
3363 ];
3364 schema.add_table("stg_orders", &cols, None).unwrap();
3365
3366 let node =
3367 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3368 .unwrap();
3369 assert_eq!(node.name, "order_id");
3370 }
3371
3372 #[test]
3373 fn test_lineage_cte_star_with_schema_three_part_name() {
3374 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3376SELECT * FROM orders"#;
3377 let expr = parse(sql);
3378
3379 let mut schema = MappingSchema::new();
3380 let cols = vec![
3381 ("order_id".to_string(), DataType::Unknown),
3382 ("customer_id".to_string(), DataType::Unknown),
3383 ];
3384 schema
3385 .add_table("db.schema.stg_orders", &cols, None)
3386 .unwrap();
3387
3388 let node = lineage_with_schema(
3389 "customer_id",
3390 &expr,
3391 Some(&schema as &dyn Schema),
3392 None,
3393 false,
3394 )
3395 .unwrap();
3396 assert_eq!(node.name, "customer_id");
3397 }
3398
3399 #[test]
3400 fn test_lineage_cte_star_with_schema_nested() {
3401 let sql = r#"WITH
3404 raw AS (SELECT * FROM external_table),
3405 enriched AS (SELECT * FROM raw)
3406 SELECT * FROM enriched"#;
3407 let expr = parse(sql);
3408
3409 let mut schema = MappingSchema::new();
3410 let cols = vec![
3411 ("id".to_string(), DataType::Unknown),
3412 ("name".to_string(), DataType::Unknown),
3413 ];
3414 schema.add_table("external_table", &cols, None).unwrap();
3415
3416 let node =
3417 lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3418 assert_eq!(node.name, "name");
3419 }
3420
3421 #[test]
3422 fn test_lineage_cte_qualified_star_with_schema() {
3423 let sql = r#"WITH
3426 orders AS (SELECT * FROM stg_orders),
3427 enriched AS (
3428 SELECT orders.*, 'extra' AS extra
3429 FROM orders
3430 )
3431 SELECT * FROM enriched"#;
3432 let expr = parse(sql);
3433
3434 let mut schema = MappingSchema::new();
3435 let cols = vec![
3436 ("order_id".to_string(), DataType::Unknown),
3437 ("total".to_string(), DataType::Unknown),
3438 ];
3439 schema.add_table("stg_orders", &cols, None).unwrap();
3440
3441 let node =
3442 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3443 .unwrap();
3444 assert_eq!(node.name, "order_id");
3445
3446 let extra =
3448 lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3449 assert_eq!(extra.name, "extra");
3450 }
3451
3452 #[test]
3453 fn test_lineage_cte_star_without_schema_still_works() {
3454 let sql = r#"WITH
3456 cte1 AS (SELECT id, name FROM raw_table),
3457 cte2 AS (SELECT * FROM cte1)
3458 SELECT * FROM cte2"#;
3459 let expr = parse(sql);
3460
3461 let node = lineage("id", &expr, None, false).unwrap();
3463 assert_eq!(node.name, "id");
3464 }
3465
3466 #[test]
3467 fn test_lineage_nested_cte_star_with_join_and_schema() {
3468 let sql = r#"WITH
3471base_orders AS (
3472 SELECT * FROM stg_orders
3473),
3474with_payments AS (
3475 SELECT
3476 base_orders.*,
3477 p.amount
3478 FROM base_orders
3479 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
3480),
3481final_cte AS (
3482 SELECT * FROM with_payments
3483)
3484SELECT * FROM final_cte"#;
3485 let expr = parse(sql);
3486
3487 let mut schema = MappingSchema::new();
3488 let order_cols = vec![
3489 (
3490 "order_id".to_string(),
3491 crate::expressions::DataType::Unknown,
3492 ),
3493 (
3494 "customer_id".to_string(),
3495 crate::expressions::DataType::Unknown,
3496 ),
3497 ("status".to_string(), crate::expressions::DataType::Unknown),
3498 ];
3499 let pay_cols = vec![
3500 (
3501 "payment_id".to_string(),
3502 crate::expressions::DataType::Unknown,
3503 ),
3504 (
3505 "order_id".to_string(),
3506 crate::expressions::DataType::Unknown,
3507 ),
3508 ("amount".to_string(), crate::expressions::DataType::Unknown),
3509 ];
3510 schema.add_table("stg_orders", &order_cols, None).unwrap();
3511 schema.add_table("stg_payments", &pay_cols, None).unwrap();
3512
3513 let node =
3515 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3516 .unwrap();
3517 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3518
3519 let has_table_qualified = all_names
3521 .iter()
3522 .any(|n| n.contains('.') && n.contains("order_id"));
3523 assert!(
3524 has_table_qualified,
3525 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
3526 all_names
3527 );
3528
3529 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
3531 .unwrap();
3532 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3533
3534 let has_table_qualified = all_names
3535 .iter()
3536 .any(|n| n.contains('.') && n.contains("amount"));
3537 assert!(
3538 has_table_qualified,
3539 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
3540 all_names
3541 );
3542 }
3543
3544 #[test]
3545 fn test_lineage_cte_alias_resolution() {
3546 let sql = r#"WITH import_stg_items AS (
3548 SELECT item_id, name, status FROM stg_items
3549)
3550SELECT base.item_id, base.status
3551FROM import_stg_items AS base"#;
3552 let expr = parse(sql);
3553
3554 let node = lineage("item_id", &expr, None, false).unwrap();
3555 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3556 assert!(
3558 all_names.iter().any(|n| n == "stg_items.item_id"),
3559 "Expected leaf 'stg_items.item_id', got: {:?}",
3560 all_names
3561 );
3562 }
3563
3564 #[test]
3565 fn test_lineage_cte_alias_with_schema_and_star() {
3566 let sql = r#"WITH import_stg AS (
3568 SELECT * FROM stg_items
3569)
3570SELECT base.item_id, base.status
3571FROM import_stg AS base"#;
3572 let expr = parse(sql);
3573
3574 let mut schema = MappingSchema::new();
3575 schema
3576 .add_table(
3577 "stg_items",
3578 &[
3579 ("item_id".to_string(), DataType::Unknown),
3580 ("name".to_string(), DataType::Unknown),
3581 ("status".to_string(), DataType::Unknown),
3582 ],
3583 None,
3584 )
3585 .unwrap();
3586
3587 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
3588 .unwrap();
3589 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3590 assert!(
3591 all_names.iter().any(|n| n == "stg_items.item_id"),
3592 "Expected leaf 'stg_items.item_id', got: {:?}",
3593 all_names
3594 );
3595 }
3596
3597 #[test]
3598 fn test_lineage_cte_alias_with_join() {
3599 let sql = r#"WITH
3601 import_users AS (SELECT id, name FROM users),
3602 import_orders AS (SELECT id, user_id, amount FROM orders)
3603SELECT u.name, o.amount
3604FROM import_users AS u
3605LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3606 let expr = parse(sql);
3607
3608 let node = lineage("name", &expr, None, false).unwrap();
3609 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3610 assert!(
3611 all_names.iter().any(|n| n == "users.name"),
3612 "Expected leaf 'users.name', got: {:?}",
3613 all_names
3614 );
3615
3616 let node = lineage("amount", &expr, None, false).unwrap();
3617 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3618 assert!(
3619 all_names.iter().any(|n| n == "orders.amount"),
3620 "Expected leaf 'orders.amount', got: {:?}",
3621 all_names
3622 );
3623 }
3624
3625 #[test]
3630 fn test_lineage_unquoted_cte_case_insensitive() {
3631 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
3634 let node = lineage("col", &expr, None, false).unwrap();
3635 assert_eq!(node.name, "col");
3636 assert!(
3637 !node.downstream.is_empty(),
3638 "Unquoted CTE should resolve case-insensitively"
3639 );
3640 }
3641
3642 #[test]
3643 fn test_lineage_quoted_cte_case_preserved() {
3644 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
3646 let node = lineage("col", &expr, None, false).unwrap();
3647 assert_eq!(node.name, "col");
3648 assert!(
3649 !node.downstream.is_empty(),
3650 "Quoted CTE with matching case should resolve"
3651 );
3652 }
3653
3654 #[test]
3655 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3656 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
3660 let result = lineage("col", &expr, None, false);
3663 assert!(
3664 result.is_err(),
3665 "Quoted CTE with case mismatch should not expand star: {:?}",
3666 result
3667 );
3668 }
3669
3670 #[test]
3671 fn test_lineage_mixed_quoted_unquoted_cte() {
3672 let expr = parse(
3674 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3675 );
3676 let node = lineage("a", &expr, None, false).unwrap();
3677 assert_eq!(node.name, "a");
3678 assert!(
3679 !node.downstream.is_empty(),
3680 "Mixed quoted/unquoted CTE chain should resolve"
3681 );
3682 }
3683
3684 #[test]
3700 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3701 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
3712 let node = lineage("col", &expr, None, false).unwrap();
3713 assert!(!node.downstream.is_empty());
3714 let child = &node.downstream[0];
3715 assert_eq!(
3717 child.source_name, "MyCte",
3718 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3719 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3720 );
3721 }
3722
3723 #[test]
3724 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
3725 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
3732 let node = lineage("col", &expr, None, false).unwrap();
3733 assert!(!node.downstream.is_empty());
3734 let child = &node.downstream[0];
3735 assert_eq!(
3737 child.source_name, "MyCte",
3738 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3739 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3740 );
3741 }
3742
3743 #[test]
3750 #[ignore = "requires derived table star expansion (separate issue)"]
3751 fn test_node_name_doesnt_contain_comment() {
3752 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
3753 let node = lineage("x", &expr, None, false).unwrap();
3754
3755 assert_eq!(node.name, "x");
3756 assert!(!node.downstream.is_empty());
3757 }
3758
3759 #[test]
3763 fn test_comment_before_first_column_in_cte() {
3764 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
3765 let sql_without_comment = "with t as (select 1 as a) select a from t";
3766
3767 let expr_ok = parse(sql_without_comment);
3769 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
3770
3771 let expr_comment = parse(sql_with_comment);
3773 let node_comment = lineage("a", &expr_comment, None, false)
3774 .expect("with comment before first column should succeed");
3775
3776 assert_eq!(node_ok.name, node_comment.name, "node names should match");
3777 assert_eq!(
3778 node_ok.downstream_names(),
3779 node_comment.downstream_names(),
3780 "downstream lineage should be identical with or without comment"
3781 );
3782 }
3783
3784 #[test]
3786 fn test_block_comment_before_first_column() {
3787 let sql = "with t as (select 1 as a) select /* section */ a from t";
3788 let expr = parse(sql);
3789 let node = lineage("a", &expr, None, false)
3790 .expect("block comment before first column should succeed");
3791 assert_eq!(node.name, "a");
3792 assert!(
3793 !node.downstream.is_empty(),
3794 "should have downstream lineage"
3795 );
3796 }
3797
3798 #[test]
3800 fn test_comment_before_first_column_second_col_ok() {
3801 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
3802 let expr = parse(sql);
3803
3804 let node_a =
3805 lineage("a", &expr, None, false).expect("column a with comment should succeed");
3806 assert_eq!(node_a.name, "a");
3807
3808 let node_b =
3809 lineage("b", &expr, None, false).expect("column b with comment should succeed");
3810 assert_eq!(node_b.name, "b");
3811 }
3812
3813 #[test]
3815 fn test_comment_before_aliased_column() {
3816 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
3817 let expr = parse(sql);
3818 let node =
3819 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
3820 assert_eq!(node.name, "y");
3821 assert!(
3822 !node.downstream.is_empty(),
3823 "aliased column should have downstream lineage"
3824 );
3825 }
3826}