1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22 pub name: String,
24 pub expression: Expression,
26 pub source: Expression,
28 pub downstream: Vec<LineageNode>,
30 pub source_name: String,
32 pub source_kind: SourceKind,
34 #[serde(default, skip_serializing_if = "Option::is_none")]
36 pub source_alias: Option<String>,
37 pub reference_node_name: String,
39}
40
41impl LineageNode {
42 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44 Self {
45 name: name.into(),
46 expression,
47 source,
48 downstream: Vec::new(),
49 source_name: String::new(),
50 source_kind: SourceKind::Unknown,
51 source_alias: None,
52 reference_node_name: String::new(),
53 }
54 }
55
56 pub fn walk(&self) -> LineageWalker<'_> {
58 LineageWalker { stack: vec![self] }
59 }
60
61 pub fn downstream_names(&self) -> Vec<String> {
63 self.downstream.iter().map(|n| n.name.clone()).collect()
64 }
65}
66
67fn source_kind_for_scope_context(
68 scope: &Scope,
69 source_name: &str,
70 reference_node_name: &str,
71) -> SourceKind {
72 if source_name.is_empty() && reference_node_name.is_empty() {
73 return SourceKind::Root;
74 }
75 if let Some(source_info) = scope.sources.get(source_name) {
76 return source_info.kind;
77 }
78 if scope.cte_sources.contains_key(source_name) {
79 return SourceKind::Cte;
80 }
81 match scope.scope_type {
82 ScopeType::Cte => SourceKind::Cte,
83 ScopeType::DerivedTable => SourceKind::DerivedTable,
84 ScopeType::Udtf => SourceKind::Virtual,
85 _ => SourceKind::Unknown,
86 }
87}
88
89fn apply_scope_context(
90 node: &mut LineageNode,
91 scope: &Scope,
92 source_name: &str,
93 reference_node_name: &str,
94) {
95 node.source_name = source_name.to_string();
96 node.reference_node_name = reference_node_name.to_string();
97 node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
100pub struct LineageWalker<'a> {
102 stack: Vec<&'a LineageNode>,
103}
104
105impl<'a> Iterator for LineageWalker<'a> {
106 type Item = &'a LineageNode;
107
108 fn next(&mut self) -> Option<Self::Item> {
109 if let Some(node) = self.stack.pop() {
110 for child in node.downstream.iter().rev() {
112 self.stack.push(child);
113 }
114 Some(node)
115 } else {
116 None
117 }
118 }
119}
120
121enum ColumnRef<'a> {
127 Name(&'a str),
128 Index(usize),
129}
130
131pub fn lineage(
157 column: &str,
158 sql: &Expression,
159 dialect: Option<DialectType>,
160 trim_selects: bool,
161) -> Result<LineageNode> {
162 let effective_sql = lineage_effective_expression(sql);
164 let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165 if !has_with {
166 return lineage_from_expression(column, sql, dialect, trim_selects);
167 }
168 let mut owned = sql.clone();
169 expand_cte_stars(&mut owned, None);
170 lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173pub fn lineage_with_schema(
189 column: &str,
190 sql: &Expression,
191 schema: Option<&dyn Schema>,
192 dialect: Option<DialectType>,
193 trim_selects: bool,
194) -> Result<LineageNode> {
195 let mut qualified_expression = if let Some(schema) = schema {
196 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197 QualifyColumnsOptions::new().with_dialect(dialect_type)
198 } else {
199 QualifyColumnsOptions::new()
200 };
201
202 qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203 Error::internal(format!("Lineage qualification failed with schema: {}", e))
204 })?
205 } else {
206 sql.clone()
207 };
208
209 annotate_types(&mut qualified_expression, schema, dialect);
211
212 expand_cte_stars(&mut qualified_expression, schema);
215
216 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220 column: &str,
221 sql: &Expression,
222 dialect: Option<DialectType>,
223 trim_selects: bool,
224) -> Result<LineageNode> {
225 let sql = lineage_effective_expression(sql);
226 let scope = build_scope(sql);
227 to_node(
228 ColumnRef::Name(column),
229 &scope,
230 dialect,
231 "",
232 "",
233 "",
234 trim_selects,
235 )
236}
237
238pub(crate) fn lineage_by_index_from_expression(
239 column_index: usize,
240 sql: &Expression,
241 dialect: Option<DialectType>,
242 trim_selects: bool,
243) -> Result<LineageNode> {
244 let sql = lineage_effective_expression(sql);
245 let scope = build_scope(sql);
246 to_node(
247 ColumnRef::Index(column_index),
248 &scope,
249 dialect,
250 "",
251 "",
252 "",
253 trim_selects,
254 )
255}
256
257fn lineage_effective_expression(sql: &Expression) -> &Expression {
258 match sql {
259 Expression::Prepare(prepare) => &prepare.statement,
260 _ => sql,
261 }
262}
263
264fn normalize_cte_name(ident: &Identifier) -> String {
274 if ident.quoted {
275 ident.name.clone()
276 } else {
277 ident.name.to_lowercase()
278 }
279}
280
281pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
293 if let Expression::Prepare(prepare) = expr {
294 expand_cte_stars(&mut prepare.statement, schema);
295 return;
296 }
297
298 let select = match expr {
299 Expression::Select(s) => s,
300 _ => return,
301 };
302
303 let with = match &mut select.with {
304 Some(w) => w,
305 None => return,
306 };
307
308 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
309
310 for cte in &mut with.ctes {
311 let cte_name = normalize_cte_name(&cte.alias);
312
313 if !cte.columns.is_empty() {
315 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
316 resolved_cte_columns.insert(cte_name, cols);
317 continue;
318 }
319
320 if with.recursive {
326 let is_self_referencing =
327 if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
328 let body_sources = get_select_sources(body_select);
329 body_sources.iter().any(|s| s.normalized == cte_name)
330 } else {
331 false
332 };
333 if is_self_referencing {
334 continue;
335 }
336 }
337
338 let body_select = match get_leftmost_select_mut(&mut cte.this) {
340 Some(s) => s,
341 None => continue,
342 };
343
344 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
345 resolved_cte_columns.insert(cte_name, columns);
346 }
347
348 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
350}
351
352fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
357 let mut current = expr;
358 for _ in 0..MAX_LINEAGE_DEPTH {
359 match current {
360 Expression::Select(s) => return Some(s),
361 Expression::Union(u) => current = &mut u.left,
362 Expression::Intersect(i) => current = &mut i.left,
363 Expression::Except(e) => current = &mut e.left,
364 Expression::Paren(p) => current = &mut p.this,
365 _ => return None,
366 }
367 }
368 None
369}
370
371fn rewrite_stars_in_select(
375 select: &mut Select,
376 resolved_ctes: &HashMap<String, Vec<String>>,
377 schema: Option<&dyn Schema>,
378) -> Vec<String> {
379 let has_star = select
384 .expressions
385 .iter()
386 .any(|e| matches!(e, Expression::Star(_)));
387 let has_qualified_star = select
388 .expressions
389 .iter()
390 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
391
392 if !has_star && !has_qualified_star {
393 return select
395 .expressions
396 .iter()
397 .filter_map(get_expression_output_name)
398 .collect();
399 }
400
401 let sources = get_select_sources(select);
402 let mut new_expressions = Vec::new();
403 let mut result_columns = Vec::new();
404
405 for expr in &select.expressions {
406 match expr {
407 Expression::Star(star) => {
408 let qual = star.table.as_ref();
409 if let Some(expanded) =
410 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
411 {
412 for (src_alias, col_name) in &expanded {
413 let table_id = Identifier::new(src_alias);
414 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
415 result_columns.push(col_name.clone());
416 }
417 } else {
418 new_expressions.push(expr.clone());
419 result_columns.push("*".to_string());
420 }
421 }
422 Expression::Column(c) if c.name.name == "*" => {
423 let qual = c.table.as_ref();
424 if let Some(expanded) =
425 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
426 {
427 for (_src_alias, col_name) in &expanded {
428 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
430 result_columns.push(col_name.clone());
431 }
432 } else {
433 new_expressions.push(expr.clone());
434 result_columns.push("*".to_string());
435 }
436 }
437 _ => {
438 new_expressions.push(expr.clone());
439 if let Some(name) = get_expression_output_name(expr) {
440 result_columns.push(name);
441 }
442 }
443 }
444 }
445
446 select.expressions = new_expressions;
447 result_columns
448}
449
450fn expand_star_from_sources(
455 qualifier: Option<&Identifier>,
456 sources: &[SourceInfo],
457 resolved_ctes: &HashMap<String, Vec<String>>,
458 schema: Option<&dyn Schema>,
459) -> Option<Vec<(String, String)>> {
460 let mut expanded = Vec::new();
461
462 if let Some(qual) = qualifier {
463 let qual_normalized = normalize_cte_name(qual);
465 for src in sources {
466 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
467 if let Some(cols) = resolved_ctes.get(&src.normalized) {
469 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
470 return Some(expanded);
471 }
472 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
474 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
475 return Some(expanded);
476 }
477 }
478 }
479 None
480 } else {
481 let mut any_expanded = false;
487 for src in sources {
488 if let Some(cols) = resolved_ctes.get(&src.normalized) {
489 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
490 any_expanded = true;
491 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
492 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
493 any_expanded = true;
494 } else {
495 return None;
496 }
497 }
498 if any_expanded {
499 Some(expanded)
500 } else {
501 None
502 }
503 }
504}
505
506fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
508 let schema = schema?;
509 if fq_name.is_empty() {
510 return None;
511 }
512 schema
513 .column_names(fq_name)
514 .ok()
515 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
516}
517
518fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
520 Expression::Column(Box::new(crate::expressions::Column {
521 name: Identifier::new(name),
522 table: table.cloned(),
523 join_mark: false,
524 trailing_comments: Vec::new(),
525 span: None,
526 inferred_type: None,
527 }))
528}
529
530fn get_expression_output_name(expr: &Expression) -> Option<String> {
532 match expr {
533 Expression::Alias(a) => Some(a.alias.name.clone()),
534 Expression::Column(c) => Some(c.name.name.clone()),
535 Expression::Identifier(id) => Some(id.name.clone()),
536 Expression::Star(_) => Some("*".to_string()),
537 _ => None,
538 }
539}
540
541struct SourceInfo {
543 alias: String,
544 normalized: String,
546 fq_name: String,
548}
549
550fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
553 let mut sources = Vec::new();
554
555 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
556 fn virtual_source_info(alias: &Identifier) -> SourceInfo {
557 SourceInfo {
558 alias: alias.name.clone(),
559 normalized: normalize_cte_name(alias),
560 fq_name: alias.name.clone(),
561 }
562 }
563
564 fn named_virtual_source_info(alias: &str) -> SourceInfo {
565 SourceInfo {
566 alias: alias.to_string(),
567 normalized: alias.to_lowercase(),
568 fq_name: alias.to_string(),
569 }
570 }
571
572 match expr {
573 Expression::Table(t) => {
574 let normalized = normalize_cte_name(&t.name);
575 let alias = t
576 .alias
577 .as_ref()
578 .map(|a| a.name.clone())
579 .unwrap_or_else(|| t.name.name.clone());
580 let mut parts = Vec::new();
581 if let Some(catalog) = &t.catalog {
582 parts.push(catalog.name.clone());
583 }
584 if let Some(schema) = &t.schema {
585 parts.push(schema.name.clone());
586 }
587 parts.push(t.name.name.clone());
588 let fq_name = parts.join(".");
589 Some(SourceInfo {
590 alias,
591 normalized,
592 fq_name,
593 })
594 }
595 Expression::Subquery(s) => {
596 let alias = s.alias.as_ref()?.name.clone();
597 let normalized = alias.to_lowercase();
598 let fq_name = alias.clone();
599 Some(SourceInfo {
600 alias,
601 normalized,
602 fq_name,
603 })
604 }
605 Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
606 Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
607 Some(virtual_source_info(&a.alias))
608 }
609 Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
610 Expression::LateralView(lateral_view) => lateral_view
611 .table_alias
612 .as_ref()
613 .or_else(|| lateral_view.column_aliases.first())
614 .map(virtual_source_info),
615 Expression::Paren(p) => extract_source(&p.this),
616 _ => None,
617 }
618 }
619
620 if let Some(from) = &select.from {
621 for expr in &from.expressions {
622 if let Some(info) = extract_source(expr) {
623 sources.push(info);
624 }
625 }
626 }
627 for join in &select.joins {
628 if let Some(info) = extract_source(&join.this) {
629 sources.push(info);
630 }
631 }
632 for lateral_view in &select.lateral_views {
633 if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
634 {
635 sources.push(info);
636 }
637 }
638 sources
639}
640
641pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
643 let mut tables = HashSet::new();
644 collect_source_tables(node, &mut tables);
645 tables
646}
647
648pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
650 if let Expression::Table(table) = &node.source {
651 tables.insert(table.name.name.clone());
652 }
653 for child in &node.downstream {
654 collect_source_tables(child, tables);
655 }
656}
657
658const MAX_LINEAGE_DEPTH: usize = 64;
665
666fn to_node(
668 column: ColumnRef<'_>,
669 scope: &Scope,
670 dialect: Option<DialectType>,
671 scope_name: &str,
672 source_name: &str,
673 reference_node_name: &str,
674 trim_selects: bool,
675) -> Result<LineageNode> {
676 to_node_inner(
677 column,
678 scope,
679 dialect,
680 scope_name,
681 source_name,
682 reference_node_name,
683 trim_selects,
684 &[],
685 0,
686 )
687}
688
689fn to_node_inner(
690 column: ColumnRef<'_>,
691 scope: &Scope,
692 dialect: Option<DialectType>,
693 scope_name: &str,
694 source_name: &str,
695 reference_node_name: &str,
696 trim_selects: bool,
697 ancestor_cte_scopes: &[Scope],
698 depth: usize,
699) -> Result<LineageNode> {
700 if depth > MAX_LINEAGE_DEPTH {
701 return Err(Error::internal(format!(
702 "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
703 )));
704 }
705 let scope_expr = &scope.expression;
706
707 let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
709 for s in ancestor_cte_scopes {
710 all_cte_scopes.push(s);
711 }
712
713 let effective_expr = match scope_expr {
716 Expression::Cte(cte) => &cte.this,
717 other => other,
718 };
719
720 if matches!(
722 effective_expr,
723 Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
724 ) {
725 if matches!(scope_expr, Expression::Cte(_)) {
727 let mut inner_scope = Scope::new(effective_expr.clone());
728 inner_scope.union_scopes = scope.union_scopes.clone();
729 inner_scope.sources = scope.sources.clone();
730 inner_scope.cte_sources = scope.cte_sources.clone();
731 inner_scope.cte_scopes = scope.cte_scopes.clone();
732 inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
733 inner_scope.subquery_scopes = scope.subquery_scopes.clone();
734 return handle_set_operation(
735 &column,
736 &inner_scope,
737 dialect,
738 scope_name,
739 source_name,
740 reference_node_name,
741 trim_selects,
742 ancestor_cte_scopes,
743 depth,
744 );
745 }
746 return handle_set_operation(
747 &column,
748 scope,
749 dialect,
750 scope_name,
751 source_name,
752 reference_node_name,
753 trim_selects,
754 ancestor_cte_scopes,
755 depth,
756 );
757 }
758
759 let select_expr = find_select_expr(effective_expr, &column, dialect)?;
761 let column_name = resolve_column_name(&column, &select_expr);
762
763 let node_source = if trim_selects {
765 trim_source(effective_expr, &select_expr)
766 } else {
767 effective_expr.clone()
768 };
769
770 let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
772 apply_scope_context(&mut node, scope, source_name, reference_node_name);
773
774 if matches!(&select_expr, Expression::Star(_)) {
776 for (name, source_info) in &scope.sources {
777 let mut child = LineageNode::new(
778 format!("{}.*", name),
779 Expression::Star(crate::expressions::Star {
780 table: None,
781 except: None,
782 replace: None,
783 rename: None,
784 trailing_comments: vec![],
785 span: None,
786 }),
787 source_info.expression.clone(),
788 );
789 apply_source_info_context(&mut child, name, source_info);
790 node.downstream.push(child);
791 }
792 return Ok(node);
793 }
794
795 let subqueries: Vec<&Expression> =
797 select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
798 for sq_expr in subqueries {
799 if let Expression::Subquery(sq) = sq_expr {
800 for sq_scope in &scope.subquery_scopes {
801 if sq_scope.expression == sq.this {
802 if let Ok(child) = to_node_inner(
803 ColumnRef::Index(0),
804 sq_scope,
805 dialect,
806 &column_name,
807 "",
808 "",
809 trim_selects,
810 ancestor_cte_scopes,
811 depth + 1,
812 ) {
813 node.downstream.push(child);
814 }
815 break;
816 }
817 }
818 }
819 }
820
821 let col_refs = find_column_refs_in_expr(&select_expr, dialect);
823 for col_ref in col_refs {
824 let col_name = &col_ref.column;
825 if let Some(ref table_id) = col_ref.table {
826 let tbl = &table_id.name;
827 resolve_qualified_column(
828 &mut node,
829 scope,
830 dialect,
831 tbl,
832 col_name,
833 &column_name,
834 trim_selects,
835 &all_cte_scopes,
836 depth,
837 );
838 } else {
839 resolve_unqualified_column(
840 &mut node,
841 scope,
842 dialect,
843 col_name,
844 &column_name,
845 trim_selects,
846 &all_cte_scopes,
847 depth,
848 );
849 }
850 }
851
852 Ok(node)
853}
854
855fn handle_set_operation(
860 column: &ColumnRef<'_>,
861 scope: &Scope,
862 dialect: Option<DialectType>,
863 scope_name: &str,
864 source_name: &str,
865 reference_node_name: &str,
866 trim_selects: bool,
867 ancestor_cte_scopes: &[Scope],
868 depth: usize,
869) -> Result<LineageNode> {
870 let scope_expr = &scope.expression;
871
872 let col_index = match column {
874 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
875 ColumnRef::Index(i) => *i,
876 };
877
878 let col_name = match column {
879 ColumnRef::Name(name) => name.to_string(),
880 ColumnRef::Index(_) => format!("_{col_index}"),
881 };
882
883 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
884 apply_scope_context(&mut node, scope, source_name, reference_node_name);
885
886 for branch_scope in &scope.union_scopes {
888 if let Ok(child) = to_node_inner(
889 ColumnRef::Index(col_index),
890 branch_scope,
891 dialect,
892 scope_name,
893 "",
894 "",
895 trim_selects,
896 ancestor_cte_scopes,
897 depth + 1,
898 ) {
899 node.downstream.push(child);
900 }
901 }
902
903 Ok(node)
904}
905
906fn resolve_qualified_column(
911 node: &mut LineageNode,
912 scope: &Scope,
913 dialect: Option<DialectType>,
914 table: &str,
915 col_name: &str,
916 parent_name: &str,
917 trim_selects: bool,
918 all_cte_scopes: &[&Scope],
919 depth: usize,
920) {
921 let resolved_cte_name = resolve_cte_alias(scope, table);
924 let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
925
926 let is_cte = scope.cte_sources.contains_key(effective_table)
929 || all_cte_scopes.iter().any(
930 |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
931 );
932 if is_cte {
933 if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
934 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
936 if let Ok(child) = to_node_inner(
937 ColumnRef::Name(col_name),
938 child_scope,
939 dialect,
940 parent_name,
941 effective_table,
942 parent_name,
943 trim_selects,
944 &ancestors,
945 depth + 1,
946 ) {
947 node.downstream.push(child);
948 return;
949 }
950 }
951 }
952
953 if let Some(source_info) = scope.sources.get(table) {
955 if source_info.is_scope {
956 if let Some(child_scope) = find_child_scope(scope, table) {
957 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
958 if let Ok(child) = to_node_inner(
959 ColumnRef::Name(col_name),
960 child_scope,
961 dialect,
962 parent_name,
963 table,
964 parent_name,
965 trim_selects,
966 &ancestors,
967 depth + 1,
968 ) {
969 node.downstream.push(child);
970 return;
971 }
972 }
973 }
974 }
975
976 if let Some(source_info) = scope.sources.get(table) {
979 if !source_info.is_scope {
980 let mut child = make_table_column_node_from_source(table, col_name, source_info);
981 if source_info.kind == SourceKind::Virtual {
982 attach_virtual_source_dependencies(
983 &mut child,
984 scope,
985 dialect,
986 table,
987 &source_info.expression,
988 trim_selects,
989 all_cte_scopes,
990 depth,
991 );
992 }
993 node.downstream.push(child);
994 return;
995 }
996 }
997
998 node.downstream
1000 .push(make_table_column_node(table, col_name));
1001}
1002
1003fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1009 if scope.cte_sources.contains_key(name) {
1011 return None;
1012 }
1013 if let Some(source_info) = scope.sources.get(name) {
1015 if source_info.is_scope {
1016 if let Expression::Cte(cte) = &source_info.expression {
1017 let cte_name = &cte.alias.name;
1018 if scope.cte_sources.contains_key(cte_name) {
1019 return Some(cte_name.clone());
1020 }
1021 }
1022 }
1023 }
1024 None
1025}
1026
1027fn resolve_unqualified_column(
1028 node: &mut LineageNode,
1029 scope: &Scope,
1030 dialect: Option<DialectType>,
1031 col_name: &str,
1032 parent_name: &str,
1033 trim_selects: bool,
1034 all_cte_scopes: &[&Scope],
1035 depth: usize,
1036) {
1037 let from_source_names = source_names_from_from_join(scope);
1041
1042 if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1043 resolve_qualified_column(
1044 node,
1045 scope,
1046 dialect,
1047 &tbl,
1048 col_name,
1049 parent_name,
1050 trim_selects,
1051 all_cte_scopes,
1052 depth,
1053 );
1054 return;
1055 }
1056
1057 if from_source_names.len() == 1 {
1058 let tbl = &from_source_names[0];
1059 resolve_qualified_column(
1060 node,
1061 scope,
1062 dialect,
1063 tbl,
1064 col_name,
1065 parent_name,
1066 trim_selects,
1067 all_cte_scopes,
1068 depth,
1069 );
1070 return;
1071 }
1072
1073 let child = LineageNode::new(
1075 col_name.to_string(),
1076 Expression::Column(Box::new(crate::expressions::Column {
1077 name: crate::expressions::Identifier::new(col_name.to_string()),
1078 table: None,
1079 join_mark: false,
1080 trailing_comments: vec![],
1081 span: None,
1082 inferred_type: None,
1083 })),
1084 node.source.clone(),
1085 );
1086 node.downstream.push(child);
1087}
1088
1089fn unique_virtual_source_for_column(
1090 scope: &Scope,
1091 source_names: &[String],
1092 col_name: &str,
1093) -> Option<String> {
1094 let mut matches = source_names.iter().filter_map(|source_name| {
1095 let source = scope.sources.get(source_name)?;
1096 if source.kind == SourceKind::Virtual
1097 && virtual_source_output_columns(source)
1098 .any(|column| column.eq_ignore_ascii_case(col_name))
1099 {
1100 Some(source_name.clone())
1101 } else {
1102 None
1103 }
1104 });
1105
1106 let first = matches.next()?;
1107 if matches.next().is_none() {
1108 Some(first)
1109 } else {
1110 None
1111 }
1112}
1113
1114fn virtual_source_output_columns(
1115 source_info: &ScopeSourceInfo,
1116) -> Box<dyn Iterator<Item = String> + '_> {
1117 match &source_info.expression {
1118 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1119 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1120 Box::new(alias_output_columns(alias))
1121 }
1122 Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1123 Expression::LateralView(lateral_view) => {
1124 Box::new(lateral_view_output_columns(lateral_view))
1125 }
1126 _ => Box::new(source_info.alias.clone().into_iter()),
1127 }
1128}
1129
1130fn unnest_output_columns(
1131 unnest: &crate::expressions::UnnestFunc,
1132) -> impl Iterator<Item = String> + '_ {
1133 unnest
1134 .alias
1135 .iter()
1136 .map(|alias| alias.name.clone())
1137 .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1138}
1139
1140fn alias_output_columns(
1141 alias: &crate::expressions::Alias,
1142) -> Box<dyn Iterator<Item = String> + '_> {
1143 if alias.column_aliases.is_empty() {
1144 Box::new(std::iter::once(alias.alias.name.clone()))
1145 } else {
1146 Box::new(
1147 alias
1148 .column_aliases
1149 .iter()
1150 .map(|column| column.name.clone()),
1151 )
1152 }
1153}
1154
1155fn lateral_output_columns(
1156 lateral: &crate::expressions::Lateral,
1157) -> Box<dyn Iterator<Item = String> + '_> {
1158 if lateral.column_aliases.is_empty() {
1159 default_virtual_output_columns(&lateral.this)
1160 } else {
1161 Box::new(lateral.column_aliases.iter().cloned())
1162 }
1163}
1164
1165fn lateral_view_output_columns(
1166 lateral_view: &crate::expressions::LateralView,
1167) -> Box<dyn Iterator<Item = String> + '_> {
1168 Box::new(
1169 lateral_view
1170 .column_aliases
1171 .iter()
1172 .map(|column| column.name.clone()),
1173 )
1174}
1175
1176fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1177 match expr {
1178 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1179 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1180 alias_output_columns(alias)
1181 }
1182 Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1183 Box::new(
1184 ["seq", "key", "path", "index", "value", "this"]
1185 .into_iter()
1186 .map(String::from),
1187 )
1188 }
1189 _ => Box::new(std::iter::empty()),
1190 }
1191}
1192
1193fn attach_virtual_source_dependencies(
1194 node: &mut LineageNode,
1195 scope: &Scope,
1196 dialect: Option<DialectType>,
1197 source_alias: &str,
1198 source_expr: &Expression,
1199 trim_selects: bool,
1200 all_cte_scopes: &[&Scope],
1201 depth: usize,
1202) {
1203 let parent_name = node.name.clone();
1204 let mut seen = HashSet::new();
1205 for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1206 let key = (
1207 col_ref.table.as_ref().map(|t| t.name.clone()),
1208 col_ref.column.clone(),
1209 );
1210 if !seen.insert(key) {
1211 continue;
1212 }
1213
1214 if let Some(table_id) = col_ref.table {
1215 let table = table_id.name;
1216 if table == source_alias {
1217 continue;
1218 }
1219 resolve_qualified_column(
1220 node,
1221 scope,
1222 dialect,
1223 &table,
1224 &col_ref.column,
1225 &parent_name,
1226 trim_selects,
1227 all_cte_scopes,
1228 depth + 1,
1229 );
1230 } else {
1231 let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1232 if non_virtual_sources.len() == 1 {
1233 resolve_qualified_column(
1234 node,
1235 scope,
1236 dialect,
1237 &non_virtual_sources[0],
1238 &col_ref.column,
1239 &parent_name,
1240 trim_selects,
1241 all_cte_scopes,
1242 depth + 1,
1243 );
1244 }
1245 }
1246 }
1247}
1248
1249fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1250 fn source_name(expr: &Expression) -> Option<String> {
1251 match expr {
1252 Expression::Table(table) => Some(
1253 table
1254 .alias
1255 .as_ref()
1256 .map(|a| a.name.clone())
1257 .unwrap_or_else(|| table.name.name.clone()),
1258 ),
1259 Expression::Subquery(subquery) => {
1260 subquery.alias.as_ref().map(|alias| alias.name.clone())
1261 }
1262 Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1263 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1264 Some(alias.alias.name.clone())
1265 }
1266 Expression::Lateral(lateral) => lateral.alias.clone(),
1267 Expression::LateralView(lateral_view) => lateral_view
1268 .table_alias
1269 .as_ref()
1270 .or_else(|| lateral_view.column_aliases.first())
1271 .map(|alias| alias.name.clone()),
1272 Expression::Paren(paren) => source_name(&paren.this),
1273 _ => None,
1274 }
1275 }
1276
1277 let effective_expr = match &scope.expression {
1278 Expression::Cte(cte) => &cte.this,
1279 expr => expr,
1280 };
1281
1282 let mut names = Vec::new();
1283 let mut seen = std::collections::HashSet::new();
1284
1285 if let Expression::Select(select) = effective_expr {
1286 if let Some(from) = &select.from {
1287 for expr in &from.expressions {
1288 if let Some(name) = source_name(expr) {
1289 if !name.is_empty() && seen.insert(name.clone()) {
1290 names.push(name);
1291 }
1292 }
1293 }
1294 }
1295 for join in &select.joins {
1296 if let Some(name) = source_name(&join.this) {
1297 if !name.is_empty() && seen.insert(name.clone()) {
1298 names.push(name);
1299 }
1300 }
1301 }
1302 for lateral_view in &select.lateral_views {
1303 if let Some(name) =
1304 source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
1305 {
1306 if !name.is_empty() && seen.insert(name.clone()) {
1307 names.push(name);
1308 }
1309 }
1310 }
1311 }
1312
1313 names
1314}
1315
1316fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1317 source_names_from_from_join(scope)
1318 .into_iter()
1319 .filter(|name| {
1320 !matches!(
1321 scope.sources.get(name).map(|source| source.kind),
1322 Some(SourceKind::Virtual)
1323 )
1324 })
1325 .collect()
1326}
1327
1328fn get_alias_or_name(expr: &Expression) -> Option<String> {
1334 match expr {
1335 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1336 Expression::Column(col) => Some(col.name.name.clone()),
1337 Expression::Identifier(id) => Some(id.name.clone()),
1338 Expression::Star(_) => Some("*".to_string()),
1339 Expression::Annotated(a) => get_alias_or_name(&a.this),
1342 _ => None,
1343 }
1344}
1345
1346fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1348 match column {
1349 ColumnRef::Name(n) => n.to_string(),
1350 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1351 }
1352}
1353
1354fn find_select_expr(
1356 scope_expr: &Expression,
1357 column: &ColumnRef<'_>,
1358 dialect: Option<DialectType>,
1359) -> Result<Expression> {
1360 if let Expression::Select(ref select) = scope_expr {
1361 match column {
1362 ColumnRef::Name(name) => {
1363 let normalized_name = normalize_column_name(name, dialect);
1364 for expr in &select.expressions {
1365 if let Some(alias_or_name) = get_alias_or_name(expr) {
1366 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1367 return Ok(expr.clone());
1368 }
1369 }
1370 }
1371 Err(crate::error::Error::parse(
1372 format!("Cannot find column '{}' in query", name),
1373 0,
1374 0,
1375 0,
1376 0,
1377 ))
1378 }
1379 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1380 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1381 }),
1382 }
1383 } else {
1384 Err(crate::error::Error::parse(
1385 "Expected SELECT expression for column lookup",
1386 0,
1387 0,
1388 0,
1389 0,
1390 ))
1391 }
1392}
1393
1394fn column_to_index(
1396 set_op_expr: &Expression,
1397 name: &str,
1398 dialect: Option<DialectType>,
1399) -> Result<usize> {
1400 let normalized_name = normalize_column_name(name, dialect);
1401 let mut expr = set_op_expr;
1402 loop {
1403 match expr {
1404 Expression::Union(u) => expr = &u.left,
1405 Expression::Intersect(i) => expr = &i.left,
1406 Expression::Except(e) => expr = &e.left,
1407 Expression::Select(select) => {
1408 for (i, e) in select.expressions.iter().enumerate() {
1409 if let Some(alias_or_name) = get_alias_or_name(e) {
1410 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1411 return Ok(i);
1412 }
1413 }
1414 }
1415 return Err(crate::error::Error::parse(
1416 format!("Cannot find column '{}' in set operation", name),
1417 0,
1418 0,
1419 0,
1420 0,
1421 ));
1422 }
1423 _ => {
1424 return Err(crate::error::Error::parse(
1425 "Expected SELECT or set operation",
1426 0,
1427 0,
1428 0,
1429 0,
1430 ))
1431 }
1432 }
1433 }
1434}
1435
1436fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1437 normalize_name(name, dialect, false, true)
1438}
1439
1440fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1442 if let Expression::Select(select) = select_expr {
1443 let mut trimmed = select.as_ref().clone();
1444 trimmed.expressions = vec![target_expr.clone()];
1445 Expression::Select(Box::new(trimmed))
1446 } else {
1447 select_expr.clone()
1448 }
1449}
1450
1451fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1453 if scope.cte_sources.contains_key(source_name) {
1455 for cte_scope in &scope.cte_scopes {
1456 if let Expression::Cte(cte) = &cte_scope.expression {
1457 if cte.alias.name == source_name {
1458 return Some(cte_scope);
1459 }
1460 }
1461 }
1462 }
1463
1464 if let Some(source_info) = scope.sources.get(source_name) {
1466 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1467 if let Expression::Subquery(sq) = &source_info.expression {
1468 for dt_scope in &scope.derived_table_scopes {
1469 if dt_scope.expression == sq.this {
1470 return Some(dt_scope);
1471 }
1472 }
1473 }
1474 }
1475 }
1476
1477 None
1478}
1479
1480fn find_child_scope_in<'a>(
1484 all_cte_scopes: &[&'a Scope],
1485 scope: &'a Scope,
1486 source_name: &str,
1487) -> Option<&'a Scope> {
1488 for cte_scope in &scope.cte_scopes {
1490 if let Expression::Cte(cte) = &cte_scope.expression {
1491 if cte.alias.name == source_name {
1492 return Some(cte_scope);
1493 }
1494 }
1495 }
1496
1497 for cte_scope in all_cte_scopes {
1499 if let Expression::Cte(cte) = &cte_scope.expression {
1500 if cte.alias.name == source_name {
1501 return Some(cte_scope);
1502 }
1503 }
1504 }
1505
1506 if let Some(source_info) = scope.sources.get(source_name) {
1508 if source_info.is_scope {
1509 if let Expression::Subquery(sq) = &source_info.expression {
1510 for dt_scope in &scope.derived_table_scopes {
1511 if dt_scope.expression == sq.this {
1512 return Some(dt_scope);
1513 }
1514 }
1515 }
1516 }
1517 }
1518
1519 None
1520}
1521
1522fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1524 let mut node = LineageNode::new(
1525 format!("{}.{}", table, column),
1526 Expression::Column(Box::new(crate::expressions::Column {
1527 name: crate::expressions::Identifier::new(column.to_string()),
1528 table: Some(crate::expressions::Identifier::new(table.to_string())),
1529 join_mark: false,
1530 trailing_comments: vec![],
1531 span: None,
1532 inferred_type: None,
1533 })),
1534 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1535 );
1536 node.source_name = table.to_string();
1537 node.source_kind = SourceKind::Table;
1538 node
1539}
1540
1541fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1542 let mut parts: Vec<String> = Vec::new();
1543 if let Some(catalog) = &table_ref.catalog {
1544 parts.push(catalog.name.clone());
1545 }
1546 if let Some(schema) = &table_ref.schema {
1547 parts.push(schema.name.clone());
1548 }
1549 parts.push(table_ref.name.name.clone());
1550 parts.join(".")
1551}
1552
1553fn apply_source_info_context(
1554 node: &mut LineageNode,
1555 source_key: &str,
1556 source_info: &ScopeSourceInfo,
1557) {
1558 node.source_kind = source_info.kind;
1559 node.source_name =
1560 source_info
1561 .lineage_name
1562 .clone()
1563 .unwrap_or_else(|| match &source_info.expression {
1564 Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1565 _ => source_key.to_string(),
1566 });
1567 node.source_alias = source_info.alias.clone();
1568}
1569
1570fn make_table_column_node_from_source(
1571 source_key: &str,
1572 column: &str,
1573 source_info: &ScopeSourceInfo,
1574) -> LineageNode {
1575 let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1576 let mut node = LineageNode::new(
1577 format!("{}.{}", lineage_name, column),
1578 Expression::Column(Box::new(crate::expressions::Column {
1579 name: crate::expressions::Identifier::new(column.to_string()),
1580 table: Some(crate::expressions::Identifier::new(
1581 lineage_name.to_string(),
1582 )),
1583 join_mark: false,
1584 trailing_comments: vec![],
1585 span: None,
1586 inferred_type: None,
1587 })),
1588 source_info.expression.clone(),
1589 );
1590
1591 apply_source_info_context(&mut node, source_key, source_info);
1592
1593 node
1594}
1595
1596#[derive(Debug, Clone)]
1598struct SimpleColumnRef {
1599 table: Option<crate::expressions::Identifier>,
1600 column: String,
1601}
1602
1603fn find_column_refs_in_expr(
1605 expr: &Expression,
1606 dialect: Option<DialectType>,
1607) -> Vec<SimpleColumnRef> {
1608 let mut refs = Vec::new();
1609 collect_column_refs(expr, dialect, &mut refs);
1610 refs
1611}
1612
1613fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1614 match expr {
1615 Expression::Column(col) => {
1616 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1617 }
1618 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1619 _ => false,
1620 }
1621}
1622
1623fn collect_column_refs(
1624 expr: &Expression,
1625 dialect: Option<DialectType>,
1626 refs: &mut Vec<SimpleColumnRef>,
1627) {
1628 let mut stack: Vec<&Expression> = vec![expr];
1629
1630 while let Some(current) = stack.pop() {
1631 match current {
1632 Expression::Column(col) => {
1634 refs.push(SimpleColumnRef {
1635 table: col.table.clone(),
1636 column: col.name.name.clone(),
1637 });
1638 }
1639
1640 Expression::Subquery(_) | Expression::Exists(_) => {}
1642
1643 Expression::And(op)
1645 | Expression::Or(op)
1646 | Expression::Eq(op)
1647 | Expression::Neq(op)
1648 | Expression::Lt(op)
1649 | Expression::Lte(op)
1650 | Expression::Gt(op)
1651 | Expression::Gte(op)
1652 | Expression::Add(op)
1653 | Expression::Sub(op)
1654 | Expression::Mul(op)
1655 | Expression::Div(op)
1656 | Expression::Mod(op)
1657 | Expression::BitwiseAnd(op)
1658 | Expression::BitwiseOr(op)
1659 | Expression::BitwiseXor(op)
1660 | Expression::BitwiseLeftShift(op)
1661 | Expression::BitwiseRightShift(op)
1662 | Expression::Concat(op)
1663 | Expression::Adjacent(op)
1664 | Expression::TsMatch(op)
1665 | Expression::PropertyEQ(op)
1666 | Expression::ArrayContainsAll(op)
1667 | Expression::ArrayContainedBy(op)
1668 | Expression::ArrayOverlaps(op)
1669 | Expression::JSONBContainsAllTopKeys(op)
1670 | Expression::JSONBContainsAnyTopKeys(op)
1671 | Expression::JSONBDeleteAtPath(op)
1672 | Expression::ExtendsLeft(op)
1673 | Expression::ExtendsRight(op)
1674 | Expression::Is(op)
1675 | Expression::MemberOf(op)
1676 | Expression::NullSafeEq(op)
1677 | Expression::NullSafeNeq(op)
1678 | Expression::Glob(op)
1679 | Expression::Match(op) => {
1680 stack.push(&op.left);
1681 stack.push(&op.right);
1682 }
1683
1684 Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1686 stack.push(&u.this);
1687 }
1688
1689 Expression::Upper(f)
1691 | Expression::Lower(f)
1692 | Expression::Length(f)
1693 | Expression::LTrim(f)
1694 | Expression::RTrim(f)
1695 | Expression::Reverse(f)
1696 | Expression::Abs(f)
1697 | Expression::Sqrt(f)
1698 | Expression::Cbrt(f)
1699 | Expression::Ln(f)
1700 | Expression::Exp(f)
1701 | Expression::Sign(f)
1702 | Expression::Date(f)
1703 | Expression::Time(f)
1704 | Expression::DateFromUnixDate(f)
1705 | Expression::UnixDate(f)
1706 | Expression::UnixSeconds(f)
1707 | Expression::UnixMillis(f)
1708 | Expression::UnixMicros(f)
1709 | Expression::TimeStrToDate(f)
1710 | Expression::DateToDi(f)
1711 | Expression::DiToDate(f)
1712 | Expression::TsOrDiToDi(f)
1713 | Expression::TsOrDsToDatetime(f)
1714 | Expression::TsOrDsToTimestamp(f)
1715 | Expression::YearOfWeek(f)
1716 | Expression::YearOfWeekIso(f)
1717 | Expression::Initcap(f)
1718 | Expression::Ascii(f)
1719 | Expression::Chr(f)
1720 | Expression::Soundex(f)
1721 | Expression::ByteLength(f)
1722 | Expression::Hex(f)
1723 | Expression::LowerHex(f)
1724 | Expression::Unicode(f)
1725 | Expression::Radians(f)
1726 | Expression::Degrees(f)
1727 | Expression::Sin(f)
1728 | Expression::Cos(f)
1729 | Expression::Tan(f)
1730 | Expression::Asin(f)
1731 | Expression::Acos(f)
1732 | Expression::Atan(f)
1733 | Expression::IsNan(f)
1734 | Expression::IsInf(f)
1735 | Expression::ArrayLength(f)
1736 | Expression::ArraySize(f)
1737 | Expression::Cardinality(f)
1738 | Expression::ArrayReverse(f)
1739 | Expression::ArrayDistinct(f)
1740 | Expression::ArrayFlatten(f)
1741 | Expression::ArrayCompact(f)
1742 | Expression::Explode(f)
1743 | Expression::ExplodeOuter(f)
1744 | Expression::ToArray(f)
1745 | Expression::MapFromEntries(f)
1746 | Expression::MapKeys(f)
1747 | Expression::MapValues(f)
1748 | Expression::JsonArrayLength(f)
1749 | Expression::JsonKeys(f)
1750 | Expression::JsonType(f)
1751 | Expression::ParseJson(f)
1752 | Expression::ToJson(f)
1753 | Expression::Typeof(f)
1754 | Expression::BitwiseCount(f)
1755 | Expression::Year(f)
1756 | Expression::Month(f)
1757 | Expression::Day(f)
1758 | Expression::Hour(f)
1759 | Expression::Minute(f)
1760 | Expression::Second(f)
1761 | Expression::DayOfWeek(f)
1762 | Expression::DayOfWeekIso(f)
1763 | Expression::DayOfMonth(f)
1764 | Expression::DayOfYear(f)
1765 | Expression::WeekOfYear(f)
1766 | Expression::Quarter(f)
1767 | Expression::Epoch(f)
1768 | Expression::EpochMs(f)
1769 | Expression::TimeStrToUnix(f)
1770 | Expression::SHA(f)
1771 | Expression::SHA1Digest(f)
1772 | Expression::TimeToUnix(f)
1773 | Expression::JSONBool(f)
1774 | Expression::Int64(f)
1775 | Expression::MD5NumberLower64(f)
1776 | Expression::MD5NumberUpper64(f)
1777 | Expression::DateStrToDate(f)
1778 | Expression::DateToDateStr(f) => {
1779 stack.push(&f.this);
1780 }
1781
1782 Expression::Power(f)
1784 | Expression::NullIf(f)
1785 | Expression::IfNull(f)
1786 | Expression::Nvl(f)
1787 | Expression::UnixToTimeStr(f)
1788 | Expression::Contains(f)
1789 | Expression::StartsWith(f)
1790 | Expression::EndsWith(f)
1791 | Expression::Levenshtein(f)
1792 | Expression::ModFunc(f)
1793 | Expression::Atan2(f)
1794 | Expression::IntDiv(f)
1795 | Expression::AddMonths(f)
1796 | Expression::MonthsBetween(f)
1797 | Expression::NextDay(f)
1798 | Expression::ArrayContains(f)
1799 | Expression::ArrayPosition(f)
1800 | Expression::ArrayAppend(f)
1801 | Expression::ArrayPrepend(f)
1802 | Expression::ArrayUnion(f)
1803 | Expression::ArrayExcept(f)
1804 | Expression::ArrayRemove(f)
1805 | Expression::StarMap(f)
1806 | Expression::MapFromArrays(f)
1807 | Expression::MapContainsKey(f)
1808 | Expression::ElementAt(f)
1809 | Expression::JsonMergePatch(f)
1810 | Expression::JSONBContains(f)
1811 | Expression::JSONBExtract(f) => {
1812 stack.push(&f.this);
1813 stack.push(&f.expression);
1814 }
1815
1816 Expression::Greatest(f)
1818 | Expression::Least(f)
1819 | Expression::Coalesce(f)
1820 | Expression::ArrayConcat(f)
1821 | Expression::ArrayIntersect(f)
1822 | Expression::ArrayZip(f)
1823 | Expression::MapConcat(f)
1824 | Expression::JsonArray(f) => {
1825 for e in &f.expressions {
1826 stack.push(e);
1827 }
1828 }
1829
1830 Expression::Sum(f)
1832 | Expression::Avg(f)
1833 | Expression::Min(f)
1834 | Expression::Max(f)
1835 | Expression::ArrayAgg(f)
1836 | Expression::CountIf(f)
1837 | Expression::Stddev(f)
1838 | Expression::StddevPop(f)
1839 | Expression::StddevSamp(f)
1840 | Expression::Variance(f)
1841 | Expression::VarPop(f)
1842 | Expression::VarSamp(f)
1843 | Expression::Median(f)
1844 | Expression::Mode(f)
1845 | Expression::First(f)
1846 | Expression::Last(f)
1847 | Expression::AnyValue(f)
1848 | Expression::ApproxDistinct(f)
1849 | Expression::ApproxCountDistinct(f)
1850 | Expression::LogicalAnd(f)
1851 | Expression::LogicalOr(f)
1852 | Expression::Skewness(f)
1853 | Expression::ArrayConcatAgg(f)
1854 | Expression::ArrayUniqueAgg(f)
1855 | Expression::BoolXorAgg(f)
1856 | Expression::BitwiseAndAgg(f)
1857 | Expression::BitwiseOrAgg(f)
1858 | Expression::BitwiseXorAgg(f) => {
1859 stack.push(&f.this);
1860 if let Some(ref filter) = f.filter {
1861 stack.push(filter);
1862 }
1863 if let Some((ref expr, _)) = f.having_max {
1864 stack.push(expr);
1865 }
1866 if let Some(ref limit) = f.limit {
1867 stack.push(limit);
1868 }
1869 }
1870
1871 Expression::Function(func) => {
1873 for arg in &func.args {
1874 stack.push(arg);
1875 }
1876 }
1877 Expression::AggregateFunction(func) => {
1878 for arg in &func.args {
1879 stack.push(arg);
1880 }
1881 if let Some(ref filter) = func.filter {
1882 stack.push(filter);
1883 }
1884 if let Some(ref limit) = func.limit {
1885 stack.push(limit);
1886 }
1887 }
1888
1889 Expression::WindowFunction(wf) => {
1891 stack.push(&wf.this);
1892 }
1893
1894 Expression::Alias(a) => {
1896 stack.push(&a.this);
1897 }
1898 Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1899 stack.push(&c.this);
1900 if let Some(ref fmt) = c.format {
1901 stack.push(fmt);
1902 }
1903 if let Some(ref def) = c.default {
1904 stack.push(def);
1905 }
1906 }
1907 Expression::Paren(p) => {
1908 stack.push(&p.this);
1909 }
1910 Expression::Annotated(a) => {
1911 stack.push(&a.this);
1912 }
1913 Expression::Case(case) => {
1914 if let Some(ref operand) = case.operand {
1915 stack.push(operand);
1916 }
1917 for (cond, result) in &case.whens {
1918 stack.push(cond);
1919 stack.push(result);
1920 }
1921 if let Some(ref else_expr) = case.else_ {
1922 stack.push(else_expr);
1923 }
1924 }
1925 Expression::Collation(c) => {
1926 stack.push(&c.this);
1927 }
1928 Expression::In(i) => {
1929 stack.push(&i.this);
1930 for e in &i.expressions {
1931 stack.push(e);
1932 }
1933 if let Some(ref q) = i.query {
1934 stack.push(q);
1935 }
1936 if let Some(ref u) = i.unnest {
1937 stack.push(u);
1938 }
1939 }
1940 Expression::Between(b) => {
1941 stack.push(&b.this);
1942 stack.push(&b.low);
1943 stack.push(&b.high);
1944 }
1945 Expression::IsNull(n) => {
1946 stack.push(&n.this);
1947 }
1948 Expression::IsTrue(t) | Expression::IsFalse(t) => {
1949 stack.push(&t.this);
1950 }
1951 Expression::IsJson(j) => {
1952 stack.push(&j.this);
1953 }
1954 Expression::Like(l) | Expression::ILike(l) => {
1955 stack.push(&l.left);
1956 stack.push(&l.right);
1957 if let Some(ref esc) = l.escape {
1958 stack.push(esc);
1959 }
1960 }
1961 Expression::SimilarTo(s) => {
1962 stack.push(&s.this);
1963 stack.push(&s.pattern);
1964 if let Some(ref esc) = s.escape {
1965 stack.push(esc);
1966 }
1967 }
1968 Expression::Ordered(o) => {
1969 stack.push(&o.this);
1970 }
1971 Expression::Array(a) => {
1972 for e in &a.expressions {
1973 stack.push(e);
1974 }
1975 }
1976 Expression::Tuple(t) => {
1977 for e in &t.expressions {
1978 stack.push(e);
1979 }
1980 }
1981 Expression::Struct(s) => {
1982 for (_, e) in &s.fields {
1983 stack.push(e);
1984 }
1985 }
1986 Expression::Subscript(s) => {
1987 stack.push(&s.this);
1988 stack.push(&s.index);
1989 }
1990 Expression::Dot(d) => {
1991 stack.push(&d.this);
1992 }
1993 Expression::MethodCall(m) => {
1994 if !matches!(dialect, Some(DialectType::BigQuery))
1995 || !is_bigquery_safe_namespace_receiver(&m.this)
1996 {
1997 stack.push(&m.this);
1998 }
1999 for arg in &m.args {
2000 stack.push(arg);
2001 }
2002 }
2003 Expression::ArraySlice(s) => {
2004 stack.push(&s.this);
2005 if let Some(ref start) = s.start {
2006 stack.push(start);
2007 }
2008 if let Some(ref end) = s.end {
2009 stack.push(end);
2010 }
2011 }
2012 Expression::Lambda(l) => {
2013 stack.push(&l.body);
2014 }
2015 Expression::NamedArgument(n) => {
2016 stack.push(&n.value);
2017 }
2018 Expression::Lateral(l) => {
2019 stack.push(&l.this);
2020 if let Some(ref view) = l.view {
2021 stack.push(view);
2022 }
2023 if let Some(ref outer) = l.outer {
2024 stack.push(outer);
2025 }
2026 if let Some(ref ordinality) = l.ordinality {
2027 stack.push(ordinality);
2028 }
2029 }
2030 Expression::LateralView(lv) => {
2031 stack.push(&lv.this);
2032 }
2033 Expression::TryCatch(t) => {
2034 for stmt in &t.try_body {
2035 stack.push(stmt);
2036 }
2037 if let Some(catch_body) = &t.catch_body {
2038 for stmt in catch_body {
2039 stack.push(stmt);
2040 }
2041 }
2042 }
2043 Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
2044 stack.push(e);
2045 }
2046
2047 Expression::Substring(f) => {
2049 stack.push(&f.this);
2050 stack.push(&f.start);
2051 if let Some(ref len) = f.length {
2052 stack.push(len);
2053 }
2054 }
2055 Expression::Trim(f) => {
2056 stack.push(&f.this);
2057 if let Some(ref chars) = f.characters {
2058 stack.push(chars);
2059 }
2060 }
2061 Expression::Replace(f) => {
2062 stack.push(&f.this);
2063 stack.push(&f.old);
2064 stack.push(&f.new);
2065 }
2066 Expression::IfFunc(f) => {
2067 stack.push(&f.condition);
2068 stack.push(&f.true_value);
2069 if let Some(ref fv) = f.false_value {
2070 stack.push(fv);
2071 }
2072 }
2073 Expression::Nvl2(f) => {
2074 stack.push(&f.this);
2075 stack.push(&f.true_value);
2076 stack.push(&f.false_value);
2077 }
2078 Expression::ConcatWs(f) => {
2079 stack.push(&f.separator);
2080 for e in &f.expressions {
2081 stack.push(e);
2082 }
2083 }
2084 Expression::Count(f) => {
2085 if let Some(ref this) = f.this {
2086 stack.push(this);
2087 }
2088 if let Some(ref filter) = f.filter {
2089 stack.push(filter);
2090 }
2091 }
2092 Expression::GroupConcat(f) => {
2093 stack.push(&f.this);
2094 if let Some(ref sep) = f.separator {
2095 stack.push(sep);
2096 }
2097 if let Some(ref filter) = f.filter {
2098 stack.push(filter);
2099 }
2100 }
2101 Expression::StringAgg(f) => {
2102 stack.push(&f.this);
2103 if let Some(ref sep) = f.separator {
2104 stack.push(sep);
2105 }
2106 if let Some(ref filter) = f.filter {
2107 stack.push(filter);
2108 }
2109 if let Some(ref limit) = f.limit {
2110 stack.push(limit);
2111 }
2112 }
2113 Expression::ListAgg(f) => {
2114 stack.push(&f.this);
2115 if let Some(ref sep) = f.separator {
2116 stack.push(sep);
2117 }
2118 if let Some(ref filter) = f.filter {
2119 stack.push(filter);
2120 }
2121 }
2122 Expression::SumIf(f) => {
2123 stack.push(&f.this);
2124 stack.push(&f.condition);
2125 if let Some(ref filter) = f.filter {
2126 stack.push(filter);
2127 }
2128 }
2129 Expression::DateAdd(f) | Expression::DateSub(f) => {
2130 stack.push(&f.this);
2131 stack.push(&f.interval);
2132 }
2133 Expression::DateDiff(f) => {
2134 stack.push(&f.this);
2135 stack.push(&f.expression);
2136 }
2137 Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
2138 stack.push(&f.this);
2139 }
2140 Expression::Extract(f) => {
2141 stack.push(&f.this);
2142 }
2143 Expression::Round(f) => {
2144 stack.push(&f.this);
2145 if let Some(ref d) = f.decimals {
2146 stack.push(d);
2147 }
2148 }
2149 Expression::Floor(f) => {
2150 stack.push(&f.this);
2151 if let Some(ref s) = f.scale {
2152 stack.push(s);
2153 }
2154 if let Some(ref t) = f.to {
2155 stack.push(t);
2156 }
2157 }
2158 Expression::Ceil(f) => {
2159 stack.push(&f.this);
2160 if let Some(ref d) = f.decimals {
2161 stack.push(d);
2162 }
2163 if let Some(ref t) = f.to {
2164 stack.push(t);
2165 }
2166 }
2167 Expression::Log(f) => {
2168 stack.push(&f.this);
2169 if let Some(ref b) = f.base {
2170 stack.push(b);
2171 }
2172 }
2173 Expression::AtTimeZone(f) => {
2174 stack.push(&f.this);
2175 stack.push(&f.zone);
2176 }
2177 Expression::Lead(f) | Expression::Lag(f) => {
2178 stack.push(&f.this);
2179 if let Some(ref off) = f.offset {
2180 stack.push(off);
2181 }
2182 if let Some(ref def) = f.default {
2183 stack.push(def);
2184 }
2185 }
2186 Expression::FirstValue(f) | Expression::LastValue(f) => {
2187 stack.push(&f.this);
2188 }
2189 Expression::NthValue(f) => {
2190 stack.push(&f.this);
2191 stack.push(&f.offset);
2192 }
2193 Expression::Position(f) => {
2194 stack.push(&f.substring);
2195 stack.push(&f.string);
2196 if let Some(ref start) = f.start {
2197 stack.push(start);
2198 }
2199 }
2200 Expression::Decode(f) => {
2201 stack.push(&f.this);
2202 for (search, result) in &f.search_results {
2203 stack.push(search);
2204 stack.push(result);
2205 }
2206 if let Some(ref def) = f.default {
2207 stack.push(def);
2208 }
2209 }
2210 Expression::CharFunc(f) => {
2211 for arg in &f.args {
2212 stack.push(arg);
2213 }
2214 }
2215 Expression::ArraySort(f) => {
2216 stack.push(&f.this);
2217 if let Some(ref cmp) = f.comparator {
2218 stack.push(cmp);
2219 }
2220 }
2221 Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
2222 stack.push(&f.this);
2223 stack.push(&f.separator);
2224 if let Some(ref nr) = f.null_replacement {
2225 stack.push(nr);
2226 }
2227 }
2228 Expression::ArrayFilter(f) => {
2229 stack.push(&f.this);
2230 stack.push(&f.filter);
2231 }
2232 Expression::ArrayTransform(f) => {
2233 stack.push(&f.this);
2234 stack.push(&f.transform);
2235 }
2236 Expression::Sequence(f)
2237 | Expression::Generate(f)
2238 | Expression::ExplodingGenerateSeries(f) => {
2239 stack.push(&f.start);
2240 stack.push(&f.stop);
2241 if let Some(ref step) = f.step {
2242 stack.push(step);
2243 }
2244 }
2245 Expression::JsonExtract(f)
2246 | Expression::JsonExtractScalar(f)
2247 | Expression::JsonQuery(f)
2248 | Expression::JsonValue(f) => {
2249 stack.push(&f.this);
2250 stack.push(&f.path);
2251 }
2252 Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
2253 stack.push(&f.this);
2254 for p in &f.paths {
2255 stack.push(p);
2256 }
2257 }
2258 Expression::JsonObject(f) => {
2259 for (k, v) in &f.pairs {
2260 stack.push(k);
2261 stack.push(v);
2262 }
2263 }
2264 Expression::JsonSet(f) | Expression::JsonInsert(f) => {
2265 stack.push(&f.this);
2266 for (path, val) in &f.path_values {
2267 stack.push(path);
2268 stack.push(val);
2269 }
2270 }
2271 Expression::Overlay(f) => {
2272 stack.push(&f.this);
2273 stack.push(&f.replacement);
2274 stack.push(&f.from);
2275 if let Some(ref len) = f.length {
2276 stack.push(len);
2277 }
2278 }
2279 Expression::Convert(f) => {
2280 stack.push(&f.this);
2281 if let Some(ref style) = f.style {
2282 stack.push(style);
2283 }
2284 }
2285 Expression::ApproxPercentile(f) => {
2286 stack.push(&f.this);
2287 stack.push(&f.percentile);
2288 if let Some(ref acc) = f.accuracy {
2289 stack.push(acc);
2290 }
2291 if let Some(ref filter) = f.filter {
2292 stack.push(filter);
2293 }
2294 }
2295 Expression::Percentile(f)
2296 | Expression::PercentileCont(f)
2297 | Expression::PercentileDisc(f) => {
2298 stack.push(&f.this);
2299 stack.push(&f.percentile);
2300 if let Some(ref filter) = f.filter {
2301 stack.push(filter);
2302 }
2303 }
2304 Expression::WithinGroup(f) => {
2305 stack.push(&f.this);
2306 }
2307 Expression::Left(f) | Expression::Right(f) => {
2308 stack.push(&f.this);
2309 stack.push(&f.length);
2310 }
2311 Expression::Repeat(f) => {
2312 stack.push(&f.this);
2313 stack.push(&f.times);
2314 }
2315 Expression::Lpad(f) | Expression::Rpad(f) => {
2316 stack.push(&f.this);
2317 stack.push(&f.length);
2318 if let Some(ref fill) = f.fill {
2319 stack.push(fill);
2320 }
2321 }
2322 Expression::Split(f) => {
2323 stack.push(&f.this);
2324 stack.push(&f.delimiter);
2325 }
2326 Expression::RegexpLike(f) => {
2327 stack.push(&f.this);
2328 stack.push(&f.pattern);
2329 if let Some(ref flags) = f.flags {
2330 stack.push(flags);
2331 }
2332 }
2333 Expression::RegexpReplace(f) => {
2334 stack.push(&f.this);
2335 stack.push(&f.pattern);
2336 stack.push(&f.replacement);
2337 if let Some(ref flags) = f.flags {
2338 stack.push(flags);
2339 }
2340 }
2341 Expression::RegexpExtract(f) => {
2342 stack.push(&f.this);
2343 stack.push(&f.pattern);
2344 if let Some(ref group) = f.group {
2345 stack.push(group);
2346 }
2347 }
2348 Expression::ToDate(f) => {
2349 stack.push(&f.this);
2350 if let Some(ref fmt) = f.format {
2351 stack.push(fmt);
2352 }
2353 }
2354 Expression::ToTimestamp(f) => {
2355 stack.push(&f.this);
2356 if let Some(ref fmt) = f.format {
2357 stack.push(fmt);
2358 }
2359 }
2360 Expression::DateFormat(f) | Expression::FormatDate(f) => {
2361 stack.push(&f.this);
2362 stack.push(&f.format);
2363 }
2364 Expression::LastDay(f) => {
2365 stack.push(&f.this);
2366 }
2367 Expression::FromUnixtime(f) => {
2368 stack.push(&f.this);
2369 if let Some(ref fmt) = f.format {
2370 stack.push(fmt);
2371 }
2372 }
2373 Expression::UnixTimestamp(f) => {
2374 if let Some(ref this) = f.this {
2375 stack.push(this);
2376 }
2377 if let Some(ref fmt) = f.format {
2378 stack.push(fmt);
2379 }
2380 }
2381 Expression::MakeDate(f) => {
2382 stack.push(&f.year);
2383 stack.push(&f.month);
2384 stack.push(&f.day);
2385 }
2386 Expression::MakeTimestamp(f) => {
2387 stack.push(&f.year);
2388 stack.push(&f.month);
2389 stack.push(&f.day);
2390 stack.push(&f.hour);
2391 stack.push(&f.minute);
2392 stack.push(&f.second);
2393 if let Some(ref tz) = f.timezone {
2394 stack.push(tz);
2395 }
2396 }
2397 Expression::TruncFunc(f) => {
2398 stack.push(&f.this);
2399 if let Some(ref d) = f.decimals {
2400 stack.push(d);
2401 }
2402 }
2403 Expression::ArrayFunc(f) => {
2404 for e in &f.expressions {
2405 stack.push(e);
2406 }
2407 }
2408 Expression::Unnest(f) => {
2409 stack.push(&f.this);
2410 for e in &f.expressions {
2411 stack.push(e);
2412 }
2413 }
2414 Expression::StructFunc(f) => {
2415 for (_, e) in &f.fields {
2416 stack.push(e);
2417 }
2418 }
2419 Expression::StructExtract(f) => {
2420 stack.push(&f.this);
2421 }
2422 Expression::NamedStruct(f) => {
2423 for (k, v) in &f.pairs {
2424 stack.push(k);
2425 stack.push(v);
2426 }
2427 }
2428 Expression::MapFunc(f) => {
2429 for k in &f.keys {
2430 stack.push(k);
2431 }
2432 for v in &f.values {
2433 stack.push(v);
2434 }
2435 }
2436 Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2437 stack.push(&f.this);
2438 stack.push(&f.transform);
2439 }
2440 Expression::JsonArrayAgg(f) => {
2441 stack.push(&f.this);
2442 if let Some(ref filter) = f.filter {
2443 stack.push(filter);
2444 }
2445 }
2446 Expression::JsonObjectAgg(f) => {
2447 stack.push(&f.key);
2448 stack.push(&f.value);
2449 if let Some(ref filter) = f.filter {
2450 stack.push(filter);
2451 }
2452 }
2453 Expression::NTile(f) => {
2454 if let Some(ref n) = f.num_buckets {
2455 stack.push(n);
2456 }
2457 }
2458 Expression::Rand(f) => {
2459 if let Some(ref s) = f.seed {
2460 stack.push(s);
2461 }
2462 if let Some(ref lo) = f.lower {
2463 stack.push(lo);
2464 }
2465 if let Some(ref hi) = f.upper {
2466 stack.push(hi);
2467 }
2468 }
2469 Expression::Any(q) | Expression::All(q) => {
2470 stack.push(&q.this);
2471 stack.push(&q.subquery);
2472 }
2473 Expression::Overlaps(o) => {
2474 if let Some(ref this) = o.this {
2475 stack.push(this);
2476 }
2477 if let Some(ref expr) = o.expression {
2478 stack.push(expr);
2479 }
2480 if let Some(ref ls) = o.left_start {
2481 stack.push(ls);
2482 }
2483 if let Some(ref le) = o.left_end {
2484 stack.push(le);
2485 }
2486 if let Some(ref rs) = o.right_start {
2487 stack.push(rs);
2488 }
2489 if let Some(ref re) = o.right_end {
2490 stack.push(re);
2491 }
2492 }
2493 Expression::Interval(i) => {
2494 if let Some(ref this) = i.this {
2495 stack.push(this);
2496 }
2497 }
2498 Expression::TimeStrToTime(f) => {
2499 stack.push(&f.this);
2500 if let Some(ref zone) = f.zone {
2501 stack.push(zone);
2502 }
2503 }
2504 Expression::JSONBExtractScalar(f) => {
2505 stack.push(&f.this);
2506 stack.push(&f.expression);
2507 if let Some(ref jt) = f.json_type {
2508 stack.push(jt);
2509 }
2510 }
2511 Expression::JSONExtract(f) => {
2512 stack.push(&f.this);
2513 stack.push(&f.expression);
2514 for e in &f.expressions {
2515 stack.push(e);
2516 }
2517 if let Some(ref option) = f.option {
2518 stack.push(option);
2519 }
2520 if let Some(ref on_condition) = f.on_condition {
2521 stack.push(on_condition);
2522 }
2523 }
2524
2525 _ => {}
2530 }
2531 }
2532}
2533
2534#[cfg(test)]
2539mod tests {
2540 use super::*;
2541 use crate::dialects::{Dialect, DialectType};
2542 use crate::expressions::DataType;
2543 use crate::optimizer::annotate_types::annotate_types;
2544 use crate::parse_one;
2545 use crate::schema::{MappingSchema, Schema};
2546
2547 fn parse(sql: &str) -> Expression {
2548 let dialect = Dialect::get(DialectType::Generic);
2549 let ast = dialect.parse(sql).unwrap();
2550 ast.into_iter().next().unwrap()
2551 }
2552
2553 #[test]
2554 fn test_simple_lineage() {
2555 let expr = parse("SELECT a FROM t");
2556 let node = lineage("a", &expr, None, false).unwrap();
2557
2558 assert_eq!(node.name, "a");
2559 assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2560 let names = node.downstream_names();
2562 assert!(
2563 names.iter().any(|n| n == "t.a"),
2564 "Expected t.a in downstream, got: {:?}",
2565 names
2566 );
2567 }
2568
2569 #[test]
2570 fn test_lineage_walk() {
2571 let root = LineageNode {
2572 name: "col_a".to_string(),
2573 expression: Expression::Null(crate::expressions::Null),
2574 source: Expression::Null(crate::expressions::Null),
2575 downstream: vec![LineageNode::new(
2576 "t.a",
2577 Expression::Null(crate::expressions::Null),
2578 Expression::Null(crate::expressions::Null),
2579 )],
2580 source_name: String::new(),
2581 source_kind: SourceKind::Unknown,
2582 source_alias: None,
2583 reference_node_name: String::new(),
2584 };
2585
2586 let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2587 assert_eq!(names.len(), 2);
2588 assert_eq!(names[0], "col_a");
2589 assert_eq!(names[1], "t.a");
2590 }
2591
2592 #[test]
2593 fn test_aliased_column() {
2594 let expr = parse("SELECT a + 1 AS b FROM t");
2595 let node = lineage("b", &expr, None, false).unwrap();
2596
2597 assert_eq!(node.name, "b");
2598 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2600 assert!(
2601 all_names.iter().any(|n| n.contains("a")),
2602 "Expected to trace to column a, got: {:?}",
2603 all_names
2604 );
2605 }
2606
2607 #[test]
2608 fn test_qualified_column() {
2609 let expr = parse("SELECT t.a FROM t");
2610 let node = lineage("a", &expr, None, false).unwrap();
2611
2612 assert_eq!(node.name, "a");
2613 let names = node.downstream_names();
2614 assert!(
2615 names.iter().any(|n| n == "t.a"),
2616 "Expected t.a, got: {:?}",
2617 names
2618 );
2619 }
2620
2621 #[test]
2622 fn test_unqualified_column() {
2623 let expr = parse("SELECT a FROM t");
2624 let node = lineage("a", &expr, None, false).unwrap();
2625
2626 let names = node.downstream_names();
2628 assert!(
2629 names.iter().any(|n| n == "t.a"),
2630 "Expected t.a, got: {:?}",
2631 names
2632 );
2633 }
2634
2635 #[test]
2636 fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2637 let query = "SELECT name FROM users";
2638 let dialect = Dialect::get(DialectType::BigQuery);
2639 let expr = dialect
2640 .parse(query)
2641 .unwrap()
2642 .into_iter()
2643 .next()
2644 .expect("expected one expression");
2645
2646 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2647 schema
2648 .add_table("users", &[("name".into(), DataType::Text)], None)
2649 .expect("schema setup");
2650
2651 let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2652 .expect("lineage without schema");
2653 let mut expr_without = node_without_schema.expression.clone();
2654 annotate_types(
2655 &mut expr_without,
2656 Some(&schema),
2657 Some(DialectType::BigQuery),
2658 );
2659 assert_eq!(
2660 expr_without.inferred_type(),
2661 None,
2662 "Expected unresolved root type without schema-aware lineage qualification"
2663 );
2664
2665 let node_with_schema = lineage_with_schema(
2666 "name",
2667 &expr,
2668 Some(&schema),
2669 Some(DialectType::BigQuery),
2670 false,
2671 )
2672 .expect("lineage with schema");
2673 let mut expr_with = node_with_schema.expression.clone();
2674 annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2675
2676 assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2677 }
2678
2679 #[test]
2680 fn test_lineage_with_schema_correlated_scalar_subquery() {
2681 let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2682 let dialect = Dialect::get(DialectType::BigQuery);
2683 let expr = dialect
2684 .parse(query)
2685 .unwrap()
2686 .into_iter()
2687 .next()
2688 .expect("expected one expression");
2689
2690 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2691 schema
2692 .add_table(
2693 "t1",
2694 &[("id".into(), DataType::BigInt { length: None })],
2695 None,
2696 )
2697 .expect("schema setup");
2698 schema
2699 .add_table(
2700 "t2",
2701 &[
2702 ("id".into(), DataType::BigInt { length: None }),
2703 ("val".into(), DataType::BigInt { length: None }),
2704 ],
2705 None,
2706 )
2707 .expect("schema setup");
2708
2709 let node = lineage_with_schema(
2710 "id",
2711 &expr,
2712 Some(&schema),
2713 Some(DialectType::BigQuery),
2714 false,
2715 )
2716 .expect("lineage_with_schema should handle correlated scalar subqueries");
2717
2718 assert_eq!(node.name, "id");
2719 }
2720
2721 #[test]
2722 fn test_lineage_with_schema_join_using() {
2723 let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2724 let dialect = Dialect::get(DialectType::BigQuery);
2725 let expr = dialect
2726 .parse(query)
2727 .unwrap()
2728 .into_iter()
2729 .next()
2730 .expect("expected one expression");
2731
2732 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2733 schema
2734 .add_table(
2735 "t1",
2736 &[("a".into(), DataType::BigInt { length: None })],
2737 None,
2738 )
2739 .expect("schema setup");
2740 schema
2741 .add_table(
2742 "t2",
2743 &[("a".into(), DataType::BigInt { length: None })],
2744 None,
2745 )
2746 .expect("schema setup");
2747
2748 let node = lineage_with_schema(
2749 "a",
2750 &expr,
2751 Some(&schema),
2752 Some(DialectType::BigQuery),
2753 false,
2754 )
2755 .expect("lineage_with_schema should handle JOIN USING");
2756
2757 assert_eq!(node.name, "a");
2758 }
2759
2760 #[test]
2761 fn test_lineage_with_schema_qualified_table_name() {
2762 let query = "SELECT a FROM raw.t1";
2763 let dialect = Dialect::get(DialectType::BigQuery);
2764 let expr = dialect
2765 .parse(query)
2766 .unwrap()
2767 .into_iter()
2768 .next()
2769 .expect("expected one expression");
2770
2771 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2772 schema
2773 .add_table(
2774 "raw.t1",
2775 &[("a".into(), DataType::BigInt { length: None })],
2776 None,
2777 )
2778 .expect("schema setup");
2779
2780 let node = lineage_with_schema(
2781 "a",
2782 &expr,
2783 Some(&schema),
2784 Some(DialectType::BigQuery),
2785 false,
2786 )
2787 .expect("lineage_with_schema should handle dotted schema.table names");
2788
2789 assert_eq!(node.name, "a");
2790 }
2791
2792 #[test]
2793 fn test_lineage_with_schema_none_matches_lineage() {
2794 let expr = parse("SELECT a FROM t");
2795 let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2796 let with_none =
2797 lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2798
2799 assert_eq!(with_none.name, baseline.name);
2800 assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2801 }
2802
2803 #[test]
2804 fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2805 let dialect = Dialect::get(DialectType::BigQuery);
2806 let expr = dialect
2807 .parse("SELECT Name AS name FROM teams")
2808 .unwrap()
2809 .into_iter()
2810 .next()
2811 .expect("expected one expression");
2812
2813 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2814 schema
2815 .add_table(
2816 "teams",
2817 &[("Name".into(), DataType::String { length: None })],
2818 None,
2819 )
2820 .expect("schema setup");
2821
2822 let node = lineage_with_schema(
2823 "name",
2824 &expr,
2825 Some(&schema),
2826 Some(DialectType::BigQuery),
2827 false,
2828 )
2829 .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2830
2831 let names = node.downstream_names();
2832 assert!(
2833 names.iter().any(|n| n == "teams.Name"),
2834 "Expected teams.Name in downstream, got: {:?}",
2835 names
2836 );
2837 }
2838
2839 #[test]
2840 fn test_lineage_bigquery_mixed_case_alias_lookup() {
2841 let dialect = Dialect::get(DialectType::BigQuery);
2842 let expr = dialect
2843 .parse("SELECT Name AS Name FROM teams")
2844 .unwrap()
2845 .into_iter()
2846 .next()
2847 .expect("expected one expression");
2848
2849 let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2850 .expect("lineage should resolve mixed-case aliases in BigQuery");
2851
2852 assert_eq!(node.name, "name");
2853 }
2854
2855 #[test]
2856 fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2857 let expr = parse_one(
2858 r#"
2859SELECT date_val AS week_start
2860FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2861"#,
2862 DialectType::BigQuery,
2863 )
2864 .expect("parse");
2865
2866 let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2867 .expect("lineage should resolve UNNEST alias as a source");
2868 let child = node
2869 .downstream
2870 .first()
2871 .expect("week_start should have downstream lineage");
2872
2873 assert_eq!(child.name, "_0.date_val");
2874 assert_eq!(child.source_name, "_0");
2875 assert_eq!(child.source_kind, SourceKind::Virtual);
2876 assert_eq!(child.source_alias.as_deref(), Some("date_val"));
2877
2878 let Expression::Column(column) = &child.expression else {
2879 panic!(
2880 "expected downstream column expression, got {:?}",
2881 child.expression
2882 );
2883 };
2884 assert_eq!(column.name.name, "date_val");
2885 assert_eq!(
2886 column.table.as_ref().map(|table| table.name.as_str()),
2887 Some("_0")
2888 );
2889 assert!(
2890 matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
2891 "expected UNNEST source expression, got {:?}",
2892 child.source
2893 );
2894 }
2895
2896 #[test]
2897 fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
2898 let expr =
2899 parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
2900
2901 let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2902 let child = node.downstream.first().expect("id should have lineage");
2903
2904 assert_eq!(child.name, "date_val.id");
2905 assert_eq!(child.source_name, "date_val");
2906 assert_eq!(child.source_kind, SourceKind::Table);
2907 assert_eq!(child.source_alias, None);
2908 }
2909
2910 #[test]
2911 fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
2912 let expr = parse_one(
2913 r#"
2914SELECT a.a AS first_value, b.b AS second_value
2915FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
2916JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
2917"#,
2918 DialectType::BigQuery,
2919 )
2920 .expect("parse");
2921
2922 let first =
2923 lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2924 let second =
2925 lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2926
2927 let first_child = first.downstream.first().expect("first source");
2928 let second_child = second.downstream.first().expect("second source");
2929
2930 assert_eq!(first_child.name, "_0.a");
2931 assert_eq!(first_child.source_name, "_0");
2932 assert_eq!(first_child.source_alias.as_deref(), Some("a"));
2933 assert_eq!(first_child.source_kind, SourceKind::Virtual);
2934
2935 assert_eq!(second_child.name, "_1.b");
2936 assert_eq!(second_child.source_name, "_1");
2937 assert_eq!(second_child.source_alias.as_deref(), Some("b"));
2938 assert_eq!(second_child.source_kind, SourceKind::Virtual);
2939 }
2940
2941 #[test]
2942 fn test_lineage_table_backed_unnest_points_to_real_source_column() {
2943 let expr = parse_one(
2944 r#"
2945SELECT item.item AS item
2946FROM t JOIN UNNEST(t.items) AS item ON TRUE
2947"#,
2948 DialectType::BigQuery,
2949 )
2950 .expect("parse");
2951
2952 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2953 let virtual_child = node.downstream.first().expect("virtual item source");
2954 assert_eq!(virtual_child.name, "_0.item");
2955 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2956
2957 let real_child = virtual_child
2958 .downstream
2959 .first()
2960 .expect("UNNEST(t.items) should depend on t.items");
2961 assert_eq!(real_child.name, "t.items");
2962 assert_eq!(real_child.source_name, "t");
2963 assert_eq!(real_child.source_kind, SourceKind::Table);
2964 }
2965
2966 #[test]
2967 fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
2968 let expr = parse_one(
2969 r#"
2970SELECT item AS item
2971FROM t JOIN UNNEST(t.items) AS item ON TRUE
2972"#,
2973 DialectType::BigQuery,
2974 )
2975 .expect("parse");
2976
2977 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2978 let virtual_child = node.downstream.first().expect("virtual item source");
2979 assert_eq!(virtual_child.name, "_0.item");
2980 assert_eq!(virtual_child.source_name, "_0");
2981 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2982 assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
2983
2984 let real_child = virtual_child
2985 .downstream
2986 .first()
2987 .expect("UNNEST(t.items) should depend on t.items");
2988 assert_eq!(real_child.name, "t.items");
2989 assert_eq!(real_child.source_name, "t");
2990 assert_eq!(real_child.source_kind, SourceKind::Table);
2991 }
2992
2993 #[test]
2994 fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
2995 let cases = [
2996 (
2997 DialectType::PostgreSQL,
2998 "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
2999 ),
3000 (
3001 DialectType::Presto,
3002 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3003 ),
3004 (
3005 DialectType::Trino,
3006 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3007 ),
3008 ];
3009
3010 for (dialect, sql) in cases {
3011 let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3012 let node = lineage("out", &expr, Some(dialect), false)
3013 .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3014 let virtual_child = node
3015 .downstream
3016 .first()
3017 .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3018
3019 assert_eq!(
3020 virtual_child.name, "_0.x",
3021 "unexpected virtual child for {dialect:?}"
3022 );
3023 assert_eq!(virtual_child.source_name, "_0");
3024 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3025 assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3026
3027 let real_child = virtual_child
3028 .downstream
3029 .first()
3030 .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3031 assert_eq!(real_child.name, "t.items");
3032 assert_eq!(real_child.source_kind, SourceKind::Table);
3033 }
3034 }
3035
3036 #[test]
3037 fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
3038 let cases = [
3039 (
3040 DialectType::Spark,
3041 "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3042 ),
3043 (
3044 DialectType::Hive,
3045 "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3046 ),
3047 ];
3048
3049 for (dialect, sql) in cases {
3050 let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3051 let node = lineage("out", &expr, Some(dialect), false)
3052 .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3053 let virtual_child = node
3054 .downstream
3055 .first()
3056 .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3057
3058 assert_eq!(virtual_child.name, "_0.x");
3059 assert_eq!(virtual_child.source_name, "_0");
3060 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3061 assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3062
3063 let real_child = virtual_child
3064 .downstream
3065 .first()
3066 .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3067 assert_eq!(real_child.name, "t.items");
3068 assert_eq!(real_child.source_kind, SourceKind::Table);
3069 }
3070 }
3071
3072 #[test]
3073 fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
3074 let expr = parse_one(
3075 "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
3076 DialectType::Snowflake,
3077 )
3078 .expect("parse");
3079
3080 let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
3081 let virtual_child = node.downstream.first().expect("virtual flatten source");
3082 assert_eq!(virtual_child.name, "_0.value");
3083 assert_eq!(virtual_child.source_name, "_0");
3084 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3085 assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
3086
3087 let real_child = virtual_child
3088 .downstream
3089 .first()
3090 .expect("FLATTEN input should depend on raw_events.payload");
3091 assert_eq!(real_child.name, "raw_events.payload");
3092 assert_eq!(real_child.source_kind, SourceKind::Table);
3093 }
3094
3095 #[test]
3096 fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
3097 let expr = parse_one(
3098 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3099 DialectType::Snowflake,
3100 )
3101 .expect("parse");
3102
3103 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3104 schema
3105 .add_table(
3106 "fact.some_daily_metrics",
3107 &[("date_utc".to_string(), DataType::Date)],
3108 None,
3109 )
3110 .expect("schema setup");
3111
3112 let node = lineage_with_schema(
3113 "recency",
3114 &expr,
3115 Some(&schema),
3116 Some(DialectType::Snowflake),
3117 false,
3118 )
3119 .expect("lineage_with_schema should not treat date part as a column");
3120
3121 let names = node.downstream_names();
3122 assert!(
3123 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3124 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3125 names
3126 );
3127 assert!(
3128 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3129 "Did not expect date part to appear as lineage column, got: {:?}",
3130 names
3131 );
3132 }
3133
3134 #[test]
3135 fn test_snowflake_datediff_parses_to_typed_ast() {
3136 let expr = parse_one(
3137 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3138 DialectType::Snowflake,
3139 )
3140 .expect("parse");
3141
3142 match expr {
3143 Expression::Select(select) => match &select.expressions[0] {
3144 Expression::Alias(alias) => match &alias.this {
3145 Expression::DateDiff(f) => {
3146 assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
3147 }
3148 other => panic!("expected DateDiff, got {other:?}"),
3149 },
3150 other => panic!("expected Alias, got {other:?}"),
3151 },
3152 other => panic!("expected Select, got {other:?}"),
3153 }
3154 }
3155
3156 #[test]
3157 fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
3158 let expr = parse_one(
3159 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3160 DialectType::Snowflake,
3161 )
3162 .expect("parse");
3163
3164 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3165 schema
3166 .add_table(
3167 "fact.some_daily_metrics",
3168 &[("date_utc".to_string(), DataType::Date)],
3169 None,
3170 )
3171 .expect("schema setup");
3172
3173 let node = lineage_with_schema(
3174 "next_day",
3175 &expr,
3176 Some(&schema),
3177 Some(DialectType::Snowflake),
3178 false,
3179 )
3180 .expect("lineage_with_schema should not treat DATEADD date part as a column");
3181
3182 let names = node.downstream_names();
3183 assert!(
3184 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3185 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3186 names
3187 );
3188 assert!(
3189 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3190 "Did not expect date part to appear as lineage column, got: {:?}",
3191 names
3192 );
3193 }
3194
3195 #[test]
3196 fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
3197 let expr = parse_one(
3198 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3199 DialectType::Snowflake,
3200 )
3201 .expect("parse");
3202
3203 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3204 schema
3205 .add_table(
3206 "fact.some_daily_metrics",
3207 &[("date_utc".to_string(), DataType::Date)],
3208 None,
3209 )
3210 .expect("schema setup");
3211
3212 let node = lineage_with_schema(
3213 "day_part",
3214 &expr,
3215 Some(&schema),
3216 Some(DialectType::Snowflake),
3217 false,
3218 )
3219 .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
3220
3221 let names = node.downstream_names();
3222 assert!(
3223 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3224 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3225 names
3226 );
3227 assert!(
3228 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3229 "Did not expect date part to appear as lineage column, got: {:?}",
3230 names
3231 );
3232 }
3233
3234 #[test]
3235 fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
3236 let expr = parse_one(
3237 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3238 DialectType::Snowflake,
3239 )
3240 .expect("parse");
3241
3242 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3243 schema
3244 .add_table(
3245 "fact.some_daily_metrics",
3246 &[("date_utc".to_string(), DataType::Date)],
3247 None,
3248 )
3249 .expect("schema setup");
3250
3251 let node = lineage_with_schema(
3252 "day_part",
3253 &expr,
3254 Some(&schema),
3255 Some(DialectType::Snowflake),
3256 false,
3257 )
3258 .expect("quoted DATE_PART should continue to work");
3259
3260 let names = node.downstream_names();
3261 assert!(
3262 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3263 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3264 names
3265 );
3266 }
3267
3268 #[test]
3269 fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
3270 let expr = parse_one(
3271 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3272 DialectType::Snowflake,
3273 )
3274 .expect("parse");
3275
3276 match expr {
3277 Expression::Select(select) => match &select.expressions[0] {
3278 Expression::Alias(alias) => match &alias.this {
3279 Expression::Function(f) => {
3280 assert_eq!(f.name.to_uppercase(), "DATEADD");
3281 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3282 }
3283 other => panic!("expected generic DATEADD function, got {other:?}"),
3284 },
3285 other => panic!("expected Alias, got {other:?}"),
3286 },
3287 other => panic!("expected Select, got {other:?}"),
3288 }
3289 }
3290
3291 #[test]
3292 fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
3293 let expr = parse_one(
3294 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3295 DialectType::Snowflake,
3296 )
3297 .expect("parse");
3298
3299 match expr {
3300 Expression::Select(select) => match &select.expressions[0] {
3301 Expression::Alias(alias) => match &alias.this {
3302 Expression::Function(f) => {
3303 assert_eq!(f.name.to_uppercase(), "DATE_PART");
3304 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3305 }
3306 other => panic!("expected generic DATE_PART function, got {other:?}"),
3307 },
3308 other => panic!("expected Alias, got {other:?}"),
3309 },
3310 other => panic!("expected Select, got {other:?}"),
3311 }
3312 }
3313
3314 #[test]
3315 fn test_snowflake_date_part_string_literal_stays_generic_function() {
3316 let expr = parse_one(
3317 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3318 DialectType::Snowflake,
3319 )
3320 .expect("parse");
3321
3322 match expr {
3323 Expression::Select(select) => match &select.expressions[0] {
3324 Expression::Alias(alias) => match &alias.this {
3325 Expression::Function(f) => {
3326 assert_eq!(f.name.to_uppercase(), "DATE_PART");
3327 }
3328 other => panic!("expected generic DATE_PART function, got {other:?}"),
3329 },
3330 other => panic!("expected Alias, got {other:?}"),
3331 },
3332 other => panic!("expected Select, got {other:?}"),
3333 }
3334 }
3335
3336 #[test]
3337 fn test_lineage_join() {
3338 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3339
3340 let node_a = lineage("a", &expr, None, false).unwrap();
3341 let names_a = node_a.downstream_names();
3342 assert!(
3343 names_a.iter().any(|n| n == "t.a"),
3344 "Expected t.a, got: {:?}",
3345 names_a
3346 );
3347
3348 let node_b = lineage("b", &expr, None, false).unwrap();
3349 let names_b = node_b.downstream_names();
3350 assert!(
3351 names_b.iter().any(|n| n == "s.b"),
3352 "Expected s.b, got: {:?}",
3353 names_b
3354 );
3355 }
3356
3357 #[test]
3358 fn test_lineage_alias_leaf_has_resolved_source_name() {
3359 let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
3360 let node = lineage("col1", &expr, None, false).unwrap();
3361
3362 let names = node.downstream_names();
3364 assert!(
3365 names.iter().any(|n| n == "t1.col1"),
3366 "Expected aliased column edge t1.col1, got: {:?}",
3367 names
3368 );
3369
3370 let leaf = node
3372 .downstream
3373 .iter()
3374 .find(|n| n.name == "t1.col1")
3375 .expect("Expected t1.col1 leaf");
3376 assert_eq!(leaf.source_name, "table1");
3377 match &leaf.source {
3378 Expression::Table(table) => assert_eq!(table.name.name, "table1"),
3379 _ => panic!("Expected leaf source to be a table expression"),
3380 }
3381 }
3382
3383 #[test]
3384 fn test_lineage_derived_table() {
3385 let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
3386 let node = lineage("a", &expr, None, false).unwrap();
3387
3388 assert_eq!(node.name, "a");
3389 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3391 assert!(
3392 all_names.iter().any(|n| n == "t.a"),
3393 "Expected to trace through derived table to t.a, got: {:?}",
3394 all_names
3395 );
3396 }
3397
3398 #[test]
3399 fn test_lineage_cte() {
3400 let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
3401 let node = lineage("a", &expr, None, false).unwrap();
3402
3403 assert_eq!(node.name, "a");
3404 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3405 assert!(
3406 all_names.iter().any(|n| n == "t.a"),
3407 "Expected to trace through CTE to t.a, got: {:?}",
3408 all_names
3409 );
3410 }
3411
3412 #[test]
3413 fn test_lineage_union() {
3414 let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
3415 let node = lineage("a", &expr, None, false).unwrap();
3416
3417 assert_eq!(node.name, "a");
3418 assert_eq!(
3420 node.downstream.len(),
3421 2,
3422 "Expected 2 branches for UNION, got {}",
3423 node.downstream.len()
3424 );
3425 }
3426
3427 #[test]
3428 fn test_lineage_cte_union() {
3429 let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
3430 let node = lineage("a", &expr, None, false).unwrap();
3431
3432 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3434 assert!(
3435 all_names.len() >= 3,
3436 "Expected at least 3 nodes for CTE with UNION, got: {:?}",
3437 all_names
3438 );
3439 }
3440
3441 #[test]
3442 fn test_lineage_star() {
3443 let expr = parse("SELECT * FROM t");
3444 let node = lineage("*", &expr, None, false).unwrap();
3445
3446 assert_eq!(node.name, "*");
3447 assert!(
3449 !node.downstream.is_empty(),
3450 "Star should produce downstream nodes"
3451 );
3452 }
3453
3454 #[test]
3455 fn test_lineage_subquery_in_select() {
3456 let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
3457 let node = lineage("x", &expr, None, false).unwrap();
3458
3459 assert_eq!(node.name, "x");
3460 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3462 assert!(
3463 all_names.len() >= 2,
3464 "Expected tracing into scalar subquery, got: {:?}",
3465 all_names
3466 );
3467 }
3468
3469 #[test]
3470 fn test_lineage_multiple_columns() {
3471 let expr = parse("SELECT a, b FROM t");
3472
3473 let node_a = lineage("a", &expr, None, false).unwrap();
3474 let node_b = lineage("b", &expr, None, false).unwrap();
3475
3476 assert_eq!(node_a.name, "a");
3477 assert_eq!(node_b.name, "b");
3478
3479 let names_a = node_a.downstream_names();
3481 let names_b = node_b.downstream_names();
3482 assert!(names_a.iter().any(|n| n == "t.a"));
3483 assert!(names_b.iter().any(|n| n == "t.b"));
3484 }
3485
3486 #[test]
3487 fn test_get_source_tables() {
3488 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3489 let node = lineage("a", &expr, None, false).unwrap();
3490
3491 let tables = get_source_tables(&node);
3492 assert!(
3493 tables.contains("t"),
3494 "Expected source table 't', got: {:?}",
3495 tables
3496 );
3497 }
3498
3499 #[test]
3500 fn test_lineage_column_not_found() {
3501 let expr = parse("SELECT a FROM t");
3502 let result = lineage("nonexistent", &expr, None, false);
3503 assert!(result.is_err());
3504 }
3505
3506 #[test]
3507 fn test_lineage_nested_cte() {
3508 let expr = parse(
3509 "WITH cte1 AS (SELECT a FROM t), \
3510 cte2 AS (SELECT a FROM cte1) \
3511 SELECT a FROM cte2",
3512 );
3513 let node = lineage("a", &expr, None, false).unwrap();
3514
3515 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3517 assert!(
3518 all_names.len() >= 3,
3519 "Expected to trace through nested CTEs, got: {:?}",
3520 all_names
3521 );
3522 }
3523
3524 #[test]
3525 fn test_trim_selects_true() {
3526 let expr = parse("SELECT a, b, c FROM t");
3527 let node = lineage("a", &expr, None, true).unwrap();
3528
3529 if let Expression::Select(select) = &node.source {
3531 assert_eq!(
3532 select.expressions.len(),
3533 1,
3534 "Trimmed source should have 1 expression, got {}",
3535 select.expressions.len()
3536 );
3537 } else {
3538 panic!("Expected Select source");
3539 }
3540 }
3541
3542 #[test]
3543 fn test_trim_selects_false() {
3544 let expr = parse("SELECT a, b, c FROM t");
3545 let node = lineage("a", &expr, None, false).unwrap();
3546
3547 if let Expression::Select(select) = &node.source {
3549 assert_eq!(
3550 select.expressions.len(),
3551 3,
3552 "Untrimmed source should have 3 expressions"
3553 );
3554 } else {
3555 panic!("Expected Select source");
3556 }
3557 }
3558
3559 #[test]
3560 fn test_lineage_expression_in_select() {
3561 let expr = parse("SELECT a + b AS c FROM t");
3562 let node = lineage("c", &expr, None, false).unwrap();
3563
3564 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3566 assert!(
3567 all_names.len() >= 3,
3568 "Expected to trace a + b to both columns, got: {:?}",
3569 all_names
3570 );
3571 }
3572
3573 #[test]
3574 fn test_set_operation_by_index() {
3575 let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3576
3577 let node = lineage("a", &expr, None, false).unwrap();
3579
3580 assert_eq!(node.downstream.len(), 2);
3582 }
3583
3584 fn print_node(node: &LineageNode, indent: usize) {
3587 let pad = " ".repeat(indent);
3588 println!(
3589 "{pad}name={:?} source_name={:?}",
3590 node.name, node.source_name
3591 );
3592 for child in &node.downstream {
3593 print_node(child, indent + 1);
3594 }
3595 }
3596
3597 #[test]
3598 fn test_issue18_repro() {
3599 let query = "SELECT UPPER(name) as upper_name FROM users";
3601 println!("Query: {query}\n");
3602
3603 let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3604 let exprs = dialect.parse(query).unwrap();
3605 let expr = &exprs[0];
3606
3607 let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3608 println!("lineage(\"upper_name\"):");
3609 print_node(&node, 1);
3610
3611 let names = node.downstream_names();
3612 assert!(
3613 names.iter().any(|n| n == "users.name"),
3614 "Expected users.name in downstream, got: {:?}",
3615 names
3616 );
3617 }
3618
3619 #[test]
3620 fn test_lineage_bigquery_safe_namespace_issue207() {
3621 let query = r#"
3622WITH import_cte AS (
3623 SELECT timestamp, data, operation
3624 FROM `project`.`dataset`.`source_table`
3625),
3626transform_cte AS (
3627 SELECT
3628 timestamp,
3629 SAFE.PARSE_JSON(data) AS json_data
3630 FROM import_cte
3631)
3632SELECT json_data FROM transform_cte
3633"#;
3634 let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3635 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3636 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3637 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3638
3639 assert!(
3640 names.iter().any(|name| name == "source_table.data"),
3641 "expected source_table.data in lineage, got {names:?}"
3642 );
3643 assert!(
3644 !names
3645 .iter()
3646 .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3647 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3648 );
3649 }
3650
3651 #[test]
3652 fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3653 let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3654 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3655 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3656 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3657
3658 assert!(
3659 names.iter().any(|name| name == "t.data"),
3660 "expected t.data in lineage, got {names:?}"
3661 );
3662 assert!(
3663 !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3664 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3665 );
3666 }
3667
3668 #[test]
3669 fn test_lineage_method_call_receiver_control() {
3670 let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3671 let node = lineage("out", &expr, None, false).expect("lineage");
3672 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3673
3674 assert!(
3675 names.iter().any(|name| name == "t.obj"),
3676 "expected ordinary method receiver to remain in lineage, got {names:?}"
3677 );
3678 assert!(
3679 names.iter().any(|name| name == "t.arg"),
3680 "expected method argument in lineage, got {names:?}"
3681 );
3682 }
3683
3684 #[test]
3685 fn test_lineage_upper_function() {
3686 let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3687 let node = lineage("upper_name", &expr, None, false).unwrap();
3688
3689 let names = node.downstream_names();
3690 assert!(
3691 names.iter().any(|n| n == "users.name"),
3692 "Expected users.name in downstream, got: {:?}",
3693 names
3694 );
3695 }
3696
3697 #[test]
3698 fn test_lineage_round_function() {
3699 let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3700 let node = lineage("rounded", &expr, None, false).unwrap();
3701
3702 let names = node.downstream_names();
3703 assert!(
3704 names.iter().any(|n| n == "products.price"),
3705 "Expected products.price in downstream, got: {:?}",
3706 names
3707 );
3708 }
3709
3710 #[test]
3711 fn test_lineage_coalesce_function() {
3712 let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3713 let node = lineage("val", &expr, None, false).unwrap();
3714
3715 let names = node.downstream_names();
3716 assert!(
3717 names.iter().any(|n| n == "t.a"),
3718 "Expected t.a in downstream, got: {:?}",
3719 names
3720 );
3721 assert!(
3722 names.iter().any(|n| n == "t.b"),
3723 "Expected t.b in downstream, got: {:?}",
3724 names
3725 );
3726 }
3727
3728 #[test]
3729 fn test_lineage_count_function() {
3730 let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3731 let node = lineage("cnt", &expr, None, false).unwrap();
3732
3733 let names = node.downstream_names();
3734 assert!(
3735 names.iter().any(|n| n == "t.id"),
3736 "Expected t.id in downstream, got: {:?}",
3737 names
3738 );
3739 }
3740
3741 #[test]
3742 fn test_lineage_sum_function() {
3743 let expr = parse("SELECT SUM(amount) AS total FROM t");
3744 let node = lineage("total", &expr, None, false).unwrap();
3745
3746 let names = node.downstream_names();
3747 assert!(
3748 names.iter().any(|n| n == "t.amount"),
3749 "Expected t.amount in downstream, got: {:?}",
3750 names
3751 );
3752 }
3753
3754 #[test]
3755 fn test_lineage_case_with_nested_functions() {
3756 let expr =
3757 parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3758 let node = lineage("result", &expr, None, false).unwrap();
3759
3760 let names = node.downstream_names();
3761 assert!(
3762 names.iter().any(|n| n == "t.x"),
3763 "Expected t.x in downstream, got: {:?}",
3764 names
3765 );
3766 assert!(
3767 names.iter().any(|n| n == "t.name"),
3768 "Expected t.name in downstream, got: {:?}",
3769 names
3770 );
3771 }
3772
3773 #[test]
3774 fn test_lineage_substring_function() {
3775 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3776 let node = lineage("short", &expr, None, false).unwrap();
3777
3778 let names = node.downstream_names();
3779 assert!(
3780 names.iter().any(|n| n == "t.name"),
3781 "Expected t.name in downstream, got: {:?}",
3782 names
3783 );
3784 }
3785
3786 #[test]
3789 fn test_lineage_cte_select_star() {
3790 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3794 let node = lineage("a", &expr, None, false).unwrap();
3795
3796 assert_eq!(node.name, "a");
3797 assert!(
3800 !node.downstream.is_empty(),
3801 "Expected downstream nodes tracing through CTE, got none"
3802 );
3803 }
3804
3805 #[test]
3806 fn test_lineage_cte_select_star_renamed_column() {
3807 let expr =
3810 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3811 let node = lineage("customer_id", &expr, None, false).unwrap();
3812
3813 assert_eq!(node.name, "customer_id");
3814 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3816 assert!(
3817 all_names.len() >= 2,
3818 "Expected at least 2 nodes (customer_id → source), got: {:?}",
3819 all_names
3820 );
3821 }
3822
3823 #[test]
3824 fn test_lineage_cte_select_star_multiple_columns() {
3825 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3827
3828 for col in &["a", "b", "c"] {
3829 let node = lineage(col, &expr, None, false).unwrap();
3830 assert_eq!(node.name, *col);
3831 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3833 assert!(
3834 all_names.len() >= 2,
3835 "Expected at least 2 nodes for column {}, got: {:?}",
3836 col,
3837 all_names
3838 );
3839 }
3840 }
3841
3842 #[test]
3843 fn test_lineage_nested_cte_select_star() {
3844 let expr = parse(
3846 "WITH cte1 AS (SELECT a FROM t), \
3847 cte2 AS (SELECT * FROM cte1) \
3848 SELECT * FROM cte2",
3849 );
3850 let node = lineage("a", &expr, None, false).unwrap();
3851
3852 assert_eq!(node.name, "a");
3853 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3854 assert!(
3855 all_names.len() >= 3,
3856 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3857 all_names
3858 );
3859 }
3860
3861 #[test]
3862 fn test_lineage_three_level_nested_cte_star() {
3863 let expr = parse(
3865 "WITH cte1 AS (SELECT x FROM t), \
3866 cte2 AS (SELECT * FROM cte1), \
3867 cte3 AS (SELECT * FROM cte2) \
3868 SELECT * FROM cte3",
3869 );
3870 let node = lineage("x", &expr, None, false).unwrap();
3871
3872 assert_eq!(node.name, "x");
3873 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3874 assert!(
3875 all_names.len() >= 4,
3876 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3877 all_names
3878 );
3879 }
3880
3881 #[test]
3882 fn test_lineage_cte_union_star() {
3883 let expr = parse(
3885 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3886 SELECT * FROM cte",
3887 );
3888 let node = lineage("a", &expr, None, false).unwrap();
3889
3890 assert_eq!(node.name, "a");
3891 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3892 assert!(
3893 all_names.len() >= 2,
3894 "Expected at least 2 nodes for CTE union star, got: {:?}",
3895 all_names
3896 );
3897 }
3898
3899 #[test]
3900 fn test_lineage_cte_star_unknown_table() {
3901 let expr = parse(
3904 "WITH cte AS (SELECT * FROM unknown_table) \
3905 SELECT * FROM cte",
3906 );
3907 let _result = lineage("x", &expr, None, false);
3910 }
3911
3912 #[test]
3913 fn test_lineage_cte_explicit_columns() {
3914 let expr = parse(
3916 "WITH cte(x, y) AS (SELECT a, b FROM t) \
3917 SELECT * FROM cte",
3918 );
3919 let node = lineage("x", &expr, None, false).unwrap();
3920 assert_eq!(node.name, "x");
3921 }
3922
3923 #[test]
3924 fn test_lineage_cte_qualified_star() {
3925 let expr = parse(
3927 "WITH cte AS (SELECT a, b FROM t) \
3928 SELECT cte.* FROM cte",
3929 );
3930 for col in &["a", "b"] {
3931 let node = lineage(col, &expr, None, false).unwrap();
3932 assert_eq!(node.name, *col);
3933 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3934 assert!(
3935 all_names.len() >= 2,
3936 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3937 col,
3938 all_names
3939 );
3940 }
3941 }
3942
3943 #[test]
3944 fn test_lineage_subquery_select_star() {
3945 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3948 let node = lineage("x", &expr, None, false).unwrap();
3949
3950 assert_eq!(node.name, "x");
3951 assert!(
3952 !node.downstream.is_empty(),
3953 "Expected downstream nodes for subquery with SELECT *, got none"
3954 );
3955 }
3956
3957 #[test]
3958 fn test_lineage_cte_star_with_schema_external_table() {
3959 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3961SELECT * FROM orders"#;
3962 let expr = parse(sql);
3963
3964 let mut schema = MappingSchema::new();
3965 let cols = vec![
3966 ("order_id".to_string(), DataType::Unknown),
3967 ("customer_id".to_string(), DataType::Unknown),
3968 ("amount".to_string(), DataType::Unknown),
3969 ];
3970 schema.add_table("stg_orders", &cols, None).unwrap();
3971
3972 let node =
3973 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3974 .unwrap();
3975 assert_eq!(node.name, "order_id");
3976 }
3977
3978 #[test]
3979 fn test_lineage_cte_star_with_schema_three_part_name() {
3980 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3982SELECT * FROM orders"#;
3983 let expr = parse(sql);
3984
3985 let mut schema = MappingSchema::new();
3986 let cols = vec![
3987 ("order_id".to_string(), DataType::Unknown),
3988 ("customer_id".to_string(), DataType::Unknown),
3989 ];
3990 schema
3991 .add_table("db.schema.stg_orders", &cols, None)
3992 .unwrap();
3993
3994 let node = lineage_with_schema(
3995 "customer_id",
3996 &expr,
3997 Some(&schema as &dyn Schema),
3998 None,
3999 false,
4000 )
4001 .unwrap();
4002 assert_eq!(node.name, "customer_id");
4003 }
4004
4005 #[test]
4006 fn test_lineage_cte_star_with_schema_nested() {
4007 let sql = r#"WITH
4010 raw AS (SELECT * FROM external_table),
4011 enriched AS (SELECT * FROM raw)
4012 SELECT * FROM enriched"#;
4013 let expr = parse(sql);
4014
4015 let mut schema = MappingSchema::new();
4016 let cols = vec![
4017 ("id".to_string(), DataType::Unknown),
4018 ("name".to_string(), DataType::Unknown),
4019 ];
4020 schema.add_table("external_table", &cols, None).unwrap();
4021
4022 let node =
4023 lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4024 assert_eq!(node.name, "name");
4025 }
4026
4027 #[test]
4028 fn test_lineage_cte_qualified_star_with_schema() {
4029 let sql = r#"WITH
4032 orders AS (SELECT * FROM stg_orders),
4033 enriched AS (
4034 SELECT orders.*, 'extra' AS extra
4035 FROM orders
4036 )
4037 SELECT * FROM enriched"#;
4038 let expr = parse(sql);
4039
4040 let mut schema = MappingSchema::new();
4041 let cols = vec![
4042 ("order_id".to_string(), DataType::Unknown),
4043 ("total".to_string(), DataType::Unknown),
4044 ];
4045 schema.add_table("stg_orders", &cols, None).unwrap();
4046
4047 let node =
4048 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4049 .unwrap();
4050 assert_eq!(node.name, "order_id");
4051
4052 let extra =
4054 lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4055 assert_eq!(extra.name, "extra");
4056 }
4057
4058 #[test]
4059 fn test_lineage_cte_star_without_schema_still_works() {
4060 let sql = r#"WITH
4062 cte1 AS (SELECT id, name FROM raw_table),
4063 cte2 AS (SELECT * FROM cte1)
4064 SELECT * FROM cte2"#;
4065 let expr = parse(sql);
4066
4067 let node = lineage("id", &expr, None, false).unwrap();
4069 assert_eq!(node.name, "id");
4070 }
4071
4072 #[test]
4073 fn test_lineage_nested_cte_star_with_join_and_schema() {
4074 let sql = r#"WITH
4077base_orders AS (
4078 SELECT * FROM stg_orders
4079),
4080with_payments AS (
4081 SELECT
4082 base_orders.*,
4083 p.amount
4084 FROM base_orders
4085 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4086),
4087final_cte AS (
4088 SELECT * FROM with_payments
4089)
4090SELECT * FROM final_cte"#;
4091 let expr = parse(sql);
4092
4093 let mut schema = MappingSchema::new();
4094 let order_cols = vec![
4095 (
4096 "order_id".to_string(),
4097 crate::expressions::DataType::Unknown,
4098 ),
4099 (
4100 "customer_id".to_string(),
4101 crate::expressions::DataType::Unknown,
4102 ),
4103 ("status".to_string(), crate::expressions::DataType::Unknown),
4104 ];
4105 let pay_cols = vec![
4106 (
4107 "payment_id".to_string(),
4108 crate::expressions::DataType::Unknown,
4109 ),
4110 (
4111 "order_id".to_string(),
4112 crate::expressions::DataType::Unknown,
4113 ),
4114 ("amount".to_string(), crate::expressions::DataType::Unknown),
4115 ];
4116 schema.add_table("stg_orders", &order_cols, None).unwrap();
4117 schema.add_table("stg_payments", &pay_cols, None).unwrap();
4118
4119 let node =
4121 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4122 .unwrap();
4123 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4124
4125 let has_table_qualified = all_names
4127 .iter()
4128 .any(|n| n.contains('.') && n.contains("order_id"));
4129 assert!(
4130 has_table_qualified,
4131 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4132 all_names
4133 );
4134
4135 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4137 .unwrap();
4138 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4139
4140 let has_table_qualified = all_names
4141 .iter()
4142 .any(|n| n.contains('.') && n.contains("amount"));
4143 assert!(
4144 has_table_qualified,
4145 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4146 all_names
4147 );
4148 }
4149
4150 #[test]
4151 fn test_lineage_cte_alias_resolution() {
4152 let sql = r#"WITH import_stg_items AS (
4154 SELECT item_id, name, status FROM stg_items
4155)
4156SELECT base.item_id, base.status
4157FROM import_stg_items AS base"#;
4158 let expr = parse(sql);
4159
4160 let node = lineage("item_id", &expr, None, false).unwrap();
4161 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4162 assert!(
4164 all_names.iter().any(|n| n == "stg_items.item_id"),
4165 "Expected leaf 'stg_items.item_id', got: {:?}",
4166 all_names
4167 );
4168 }
4169
4170 #[test]
4171 fn test_lineage_cte_alias_with_schema_and_star() {
4172 let sql = r#"WITH import_stg AS (
4174 SELECT * FROM stg_items
4175)
4176SELECT base.item_id, base.status
4177FROM import_stg AS base"#;
4178 let expr = parse(sql);
4179
4180 let mut schema = MappingSchema::new();
4181 schema
4182 .add_table(
4183 "stg_items",
4184 &[
4185 ("item_id".to_string(), DataType::Unknown),
4186 ("name".to_string(), DataType::Unknown),
4187 ("status".to_string(), DataType::Unknown),
4188 ],
4189 None,
4190 )
4191 .unwrap();
4192
4193 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
4194 .unwrap();
4195 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4196 assert!(
4197 all_names.iter().any(|n| n == "stg_items.item_id"),
4198 "Expected leaf 'stg_items.item_id', got: {:?}",
4199 all_names
4200 );
4201 }
4202
4203 #[test]
4204 fn test_lineage_cte_alias_with_join() {
4205 let sql = r#"WITH
4207 import_users AS (SELECT id, name FROM users),
4208 import_orders AS (SELECT id, user_id, amount FROM orders)
4209SELECT u.name, o.amount
4210FROM import_users AS u
4211LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
4212 let expr = parse(sql);
4213
4214 let node = lineage("name", &expr, None, false).unwrap();
4215 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4216 assert!(
4217 all_names.iter().any(|n| n == "users.name"),
4218 "Expected leaf 'users.name', got: {:?}",
4219 all_names
4220 );
4221
4222 let node = lineage("amount", &expr, None, false).unwrap();
4223 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4224 assert!(
4225 all_names.iter().any(|n| n == "orders.amount"),
4226 "Expected leaf 'orders.amount', got: {:?}",
4227 all_names
4228 );
4229 }
4230
4231 #[test]
4236 fn test_lineage_unquoted_cte_case_insensitive() {
4237 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
4240 let node = lineage("col", &expr, None, false).unwrap();
4241 assert_eq!(node.name, "col");
4242 assert!(
4243 !node.downstream.is_empty(),
4244 "Unquoted CTE should resolve case-insensitively"
4245 );
4246 }
4247
4248 #[test]
4249 fn test_lineage_quoted_cte_case_preserved() {
4250 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
4252 let node = lineage("col", &expr, None, false).unwrap();
4253 assert_eq!(node.name, "col");
4254 assert!(
4255 !node.downstream.is_empty(),
4256 "Quoted CTE with matching case should resolve"
4257 );
4258 }
4259
4260 #[test]
4261 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
4262 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
4266 let result = lineage("col", &expr, None, false);
4269 assert!(
4270 result.is_err(),
4271 "Quoted CTE with case mismatch should not expand star: {:?}",
4272 result
4273 );
4274 }
4275
4276 #[test]
4277 fn test_lineage_mixed_quoted_unquoted_cte() {
4278 let expr = parse(
4280 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
4281 );
4282 let node = lineage("a", &expr, None, false).unwrap();
4283 assert_eq!(node.name, "a");
4284 assert!(
4285 !node.downstream.is_empty(),
4286 "Mixed quoted/unquoted CTE chain should resolve"
4287 );
4288 }
4289
4290 #[test]
4306 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
4307 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
4318 let node = lineage("col", &expr, None, false).unwrap();
4319 assert!(!node.downstream.is_empty());
4320 let child = &node.downstream[0];
4321 assert_eq!(
4323 child.source_name, "MyCte",
4324 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4325 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4326 );
4327 }
4328
4329 #[test]
4330 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4331 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4338 let node = lineage("col", &expr, None, false).unwrap();
4339 assert!(!node.downstream.is_empty());
4340 let child = &node.downstream[0];
4341 assert_eq!(
4343 child.source_name, "MyCte",
4344 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4345 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4346 );
4347 }
4348
4349 #[test]
4356 #[ignore = "requires derived table star expansion (separate issue)"]
4357 fn test_node_name_doesnt_contain_comment() {
4358 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4359 let node = lineage("x", &expr, None, false).unwrap();
4360
4361 assert_eq!(node.name, "x");
4362 assert!(!node.downstream.is_empty());
4363 }
4364
4365 #[test]
4369 fn test_comment_before_first_column_in_cte() {
4370 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
4371 let sql_without_comment = "with t as (select 1 as a) select a from t";
4372
4373 let expr_ok = parse(sql_without_comment);
4375 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4376
4377 let expr_comment = parse(sql_with_comment);
4379 let node_comment = lineage("a", &expr_comment, None, false)
4380 .expect("with comment before first column should succeed");
4381
4382 assert_eq!(node_ok.name, node_comment.name, "node names should match");
4383 assert_eq!(
4384 node_ok.downstream_names(),
4385 node_comment.downstream_names(),
4386 "downstream lineage should be identical with or without comment"
4387 );
4388 }
4389
4390 #[test]
4392 fn test_block_comment_before_first_column() {
4393 let sql = "with t as (select 1 as a) select /* section */ a from t";
4394 let expr = parse(sql);
4395 let node = lineage("a", &expr, None, false)
4396 .expect("block comment before first column should succeed");
4397 assert_eq!(node.name, "a");
4398 assert!(
4399 !node.downstream.is_empty(),
4400 "should have downstream lineage"
4401 );
4402 }
4403
4404 #[test]
4406 fn test_comment_before_first_column_second_col_ok() {
4407 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
4408 let expr = parse(sql);
4409
4410 let node_a =
4411 lineage("a", &expr, None, false).expect("column a with comment should succeed");
4412 assert_eq!(node_a.name, "a");
4413
4414 let node_b =
4415 lineage("b", &expr, None, false).expect("column b with comment should succeed");
4416 assert_eq!(node_b.name, "b");
4417 }
4418
4419 #[test]
4421 fn test_comment_before_aliased_column() {
4422 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
4423 let expr = parse(sql);
4424 let node =
4425 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4426 assert_eq!(node.name, "y");
4427 assert!(
4428 !node.downstream.is_empty(),
4429 "aliased column should have downstream lineage"
4430 );
4431 }
4432}