1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22 pub name: String,
24 pub expression: Expression,
26 pub source: Expression,
28 pub downstream: Vec<LineageNode>,
30 pub source_name: String,
32 pub source_kind: SourceKind,
34 #[serde(default, skip_serializing_if = "Option::is_none")]
36 pub source_alias: Option<String>,
37 pub reference_node_name: String,
39}
40
41impl LineageNode {
42 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44 Self {
45 name: name.into(),
46 expression,
47 source,
48 downstream: Vec::new(),
49 source_name: String::new(),
50 source_kind: SourceKind::Unknown,
51 source_alias: None,
52 reference_node_name: String::new(),
53 }
54 }
55
56 pub fn walk(&self) -> LineageWalker<'_> {
58 LineageWalker { stack: vec![self] }
59 }
60
61 pub fn downstream_names(&self) -> Vec<String> {
63 self.downstream.iter().map(|n| n.name.clone()).collect()
64 }
65}
66
67fn source_kind_for_scope_context(
68 scope: &Scope,
69 source_name: &str,
70 reference_node_name: &str,
71) -> SourceKind {
72 if source_name.is_empty() && reference_node_name.is_empty() {
73 return SourceKind::Root;
74 }
75 if let Some(source_info) = scope.sources.get(source_name) {
76 return source_info.kind;
77 }
78 if scope.cte_sources.contains_key(source_name) {
79 return SourceKind::Cte;
80 }
81 match scope.scope_type {
82 ScopeType::Cte => SourceKind::Cte,
83 ScopeType::DerivedTable => SourceKind::DerivedTable,
84 ScopeType::Udtf => SourceKind::Virtual,
85 _ => SourceKind::Unknown,
86 }
87}
88
89fn apply_scope_context(
90 node: &mut LineageNode,
91 scope: &Scope,
92 source_name: &str,
93 reference_node_name: &str,
94) {
95 node.source_name = source_name.to_string();
96 node.reference_node_name = reference_node_name.to_string();
97 node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
100pub struct LineageWalker<'a> {
102 stack: Vec<&'a LineageNode>,
103}
104
105impl<'a> Iterator for LineageWalker<'a> {
106 type Item = &'a LineageNode;
107
108 fn next(&mut self) -> Option<Self::Item> {
109 if let Some(node) = self.stack.pop() {
110 for child in node.downstream.iter().rev() {
112 self.stack.push(child);
113 }
114 Some(node)
115 } else {
116 None
117 }
118 }
119}
120
/// How the column to trace is identified within a scope.
enum ColumnRef<'a> {
    /// By output name or alias.
    Name(&'a str),
    /// By zero-based position in the projection list. This is used for
    /// set-operation branches and scalar subqueries.
    Index(usize),
}
130
131pub fn lineage(
157 column: &str,
158 sql: &Expression,
159 dialect: Option<DialectType>,
160 trim_selects: bool,
161) -> Result<LineageNode> {
162 let effective_sql = lineage_effective_expression(sql);
164 let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165 if !has_with {
166 return lineage_from_expression(column, sql, dialect, trim_selects);
167 }
168 let mut owned = sql.clone();
169 expand_cte_stars(&mut owned, None);
170 lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173pub fn lineage_with_schema(
189 column: &str,
190 sql: &Expression,
191 schema: Option<&dyn Schema>,
192 dialect: Option<DialectType>,
193 trim_selects: bool,
194) -> Result<LineageNode> {
195 let mut qualified_expression = if let Some(schema) = schema {
196 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197 QualifyColumnsOptions::new().with_dialect(dialect_type)
198 } else {
199 QualifyColumnsOptions::new()
200 };
201
202 qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203 Error::internal(format!("Lineage qualification failed with schema: {}", e))
204 })?
205 } else {
206 sql.clone()
207 };
208
209 annotate_types(&mut qualified_expression, schema, dialect);
211
212 expand_cte_stars(&mut qualified_expression, schema);
215
216 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220 column: &str,
221 sql: &Expression,
222 dialect: Option<DialectType>,
223 trim_selects: bool,
224) -> Result<LineageNode> {
225 let sql = lineage_effective_expression(sql);
226 let scope = build_scope(sql);
227 to_node(
228 ColumnRef::Name(column),
229 &scope,
230 dialect,
231 "",
232 "",
233 "",
234 trim_selects,
235 )
236}
237
238fn lineage_effective_expression(sql: &Expression) -> &Expression {
239 match sql {
240 Expression::Prepare(prepare) => &prepare.statement,
241 _ => sql,
242 }
243}
244
245fn normalize_cte_name(ident: &Identifier) -> String {
255 if ident.quoted {
256 ident.name.clone()
257 } else {
258 ident.name.to_lowercase()
259 }
260}
261
262pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
274 if let Expression::Prepare(prepare) = expr {
275 expand_cte_stars(&mut prepare.statement, schema);
276 return;
277 }
278
279 let select = match expr {
280 Expression::Select(s) => s,
281 _ => return,
282 };
283
284 let with = match &mut select.with {
285 Some(w) => w,
286 None => return,
287 };
288
289 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
290
291 for cte in &mut with.ctes {
292 let cte_name = normalize_cte_name(&cte.alias);
293
294 if !cte.columns.is_empty() {
296 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
297 resolved_cte_columns.insert(cte_name, cols);
298 continue;
299 }
300
301 if with.recursive {
307 let is_self_referencing =
308 if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
309 let body_sources = get_select_sources(body_select);
310 body_sources.iter().any(|s| s.normalized == cte_name)
311 } else {
312 false
313 };
314 if is_self_referencing {
315 continue;
316 }
317 }
318
319 let body_select = match get_leftmost_select_mut(&mut cte.this) {
321 Some(s) => s,
322 None => continue,
323 };
324
325 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
326 resolved_cte_columns.insert(cte_name, columns);
327 }
328
329 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
331}
332
333fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
338 let mut current = expr;
339 for _ in 0..MAX_LINEAGE_DEPTH {
340 match current {
341 Expression::Select(s) => return Some(s),
342 Expression::Union(u) => current = &mut u.left,
343 Expression::Intersect(i) => current = &mut i.left,
344 Expression::Except(e) => current = &mut e.left,
345 Expression::Paren(p) => current = &mut p.this,
346 _ => return None,
347 }
348 }
349 None
350}
351
352fn rewrite_stars_in_select(
356 select: &mut Select,
357 resolved_ctes: &HashMap<String, Vec<String>>,
358 schema: Option<&dyn Schema>,
359) -> Vec<String> {
360 let has_star = select
365 .expressions
366 .iter()
367 .any(|e| matches!(e, Expression::Star(_)));
368 let has_qualified_star = select
369 .expressions
370 .iter()
371 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
372
373 if !has_star && !has_qualified_star {
374 return select
376 .expressions
377 .iter()
378 .filter_map(get_expression_output_name)
379 .collect();
380 }
381
382 let sources = get_select_sources(select);
383 let mut new_expressions = Vec::new();
384 let mut result_columns = Vec::new();
385
386 for expr in &select.expressions {
387 match expr {
388 Expression::Star(star) => {
389 let qual = star.table.as_ref();
390 if let Some(expanded) =
391 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
392 {
393 for (src_alias, col_name) in &expanded {
394 let table_id = Identifier::new(src_alias);
395 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
396 result_columns.push(col_name.clone());
397 }
398 } else {
399 new_expressions.push(expr.clone());
400 result_columns.push("*".to_string());
401 }
402 }
403 Expression::Column(c) if c.name.name == "*" => {
404 let qual = c.table.as_ref();
405 if let Some(expanded) =
406 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
407 {
408 for (_src_alias, col_name) in &expanded {
409 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
411 result_columns.push(col_name.clone());
412 }
413 } else {
414 new_expressions.push(expr.clone());
415 result_columns.push("*".to_string());
416 }
417 }
418 _ => {
419 new_expressions.push(expr.clone());
420 if let Some(name) = get_expression_output_name(expr) {
421 result_columns.push(name);
422 }
423 }
424 }
425 }
426
427 select.expressions = new_expressions;
428 result_columns
429}
430
431fn expand_star_from_sources(
436 qualifier: Option<&Identifier>,
437 sources: &[SourceInfo],
438 resolved_ctes: &HashMap<String, Vec<String>>,
439 schema: Option<&dyn Schema>,
440) -> Option<Vec<(String, String)>> {
441 let mut expanded = Vec::new();
442
443 if let Some(qual) = qualifier {
444 let qual_normalized = normalize_cte_name(qual);
446 for src in sources {
447 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
448 if let Some(cols) = resolved_ctes.get(&src.normalized) {
450 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
451 return Some(expanded);
452 }
453 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
455 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
456 return Some(expanded);
457 }
458 }
459 }
460 None
461 } else {
462 let mut any_expanded = false;
468 for src in sources {
469 if let Some(cols) = resolved_ctes.get(&src.normalized) {
470 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
471 any_expanded = true;
472 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
473 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
474 any_expanded = true;
475 } else {
476 return None;
477 }
478 }
479 if any_expanded {
480 Some(expanded)
481 } else {
482 None
483 }
484 }
485}
486
487fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
489 let schema = schema?;
490 if fq_name.is_empty() {
491 return None;
492 }
493 schema
494 .column_names(fq_name)
495 .ok()
496 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
497}
498
499fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
501 Expression::Column(Box::new(crate::expressions::Column {
502 name: Identifier::new(name),
503 table: table.cloned(),
504 join_mark: false,
505 trailing_comments: Vec::new(),
506 span: None,
507 inferred_type: None,
508 }))
509}
510
511fn get_expression_output_name(expr: &Expression) -> Option<String> {
513 match expr {
514 Expression::Alias(a) => Some(a.alias.name.clone()),
515 Expression::Column(c) => Some(c.name.name.clone()),
516 Expression::Identifier(id) => Some(id.name.clone()),
517 Expression::Star(_) => Some("*".to_string()),
518 _ => None,
519 }
520}
521
/// A FROM / JOIN source of a `SELECT`, as needed for star expansion.
struct SourceInfo {
    /// Name the query refers to the source by: its alias, or the table name
    /// when unaliased.
    alias: String,
    /// Normalized name, used to look up resolved CTE columns and match
    /// qualifiers.
    normalized: String,
    /// Dotted `catalog.schema.table` name for schema lookups. For
    /// alias-only sources this is the alias.
    fq_name: String,
}
530
531fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
534 let mut sources = Vec::new();
535
536 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
537 fn virtual_source_info(alias: &Identifier) -> SourceInfo {
538 SourceInfo {
539 alias: alias.name.clone(),
540 normalized: normalize_cte_name(alias),
541 fq_name: alias.name.clone(),
542 }
543 }
544
545 match expr {
546 Expression::Table(t) => {
547 let normalized = normalize_cte_name(&t.name);
548 let alias = t
549 .alias
550 .as_ref()
551 .map(|a| a.name.clone())
552 .unwrap_or_else(|| t.name.name.clone());
553 let mut parts = Vec::new();
554 if let Some(catalog) = &t.catalog {
555 parts.push(catalog.name.clone());
556 }
557 if let Some(schema) = &t.schema {
558 parts.push(schema.name.clone());
559 }
560 parts.push(t.name.name.clone());
561 let fq_name = parts.join(".");
562 Some(SourceInfo {
563 alias,
564 normalized,
565 fq_name,
566 })
567 }
568 Expression::Subquery(s) => {
569 let alias = s.alias.as_ref()?.name.clone();
570 let normalized = alias.to_lowercase();
571 let fq_name = alias.clone();
572 Some(SourceInfo {
573 alias,
574 normalized,
575 fq_name,
576 })
577 }
578 Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
579 Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
580 Some(virtual_source_info(&a.alias))
581 }
582 Expression::Paren(p) => extract_source(&p.this),
583 _ => None,
584 }
585 }
586
587 if let Some(from) = &select.from {
588 for expr in &from.expressions {
589 if let Some(info) = extract_source(expr) {
590 sources.push(info);
591 }
592 }
593 }
594 for join in &select.joins {
595 if let Some(info) = extract_source(&join.this) {
596 sources.push(info);
597 }
598 }
599 sources
600}
601
602pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
604 let mut tables = HashSet::new();
605 collect_source_tables(node, &mut tables);
606 tables
607}
608
609pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
611 if let Expression::Table(table) = &node.source {
612 tables.insert(table.name.name.clone());
613 }
614 for child in &node.downstream {
615 collect_source_tables(child, tables);
616 }
617}
618
/// Upper bound on lineage recursion depth and on how many levels are searched
/// for a leftmost `SELECT`. It guards against circular CTE references and
/// pathological nesting.
const MAX_LINEAGE_DEPTH: usize = 64;
626
627fn to_node(
629 column: ColumnRef<'_>,
630 scope: &Scope,
631 dialect: Option<DialectType>,
632 scope_name: &str,
633 source_name: &str,
634 reference_node_name: &str,
635 trim_selects: bool,
636) -> Result<LineageNode> {
637 to_node_inner(
638 column,
639 scope,
640 dialect,
641 scope_name,
642 source_name,
643 reference_node_name,
644 trim_selects,
645 &[],
646 0,
647 )
648}
649
650fn to_node_inner(
651 column: ColumnRef<'_>,
652 scope: &Scope,
653 dialect: Option<DialectType>,
654 scope_name: &str,
655 source_name: &str,
656 reference_node_name: &str,
657 trim_selects: bool,
658 ancestor_cte_scopes: &[Scope],
659 depth: usize,
660) -> Result<LineageNode> {
661 if depth > MAX_LINEAGE_DEPTH {
662 return Err(Error::internal(format!(
663 "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
664 )));
665 }
666 let scope_expr = &scope.expression;
667
668 let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
670 for s in ancestor_cte_scopes {
671 all_cte_scopes.push(s);
672 }
673
674 let effective_expr = match scope_expr {
677 Expression::Cte(cte) => &cte.this,
678 other => other,
679 };
680
681 if matches!(
683 effective_expr,
684 Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
685 ) {
686 if matches!(scope_expr, Expression::Cte(_)) {
688 let mut inner_scope = Scope::new(effective_expr.clone());
689 inner_scope.union_scopes = scope.union_scopes.clone();
690 inner_scope.sources = scope.sources.clone();
691 inner_scope.cte_sources = scope.cte_sources.clone();
692 inner_scope.cte_scopes = scope.cte_scopes.clone();
693 inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
694 inner_scope.subquery_scopes = scope.subquery_scopes.clone();
695 return handle_set_operation(
696 &column,
697 &inner_scope,
698 dialect,
699 scope_name,
700 source_name,
701 reference_node_name,
702 trim_selects,
703 ancestor_cte_scopes,
704 depth,
705 );
706 }
707 return handle_set_operation(
708 &column,
709 scope,
710 dialect,
711 scope_name,
712 source_name,
713 reference_node_name,
714 trim_selects,
715 ancestor_cte_scopes,
716 depth,
717 );
718 }
719
720 let select_expr = find_select_expr(effective_expr, &column, dialect)?;
722 let column_name = resolve_column_name(&column, &select_expr);
723
724 let node_source = if trim_selects {
726 trim_source(effective_expr, &select_expr)
727 } else {
728 effective_expr.clone()
729 };
730
731 let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
733 apply_scope_context(&mut node, scope, source_name, reference_node_name);
734
735 if matches!(&select_expr, Expression::Star(_)) {
737 for (name, source_info) in &scope.sources {
738 let mut child = LineageNode::new(
739 format!("{}.*", name),
740 Expression::Star(crate::expressions::Star {
741 table: None,
742 except: None,
743 replace: None,
744 rename: None,
745 trailing_comments: vec![],
746 span: None,
747 }),
748 source_info.expression.clone(),
749 );
750 apply_source_info_context(&mut child, name, source_info);
751 node.downstream.push(child);
752 }
753 return Ok(node);
754 }
755
756 let subqueries: Vec<&Expression> =
758 select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
759 for sq_expr in subqueries {
760 if let Expression::Subquery(sq) = sq_expr {
761 for sq_scope in &scope.subquery_scopes {
762 if sq_scope.expression == sq.this {
763 if let Ok(child) = to_node_inner(
764 ColumnRef::Index(0),
765 sq_scope,
766 dialect,
767 &column_name,
768 "",
769 "",
770 trim_selects,
771 ancestor_cte_scopes,
772 depth + 1,
773 ) {
774 node.downstream.push(child);
775 }
776 break;
777 }
778 }
779 }
780 }
781
782 let col_refs = find_column_refs_in_expr(&select_expr, dialect);
784 for col_ref in col_refs {
785 let col_name = &col_ref.column;
786 if let Some(ref table_id) = col_ref.table {
787 let tbl = &table_id.name;
788 resolve_qualified_column(
789 &mut node,
790 scope,
791 dialect,
792 tbl,
793 col_name,
794 &column_name,
795 trim_selects,
796 &all_cte_scopes,
797 depth,
798 );
799 } else {
800 resolve_unqualified_column(
801 &mut node,
802 scope,
803 dialect,
804 col_name,
805 &column_name,
806 trim_selects,
807 &all_cte_scopes,
808 depth,
809 );
810 }
811 }
812
813 Ok(node)
814}
815
816fn handle_set_operation(
821 column: &ColumnRef<'_>,
822 scope: &Scope,
823 dialect: Option<DialectType>,
824 scope_name: &str,
825 source_name: &str,
826 reference_node_name: &str,
827 trim_selects: bool,
828 ancestor_cte_scopes: &[Scope],
829 depth: usize,
830) -> Result<LineageNode> {
831 let scope_expr = &scope.expression;
832
833 let col_index = match column {
835 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
836 ColumnRef::Index(i) => *i,
837 };
838
839 let col_name = match column {
840 ColumnRef::Name(name) => name.to_string(),
841 ColumnRef::Index(_) => format!("_{col_index}"),
842 };
843
844 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
845 apply_scope_context(&mut node, scope, source_name, reference_node_name);
846
847 for branch_scope in &scope.union_scopes {
849 if let Ok(child) = to_node_inner(
850 ColumnRef::Index(col_index),
851 branch_scope,
852 dialect,
853 scope_name,
854 "",
855 "",
856 trim_selects,
857 ancestor_cte_scopes,
858 depth + 1,
859 ) {
860 node.downstream.push(child);
861 }
862 }
863
864 Ok(node)
865}
866
867fn resolve_qualified_column(
872 node: &mut LineageNode,
873 scope: &Scope,
874 dialect: Option<DialectType>,
875 table: &str,
876 col_name: &str,
877 parent_name: &str,
878 trim_selects: bool,
879 all_cte_scopes: &[&Scope],
880 depth: usize,
881) {
882 let resolved_cte_name = resolve_cte_alias(scope, table);
885 let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
886
887 let is_cte = scope.cte_sources.contains_key(effective_table)
890 || all_cte_scopes.iter().any(
891 |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
892 );
893 if is_cte {
894 if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
895 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
897 if let Ok(child) = to_node_inner(
898 ColumnRef::Name(col_name),
899 child_scope,
900 dialect,
901 parent_name,
902 effective_table,
903 parent_name,
904 trim_selects,
905 &ancestors,
906 depth + 1,
907 ) {
908 node.downstream.push(child);
909 return;
910 }
911 }
912 }
913
914 if let Some(source_info) = scope.sources.get(table) {
916 if source_info.is_scope {
917 if let Some(child_scope) = find_child_scope(scope, table) {
918 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
919 if let Ok(child) = to_node_inner(
920 ColumnRef::Name(col_name),
921 child_scope,
922 dialect,
923 parent_name,
924 table,
925 parent_name,
926 trim_selects,
927 &ancestors,
928 depth + 1,
929 ) {
930 node.downstream.push(child);
931 return;
932 }
933 }
934 }
935 }
936
937 if let Some(source_info) = scope.sources.get(table) {
940 if !source_info.is_scope {
941 let mut child = make_table_column_node_from_source(table, col_name, source_info);
942 if source_info.kind == SourceKind::Virtual {
943 attach_virtual_source_dependencies(
944 &mut child,
945 scope,
946 dialect,
947 table,
948 &source_info.expression,
949 trim_selects,
950 all_cte_scopes,
951 depth,
952 );
953 }
954 node.downstream.push(child);
955 return;
956 }
957 }
958
959 node.downstream
961 .push(make_table_column_node(table, col_name));
962}
963
964fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
970 if scope.cte_sources.contains_key(name) {
972 return None;
973 }
974 if let Some(source_info) = scope.sources.get(name) {
976 if source_info.is_scope {
977 if let Expression::Cte(cte) = &source_info.expression {
978 let cte_name = &cte.alias.name;
979 if scope.cte_sources.contains_key(cte_name) {
980 return Some(cte_name.clone());
981 }
982 }
983 }
984 }
985 None
986}
987
988fn resolve_unqualified_column(
989 node: &mut LineageNode,
990 scope: &Scope,
991 dialect: Option<DialectType>,
992 col_name: &str,
993 parent_name: &str,
994 trim_selects: bool,
995 all_cte_scopes: &[&Scope],
996 depth: usize,
997) {
998 let from_source_names = source_names_from_from_join(scope);
1002
1003 if from_source_names.len() == 1 {
1004 let tbl = &from_source_names[0];
1005 resolve_qualified_column(
1006 node,
1007 scope,
1008 dialect,
1009 tbl,
1010 col_name,
1011 parent_name,
1012 trim_selects,
1013 all_cte_scopes,
1014 depth,
1015 );
1016 return;
1017 }
1018
1019 let child = LineageNode::new(
1021 col_name.to_string(),
1022 Expression::Column(Box::new(crate::expressions::Column {
1023 name: crate::expressions::Identifier::new(col_name.to_string()),
1024 table: None,
1025 join_mark: false,
1026 trailing_comments: vec![],
1027 span: None,
1028 inferred_type: None,
1029 })),
1030 node.source.clone(),
1031 );
1032 node.downstream.push(child);
1033}
1034
1035fn attach_virtual_source_dependencies(
1036 node: &mut LineageNode,
1037 scope: &Scope,
1038 dialect: Option<DialectType>,
1039 source_alias: &str,
1040 source_expr: &Expression,
1041 trim_selects: bool,
1042 all_cte_scopes: &[&Scope],
1043 depth: usize,
1044) {
1045 let parent_name = node.name.clone();
1046 let mut seen = HashSet::new();
1047 for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1048 let key = (
1049 col_ref.table.as_ref().map(|t| t.name.clone()),
1050 col_ref.column.clone(),
1051 );
1052 if !seen.insert(key) {
1053 continue;
1054 }
1055
1056 if let Some(table_id) = col_ref.table {
1057 let table = table_id.name;
1058 if table == source_alias {
1059 continue;
1060 }
1061 resolve_qualified_column(
1062 node,
1063 scope,
1064 dialect,
1065 &table,
1066 &col_ref.column,
1067 &parent_name,
1068 trim_selects,
1069 all_cte_scopes,
1070 depth + 1,
1071 );
1072 } else {
1073 let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1074 if non_virtual_sources.len() == 1 {
1075 resolve_qualified_column(
1076 node,
1077 scope,
1078 dialect,
1079 &non_virtual_sources[0],
1080 &col_ref.column,
1081 &parent_name,
1082 trim_selects,
1083 all_cte_scopes,
1084 depth + 1,
1085 );
1086 }
1087 }
1088 }
1089}
1090
1091fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1092 fn source_name(expr: &Expression) -> Option<String> {
1093 match expr {
1094 Expression::Table(table) => Some(
1095 table
1096 .alias
1097 .as_ref()
1098 .map(|a| a.name.clone())
1099 .unwrap_or_else(|| table.name.name.clone()),
1100 ),
1101 Expression::Subquery(subquery) => {
1102 subquery.alias.as_ref().map(|alias| alias.name.clone())
1103 }
1104 Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1105 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1106 Some(alias.alias.name.clone())
1107 }
1108 Expression::Paren(paren) => source_name(&paren.this),
1109 _ => None,
1110 }
1111 }
1112
1113 let effective_expr = match &scope.expression {
1114 Expression::Cte(cte) => &cte.this,
1115 expr => expr,
1116 };
1117
1118 let mut names = Vec::new();
1119 let mut seen = std::collections::HashSet::new();
1120
1121 if let Expression::Select(select) = effective_expr {
1122 if let Some(from) = &select.from {
1123 for expr in &from.expressions {
1124 if let Some(name) = source_name(expr) {
1125 if !name.is_empty() && seen.insert(name.clone()) {
1126 names.push(name);
1127 }
1128 }
1129 }
1130 }
1131 for join in &select.joins {
1132 if let Some(name) = source_name(&join.this) {
1133 if !name.is_empty() && seen.insert(name.clone()) {
1134 names.push(name);
1135 }
1136 }
1137 }
1138 }
1139
1140 names
1141}
1142
1143fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1144 source_names_from_from_join(scope)
1145 .into_iter()
1146 .filter(|name| {
1147 !matches!(
1148 scope.sources.get(name).map(|source| source.kind),
1149 Some(SourceKind::Virtual)
1150 )
1151 })
1152 .collect()
1153}
1154
1155fn get_alias_or_name(expr: &Expression) -> Option<String> {
1161 match expr {
1162 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1163 Expression::Column(col) => Some(col.name.name.clone()),
1164 Expression::Identifier(id) => Some(id.name.clone()),
1165 Expression::Star(_) => Some("*".to_string()),
1166 Expression::Annotated(a) => get_alias_or_name(&a.this),
1169 _ => None,
1170 }
1171}
1172
1173fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1175 match column {
1176 ColumnRef::Name(n) => n.to_string(),
1177 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1178 }
1179}
1180
1181fn find_select_expr(
1183 scope_expr: &Expression,
1184 column: &ColumnRef<'_>,
1185 dialect: Option<DialectType>,
1186) -> Result<Expression> {
1187 if let Expression::Select(ref select) = scope_expr {
1188 match column {
1189 ColumnRef::Name(name) => {
1190 let normalized_name = normalize_column_name(name, dialect);
1191 for expr in &select.expressions {
1192 if let Some(alias_or_name) = get_alias_or_name(expr) {
1193 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1194 return Ok(expr.clone());
1195 }
1196 }
1197 }
1198 Err(crate::error::Error::parse(
1199 format!("Cannot find column '{}' in query", name),
1200 0,
1201 0,
1202 0,
1203 0,
1204 ))
1205 }
1206 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1207 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1208 }),
1209 }
1210 } else {
1211 Err(crate::error::Error::parse(
1212 "Expected SELECT expression for column lookup",
1213 0,
1214 0,
1215 0,
1216 0,
1217 ))
1218 }
1219}
1220
1221fn column_to_index(
1223 set_op_expr: &Expression,
1224 name: &str,
1225 dialect: Option<DialectType>,
1226) -> Result<usize> {
1227 let normalized_name = normalize_column_name(name, dialect);
1228 let mut expr = set_op_expr;
1229 loop {
1230 match expr {
1231 Expression::Union(u) => expr = &u.left,
1232 Expression::Intersect(i) => expr = &i.left,
1233 Expression::Except(e) => expr = &e.left,
1234 Expression::Select(select) => {
1235 for (i, e) in select.expressions.iter().enumerate() {
1236 if let Some(alias_or_name) = get_alias_or_name(e) {
1237 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1238 return Ok(i);
1239 }
1240 }
1241 }
1242 return Err(crate::error::Error::parse(
1243 format!("Cannot find column '{}' in set operation", name),
1244 0,
1245 0,
1246 0,
1247 0,
1248 ));
1249 }
1250 _ => {
1251 return Err(crate::error::Error::parse(
1252 "Expected SELECT or set operation",
1253 0,
1254 0,
1255 0,
1256 0,
1257 ))
1258 }
1259 }
1260 }
1261}
1262
1263fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1264 normalize_name(name, dialect, false, true)
1265}
1266
1267fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1269 if let Expression::Select(select) = select_expr {
1270 let mut trimmed = select.as_ref().clone();
1271 trimmed.expressions = vec![target_expr.clone()];
1272 Expression::Select(Box::new(trimmed))
1273 } else {
1274 select_expr.clone()
1275 }
1276}
1277
1278fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1280 if scope.cte_sources.contains_key(source_name) {
1282 for cte_scope in &scope.cte_scopes {
1283 if let Expression::Cte(cte) = &cte_scope.expression {
1284 if cte.alias.name == source_name {
1285 return Some(cte_scope);
1286 }
1287 }
1288 }
1289 }
1290
1291 if let Some(source_info) = scope.sources.get(source_name) {
1293 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1294 if let Expression::Subquery(sq) = &source_info.expression {
1295 for dt_scope in &scope.derived_table_scopes {
1296 if dt_scope.expression == sq.this {
1297 return Some(dt_scope);
1298 }
1299 }
1300 }
1301 }
1302 }
1303
1304 None
1305}
1306
1307fn find_child_scope_in<'a>(
1311 all_cte_scopes: &[&'a Scope],
1312 scope: &'a Scope,
1313 source_name: &str,
1314) -> Option<&'a Scope> {
1315 for cte_scope in &scope.cte_scopes {
1317 if let Expression::Cte(cte) = &cte_scope.expression {
1318 if cte.alias.name == source_name {
1319 return Some(cte_scope);
1320 }
1321 }
1322 }
1323
1324 for cte_scope in all_cte_scopes {
1326 if let Expression::Cte(cte) = &cte_scope.expression {
1327 if cte.alias.name == source_name {
1328 return Some(cte_scope);
1329 }
1330 }
1331 }
1332
1333 if let Some(source_info) = scope.sources.get(source_name) {
1335 if source_info.is_scope {
1336 if let Expression::Subquery(sq) = &source_info.expression {
1337 for dt_scope in &scope.derived_table_scopes {
1338 if dt_scope.expression == sq.this {
1339 return Some(dt_scope);
1340 }
1341 }
1342 }
1343 }
1344 }
1345
1346 None
1347}
1348
1349fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1351 let mut node = LineageNode::new(
1352 format!("{}.{}", table, column),
1353 Expression::Column(Box::new(crate::expressions::Column {
1354 name: crate::expressions::Identifier::new(column.to_string()),
1355 table: Some(crate::expressions::Identifier::new(table.to_string())),
1356 join_mark: false,
1357 trailing_comments: vec![],
1358 span: None,
1359 inferred_type: None,
1360 })),
1361 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1362 );
1363 node.source_name = table.to_string();
1364 node.source_kind = SourceKind::Table;
1365 node
1366}
1367
1368fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1369 let mut parts: Vec<String> = Vec::new();
1370 if let Some(catalog) = &table_ref.catalog {
1371 parts.push(catalog.name.clone());
1372 }
1373 if let Some(schema) = &table_ref.schema {
1374 parts.push(schema.name.clone());
1375 }
1376 parts.push(table_ref.name.name.clone());
1377 parts.join(".")
1378}
1379
1380fn apply_source_info_context(
1381 node: &mut LineageNode,
1382 source_key: &str,
1383 source_info: &ScopeSourceInfo,
1384) {
1385 node.source_kind = source_info.kind;
1386 node.source_name =
1387 source_info
1388 .lineage_name
1389 .clone()
1390 .unwrap_or_else(|| match &source_info.expression {
1391 Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1392 _ => source_key.to_string(),
1393 });
1394 node.source_alias = source_info.alias.clone();
1395}
1396
1397fn make_table_column_node_from_source(
1398 source_key: &str,
1399 column: &str,
1400 source_info: &ScopeSourceInfo,
1401) -> LineageNode {
1402 let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1403 let mut node = LineageNode::new(
1404 format!("{}.{}", lineage_name, column),
1405 Expression::Column(Box::new(crate::expressions::Column {
1406 name: crate::expressions::Identifier::new(column.to_string()),
1407 table: Some(crate::expressions::Identifier::new(
1408 lineage_name.to_string(),
1409 )),
1410 join_mark: false,
1411 trailing_comments: vec![],
1412 span: None,
1413 inferred_type: None,
1414 })),
1415 source_info.expression.clone(),
1416 );
1417
1418 apply_source_info_context(&mut node, source_key, source_info);
1419
1420 node
1421}
1422
#[derive(Debug, Clone)]
/// A column reference found while scanning an expression (see `collect_column_refs`).
///
/// Holds the table qualifier exactly as written on the column expression, plus the bare
/// column name.
struct SimpleColumnRef {
    /// Table qualifier cloned from the column expression; `None` for unqualified references.
    table: Option<crate::expressions::Identifier>,
    /// Column name text (the `name` of the column's identifier), without any qualifier.
    column: String,
}
1429
1430fn find_column_refs_in_expr(
1432 expr: &Expression,
1433 dialect: Option<DialectType>,
1434) -> Vec<SimpleColumnRef> {
1435 let mut refs = Vec::new();
1436 collect_column_refs(expr, dialect, &mut refs);
1437 refs
1438}
1439
1440fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1441 match expr {
1442 Expression::Column(col) => {
1443 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1444 }
1445 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1446 _ => false,
1447 }
1448}
1449
/// Appends to `refs` every column reference reachable from `expr`.
///
/// The walk is iterative: an explicit `stack` of pending sub-expressions replaces recursion
/// (presumably so deeply nested expressions cannot exhaust the call stack; TODO confirm).
/// Children are pushed and then popped LIFO, so references are appended in pop order, which
/// is generally not left-to-right source order. Duplicate references are kept.
///
/// Traversal rules:
/// - `Column` nodes are recorded (qualifier + name) and not descended into.
/// - `Subquery` and `Exists` nodes are not entered, so columns inside them are not reported.
/// - Every other handled variant pushes exactly the child expressions listed in its arm.
/// - Variants not listed fall into the final `_` arm and are ignored, so columns nested
///   under them are not reported.
///
/// `dialect` only affects `MethodCall`. Under BigQuery, a bare `SAFE` receiver
/// (`SAFE.fn(...)`) is skipped so it is not mistaken for a column.
fn collect_column_refs(
    expr: &Expression,
    dialect: Option<DialectType>,
    refs: &mut Vec<SimpleColumnRef>,
) {
    // Work list of expressions still to visit, seeded with the root.
    let mut stack: Vec<&Expression> = vec![expr];

    while let Some(current) = stack.pop() {
        match current {
            Expression::Column(col) => {
                // Leaf: record the reference exactly as qualified in the expression.
                refs.push(SimpleColumnRef {
                    table: col.table.clone(),
                    column: col.name.name.clone(),
                });
            }

            // Not entered: columns inside subqueries / EXISTS are not reported by this walk.
            Expression::Subquery(_) | Expression::Exists(_) => {}

            // Binary operators: visit both operands.
            Expression::And(op)
            | Expression::Or(op)
            | Expression::Eq(op)
            | Expression::Neq(op)
            | Expression::Lt(op)
            | Expression::Lte(op)
            | Expression::Gt(op)
            | Expression::Gte(op)
            | Expression::Add(op)
            | Expression::Sub(op)
            | Expression::Mul(op)
            | Expression::Div(op)
            | Expression::Mod(op)
            | Expression::BitwiseAnd(op)
            | Expression::BitwiseOr(op)
            | Expression::BitwiseXor(op)
            | Expression::BitwiseLeftShift(op)
            | Expression::BitwiseRightShift(op)
            | Expression::Concat(op)
            | Expression::Adjacent(op)
            | Expression::TsMatch(op)
            | Expression::PropertyEQ(op)
            | Expression::ArrayContainsAll(op)
            | Expression::ArrayContainedBy(op)
            | Expression::ArrayOverlaps(op)
            | Expression::JSONBContainsAllTopKeys(op)
            | Expression::JSONBContainsAnyTopKeys(op)
            | Expression::JSONBDeleteAtPath(op)
            | Expression::ExtendsLeft(op)
            | Expression::ExtendsRight(op)
            | Expression::Is(op)
            | Expression::MemberOf(op)
            | Expression::NullSafeEq(op)
            | Expression::NullSafeNeq(op)
            | Expression::Glob(op)
            | Expression::Match(op) => {
                stack.push(&op.left);
                stack.push(&op.right);
            }

            // Unary operators: visit the single operand.
            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
                stack.push(&u.this);
            }

            // Single-argument functions: visit `this`.
            Expression::Upper(f)
            | Expression::Lower(f)
            | Expression::Length(f)
            | Expression::LTrim(f)
            | Expression::RTrim(f)
            | Expression::Reverse(f)
            | Expression::Abs(f)
            | Expression::Sqrt(f)
            | Expression::Cbrt(f)
            | Expression::Ln(f)
            | Expression::Exp(f)
            | Expression::Sign(f)
            | Expression::Date(f)
            | Expression::Time(f)
            | Expression::DateFromUnixDate(f)
            | Expression::UnixDate(f)
            | Expression::UnixSeconds(f)
            | Expression::UnixMillis(f)
            | Expression::UnixMicros(f)
            | Expression::TimeStrToDate(f)
            | Expression::DateToDi(f)
            | Expression::DiToDate(f)
            | Expression::TsOrDiToDi(f)
            | Expression::TsOrDsToDatetime(f)
            | Expression::TsOrDsToTimestamp(f)
            | Expression::YearOfWeek(f)
            | Expression::YearOfWeekIso(f)
            | Expression::Initcap(f)
            | Expression::Ascii(f)
            | Expression::Chr(f)
            | Expression::Soundex(f)
            | Expression::ByteLength(f)
            | Expression::Hex(f)
            | Expression::LowerHex(f)
            | Expression::Unicode(f)
            | Expression::Radians(f)
            | Expression::Degrees(f)
            | Expression::Sin(f)
            | Expression::Cos(f)
            | Expression::Tan(f)
            | Expression::Asin(f)
            | Expression::Acos(f)
            | Expression::Atan(f)
            | Expression::IsNan(f)
            | Expression::IsInf(f)
            | Expression::ArrayLength(f)
            | Expression::ArraySize(f)
            | Expression::Cardinality(f)
            | Expression::ArrayReverse(f)
            | Expression::ArrayDistinct(f)
            | Expression::ArrayFlatten(f)
            | Expression::ArrayCompact(f)
            | Expression::Explode(f)
            | Expression::ExplodeOuter(f)
            | Expression::ToArray(f)
            | Expression::MapFromEntries(f)
            | Expression::MapKeys(f)
            | Expression::MapValues(f)
            | Expression::JsonArrayLength(f)
            | Expression::JsonKeys(f)
            | Expression::JsonType(f)
            | Expression::ParseJson(f)
            | Expression::ToJson(f)
            | Expression::Typeof(f)
            | Expression::BitwiseCount(f)
            | Expression::Year(f)
            | Expression::Month(f)
            | Expression::Day(f)
            | Expression::Hour(f)
            | Expression::Minute(f)
            | Expression::Second(f)
            | Expression::DayOfWeek(f)
            | Expression::DayOfWeekIso(f)
            | Expression::DayOfMonth(f)
            | Expression::DayOfYear(f)
            | Expression::WeekOfYear(f)
            | Expression::Quarter(f)
            | Expression::Epoch(f)
            | Expression::EpochMs(f)
            | Expression::TimeStrToUnix(f)
            | Expression::SHA(f)
            | Expression::SHA1Digest(f)
            | Expression::TimeToUnix(f)
            | Expression::JSONBool(f)
            | Expression::Int64(f)
            | Expression::MD5NumberLower64(f)
            | Expression::MD5NumberUpper64(f)
            | Expression::DateStrToDate(f)
            | Expression::DateToDateStr(f) => {
                stack.push(&f.this);
            }

            // Two-argument functions: visit `this` and `expression`.
            Expression::Power(f)
            | Expression::NullIf(f)
            | Expression::IfNull(f)
            | Expression::Nvl(f)
            | Expression::UnixToTimeStr(f)
            | Expression::Contains(f)
            | Expression::StartsWith(f)
            | Expression::EndsWith(f)
            | Expression::Levenshtein(f)
            | Expression::ModFunc(f)
            | Expression::Atan2(f)
            | Expression::IntDiv(f)
            | Expression::AddMonths(f)
            | Expression::MonthsBetween(f)
            | Expression::NextDay(f)
            | Expression::ArrayContains(f)
            | Expression::ArrayPosition(f)
            | Expression::ArrayAppend(f)
            | Expression::ArrayPrepend(f)
            | Expression::ArrayUnion(f)
            | Expression::ArrayExcept(f)
            | Expression::ArrayRemove(f)
            | Expression::StarMap(f)
            | Expression::MapFromArrays(f)
            | Expression::MapContainsKey(f)
            | Expression::ElementAt(f)
            | Expression::JsonMergePatch(f)
            | Expression::JSONBContains(f)
            | Expression::JSONBExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }

            // Variadic functions: visit every argument in `expressions`.
            Expression::Greatest(f)
            | Expression::Least(f)
            | Expression::Coalesce(f)
            | Expression::ArrayConcat(f)
            | Expression::ArrayIntersect(f)
            | Expression::ArrayZip(f)
            | Expression::MapConcat(f)
            | Expression::JsonArray(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }

            // Typed aggregates: visit the argument plus any FILTER / HAVING MAX / LIMIT parts.
            Expression::Sum(f)
            | Expression::Avg(f)
            | Expression::Min(f)
            | Expression::Max(f)
            | Expression::ArrayAgg(f)
            | Expression::CountIf(f)
            | Expression::Stddev(f)
            | Expression::StddevPop(f)
            | Expression::StddevSamp(f)
            | Expression::Variance(f)
            | Expression::VarPop(f)
            | Expression::VarSamp(f)
            | Expression::Median(f)
            | Expression::Mode(f)
            | Expression::First(f)
            | Expression::Last(f)
            | Expression::AnyValue(f)
            | Expression::ApproxDistinct(f)
            | Expression::ApproxCountDistinct(f)
            | Expression::LogicalAnd(f)
            | Expression::LogicalOr(f)
            | Expression::Skewness(f)
            | Expression::ArrayConcatAgg(f)
            | Expression::ArrayUniqueAgg(f)
            | Expression::BoolXorAgg(f)
            | Expression::BitwiseAndAgg(f)
            | Expression::BitwiseOrAgg(f)
            | Expression::BitwiseXorAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                // Only the expression half of `having_max` is visited; the second tuple
                // element is ignored here.
                if let Some((ref expr, _)) = f.having_max {
                    stack.push(expr);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }

            // Generic (non-specialised) function calls: visit every argument.
            Expression::Function(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
            }
            Expression::AggregateFunction(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
                if let Some(ref filter) = func.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = func.limit {
                    stack.push(limit);
                }
            }

            // Only the wrapped function (`wf.this`) is traversed; no other field of the
            // window function is visited by this walk.
            Expression::WindowFunction(wf) => {
                stack.push(&wf.this);
            }

            // Only the aliased expression is visited; the alias name is not a column reference.
            Expression::Alias(a) => {
                stack.push(&a.this);
            }
            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
                stack.push(&c.this);
                if let Some(ref fmt) = c.format {
                    stack.push(fmt);
                }
                if let Some(ref def) = c.default {
                    stack.push(def);
                }
            }
            Expression::Paren(p) => {
                stack.push(&p.this);
            }
            Expression::Annotated(a) => {
                stack.push(&a.this);
            }
            Expression::Case(case) => {
                if let Some(ref operand) = case.operand {
                    stack.push(operand);
                }
                for (cond, result) in &case.whens {
                    stack.push(cond);
                    stack.push(result);
                }
                if let Some(ref else_expr) = case.else_ {
                    stack.push(else_expr);
                }
            }
            Expression::Collation(c) => {
                stack.push(&c.this);
            }
            Expression::In(i) => {
                stack.push(&i.this);
                for e in &i.expressions {
                    stack.push(e);
                }
                // `query` is pushed as-is; if it is a `Subquery` node, the arm above ignores it.
                if let Some(ref q) = i.query {
                    stack.push(q);
                }
                if let Some(ref u) = i.unnest {
                    stack.push(u);
                }
            }
            Expression::Between(b) => {
                stack.push(&b.this);
                stack.push(&b.low);
                stack.push(&b.high);
            }
            Expression::IsNull(n) => {
                stack.push(&n.this);
            }
            Expression::IsTrue(t) | Expression::IsFalse(t) => {
                stack.push(&t.this);
            }
            Expression::IsJson(j) => {
                stack.push(&j.this);
            }
            Expression::Like(l) | Expression::ILike(l) => {
                stack.push(&l.left);
                stack.push(&l.right);
                if let Some(ref esc) = l.escape {
                    stack.push(esc);
                }
            }
            Expression::SimilarTo(s) => {
                stack.push(&s.this);
                stack.push(&s.pattern);
                if let Some(ref esc) = s.escape {
                    stack.push(esc);
                }
            }
            Expression::Ordered(o) => {
                stack.push(&o.this);
            }
            Expression::Array(a) => {
                for e in &a.expressions {
                    stack.push(e);
                }
            }
            Expression::Tuple(t) => {
                for e in &t.expressions {
                    stack.push(e);
                }
            }
            // Field names are skipped; only the field value expressions are visited.
            Expression::Struct(s) => {
                for (_, e) in &s.fields {
                    stack.push(e);
                }
            }
            Expression::Subscript(s) => {
                stack.push(&s.this);
                stack.push(&s.index);
            }
            Expression::Dot(d) => {
                stack.push(&d.this);
            }
            Expression::MethodCall(m) => {
                // In BigQuery, `SAFE.fn(...)` is a namespace prefix, not a column receiver,
                // so the receiver is skipped only in that case.
                if !matches!(dialect, Some(DialectType::BigQuery))
                    || !is_bigquery_safe_namespace_receiver(&m.this)
                {
                    stack.push(&m.this);
                }
                for arg in &m.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySlice(s) => {
                stack.push(&s.this);
                if let Some(ref start) = s.start {
                    stack.push(start);
                }
                if let Some(ref end) = s.end {
                    stack.push(end);
                }
            }
            // Only the lambda body is visited.
            // NOTE(review): if the parser leaves lambda parameters as `Column` nodes inside
            // the body, they will be reported as column references here; confirm.
            Expression::Lambda(l) => {
                stack.push(&l.body);
            }
            Expression::NamedArgument(n) => {
                stack.push(&n.value);
            }
            Expression::TryCatch(t) => {
                for stmt in &t.try_body {
                    stack.push(stmt);
                }
                if let Some(catch_body) = &t.catch_body {
                    for stmt in catch_body {
                        stack.push(stmt);
                    }
                }
            }
            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
                stack.push(e);
            }

            // Functions with named, specialised argument fields.
            Expression::Substring(f) => {
                stack.push(&f.this);
                stack.push(&f.start);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Trim(f) => {
                stack.push(&f.this);
                if let Some(ref chars) = f.characters {
                    stack.push(chars);
                }
            }
            Expression::Replace(f) => {
                stack.push(&f.this);
                stack.push(&f.old);
                stack.push(&f.new);
            }
            Expression::IfFunc(f) => {
                stack.push(&f.condition);
                stack.push(&f.true_value);
                if let Some(ref fv) = f.false_value {
                    stack.push(fv);
                }
            }
            Expression::Nvl2(f) => {
                stack.push(&f.this);
                stack.push(&f.true_value);
                stack.push(&f.false_value);
            }
            Expression::ConcatWs(f) => {
                stack.push(&f.separator);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Count(f) => {
                // `this` is optional (e.g. a COUNT with no argument expression).
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::GroupConcat(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::StringAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }
            Expression::ListAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::SumIf(f) => {
                stack.push(&f.this);
                stack.push(&f.condition);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::DateAdd(f) | Expression::DateSub(f) => {
                stack.push(&f.this);
                stack.push(&f.interval);
            }
            Expression::DateDiff(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }
            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
                stack.push(&f.this);
            }
            Expression::Extract(f) => {
                stack.push(&f.this);
            }
            Expression::Round(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::Floor(f) => {
                stack.push(&f.this);
                if let Some(ref s) = f.scale {
                    stack.push(s);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Ceil(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Log(f) => {
                stack.push(&f.this);
                if let Some(ref b) = f.base {
                    stack.push(b);
                }
            }
            Expression::AtTimeZone(f) => {
                stack.push(&f.this);
                stack.push(&f.zone);
            }
            Expression::Lead(f) | Expression::Lag(f) => {
                stack.push(&f.this);
                if let Some(ref off) = f.offset {
                    stack.push(off);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::FirstValue(f) | Expression::LastValue(f) => {
                stack.push(&f.this);
            }
            Expression::NthValue(f) => {
                stack.push(&f.this);
                stack.push(&f.offset);
            }
            Expression::Position(f) => {
                stack.push(&f.substring);
                stack.push(&f.string);
                if let Some(ref start) = f.start {
                    stack.push(start);
                }
            }
            Expression::Decode(f) => {
                stack.push(&f.this);
                for (search, result) in &f.search_results {
                    stack.push(search);
                    stack.push(result);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::CharFunc(f) => {
                for arg in &f.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySort(f) => {
                stack.push(&f.this);
                if let Some(ref cmp) = f.comparator {
                    stack.push(cmp);
                }
            }
            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
                stack.push(&f.this);
                stack.push(&f.separator);
                if let Some(ref nr) = f.null_replacement {
                    stack.push(nr);
                }
            }
            Expression::ArrayFilter(f) => {
                stack.push(&f.this);
                stack.push(&f.filter);
            }
            Expression::ArrayTransform(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::Sequence(f)
            | Expression::Generate(f)
            | Expression::ExplodingGenerateSeries(f) => {
                stack.push(&f.start);
                stack.push(&f.stop);
                if let Some(ref step) = f.step {
                    stack.push(step);
                }
            }
            Expression::JsonExtract(f)
            | Expression::JsonExtractScalar(f)
            | Expression::JsonQuery(f)
            | Expression::JsonValue(f) => {
                stack.push(&f.this);
                stack.push(&f.path);
            }
            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
                stack.push(&f.this);
                for p in &f.paths {
                    stack.push(p);
                }
            }
            Expression::JsonObject(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
                stack.push(&f.this);
                for (path, val) in &f.path_values {
                    stack.push(path);
                    stack.push(val);
                }
            }
            Expression::Overlay(f) => {
                stack.push(&f.this);
                stack.push(&f.replacement);
                stack.push(&f.from);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Convert(f) => {
                stack.push(&f.this);
                if let Some(ref style) = f.style {
                    stack.push(style);
                }
            }
            Expression::ApproxPercentile(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref acc) = f.accuracy {
                    stack.push(acc);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::Percentile(f)
            | Expression::PercentileCont(f)
            | Expression::PercentileDisc(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            // Only `this` is visited for WITHIN GROUP.
            Expression::WithinGroup(f) => {
                stack.push(&f.this);
            }
            Expression::Left(f) | Expression::Right(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
            }
            Expression::Repeat(f) => {
                stack.push(&f.this);
                stack.push(&f.times);
            }
            Expression::Lpad(f) | Expression::Rpad(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
                if let Some(ref fill) = f.fill {
                    stack.push(fill);
                }
            }
            Expression::Split(f) => {
                stack.push(&f.this);
                stack.push(&f.delimiter);
            }
            Expression::RegexpLike(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpReplace(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                stack.push(&f.replacement);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref group) = f.group {
                    stack.push(group);
                }
            }
            Expression::ToDate(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::ToTimestamp(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::DateFormat(f) | Expression::FormatDate(f) => {
                stack.push(&f.this);
                stack.push(&f.format);
            }
            Expression::LastDay(f) => {
                stack.push(&f.this);
            }
            Expression::FromUnixtime(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::UnixTimestamp(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::MakeDate(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
            }
            Expression::MakeTimestamp(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
                stack.push(&f.hour);
                stack.push(&f.minute);
                stack.push(&f.second);
                if let Some(ref tz) = f.timezone {
                    stack.push(tz);
                }
            }
            Expression::TruncFunc(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::ArrayFunc(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Unnest(f) => {
                stack.push(&f.this);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::StructFunc(f) => {
                for (_, e) in &f.fields {
                    stack.push(e);
                }
            }
            Expression::StructExtract(f) => {
                stack.push(&f.this);
            }
            Expression::NamedStruct(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::MapFunc(f) => {
                for k in &f.keys {
                    stack.push(k);
                }
                for v in &f.values {
                    stack.push(v);
                }
            }
            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::JsonArrayAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::JsonObjectAgg(f) => {
                stack.push(&f.key);
                stack.push(&f.value);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::NTile(f) => {
                if let Some(ref n) = f.num_buckets {
                    stack.push(n);
                }
            }
            Expression::Rand(f) => {
                if let Some(ref s) = f.seed {
                    stack.push(s);
                }
                if let Some(ref lo) = f.lower {
                    stack.push(lo);
                }
                if let Some(ref hi) = f.upper {
                    stack.push(hi);
                }
            }
            // `subquery` is pushed as-is; if it is a `Subquery` node, the arm above ignores it.
            Expression::Any(q) | Expression::All(q) => {
                stack.push(&q.this);
                stack.push(&q.subquery);
            }
            Expression::Overlaps(o) => {
                if let Some(ref this) = o.this {
                    stack.push(this);
                }
                if let Some(ref expr) = o.expression {
                    stack.push(expr);
                }
                if let Some(ref ls) = o.left_start {
                    stack.push(ls);
                }
                if let Some(ref le) = o.left_end {
                    stack.push(le);
                }
                if let Some(ref rs) = o.right_start {
                    stack.push(rs);
                }
                if let Some(ref re) = o.right_end {
                    stack.push(re);
                }
            }
            Expression::Interval(i) => {
                if let Some(ref this) = i.this {
                    stack.push(this);
                }
            }
            Expression::TimeStrToTime(f) => {
                stack.push(&f.this);
                if let Some(ref zone) = f.zone {
                    stack.push(zone);
                }
            }
            Expression::JSONBExtractScalar(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                if let Some(ref jt) = f.json_type {
                    stack.push(jt);
                }
            }

            // Literals and any variant not listed above: nothing is visited, so column
            // references nested under unlisted variants are not reported.
            _ => {}
        }
    }
}
2332
2333#[cfg(test)]
2338mod tests {
2339 use super::*;
2340 use crate::dialects::{Dialect, DialectType};
2341 use crate::expressions::DataType;
2342 use crate::optimizer::annotate_types::annotate_types;
2343 use crate::parse_one;
2344 use crate::schema::{MappingSchema, Schema};
2345
2346 fn parse(sql: &str) -> Expression {
2347 let dialect = Dialect::get(DialectType::Generic);
2348 let ast = dialect.parse(sql).unwrap();
2349 ast.into_iter().next().unwrap()
2350 }
2351
2352 #[test]
2353 fn test_simple_lineage() {
2354 let expr = parse("SELECT a FROM t");
2355 let node = lineage("a", &expr, None, false).unwrap();
2356
2357 assert_eq!(node.name, "a");
2358 assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2359 let names = node.downstream_names();
2361 assert!(
2362 names.iter().any(|n| n == "t.a"),
2363 "Expected t.a in downstream, got: {:?}",
2364 names
2365 );
2366 }
2367
2368 #[test]
2369 fn test_lineage_walk() {
2370 let root = LineageNode {
2371 name: "col_a".to_string(),
2372 expression: Expression::Null(crate::expressions::Null),
2373 source: Expression::Null(crate::expressions::Null),
2374 downstream: vec![LineageNode::new(
2375 "t.a",
2376 Expression::Null(crate::expressions::Null),
2377 Expression::Null(crate::expressions::Null),
2378 )],
2379 source_name: String::new(),
2380 source_kind: SourceKind::Unknown,
2381 source_alias: None,
2382 reference_node_name: String::new(),
2383 };
2384
2385 let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2386 assert_eq!(names.len(), 2);
2387 assert_eq!(names[0], "col_a");
2388 assert_eq!(names[1], "t.a");
2389 }
2390
2391 #[test]
2392 fn test_aliased_column() {
2393 let expr = parse("SELECT a + 1 AS b FROM t");
2394 let node = lineage("b", &expr, None, false).unwrap();
2395
2396 assert_eq!(node.name, "b");
2397 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2399 assert!(
2400 all_names.iter().any(|n| n.contains("a")),
2401 "Expected to trace to column a, got: {:?}",
2402 all_names
2403 );
2404 }
2405
2406 #[test]
2407 fn test_qualified_column() {
2408 let expr = parse("SELECT t.a FROM t");
2409 let node = lineage("a", &expr, None, false).unwrap();
2410
2411 assert_eq!(node.name, "a");
2412 let names = node.downstream_names();
2413 assert!(
2414 names.iter().any(|n| n == "t.a"),
2415 "Expected t.a, got: {:?}",
2416 names
2417 );
2418 }
2419
2420 #[test]
2421 fn test_unqualified_column() {
2422 let expr = parse("SELECT a FROM t");
2423 let node = lineage("a", &expr, None, false).unwrap();
2424
2425 let names = node.downstream_names();
2427 assert!(
2428 names.iter().any(|n| n == "t.a"),
2429 "Expected t.a, got: {:?}",
2430 names
2431 );
2432 }
2433
2434 #[test]
2435 fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2436 let query = "SELECT name FROM users";
2437 let dialect = Dialect::get(DialectType::BigQuery);
2438 let expr = dialect
2439 .parse(query)
2440 .unwrap()
2441 .into_iter()
2442 .next()
2443 .expect("expected one expression");
2444
2445 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2446 schema
2447 .add_table("users", &[("name".into(), DataType::Text)], None)
2448 .expect("schema setup");
2449
2450 let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2451 .expect("lineage without schema");
2452 let mut expr_without = node_without_schema.expression.clone();
2453 annotate_types(
2454 &mut expr_without,
2455 Some(&schema),
2456 Some(DialectType::BigQuery),
2457 );
2458 assert_eq!(
2459 expr_without.inferred_type(),
2460 None,
2461 "Expected unresolved root type without schema-aware lineage qualification"
2462 );
2463
2464 let node_with_schema = lineage_with_schema(
2465 "name",
2466 &expr,
2467 Some(&schema),
2468 Some(DialectType::BigQuery),
2469 false,
2470 )
2471 .expect("lineage with schema");
2472 let mut expr_with = node_with_schema.expression.clone();
2473 annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2474
2475 assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2476 }
2477
2478 #[test]
2479 fn test_lineage_with_schema_correlated_scalar_subquery() {
2480 let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2481 let dialect = Dialect::get(DialectType::BigQuery);
2482 let expr = dialect
2483 .parse(query)
2484 .unwrap()
2485 .into_iter()
2486 .next()
2487 .expect("expected one expression");
2488
2489 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2490 schema
2491 .add_table(
2492 "t1",
2493 &[("id".into(), DataType::BigInt { length: None })],
2494 None,
2495 )
2496 .expect("schema setup");
2497 schema
2498 .add_table(
2499 "t2",
2500 &[
2501 ("id".into(), DataType::BigInt { length: None }),
2502 ("val".into(), DataType::BigInt { length: None }),
2503 ],
2504 None,
2505 )
2506 .expect("schema setup");
2507
2508 let node = lineage_with_schema(
2509 "id",
2510 &expr,
2511 Some(&schema),
2512 Some(DialectType::BigQuery),
2513 false,
2514 )
2515 .expect("lineage_with_schema should handle correlated scalar subqueries");
2516
2517 assert_eq!(node.name, "id");
2518 }
2519
2520 #[test]
2521 fn test_lineage_with_schema_join_using() {
2522 let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2523 let dialect = Dialect::get(DialectType::BigQuery);
2524 let expr = dialect
2525 .parse(query)
2526 .unwrap()
2527 .into_iter()
2528 .next()
2529 .expect("expected one expression");
2530
2531 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2532 schema
2533 .add_table(
2534 "t1",
2535 &[("a".into(), DataType::BigInt { length: None })],
2536 None,
2537 )
2538 .expect("schema setup");
2539 schema
2540 .add_table(
2541 "t2",
2542 &[("a".into(), DataType::BigInt { length: None })],
2543 None,
2544 )
2545 .expect("schema setup");
2546
2547 let node = lineage_with_schema(
2548 "a",
2549 &expr,
2550 Some(&schema),
2551 Some(DialectType::BigQuery),
2552 false,
2553 )
2554 .expect("lineage_with_schema should handle JOIN USING");
2555
2556 assert_eq!(node.name, "a");
2557 }
2558
2559 #[test]
2560 fn test_lineage_with_schema_qualified_table_name() {
2561 let query = "SELECT a FROM raw.t1";
2562 let dialect = Dialect::get(DialectType::BigQuery);
2563 let expr = dialect
2564 .parse(query)
2565 .unwrap()
2566 .into_iter()
2567 .next()
2568 .expect("expected one expression");
2569
2570 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2571 schema
2572 .add_table(
2573 "raw.t1",
2574 &[("a".into(), DataType::BigInt { length: None })],
2575 None,
2576 )
2577 .expect("schema setup");
2578
2579 let node = lineage_with_schema(
2580 "a",
2581 &expr,
2582 Some(&schema),
2583 Some(DialectType::BigQuery),
2584 false,
2585 )
2586 .expect("lineage_with_schema should handle dotted schema.table names");
2587
2588 assert_eq!(node.name, "a");
2589 }
2590
2591 #[test]
2592 fn test_lineage_with_schema_none_matches_lineage() {
2593 let expr = parse("SELECT a FROM t");
2594 let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2595 let with_none =
2596 lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2597
2598 assert_eq!(with_none.name, baseline.name);
2599 assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2600 }
2601
2602 #[test]
2603 fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2604 let dialect = Dialect::get(DialectType::BigQuery);
2605 let expr = dialect
2606 .parse("SELECT Name AS name FROM teams")
2607 .unwrap()
2608 .into_iter()
2609 .next()
2610 .expect("expected one expression");
2611
2612 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2613 schema
2614 .add_table(
2615 "teams",
2616 &[("Name".into(), DataType::String { length: None })],
2617 None,
2618 )
2619 .expect("schema setup");
2620
2621 let node = lineage_with_schema(
2622 "name",
2623 &expr,
2624 Some(&schema),
2625 Some(DialectType::BigQuery),
2626 false,
2627 )
2628 .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2629
2630 let names = node.downstream_names();
2631 assert!(
2632 names.iter().any(|n| n == "teams.Name"),
2633 "Expected teams.Name in downstream, got: {:?}",
2634 names
2635 );
2636 }
2637
2638 #[test]
2639 fn test_lineage_bigquery_mixed_case_alias_lookup() {
2640 let dialect = Dialect::get(DialectType::BigQuery);
2641 let expr = dialect
2642 .parse("SELECT Name AS Name FROM teams")
2643 .unwrap()
2644 .into_iter()
2645 .next()
2646 .expect("expected one expression");
2647
2648 let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2649 .expect("lineage should resolve mixed-case aliases in BigQuery");
2650
2651 assert_eq!(node.name, "name");
2652 }
2653
/// Regression test for issue #209: a column produced by `UNNEST(...) AS date_val` is
/// traced to a virtual source named `_0`. The user-written alias is kept in
/// `source_alias`, and the UNNEST alias expression is kept as the node's `source`.
#[test]
fn test_lineage_bigquery_unnest_alias_source_issue_209() {
    let sql = r#"
SELECT date_val AS week_start
FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
"#;
    let expr = parse_one(sql, DialectType::BigQuery).expect("parse");

    let root = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage should resolve UNNEST alias as a source");
    let child = root
        .downstream
        .first()
        .expect("week_start should have downstream lineage");

    assert_eq!(child.name, "_0.date_val");
    assert_eq!(child.source_name, "_0");
    assert_eq!(child.source_kind, SourceKind::Virtual);
    assert_eq!(child.source_alias.as_deref(), Some("date_val"));

    // The downstream expression must be a column qualified by the virtual source name.
    let column = match &child.expression {
        Expression::Column(column) => column,
        other => panic!("expected downstream column expression, got {:?}", other),
    };
    assert_eq!(column.name.name, "date_val");
    let qualifier = column.table.as_ref().map(|table| table.name.as_str());
    assert_eq!(qualifier, Some("_0"));

    let is_unnest_source = matches!(
        &child.source,
        Expression::Alias(alias)
            if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"
    );
    assert!(
        is_unnest_source,
        "expected UNNEST source expression, got {:?}",
        child.source
    );
}
2694
/// Counterpart to the UNNEST-alias test: a real table that happens to be called
/// `date_val` is an ordinary `Table` source with no alias, not a virtual source.
#[test]
fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
    let sql = "SELECT date_val.id FROM date_val";
    let expr = parse_one(sql, DialectType::BigQuery).expect("parse");

    let root = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
    let child = root
        .downstream
        .first()
        .expect("id should have lineage");

    assert_eq!(child.name, "date_val.id");
    assert_eq!(child.source_name, "date_val");
    assert_eq!(child.source_kind, SourceKind::Table);
    assert_eq!(child.source_alias, None);
}
2708
/// Each UNNEST in a query gets its own virtual source name (`_0` for the first, `_1`
/// for the second). The user-written alias is preserved in `source_alias`.
#[test]
fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
    let sql = r#"
SELECT a.a AS first_value, b.b AS second_value
FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
"#;
    let expr = parse_one(sql, DialectType::BigQuery).expect("parse");

    // (output column, expect() message, node name, virtual source name, alias)
    let cases = [
        ("first_value", "first source", "_0.a", "_0", "a"),
        ("second_value", "second source", "_1.b", "_1", "b"),
    ];
    for &(column, missing_msg, node_name, virtual_name, alias) in &cases {
        let root =
            lineage(column, &expr, Some(DialectType::BigQuery), false).expect("lineage");
        let child = root.downstream.first().expect(missing_msg);

        assert_eq!(child.name, node_name);
        assert_eq!(child.source_name, virtual_name);
        assert_eq!(child.source_alias.as_deref(), Some(alias));
        assert_eq!(child.source_kind, SourceKind::Virtual);
    }
}
2739
/// A table-backed UNNEST (`UNNEST(t.items)`) yields a virtual `_0.item` node. That
/// node's own downstream points back at the real `t.items` column of table `t`.
#[test]
fn test_lineage_table_backed_unnest_points_to_real_source_column() {
    let sql = r#"
SELECT item.item AS item
FROM t JOIN UNNEST(t.items) AS item ON TRUE
"#;
    let expr = parse_one(sql, DialectType::BigQuery).expect("parse");
    let root = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");

    // First hop: the virtual source created for the UNNEST.
    let virtual_child = root.downstream.first().expect("virtual item source");
    assert_eq!(virtual_child.name, "_0.item");
    assert_eq!(virtual_child.source_kind, SourceKind::Virtual);

    // Second hop: the array column the UNNEST reads from.
    let real_child = virtual_child
        .downstream
        .first()
        .expect("UNNEST(t.items) should depend on t.items");
    assert_eq!(real_child.name, "t.items");
    assert_eq!(real_child.source_name, "t");
    assert_eq!(real_child.source_kind, SourceKind::Table);
}
2764
/// Regression test for issue #61: in Snowflake `DATEDIFF(day, ...)`, the bare `day`
/// date part must not be treated as a column. Only `date_utc` appears in lineage.
#[test]
fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
    let sql =
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let metric_columns = vec![("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &metric_columns, None)
        .expect("schema setup");

    let names = lineage_with_schema(
        "recency",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat date part as a column")
    .downstream_names();

    let has_date_column = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );

    let leaks_date_part = names.iter().any(|n| n == "day" || n.ends_with(".day"));
    assert!(
        !leaks_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
2803
/// Snowflake `DATEDIFF(day, ...)` parses into the typed `DateDiff` node with its unit
/// set to `IntervalUnit::Day`, not into a generic function call.
#[test]
fn test_snowflake_datediff_parses_to_typed_ast() {
    let expr = parse_one(
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    let Expression::Select(select) = &expr else {
        panic!("expected Select, got {expr:?}");
    };
    let projection = &select.expressions[0];
    let Expression::Alias(alias) = projection else {
        panic!("expected Alias, got {projection:?}");
    };
    let Expression::DateDiff(f) = &alias.this else {
        panic!("expected DateDiff, got {:?}", alias.this);
    };
    assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
}
2825
/// Follow-up to issue #61 for `DATEADD(day, 1, date_utc)`: the `day` date part must not
/// surface as a lineage column, while `date_utc` must.
#[test]
fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
    let sql = "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let metric_columns = vec![("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &metric_columns, None)
        .expect("schema setup");

    let names = lineage_with_schema(
        "next_day",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat DATEADD date part as a column")
    .downstream_names();

    let has_date_column = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );

    let leaks_date_part = names.iter().any(|n| n == "day" || n.ends_with(".day"));
    assert!(
        !leaks_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
2864
/// Follow-up to issue #61 for `DATE_PART(day, date_utc)` with an unquoted part name:
/// `day` must not surface as a lineage column, while `date_utc` must.
#[test]
fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
    let sql = "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let metric_columns = vec![("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &metric_columns, None)
        .expect("schema setup");

    let names = lineage_with_schema(
        "day_part",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat DATE_PART identifier as a column")
    .downstream_names();

    let has_date_column = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );

    let leaks_date_part = names.iter().any(|n| n == "day" || n.ends_with(".day"));
    assert!(
        !leaks_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        names
    );
}
2903
/// Control case: `DATE_PART('day', date_utc)` with a quoted part name still traces to
/// `date_utc` under a Snowflake schema.
#[test]
fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
    let sql = "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, DialectType::Snowflake).expect("parse");

    let metric_columns = vec![("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table("fact.some_daily_metrics", &metric_columns, None)
        .expect("schema setup");

    let names = lineage_with_schema(
        "day_part",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("quoted DATE_PART should continue to work")
    .downstream_names();

    let has_date_column = names.iter().any(|n| n == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        names
    );
}
2937
/// Snowflake `DATEADD(day, ...)` stays a generic `Function` node. Its first argument is
/// parsed as a `Var` holding `day`, not as a column reference.
#[test]
fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
    let expr = parse_one(
        "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    let Expression::Select(select) = &expr else {
        panic!("expected Select, got {expr:?}");
    };
    let projection = &select.expressions[0];
    let Expression::Alias(alias) = projection else {
        panic!("expected Alias, got {projection:?}");
    };
    let Expression::Function(f) = &alias.this else {
        panic!("expected generic DATEADD function, got {:?}", alias.this);
    };
    assert_eq!(f.name.to_uppercase(), "DATEADD");
    assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
}
2960
/// Snowflake `DATE_PART(day, ...)` stays a generic `Function` node. Its first argument
/// is a `Var` holding `day`, not a column reference.
#[test]
fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
    let expr = parse_one(
        "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    let Expression::Select(select) = &expr else {
        panic!("expected Select, got {expr:?}");
    };
    let projection = &select.expressions[0];
    let Expression::Alias(alias) = projection else {
        panic!("expected Alias, got {projection:?}");
    };
    let Expression::Function(f) = &alias.this else {
        panic!("expected generic DATE_PART function, got {:?}", alias.this);
    };
    assert_eq!(f.name.to_uppercase(), "DATE_PART");
    assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
}
2983
/// Snowflake `DATE_PART('day', ...)` with a quoted part name also stays a generic
/// `Function` node named `DATE_PART`.
#[test]
fn test_snowflake_date_part_string_literal_stays_generic_function() {
    let expr = parse_one(
        "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    let Expression::Select(select) = &expr else {
        panic!("expected Select, got {expr:?}");
    };
    let projection = &select.expressions[0];
    let Expression::Alias(alias) = projection else {
        panic!("expected Alias, got {projection:?}");
    };
    let Expression::Function(f) = &alias.this else {
        panic!("expected generic DATE_PART function, got {:?}", alias.this);
    };
    assert_eq!(f.name.to_uppercase(), "DATE_PART");
}
3005
/// In a two-table join, each qualified output column traces to its own table.
#[test]
fn test_lineage_join() {
    let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");

    let names_a = lineage("a", &expr, None, false).unwrap().downstream_names();
    assert!(
        names_a.iter().any(|n| n == "t.a"),
        "Expected t.a, got: {:?}",
        names_a
    );

    let names_b = lineage("b", &expr, None, false).unwrap().downstream_names();
    assert!(
        names_b.iter().any(|n| n == "s.b"),
        "Expected s.b, got: {:?}",
        names_b
    );
}
3026
/// A leaf reached through a table alias keeps the aliased edge name (`t1.col1`).
/// Its `source_name` and `source` expression resolve to the real table (`table1`).
#[test]
fn test_lineage_alias_leaf_has_resolved_source_name() {
    let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
    let node = lineage("col1", &expr, None, false).unwrap();

    // The edge name uses the alias as written in the query.
    let names = node.downstream_names();
    assert!(
        names.iter().any(|n| n == "t1.col1"),
        "Expected aliased column edge t1.col1, got: {:?}",
        names
    );

    // The source metadata, however, must point at the underlying table.
    let leaf = node
        .downstream
        .iter()
        .find(|n| n.name == "t1.col1")
        .expect("Expected t1.col1 leaf");
    assert_eq!(leaf.source_name, "table1");
    match &leaf.source {
        Expression::Table(table) => assert_eq!(table.name.name, "table1"),
        // Report what was actually produced, matching the other tests in this module.
        other => panic!("Expected leaf source to be a table expression, got {other:?}"),
    }
}
3052
/// A column selected from a derived table is traced through the subquery to the base
/// table column `t.a`.
#[test]
fn test_lineage_derived_table() {
    let node = lineage("a", &parse("SELECT x.a FROM (SELECT a FROM t) AS x"), None, false)
        .unwrap();
    assert_eq!(node.name, "a");

    let all_names: Vec<String> = node.walk().map(|n| n.name.clone()).collect();
    let reaches_base_column = all_names.iter().any(|n| n == "t.a");
    assert!(
        reaches_base_column,
        "Expected to trace through derived table to t.a, got: {:?}",
        all_names
    );
}
3067
/// A column selected from a CTE is traced through the CTE body to `t.a`.
#[test]
fn test_lineage_cte() {
    let node = lineage("a", &parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte"), None, false)
        .unwrap();
    assert_eq!(node.name, "a");

    let all_names: Vec<String> = node.walk().map(|n| n.name.clone()).collect();
    let reaches_base_column = all_names.iter().any(|n| n == "t.a");
    assert!(
        reaches_base_column,
        "Expected to trace through CTE to t.a, got: {:?}",
        all_names
    );
}
3081
/// A two-operand UNION yields exactly two downstream branches.
#[test]
fn test_lineage_union() {
    let node = lineage("a", &parse("SELECT a FROM t1 UNION SELECT a FROM t2"), None, false)
        .unwrap();
    assert_eq!(node.name, "a");

    let branch_count = node.downstream.len();
    assert_eq!(
        branch_count,
        2,
        "Expected 2 branches for UNION, got {}",
        branch_count
    );
}
3096
/// A CTE whose body is a UNION produces a walk of at least three nodes: the root plus
/// nodes contributed through the union.
#[test]
fn test_lineage_cte_union() {
    let sql = "WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte";
    let root = lineage("a", &parse(sql), None, false).unwrap();

    let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
    assert!(
        all_names.len() >= 3,
        "Expected at least 3 nodes for CTE with UNION, got: {:?}",
        all_names
    );
}
3110
/// Requesting lineage for `*` on `SELECT *` yields a root named `*` with at least one
/// downstream node.
#[test]
fn test_lineage_star() {
    let root = lineage("*", &parse("SELECT * FROM t"), None, false).unwrap();
    assert_eq!(root.name, "*");

    let has_children = !root.downstream.is_empty();
    assert!(has_children, "Star should produce downstream nodes");
}
3123
/// An output column defined by a scalar subquery is traced into that subquery, so the
/// walk contains more than just the root.
#[test]
fn test_lineage_subquery_in_select() {
    let root = lineage("x", &parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t"), None, false)
        .unwrap();
    assert_eq!(root.name, "x");

    let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
    assert!(
        all_names.len() >= 2,
        "Expected tracing into scalar subquery, got: {:?}",
        all_names
    );
}
3138
/// Different output columns of one query yield independent roots, each traced to its
/// own column of `t`.
#[test]
fn test_lineage_multiple_columns() {
    let expr = parse("SELECT a, b FROM t");
    let [node_a, node_b] = ["a", "b"].map(|col| lineage(col, &expr, None, false).unwrap());

    assert_eq!(node_a.name, "a");
    assert_eq!(node_b.name, "b");

    let (names_a, names_b) = (node_a.downstream_names(), node_b.downstream_names());
    assert!(names_a.iter().any(|n| n == "t.a"));
    assert!(names_b.iter().any(|n| n == "t.b"));
}
3155
/// `get_source_tables` over the lineage of `a` includes the table `t` it comes from.
#[test]
fn test_get_source_tables() {
    let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
    let tables = get_source_tables(&lineage("a", &expr, None, false).unwrap());

    assert!(
        tables.contains("t"),
        "Expected source table 't', got: {:?}",
        tables
    );
}
3168
/// Asking for a column the query does not produce is reported as an error.
#[test]
fn test_lineage_column_not_found() {
    let result = lineage("nonexistent", &parse("SELECT a FROM t"), None, false);
    assert!(result.is_err());
}
3175
/// Two chained CTEs (`cte2` reads `cte1`, which reads `t`) produce a lineage walk of at
/// least three nodes.
#[test]
fn test_lineage_nested_cte() {
    let sql = "WITH cte1 AS (SELECT a FROM t), \
               cte2 AS (SELECT a FROM cte1) \
               SELECT a FROM cte2";
    let root = lineage("a", &parse(sql), None, false).unwrap();

    let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
    assert!(
        all_names.len() >= 3,
        "Expected to trace through nested CTEs, got: {:?}",
        all_names
    );
}
3193
/// With trimming enabled (last argument `true`), the root's `source` SELECT keeps only
/// the projection for the requested column.
#[test]
fn test_trim_selects_true() {
    let expr = parse("SELECT a, b, c FROM t");
    let node = lineage("a", &expr, None, true).unwrap();

    // Show the unexpected expression on failure instead of a bare message.
    let Expression::Select(select) = &node.source else {
        panic!("Expected Select source, got {:?}", node.source);
    };
    assert_eq!(
        select.expressions.len(),
        1,
        "Trimmed source should have 1 expression, got {}",
        select.expressions.len()
    );
}
3211
/// With trimming disabled (last argument `false`), the root's `source` SELECT keeps all
/// three original projections.
#[test]
fn test_trim_selects_false() {
    let expr = parse("SELECT a, b, c FROM t");
    let node = lineage("a", &expr, None, false).unwrap();

    // Show the unexpected expression on failure instead of a bare message.
    let Expression::Select(select) = &node.source else {
        panic!("Expected Select source, got {:?}", node.source);
    };
    assert_eq!(
        select.expressions.len(),
        3,
        "Untrimmed source should have 3 expressions"
    );
}
3228
/// A computed projection (`a + b AS c`) is traced to both operands. The walk, which
/// includes the root, therefore has at least three nodes.
#[test]
fn test_lineage_expression_in_select() {
    let root = lineage("c", &parse("SELECT a + b AS c FROM t"), None, false).unwrap();

    let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
    assert!(
        all_names.len() >= 3,
        "Expected to trace a + b to both columns, got: {:?}",
        all_names
    );
}
3242
/// The right branch of the UNION names its column `b`, not `a`. Lineage for `a` still
/// reports two downstream branches, so set-operation columns are matched by position.
#[test]
fn test_set_operation_by_index() {
    let sql = "SELECT a FROM t1 UNION SELECT b FROM t2";
    let node = lineage("a", &parse(sql), None, false).unwrap();

    assert_eq!(node.downstream.len(), 2);
}
3253
3254 fn print_node(node: &LineageNode, indent: usize) {
3257 let pad = " ".repeat(indent);
3258 println!(
3259 "{pad}name={:?} source_name={:?}",
3260 node.name, node.source_name
3261 );
3262 for child in &node.downstream {
3263 print_node(child, indent + 1);
3264 }
3265 }
3266
/// Regression test for issue #18: under BigQuery, `UPPER(name)` is traced to
/// `users.name`. The query and the lineage tree are printed to aid debugging
/// (visible with `--nocapture`).
#[test]
fn test_issue18_repro() {
    let query = "SELECT UPPER(name) as upper_name FROM users";
    println!("Query: {query}\n");

    let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
    let parsed = dialect.parse(query).unwrap();
    let root = lineage("upper_name", &parsed[0], Some(DialectType::BigQuery), false).unwrap();

    println!("lineage(\"upper_name\"):");
    print_node(&root, 1);

    let names = root.downstream_names();
    let traced = names.iter().any(|n| n == "users.name");
    assert!(
        traced,
        "Expected users.name in downstream, got: {:?}",
        names
    );
}
3288
/// Regression test for issue #207: with BigQuery's `SAFE.` function prefix
/// (`SAFE.PARSE_JSON(data)`), lineage reaches `source_table.data` through two CTEs.
/// `SAFE` must never show up as an `import_cte.safe` column.
#[test]
fn test_lineage_bigquery_safe_namespace_issue207() {
    let query = r#"
WITH import_cte AS (
  SELECT timestamp, data, operation
  FROM `project`.`dataset`.`source_table`
),
transform_cte AS (
  SELECT
    timestamp,
    SAFE.PARSE_JSON(data) AS json_data
  FROM import_cte
)
SELECT json_data FROM transform_cte
"#;
    let expr = parse_one(query, DialectType::BigQuery).expect("parse");
    let root = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage should resolve SAFE.PARSE_JSON arguments");
    let names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();

    let reaches_argument = names.iter().any(|name| name == "source_table.data");
    assert!(
        reaches_argument,
        "expected source_table.data in lineage, got {names:?}"
    );

    let has_safe_receiver = names
        .iter()
        .any(|name| name.eq_ignore_ascii_case("import_cte.safe"));
    assert!(
        !has_safe_receiver,
        "did not expect SAFE namespace receiver in lineage, got {names:?}"
    );
}
3320
/// A query parsed with the module's `parse` helper and analysed as BigQuery must trace
/// `SAFE.PARSE_JSON(data)` to `t.data`, never to a `t.safe` receiver column.
#[test]
fn test_lineage_bigquery_safe_namespace_method_call_guard() {
    let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
    let root = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage should resolve SAFE.PARSE_JSON arguments");
    let names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();

    let reaches_argument = names.iter().any(|name| name == "t.data");
    assert!(reaches_argument, "expected t.data in lineage, got {names:?}");

    let has_safe_receiver = names.iter().any(|name| name.eq_ignore_ascii_case("t.safe"));
    assert!(
        !has_safe_receiver,
        "did not expect SAFE namespace receiver in lineage, got {names:?}"
    );
}
3337
/// Control for the SAFE-namespace guard: an ordinary method receiver (`obj.METHOD(arg)`)
/// is still a lineage source alongside the argument.
#[test]
fn test_lineage_method_call_receiver_control() {
    let root = lineage("out", &parse("SELECT obj.METHOD(arg) AS out FROM t"), None, false)
        .expect("lineage");
    let names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();

    let has_receiver = names.iter().any(|name| name == "t.obj");
    assert!(
        has_receiver,
        "expected ordinary method receiver to remain in lineage, got {names:?}"
    );

    let has_argument = names.iter().any(|name| name == "t.arg");
    assert!(
        has_argument,
        "expected method argument in lineage, got {names:?}"
    );
}
3353
/// `UPPER(name)` is traced to its argument column `users.name`.
#[test]
fn test_lineage_upper_function() {
    let sql = "SELECT UPPER(name) AS upper_name FROM users";
    let names = lineage("upper_name", &parse(sql), None, false)
        .unwrap()
        .downstream_names();

    let traced = names.iter().any(|n| n == "users.name");
    assert!(
        traced,
        "Expected users.name in downstream, got: {:?}",
        names
    );
}
3366
/// `ROUND(price, 2)` is traced to its column argument `products.price`.
#[test]
fn test_lineage_round_function() {
    let sql = "SELECT ROUND(price, 2) AS rounded FROM products";
    let names = lineage("rounded", &parse(sql), None, false)
        .unwrap()
        .downstream_names();

    let traced = names.iter().any(|n| n == "products.price");
    assert!(
        traced,
        "Expected products.price in downstream, got: {:?}",
        names
    );
}
3379
/// `COALESCE(a, b)` is traced to both of its column arguments.
#[test]
fn test_lineage_coalesce_function() {
    let sql = "SELECT COALESCE(a, b) AS val FROM t";
    let names = lineage("val", &parse(sql), None, false)
        .unwrap()
        .downstream_names();

    let has_a = names.iter().any(|n| n == "t.a");
    assert!(has_a, "Expected t.a in downstream, got: {:?}", names);

    let has_b = names.iter().any(|n| n == "t.b");
    assert!(has_b, "Expected t.b in downstream, got: {:?}", names);
}
3397
/// The aggregate `COUNT(id)` is traced to its argument column `t.id`.
#[test]
fn test_lineage_count_function() {
    let sql = "SELECT COUNT(id) AS cnt FROM t";
    let names = lineage("cnt", &parse(sql), None, false)
        .unwrap()
        .downstream_names();

    let traced = names.iter().any(|n| n == "t.id");
    assert!(traced, "Expected t.id in downstream, got: {:?}", names);
}
3410
/// The aggregate `SUM(amount)` is traced to its argument column `t.amount`.
#[test]
fn test_lineage_sum_function() {
    let sql = "SELECT SUM(amount) AS total FROM t";
    let names = lineage("total", &parse(sql), None, false)
        .unwrap()
        .downstream_names();

    let traced = names.iter().any(|n| n == "t.amount");
    assert!(
        traced,
        "Expected t.amount in downstream, got: {:?}",
        names
    );
}
3423
/// A CASE expression is traced both to the column in its condition (`t.x`) and to the
/// column used inside the nested branch functions (`t.name`).
#[test]
fn test_lineage_case_with_nested_functions() {
    let sql = "SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t";
    let names = lineage("result", &parse(sql), None, false)
        .unwrap()
        .downstream_names();

    let has_condition_column = names.iter().any(|n| n == "t.x");
    assert!(
        has_condition_column,
        "Expected t.x in downstream, got: {:?}",
        names
    );

    let has_branch_column = names.iter().any(|n| n == "t.name");
    assert!(
        has_branch_column,
        "Expected t.name in downstream, got: {:?}",
        names
    );
}
3442
/// `SUBSTRING(name, 1, 3)` is traced to its column argument `t.name`.
#[test]
fn test_lineage_substring_function() {
    let sql = "SELECT SUBSTRING(name, 1, 3) AS short FROM t";
    let names = lineage("short", &parse(sql), None, false)
        .unwrap()
        .downstream_names();

    let traced = names.iter().any(|n| n == "t.name");
    assert!(traced, "Expected t.name in downstream, got: {:?}", names);
}
3455
/// A column selected from a CTE defined as `SELECT *` still gets downstream lineage,
/// even though the CTE lists no explicit columns.
#[test]
fn test_lineage_cte_select_star() {
    let sql = "WITH y AS (SELECT * FROM x) SELECT a FROM y";
    let root = lineage("a", &parse(sql), None, false).unwrap();
    assert_eq!(root.name, "a");

    let has_children = !root.downstream.is_empty();
    assert!(
        has_children,
        "Expected downstream nodes tracing through CTE, got none"
    );
}
3474
/// A column renamed inside a CTE (`id AS customer_id`) and exposed through an outer
/// `SELECT *` is still traced past the root.
#[test]
fn test_lineage_cte_select_star_renamed_column() {
    let sql = "WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed";
    let root = lineage("customer_id", &parse(sql), None, false).unwrap();
    assert_eq!(root.name, "customer_id");

    let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
    assert!(
        all_names.len() >= 2,
        "Expected at least 2 nodes (customer_id → source), got: {:?}",
        all_names
    );
}
3492
/// Every column of a CTE exposed through an outer `SELECT *` can be traced individually.
#[test]
fn test_lineage_cte_select_star_multiple_columns() {
    let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");

    for col in ["a", "b", "c"].iter().copied() {
        let root = lineage(col, &expr, None, false).unwrap();
        assert_eq!(root.name, col);

        let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
        assert!(
            all_names.len() >= 2,
            "Expected at least 2 nodes for column {}, got: {:?}",
            col,
            all_names
        );
    }
}
3511
/// `SELECT *` at two levels (`cte2` reads `cte1`, the outer query reads `cte2`) still
/// lets lineage for `a` pass through both CTEs.
#[test]
fn test_lineage_nested_cte_select_star() {
    let sql = "WITH cte1 AS (SELECT a FROM t), \
               cte2 AS (SELECT * FROM cte1) \
               SELECT * FROM cte2";
    let root = lineage("a", &parse(sql), None, false).unwrap();
    assert_eq!(root.name, "a");

    let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
    assert!(
        all_names.len() >= 3,
        "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
        all_names
    );
}
3530
/// Three chained `SELECT *` CTEs over `t` still let lineage for `x` reach at least four
/// nodes.
#[test]
fn test_lineage_three_level_nested_cte_star() {
    let sql = "WITH cte1 AS (SELECT x FROM t), \
               cte2 AS (SELECT * FROM cte1), \
               cte3 AS (SELECT * FROM cte2) \
               SELECT * FROM cte3";
    let root = lineage("x", &parse(sql), None, false).unwrap();
    assert_eq!(root.name, "x");

    let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
    assert!(
        all_names.len() >= 4,
        "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
        all_names
    );
}
3550
/// A CTE whose body is a `UNION ALL`, exposed through an outer `SELECT *`, is traced
/// past the root.
#[test]
fn test_lineage_cte_union_star() {
    let sql = "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
               SELECT * FROM cte";
    let root = lineage("a", &parse(sql), None, false).unwrap();
    assert_eq!(root.name, "a");

    let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
    assert!(
        all_names.len() >= 2,
        "Expected at least 2 nodes for CTE union star, got: {:?}",
        all_names
    );
}
3568
/// A CTE that selects `*` from a table with no known columns must not make lineage
/// panic. Whether the unknown column `x` resolves is deliberately not pinned (an `Err`
/// is acceptable). If lineage does succeed, the root must carry the requested name.
#[test]
fn test_lineage_cte_star_unknown_table() {
    let expr = parse(
        "WITH cte AS (SELECT * FROM unknown_table) \
         SELECT * FROM cte",
    );
    // Previously the result was discarded, so this test asserted nothing beyond
    // "does not panic". Check the success path as well.
    if let Ok(node) = lineage("x", &expr, None, false) {
        assert_eq!(node.name, "x");
    }
}
3581
/// A CTE with an explicit column list (`cte(x, y)`) exposes `x` under that name through
/// an outer `SELECT *`.
#[test]
fn test_lineage_cte_explicit_columns() {
    let sql = "WITH cte(x, y) AS (SELECT a, b FROM t) \
               SELECT * FROM cte";
    let root = lineage("x", &parse(sql), None, false).unwrap();
    assert_eq!(root.name, "x");
}
3592
/// A qualified star (`cte.*`) exposes every CTE column, and each can be traced past
/// the root.
#[test]
fn test_lineage_cte_qualified_star() {
    let expr = parse(
        "WITH cte AS (SELECT a, b FROM t) \
         SELECT cte.* FROM cte",
    );

    for col in ["a", "b"].iter().copied() {
        let root = lineage(col, &expr, None, false).unwrap();
        assert_eq!(root.name, col);

        let all_names: Vec<String> = root.walk().map(|n| n.name.clone()).collect();
        assert!(
            all_names.len() >= 2,
            "Expected at least 2 nodes for qualified star column {}, got: {:?}",
            col,
            all_names
        );
    }
}
3612
/// A column selected from an unaliased derived table defined as `SELECT *` still gets
/// downstream lineage.
#[test]
fn test_lineage_subquery_select_star() {
    let root = lineage("x", &parse("SELECT x FROM (SELECT * FROM table_a)"), None, false)
        .unwrap();
    assert_eq!(root.name, "x");

    let has_children = !root.downstream.is_empty();
    assert!(
        has_children,
        "Expected downstream nodes for subquery with SELECT *, got none"
    );
}
3626
/// With a schema describing `stg_orders`, a column reached via `SELECT *` through a CTE
/// over that external table resolves, and the root carries the requested name.
#[test]
fn test_lineage_cte_star_with_schema_external_table() {
    let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
SELECT * FROM orders"#;
    let expr = parse(sql);

    let cols: Vec<(String, DataType)> = ["order_id", "customer_id", "amount"]
        .iter()
        .map(|name| (name.to_string(), DataType::Unknown))
        .collect();
    let mut schema = MappingSchema::new();
    schema.add_table("stg_orders", &cols, None).unwrap();

    let root = lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
        .unwrap();
    assert_eq!(root.name, "order_id");
}
3647
/// A quoted three-part table name (`"db"."schema"."stg_orders"`) behind a `SELECT *`
/// CTE matches the schema entry `db.schema.stg_orders`.
#[test]
fn test_lineage_cte_star_with_schema_three_part_name() {
    let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
SELECT * FROM orders"#;
    let expr = parse(sql);

    let cols: Vec<(String, DataType)> = ["order_id", "customer_id"]
        .iter()
        .map(|name| (name.to_string(), DataType::Unknown))
        .collect();
    let mut schema = MappingSchema::new();
    schema
        .add_table("db.schema.stg_orders", &cols, None)
        .unwrap();

    let root = lineage_with_schema(
        "customer_id",
        &expr,
        Some(&schema as &dyn Schema),
        None,
        false,
    )
    .unwrap();
    assert_eq!(root.name, "customer_id");
}
3674
/// Two nested `SELECT *` CTEs over a schema-described external table still let a
/// schema column be resolved.
#[test]
fn test_lineage_cte_star_with_schema_nested() {
    let sql = r#"WITH
  raw AS (SELECT * FROM external_table),
  enriched AS (SELECT * FROM raw)
  SELECT * FROM enriched"#;
    let expr = parse(sql);

    let cols: Vec<(String, DataType)> = ["id", "name"]
        .iter()
        .map(|name| (name.to_string(), DataType::Unknown))
        .collect();
    let mut schema = MappingSchema::new();
    schema.add_table("external_table", &cols, None).unwrap();

    let root =
        lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
    assert_eq!(root.name, "name");
}
3696
/// Mixing a qualified star (`orders.*`) with a literal projection inside a CTE both
/// schema-backed columns and the literal `extra` column can be requested.
#[test]
fn test_lineage_cte_qualified_star_with_schema() {
    let sql = r#"WITH
  orders AS (SELECT * FROM stg_orders),
  enriched AS (
    SELECT orders.*, 'extra' AS extra
    FROM orders
  )
  SELECT * FROM enriched"#;
    let expr = parse(sql);

    let cols: Vec<(String, DataType)> = ["order_id", "total"]
        .iter()
        .map(|name| (name.to_string(), DataType::Unknown))
        .collect();
    let mut schema = MappingSchema::new();
    schema.add_table("stg_orders", &cols, None).unwrap();

    let star_column =
        lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
            .unwrap();
    assert_eq!(star_column.name, "order_id");

    let literal_column =
        lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
    assert_eq!(literal_column.name, "extra");
}
3727
/// Without a schema, a `SELECT *` CTE over a CTE with explicit columns still resolves
/// those columns.
#[test]
fn test_lineage_cte_star_without_schema_still_works() {
    let sql = r#"WITH
  cte1 AS (SELECT id, name FROM raw_table),
  cte2 AS (SELECT * FROM cte1)
  SELECT * FROM cte2"#;

    let root = lineage("id", &parse(sql), None, false).unwrap();
    assert_eq!(root.name, "id");
}
3741
/// In a chain of `SELECT *` CTEs with a join (`base_orders.*` plus `p.amount` from
/// `stg_payments`), both `order_id` and `amount` reach some table-qualified node.
/// The check only requires a dotted name containing the column, not a specific table.
#[test]
fn test_lineage_nested_cte_star_with_join_and_schema() {
    let sql = r#"WITH
base_orders AS (
  SELECT * FROM stg_orders
),
with_payments AS (
  SELECT
    base_orders.*,
    p.amount
  FROM base_orders
  LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
),
final_cte AS (
  SELECT * FROM with_payments
)
SELECT * FROM final_cte"#;
    let expr = parse(sql);

    // Every schema column is declared with an unknown type.
    let unknown_cols = |names: &[&str]| -> Vec<(String, crate::expressions::DataType)> {
        names
            .iter()
            .map(|name| (name.to_string(), crate::expressions::DataType::Unknown))
            .collect()
    };
    let order_cols = unknown_cols(&["order_id", "customer_id", "status"]);
    let pay_cols = unknown_cols(&["payment_id", "order_id", "amount"]);

    let mut schema = MappingSchema::new();
    schema.add_table("stg_orders", &order_cols, None).unwrap();
    schema.add_table("stg_payments", &pay_cols, None).unwrap();

    // Names of every node in the lineage tree of `column`, root included.
    let trace = |column: &str| -> Vec<String> {
        let node =
            lineage_with_schema(column, &expr, Some(&schema as &dyn Schema), None, false)
                .unwrap();
        let names: Vec<String> = node.walk().map(|n| n.name.clone()).collect();
        names
    };

    let order_id_names = trace("order_id");
    let order_id_qualified = order_id_names
        .iter()
        .any(|n| n.contains('.') && n.contains("order_id"));
    assert!(
        order_id_qualified,
        "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
        order_id_names
    );

    let amount_names = trace("amount");
    let amount_qualified = amount_names
        .iter()
        .any(|n| n.contains('.') && n.contains("amount"));
    assert!(
        amount_qualified,
        "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
        amount_names
    );
}
3819
3820 #[test]
3821 fn test_lineage_cte_alias_resolution() {
3822 let sql = r#"WITH import_stg_items AS (
3824 SELECT item_id, name, status FROM stg_items
3825)
3826SELECT base.item_id, base.status
3827FROM import_stg_items AS base"#;
3828 let expr = parse(sql);
3829
3830 let node = lineage("item_id", &expr, None, false).unwrap();
3831 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3832 assert!(
3834 all_names.iter().any(|n| n == "stg_items.item_id"),
3835 "Expected leaf 'stg_items.item_id', got: {:?}",
3836 all_names
3837 );
3838 }
3839
3840 #[test]
3841 fn test_lineage_cte_alias_with_schema_and_star() {
3842 let sql = r#"WITH import_stg AS (
3844 SELECT * FROM stg_items
3845)
3846SELECT base.item_id, base.status
3847FROM import_stg AS base"#;
3848 let expr = parse(sql);
3849
3850 let mut schema = MappingSchema::new();
3851 schema
3852 .add_table(
3853 "stg_items",
3854 &[
3855 ("item_id".to_string(), DataType::Unknown),
3856 ("name".to_string(), DataType::Unknown),
3857 ("status".to_string(), DataType::Unknown),
3858 ],
3859 None,
3860 )
3861 .unwrap();
3862
3863 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
3864 .unwrap();
3865 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3866 assert!(
3867 all_names.iter().any(|n| n == "stg_items.item_id"),
3868 "Expected leaf 'stg_items.item_id', got: {:?}",
3869 all_names
3870 );
3871 }
3872
3873 #[test]
3874 fn test_lineage_cte_alias_with_join() {
3875 let sql = r#"WITH
3877 import_users AS (SELECT id, name FROM users),
3878 import_orders AS (SELECT id, user_id, amount FROM orders)
3879SELECT u.name, o.amount
3880FROM import_users AS u
3881LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3882 let expr = parse(sql);
3883
3884 let node = lineage("name", &expr, None, false).unwrap();
3885 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3886 assert!(
3887 all_names.iter().any(|n| n == "users.name"),
3888 "Expected leaf 'users.name', got: {:?}",
3889 all_names
3890 );
3891
3892 let node = lineage("amount", &expr, None, false).unwrap();
3893 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3894 assert!(
3895 all_names.iter().any(|n| n == "orders.amount"),
3896 "Expected leaf 'orders.amount', got: {:?}",
3897 all_names
3898 );
3899 }
3900
3901 #[test]
3906 fn test_lineage_unquoted_cte_case_insensitive() {
3907 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
3910 let node = lineage("col", &expr, None, false).unwrap();
3911 assert_eq!(node.name, "col");
3912 assert!(
3913 !node.downstream.is_empty(),
3914 "Unquoted CTE should resolve case-insensitively"
3915 );
3916 }
3917
3918 #[test]
3919 fn test_lineage_quoted_cte_case_preserved() {
3920 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
3922 let node = lineage("col", &expr, None, false).unwrap();
3923 assert_eq!(node.name, "col");
3924 assert!(
3925 !node.downstream.is_empty(),
3926 "Quoted CTE with matching case should resolve"
3927 );
3928 }
3929
3930 #[test]
3931 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3932 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
3936 let result = lineage("col", &expr, None, false);
3939 assert!(
3940 result.is_err(),
3941 "Quoted CTE with case mismatch should not expand star: {:?}",
3942 result
3943 );
3944 }
3945
3946 #[test]
3947 fn test_lineage_mixed_quoted_unquoted_cte() {
3948 let expr = parse(
3950 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3951 );
3952 let node = lineage("a", &expr, None, false).unwrap();
3953 assert_eq!(node.name, "a");
3954 assert!(
3955 !node.downstream.is_empty(),
3956 "Mixed quoted/unquoted CTE chain should resolve"
3957 );
3958 }
3959
3960 #[test]
3976 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3977 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
3988 let node = lineage("col", &expr, None, false).unwrap();
3989 assert!(!node.downstream.is_empty());
3990 let child = &node.downstream[0];
3991 assert_eq!(
3993 child.source_name, "MyCte",
3994 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3995 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3996 );
3997 }
3998
3999 #[test]
4000 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4001 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4008 let node = lineage("col", &expr, None, false).unwrap();
4009 assert!(!node.downstream.is_empty());
4010 let child = &node.downstream[0];
4011 assert_eq!(
4013 child.source_name, "MyCte",
4014 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4015 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4016 );
4017 }
4018
4019 #[test]
4026 #[ignore = "requires derived table star expansion (separate issue)"]
4027 fn test_node_name_doesnt_contain_comment() {
4028 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4029 let node = lineage("x", &expr, None, false).unwrap();
4030
4031 assert_eq!(node.name, "x");
4032 assert!(!node.downstream.is_empty());
4033 }
4034
4035 #[test]
4039 fn test_comment_before_first_column_in_cte() {
4040 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
4041 let sql_without_comment = "with t as (select 1 as a) select a from t";
4042
4043 let expr_ok = parse(sql_without_comment);
4045 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4046
4047 let expr_comment = parse(sql_with_comment);
4049 let node_comment = lineage("a", &expr_comment, None, false)
4050 .expect("with comment before first column should succeed");
4051
4052 assert_eq!(node_ok.name, node_comment.name, "node names should match");
4053 assert_eq!(
4054 node_ok.downstream_names(),
4055 node_comment.downstream_names(),
4056 "downstream lineage should be identical with or without comment"
4057 );
4058 }
4059
4060 #[test]
4062 fn test_block_comment_before_first_column() {
4063 let sql = "with t as (select 1 as a) select /* section */ a from t";
4064 let expr = parse(sql);
4065 let node = lineage("a", &expr, None, false)
4066 .expect("block comment before first column should succeed");
4067 assert_eq!(node.name, "a");
4068 assert!(
4069 !node.downstream.is_empty(),
4070 "should have downstream lineage"
4071 );
4072 }
4073
4074 #[test]
4076 fn test_comment_before_first_column_second_col_ok() {
4077 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
4078 let expr = parse(sql);
4079
4080 let node_a =
4081 lineage("a", &expr, None, false).expect("column a with comment should succeed");
4082 assert_eq!(node_a.name, "a");
4083
4084 let node_b =
4085 lineage("b", &expr, None, false).expect("column b with comment should succeed");
4086 assert_eq!(node_b.name, "b");
4087 }
4088
4089 #[test]
4091 fn test_comment_before_aliased_column() {
4092 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
4093 let expr = parse(sql);
4094 let node =
4095 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4096 assert_eq!(node.name, "y");
4097 assert!(
4098 !node.downstream.is_empty(),
4099 "aliased column should have downstream lineage"
4100 );
4101 }
4102}