1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// A node in a column-lineage tree.
///
/// Each node names a column (or `*` / `source.*`), carries the projection expression it was
/// derived from and the source expression it was read from, and lists the nodes it depends
/// on in `downstream`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Column name this node describes (e.g. `a`, `src.*`, or `_0` for positional refs).
    pub name: String,
    /// The select-list item (or column reference) this node was derived from.
    pub expression: Expression,
    /// The query or table expression the column is read from (possibly trimmed to a
    /// single projection when `trim_selects` is enabled).
    pub source: Expression,
    /// Upstream dependencies of this column.
    pub downstream: Vec<LineageNode>,
    /// Name of the scope source this node was reached through (empty at the root).
    pub source_name: String,
    /// Classification of where this node's data comes from (root, CTE, derived table,
    /// virtual source, ...).
    pub source_kind: SourceKind,
    /// Optional alias the source was referenced by; omitted from serialized output when
    /// absent and defaulted to `None` when deserializing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_alias: Option<String>,
    /// Name of the parent node that referenced this one (empty at the root).
    pub reference_node_name: String,
}
40
41impl LineageNode {
42 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44 Self {
45 name: name.into(),
46 expression,
47 source,
48 downstream: Vec::new(),
49 source_name: String::new(),
50 source_kind: SourceKind::Unknown,
51 source_alias: None,
52 reference_node_name: String::new(),
53 }
54 }
55
56 pub fn walk(&self) -> LineageWalker<'_> {
58 LineageWalker { stack: vec![self] }
59 }
60
61 pub fn downstream_names(&self) -> Vec<String> {
63 self.downstream.iter().map(|n| n.name.clone()).collect()
64 }
65}
66
67fn source_kind_for_scope_context(
68 scope: &Scope,
69 source_name: &str,
70 reference_node_name: &str,
71) -> SourceKind {
72 if source_name.is_empty() && reference_node_name.is_empty() {
73 return SourceKind::Root;
74 }
75 if let Some(source_info) = scope.sources.get(source_name) {
76 return source_info.kind;
77 }
78 if scope.cte_sources.contains_key(source_name) {
79 return SourceKind::Cte;
80 }
81 match scope.scope_type {
82 ScopeType::Cte => SourceKind::Cte,
83 ScopeType::DerivedTable => SourceKind::DerivedTable,
84 ScopeType::Udtf => SourceKind::Virtual,
85 _ => SourceKind::Unknown,
86 }
87}
88
89fn apply_scope_context(
90 node: &mut LineageNode,
91 scope: &Scope,
92 source_name: &str,
93 reference_node_name: &str,
94) {
95 node.source_name = source_name.to_string();
96 node.reference_node_name = reference_node_name.to_string();
97 node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
/// Pre-order, depth-first iterator over a [`LineageNode`] tree, created by
/// [`LineageNode::walk`].
pub struct LineageWalker<'a> {
    /// Nodes still to be visited; the top of the stack is yielded next.
    stack: Vec<&'a LineageNode>,
}
104
105impl<'a> Iterator for LineageWalker<'a> {
106 type Item = &'a LineageNode;
107
108 fn next(&mut self) -> Option<Self::Item> {
109 if let Some(node) = self.stack.pop() {
110 for child in node.downstream.iter().rev() {
112 self.stack.push(child);
113 }
114 Some(node)
115 } else {
116 None
117 }
118 }
119}
120
/// How a column is requested from a scope: by output name or by projection position.
enum ColumnRef<'a> {
    /// Select the projection whose alias or name matches (after dialect normalization).
    Name(&'a str),
    /// Select the projection at this zero-based position (used for set-operation
    /// branches and for scalar subqueries).
    Index(usize),
}
130
131pub fn lineage(
157 column: &str,
158 sql: &Expression,
159 dialect: Option<DialectType>,
160 trim_selects: bool,
161) -> Result<LineageNode> {
162 let effective_sql = lineage_effective_expression(sql);
164 let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165 if !has_with {
166 return lineage_from_expression(column, sql, dialect, trim_selects);
167 }
168 let mut owned = sql.clone();
169 expand_cte_stars(&mut owned, None);
170 lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173pub fn lineage_with_schema(
189 column: &str,
190 sql: &Expression,
191 schema: Option<&dyn Schema>,
192 dialect: Option<DialectType>,
193 trim_selects: bool,
194) -> Result<LineageNode> {
195 let mut qualified_expression = if let Some(schema) = schema {
196 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197 QualifyColumnsOptions::new().with_dialect(dialect_type)
198 } else {
199 QualifyColumnsOptions::new()
200 };
201
202 qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203 Error::internal(format!("Lineage qualification failed with schema: {}", e))
204 })?
205 } else {
206 sql.clone()
207 };
208
209 annotate_types(&mut qualified_expression, schema, dialect);
211
212 expand_cte_stars(&mut qualified_expression, schema);
215
216 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220 column: &str,
221 sql: &Expression,
222 dialect: Option<DialectType>,
223 trim_selects: bool,
224) -> Result<LineageNode> {
225 let sql = lineage_effective_expression(sql);
226 let scope = build_scope(sql);
227 to_node(
228 ColumnRef::Name(column),
229 &scope,
230 dialect,
231 "",
232 "",
233 "",
234 trim_selects,
235 )
236}
237
238fn lineage_effective_expression(sql: &Expression) -> &Expression {
239 match sql {
240 Expression::Prepare(prepare) => &prepare.statement,
241 _ => sql,
242 }
243}
244
245fn normalize_cte_name(ident: &Identifier) -> String {
255 if ident.quoted {
256 ident.name.clone()
257 } else {
258 ident.name.to_lowercase()
259 }
260}
261
/// Rewrites `*` / `alias.*` projections over CTEs into explicit column lists, in place, so
/// lineage can follow individual columns through the CTEs.
///
/// A `PREPARE` wrapper is looked through. Any other non-`SELECT` expression, or a `SELECT`
/// without `WITH`, is left unchanged.
///
/// CTEs are processed in declaration order, so each CTE can use the columns already
/// resolved for earlier CTEs:
/// * A CTE with an explicit column list records that list; its body is left untouched.
/// * Otherwise, stars in the leftmost `SELECT` of its body are expanded (using `schema`
///   for base tables when available), and the resulting output names are recorded.
/// * CTEs whose body has no leftmost `SELECT` are skipped.
///
/// Finally, stars in the outer `SELECT` are expanded the same way.
pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
    if let Expression::Prepare(prepare) = expr {
        expand_cte_stars(&mut prepare.statement, schema);
        return;
    }

    let select = match expr {
        Expression::Select(s) => s,
        _ => return,
    };

    let with = match &mut select.with {
        Some(w) => w,
        None => return,
    };

    // Normalized CTE name -> output column names, filled in as CTEs are processed.
    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();

    for cte in &mut with.ctes {
        let cte_name = normalize_cte_name(&cte.alias);

        // An explicit column list (`WITH c(a, b) AS ...`) defines the CTE's columns.
        if !cte.columns.is_empty() {
            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
            resolved_cte_columns.insert(cte_name, cols);
            continue;
        }

        // In a recursive WITH, skip a CTE whose leftmost SELECT reads from the CTE itself.
        // NOTE(review): only the leftmost SELECT is inspected; in a typical recursive CTE the
        // self-reference sits in the right-hand UNION branch — confirm this is intended.
        if with.recursive {
            let is_self_referencing =
                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
                    let body_sources = get_select_sources(body_select);
                    body_sources.iter().any(|s| s.normalized == cte_name)
                } else {
                    false
                };
            if is_self_referencing {
                continue;
            }
        }

        let body_select = match get_leftmost_select_mut(&mut cte.this) {
            Some(s) => s,
            None => continue,
        };

        // Expand this CTE's stars and record its output columns for later CTEs/the outer query.
        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
        resolved_cte_columns.insert(cte_name, columns);
    }

    // Expand stars in the outer query using all resolved CTE columns.
    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
}
332
/// Returns the leftmost `SELECT` of `expr`, descending through the left operands of
/// `UNION` / `INTERSECT` / `EXCEPT` and through parentheses.
///
/// If `expr` is itself a `SELECT`, that `SELECT` is returned. At most `MAX_LINEAGE_DEPTH`
/// levels are descended. Returns `None` if that limit is reached, or if any other kind of
/// expression is encountered.
fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
    let mut current = expr;
    for _ in 0..MAX_LINEAGE_DEPTH {
        match current {
            Expression::Select(s) => return Some(s),
            Expression::Union(u) => current = &mut u.left,
            Expression::Intersect(i) => current = &mut i.left,
            Expression::Except(e) => current = &mut e.left,
            Expression::Paren(p) => current = &mut p.this,
            _ => return None,
        }
    }
    None
}
351
352fn rewrite_stars_in_select(
356 select: &mut Select,
357 resolved_ctes: &HashMap<String, Vec<String>>,
358 schema: Option<&dyn Schema>,
359) -> Vec<String> {
360 let has_star = select
365 .expressions
366 .iter()
367 .any(|e| matches!(e, Expression::Star(_)));
368 let has_qualified_star = select
369 .expressions
370 .iter()
371 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
372
373 if !has_star && !has_qualified_star {
374 return select
376 .expressions
377 .iter()
378 .filter_map(get_expression_output_name)
379 .collect();
380 }
381
382 let sources = get_select_sources(select);
383 let mut new_expressions = Vec::new();
384 let mut result_columns = Vec::new();
385
386 for expr in &select.expressions {
387 match expr {
388 Expression::Star(star) => {
389 let qual = star.table.as_ref();
390 if let Some(expanded) =
391 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
392 {
393 for (src_alias, col_name) in &expanded {
394 let table_id = Identifier::new(src_alias);
395 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
396 result_columns.push(col_name.clone());
397 }
398 } else {
399 new_expressions.push(expr.clone());
400 result_columns.push("*".to_string());
401 }
402 }
403 Expression::Column(c) if c.name.name == "*" => {
404 let qual = c.table.as_ref();
405 if let Some(expanded) =
406 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
407 {
408 for (_src_alias, col_name) in &expanded {
409 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
411 result_columns.push(col_name.clone());
412 }
413 } else {
414 new_expressions.push(expr.clone());
415 result_columns.push("*".to_string());
416 }
417 }
418 _ => {
419 new_expressions.push(expr.clone());
420 if let Some(name) = get_expression_output_name(expr) {
421 result_columns.push(name);
422 }
423 }
424 }
425 }
426
427 select.expressions = new_expressions;
428 result_columns
429}
430
/// Resolves the columns a star projection stands for, as `(source alias, column name)` pairs.
///
/// * Qualified `q.*`: the first source whose normalized name, or lowercased alias, equals
///   the normalized qualifier *and* whose columns are known is used. Columns are looked up
///   in the resolved CTE columns first, then in `schema`.
/// * Unqualified `*`: the columns of every source are concatenated in source order. If any
///   single source's columns are unknown, the whole expansion is abandoned.
///
/// Returns `None` when the star cannot be expanded, including when `sources` is empty.
fn expand_star_from_sources(
    qualifier: Option<&Identifier>,
    sources: &[SourceInfo],
    resolved_ctes: &HashMap<String, Vec<String>>,
    schema: Option<&dyn Schema>,
) -> Option<Vec<(String, String)>> {
    let mut expanded = Vec::new();

    if let Some(qual) = qualifier {
        let qual_normalized = normalize_cte_name(qual);
        for src in sources {
            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
                // A resolved CTE takes precedence over a schema table with the same name.
                if let Some(cols) = resolved_ctes.get(&src.normalized) {
                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
                    return Some(expanded);
                }
                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
                    return Some(expanded);
                }
            }
        }
        None
    } else {
        let mut any_expanded = false;
        for src in sources {
            if let Some(cols) = resolved_ctes.get(&src.normalized) {
                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
                any_expanded = true;
            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
                any_expanded = true;
            } else {
                // A partial expansion would silently drop columns, so give up entirely.
                return None;
            }
        }
        if any_expanded {
            Some(expanded)
        } else {
            None
        }
    }
}
486
487fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
489 let schema = schema?;
490 if fq_name.is_empty() {
491 return None;
492 }
493 schema
494 .column_names(fq_name)
495 .ok()
496 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
497}
498
499fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
501 Expression::Column(Box::new(crate::expressions::Column {
502 name: Identifier::new(name),
503 table: table.cloned(),
504 join_mark: false,
505 trailing_comments: Vec::new(),
506 span: None,
507 inferred_type: None,
508 }))
509}
510
511fn get_expression_output_name(expr: &Expression) -> Option<String> {
513 match expr {
514 Expression::Alias(a) => Some(a.alias.name.clone()),
515 Expression::Column(c) => Some(c.name.name.clone()),
516 Expression::Identifier(id) => Some(id.name.clone()),
517 Expression::Star(_) => Some("*".to_string()),
518 _ => None,
519 }
520}
521
/// One FROM / JOIN / LATERAL VIEW source of a single `SELECT`, as seen by star expansion.
struct SourceInfo {
    /// Name the source is referenced by in the query (its alias, or else the table name).
    alias: String,
    /// Normalized name used to look up resolved CTE columns. For tables this comes from
    /// the table name, not the alias.
    normalized: String,
    /// Name used for schema lookups: the dot-joined `catalog.schema.table` for tables, and
    /// the alias for derived and virtual sources.
    fq_name: String,
}
530
/// Collects the sources of a single `SELECT`, in order: FROM items, then JOIN targets,
/// then LATERAL VIEWs.
///
/// Items that yield no name are skipped (e.g. an unaliased subquery, or an unsupported
/// expression kind).
fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
    let mut sources = Vec::new();

    /// Builds a `SourceInfo` for one FROM/JOIN item, looking through parentheses.
    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
        /// Virtual sources (UNNEST, LATERAL VIEW) are identified by their alias alone.
        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
            SourceInfo {
                alias: alias.name.clone(),
                normalized: normalize_cte_name(alias),
                fq_name: alias.name.clone(),
            }
        }

        /// Like `virtual_source_info`, but for aliases stored as plain strings;
        /// these are always lowercased for `normalized`.
        fn named_virtual_source_info(alias: &str) -> SourceInfo {
            SourceInfo {
                alias: alias.to_string(),
                normalized: alias.to_lowercase(),
                fq_name: alias.to_string(),
            }
        }

        match expr {
            Expression::Table(t) => {
                let normalized = normalize_cte_name(&t.name);
                let alias = t
                    .alias
                    .as_ref()
                    .map(|a| a.name.clone())
                    .unwrap_or_else(|| t.name.name.clone());
                // Build `catalog.schema.name` from whichever parts are present.
                let mut parts = Vec::new();
                if let Some(catalog) = &t.catalog {
                    parts.push(catalog.name.clone());
                }
                if let Some(schema) = &t.schema {
                    parts.push(schema.name.clone());
                }
                parts.push(t.name.name.clone());
                let fq_name = parts.join(".");
                Some(SourceInfo {
                    alias,
                    normalized,
                    fq_name,
                })
            }
            Expression::Subquery(s) => {
                // A derived table is only addressable through its alias.
                let alias = s.alias.as_ref()?.name.clone();
                let normalized = alias.to_lowercase();
                let fq_name = alias.clone();
                Some(SourceInfo {
                    alias,
                    normalized,
                    fq_name,
                })
            }
            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
                Some(virtual_source_info(&a.alias))
            }
            Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
            // Fall back to the first column alias when the view has no table alias.
            Expression::LateralView(lateral_view) => lateral_view
                .table_alias
                .as_ref()
                .or_else(|| lateral_view.column_aliases.first())
                .map(virtual_source_info),
            Expression::Paren(p) => extract_source(&p.this),
            _ => None,
        }
    }

    if let Some(from) = &select.from {
        for expr in &from.expressions {
            if let Some(info) = extract_source(expr) {
                sources.push(info);
            }
        }
    }
    for join in &select.joins {
        if let Some(info) = extract_source(&join.this) {
            sources.push(info);
        }
    }
    for lateral_view in &select.lateral_views {
        // `extract_source` matches on `Expression`, so wrap a clone of the view.
        if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
        {
            sources.push(info);
        }
    }
    sources
}
621
622pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
624 let mut tables = HashSet::new();
625 collect_source_tables(node, &mut tables);
626 tables
627}
628
629pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
631 if let Expression::Table(table) = &node.source {
632 tables.insert(table.name.name.clone());
633 }
634 for child in &node.downstream {
635 collect_source_tables(child, tables);
636 }
637}
638
/// Recursion bound for lineage tracing.
///
/// `to_node_inner` returns an error once its depth exceeds this value (reported as a
/// possible circular CTE reference). `get_leftmost_select_mut` also uses it to cap how
/// deeply it descends into nested set operations.
const MAX_LINEAGE_DEPTH: usize = 64;
646
647fn to_node(
649 column: ColumnRef<'_>,
650 scope: &Scope,
651 dialect: Option<DialectType>,
652 scope_name: &str,
653 source_name: &str,
654 reference_node_name: &str,
655 trim_selects: bool,
656) -> Result<LineageNode> {
657 to_node_inner(
658 column,
659 scope,
660 dialect,
661 scope_name,
662 source_name,
663 reference_node_name,
664 trim_selects,
665 &[],
666 0,
667 )
668}
669
/// Resolves `column` in `scope` into a lineage node and recursively attaches its
/// dependencies.
///
/// Parameters:
/// * `scope_name`: name of the parent column; appears in the depth-limit error and is
///   passed on to set-operation branches.
/// * `source_name` / `reference_node_name`: how this scope was reached from its parent
///   (both empty at the root); recorded on the node via `apply_scope_context`.
/// * `ancestor_cte_scopes`: CTE scopes inherited from enclosing scopes, so references to
///   outer CTEs can still be resolved here.
/// * `depth`: current recursion depth, bounded by `MAX_LINEAGE_DEPTH`.
///
/// Behavior:
/// * Set operations (including a CTE whose body is a set operation) are delegated to
///   `handle_set_operation`.
/// * A `*` projection yields one `source.*` child per scope source.
/// * Otherwise, unaliased scalar subqueries in the projection, and every column it
///   references, are resolved into `downstream` children.
///
/// # Errors
/// Fails if the depth limit is exceeded, or if the column cannot be found in the scope's
/// projection. Failures while resolving scalar-subquery children are ignored.
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // CTE scopes visible from here: this scope's own, followed by inherited ones.
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }

    // A CTE scope wraps its body; analyze the body itself.
    let effective_expr = match scope_expr {
        Expression::Cte(cte) => &cte.this,
        other => other,
    };

    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        if matches!(scope_expr, Expression::Cte(_)) {
            // Re-root a scope on the CTE body, so `handle_set_operation` sees the set
            // operation as its expression, while keeping the CTE scope's children.
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                ancestor_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            ancestor_cte_scopes,
            depth,
        );
    }

    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    // Optionally reduce the source SELECT to just the projection being traced.
    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    apply_scope_context(&mut node, scope, source_name, reference_node_name);

    // `SELECT *`: depend on every source of the scope as a whole.
    if matches!(&select_expr, Expression::Star(_)) {
        for (name, source_info) in &scope.sources {
            let mut child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: None,
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            apply_source_info_context(&mut child, name, source_info);
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // Scalar (unaliased) subqueries: trace their first projection in the matching scope.
    let subqueries: Vec<&Expression> =
        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
    for sq_expr in subqueries {
        if let Expression::Subquery(sq) = sq_expr {
            for sq_scope in &scope.subquery_scopes {
                if sq_scope.expression == sq.this {
                    // Best effort: an unresolvable subquery is simply omitted.
                    if let Ok(child) = to_node_inner(
                        ColumnRef::Index(0),
                        sq_scope,
                        dialect,
                        &column_name,
                        "",
                        "",
                        trim_selects,
                        ancestor_cte_scopes,
                        depth + 1,
                    ) {
                        node.downstream.push(child);
                    }
                    break;
                }
            }
        }
    }

    // Resolve every column referenced by the projection.
    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
835
836fn handle_set_operation(
841 column: &ColumnRef<'_>,
842 scope: &Scope,
843 dialect: Option<DialectType>,
844 scope_name: &str,
845 source_name: &str,
846 reference_node_name: &str,
847 trim_selects: bool,
848 ancestor_cte_scopes: &[Scope],
849 depth: usize,
850) -> Result<LineageNode> {
851 let scope_expr = &scope.expression;
852
853 let col_index = match column {
855 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
856 ColumnRef::Index(i) => *i,
857 };
858
859 let col_name = match column {
860 ColumnRef::Name(name) => name.to_string(),
861 ColumnRef::Index(_) => format!("_{col_index}"),
862 };
863
864 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
865 apply_scope_context(&mut node, scope, source_name, reference_node_name);
866
867 for branch_scope in &scope.union_scopes {
869 if let Ok(child) = to_node_inner(
870 ColumnRef::Index(col_index),
871 branch_scope,
872 dialect,
873 scope_name,
874 "",
875 "",
876 trim_selects,
877 ancestor_cte_scopes,
878 depth + 1,
879 ) {
880 node.downstream.push(child);
881 }
882 }
883
884 Ok(node)
885}
886
/// Resolves the qualified column `table.col_name` and pushes exactly one dependency onto
/// `node.downstream`.
///
/// Strategies are tried in order; if a recursive resolution fails, the next step is tried:
/// 1. CTE (after mapping an alias to its CTE via `resolve_cte_alias`): recurse into the CTE
///    scope, found among this scope's and the inherited CTE scopes.
/// 2. Other scope-backed source (e.g. a derived table): recurse into its child scope.
/// 3. Non-scope source (table or virtual source): a leaf node built from the source info.
///    For virtual sources, the columns their expression reads are attached as well.
/// 4. Fallback: a plain table/column node.
fn resolve_qualified_column(
    node: &mut LineageNode,
    scope: &Scope,
    dialect: Option<DialectType>,
    table: &str,
    col_name: &str,
    parent_name: &str,
    trim_selects: bool,
    all_cte_scopes: &[&Scope],
    depth: usize,
) {
    // If `table` is an alias for a CTE, resolve against the CTE's real name.
    let resolved_cte_name = resolve_cte_alias(scope, table);
    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);

    let is_cte = scope.cte_sources.contains_key(effective_table)
        || all_cte_scopes.iter().any(
            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
        );
    if is_cte {
        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
            // NOTE(review): clones every visible CTE scope on each hop; may be costly for
            // queries with many or large CTEs.
            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
            if let Ok(child) = to_node_inner(
                ColumnRef::Name(col_name),
                child_scope,
                dialect,
                parent_name,
                effective_table,
                parent_name,
                trim_selects,
                &ancestors,
                depth + 1,
            ) {
                node.downstream.push(child);
                return;
            }
        }
    }

    // Scope-backed sources other than CTEs (e.g. derived tables).
    if let Some(source_info) = scope.sources.get(table) {
        if source_info.is_scope {
            if let Some(child_scope) = find_child_scope(scope, table) {
                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
                if let Ok(child) = to_node_inner(
                    ColumnRef::Name(col_name),
                    child_scope,
                    dialect,
                    parent_name,
                    table,
                    parent_name,
                    trim_selects,
                    &ancestors,
                    depth + 1,
                ) {
                    node.downstream.push(child);
                    return;
                }
            }
        }
    }

    // Leaf sources: base tables and virtual sources.
    if let Some(source_info) = scope.sources.get(table) {
        if !source_info.is_scope {
            let mut child = make_table_column_node_from_source(table, col_name, source_info);
            if source_info.kind == SourceKind::Virtual {
                attach_virtual_source_dependencies(
                    &mut child,
                    scope,
                    dialect,
                    table,
                    &source_info.expression,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                );
            }
            node.downstream.push(child);
            return;
        }
    }

    // Unknown source: record the reference as-is.
    node.downstream
        .push(make_table_column_node(table, col_name));
}
983
984fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
990 if scope.cte_sources.contains_key(name) {
992 return None;
993 }
994 if let Some(source_info) = scope.sources.get(name) {
996 if source_info.is_scope {
997 if let Expression::Cte(cte) = &source_info.expression {
998 let cte_name = &cte.alias.name;
999 if scope.cte_sources.contains_key(cte_name) {
1000 return Some(cte_name.clone());
1001 }
1002 }
1003 }
1004 }
1005 None
1006}
1007
1008fn resolve_unqualified_column(
1009 node: &mut LineageNode,
1010 scope: &Scope,
1011 dialect: Option<DialectType>,
1012 col_name: &str,
1013 parent_name: &str,
1014 trim_selects: bool,
1015 all_cte_scopes: &[&Scope],
1016 depth: usize,
1017) {
1018 let from_source_names = source_names_from_from_join(scope);
1022
1023 if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1024 resolve_qualified_column(
1025 node,
1026 scope,
1027 dialect,
1028 &tbl,
1029 col_name,
1030 parent_name,
1031 trim_selects,
1032 all_cte_scopes,
1033 depth,
1034 );
1035 return;
1036 }
1037
1038 if from_source_names.len() == 1 {
1039 let tbl = &from_source_names[0];
1040 resolve_qualified_column(
1041 node,
1042 scope,
1043 dialect,
1044 tbl,
1045 col_name,
1046 parent_name,
1047 trim_selects,
1048 all_cte_scopes,
1049 depth,
1050 );
1051 return;
1052 }
1053
1054 let child = LineageNode::new(
1056 col_name.to_string(),
1057 Expression::Column(Box::new(crate::expressions::Column {
1058 name: crate::expressions::Identifier::new(col_name.to_string()),
1059 table: None,
1060 join_mark: false,
1061 trailing_comments: vec![],
1062 span: None,
1063 inferred_type: None,
1064 })),
1065 node.source.clone(),
1066 );
1067 node.downstream.push(child);
1068}
1069
1070fn unique_virtual_source_for_column(
1071 scope: &Scope,
1072 source_names: &[String],
1073 col_name: &str,
1074) -> Option<String> {
1075 let mut matches = source_names.iter().filter_map(|source_name| {
1076 let source = scope.sources.get(source_name)?;
1077 if source.kind == SourceKind::Virtual
1078 && virtual_source_output_columns(source)
1079 .any(|column| column.eq_ignore_ascii_case(col_name))
1080 {
1081 Some(source_name.clone())
1082 } else {
1083 None
1084 }
1085 });
1086
1087 let first = matches.next()?;
1088 if matches.next().is_none() {
1089 Some(first)
1090 } else {
1091 None
1092 }
1093}
1094
1095fn virtual_source_output_columns(
1096 source_info: &ScopeSourceInfo,
1097) -> Box<dyn Iterator<Item = String> + '_> {
1098 match &source_info.expression {
1099 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1100 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1101 Box::new(alias_output_columns(alias))
1102 }
1103 Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1104 Expression::LateralView(lateral_view) => {
1105 Box::new(lateral_view_output_columns(lateral_view))
1106 }
1107 _ => Box::new(source_info.alias.clone().into_iter()),
1108 }
1109}
1110
1111fn unnest_output_columns(
1112 unnest: &crate::expressions::UnnestFunc,
1113) -> impl Iterator<Item = String> + '_ {
1114 unnest
1115 .alias
1116 .iter()
1117 .map(|alias| alias.name.clone())
1118 .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1119}
1120
1121fn alias_output_columns(
1122 alias: &crate::expressions::Alias,
1123) -> Box<dyn Iterator<Item = String> + '_> {
1124 if alias.column_aliases.is_empty() {
1125 Box::new(std::iter::once(alias.alias.name.clone()))
1126 } else {
1127 Box::new(
1128 alias
1129 .column_aliases
1130 .iter()
1131 .map(|column| column.name.clone()),
1132 )
1133 }
1134}
1135
1136fn lateral_output_columns(
1137 lateral: &crate::expressions::Lateral,
1138) -> Box<dyn Iterator<Item = String> + '_> {
1139 if lateral.column_aliases.is_empty() {
1140 default_virtual_output_columns(&lateral.this)
1141 } else {
1142 Box::new(lateral.column_aliases.iter().cloned())
1143 }
1144}
1145
1146fn lateral_view_output_columns(
1147 lateral_view: &crate::expressions::LateralView,
1148) -> Box<dyn Iterator<Item = String> + '_> {
1149 Box::new(
1150 lateral_view
1151 .column_aliases
1152 .iter()
1153 .map(|column| column.name.clone()),
1154 )
1155}
1156
1157fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1158 match expr {
1159 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1160 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1161 alias_output_columns(alias)
1162 }
1163 Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1164 Box::new(
1165 ["seq", "key", "path", "index", "value", "this"]
1166 .into_iter()
1167 .map(String::from),
1168 )
1169 }
1170 _ => Box::new(std::iter::empty()),
1171 }
1172}
1173
1174fn attach_virtual_source_dependencies(
1175 node: &mut LineageNode,
1176 scope: &Scope,
1177 dialect: Option<DialectType>,
1178 source_alias: &str,
1179 source_expr: &Expression,
1180 trim_selects: bool,
1181 all_cte_scopes: &[&Scope],
1182 depth: usize,
1183) {
1184 let parent_name = node.name.clone();
1185 let mut seen = HashSet::new();
1186 for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1187 let key = (
1188 col_ref.table.as_ref().map(|t| t.name.clone()),
1189 col_ref.column.clone(),
1190 );
1191 if !seen.insert(key) {
1192 continue;
1193 }
1194
1195 if let Some(table_id) = col_ref.table {
1196 let table = table_id.name;
1197 if table == source_alias {
1198 continue;
1199 }
1200 resolve_qualified_column(
1201 node,
1202 scope,
1203 dialect,
1204 &table,
1205 &col_ref.column,
1206 &parent_name,
1207 trim_selects,
1208 all_cte_scopes,
1209 depth + 1,
1210 );
1211 } else {
1212 let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1213 if non_virtual_sources.len() == 1 {
1214 resolve_qualified_column(
1215 node,
1216 scope,
1217 dialect,
1218 &non_virtual_sources[0],
1219 &col_ref.column,
1220 &parent_name,
1221 trim_selects,
1222 all_cte_scopes,
1223 depth + 1,
1224 );
1225 }
1226 }
1227 }
1228}
1229
/// Returns the names by which the scope's query references its sources.
///
/// Sources are collected from FROM items, JOIN targets, and LATERAL VIEWs, in that order.
/// Empty names are dropped, and duplicates are removed while keeping the first occurrence.
/// A CTE scope is looked through to its body. Nothing is returned unless the (effective)
/// expression is a `SELECT`.
fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
    /// Reference name of one FROM/JOIN item: its alias, or the bare table name for an
    /// unaliased table. Unaliased subqueries and other unsupported items yield `None`.
    fn source_name(expr: &Expression) -> Option<String> {
        match expr {
            Expression::Table(table) => Some(
                table
                    .alias
                    .as_ref()
                    .map(|a| a.name.clone())
                    .unwrap_or_else(|| table.name.name.clone()),
            ),
            Expression::Subquery(subquery) => {
                subquery.alias.as_ref().map(|alias| alias.name.clone())
            }
            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
                Some(alias.alias.name.clone())
            }
            Expression::Lateral(lateral) => lateral.alias.clone(),
            // Fall back to the first column alias when the view has no table alias.
            Expression::LateralView(lateral_view) => lateral_view
                .table_alias
                .as_ref()
                .or_else(|| lateral_view.column_aliases.first())
                .map(|alias| alias.name.clone()),
            Expression::Paren(paren) => source_name(&paren.this),
            _ => None,
        }
    }

    let effective_expr = match &scope.expression {
        Expression::Cte(cte) => &cte.this,
        expr => expr,
    };

    let mut names = Vec::new();
    // Tracks names already emitted, so each appears once, in first-seen order.
    let mut seen = std::collections::HashSet::new();

    if let Expression::Select(select) = effective_expr {
        if let Some(from) = &select.from {
            for expr in &from.expressions {
                if let Some(name) = source_name(expr) {
                    if !name.is_empty() && seen.insert(name.clone()) {
                        names.push(name);
                    }
                }
            }
        }
        for join in &select.joins {
            if let Some(name) = source_name(&join.this) {
                if !name.is_empty() && seen.insert(name.clone()) {
                    names.push(name);
                }
            }
        }
        for lateral_view in &select.lateral_views {
            // `source_name` matches on `Expression`, so wrap a clone of the view.
            if let Some(name) =
                source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
            {
                if !name.is_empty() && seen.insert(name.clone()) {
                    names.push(name);
                }
            }
        }
    }

    names
}
1296
1297fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1298 source_names_from_from_join(scope)
1299 .into_iter()
1300 .filter(|name| {
1301 !matches!(
1302 scope.sources.get(name).map(|source| source.kind),
1303 Some(SourceKind::Virtual)
1304 )
1305 })
1306 .collect()
1307}
1308
1309fn get_alias_or_name(expr: &Expression) -> Option<String> {
1315 match expr {
1316 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1317 Expression::Column(col) => Some(col.name.name.clone()),
1318 Expression::Identifier(id) => Some(id.name.clone()),
1319 Expression::Star(_) => Some("*".to_string()),
1320 Expression::Annotated(a) => get_alias_or_name(&a.this),
1323 _ => None,
1324 }
1325}
1326
1327fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1329 match column {
1330 ColumnRef::Name(n) => n.to_string(),
1331 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1332 }
1333}
1334
1335fn find_select_expr(
1337 scope_expr: &Expression,
1338 column: &ColumnRef<'_>,
1339 dialect: Option<DialectType>,
1340) -> Result<Expression> {
1341 if let Expression::Select(ref select) = scope_expr {
1342 match column {
1343 ColumnRef::Name(name) => {
1344 let normalized_name = normalize_column_name(name, dialect);
1345 for expr in &select.expressions {
1346 if let Some(alias_or_name) = get_alias_or_name(expr) {
1347 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1348 return Ok(expr.clone());
1349 }
1350 }
1351 }
1352 Err(crate::error::Error::parse(
1353 format!("Cannot find column '{}' in query", name),
1354 0,
1355 0,
1356 0,
1357 0,
1358 ))
1359 }
1360 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1361 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1362 }),
1363 }
1364 } else {
1365 Err(crate::error::Error::parse(
1366 "Expected SELECT expression for column lookup",
1367 0,
1368 0,
1369 0,
1370 0,
1371 ))
1372 }
1373}
1374
1375fn column_to_index(
1377 set_op_expr: &Expression,
1378 name: &str,
1379 dialect: Option<DialectType>,
1380) -> Result<usize> {
1381 let normalized_name = normalize_column_name(name, dialect);
1382 let mut expr = set_op_expr;
1383 loop {
1384 match expr {
1385 Expression::Union(u) => expr = &u.left,
1386 Expression::Intersect(i) => expr = &i.left,
1387 Expression::Except(e) => expr = &e.left,
1388 Expression::Select(select) => {
1389 for (i, e) in select.expressions.iter().enumerate() {
1390 if let Some(alias_or_name) = get_alias_or_name(e) {
1391 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1392 return Ok(i);
1393 }
1394 }
1395 }
1396 return Err(crate::error::Error::parse(
1397 format!("Cannot find column '{}' in set operation", name),
1398 0,
1399 0,
1400 0,
1401 0,
1402 ));
1403 }
1404 _ => {
1405 return Err(crate::error::Error::parse(
1406 "Expected SELECT or set operation",
1407 0,
1408 0,
1409 0,
1410 0,
1411 ))
1412 }
1413 }
1414 }
1415}
1416
1417fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1418 normalize_name(name, dialect, false, true)
1419}
1420
1421fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1423 if let Expression::Select(select) = select_expr {
1424 let mut trimmed = select.as_ref().clone();
1425 trimmed.expressions = vec![target_expr.clone()];
1426 Expression::Select(Box::new(trimmed))
1427 } else {
1428 select_expr.clone()
1429 }
1430}
1431
1432fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1434 if scope.cte_sources.contains_key(source_name) {
1436 for cte_scope in &scope.cte_scopes {
1437 if let Expression::Cte(cte) = &cte_scope.expression {
1438 if cte.alias.name == source_name {
1439 return Some(cte_scope);
1440 }
1441 }
1442 }
1443 }
1444
1445 if let Some(source_info) = scope.sources.get(source_name) {
1447 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1448 if let Expression::Subquery(sq) = &source_info.expression {
1449 for dt_scope in &scope.derived_table_scopes {
1450 if dt_scope.expression == sq.this {
1451 return Some(dt_scope);
1452 }
1453 }
1454 }
1455 }
1456 }
1457
1458 None
1459}
1460
1461fn find_child_scope_in<'a>(
1465 all_cte_scopes: &[&'a Scope],
1466 scope: &'a Scope,
1467 source_name: &str,
1468) -> Option<&'a Scope> {
1469 for cte_scope in &scope.cte_scopes {
1471 if let Expression::Cte(cte) = &cte_scope.expression {
1472 if cte.alias.name == source_name {
1473 return Some(cte_scope);
1474 }
1475 }
1476 }
1477
1478 for cte_scope in all_cte_scopes {
1480 if let Expression::Cte(cte) = &cte_scope.expression {
1481 if cte.alias.name == source_name {
1482 return Some(cte_scope);
1483 }
1484 }
1485 }
1486
1487 if let Some(source_info) = scope.sources.get(source_name) {
1489 if source_info.is_scope {
1490 if let Expression::Subquery(sq) = &source_info.expression {
1491 for dt_scope in &scope.derived_table_scopes {
1492 if dt_scope.expression == sq.this {
1493 return Some(dt_scope);
1494 }
1495 }
1496 }
1497 }
1498 }
1499
1500 None
1501}
1502
1503fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1505 let mut node = LineageNode::new(
1506 format!("{}.{}", table, column),
1507 Expression::Column(Box::new(crate::expressions::Column {
1508 name: crate::expressions::Identifier::new(column.to_string()),
1509 table: Some(crate::expressions::Identifier::new(table.to_string())),
1510 join_mark: false,
1511 trailing_comments: vec![],
1512 span: None,
1513 inferred_type: None,
1514 })),
1515 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1516 );
1517 node.source_name = table.to_string();
1518 node.source_kind = SourceKind::Table;
1519 node
1520}
1521
1522fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1523 let mut parts: Vec<String> = Vec::new();
1524 if let Some(catalog) = &table_ref.catalog {
1525 parts.push(catalog.name.clone());
1526 }
1527 if let Some(schema) = &table_ref.schema {
1528 parts.push(schema.name.clone());
1529 }
1530 parts.push(table_ref.name.name.clone());
1531 parts.join(".")
1532}
1533
1534fn apply_source_info_context(
1535 node: &mut LineageNode,
1536 source_key: &str,
1537 source_info: &ScopeSourceInfo,
1538) {
1539 node.source_kind = source_info.kind;
1540 node.source_name =
1541 source_info
1542 .lineage_name
1543 .clone()
1544 .unwrap_or_else(|| match &source_info.expression {
1545 Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1546 _ => source_key.to_string(),
1547 });
1548 node.source_alias = source_info.alias.clone();
1549}
1550
1551fn make_table_column_node_from_source(
1552 source_key: &str,
1553 column: &str,
1554 source_info: &ScopeSourceInfo,
1555) -> LineageNode {
1556 let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1557 let mut node = LineageNode::new(
1558 format!("{}.{}", lineage_name, column),
1559 Expression::Column(Box::new(crate::expressions::Column {
1560 name: crate::expressions::Identifier::new(column.to_string()),
1561 table: Some(crate::expressions::Identifier::new(
1562 lineage_name.to_string(),
1563 )),
1564 join_mark: false,
1565 trailing_comments: vec![],
1566 span: None,
1567 inferred_type: None,
1568 })),
1569 source_info.expression.clone(),
1570 );
1571
1572 apply_source_info_context(&mut node, source_key, source_info);
1573
1574 node
1575}
1576
/// A column reference collected from an expression tree by `collect_column_refs`.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// The column's table qualifier (`t` in `t.a`), cloned from the column node;
    /// `None` when the column is unqualified.
    table: Option<crate::expressions::Identifier>,
    /// The bare column name text (`a` in `t.a`).
    column: String,
}
1583
1584fn find_column_refs_in_expr(
1586 expr: &Expression,
1587 dialect: Option<DialectType>,
1588) -> Vec<SimpleColumnRef> {
1589 let mut refs = Vec::new();
1590 collect_column_refs(expr, dialect, &mut refs);
1591 refs
1592}
1593
1594fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1595 match expr {
1596 Expression::Column(col) => {
1597 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1598 }
1599 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1600 _ => false,
1601 }
1602}
1603
/// Walks `expr` and appends a [`SimpleColumnRef`] to `refs` for every
/// `Expression::Column` node reached.
///
/// * `expr` – root of the expression tree to scan.
/// * `dialect` – consulted only for `MethodCall` receivers, to skip BigQuery's bare
///   `SAFE` namespace (see the `MethodCall` arm).
/// * `refs` – output buffer. Entries are appended and existing contents are kept.
///
/// Traversal uses an explicit last-in-first-out work stack, so `refs` is not in source
/// order. For example, the right operand of a binary operator is visited before the
/// left. The order of the `stack.push` calls below therefore determines the output
/// order. No deduplication is done: a column referenced twice yields two entries.
///
/// Only the variants listed below are descended into. Every other variant hits the
/// trailing `_ => {}` arm and contributes nothing, including any columns nested inside
/// it. `Subquery` and `Exists` are explicitly skipped, presumably so that columns
/// belonging to a nested query are not attributed to the enclosing expression.
fn collect_column_refs(
    expr: &Expression,
    dialect: Option<DialectType>,
    refs: &mut Vec<SimpleColumnRef>,
) {
    // Work stack of nodes still to visit, seeded with the root. Presumably used instead
    // of recursion to cope with deeply nested expressions.
    let mut stack: Vec<&Expression> = vec![expr];

    while let Some(current) = stack.pop() {
        match current {
            // Leaf: record the column's table qualifier (if any) and its bare name.
            Expression::Column(col) => {
                refs.push(SimpleColumnRef {
                    table: col.table.clone(),
                    column: col.name.name.clone(),
                });
            }

            // Nested queries are not entered.
            Expression::Subquery(_) | Expression::Exists(_) => {}

            // Binary operators: visit both operands.
            Expression::And(op)
            | Expression::Or(op)
            | Expression::Eq(op)
            | Expression::Neq(op)
            | Expression::Lt(op)
            | Expression::Lte(op)
            | Expression::Gt(op)
            | Expression::Gte(op)
            | Expression::Add(op)
            | Expression::Sub(op)
            | Expression::Mul(op)
            | Expression::Div(op)
            | Expression::Mod(op)
            | Expression::BitwiseAnd(op)
            | Expression::BitwiseOr(op)
            | Expression::BitwiseXor(op)
            | Expression::BitwiseLeftShift(op)
            | Expression::BitwiseRightShift(op)
            | Expression::Concat(op)
            | Expression::Adjacent(op)
            | Expression::TsMatch(op)
            | Expression::PropertyEQ(op)
            | Expression::ArrayContainsAll(op)
            | Expression::ArrayContainedBy(op)
            | Expression::ArrayOverlaps(op)
            | Expression::JSONBContainsAllTopKeys(op)
            | Expression::JSONBContainsAnyTopKeys(op)
            | Expression::JSONBDeleteAtPath(op)
            | Expression::ExtendsLeft(op)
            | Expression::ExtendsRight(op)
            | Expression::Is(op)
            | Expression::MemberOf(op)
            | Expression::NullSafeEq(op)
            | Expression::NullSafeNeq(op)
            | Expression::Glob(op)
            | Expression::Match(op) => {
                stack.push(&op.left);
                stack.push(&op.right);
            }

            // Unary operators: visit the single operand.
            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
                stack.push(&u.this);
            }

            // Single-argument functions: visit `this`.
            Expression::Upper(f)
            | Expression::Lower(f)
            | Expression::Length(f)
            | Expression::LTrim(f)
            | Expression::RTrim(f)
            | Expression::Reverse(f)
            | Expression::Abs(f)
            | Expression::Sqrt(f)
            | Expression::Cbrt(f)
            | Expression::Ln(f)
            | Expression::Exp(f)
            | Expression::Sign(f)
            | Expression::Date(f)
            | Expression::Time(f)
            | Expression::DateFromUnixDate(f)
            | Expression::UnixDate(f)
            | Expression::UnixSeconds(f)
            | Expression::UnixMillis(f)
            | Expression::UnixMicros(f)
            | Expression::TimeStrToDate(f)
            | Expression::DateToDi(f)
            | Expression::DiToDate(f)
            | Expression::TsOrDiToDi(f)
            | Expression::TsOrDsToDatetime(f)
            | Expression::TsOrDsToTimestamp(f)
            | Expression::YearOfWeek(f)
            | Expression::YearOfWeekIso(f)
            | Expression::Initcap(f)
            | Expression::Ascii(f)
            | Expression::Chr(f)
            | Expression::Soundex(f)
            | Expression::ByteLength(f)
            | Expression::Hex(f)
            | Expression::LowerHex(f)
            | Expression::Unicode(f)
            | Expression::Radians(f)
            | Expression::Degrees(f)
            | Expression::Sin(f)
            | Expression::Cos(f)
            | Expression::Tan(f)
            | Expression::Asin(f)
            | Expression::Acos(f)
            | Expression::Atan(f)
            | Expression::IsNan(f)
            | Expression::IsInf(f)
            | Expression::ArrayLength(f)
            | Expression::ArraySize(f)
            | Expression::Cardinality(f)
            | Expression::ArrayReverse(f)
            | Expression::ArrayDistinct(f)
            | Expression::ArrayFlatten(f)
            | Expression::ArrayCompact(f)
            | Expression::Explode(f)
            | Expression::ExplodeOuter(f)
            | Expression::ToArray(f)
            | Expression::MapFromEntries(f)
            | Expression::MapKeys(f)
            | Expression::MapValues(f)
            | Expression::JsonArrayLength(f)
            | Expression::JsonKeys(f)
            | Expression::JsonType(f)
            | Expression::ParseJson(f)
            | Expression::ToJson(f)
            | Expression::Typeof(f)
            | Expression::BitwiseCount(f)
            | Expression::Year(f)
            | Expression::Month(f)
            | Expression::Day(f)
            | Expression::Hour(f)
            | Expression::Minute(f)
            | Expression::Second(f)
            | Expression::DayOfWeek(f)
            | Expression::DayOfWeekIso(f)
            | Expression::DayOfMonth(f)
            | Expression::DayOfYear(f)
            | Expression::WeekOfYear(f)
            | Expression::Quarter(f)
            | Expression::Epoch(f)
            | Expression::EpochMs(f)
            | Expression::TimeStrToUnix(f)
            | Expression::SHA(f)
            | Expression::SHA1Digest(f)
            | Expression::TimeToUnix(f)
            | Expression::JSONBool(f)
            | Expression::Int64(f)
            | Expression::MD5NumberLower64(f)
            | Expression::MD5NumberUpper64(f)
            | Expression::DateStrToDate(f)
            | Expression::DateToDateStr(f) => {
                stack.push(&f.this);
            }

            // Two-argument functions: visit `this` and `expression`.
            Expression::Power(f)
            | Expression::NullIf(f)
            | Expression::IfNull(f)
            | Expression::Nvl(f)
            | Expression::UnixToTimeStr(f)
            | Expression::Contains(f)
            | Expression::StartsWith(f)
            | Expression::EndsWith(f)
            | Expression::Levenshtein(f)
            | Expression::ModFunc(f)
            | Expression::Atan2(f)
            | Expression::IntDiv(f)
            | Expression::AddMonths(f)
            | Expression::MonthsBetween(f)
            | Expression::NextDay(f)
            | Expression::ArrayContains(f)
            | Expression::ArrayPosition(f)
            | Expression::ArrayAppend(f)
            | Expression::ArrayPrepend(f)
            | Expression::ArrayUnion(f)
            | Expression::ArrayExcept(f)
            | Expression::ArrayRemove(f)
            | Expression::StarMap(f)
            | Expression::MapFromArrays(f)
            | Expression::MapContainsKey(f)
            | Expression::ElementAt(f)
            | Expression::JsonMergePatch(f)
            | Expression::JSONBContains(f)
            | Expression::JSONBExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }

            // Variadic functions: visit every argument.
            Expression::Greatest(f)
            | Expression::Least(f)
            | Expression::Coalesce(f)
            | Expression::ArrayConcat(f)
            | Expression::ArrayIntersect(f)
            | Expression::ArrayZip(f)
            | Expression::MapConcat(f)
            | Expression::JsonArray(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }

            // Aggregates sharing one argument struct: visit the argument plus the
            // optional `filter`, `having_max` expression and `limit`.
            Expression::Sum(f)
            | Expression::Avg(f)
            | Expression::Min(f)
            | Expression::Max(f)
            | Expression::ArrayAgg(f)
            | Expression::CountIf(f)
            | Expression::Stddev(f)
            | Expression::StddevPop(f)
            | Expression::StddevSamp(f)
            | Expression::Variance(f)
            | Expression::VarPop(f)
            | Expression::VarSamp(f)
            | Expression::Median(f)
            | Expression::Mode(f)
            | Expression::First(f)
            | Expression::Last(f)
            | Expression::AnyValue(f)
            | Expression::ApproxDistinct(f)
            | Expression::ApproxCountDistinct(f)
            | Expression::LogicalAnd(f)
            | Expression::LogicalOr(f)
            | Expression::Skewness(f)
            | Expression::ArrayConcatAgg(f)
            | Expression::ArrayUniqueAgg(f)
            | Expression::BoolXorAgg(f)
            | Expression::BitwiseAndAgg(f)
            | Expression::BitwiseOrAgg(f)
            | Expression::BitwiseXorAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some((ref expr, _)) = f.having_max {
                    stack.push(expr);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }

            // Generic function call: visit every argument.
            Expression::Function(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
            }
            // Generic aggregate: arguments plus the optional `filter` and `limit`.
            Expression::AggregateFunction(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
                if let Some(ref filter) = func.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = func.limit {
                    stack.push(limit);
                }
            }

            // Only the wrapped function is visited.
            // NOTE(review): if the window specification (PARTITION BY / ORDER BY) is
            // stored elsewhere on `wf`, columns used only there are not collected —
            // confirm that is intended.
            Expression::WindowFunction(wf) => {
                stack.push(&wf.this);
            }

            // Wrappers: visit the wrapped expression (plus any extra operands).
            Expression::Alias(a) => {
                stack.push(&a.this);
            }
            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
                stack.push(&c.this);
                if let Some(ref fmt) = c.format {
                    stack.push(fmt);
                }
                if let Some(ref def) = c.default {
                    stack.push(def);
                }
            }
            Expression::Paren(p) => {
                stack.push(&p.this);
            }
            Expression::Annotated(a) => {
                stack.push(&a.this);
            }
            Expression::Case(case) => {
                if let Some(ref operand) = case.operand {
                    stack.push(operand);
                }
                for (cond, result) in &case.whens {
                    stack.push(cond);
                    stack.push(result);
                }
                if let Some(ref else_expr) = case.else_ {
                    stack.push(else_expr);
                }
            }
            Expression::Collation(c) => {
                stack.push(&c.this);
            }
            Expression::In(i) => {
                // `query` is pushed as well. When it is an `Expression::Subquery`, the
                // `Subquery` arm above ignores it.
                stack.push(&i.this);
                for e in &i.expressions {
                    stack.push(e);
                }
                if let Some(ref q) = i.query {
                    stack.push(q);
                }
                if let Some(ref u) = i.unnest {
                    stack.push(u);
                }
            }
            Expression::Between(b) => {
                stack.push(&b.this);
                stack.push(&b.low);
                stack.push(&b.high);
            }
            Expression::IsNull(n) => {
                stack.push(&n.this);
            }
            Expression::IsTrue(t) | Expression::IsFalse(t) => {
                stack.push(&t.this);
            }
            Expression::IsJson(j) => {
                stack.push(&j.this);
            }
            Expression::Like(l) | Expression::ILike(l) => {
                stack.push(&l.left);
                stack.push(&l.right);
                if let Some(ref esc) = l.escape {
                    stack.push(esc);
                }
            }
            Expression::SimilarTo(s) => {
                stack.push(&s.this);
                stack.push(&s.pattern);
                if let Some(ref esc) = s.escape {
                    stack.push(esc);
                }
            }
            Expression::Ordered(o) => {
                stack.push(&o.this);
            }
            Expression::Array(a) => {
                for e in &a.expressions {
                    stack.push(e);
                }
            }
            Expression::Tuple(t) => {
                for e in &t.expressions {
                    stack.push(e);
                }
            }
            Expression::Struct(s) => {
                for (_, e) in &s.fields {
                    stack.push(e);
                }
            }
            Expression::Subscript(s) => {
                stack.push(&s.this);
                stack.push(&s.index);
            }
            Expression::Dot(d) => {
                stack.push(&d.this);
            }
            Expression::MethodCall(m) => {
                // Under BigQuery, a receiver that is the bare, unquoted `SAFE` namespace
                // (see `is_bigquery_safe_namespace_receiver`) is skipped so it is not
                // reported as a column. The method arguments are always visited.
                if !matches!(dialect, Some(DialectType::BigQuery))
                    || !is_bigquery_safe_namespace_receiver(&m.this)
                {
                    stack.push(&m.this);
                }
                for arg in &m.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySlice(s) => {
                stack.push(&s.this);
                if let Some(ref start) = s.start {
                    stack.push(start);
                }
                if let Some(ref end) = s.end {
                    stack.push(end);
                }
            }
            Expression::Lambda(l) => {
                // Only the body is visited.
                // NOTE(review): lambda parameters that appear in the body as `Column`
                // nodes would be reported as column references — confirm.
                stack.push(&l.body);
            }
            Expression::NamedArgument(n) => {
                stack.push(&n.value);
            }
            Expression::Lateral(l) => {
                stack.push(&l.this);
                if let Some(ref view) = l.view {
                    stack.push(view);
                }
                if let Some(ref outer) = l.outer {
                    stack.push(outer);
                }
                if let Some(ref ordinality) = l.ordinality {
                    stack.push(ordinality);
                }
            }
            Expression::LateralView(lv) => {
                stack.push(&lv.this);
            }
            Expression::TryCatch(t) => {
                for stmt in &t.try_body {
                    stack.push(stmt);
                }
                if let Some(catch_body) = &t.catch_body {
                    for stmt in catch_body {
                        stack.push(stmt);
                    }
                }
            }
            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
                stack.push(e);
            }

            // Functions with dedicated argument structs: visit each operand field.
            Expression::Substring(f) => {
                stack.push(&f.this);
                stack.push(&f.start);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Trim(f) => {
                stack.push(&f.this);
                if let Some(ref chars) = f.characters {
                    stack.push(chars);
                }
            }
            Expression::Replace(f) => {
                stack.push(&f.this);
                stack.push(&f.old);
                stack.push(&f.new);
            }
            Expression::IfFunc(f) => {
                stack.push(&f.condition);
                stack.push(&f.true_value);
                if let Some(ref fv) = f.false_value {
                    stack.push(fv);
                }
            }
            Expression::Nvl2(f) => {
                stack.push(&f.this);
                stack.push(&f.true_value);
                stack.push(&f.false_value);
            }
            Expression::ConcatWs(f) => {
                stack.push(&f.separator);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Count(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::GroupConcat(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::StringAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }
            Expression::ListAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::SumIf(f) => {
                stack.push(&f.this);
                stack.push(&f.condition);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::DateAdd(f) | Expression::DateSub(f) => {
                stack.push(&f.this);
                stack.push(&f.interval);
            }
            Expression::DateDiff(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }
            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
                stack.push(&f.this);
            }
            Expression::Extract(f) => {
                stack.push(&f.this);
            }
            Expression::Round(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::Floor(f) => {
                stack.push(&f.this);
                if let Some(ref s) = f.scale {
                    stack.push(s);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Ceil(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Log(f) => {
                stack.push(&f.this);
                if let Some(ref b) = f.base {
                    stack.push(b);
                }
            }
            Expression::AtTimeZone(f) => {
                stack.push(&f.this);
                stack.push(&f.zone);
            }
            Expression::Lead(f) | Expression::Lag(f) => {
                stack.push(&f.this);
                if let Some(ref off) = f.offset {
                    stack.push(off);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::FirstValue(f) | Expression::LastValue(f) => {
                stack.push(&f.this);
            }
            Expression::NthValue(f) => {
                stack.push(&f.this);
                stack.push(&f.offset);
            }
            Expression::Position(f) => {
                stack.push(&f.substring);
                stack.push(&f.string);
                if let Some(ref start) = f.start {
                    stack.push(start);
                }
            }
            Expression::Decode(f) => {
                stack.push(&f.this);
                for (search, result) in &f.search_results {
                    stack.push(search);
                    stack.push(result);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::CharFunc(f) => {
                for arg in &f.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySort(f) => {
                stack.push(&f.this);
                if let Some(ref cmp) = f.comparator {
                    stack.push(cmp);
                }
            }
            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
                stack.push(&f.this);
                stack.push(&f.separator);
                if let Some(ref nr) = f.null_replacement {
                    stack.push(nr);
                }
            }
            Expression::ArrayFilter(f) => {
                stack.push(&f.this);
                stack.push(&f.filter);
            }
            Expression::ArrayTransform(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::Sequence(f)
            | Expression::Generate(f)
            | Expression::ExplodingGenerateSeries(f) => {
                stack.push(&f.start);
                stack.push(&f.stop);
                if let Some(ref step) = f.step {
                    stack.push(step);
                }
            }
            Expression::JsonExtract(f)
            | Expression::JsonExtractScalar(f)
            | Expression::JsonQuery(f)
            | Expression::JsonValue(f) => {
                stack.push(&f.this);
                stack.push(&f.path);
            }
            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
                stack.push(&f.this);
                for p in &f.paths {
                    stack.push(p);
                }
            }
            Expression::JsonObject(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
                stack.push(&f.this);
                for (path, val) in &f.path_values {
                    stack.push(path);
                    stack.push(val);
                }
            }
            Expression::Overlay(f) => {
                stack.push(&f.this);
                stack.push(&f.replacement);
                stack.push(&f.from);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Convert(f) => {
                stack.push(&f.this);
                if let Some(ref style) = f.style {
                    stack.push(style);
                }
            }
            Expression::ApproxPercentile(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref acc) = f.accuracy {
                    stack.push(acc);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::Percentile(f)
            | Expression::PercentileCont(f)
            | Expression::PercentileDisc(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::WithinGroup(f) => {
                stack.push(&f.this);
            }
            Expression::Left(f) | Expression::Right(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
            }
            Expression::Repeat(f) => {
                stack.push(&f.this);
                stack.push(&f.times);
            }
            Expression::Lpad(f) | Expression::Rpad(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
                if let Some(ref fill) = f.fill {
                    stack.push(fill);
                }
            }
            Expression::Split(f) => {
                stack.push(&f.this);
                stack.push(&f.delimiter);
            }
            Expression::RegexpLike(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpReplace(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                stack.push(&f.replacement);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref group) = f.group {
                    stack.push(group);
                }
            }
            Expression::ToDate(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::ToTimestamp(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::DateFormat(f) | Expression::FormatDate(f) => {
                stack.push(&f.this);
                stack.push(&f.format);
            }
            Expression::LastDay(f) => {
                stack.push(&f.this);
            }
            Expression::FromUnixtime(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::UnixTimestamp(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::MakeDate(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
            }
            Expression::MakeTimestamp(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
                stack.push(&f.hour);
                stack.push(&f.minute);
                stack.push(&f.second);
                if let Some(ref tz) = f.timezone {
                    stack.push(tz);
                }
            }
            Expression::TruncFunc(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::ArrayFunc(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Unnest(f) => {
                stack.push(&f.this);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::StructFunc(f) => {
                for (_, e) in &f.fields {
                    stack.push(e);
                }
            }
            Expression::StructExtract(f) => {
                stack.push(&f.this);
            }
            Expression::NamedStruct(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::MapFunc(f) => {
                for k in &f.keys {
                    stack.push(k);
                }
                for v in &f.values {
                    stack.push(v);
                }
            }
            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::JsonArrayAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::JsonObjectAgg(f) => {
                stack.push(&f.key);
                stack.push(&f.value);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::NTile(f) => {
                if let Some(ref n) = f.num_buckets {
                    stack.push(n);
                }
            }
            Expression::Rand(f) => {
                if let Some(ref s) = f.seed {
                    stack.push(s);
                }
                if let Some(ref lo) = f.lower {
                    stack.push(lo);
                }
                if let Some(ref hi) = f.upper {
                    stack.push(hi);
                }
            }
            Expression::Any(q) | Expression::All(q) => {
                // `subquery` is pushed too. When it is an `Expression::Subquery`, the
                // `Subquery` arm above ignores it.
                stack.push(&q.this);
                stack.push(&q.subquery);
            }
            Expression::Overlaps(o) => {
                if let Some(ref this) = o.this {
                    stack.push(this);
                }
                if let Some(ref expr) = o.expression {
                    stack.push(expr);
                }
                if let Some(ref ls) = o.left_start {
                    stack.push(ls);
                }
                if let Some(ref le) = o.left_end {
                    stack.push(le);
                }
                if let Some(ref rs) = o.right_start {
                    stack.push(rs);
                }
                if let Some(ref re) = o.right_end {
                    stack.push(re);
                }
            }
            Expression::Interval(i) => {
                if let Some(ref this) = i.this {
                    stack.push(this);
                }
            }
            Expression::TimeStrToTime(f) => {
                stack.push(&f.this);
                if let Some(ref zone) = f.zone {
                    stack.push(zone);
                }
            }
            Expression::JSONBExtractScalar(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                if let Some(ref jt) = f.json_type {
                    stack.push(jt);
                }
            }
            Expression::JSONExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                for e in &f.expressions {
                    stack.push(e);
                }
                if let Some(ref option) = f.option {
                    stack.push(option);
                }
                if let Some(ref on_condition) = f.on_condition {
                    stack.push(on_condition);
                }
            }

            // Any variant not listed above (literals, `*`, a bare SELECT, ...) is not
            // descended into and contributes no column references.
            _ => {}
        }
    }
}
2514
2515#[cfg(test)]
2520mod tests {
2521 use super::*;
2522 use crate::dialects::{Dialect, DialectType};
2523 use crate::expressions::DataType;
2524 use crate::optimizer::annotate_types::annotate_types;
2525 use crate::parse_one;
2526 use crate::schema::{MappingSchema, Schema};
2527
2528 fn parse(sql: &str) -> Expression {
2529 let dialect = Dialect::get(DialectType::Generic);
2530 let ast = dialect.parse(sql).unwrap();
2531 ast.into_iter().next().unwrap()
2532 }
2533
2534 #[test]
2535 fn test_simple_lineage() {
2536 let expr = parse("SELECT a FROM t");
2537 let node = lineage("a", &expr, None, false).unwrap();
2538
2539 assert_eq!(node.name, "a");
2540 assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2541 let names = node.downstream_names();
2543 assert!(
2544 names.iter().any(|n| n == "t.a"),
2545 "Expected t.a in downstream, got: {:?}",
2546 names
2547 );
2548 }
2549
2550 #[test]
2551 fn test_lineage_walk() {
2552 let root = LineageNode {
2553 name: "col_a".to_string(),
2554 expression: Expression::Null(crate::expressions::Null),
2555 source: Expression::Null(crate::expressions::Null),
2556 downstream: vec![LineageNode::new(
2557 "t.a",
2558 Expression::Null(crate::expressions::Null),
2559 Expression::Null(crate::expressions::Null),
2560 )],
2561 source_name: String::new(),
2562 source_kind: SourceKind::Unknown,
2563 source_alias: None,
2564 reference_node_name: String::new(),
2565 };
2566
2567 let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2568 assert_eq!(names.len(), 2);
2569 assert_eq!(names[0], "col_a");
2570 assert_eq!(names[1], "t.a");
2571 }
2572
2573 #[test]
2574 fn test_aliased_column() {
2575 let expr = parse("SELECT a + 1 AS b FROM t");
2576 let node = lineage("b", &expr, None, false).unwrap();
2577
2578 assert_eq!(node.name, "b");
2579 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2581 assert!(
2582 all_names.iter().any(|n| n.contains("a")),
2583 "Expected to trace to column a, got: {:?}",
2584 all_names
2585 );
2586 }
2587
2588 #[test]
2589 fn test_qualified_column() {
2590 let expr = parse("SELECT t.a FROM t");
2591 let node = lineage("a", &expr, None, false).unwrap();
2592
2593 assert_eq!(node.name, "a");
2594 let names = node.downstream_names();
2595 assert!(
2596 names.iter().any(|n| n == "t.a"),
2597 "Expected t.a, got: {:?}",
2598 names
2599 );
2600 }
2601
2602 #[test]
2603 fn test_unqualified_column() {
2604 let expr = parse("SELECT a FROM t");
2605 let node = lineage("a", &expr, None, false).unwrap();
2606
2607 let names = node.downstream_names();
2609 assert!(
2610 names.iter().any(|n| n == "t.a"),
2611 "Expected t.a, got: {:?}",
2612 names
2613 );
2614 }
2615
2616 #[test]
2617 fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2618 let query = "SELECT name FROM users";
2619 let dialect = Dialect::get(DialectType::BigQuery);
2620 let expr = dialect
2621 .parse(query)
2622 .unwrap()
2623 .into_iter()
2624 .next()
2625 .expect("expected one expression");
2626
2627 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2628 schema
2629 .add_table("users", &[("name".into(), DataType::Text)], None)
2630 .expect("schema setup");
2631
2632 let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2633 .expect("lineage without schema");
2634 let mut expr_without = node_without_schema.expression.clone();
2635 annotate_types(
2636 &mut expr_without,
2637 Some(&schema),
2638 Some(DialectType::BigQuery),
2639 );
2640 assert_eq!(
2641 expr_without.inferred_type(),
2642 None,
2643 "Expected unresolved root type without schema-aware lineage qualification"
2644 );
2645
2646 let node_with_schema = lineage_with_schema(
2647 "name",
2648 &expr,
2649 Some(&schema),
2650 Some(DialectType::BigQuery),
2651 false,
2652 )
2653 .expect("lineage with schema");
2654 let mut expr_with = node_with_schema.expression.clone();
2655 annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2656
2657 assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2658 }
2659
2660 #[test]
2661 fn test_lineage_with_schema_correlated_scalar_subquery() {
2662 let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2663 let dialect = Dialect::get(DialectType::BigQuery);
2664 let expr = dialect
2665 .parse(query)
2666 .unwrap()
2667 .into_iter()
2668 .next()
2669 .expect("expected one expression");
2670
2671 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2672 schema
2673 .add_table(
2674 "t1",
2675 &[("id".into(), DataType::BigInt { length: None })],
2676 None,
2677 )
2678 .expect("schema setup");
2679 schema
2680 .add_table(
2681 "t2",
2682 &[
2683 ("id".into(), DataType::BigInt { length: None }),
2684 ("val".into(), DataType::BigInt { length: None }),
2685 ],
2686 None,
2687 )
2688 .expect("schema setup");
2689
2690 let node = lineage_with_schema(
2691 "id",
2692 &expr,
2693 Some(&schema),
2694 Some(DialectType::BigQuery),
2695 false,
2696 )
2697 .expect("lineage_with_schema should handle correlated scalar subqueries");
2698
2699 assert_eq!(node.name, "id");
2700 }
2701
2702 #[test]
2703 fn test_lineage_with_schema_join_using() {
2704 let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2705 let dialect = Dialect::get(DialectType::BigQuery);
2706 let expr = dialect
2707 .parse(query)
2708 .unwrap()
2709 .into_iter()
2710 .next()
2711 .expect("expected one expression");
2712
2713 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2714 schema
2715 .add_table(
2716 "t1",
2717 &[("a".into(), DataType::BigInt { length: None })],
2718 None,
2719 )
2720 .expect("schema setup");
2721 schema
2722 .add_table(
2723 "t2",
2724 &[("a".into(), DataType::BigInt { length: None })],
2725 None,
2726 )
2727 .expect("schema setup");
2728
2729 let node = lineage_with_schema(
2730 "a",
2731 &expr,
2732 Some(&schema),
2733 Some(DialectType::BigQuery),
2734 false,
2735 )
2736 .expect("lineage_with_schema should handle JOIN USING");
2737
2738 assert_eq!(node.name, "a");
2739 }
2740
2741 #[test]
2742 fn test_lineage_with_schema_qualified_table_name() {
2743 let query = "SELECT a FROM raw.t1";
2744 let dialect = Dialect::get(DialectType::BigQuery);
2745 let expr = dialect
2746 .parse(query)
2747 .unwrap()
2748 .into_iter()
2749 .next()
2750 .expect("expected one expression");
2751
2752 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2753 schema
2754 .add_table(
2755 "raw.t1",
2756 &[("a".into(), DataType::BigInt { length: None })],
2757 None,
2758 )
2759 .expect("schema setup");
2760
2761 let node = lineage_with_schema(
2762 "a",
2763 &expr,
2764 Some(&schema),
2765 Some(DialectType::BigQuery),
2766 false,
2767 )
2768 .expect("lineage_with_schema should handle dotted schema.table names");
2769
2770 assert_eq!(node.name, "a");
2771 }
2772
2773 #[test]
2774 fn test_lineage_with_schema_none_matches_lineage() {
2775 let expr = parse("SELECT a FROM t");
2776 let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2777 let with_none =
2778 lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2779
2780 assert_eq!(with_none.name, baseline.name);
2781 assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2782 }
2783
2784 #[test]
2785 fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2786 let dialect = Dialect::get(DialectType::BigQuery);
2787 let expr = dialect
2788 .parse("SELECT Name AS name FROM teams")
2789 .unwrap()
2790 .into_iter()
2791 .next()
2792 .expect("expected one expression");
2793
2794 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2795 schema
2796 .add_table(
2797 "teams",
2798 &[("Name".into(), DataType::String { length: None })],
2799 None,
2800 )
2801 .expect("schema setup");
2802
2803 let node = lineage_with_schema(
2804 "name",
2805 &expr,
2806 Some(&schema),
2807 Some(DialectType::BigQuery),
2808 false,
2809 )
2810 .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2811
2812 let names = node.downstream_names();
2813 assert!(
2814 names.iter().any(|n| n == "teams.Name"),
2815 "Expected teams.Name in downstream, got: {:?}",
2816 names
2817 );
2818 }
2819
2820 #[test]
2821 fn test_lineage_bigquery_mixed_case_alias_lookup() {
2822 let dialect = Dialect::get(DialectType::BigQuery);
2823 let expr = dialect
2824 .parse("SELECT Name AS Name FROM teams")
2825 .unwrap()
2826 .into_iter()
2827 .next()
2828 .expect("expected one expression");
2829
2830 let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2831 .expect("lineage should resolve mixed-case aliases in BigQuery");
2832
2833 assert_eq!(node.name, "name");
2834 }
2835
2836 #[test]
2837 fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2838 let expr = parse_one(
2839 r#"
2840SELECT date_val AS week_start
2841FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2842"#,
2843 DialectType::BigQuery,
2844 )
2845 .expect("parse");
2846
2847 let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2848 .expect("lineage should resolve UNNEST alias as a source");
2849 let child = node
2850 .downstream
2851 .first()
2852 .expect("week_start should have downstream lineage");
2853
2854 assert_eq!(child.name, "_0.date_val");
2855 assert_eq!(child.source_name, "_0");
2856 assert_eq!(child.source_kind, SourceKind::Virtual);
2857 assert_eq!(child.source_alias.as_deref(), Some("date_val"));
2858
2859 let Expression::Column(column) = &child.expression else {
2860 panic!(
2861 "expected downstream column expression, got {:?}",
2862 child.expression
2863 );
2864 };
2865 assert_eq!(column.name.name, "date_val");
2866 assert_eq!(
2867 column.table.as_ref().map(|table| table.name.as_str()),
2868 Some("_0")
2869 );
2870 assert!(
2871 matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
2872 "expected UNNEST source expression, got {:?}",
2873 child.source
2874 );
2875 }
2876
2877 #[test]
2878 fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
2879 let expr =
2880 parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
2881
2882 let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2883 let child = node.downstream.first().expect("id should have lineage");
2884
2885 assert_eq!(child.name, "date_val.id");
2886 assert_eq!(child.source_name, "date_val");
2887 assert_eq!(child.source_kind, SourceKind::Table);
2888 assert_eq!(child.source_alias, None);
2889 }
2890
2891 #[test]
2892 fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
2893 let expr = parse_one(
2894 r#"
2895SELECT a.a AS first_value, b.b AS second_value
2896FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
2897JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
2898"#,
2899 DialectType::BigQuery,
2900 )
2901 .expect("parse");
2902
2903 let first =
2904 lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2905 let second =
2906 lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2907
2908 let first_child = first.downstream.first().expect("first source");
2909 let second_child = second.downstream.first().expect("second source");
2910
2911 assert_eq!(first_child.name, "_0.a");
2912 assert_eq!(first_child.source_name, "_0");
2913 assert_eq!(first_child.source_alias.as_deref(), Some("a"));
2914 assert_eq!(first_child.source_kind, SourceKind::Virtual);
2915
2916 assert_eq!(second_child.name, "_1.b");
2917 assert_eq!(second_child.source_name, "_1");
2918 assert_eq!(second_child.source_alias.as_deref(), Some("b"));
2919 assert_eq!(second_child.source_kind, SourceKind::Virtual);
2920 }
2921
2922 #[test]
2923 fn test_lineage_table_backed_unnest_points_to_real_source_column() {
2924 let expr = parse_one(
2925 r#"
2926SELECT item.item AS item
2927FROM t JOIN UNNEST(t.items) AS item ON TRUE
2928"#,
2929 DialectType::BigQuery,
2930 )
2931 .expect("parse");
2932
2933 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2934 let virtual_child = node.downstream.first().expect("virtual item source");
2935 assert_eq!(virtual_child.name, "_0.item");
2936 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2937
2938 let real_child = virtual_child
2939 .downstream
2940 .first()
2941 .expect("UNNEST(t.items) should depend on t.items");
2942 assert_eq!(real_child.name, "t.items");
2943 assert_eq!(real_child.source_name, "t");
2944 assert_eq!(real_child.source_kind, SourceKind::Table);
2945 }
2946
2947 #[test]
2948 fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
2949 let expr = parse_one(
2950 r#"
2951SELECT item AS item
2952FROM t JOIN UNNEST(t.items) AS item ON TRUE
2953"#,
2954 DialectType::BigQuery,
2955 )
2956 .expect("parse");
2957
2958 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2959 let virtual_child = node.downstream.first().expect("virtual item source");
2960 assert_eq!(virtual_child.name, "_0.item");
2961 assert_eq!(virtual_child.source_name, "_0");
2962 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2963 assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
2964
2965 let real_child = virtual_child
2966 .downstream
2967 .first()
2968 .expect("UNNEST(t.items) should depend on t.items");
2969 assert_eq!(real_child.name, "t.items");
2970 assert_eq!(real_child.source_name, "t");
2971 assert_eq!(real_child.source_kind, SourceKind::Table);
2972 }
2973
2974 #[test]
2975 fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
2976 let cases = [
2977 (
2978 DialectType::PostgreSQL,
2979 "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
2980 ),
2981 (
2982 DialectType::Presto,
2983 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
2984 ),
2985 (
2986 DialectType::Trino,
2987 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
2988 ),
2989 ];
2990
2991 for (dialect, sql) in cases {
2992 let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
2993 let node = lineage("out", &expr, Some(dialect), false)
2994 .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
2995 let virtual_child = node
2996 .downstream
2997 .first()
2998 .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
2999
3000 assert_eq!(
3001 virtual_child.name, "_0.x",
3002 "unexpected virtual child for {dialect:?}"
3003 );
3004 assert_eq!(virtual_child.source_name, "_0");
3005 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3006 assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3007
3008 let real_child = virtual_child
3009 .downstream
3010 .first()
3011 .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3012 assert_eq!(real_child.name, "t.items");
3013 assert_eq!(real_child.source_kind, SourceKind::Table);
3014 }
3015 }
3016
3017 #[test]
3018 fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
3019 let cases = [
3020 (
3021 DialectType::Spark,
3022 "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3023 ),
3024 (
3025 DialectType::Hive,
3026 "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3027 ),
3028 ];
3029
3030 for (dialect, sql) in cases {
3031 let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3032 let node = lineage("out", &expr, Some(dialect), false)
3033 .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3034 let virtual_child = node
3035 .downstream
3036 .first()
3037 .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3038
3039 assert_eq!(virtual_child.name, "_0.x");
3040 assert_eq!(virtual_child.source_name, "_0");
3041 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3042 assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3043
3044 let real_child = virtual_child
3045 .downstream
3046 .first()
3047 .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3048 assert_eq!(real_child.name, "t.items");
3049 assert_eq!(real_child.source_kind, SourceKind::Table);
3050 }
3051 }
3052
3053 #[test]
3054 fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
3055 let expr = parse_one(
3056 "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
3057 DialectType::Snowflake,
3058 )
3059 .expect("parse");
3060
3061 let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
3062 let virtual_child = node.downstream.first().expect("virtual flatten source");
3063 assert_eq!(virtual_child.name, "_0.value");
3064 assert_eq!(virtual_child.source_name, "_0");
3065 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3066 assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
3067
3068 let real_child = virtual_child
3069 .downstream
3070 .first()
3071 .expect("FLATTEN input should depend on raw_events.payload");
3072 assert_eq!(real_child.name, "raw_events.payload");
3073 assert_eq!(real_child.source_kind, SourceKind::Table);
3074 }
3075
3076 #[test]
3077 fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
3078 let expr = parse_one(
3079 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3080 DialectType::Snowflake,
3081 )
3082 .expect("parse");
3083
3084 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3085 schema
3086 .add_table(
3087 "fact.some_daily_metrics",
3088 &[("date_utc".to_string(), DataType::Date)],
3089 None,
3090 )
3091 .expect("schema setup");
3092
3093 let node = lineage_with_schema(
3094 "recency",
3095 &expr,
3096 Some(&schema),
3097 Some(DialectType::Snowflake),
3098 false,
3099 )
3100 .expect("lineage_with_schema should not treat date part as a column");
3101
3102 let names = node.downstream_names();
3103 assert!(
3104 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3105 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3106 names
3107 );
3108 assert!(
3109 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3110 "Did not expect date part to appear as lineage column, got: {:?}",
3111 names
3112 );
3113 }
3114
3115 #[test]
3116 fn test_snowflake_datediff_parses_to_typed_ast() {
3117 let expr = parse_one(
3118 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3119 DialectType::Snowflake,
3120 )
3121 .expect("parse");
3122
3123 match expr {
3124 Expression::Select(select) => match &select.expressions[0] {
3125 Expression::Alias(alias) => match &alias.this {
3126 Expression::DateDiff(f) => {
3127 assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
3128 }
3129 other => panic!("expected DateDiff, got {other:?}"),
3130 },
3131 other => panic!("expected Alias, got {other:?}"),
3132 },
3133 other => panic!("expected Select, got {other:?}"),
3134 }
3135 }
3136
3137 #[test]
3138 fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
3139 let expr = parse_one(
3140 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3141 DialectType::Snowflake,
3142 )
3143 .expect("parse");
3144
3145 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3146 schema
3147 .add_table(
3148 "fact.some_daily_metrics",
3149 &[("date_utc".to_string(), DataType::Date)],
3150 None,
3151 )
3152 .expect("schema setup");
3153
3154 let node = lineage_with_schema(
3155 "next_day",
3156 &expr,
3157 Some(&schema),
3158 Some(DialectType::Snowflake),
3159 false,
3160 )
3161 .expect("lineage_with_schema should not treat DATEADD date part as a column");
3162
3163 let names = node.downstream_names();
3164 assert!(
3165 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3166 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3167 names
3168 );
3169 assert!(
3170 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3171 "Did not expect date part to appear as lineage column, got: {:?}",
3172 names
3173 );
3174 }
3175
3176 #[test]
3177 fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
3178 let expr = parse_one(
3179 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3180 DialectType::Snowflake,
3181 )
3182 .expect("parse");
3183
3184 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3185 schema
3186 .add_table(
3187 "fact.some_daily_metrics",
3188 &[("date_utc".to_string(), DataType::Date)],
3189 None,
3190 )
3191 .expect("schema setup");
3192
3193 let node = lineage_with_schema(
3194 "day_part",
3195 &expr,
3196 Some(&schema),
3197 Some(DialectType::Snowflake),
3198 false,
3199 )
3200 .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
3201
3202 let names = node.downstream_names();
3203 assert!(
3204 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3205 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3206 names
3207 );
3208 assert!(
3209 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3210 "Did not expect date part to appear as lineage column, got: {:?}",
3211 names
3212 );
3213 }
3214
3215 #[test]
3216 fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
3217 let expr = parse_one(
3218 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3219 DialectType::Snowflake,
3220 )
3221 .expect("parse");
3222
3223 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3224 schema
3225 .add_table(
3226 "fact.some_daily_metrics",
3227 &[("date_utc".to_string(), DataType::Date)],
3228 None,
3229 )
3230 .expect("schema setup");
3231
3232 let node = lineage_with_schema(
3233 "day_part",
3234 &expr,
3235 Some(&schema),
3236 Some(DialectType::Snowflake),
3237 false,
3238 )
3239 .expect("quoted DATE_PART should continue to work");
3240
3241 let names = node.downstream_names();
3242 assert!(
3243 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3244 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3245 names
3246 );
3247 }
3248
3249 #[test]
3250 fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
3251 let expr = parse_one(
3252 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3253 DialectType::Snowflake,
3254 )
3255 .expect("parse");
3256
3257 match expr {
3258 Expression::Select(select) => match &select.expressions[0] {
3259 Expression::Alias(alias) => match &alias.this {
3260 Expression::Function(f) => {
3261 assert_eq!(f.name.to_uppercase(), "DATEADD");
3262 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3263 }
3264 other => panic!("expected generic DATEADD function, got {other:?}"),
3265 },
3266 other => panic!("expected Alias, got {other:?}"),
3267 },
3268 other => panic!("expected Select, got {other:?}"),
3269 }
3270 }
3271
3272 #[test]
3273 fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
3274 let expr = parse_one(
3275 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3276 DialectType::Snowflake,
3277 )
3278 .expect("parse");
3279
3280 match expr {
3281 Expression::Select(select) => match &select.expressions[0] {
3282 Expression::Alias(alias) => match &alias.this {
3283 Expression::Function(f) => {
3284 assert_eq!(f.name.to_uppercase(), "DATE_PART");
3285 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3286 }
3287 other => panic!("expected generic DATE_PART function, got {other:?}"),
3288 },
3289 other => panic!("expected Alias, got {other:?}"),
3290 },
3291 other => panic!("expected Select, got {other:?}"),
3292 }
3293 }
3294
3295 #[test]
3296 fn test_snowflake_date_part_string_literal_stays_generic_function() {
3297 let expr = parse_one(
3298 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3299 DialectType::Snowflake,
3300 )
3301 .expect("parse");
3302
3303 match expr {
3304 Expression::Select(select) => match &select.expressions[0] {
3305 Expression::Alias(alias) => match &alias.this {
3306 Expression::Function(f) => {
3307 assert_eq!(f.name.to_uppercase(), "DATE_PART");
3308 }
3309 other => panic!("expected generic DATE_PART function, got {other:?}"),
3310 },
3311 other => panic!("expected Alias, got {other:?}"),
3312 },
3313 other => panic!("expected Select, got {other:?}"),
3314 }
3315 }
3316
3317 #[test]
3318 fn test_lineage_join() {
3319 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3320
3321 let node_a = lineage("a", &expr, None, false).unwrap();
3322 let names_a = node_a.downstream_names();
3323 assert!(
3324 names_a.iter().any(|n| n == "t.a"),
3325 "Expected t.a, got: {:?}",
3326 names_a
3327 );
3328
3329 let node_b = lineage("b", &expr, None, false).unwrap();
3330 let names_b = node_b.downstream_names();
3331 assert!(
3332 names_b.iter().any(|n| n == "s.b"),
3333 "Expected s.b, got: {:?}",
3334 names_b
3335 );
3336 }
3337
3338 #[test]
3339 fn test_lineage_alias_leaf_has_resolved_source_name() {
3340 let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
3341 let node = lineage("col1", &expr, None, false).unwrap();
3342
3343 let names = node.downstream_names();
3345 assert!(
3346 names.iter().any(|n| n == "t1.col1"),
3347 "Expected aliased column edge t1.col1, got: {:?}",
3348 names
3349 );
3350
3351 let leaf = node
3353 .downstream
3354 .iter()
3355 .find(|n| n.name == "t1.col1")
3356 .expect("Expected t1.col1 leaf");
3357 assert_eq!(leaf.source_name, "table1");
3358 match &leaf.source {
3359 Expression::Table(table) => assert_eq!(table.name.name, "table1"),
3360 _ => panic!("Expected leaf source to be a table expression"),
3361 }
3362 }
3363
3364 #[test]
3365 fn test_lineage_derived_table() {
3366 let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
3367 let node = lineage("a", &expr, None, false).unwrap();
3368
3369 assert_eq!(node.name, "a");
3370 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3372 assert!(
3373 all_names.iter().any(|n| n == "t.a"),
3374 "Expected to trace through derived table to t.a, got: {:?}",
3375 all_names
3376 );
3377 }
3378
3379 #[test]
3380 fn test_lineage_cte() {
3381 let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
3382 let node = lineage("a", &expr, None, false).unwrap();
3383
3384 assert_eq!(node.name, "a");
3385 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3386 assert!(
3387 all_names.iter().any(|n| n == "t.a"),
3388 "Expected to trace through CTE to t.a, got: {:?}",
3389 all_names
3390 );
3391 }
3392
3393 #[test]
3394 fn test_lineage_union() {
3395 let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
3396 let node = lineage("a", &expr, None, false).unwrap();
3397
3398 assert_eq!(node.name, "a");
3399 assert_eq!(
3401 node.downstream.len(),
3402 2,
3403 "Expected 2 branches for UNION, got {}",
3404 node.downstream.len()
3405 );
3406 }
3407
3408 #[test]
3409 fn test_lineage_cte_union() {
3410 let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
3411 let node = lineage("a", &expr, None, false).unwrap();
3412
3413 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3415 assert!(
3416 all_names.len() >= 3,
3417 "Expected at least 3 nodes for CTE with UNION, got: {:?}",
3418 all_names
3419 );
3420 }
3421
3422 #[test]
3423 fn test_lineage_star() {
3424 let expr = parse("SELECT * FROM t");
3425 let node = lineage("*", &expr, None, false).unwrap();
3426
3427 assert_eq!(node.name, "*");
3428 assert!(
3430 !node.downstream.is_empty(),
3431 "Star should produce downstream nodes"
3432 );
3433 }
3434
3435 #[test]
3436 fn test_lineage_subquery_in_select() {
3437 let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
3438 let node = lineage("x", &expr, None, false).unwrap();
3439
3440 assert_eq!(node.name, "x");
3441 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3443 assert!(
3444 all_names.len() >= 2,
3445 "Expected tracing into scalar subquery, got: {:?}",
3446 all_names
3447 );
3448 }
3449
3450 #[test]
3451 fn test_lineage_multiple_columns() {
3452 let expr = parse("SELECT a, b FROM t");
3453
3454 let node_a = lineage("a", &expr, None, false).unwrap();
3455 let node_b = lineage("b", &expr, None, false).unwrap();
3456
3457 assert_eq!(node_a.name, "a");
3458 assert_eq!(node_b.name, "b");
3459
3460 let names_a = node_a.downstream_names();
3462 let names_b = node_b.downstream_names();
3463 assert!(names_a.iter().any(|n| n == "t.a"));
3464 assert!(names_b.iter().any(|n| n == "t.b"));
3465 }
3466
3467 #[test]
3468 fn test_get_source_tables() {
3469 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3470 let node = lineage("a", &expr, None, false).unwrap();
3471
3472 let tables = get_source_tables(&node);
3473 assert!(
3474 tables.contains("t"),
3475 "Expected source table 't', got: {:?}",
3476 tables
3477 );
3478 }
3479
3480 #[test]
3481 fn test_lineage_column_not_found() {
3482 let expr = parse("SELECT a FROM t");
3483 let result = lineage("nonexistent", &expr, None, false);
3484 assert!(result.is_err());
3485 }
3486
3487 #[test]
3488 fn test_lineage_nested_cte() {
3489 let expr = parse(
3490 "WITH cte1 AS (SELECT a FROM t), \
3491 cte2 AS (SELECT a FROM cte1) \
3492 SELECT a FROM cte2",
3493 );
3494 let node = lineage("a", &expr, None, false).unwrap();
3495
3496 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3498 assert!(
3499 all_names.len() >= 3,
3500 "Expected to trace through nested CTEs, got: {:?}",
3501 all_names
3502 );
3503 }
3504
3505 #[test]
3506 fn test_trim_selects_true() {
3507 let expr = parse("SELECT a, b, c FROM t");
3508 let node = lineage("a", &expr, None, true).unwrap();
3509
3510 if let Expression::Select(select) = &node.source {
3512 assert_eq!(
3513 select.expressions.len(),
3514 1,
3515 "Trimmed source should have 1 expression, got {}",
3516 select.expressions.len()
3517 );
3518 } else {
3519 panic!("Expected Select source");
3520 }
3521 }
3522
3523 #[test]
3524 fn test_trim_selects_false() {
3525 let expr = parse("SELECT a, b, c FROM t");
3526 let node = lineage("a", &expr, None, false).unwrap();
3527
3528 if let Expression::Select(select) = &node.source {
3530 assert_eq!(
3531 select.expressions.len(),
3532 3,
3533 "Untrimmed source should have 3 expressions"
3534 );
3535 } else {
3536 panic!("Expected Select source");
3537 }
3538 }
3539
3540 #[test]
3541 fn test_lineage_expression_in_select() {
3542 let expr = parse("SELECT a + b AS c FROM t");
3543 let node = lineage("c", &expr, None, false).unwrap();
3544
3545 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3547 assert!(
3548 all_names.len() >= 3,
3549 "Expected to trace a + b to both columns, got: {:?}",
3550 all_names
3551 );
3552 }
3553
3554 #[test]
3555 fn test_set_operation_by_index() {
3556 let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3557
3558 let node = lineage("a", &expr, None, false).unwrap();
3560
3561 assert_eq!(node.downstream.len(), 2);
3563 }
3564
3565 fn print_node(node: &LineageNode, indent: usize) {
3568 let pad = " ".repeat(indent);
3569 println!(
3570 "{pad}name={:?} source_name={:?}",
3571 node.name, node.source_name
3572 );
3573 for child in &node.downstream {
3574 print_node(child, indent + 1);
3575 }
3576 }
3577
3578 #[test]
3579 fn test_issue18_repro() {
3580 let query = "SELECT UPPER(name) as upper_name FROM users";
3582 println!("Query: {query}\n");
3583
3584 let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3585 let exprs = dialect.parse(query).unwrap();
3586 let expr = &exprs[0];
3587
3588 let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3589 println!("lineage(\"upper_name\"):");
3590 print_node(&node, 1);
3591
3592 let names = node.downstream_names();
3593 assert!(
3594 names.iter().any(|n| n == "users.name"),
3595 "Expected users.name in downstream, got: {:?}",
3596 names
3597 );
3598 }
3599
3600 #[test]
3601 fn test_lineage_bigquery_safe_namespace_issue207() {
3602 let query = r#"
3603WITH import_cte AS (
3604 SELECT timestamp, data, operation
3605 FROM `project`.`dataset`.`source_table`
3606),
3607transform_cte AS (
3608 SELECT
3609 timestamp,
3610 SAFE.PARSE_JSON(data) AS json_data
3611 FROM import_cte
3612)
3613SELECT json_data FROM transform_cte
3614"#;
3615 let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3616 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3617 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3618 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3619
3620 assert!(
3621 names.iter().any(|name| name == "source_table.data"),
3622 "expected source_table.data in lineage, got {names:?}"
3623 );
3624 assert!(
3625 !names
3626 .iter()
3627 .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3628 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3629 );
3630 }
3631
3632 #[test]
3633 fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3634 let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3635 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3636 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3637 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3638
3639 assert!(
3640 names.iter().any(|name| name == "t.data"),
3641 "expected t.data in lineage, got {names:?}"
3642 );
3643 assert!(
3644 !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3645 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3646 );
3647 }
3648
3649 #[test]
3650 fn test_lineage_method_call_receiver_control() {
3651 let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3652 let node = lineage("out", &expr, None, false).expect("lineage");
3653 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3654
3655 assert!(
3656 names.iter().any(|name| name == "t.obj"),
3657 "expected ordinary method receiver to remain in lineage, got {names:?}"
3658 );
3659 assert!(
3660 names.iter().any(|name| name == "t.arg"),
3661 "expected method argument in lineage, got {names:?}"
3662 );
3663 }
3664
3665 #[test]
3666 fn test_lineage_upper_function() {
3667 let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3668 let node = lineage("upper_name", &expr, None, false).unwrap();
3669
3670 let names = node.downstream_names();
3671 assert!(
3672 names.iter().any(|n| n == "users.name"),
3673 "Expected users.name in downstream, got: {:?}",
3674 names
3675 );
3676 }
3677
3678 #[test]
3679 fn test_lineage_round_function() {
3680 let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3681 let node = lineage("rounded", &expr, None, false).unwrap();
3682
3683 let names = node.downstream_names();
3684 assert!(
3685 names.iter().any(|n| n == "products.price"),
3686 "Expected products.price in downstream, got: {:?}",
3687 names
3688 );
3689 }
3690
3691 #[test]
3692 fn test_lineage_coalesce_function() {
3693 let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3694 let node = lineage("val", &expr, None, false).unwrap();
3695
3696 let names = node.downstream_names();
3697 assert!(
3698 names.iter().any(|n| n == "t.a"),
3699 "Expected t.a in downstream, got: {:?}",
3700 names
3701 );
3702 assert!(
3703 names.iter().any(|n| n == "t.b"),
3704 "Expected t.b in downstream, got: {:?}",
3705 names
3706 );
3707 }
3708
3709 #[test]
3710 fn test_lineage_count_function() {
3711 let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3712 let node = lineage("cnt", &expr, None, false).unwrap();
3713
3714 let names = node.downstream_names();
3715 assert!(
3716 names.iter().any(|n| n == "t.id"),
3717 "Expected t.id in downstream, got: {:?}",
3718 names
3719 );
3720 }
3721
3722 #[test]
3723 fn test_lineage_sum_function() {
3724 let expr = parse("SELECT SUM(amount) AS total FROM t");
3725 let node = lineage("total", &expr, None, false).unwrap();
3726
3727 let names = node.downstream_names();
3728 assert!(
3729 names.iter().any(|n| n == "t.amount"),
3730 "Expected t.amount in downstream, got: {:?}",
3731 names
3732 );
3733 }
3734
3735 #[test]
3736 fn test_lineage_case_with_nested_functions() {
3737 let expr =
3738 parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3739 let node = lineage("result", &expr, None, false).unwrap();
3740
3741 let names = node.downstream_names();
3742 assert!(
3743 names.iter().any(|n| n == "t.x"),
3744 "Expected t.x in downstream, got: {:?}",
3745 names
3746 );
3747 assert!(
3748 names.iter().any(|n| n == "t.name"),
3749 "Expected t.name in downstream, got: {:?}",
3750 names
3751 );
3752 }
3753
3754 #[test]
3755 fn test_lineage_substring_function() {
3756 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3757 let node = lineage("short", &expr, None, false).unwrap();
3758
3759 let names = node.downstream_names();
3760 assert!(
3761 names.iter().any(|n| n == "t.name"),
3762 "Expected t.name in downstream, got: {:?}",
3763 names
3764 );
3765 }
3766
3767 #[test]
3770 fn test_lineage_cte_select_star() {
3771 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3775 let node = lineage("a", &expr, None, false).unwrap();
3776
3777 assert_eq!(node.name, "a");
3778 assert!(
3781 !node.downstream.is_empty(),
3782 "Expected downstream nodes tracing through CTE, got none"
3783 );
3784 }
3785
3786 #[test]
3787 fn test_lineage_cte_select_star_renamed_column() {
3788 let expr =
3791 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3792 let node = lineage("customer_id", &expr, None, false).unwrap();
3793
3794 assert_eq!(node.name, "customer_id");
3795 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3797 assert!(
3798 all_names.len() >= 2,
3799 "Expected at least 2 nodes (customer_id → source), got: {:?}",
3800 all_names
3801 );
3802 }
3803
3804 #[test]
3805 fn test_lineage_cte_select_star_multiple_columns() {
3806 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3808
3809 for col in &["a", "b", "c"] {
3810 let node = lineage(col, &expr, None, false).unwrap();
3811 assert_eq!(node.name, *col);
3812 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3814 assert!(
3815 all_names.len() >= 2,
3816 "Expected at least 2 nodes for column {}, got: {:?}",
3817 col,
3818 all_names
3819 );
3820 }
3821 }
3822
3823 #[test]
3824 fn test_lineage_nested_cte_select_star() {
3825 let expr = parse(
3827 "WITH cte1 AS (SELECT a FROM t), \
3828 cte2 AS (SELECT * FROM cte1) \
3829 SELECT * FROM cte2",
3830 );
3831 let node = lineage("a", &expr, None, false).unwrap();
3832
3833 assert_eq!(node.name, "a");
3834 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3835 assert!(
3836 all_names.len() >= 3,
3837 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3838 all_names
3839 );
3840 }
3841
3842 #[test]
3843 fn test_lineage_three_level_nested_cte_star() {
3844 let expr = parse(
3846 "WITH cte1 AS (SELECT x FROM t), \
3847 cte2 AS (SELECT * FROM cte1), \
3848 cte3 AS (SELECT * FROM cte2) \
3849 SELECT * FROM cte3",
3850 );
3851 let node = lineage("x", &expr, None, false).unwrap();
3852
3853 assert_eq!(node.name, "x");
3854 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3855 assert!(
3856 all_names.len() >= 4,
3857 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3858 all_names
3859 );
3860 }
3861
3862 #[test]
3863 fn test_lineage_cte_union_star() {
3864 let expr = parse(
3866 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3867 SELECT * FROM cte",
3868 );
3869 let node = lineage("a", &expr, None, false).unwrap();
3870
3871 assert_eq!(node.name, "a");
3872 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3873 assert!(
3874 all_names.len() >= 2,
3875 "Expected at least 2 nodes for CTE union star, got: {:?}",
3876 all_names
3877 );
3878 }
3879
3880 #[test]
3881 fn test_lineage_cte_star_unknown_table() {
3882 let expr = parse(
3885 "WITH cte AS (SELECT * FROM unknown_table) \
3886 SELECT * FROM cte",
3887 );
3888 let _result = lineage("x", &expr, None, false);
3891 }
3892
3893 #[test]
3894 fn test_lineage_cte_explicit_columns() {
3895 let expr = parse(
3897 "WITH cte(x, y) AS (SELECT a, b FROM t) \
3898 SELECT * FROM cte",
3899 );
3900 let node = lineage("x", &expr, None, false).unwrap();
3901 assert_eq!(node.name, "x");
3902 }
3903
3904 #[test]
3905 fn test_lineage_cte_qualified_star() {
3906 let expr = parse(
3908 "WITH cte AS (SELECT a, b FROM t) \
3909 SELECT cte.* FROM cte",
3910 );
3911 for col in &["a", "b"] {
3912 let node = lineage(col, &expr, None, false).unwrap();
3913 assert_eq!(node.name, *col);
3914 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3915 assert!(
3916 all_names.len() >= 2,
3917 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3918 col,
3919 all_names
3920 );
3921 }
3922 }
3923
3924 #[test]
3925 fn test_lineage_subquery_select_star() {
3926 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3929 let node = lineage("x", &expr, None, false).unwrap();
3930
3931 assert_eq!(node.name, "x");
3932 assert!(
3933 !node.downstream.is_empty(),
3934 "Expected downstream nodes for subquery with SELECT *, got none"
3935 );
3936 }
3937
3938 #[test]
3939 fn test_lineage_cte_star_with_schema_external_table() {
3940 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3942SELECT * FROM orders"#;
3943 let expr = parse(sql);
3944
3945 let mut schema = MappingSchema::new();
3946 let cols = vec![
3947 ("order_id".to_string(), DataType::Unknown),
3948 ("customer_id".to_string(), DataType::Unknown),
3949 ("amount".to_string(), DataType::Unknown),
3950 ];
3951 schema.add_table("stg_orders", &cols, None).unwrap();
3952
3953 let node =
3954 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3955 .unwrap();
3956 assert_eq!(node.name, "order_id");
3957 }
3958
3959 #[test]
3960 fn test_lineage_cte_star_with_schema_three_part_name() {
3961 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3963SELECT * FROM orders"#;
3964 let expr = parse(sql);
3965
3966 let mut schema = MappingSchema::new();
3967 let cols = vec![
3968 ("order_id".to_string(), DataType::Unknown),
3969 ("customer_id".to_string(), DataType::Unknown),
3970 ];
3971 schema
3972 .add_table("db.schema.stg_orders", &cols, None)
3973 .unwrap();
3974
3975 let node = lineage_with_schema(
3976 "customer_id",
3977 &expr,
3978 Some(&schema as &dyn Schema),
3979 None,
3980 false,
3981 )
3982 .unwrap();
3983 assert_eq!(node.name, "customer_id");
3984 }
3985
3986 #[test]
3987 fn test_lineage_cte_star_with_schema_nested() {
3988 let sql = r#"WITH
3991 raw AS (SELECT * FROM external_table),
3992 enriched AS (SELECT * FROM raw)
3993 SELECT * FROM enriched"#;
3994 let expr = parse(sql);
3995
3996 let mut schema = MappingSchema::new();
3997 let cols = vec![
3998 ("id".to_string(), DataType::Unknown),
3999 ("name".to_string(), DataType::Unknown),
4000 ];
4001 schema.add_table("external_table", &cols, None).unwrap();
4002
4003 let node =
4004 lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4005 assert_eq!(node.name, "name");
4006 }
4007
4008 #[test]
4009 fn test_lineage_cte_qualified_star_with_schema() {
4010 let sql = r#"WITH
4013 orders AS (SELECT * FROM stg_orders),
4014 enriched AS (
4015 SELECT orders.*, 'extra' AS extra
4016 FROM orders
4017 )
4018 SELECT * FROM enriched"#;
4019 let expr = parse(sql);
4020
4021 let mut schema = MappingSchema::new();
4022 let cols = vec![
4023 ("order_id".to_string(), DataType::Unknown),
4024 ("total".to_string(), DataType::Unknown),
4025 ];
4026 schema.add_table("stg_orders", &cols, None).unwrap();
4027
4028 let node =
4029 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4030 .unwrap();
4031 assert_eq!(node.name, "order_id");
4032
4033 let extra =
4035 lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4036 assert_eq!(extra.name, "extra");
4037 }
4038
4039 #[test]
4040 fn test_lineage_cte_star_without_schema_still_works() {
4041 let sql = r#"WITH
4043 cte1 AS (SELECT id, name FROM raw_table),
4044 cte2 AS (SELECT * FROM cte1)
4045 SELECT * FROM cte2"#;
4046 let expr = parse(sql);
4047
4048 let node = lineage("id", &expr, None, false).unwrap();
4050 assert_eq!(node.name, "id");
4051 }
4052
4053 #[test]
4054 fn test_lineage_nested_cte_star_with_join_and_schema() {
4055 let sql = r#"WITH
4058base_orders AS (
4059 SELECT * FROM stg_orders
4060),
4061with_payments AS (
4062 SELECT
4063 base_orders.*,
4064 p.amount
4065 FROM base_orders
4066 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4067),
4068final_cte AS (
4069 SELECT * FROM with_payments
4070)
4071SELECT * FROM final_cte"#;
4072 let expr = parse(sql);
4073
4074 let mut schema = MappingSchema::new();
4075 let order_cols = vec![
4076 (
4077 "order_id".to_string(),
4078 crate::expressions::DataType::Unknown,
4079 ),
4080 (
4081 "customer_id".to_string(),
4082 crate::expressions::DataType::Unknown,
4083 ),
4084 ("status".to_string(), crate::expressions::DataType::Unknown),
4085 ];
4086 let pay_cols = vec![
4087 (
4088 "payment_id".to_string(),
4089 crate::expressions::DataType::Unknown,
4090 ),
4091 (
4092 "order_id".to_string(),
4093 crate::expressions::DataType::Unknown,
4094 ),
4095 ("amount".to_string(), crate::expressions::DataType::Unknown),
4096 ];
4097 schema.add_table("stg_orders", &order_cols, None).unwrap();
4098 schema.add_table("stg_payments", &pay_cols, None).unwrap();
4099
4100 let node =
4102 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4103 .unwrap();
4104 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4105
4106 let has_table_qualified = all_names
4108 .iter()
4109 .any(|n| n.contains('.') && n.contains("order_id"));
4110 assert!(
4111 has_table_qualified,
4112 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4113 all_names
4114 );
4115
4116 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4118 .unwrap();
4119 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4120
4121 let has_table_qualified = all_names
4122 .iter()
4123 .any(|n| n.contains('.') && n.contains("amount"));
4124 assert!(
4125 has_table_qualified,
4126 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4127 all_names
4128 );
4129 }
4130
4131 #[test]
4132 fn test_lineage_cte_alias_resolution() {
4133 let sql = r#"WITH import_stg_items AS (
4135 SELECT item_id, name, status FROM stg_items
4136)
4137SELECT base.item_id, base.status
4138FROM import_stg_items AS base"#;
4139 let expr = parse(sql);
4140
4141 let node = lineage("item_id", &expr, None, false).unwrap();
4142 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4143 assert!(
4145 all_names.iter().any(|n| n == "stg_items.item_id"),
4146 "Expected leaf 'stg_items.item_id', got: {:?}",
4147 all_names
4148 );
4149 }
4150
4151 #[test]
4152 fn test_lineage_cte_alias_with_schema_and_star() {
4153 let sql = r#"WITH import_stg AS (
4155 SELECT * FROM stg_items
4156)
4157SELECT base.item_id, base.status
4158FROM import_stg AS base"#;
4159 let expr = parse(sql);
4160
4161 let mut schema = MappingSchema::new();
4162 schema
4163 .add_table(
4164 "stg_items",
4165 &[
4166 ("item_id".to_string(), DataType::Unknown),
4167 ("name".to_string(), DataType::Unknown),
4168 ("status".to_string(), DataType::Unknown),
4169 ],
4170 None,
4171 )
4172 .unwrap();
4173
4174 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
4175 .unwrap();
4176 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4177 assert!(
4178 all_names.iter().any(|n| n == "stg_items.item_id"),
4179 "Expected leaf 'stg_items.item_id', got: {:?}",
4180 all_names
4181 );
4182 }
4183
4184 #[test]
4185 fn test_lineage_cte_alias_with_join() {
4186 let sql = r#"WITH
4188 import_users AS (SELECT id, name FROM users),
4189 import_orders AS (SELECT id, user_id, amount FROM orders)
4190SELECT u.name, o.amount
4191FROM import_users AS u
4192LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
4193 let expr = parse(sql);
4194
4195 let node = lineage("name", &expr, None, false).unwrap();
4196 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4197 assert!(
4198 all_names.iter().any(|n| n == "users.name"),
4199 "Expected leaf 'users.name', got: {:?}",
4200 all_names
4201 );
4202
4203 let node = lineage("amount", &expr, None, false).unwrap();
4204 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4205 assert!(
4206 all_names.iter().any(|n| n == "orders.amount"),
4207 "Expected leaf 'orders.amount', got: {:?}",
4208 all_names
4209 );
4210 }
4211
4212 #[test]
4217 fn test_lineage_unquoted_cte_case_insensitive() {
4218 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
4221 let node = lineage("col", &expr, None, false).unwrap();
4222 assert_eq!(node.name, "col");
4223 assert!(
4224 !node.downstream.is_empty(),
4225 "Unquoted CTE should resolve case-insensitively"
4226 );
4227 }
4228
4229 #[test]
4230 fn test_lineage_quoted_cte_case_preserved() {
4231 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
4233 let node = lineage("col", &expr, None, false).unwrap();
4234 assert_eq!(node.name, "col");
4235 assert!(
4236 !node.downstream.is_empty(),
4237 "Quoted CTE with matching case should resolve"
4238 );
4239 }
4240
4241 #[test]
4242 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
4243 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
4247 let result = lineage("col", &expr, None, false);
4250 assert!(
4251 result.is_err(),
4252 "Quoted CTE with case mismatch should not expand star: {:?}",
4253 result
4254 );
4255 }
4256
4257 #[test]
4258 fn test_lineage_mixed_quoted_unquoted_cte() {
4259 let expr = parse(
4261 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
4262 );
4263 let node = lineage("a", &expr, None, false).unwrap();
4264 assert_eq!(node.name, "a");
4265 assert!(
4266 !node.downstream.is_empty(),
4267 "Mixed quoted/unquoted CTE chain should resolve"
4268 );
4269 }
4270
4271 #[test]
4287 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
4288 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
4299 let node = lineage("col", &expr, None, false).unwrap();
4300 assert!(!node.downstream.is_empty());
4301 let child = &node.downstream[0];
4302 assert_eq!(
4304 child.source_name, "MyCte",
4305 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4306 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4307 );
4308 }
4309
4310 #[test]
4311 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4312 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4319 let node = lineage("col", &expr, None, false).unwrap();
4320 assert!(!node.downstream.is_empty());
4321 let child = &node.downstream[0];
4322 assert_eq!(
4324 child.source_name, "MyCte",
4325 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4326 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4327 );
4328 }
4329
4330 #[test]
4337 #[ignore = "requires derived table star expansion (separate issue)"]
4338 fn test_node_name_doesnt_contain_comment() {
4339 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4340 let node = lineage("x", &expr, None, false).unwrap();
4341
4342 assert_eq!(node.name, "x");
4343 assert!(!node.downstream.is_empty());
4344 }
4345
4346 #[test]
4350 fn test_comment_before_first_column_in_cte() {
4351 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
4352 let sql_without_comment = "with t as (select 1 as a) select a from t";
4353
4354 let expr_ok = parse(sql_without_comment);
4356 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4357
4358 let expr_comment = parse(sql_with_comment);
4360 let node_comment = lineage("a", &expr_comment, None, false)
4361 .expect("with comment before first column should succeed");
4362
4363 assert_eq!(node_ok.name, node_comment.name, "node names should match");
4364 assert_eq!(
4365 node_ok.downstream_names(),
4366 node_comment.downstream_names(),
4367 "downstream lineage should be identical with or without comment"
4368 );
4369 }
4370
4371 #[test]
4373 fn test_block_comment_before_first_column() {
4374 let sql = "with t as (select 1 as a) select /* section */ a from t";
4375 let expr = parse(sql);
4376 let node = lineage("a", &expr, None, false)
4377 .expect("block comment before first column should succeed");
4378 assert_eq!(node.name, "a");
4379 assert!(
4380 !node.downstream.is_empty(),
4381 "should have downstream lineage"
4382 );
4383 }
4384
4385 #[test]
4387 fn test_comment_before_first_column_second_col_ok() {
4388 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
4389 let expr = parse(sql);
4390
4391 let node_a =
4392 lineage("a", &expr, None, false).expect("column a with comment should succeed");
4393 assert_eq!(node_a.name, "a");
4394
4395 let node_b =
4396 lineage("b", &expr, None, false).expect("column b with comment should succeed");
4397 assert_eq!(node_b.name, "b");
4398 }
4399
4400 #[test]
4402 fn test_comment_before_aliased_column() {
4403 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
4404 let expr = parse(sql);
4405 let node =
4406 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4407 assert_eq!(node.name, "y");
4408 assert!(
4409 !node.downstream.is_empty(),
4410 "aliased column should have downstream lineage"
4411 );
4412 }
4413}