1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// A node in a column-lineage tree.
///
/// The root describes the requested output column; each entry in `downstream` is a
/// column (or table column) that this node's expression was resolved from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Display name: the requested column name, `table.column` for leaf table
    /// columns, or `source.*` for children of a star projection.
    pub name: String,
    /// The projection (or column reference) this node represents.
    pub expression: Expression,
    /// The query or table the expression is taken from. For SELECT sources this is
    /// trimmed down to the single projection when `trim_selects` is enabled.
    pub source: Expression,
    /// Nodes this one depends on, in the order they were discovered.
    pub downstream: Vec<LineageNode>,
    /// Name of the CTE, derived table or table this node was resolved through;
    /// empty for the root node.
    pub source_name: String,
    /// Name of the parent column when this node was reached by descending into a
    /// CTE or derived table; empty otherwise.
    pub reference_node_name: String,
}
35
36impl LineageNode {
37 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
39 Self {
40 name: name.into(),
41 expression,
42 source,
43 downstream: Vec::new(),
44 source_name: String::new(),
45 reference_node_name: String::new(),
46 }
47 }
48
49 pub fn walk(&self) -> LineageWalker<'_> {
51 LineageWalker { stack: vec![self] }
52 }
53
54 pub fn downstream_names(&self) -> Vec<String> {
56 self.downstream.iter().map(|n| n.name.clone()).collect()
57 }
58}
59
/// Depth-first, pre-order iterator over a [`LineageNode`] tree, created by
/// [`LineageNode::walk`].
pub struct LineageWalker<'a> {
    /// Nodes still to be visited; the next node is popped from the end.
    stack: Vec<&'a LineageNode>,
}
64
65impl<'a> Iterator for LineageWalker<'a> {
66 type Item = &'a LineageNode;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 if let Some(node) = self.stack.pop() {
70 for child in node.downstream.iter().rev() {
72 self.stack.push(child);
73 }
74 Some(node)
75 } else {
76 None
77 }
78 }
79}
80
/// How a column is identified within a scope's projection list.
enum ColumnRef<'a> {
    /// By output name (alias or column name), compared after dialect normalization.
    Name(&'a str),
    /// By zero-based position in the SELECT list; used when descending into
    /// set-operation branches and scalar subqueries.
    Index(usize),
}
90
91pub fn lineage(
117 column: &str,
118 sql: &Expression,
119 dialect: Option<DialectType>,
120 trim_selects: bool,
121) -> Result<LineageNode> {
122 let has_with = matches!(sql, Expression::Select(s) if s.with.is_some());
124 if !has_with {
125 return lineage_from_expression(column, sql, dialect, trim_selects);
126 }
127 let mut owned = sql.clone();
128 expand_cte_stars(&mut owned, None);
129 lineage_from_expression(column, &owned, dialect, trim_selects)
130}
131
132pub fn lineage_with_schema(
148 column: &str,
149 sql: &Expression,
150 schema: Option<&dyn Schema>,
151 dialect: Option<DialectType>,
152 trim_selects: bool,
153) -> Result<LineageNode> {
154 let mut qualified_expression = if let Some(schema) = schema {
155 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
156 QualifyColumnsOptions::new().with_dialect(dialect_type)
157 } else {
158 QualifyColumnsOptions::new()
159 };
160
161 qualify_columns(sql.clone(), schema, &options).map_err(|e| {
162 Error::internal(format!("Lineage qualification failed with schema: {}", e))
163 })?
164 } else {
165 sql.clone()
166 };
167
168 annotate_types(&mut qualified_expression, schema, dialect);
170
171 expand_cte_stars(&mut qualified_expression, schema);
174
175 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
176}
177
178fn lineage_from_expression(
179 column: &str,
180 sql: &Expression,
181 dialect: Option<DialectType>,
182 trim_selects: bool,
183) -> Result<LineageNode> {
184 let scope = build_scope(sql);
185 to_node(
186 ColumnRef::Name(column),
187 &scope,
188 dialect,
189 "",
190 "",
191 "",
192 trim_selects,
193 )
194}
195
196fn normalize_cte_name(ident: &Identifier) -> String {
206 if ident.quoted {
207 ident.name.clone()
208 } else {
209 ident.name.to_lowercase()
210 }
211}
212
213pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
225 let select = match expr {
226 Expression::Select(s) => s,
227 _ => return,
228 };
229
230 let with = match &mut select.with {
231 Some(w) => w,
232 None => return,
233 };
234
235 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
236
237 for cte in &mut with.ctes {
238 let cte_name = normalize_cte_name(&cte.alias);
239
240 if !cte.columns.is_empty() {
242 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
243 resolved_cte_columns.insert(cte_name, cols);
244 continue;
245 }
246
247 if with.recursive {
253 let is_self_referencing = if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
254 let body_sources = get_select_sources(body_select);
255 body_sources.iter().any(|s| s.normalized == cte_name)
256 } else {
257 false
258 };
259 if is_self_referencing {
260 continue;
261 }
262 }
263
264 let body_select = match get_leftmost_select_mut(&mut cte.this) {
266 Some(s) => s,
267 None => continue,
268 };
269
270 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
271 resolved_cte_columns.insert(cte_name, columns);
272 }
273
274 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
276}
277
278fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
283 let mut current = expr;
284 for _ in 0..MAX_LINEAGE_DEPTH {
285 match current {
286 Expression::Select(s) => return Some(s),
287 Expression::Union(u) => current = &mut u.left,
288 Expression::Intersect(i) => current = &mut i.left,
289 Expression::Except(e) => current = &mut e.left,
290 Expression::Paren(p) => current = &mut p.this,
291 _ => return None,
292 }
293 }
294 None
295}
296
297fn rewrite_stars_in_select(
301 select: &mut Select,
302 resolved_ctes: &HashMap<String, Vec<String>>,
303 schema: Option<&dyn Schema>,
304) -> Vec<String> {
305 let has_star = select
310 .expressions
311 .iter()
312 .any(|e| matches!(e, Expression::Star(_)));
313 let has_qualified_star = select
314 .expressions
315 .iter()
316 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
317
318 if !has_star && !has_qualified_star {
319 return select
321 .expressions
322 .iter()
323 .filter_map(get_expression_output_name)
324 .collect();
325 }
326
327 let sources = get_select_sources(select);
328 let mut new_expressions = Vec::new();
329 let mut result_columns = Vec::new();
330
331 for expr in &select.expressions {
332 match expr {
333 Expression::Star(star) => {
334 let qual = star.table.as_ref();
335 if let Some(expanded) = expand_star_from_sources(
336 qual,
337 &sources,
338 resolved_ctes,
339 schema,
340 ) {
341 for (src_alias, col_name) in &expanded {
342 let table_id = Identifier::new(src_alias);
343 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
344 result_columns.push(col_name.clone());
345 }
346 } else {
347 new_expressions.push(expr.clone());
348 result_columns.push("*".to_string());
349 }
350 }
351 Expression::Column(c) if c.name.name == "*" => {
352 let qual = c.table.as_ref();
353 if let Some(expanded) = expand_star_from_sources(
354 qual,
355 &sources,
356 resolved_ctes,
357 schema,
358 ) {
359 for (_src_alias, col_name) in &expanded {
360 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
362 result_columns.push(col_name.clone());
363 }
364 } else {
365 new_expressions.push(expr.clone());
366 result_columns.push("*".to_string());
367 }
368 }
369 _ => {
370 new_expressions.push(expr.clone());
371 if let Some(name) = get_expression_output_name(expr) {
372 result_columns.push(name);
373 }
374 }
375 }
376 }
377
378 select.expressions = new_expressions;
379 result_columns
380}
381
382fn expand_star_from_sources(
387 qualifier: Option<&Identifier>,
388 sources: &[SourceInfo],
389 resolved_ctes: &HashMap<String, Vec<String>>,
390 schema: Option<&dyn Schema>,
391) -> Option<Vec<(String, String)>> {
392 let mut expanded = Vec::new();
393
394 if let Some(qual) = qualifier {
395 let qual_normalized = normalize_cte_name(qual);
397 for src in sources {
398 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
399 if let Some(cols) = resolved_ctes.get(&src.normalized) {
401 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
402 return Some(expanded);
403 }
404 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
406 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
407 return Some(expanded);
408 }
409 }
410 }
411 None
412 } else {
413 let mut any_expanded = false;
419 for src in sources {
420 if let Some(cols) = resolved_ctes.get(&src.normalized) {
421 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
422 any_expanded = true;
423 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
424 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
425 any_expanded = true;
426 } else {
427 return None;
428 }
429 }
430 if any_expanded {
431 Some(expanded)
432 } else {
433 None
434 }
435 }
436}
437
438fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
440 let schema = schema?;
441 if fq_name.is_empty() {
442 return None;
443 }
444 schema.column_names(fq_name).ok().filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
445}
446
447fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
449 Expression::Column(Box::new(crate::expressions::Column {
450 name: Identifier::new(name),
451 table: table.cloned(),
452 join_mark: false,
453 trailing_comments: Vec::new(),
454 span: None,
455 inferred_type: None,
456 }))
457}
458
459fn get_expression_output_name(expr: &Expression) -> Option<String> {
461 match expr {
462 Expression::Alias(a) => Some(a.alias.name.clone()),
463 Expression::Column(c) => Some(c.name.name.clone()),
464 Expression::Identifier(id) => Some(id.name.clone()),
465 Expression::Star(_) => Some("*".to_string()),
466 _ => None,
467 }
468}
469
/// A FROM/JOIN source of a SELECT, as needed for star expansion.
struct SourceInfo {
    /// Name used to qualify columns taken from this source: the alias if one is
    /// present, otherwise the table name.
    alias: String,
    /// Key used to look up resolved CTE columns: a table's name normalized like a
    /// CTE name, or a derived table's lowercased alias.
    normalized: String,
    /// Name passed to the schema for column lookups (for tables, the dot-joined
    /// `catalog.schema.table`). An empty value means "do not consult the schema".
    fq_name: String,
}
478
479fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
482 let mut sources = Vec::new();
483
484 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
485 match expr {
486 Expression::Table(t) => {
487 let normalized = normalize_cte_name(&t.name);
488 let alias = t
489 .alias
490 .as_ref()
491 .map(|a| a.name.clone())
492 .unwrap_or_else(|| t.name.name.clone());
493 let mut parts = Vec::new();
494 if let Some(catalog) = &t.catalog {
495 parts.push(catalog.name.clone());
496 }
497 if let Some(schema) = &t.schema {
498 parts.push(schema.name.clone());
499 }
500 parts.push(t.name.name.clone());
501 let fq_name = parts.join(".");
502 Some(SourceInfo { alias, normalized, fq_name })
503 }
504 Expression::Subquery(s) => {
505 let alias = s.alias.as_ref()?.name.clone();
506 let normalized = alias.to_lowercase();
507 let fq_name = alias.clone();
508 Some(SourceInfo { alias, normalized, fq_name })
509 }
510 Expression::Paren(p) => extract_source(&p.this),
511 _ => None,
512 }
513 }
514
515 if let Some(from) = &select.from {
516 for expr in &from.expressions {
517 if let Some(info) = extract_source(expr) {
518 sources.push(info);
519 }
520 }
521 }
522 for join in &select.joins {
523 if let Some(info) = extract_source(&join.this) {
524 sources.push(info);
525 }
526 }
527 sources
528}
529
530pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
532 let mut tables = HashSet::new();
533 collect_source_tables(node, &mut tables);
534 tables
535}
536
537pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
539 if let Expression::Table(table) = &node.source {
540 tables.insert(table.name.name.clone());
541 }
542 for child in &node.downstream {
543 collect_source_tables(child, tables);
544 }
545}
546
/// Recursion limit for lineage resolution. `to_node_inner` returns an error once
/// this depth is exceeded (reported as a possible circular CTE reference).
/// `get_leftmost_select_mut` also uses it to bound how many set-operation /
/// parenthesis levels it walks.
const MAX_LINEAGE_DEPTH: usize = 64;
554
555fn to_node(
557 column: ColumnRef<'_>,
558 scope: &Scope,
559 dialect: Option<DialectType>,
560 scope_name: &str,
561 source_name: &str,
562 reference_node_name: &str,
563 trim_selects: bool,
564) -> Result<LineageNode> {
565 to_node_inner(
566 column,
567 scope,
568 dialect,
569 scope_name,
570 source_name,
571 reference_node_name,
572 trim_selects,
573 &[],
574 0,
575 )
576}
577
578fn to_node_inner(
579 column: ColumnRef<'_>,
580 scope: &Scope,
581 dialect: Option<DialectType>,
582 scope_name: &str,
583 source_name: &str,
584 reference_node_name: &str,
585 trim_selects: bool,
586 ancestor_cte_scopes: &[Scope],
587 depth: usize,
588) -> Result<LineageNode> {
589 if depth > MAX_LINEAGE_DEPTH {
590 return Err(Error::internal(format!(
591 "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
592 )));
593 }
594 let scope_expr = &scope.expression;
595
596 let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
598 for s in ancestor_cte_scopes {
599 all_cte_scopes.push(s);
600 }
601
602 let effective_expr = match scope_expr {
605 Expression::Cte(cte) => &cte.this,
606 other => other,
607 };
608
609 if matches!(
611 effective_expr,
612 Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
613 ) {
614 if matches!(scope_expr, Expression::Cte(_)) {
616 let mut inner_scope = Scope::new(effective_expr.clone());
617 inner_scope.union_scopes = scope.union_scopes.clone();
618 inner_scope.sources = scope.sources.clone();
619 inner_scope.cte_sources = scope.cte_sources.clone();
620 inner_scope.cte_scopes = scope.cte_scopes.clone();
621 inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
622 inner_scope.subquery_scopes = scope.subquery_scopes.clone();
623 return handle_set_operation(
624 &column,
625 &inner_scope,
626 dialect,
627 scope_name,
628 source_name,
629 reference_node_name,
630 trim_selects,
631 ancestor_cte_scopes,
632 depth,
633 );
634 }
635 return handle_set_operation(
636 &column,
637 scope,
638 dialect,
639 scope_name,
640 source_name,
641 reference_node_name,
642 trim_selects,
643 ancestor_cte_scopes,
644 depth,
645 );
646 }
647
648 let select_expr = find_select_expr(effective_expr, &column, dialect)?;
650 let column_name = resolve_column_name(&column, &select_expr);
651
652 let node_source = if trim_selects {
654 trim_source(effective_expr, &select_expr)
655 } else {
656 effective_expr.clone()
657 };
658
659 let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
661 node.source_name = source_name.to_string();
662 node.reference_node_name = reference_node_name.to_string();
663
664 if matches!(&select_expr, Expression::Star(_)) {
666 for (name, source_info) in &scope.sources {
667 let child = LineageNode::new(
668 format!("{}.*", name),
669 Expression::Star(crate::expressions::Star {
670 table: None,
671 except: None,
672 replace: None,
673 rename: None,
674 trailing_comments: vec![],
675 span: None,
676 }),
677 source_info.expression.clone(),
678 );
679 node.downstream.push(child);
680 }
681 return Ok(node);
682 }
683
684 let subqueries: Vec<&Expression> =
686 select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
687 for sq_expr in subqueries {
688 if let Expression::Subquery(sq) = sq_expr {
689 for sq_scope in &scope.subquery_scopes {
690 if sq_scope.expression == sq.this {
691 if let Ok(child) = to_node_inner(
692 ColumnRef::Index(0),
693 sq_scope,
694 dialect,
695 &column_name,
696 "",
697 "",
698 trim_selects,
699 ancestor_cte_scopes,
700 depth + 1,
701 ) {
702 node.downstream.push(child);
703 }
704 break;
705 }
706 }
707 }
708 }
709
710 let col_refs = find_column_refs_in_expr(&select_expr);
712 for col_ref in col_refs {
713 let col_name = &col_ref.column;
714 if let Some(ref table_id) = col_ref.table {
715 let tbl = &table_id.name;
716 resolve_qualified_column(
717 &mut node,
718 scope,
719 dialect,
720 tbl,
721 col_name,
722 &column_name,
723 trim_selects,
724 &all_cte_scopes,
725 depth,
726 );
727 } else {
728 resolve_unqualified_column(
729 &mut node,
730 scope,
731 dialect,
732 col_name,
733 &column_name,
734 trim_selects,
735 &all_cte_scopes,
736 depth,
737 );
738 }
739 }
740
741 Ok(node)
742}
743
744fn handle_set_operation(
749 column: &ColumnRef<'_>,
750 scope: &Scope,
751 dialect: Option<DialectType>,
752 scope_name: &str,
753 source_name: &str,
754 reference_node_name: &str,
755 trim_selects: bool,
756 ancestor_cte_scopes: &[Scope],
757 depth: usize,
758) -> Result<LineageNode> {
759 let scope_expr = &scope.expression;
760
761 let col_index = match column {
763 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
764 ColumnRef::Index(i) => *i,
765 };
766
767 let col_name = match column {
768 ColumnRef::Name(name) => name.to_string(),
769 ColumnRef::Index(_) => format!("_{col_index}"),
770 };
771
772 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
773 node.source_name = source_name.to_string();
774 node.reference_node_name = reference_node_name.to_string();
775
776 for branch_scope in &scope.union_scopes {
778 if let Ok(child) = to_node_inner(
779 ColumnRef::Index(col_index),
780 branch_scope,
781 dialect,
782 scope_name,
783 "",
784 "",
785 trim_selects,
786 ancestor_cte_scopes,
787 depth + 1,
788 ) {
789 node.downstream.push(child);
790 }
791 }
792
793 Ok(node)
794}
795
796fn resolve_qualified_column(
801 node: &mut LineageNode,
802 scope: &Scope,
803 dialect: Option<DialectType>,
804 table: &str,
805 col_name: &str,
806 parent_name: &str,
807 trim_selects: bool,
808 all_cte_scopes: &[&Scope],
809 depth: usize,
810) {
811 let resolved_cte_name = resolve_cte_alias(scope, table);
814 let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
815
816 let is_cte = scope.cte_sources.contains_key(effective_table)
819 || all_cte_scopes.iter().any(|s| {
820 matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table)
821 });
822 if is_cte {
823 if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
824 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
826 if let Ok(child) = to_node_inner(
827 ColumnRef::Name(col_name),
828 child_scope,
829 dialect,
830 parent_name,
831 effective_table,
832 parent_name,
833 trim_selects,
834 &ancestors,
835 depth + 1,
836 ) {
837 node.downstream.push(child);
838 return;
839 }
840 }
841 }
842
843 if let Some(source_info) = scope.sources.get(table) {
845 if source_info.is_scope {
846 if let Some(child_scope) = find_child_scope(scope, table) {
847 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
848 if let Ok(child) = to_node_inner(
849 ColumnRef::Name(col_name),
850 child_scope,
851 dialect,
852 parent_name,
853 table,
854 parent_name,
855 trim_selects,
856 &ancestors,
857 depth + 1,
858 ) {
859 node.downstream.push(child);
860 return;
861 }
862 }
863 }
864 }
865
866 if let Some(source_info) = scope.sources.get(table) {
869 if !source_info.is_scope {
870 node.downstream.push(make_table_column_node_from_source(
871 table,
872 col_name,
873 &source_info.expression,
874 ));
875 return;
876 }
877 }
878
879 node.downstream
881 .push(make_table_column_node(table, col_name));
882}
883
884fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
890 if scope.cte_sources.contains_key(name) {
892 return None;
893 }
894 if let Some(source_info) = scope.sources.get(name) {
896 if source_info.is_scope {
897 if let Expression::Cte(cte) = &source_info.expression {
898 let cte_name = &cte.alias.name;
899 if scope.cte_sources.contains_key(cte_name) {
900 return Some(cte_name.clone());
901 }
902 }
903 }
904 }
905 None
906}
907
908fn resolve_unqualified_column(
909 node: &mut LineageNode,
910 scope: &Scope,
911 dialect: Option<DialectType>,
912 col_name: &str,
913 parent_name: &str,
914 trim_selects: bool,
915 all_cte_scopes: &[&Scope],
916 depth: usize,
917) {
918 let from_source_names = source_names_from_from_join(scope);
922
923 if from_source_names.len() == 1 {
924 let tbl = &from_source_names[0];
925 resolve_qualified_column(
926 node,
927 scope,
928 dialect,
929 tbl,
930 col_name,
931 parent_name,
932 trim_selects,
933 all_cte_scopes,
934 depth,
935 );
936 return;
937 }
938
939 let child = LineageNode::new(
941 col_name.to_string(),
942 Expression::Column(Box::new(crate::expressions::Column {
943 name: crate::expressions::Identifier::new(col_name.to_string()),
944 table: None,
945 join_mark: false,
946 trailing_comments: vec![],
947 span: None,
948 inferred_type: None,
949 })),
950 node.source.clone(),
951 );
952 node.downstream.push(child);
953}
954
955fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
956 fn source_name(expr: &Expression) -> Option<String> {
957 match expr {
958 Expression::Table(table) => Some(
959 table
960 .alias
961 .as_ref()
962 .map(|a| a.name.clone())
963 .unwrap_or_else(|| table.name.name.clone()),
964 ),
965 Expression::Subquery(subquery) => {
966 subquery.alias.as_ref().map(|alias| alias.name.clone())
967 }
968 Expression::Paren(paren) => source_name(&paren.this),
969 _ => None,
970 }
971 }
972
973 let effective_expr = match &scope.expression {
974 Expression::Cte(cte) => &cte.this,
975 expr => expr,
976 };
977
978 let mut names = Vec::new();
979 let mut seen = std::collections::HashSet::new();
980
981 if let Expression::Select(select) = effective_expr {
982 if let Some(from) = &select.from {
983 for expr in &from.expressions {
984 if let Some(name) = source_name(expr) {
985 if !name.is_empty() && seen.insert(name.clone()) {
986 names.push(name);
987 }
988 }
989 }
990 }
991 for join in &select.joins {
992 if let Some(name) = source_name(&join.this) {
993 if !name.is_empty() && seen.insert(name.clone()) {
994 names.push(name);
995 }
996 }
997 }
998 }
999
1000 names
1001}
1002
1003fn get_alias_or_name(expr: &Expression) -> Option<String> {
1009 match expr {
1010 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1011 Expression::Column(col) => Some(col.name.name.clone()),
1012 Expression::Identifier(id) => Some(id.name.clone()),
1013 Expression::Star(_) => Some("*".to_string()),
1014 Expression::Annotated(a) => get_alias_or_name(&a.this),
1017 _ => None,
1018 }
1019}
1020
1021fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1023 match column {
1024 ColumnRef::Name(n) => n.to_string(),
1025 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1026 }
1027}
1028
1029fn find_select_expr(
1031 scope_expr: &Expression,
1032 column: &ColumnRef<'_>,
1033 dialect: Option<DialectType>,
1034) -> Result<Expression> {
1035 if let Expression::Select(ref select) = scope_expr {
1036 match column {
1037 ColumnRef::Name(name) => {
1038 let normalized_name = normalize_column_name(name, dialect);
1039 for expr in &select.expressions {
1040 if let Some(alias_or_name) = get_alias_or_name(expr) {
1041 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1042 return Ok(expr.clone());
1043 }
1044 }
1045 }
1046 Err(crate::error::Error::parse(
1047 format!("Cannot find column '{}' in query", name),
1048 0,
1049 0,
1050 0,
1051 0,
1052 ))
1053 }
1054 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1055 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1056 }),
1057 }
1058 } else {
1059 Err(crate::error::Error::parse(
1060 "Expected SELECT expression for column lookup",
1061 0,
1062 0,
1063 0,
1064 0,
1065 ))
1066 }
1067}
1068
1069fn column_to_index(
1071 set_op_expr: &Expression,
1072 name: &str,
1073 dialect: Option<DialectType>,
1074) -> Result<usize> {
1075 let normalized_name = normalize_column_name(name, dialect);
1076 let mut expr = set_op_expr;
1077 loop {
1078 match expr {
1079 Expression::Union(u) => expr = &u.left,
1080 Expression::Intersect(i) => expr = &i.left,
1081 Expression::Except(e) => expr = &e.left,
1082 Expression::Select(select) => {
1083 for (i, e) in select.expressions.iter().enumerate() {
1084 if let Some(alias_or_name) = get_alias_or_name(e) {
1085 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1086 return Ok(i);
1087 }
1088 }
1089 }
1090 return Err(crate::error::Error::parse(
1091 format!("Cannot find column '{}' in set operation", name),
1092 0,
1093 0,
1094 0,
1095 0,
1096 ));
1097 }
1098 _ => {
1099 return Err(crate::error::Error::parse(
1100 "Expected SELECT or set operation",
1101 0,
1102 0,
1103 0,
1104 0,
1105 ))
1106 }
1107 }
1108 }
1109}
1110
/// Normalizes a column name for comparison under `dialect`, via `normalize_name`.
// NOTE(review): the meaning of the `false, true` flags is defined by `normalize_name`
// in `crate::schema`; confirm there before changing them.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    normalize_name(name, dialect, false, true)
}
1114
1115fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1117 if let Expression::Select(select) = select_expr {
1118 let mut trimmed = select.as_ref().clone();
1119 trimmed.expressions = vec![target_expr.clone()];
1120 Expression::Select(Box::new(trimmed))
1121 } else {
1122 select_expr.clone()
1123 }
1124}
1125
1126fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1128 if scope.cte_sources.contains_key(source_name) {
1130 for cte_scope in &scope.cte_scopes {
1131 if let Expression::Cte(cte) = &cte_scope.expression {
1132 if cte.alias.name == source_name {
1133 return Some(cte_scope);
1134 }
1135 }
1136 }
1137 }
1138
1139 if let Some(source_info) = scope.sources.get(source_name) {
1141 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1142 if let Expression::Subquery(sq) = &source_info.expression {
1143 for dt_scope in &scope.derived_table_scopes {
1144 if dt_scope.expression == sq.this {
1145 return Some(dt_scope);
1146 }
1147 }
1148 }
1149 }
1150 }
1151
1152 None
1153}
1154
1155fn find_child_scope_in<'a>(
1159 all_cte_scopes: &[&'a Scope],
1160 scope: &'a Scope,
1161 source_name: &str,
1162) -> Option<&'a Scope> {
1163 for cte_scope in &scope.cte_scopes {
1165 if let Expression::Cte(cte) = &cte_scope.expression {
1166 if cte.alias.name == source_name {
1167 return Some(cte_scope);
1168 }
1169 }
1170 }
1171
1172 for cte_scope in all_cte_scopes {
1174 if let Expression::Cte(cte) = &cte_scope.expression {
1175 if cte.alias.name == source_name {
1176 return Some(cte_scope);
1177 }
1178 }
1179 }
1180
1181 if let Some(source_info) = scope.sources.get(source_name) {
1183 if source_info.is_scope {
1184 if let Expression::Subquery(sq) = &source_info.expression {
1185 for dt_scope in &scope.derived_table_scopes {
1186 if dt_scope.expression == sq.this {
1187 return Some(dt_scope);
1188 }
1189 }
1190 }
1191 }
1192 }
1193
1194 None
1195}
1196
1197fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1199 let mut node = LineageNode::new(
1200 format!("{}.{}", table, column),
1201 Expression::Column(Box::new(crate::expressions::Column {
1202 name: crate::expressions::Identifier::new(column.to_string()),
1203 table: Some(crate::expressions::Identifier::new(table.to_string())),
1204 join_mark: false,
1205 trailing_comments: vec![],
1206 span: None,
1207 inferred_type: None,
1208 })),
1209 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1210 );
1211 node.source_name = table.to_string();
1212 node
1213}
1214
1215fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1216 let mut parts: Vec<String> = Vec::new();
1217 if let Some(catalog) = &table_ref.catalog {
1218 parts.push(catalog.name.clone());
1219 }
1220 if let Some(schema) = &table_ref.schema {
1221 parts.push(schema.name.clone());
1222 }
1223 parts.push(table_ref.name.name.clone());
1224 parts.join(".")
1225}
1226
1227fn make_table_column_node_from_source(
1228 table_alias: &str,
1229 column: &str,
1230 source: &Expression,
1231) -> LineageNode {
1232 let mut node = LineageNode::new(
1233 format!("{}.{}", table_alias, column),
1234 Expression::Column(Box::new(crate::expressions::Column {
1235 name: crate::expressions::Identifier::new(column.to_string()),
1236 table: Some(crate::expressions::Identifier::new(table_alias.to_string())),
1237 join_mark: false,
1238 trailing_comments: vec![],
1239 span: None,
1240 inferred_type: None,
1241 })),
1242 source.clone(),
1243 );
1244
1245 if let Expression::Table(table_ref) = source {
1246 node.source_name = table_name_from_table_ref(table_ref);
1247 } else {
1248 node.source_name = table_alias.to_string();
1249 }
1250
1251 node
1252}
1253
/// A column reference found in a projection: an optional table qualifier plus the
/// column name.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table qualifier as written (`t` in `t.col`), if any.
    table: Option<crate::expressions::Identifier>,
    /// Column name as written.
    column: String,
}
1260
1261fn find_column_refs_in_expr(expr: &Expression) -> Vec<SimpleColumnRef> {
1263 let mut refs = Vec::new();
1264 collect_column_refs(expr, &mut refs);
1265 refs
1266}
1267
1268fn collect_column_refs(expr: &Expression, refs: &mut Vec<SimpleColumnRef>) {
1269 let mut stack: Vec<&Expression> = vec![expr];
1270
1271 while let Some(current) = stack.pop() {
1272 match current {
1273 Expression::Column(col) => {
1275 refs.push(SimpleColumnRef {
1276 table: col.table.clone(),
1277 column: col.name.name.clone(),
1278 });
1279 }
1280
1281 Expression::Subquery(_) | Expression::Exists(_) => {}
1283
1284 Expression::And(op)
1286 | Expression::Or(op)
1287 | Expression::Eq(op)
1288 | Expression::Neq(op)
1289 | Expression::Lt(op)
1290 | Expression::Lte(op)
1291 | Expression::Gt(op)
1292 | Expression::Gte(op)
1293 | Expression::Add(op)
1294 | Expression::Sub(op)
1295 | Expression::Mul(op)
1296 | Expression::Div(op)
1297 | Expression::Mod(op)
1298 | Expression::BitwiseAnd(op)
1299 | Expression::BitwiseOr(op)
1300 | Expression::BitwiseXor(op)
1301 | Expression::BitwiseLeftShift(op)
1302 | Expression::BitwiseRightShift(op)
1303 | Expression::Concat(op)
1304 | Expression::Adjacent(op)
1305 | Expression::TsMatch(op)
1306 | Expression::PropertyEQ(op)
1307 | Expression::ArrayContainsAll(op)
1308 | Expression::ArrayContainedBy(op)
1309 | Expression::ArrayOverlaps(op)
1310 | Expression::JSONBContainsAllTopKeys(op)
1311 | Expression::JSONBContainsAnyTopKeys(op)
1312 | Expression::JSONBDeleteAtPath(op)
1313 | Expression::ExtendsLeft(op)
1314 | Expression::ExtendsRight(op)
1315 | Expression::Is(op)
1316 | Expression::MemberOf(op)
1317 | Expression::NullSafeEq(op)
1318 | Expression::NullSafeNeq(op)
1319 | Expression::Glob(op)
1320 | Expression::Match(op) => {
1321 stack.push(&op.left);
1322 stack.push(&op.right);
1323 }
1324
1325 Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1327 stack.push(&u.this);
1328 }
1329
1330 Expression::Upper(f)
1332 | Expression::Lower(f)
1333 | Expression::Length(f)
1334 | Expression::LTrim(f)
1335 | Expression::RTrim(f)
1336 | Expression::Reverse(f)
1337 | Expression::Abs(f)
1338 | Expression::Sqrt(f)
1339 | Expression::Cbrt(f)
1340 | Expression::Ln(f)
1341 | Expression::Exp(f)
1342 | Expression::Sign(f)
1343 | Expression::Date(f)
1344 | Expression::Time(f)
1345 | Expression::DateFromUnixDate(f)
1346 | Expression::UnixDate(f)
1347 | Expression::UnixSeconds(f)
1348 | Expression::UnixMillis(f)
1349 | Expression::UnixMicros(f)
1350 | Expression::TimeStrToDate(f)
1351 | Expression::DateToDi(f)
1352 | Expression::DiToDate(f)
1353 | Expression::TsOrDiToDi(f)
1354 | Expression::TsOrDsToDatetime(f)
1355 | Expression::TsOrDsToTimestamp(f)
1356 | Expression::YearOfWeek(f)
1357 | Expression::YearOfWeekIso(f)
1358 | Expression::Initcap(f)
1359 | Expression::Ascii(f)
1360 | Expression::Chr(f)
1361 | Expression::Soundex(f)
1362 | Expression::ByteLength(f)
1363 | Expression::Hex(f)
1364 | Expression::LowerHex(f)
1365 | Expression::Unicode(f)
1366 | Expression::Radians(f)
1367 | Expression::Degrees(f)
1368 | Expression::Sin(f)
1369 | Expression::Cos(f)
1370 | Expression::Tan(f)
1371 | Expression::Asin(f)
1372 | Expression::Acos(f)
1373 | Expression::Atan(f)
1374 | Expression::IsNan(f)
1375 | Expression::IsInf(f)
1376 | Expression::ArrayLength(f)
1377 | Expression::ArraySize(f)
1378 | Expression::Cardinality(f)
1379 | Expression::ArrayReverse(f)
1380 | Expression::ArrayDistinct(f)
1381 | Expression::ArrayFlatten(f)
1382 | Expression::ArrayCompact(f)
1383 | Expression::Explode(f)
1384 | Expression::ExplodeOuter(f)
1385 | Expression::ToArray(f)
1386 | Expression::MapFromEntries(f)
1387 | Expression::MapKeys(f)
1388 | Expression::MapValues(f)
1389 | Expression::JsonArrayLength(f)
1390 | Expression::JsonKeys(f)
1391 | Expression::JsonType(f)
1392 | Expression::ParseJson(f)
1393 | Expression::ToJson(f)
1394 | Expression::Typeof(f)
1395 | Expression::BitwiseCount(f)
1396 | Expression::Year(f)
1397 | Expression::Month(f)
1398 | Expression::Day(f)
1399 | Expression::Hour(f)
1400 | Expression::Minute(f)
1401 | Expression::Second(f)
1402 | Expression::DayOfWeek(f)
1403 | Expression::DayOfWeekIso(f)
1404 | Expression::DayOfMonth(f)
1405 | Expression::DayOfYear(f)
1406 | Expression::WeekOfYear(f)
1407 | Expression::Quarter(f)
1408 | Expression::Epoch(f)
1409 | Expression::EpochMs(f)
1410 | Expression::TimeStrToUnix(f)
1411 | Expression::SHA(f)
1412 | Expression::SHA1Digest(f)
1413 | Expression::TimeToUnix(f)
1414 | Expression::JSONBool(f)
1415 | Expression::Int64(f)
1416 | Expression::MD5NumberLower64(f)
1417 | Expression::MD5NumberUpper64(f)
1418 | Expression::DateStrToDate(f)
1419 | Expression::DateToDateStr(f) => {
1420 stack.push(&f.this);
1421 }
1422
1423 Expression::Power(f)
1425 | Expression::NullIf(f)
1426 | Expression::IfNull(f)
1427 | Expression::Nvl(f)
1428 | Expression::UnixToTimeStr(f)
1429 | Expression::Contains(f)
1430 | Expression::StartsWith(f)
1431 | Expression::EndsWith(f)
1432 | Expression::Levenshtein(f)
1433 | Expression::ModFunc(f)
1434 | Expression::Atan2(f)
1435 | Expression::IntDiv(f)
1436 | Expression::AddMonths(f)
1437 | Expression::MonthsBetween(f)
1438 | Expression::NextDay(f)
1439 | Expression::ArrayContains(f)
1440 | Expression::ArrayPosition(f)
1441 | Expression::ArrayAppend(f)
1442 | Expression::ArrayPrepend(f)
1443 | Expression::ArrayUnion(f)
1444 | Expression::ArrayExcept(f)
1445 | Expression::ArrayRemove(f)
1446 | Expression::StarMap(f)
1447 | Expression::MapFromArrays(f)
1448 | Expression::MapContainsKey(f)
1449 | Expression::ElementAt(f)
1450 | Expression::JsonMergePatch(f)
1451 | Expression::JSONBContains(f)
1452 | Expression::JSONBExtract(f) => {
1453 stack.push(&f.this);
1454 stack.push(&f.expression);
1455 }
1456
1457 Expression::Greatest(f)
1459 | Expression::Least(f)
1460 | Expression::Coalesce(f)
1461 | Expression::ArrayConcat(f)
1462 | Expression::ArrayIntersect(f)
1463 | Expression::ArrayZip(f)
1464 | Expression::MapConcat(f)
1465 | Expression::JsonArray(f) => {
1466 for e in &f.expressions {
1467 stack.push(e);
1468 }
1469 }
1470
1471 Expression::Sum(f)
1473 | Expression::Avg(f)
1474 | Expression::Min(f)
1475 | Expression::Max(f)
1476 | Expression::ArrayAgg(f)
1477 | Expression::CountIf(f)
1478 | Expression::Stddev(f)
1479 | Expression::StddevPop(f)
1480 | Expression::StddevSamp(f)
1481 | Expression::Variance(f)
1482 | Expression::VarPop(f)
1483 | Expression::VarSamp(f)
1484 | Expression::Median(f)
1485 | Expression::Mode(f)
1486 | Expression::First(f)
1487 | Expression::Last(f)
1488 | Expression::AnyValue(f)
1489 | Expression::ApproxDistinct(f)
1490 | Expression::ApproxCountDistinct(f)
1491 | Expression::LogicalAnd(f)
1492 | Expression::LogicalOr(f)
1493 | Expression::Skewness(f)
1494 | Expression::ArrayConcatAgg(f)
1495 | Expression::ArrayUniqueAgg(f)
1496 | Expression::BoolXorAgg(f)
1497 | Expression::BitwiseAndAgg(f)
1498 | Expression::BitwiseOrAgg(f)
1499 | Expression::BitwiseXorAgg(f) => {
1500 stack.push(&f.this);
1501 if let Some(ref filter) = f.filter {
1502 stack.push(filter);
1503 }
1504 if let Some((ref expr, _)) = f.having_max {
1505 stack.push(expr);
1506 }
1507 if let Some(ref limit) = f.limit {
1508 stack.push(limit);
1509 }
1510 }
1511
1512 Expression::Function(func) => {
1514 for arg in &func.args {
1515 stack.push(arg);
1516 }
1517 }
1518 Expression::AggregateFunction(func) => {
1519 for arg in &func.args {
1520 stack.push(arg);
1521 }
1522 if let Some(ref filter) = func.filter {
1523 stack.push(filter);
1524 }
1525 if let Some(ref limit) = func.limit {
1526 stack.push(limit);
1527 }
1528 }
1529
1530 Expression::WindowFunction(wf) => {
1532 stack.push(&wf.this);
1533 }
1534
1535 Expression::Alias(a) => {
1537 stack.push(&a.this);
1538 }
1539 Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1540 stack.push(&c.this);
1541 if let Some(ref fmt) = c.format {
1542 stack.push(fmt);
1543 }
1544 if let Some(ref def) = c.default {
1545 stack.push(def);
1546 }
1547 }
1548 Expression::Paren(p) => {
1549 stack.push(&p.this);
1550 }
1551 Expression::Annotated(a) => {
1552 stack.push(&a.this);
1553 }
1554 Expression::Case(case) => {
1555 if let Some(ref operand) = case.operand {
1556 stack.push(operand);
1557 }
1558 for (cond, result) in &case.whens {
1559 stack.push(cond);
1560 stack.push(result);
1561 }
1562 if let Some(ref else_expr) = case.else_ {
1563 stack.push(else_expr);
1564 }
1565 }
1566 Expression::Collation(c) => {
1567 stack.push(&c.this);
1568 }
1569 Expression::In(i) => {
1570 stack.push(&i.this);
1571 for e in &i.expressions {
1572 stack.push(e);
1573 }
1574 if let Some(ref q) = i.query {
1575 stack.push(q);
1576 }
1577 if let Some(ref u) = i.unnest {
1578 stack.push(u);
1579 }
1580 }
1581 Expression::Between(b) => {
1582 stack.push(&b.this);
1583 stack.push(&b.low);
1584 stack.push(&b.high);
1585 }
1586 Expression::IsNull(n) => {
1587 stack.push(&n.this);
1588 }
1589 Expression::IsTrue(t) | Expression::IsFalse(t) => {
1590 stack.push(&t.this);
1591 }
1592 Expression::IsJson(j) => {
1593 stack.push(&j.this);
1594 }
1595 Expression::Like(l) | Expression::ILike(l) => {
1596 stack.push(&l.left);
1597 stack.push(&l.right);
1598 if let Some(ref esc) = l.escape {
1599 stack.push(esc);
1600 }
1601 }
1602 Expression::SimilarTo(s) => {
1603 stack.push(&s.this);
1604 stack.push(&s.pattern);
1605 if let Some(ref esc) = s.escape {
1606 stack.push(esc);
1607 }
1608 }
1609 Expression::Ordered(o) => {
1610 stack.push(&o.this);
1611 }
1612 Expression::Array(a) => {
1613 for e in &a.expressions {
1614 stack.push(e);
1615 }
1616 }
1617 Expression::Tuple(t) => {
1618 for e in &t.expressions {
1619 stack.push(e);
1620 }
1621 }
1622 Expression::Struct(s) => {
1623 for (_, e) in &s.fields {
1624 stack.push(e);
1625 }
1626 }
1627 Expression::Subscript(s) => {
1628 stack.push(&s.this);
1629 stack.push(&s.index);
1630 }
1631 Expression::Dot(d) => {
1632 stack.push(&d.this);
1633 }
1634 Expression::MethodCall(m) => {
1635 stack.push(&m.this);
1636 for arg in &m.args {
1637 stack.push(arg);
1638 }
1639 }
1640 Expression::ArraySlice(s) => {
1641 stack.push(&s.this);
1642 if let Some(ref start) = s.start {
1643 stack.push(start);
1644 }
1645 if let Some(ref end) = s.end {
1646 stack.push(end);
1647 }
1648 }
1649 Expression::Lambda(l) => {
1650 stack.push(&l.body);
1651 }
1652 Expression::NamedArgument(n) => {
1653 stack.push(&n.value);
1654 }
1655 Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
1656 stack.push(e);
1657 }
1658
1659 Expression::Substring(f) => {
1661 stack.push(&f.this);
1662 stack.push(&f.start);
1663 if let Some(ref len) = f.length {
1664 stack.push(len);
1665 }
1666 }
1667 Expression::Trim(f) => {
1668 stack.push(&f.this);
1669 if let Some(ref chars) = f.characters {
1670 stack.push(chars);
1671 }
1672 }
1673 Expression::Replace(f) => {
1674 stack.push(&f.this);
1675 stack.push(&f.old);
1676 stack.push(&f.new);
1677 }
1678 Expression::IfFunc(f) => {
1679 stack.push(&f.condition);
1680 stack.push(&f.true_value);
1681 if let Some(ref fv) = f.false_value {
1682 stack.push(fv);
1683 }
1684 }
1685 Expression::Nvl2(f) => {
1686 stack.push(&f.this);
1687 stack.push(&f.true_value);
1688 stack.push(&f.false_value);
1689 }
1690 Expression::ConcatWs(f) => {
1691 stack.push(&f.separator);
1692 for e in &f.expressions {
1693 stack.push(e);
1694 }
1695 }
1696 Expression::Count(f) => {
1697 if let Some(ref this) = f.this {
1698 stack.push(this);
1699 }
1700 if let Some(ref filter) = f.filter {
1701 stack.push(filter);
1702 }
1703 }
1704 Expression::GroupConcat(f) => {
1705 stack.push(&f.this);
1706 if let Some(ref sep) = f.separator {
1707 stack.push(sep);
1708 }
1709 if let Some(ref filter) = f.filter {
1710 stack.push(filter);
1711 }
1712 }
1713 Expression::StringAgg(f) => {
1714 stack.push(&f.this);
1715 if let Some(ref sep) = f.separator {
1716 stack.push(sep);
1717 }
1718 if let Some(ref filter) = f.filter {
1719 stack.push(filter);
1720 }
1721 if let Some(ref limit) = f.limit {
1722 stack.push(limit);
1723 }
1724 }
1725 Expression::ListAgg(f) => {
1726 stack.push(&f.this);
1727 if let Some(ref sep) = f.separator {
1728 stack.push(sep);
1729 }
1730 if let Some(ref filter) = f.filter {
1731 stack.push(filter);
1732 }
1733 }
1734 Expression::SumIf(f) => {
1735 stack.push(&f.this);
1736 stack.push(&f.condition);
1737 if let Some(ref filter) = f.filter {
1738 stack.push(filter);
1739 }
1740 }
1741 Expression::DateAdd(f) | Expression::DateSub(f) => {
1742 stack.push(&f.this);
1743 stack.push(&f.interval);
1744 }
1745 Expression::DateDiff(f) => {
1746 stack.push(&f.this);
1747 stack.push(&f.expression);
1748 }
1749 Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
1750 stack.push(&f.this);
1751 }
1752 Expression::Extract(f) => {
1753 stack.push(&f.this);
1754 }
1755 Expression::Round(f) => {
1756 stack.push(&f.this);
1757 if let Some(ref d) = f.decimals {
1758 stack.push(d);
1759 }
1760 }
1761 Expression::Floor(f) => {
1762 stack.push(&f.this);
1763 if let Some(ref s) = f.scale {
1764 stack.push(s);
1765 }
1766 if let Some(ref t) = f.to {
1767 stack.push(t);
1768 }
1769 }
1770 Expression::Ceil(f) => {
1771 stack.push(&f.this);
1772 if let Some(ref d) = f.decimals {
1773 stack.push(d);
1774 }
1775 if let Some(ref t) = f.to {
1776 stack.push(t);
1777 }
1778 }
1779 Expression::Log(f) => {
1780 stack.push(&f.this);
1781 if let Some(ref b) = f.base {
1782 stack.push(b);
1783 }
1784 }
1785 Expression::AtTimeZone(f) => {
1786 stack.push(&f.this);
1787 stack.push(&f.zone);
1788 }
1789 Expression::Lead(f) | Expression::Lag(f) => {
1790 stack.push(&f.this);
1791 if let Some(ref off) = f.offset {
1792 stack.push(off);
1793 }
1794 if let Some(ref def) = f.default {
1795 stack.push(def);
1796 }
1797 }
1798 Expression::FirstValue(f) | Expression::LastValue(f) => {
1799 stack.push(&f.this);
1800 }
1801 Expression::NthValue(f) => {
1802 stack.push(&f.this);
1803 stack.push(&f.offset);
1804 }
1805 Expression::Position(f) => {
1806 stack.push(&f.substring);
1807 stack.push(&f.string);
1808 if let Some(ref start) = f.start {
1809 stack.push(start);
1810 }
1811 }
1812 Expression::Decode(f) => {
1813 stack.push(&f.this);
1814 for (search, result) in &f.search_results {
1815 stack.push(search);
1816 stack.push(result);
1817 }
1818 if let Some(ref def) = f.default {
1819 stack.push(def);
1820 }
1821 }
1822 Expression::CharFunc(f) => {
1823 for arg in &f.args {
1824 stack.push(arg);
1825 }
1826 }
1827 Expression::ArraySort(f) => {
1828 stack.push(&f.this);
1829 if let Some(ref cmp) = f.comparator {
1830 stack.push(cmp);
1831 }
1832 }
1833 Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
1834 stack.push(&f.this);
1835 stack.push(&f.separator);
1836 if let Some(ref nr) = f.null_replacement {
1837 stack.push(nr);
1838 }
1839 }
1840 Expression::ArrayFilter(f) => {
1841 stack.push(&f.this);
1842 stack.push(&f.filter);
1843 }
1844 Expression::ArrayTransform(f) => {
1845 stack.push(&f.this);
1846 stack.push(&f.transform);
1847 }
1848 Expression::Sequence(f)
1849 | Expression::Generate(f)
1850 | Expression::ExplodingGenerateSeries(f) => {
1851 stack.push(&f.start);
1852 stack.push(&f.stop);
1853 if let Some(ref step) = f.step {
1854 stack.push(step);
1855 }
1856 }
1857 Expression::JsonExtract(f)
1858 | Expression::JsonExtractScalar(f)
1859 | Expression::JsonQuery(f)
1860 | Expression::JsonValue(f) => {
1861 stack.push(&f.this);
1862 stack.push(&f.path);
1863 }
1864 Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
1865 stack.push(&f.this);
1866 for p in &f.paths {
1867 stack.push(p);
1868 }
1869 }
1870 Expression::JsonObject(f) => {
1871 for (k, v) in &f.pairs {
1872 stack.push(k);
1873 stack.push(v);
1874 }
1875 }
1876 Expression::JsonSet(f) | Expression::JsonInsert(f) => {
1877 stack.push(&f.this);
1878 for (path, val) in &f.path_values {
1879 stack.push(path);
1880 stack.push(val);
1881 }
1882 }
1883 Expression::Overlay(f) => {
1884 stack.push(&f.this);
1885 stack.push(&f.replacement);
1886 stack.push(&f.from);
1887 if let Some(ref len) = f.length {
1888 stack.push(len);
1889 }
1890 }
1891 Expression::Convert(f) => {
1892 stack.push(&f.this);
1893 if let Some(ref style) = f.style {
1894 stack.push(style);
1895 }
1896 }
1897 Expression::ApproxPercentile(f) => {
1898 stack.push(&f.this);
1899 stack.push(&f.percentile);
1900 if let Some(ref acc) = f.accuracy {
1901 stack.push(acc);
1902 }
1903 if let Some(ref filter) = f.filter {
1904 stack.push(filter);
1905 }
1906 }
1907 Expression::Percentile(f)
1908 | Expression::PercentileCont(f)
1909 | Expression::PercentileDisc(f) => {
1910 stack.push(&f.this);
1911 stack.push(&f.percentile);
1912 if let Some(ref filter) = f.filter {
1913 stack.push(filter);
1914 }
1915 }
1916 Expression::WithinGroup(f) => {
1917 stack.push(&f.this);
1918 }
1919 Expression::Left(f) | Expression::Right(f) => {
1920 stack.push(&f.this);
1921 stack.push(&f.length);
1922 }
1923 Expression::Repeat(f) => {
1924 stack.push(&f.this);
1925 stack.push(&f.times);
1926 }
1927 Expression::Lpad(f) | Expression::Rpad(f) => {
1928 stack.push(&f.this);
1929 stack.push(&f.length);
1930 if let Some(ref fill) = f.fill {
1931 stack.push(fill);
1932 }
1933 }
1934 Expression::Split(f) => {
1935 stack.push(&f.this);
1936 stack.push(&f.delimiter);
1937 }
1938 Expression::RegexpLike(f) => {
1939 stack.push(&f.this);
1940 stack.push(&f.pattern);
1941 if let Some(ref flags) = f.flags {
1942 stack.push(flags);
1943 }
1944 }
1945 Expression::RegexpReplace(f) => {
1946 stack.push(&f.this);
1947 stack.push(&f.pattern);
1948 stack.push(&f.replacement);
1949 if let Some(ref flags) = f.flags {
1950 stack.push(flags);
1951 }
1952 }
1953 Expression::RegexpExtract(f) => {
1954 stack.push(&f.this);
1955 stack.push(&f.pattern);
1956 if let Some(ref group) = f.group {
1957 stack.push(group);
1958 }
1959 }
1960 Expression::ToDate(f) => {
1961 stack.push(&f.this);
1962 if let Some(ref fmt) = f.format {
1963 stack.push(fmt);
1964 }
1965 }
1966 Expression::ToTimestamp(f) => {
1967 stack.push(&f.this);
1968 if let Some(ref fmt) = f.format {
1969 stack.push(fmt);
1970 }
1971 }
1972 Expression::DateFormat(f) | Expression::FormatDate(f) => {
1973 stack.push(&f.this);
1974 stack.push(&f.format);
1975 }
1976 Expression::LastDay(f) => {
1977 stack.push(&f.this);
1978 }
1979 Expression::FromUnixtime(f) => {
1980 stack.push(&f.this);
1981 if let Some(ref fmt) = f.format {
1982 stack.push(fmt);
1983 }
1984 }
1985 Expression::UnixTimestamp(f) => {
1986 if let Some(ref this) = f.this {
1987 stack.push(this);
1988 }
1989 if let Some(ref fmt) = f.format {
1990 stack.push(fmt);
1991 }
1992 }
1993 Expression::MakeDate(f) => {
1994 stack.push(&f.year);
1995 stack.push(&f.month);
1996 stack.push(&f.day);
1997 }
1998 Expression::MakeTimestamp(f) => {
1999 stack.push(&f.year);
2000 stack.push(&f.month);
2001 stack.push(&f.day);
2002 stack.push(&f.hour);
2003 stack.push(&f.minute);
2004 stack.push(&f.second);
2005 if let Some(ref tz) = f.timezone {
2006 stack.push(tz);
2007 }
2008 }
2009 Expression::TruncFunc(f) => {
2010 stack.push(&f.this);
2011 if let Some(ref d) = f.decimals {
2012 stack.push(d);
2013 }
2014 }
2015 Expression::ArrayFunc(f) => {
2016 for e in &f.expressions {
2017 stack.push(e);
2018 }
2019 }
2020 Expression::Unnest(f) => {
2021 stack.push(&f.this);
2022 for e in &f.expressions {
2023 stack.push(e);
2024 }
2025 }
2026 Expression::StructFunc(f) => {
2027 for (_, e) in &f.fields {
2028 stack.push(e);
2029 }
2030 }
2031 Expression::StructExtract(f) => {
2032 stack.push(&f.this);
2033 }
2034 Expression::NamedStruct(f) => {
2035 for (k, v) in &f.pairs {
2036 stack.push(k);
2037 stack.push(v);
2038 }
2039 }
2040 Expression::MapFunc(f) => {
2041 for k in &f.keys {
2042 stack.push(k);
2043 }
2044 for v in &f.values {
2045 stack.push(v);
2046 }
2047 }
2048 Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2049 stack.push(&f.this);
2050 stack.push(&f.transform);
2051 }
2052 Expression::JsonArrayAgg(f) => {
2053 stack.push(&f.this);
2054 if let Some(ref filter) = f.filter {
2055 stack.push(filter);
2056 }
2057 }
2058 Expression::JsonObjectAgg(f) => {
2059 stack.push(&f.key);
2060 stack.push(&f.value);
2061 if let Some(ref filter) = f.filter {
2062 stack.push(filter);
2063 }
2064 }
2065 Expression::NTile(f) => {
2066 if let Some(ref n) = f.num_buckets {
2067 stack.push(n);
2068 }
2069 }
2070 Expression::Rand(f) => {
2071 if let Some(ref s) = f.seed {
2072 stack.push(s);
2073 }
2074 if let Some(ref lo) = f.lower {
2075 stack.push(lo);
2076 }
2077 if let Some(ref hi) = f.upper {
2078 stack.push(hi);
2079 }
2080 }
2081 Expression::Any(q) | Expression::All(q) => {
2082 stack.push(&q.this);
2083 stack.push(&q.subquery);
2084 }
2085 Expression::Overlaps(o) => {
2086 if let Some(ref this) = o.this {
2087 stack.push(this);
2088 }
2089 if let Some(ref expr) = o.expression {
2090 stack.push(expr);
2091 }
2092 if let Some(ref ls) = o.left_start {
2093 stack.push(ls);
2094 }
2095 if let Some(ref le) = o.left_end {
2096 stack.push(le);
2097 }
2098 if let Some(ref rs) = o.right_start {
2099 stack.push(rs);
2100 }
2101 if let Some(ref re) = o.right_end {
2102 stack.push(re);
2103 }
2104 }
2105 Expression::Interval(i) => {
2106 if let Some(ref this) = i.this {
2107 stack.push(this);
2108 }
2109 }
2110 Expression::TimeStrToTime(f) => {
2111 stack.push(&f.this);
2112 if let Some(ref zone) = f.zone {
2113 stack.push(zone);
2114 }
2115 }
2116 Expression::JSONBExtractScalar(f) => {
2117 stack.push(&f.this);
2118 stack.push(&f.expression);
2119 if let Some(ref jt) = f.json_type {
2120 stack.push(jt);
2121 }
2122 }
2123
2124 _ => {}
2129 }
2130 }
2131}
2132
2133#[cfg(test)]
2138mod tests {
2139 use super::*;
2140 use crate::dialects::{Dialect, DialectType};
2141 use crate::expressions::DataType;
2142 use crate::optimizer::annotate_types::annotate_types;
2143 use crate::parse_one;
2144 use crate::schema::{MappingSchema, Schema};
2145
2146 fn parse(sql: &str) -> Expression {
2147 let dialect = Dialect::get(DialectType::Generic);
2148 let ast = dialect.parse(sql).unwrap();
2149 ast.into_iter().next().unwrap()
2150 }
2151
2152 #[test]
2153 fn test_simple_lineage() {
2154 let expr = parse("SELECT a FROM t");
2155 let node = lineage("a", &expr, None, false).unwrap();
2156
2157 assert_eq!(node.name, "a");
2158 assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2159 let names = node.downstream_names();
2161 assert!(
2162 names.iter().any(|n| n == "t.a"),
2163 "Expected t.a in downstream, got: {:?}",
2164 names
2165 );
2166 }
2167
2168 #[test]
2169 fn test_lineage_walk() {
2170 let root = LineageNode {
2171 name: "col_a".to_string(),
2172 expression: Expression::Null(crate::expressions::Null),
2173 source: Expression::Null(crate::expressions::Null),
2174 downstream: vec![LineageNode::new(
2175 "t.a",
2176 Expression::Null(crate::expressions::Null),
2177 Expression::Null(crate::expressions::Null),
2178 )],
2179 source_name: String::new(),
2180 reference_node_name: String::new(),
2181 };
2182
2183 let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2184 assert_eq!(names.len(), 2);
2185 assert_eq!(names[0], "col_a");
2186 assert_eq!(names[1], "t.a");
2187 }
2188
2189 #[test]
2190 fn test_aliased_column() {
2191 let expr = parse("SELECT a + 1 AS b FROM t");
2192 let node = lineage("b", &expr, None, false).unwrap();
2193
2194 assert_eq!(node.name, "b");
2195 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2197 assert!(
2198 all_names.iter().any(|n| n.contains("a")),
2199 "Expected to trace to column a, got: {:?}",
2200 all_names
2201 );
2202 }
2203
2204 #[test]
2205 fn test_qualified_column() {
2206 let expr = parse("SELECT t.a FROM t");
2207 let node = lineage("a", &expr, None, false).unwrap();
2208
2209 assert_eq!(node.name, "a");
2210 let names = node.downstream_names();
2211 assert!(
2212 names.iter().any(|n| n == "t.a"),
2213 "Expected t.a, got: {:?}",
2214 names
2215 );
2216 }
2217
2218 #[test]
2219 fn test_unqualified_column() {
2220 let expr = parse("SELECT a FROM t");
2221 let node = lineage("a", &expr, None, false).unwrap();
2222
2223 let names = node.downstream_names();
2225 assert!(
2226 names.iter().any(|n| n == "t.a"),
2227 "Expected t.a, got: {:?}",
2228 names
2229 );
2230 }
2231
2232 #[test]
2233 fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2234 let query = "SELECT name FROM users";
2235 let dialect = Dialect::get(DialectType::BigQuery);
2236 let expr = dialect
2237 .parse(query)
2238 .unwrap()
2239 .into_iter()
2240 .next()
2241 .expect("expected one expression");
2242
2243 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2244 schema
2245 .add_table("users", &[("name".into(), DataType::Text)], None)
2246 .expect("schema setup");
2247
2248 let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2249 .expect("lineage without schema");
2250 let mut expr_without = node_without_schema.expression.clone();
2251 annotate_types(
2252 &mut expr_without,
2253 Some(&schema),
2254 Some(DialectType::BigQuery),
2255 );
2256 assert_eq!(
2257 expr_without.inferred_type(),
2258 None,
2259 "Expected unresolved root type without schema-aware lineage qualification"
2260 );
2261
2262 let node_with_schema = lineage_with_schema(
2263 "name",
2264 &expr,
2265 Some(&schema),
2266 Some(DialectType::BigQuery),
2267 false,
2268 )
2269 .expect("lineage with schema");
2270 let mut expr_with = node_with_schema.expression.clone();
2271 annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2272
2273 assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2274 }
2275
2276 #[test]
2277 fn test_lineage_with_schema_correlated_scalar_subquery() {
2278 let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2279 let dialect = Dialect::get(DialectType::BigQuery);
2280 let expr = dialect
2281 .parse(query)
2282 .unwrap()
2283 .into_iter()
2284 .next()
2285 .expect("expected one expression");
2286
2287 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2288 schema
2289 .add_table(
2290 "t1",
2291 &[("id".into(), DataType::BigInt { length: None })],
2292 None,
2293 )
2294 .expect("schema setup");
2295 schema
2296 .add_table(
2297 "t2",
2298 &[
2299 ("id".into(), DataType::BigInt { length: None }),
2300 ("val".into(), DataType::BigInt { length: None }),
2301 ],
2302 None,
2303 )
2304 .expect("schema setup");
2305
2306 let node = lineage_with_schema(
2307 "id",
2308 &expr,
2309 Some(&schema),
2310 Some(DialectType::BigQuery),
2311 false,
2312 )
2313 .expect("lineage_with_schema should handle correlated scalar subqueries");
2314
2315 assert_eq!(node.name, "id");
2316 }
2317
2318 #[test]
2319 fn test_lineage_with_schema_join_using() {
2320 let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2321 let dialect = Dialect::get(DialectType::BigQuery);
2322 let expr = dialect
2323 .parse(query)
2324 .unwrap()
2325 .into_iter()
2326 .next()
2327 .expect("expected one expression");
2328
2329 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2330 schema
2331 .add_table(
2332 "t1",
2333 &[("a".into(), DataType::BigInt { length: None })],
2334 None,
2335 )
2336 .expect("schema setup");
2337 schema
2338 .add_table(
2339 "t2",
2340 &[("a".into(), DataType::BigInt { length: None })],
2341 None,
2342 )
2343 .expect("schema setup");
2344
2345 let node = lineage_with_schema(
2346 "a",
2347 &expr,
2348 Some(&schema),
2349 Some(DialectType::BigQuery),
2350 false,
2351 )
2352 .expect("lineage_with_schema should handle JOIN USING");
2353
2354 assert_eq!(node.name, "a");
2355 }
2356
2357 #[test]
2358 fn test_lineage_with_schema_qualified_table_name() {
2359 let query = "SELECT a FROM raw.t1";
2360 let dialect = Dialect::get(DialectType::BigQuery);
2361 let expr = dialect
2362 .parse(query)
2363 .unwrap()
2364 .into_iter()
2365 .next()
2366 .expect("expected one expression");
2367
2368 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2369 schema
2370 .add_table(
2371 "raw.t1",
2372 &[("a".into(), DataType::BigInt { length: None })],
2373 None,
2374 )
2375 .expect("schema setup");
2376
2377 let node = lineage_with_schema(
2378 "a",
2379 &expr,
2380 Some(&schema),
2381 Some(DialectType::BigQuery),
2382 false,
2383 )
2384 .expect("lineage_with_schema should handle dotted schema.table names");
2385
2386 assert_eq!(node.name, "a");
2387 }
2388
2389 #[test]
2390 fn test_lineage_with_schema_none_matches_lineage() {
2391 let expr = parse("SELECT a FROM t");
2392 let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2393 let with_none =
2394 lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2395
2396 assert_eq!(with_none.name, baseline.name);
2397 assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2398 }
2399
2400 #[test]
2401 fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2402 let dialect = Dialect::get(DialectType::BigQuery);
2403 let expr = dialect
2404 .parse("SELECT Name AS name FROM teams")
2405 .unwrap()
2406 .into_iter()
2407 .next()
2408 .expect("expected one expression");
2409
2410 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2411 schema
2412 .add_table(
2413 "teams",
2414 &[("Name".into(), DataType::String { length: None })],
2415 None,
2416 )
2417 .expect("schema setup");
2418
2419 let node = lineage_with_schema(
2420 "name",
2421 &expr,
2422 Some(&schema),
2423 Some(DialectType::BigQuery),
2424 false,
2425 )
2426 .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2427
2428 let names = node.downstream_names();
2429 assert!(
2430 names.iter().any(|n| n == "teams.Name"),
2431 "Expected teams.Name in downstream, got: {:?}",
2432 names
2433 );
2434 }
2435
2436 #[test]
2437 fn test_lineage_bigquery_mixed_case_alias_lookup() {
2438 let dialect = Dialect::get(DialectType::BigQuery);
2439 let expr = dialect
2440 .parse("SELECT Name AS Name FROM teams")
2441 .unwrap()
2442 .into_iter()
2443 .next()
2444 .expect("expected one expression");
2445
2446 let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2447 .expect("lineage should resolve mixed-case aliases in BigQuery");
2448
2449 assert_eq!(node.name, "name");
2450 }
2451
2452 #[test]
2453 fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
2454 let expr = parse_one(
2455 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2456 DialectType::Snowflake,
2457 )
2458 .expect("parse");
2459
2460 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2461 schema
2462 .add_table(
2463 "fact.some_daily_metrics",
2464 &[("date_utc".to_string(), DataType::Date)],
2465 None,
2466 )
2467 .expect("schema setup");
2468
2469 let node = lineage_with_schema(
2470 "recency",
2471 &expr,
2472 Some(&schema),
2473 Some(DialectType::Snowflake),
2474 false,
2475 )
2476 .expect("lineage_with_schema should not treat date part as a column");
2477
2478 let names = node.downstream_names();
2479 assert!(
2480 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2481 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2482 names
2483 );
2484 assert!(
2485 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2486 "Did not expect date part to appear as lineage column, got: {:?}",
2487 names
2488 );
2489 }
2490
2491 #[test]
2492 fn test_snowflake_datediff_parses_to_typed_ast() {
2493 let expr = parse_one(
2494 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2495 DialectType::Snowflake,
2496 )
2497 .expect("parse");
2498
2499 match expr {
2500 Expression::Select(select) => match &select.expressions[0] {
2501 Expression::Alias(alias) => match &alias.this {
2502 Expression::DateDiff(f) => {
2503 assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
2504 }
2505 other => panic!("expected DateDiff, got {other:?}"),
2506 },
2507 other => panic!("expected Alias, got {other:?}"),
2508 },
2509 other => panic!("expected Select, got {other:?}"),
2510 }
2511 }
2512
2513 #[test]
2514 fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
2515 let expr = parse_one(
2516 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2517 DialectType::Snowflake,
2518 )
2519 .expect("parse");
2520
2521 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2522 schema
2523 .add_table(
2524 "fact.some_daily_metrics",
2525 &[("date_utc".to_string(), DataType::Date)],
2526 None,
2527 )
2528 .expect("schema setup");
2529
2530 let node = lineage_with_schema(
2531 "next_day",
2532 &expr,
2533 Some(&schema),
2534 Some(DialectType::Snowflake),
2535 false,
2536 )
2537 .expect("lineage_with_schema should not treat DATEADD date part as a column");
2538
2539 let names = node.downstream_names();
2540 assert!(
2541 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2542 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2543 names
2544 );
2545 assert!(
2546 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2547 "Did not expect date part to appear as lineage column, got: {:?}",
2548 names
2549 );
2550 }
2551
2552 #[test]
2553 fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
2554 let expr = parse_one(
2555 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2556 DialectType::Snowflake,
2557 )
2558 .expect("parse");
2559
2560 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2561 schema
2562 .add_table(
2563 "fact.some_daily_metrics",
2564 &[("date_utc".to_string(), DataType::Date)],
2565 None,
2566 )
2567 .expect("schema setup");
2568
2569 let node = lineage_with_schema(
2570 "day_part",
2571 &expr,
2572 Some(&schema),
2573 Some(DialectType::Snowflake),
2574 false,
2575 )
2576 .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
2577
2578 let names = node.downstream_names();
2579 assert!(
2580 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2581 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2582 names
2583 );
2584 assert!(
2585 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2586 "Did not expect date part to appear as lineage column, got: {:?}",
2587 names
2588 );
2589 }
2590
2591 #[test]
2592 fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
2593 let expr = parse_one(
2594 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2595 DialectType::Snowflake,
2596 )
2597 .expect("parse");
2598
2599 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2600 schema
2601 .add_table(
2602 "fact.some_daily_metrics",
2603 &[("date_utc".to_string(), DataType::Date)],
2604 None,
2605 )
2606 .expect("schema setup");
2607
2608 let node = lineage_with_schema(
2609 "day_part",
2610 &expr,
2611 Some(&schema),
2612 Some(DialectType::Snowflake),
2613 false,
2614 )
2615 .expect("quoted DATE_PART should continue to work");
2616
2617 let names = node.downstream_names();
2618 assert!(
2619 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2620 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2621 names
2622 );
2623 }
2624
2625 #[test]
2626 fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
2627 let expr = parse_one(
2628 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2629 DialectType::Snowflake,
2630 )
2631 .expect("parse");
2632
2633 match expr {
2634 Expression::Select(select) => match &select.expressions[0] {
2635 Expression::Alias(alias) => match &alias.this {
2636 Expression::Function(f) => {
2637 assert_eq!(f.name.to_uppercase(), "DATEADD");
2638 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2639 }
2640 other => panic!("expected generic DATEADD function, got {other:?}"),
2641 },
2642 other => panic!("expected Alias, got {other:?}"),
2643 },
2644 other => panic!("expected Select, got {other:?}"),
2645 }
2646 }
2647
2648 #[test]
2649 fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2650 let expr = parse_one(
2651 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2652 DialectType::Snowflake,
2653 )
2654 .expect("parse");
2655
2656 match expr {
2657 Expression::Select(select) => match &select.expressions[0] {
2658 Expression::Alias(alias) => match &alias.this {
2659 Expression::Function(f) => {
2660 assert_eq!(f.name.to_uppercase(), "DATE_PART");
2661 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2662 }
2663 other => panic!("expected generic DATE_PART function, got {other:?}"),
2664 },
2665 other => panic!("expected Alias, got {other:?}"),
2666 },
2667 other => panic!("expected Select, got {other:?}"),
2668 }
2669 }
2670
2671 #[test]
2672 fn test_snowflake_date_part_string_literal_stays_generic_function() {
2673 let expr = parse_one(
2674 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2675 DialectType::Snowflake,
2676 )
2677 .expect("parse");
2678
2679 match expr {
2680 Expression::Select(select) => match &select.expressions[0] {
2681 Expression::Alias(alias) => match &alias.this {
2682 Expression::Function(f) => {
2683 assert_eq!(f.name.to_uppercase(), "DATE_PART");
2684 }
2685 other => panic!("expected generic DATE_PART function, got {other:?}"),
2686 },
2687 other => panic!("expected Alias, got {other:?}"),
2688 },
2689 other => panic!("expected Select, got {other:?}"),
2690 }
2691 }
2692
2693 #[test]
2694 fn test_lineage_join() {
2695 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2696
2697 let node_a = lineage("a", &expr, None, false).unwrap();
2698 let names_a = node_a.downstream_names();
2699 assert!(
2700 names_a.iter().any(|n| n == "t.a"),
2701 "Expected t.a, got: {:?}",
2702 names_a
2703 );
2704
2705 let node_b = lineage("b", &expr, None, false).unwrap();
2706 let names_b = node_b.downstream_names();
2707 assert!(
2708 names_b.iter().any(|n| n == "s.b"),
2709 "Expected s.b, got: {:?}",
2710 names_b
2711 );
2712 }
2713
2714 #[test]
2715 fn test_lineage_alias_leaf_has_resolved_source_name() {
2716 let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
2717 let node = lineage("col1", &expr, None, false).unwrap();
2718
2719 let names = node.downstream_names();
2721 assert!(
2722 names.iter().any(|n| n == "t1.col1"),
2723 "Expected aliased column edge t1.col1, got: {:?}",
2724 names
2725 );
2726
2727 let leaf = node
2729 .downstream
2730 .iter()
2731 .find(|n| n.name == "t1.col1")
2732 .expect("Expected t1.col1 leaf");
2733 assert_eq!(leaf.source_name, "table1");
2734 match &leaf.source {
2735 Expression::Table(table) => assert_eq!(table.name.name, "table1"),
2736 _ => panic!("Expected leaf source to be a table expression"),
2737 }
2738 }
2739
2740 #[test]
2741 fn test_lineage_derived_table() {
2742 let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
2743 let node = lineage("a", &expr, None, false).unwrap();
2744
2745 assert_eq!(node.name, "a");
2746 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2748 assert!(
2749 all_names.iter().any(|n| n == "t.a"),
2750 "Expected to trace through derived table to t.a, got: {:?}",
2751 all_names
2752 );
2753 }
2754
2755 #[test]
2756 fn test_lineage_cte() {
2757 let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
2758 let node = lineage("a", &expr, None, false).unwrap();
2759
2760 assert_eq!(node.name, "a");
2761 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2762 assert!(
2763 all_names.iter().any(|n| n == "t.a"),
2764 "Expected to trace through CTE to t.a, got: {:?}",
2765 all_names
2766 );
2767 }
2768
2769 #[test]
2770 fn test_lineage_union() {
2771 let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
2772 let node = lineage("a", &expr, None, false).unwrap();
2773
2774 assert_eq!(node.name, "a");
2775 assert_eq!(
2777 node.downstream.len(),
2778 2,
2779 "Expected 2 branches for UNION, got {}",
2780 node.downstream.len()
2781 );
2782 }
2783
2784 #[test]
2785 fn test_lineage_cte_union() {
2786 let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
2787 let node = lineage("a", &expr, None, false).unwrap();
2788
2789 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2791 assert!(
2792 all_names.len() >= 3,
2793 "Expected at least 3 nodes for CTE with UNION, got: {:?}",
2794 all_names
2795 );
2796 }
2797
2798 #[test]
2799 fn test_lineage_star() {
2800 let expr = parse("SELECT * FROM t");
2801 let node = lineage("*", &expr, None, false).unwrap();
2802
2803 assert_eq!(node.name, "*");
2804 assert!(
2806 !node.downstream.is_empty(),
2807 "Star should produce downstream nodes"
2808 );
2809 }
2810
2811 #[test]
2812 fn test_lineage_subquery_in_select() {
2813 let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
2814 let node = lineage("x", &expr, None, false).unwrap();
2815
2816 assert_eq!(node.name, "x");
2817 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2819 assert!(
2820 all_names.len() >= 2,
2821 "Expected tracing into scalar subquery, got: {:?}",
2822 all_names
2823 );
2824 }
2825
2826 #[test]
2827 fn test_lineage_multiple_columns() {
2828 let expr = parse("SELECT a, b FROM t");
2829
2830 let node_a = lineage("a", &expr, None, false).unwrap();
2831 let node_b = lineage("b", &expr, None, false).unwrap();
2832
2833 assert_eq!(node_a.name, "a");
2834 assert_eq!(node_b.name, "b");
2835
2836 let names_a = node_a.downstream_names();
2838 let names_b = node_b.downstream_names();
2839 assert!(names_a.iter().any(|n| n == "t.a"));
2840 assert!(names_b.iter().any(|n| n == "t.b"));
2841 }
2842
2843 #[test]
2844 fn test_get_source_tables() {
2845 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2846 let node = lineage("a", &expr, None, false).unwrap();
2847
2848 let tables = get_source_tables(&node);
2849 assert!(
2850 tables.contains("t"),
2851 "Expected source table 't', got: {:?}",
2852 tables
2853 );
2854 }
2855
2856 #[test]
2857 fn test_lineage_column_not_found() {
2858 let expr = parse("SELECT a FROM t");
2859 let result = lineage("nonexistent", &expr, None, false);
2860 assert!(result.is_err());
2861 }
2862
2863 #[test]
2864 fn test_lineage_nested_cte() {
2865 let expr = parse(
2866 "WITH cte1 AS (SELECT a FROM t), \
2867 cte2 AS (SELECT a FROM cte1) \
2868 SELECT a FROM cte2",
2869 );
2870 let node = lineage("a", &expr, None, false).unwrap();
2871
2872 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2874 assert!(
2875 all_names.len() >= 3,
2876 "Expected to trace through nested CTEs, got: {:?}",
2877 all_names
2878 );
2879 }
2880
2881 #[test]
2882 fn test_trim_selects_true() {
2883 let expr = parse("SELECT a, b, c FROM t");
2884 let node = lineage("a", &expr, None, true).unwrap();
2885
2886 if let Expression::Select(select) = &node.source {
2888 assert_eq!(
2889 select.expressions.len(),
2890 1,
2891 "Trimmed source should have 1 expression, got {}",
2892 select.expressions.len()
2893 );
2894 } else {
2895 panic!("Expected Select source");
2896 }
2897 }
2898
2899 #[test]
2900 fn test_trim_selects_false() {
2901 let expr = parse("SELECT a, b, c FROM t");
2902 let node = lineage("a", &expr, None, false).unwrap();
2903
2904 if let Expression::Select(select) = &node.source {
2906 assert_eq!(
2907 select.expressions.len(),
2908 3,
2909 "Untrimmed source should have 3 expressions"
2910 );
2911 } else {
2912 panic!("Expected Select source");
2913 }
2914 }
2915
2916 #[test]
2917 fn test_lineage_expression_in_select() {
2918 let expr = parse("SELECT a + b AS c FROM t");
2919 let node = lineage("c", &expr, None, false).unwrap();
2920
2921 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2923 assert!(
2924 all_names.len() >= 3,
2925 "Expected to trace a + b to both columns, got: {:?}",
2926 all_names
2927 );
2928 }
2929
2930 #[test]
2931 fn test_set_operation_by_index() {
2932 let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
2933
2934 let node = lineage("a", &expr, None, false).unwrap();
2936
2937 assert_eq!(node.downstream.len(), 2);
2939 }
2940
2941 fn print_node(node: &LineageNode, indent: usize) {
2944 let pad = " ".repeat(indent);
2945 println!(
2946 "{pad}name={:?} source_name={:?}",
2947 node.name, node.source_name
2948 );
2949 for child in &node.downstream {
2950 print_node(child, indent + 1);
2951 }
2952 }
2953
2954 #[test]
2955 fn test_issue18_repro() {
2956 let query = "SELECT UPPER(name) as upper_name FROM users";
2958 println!("Query: {query}\n");
2959
2960 let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
2961 let exprs = dialect.parse(query).unwrap();
2962 let expr = &exprs[0];
2963
2964 let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
2965 println!("lineage(\"upper_name\"):");
2966 print_node(&node, 1);
2967
2968 let names = node.downstream_names();
2969 assert!(
2970 names.iter().any(|n| n == "users.name"),
2971 "Expected users.name in downstream, got: {:?}",
2972 names
2973 );
2974 }
2975
2976 #[test]
2977 fn test_lineage_upper_function() {
2978 let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
2979 let node = lineage("upper_name", &expr, None, false).unwrap();
2980
2981 let names = node.downstream_names();
2982 assert!(
2983 names.iter().any(|n| n == "users.name"),
2984 "Expected users.name in downstream, got: {:?}",
2985 names
2986 );
2987 }
2988
2989 #[test]
2990 fn test_lineage_round_function() {
2991 let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
2992 let node = lineage("rounded", &expr, None, false).unwrap();
2993
2994 let names = node.downstream_names();
2995 assert!(
2996 names.iter().any(|n| n == "products.price"),
2997 "Expected products.price in downstream, got: {:?}",
2998 names
2999 );
3000 }
3001
3002 #[test]
3003 fn test_lineage_coalesce_function() {
3004 let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3005 let node = lineage("val", &expr, None, false).unwrap();
3006
3007 let names = node.downstream_names();
3008 assert!(
3009 names.iter().any(|n| n == "t.a"),
3010 "Expected t.a in downstream, got: {:?}",
3011 names
3012 );
3013 assert!(
3014 names.iter().any(|n| n == "t.b"),
3015 "Expected t.b in downstream, got: {:?}",
3016 names
3017 );
3018 }
3019
3020 #[test]
3021 fn test_lineage_count_function() {
3022 let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3023 let node = lineage("cnt", &expr, None, false).unwrap();
3024
3025 let names = node.downstream_names();
3026 assert!(
3027 names.iter().any(|n| n == "t.id"),
3028 "Expected t.id in downstream, got: {:?}",
3029 names
3030 );
3031 }
3032
3033 #[test]
3034 fn test_lineage_sum_function() {
3035 let expr = parse("SELECT SUM(amount) AS total FROM t");
3036 let node = lineage("total", &expr, None, false).unwrap();
3037
3038 let names = node.downstream_names();
3039 assert!(
3040 names.iter().any(|n| n == "t.amount"),
3041 "Expected t.amount in downstream, got: {:?}",
3042 names
3043 );
3044 }
3045
3046 #[test]
3047 fn test_lineage_case_with_nested_functions() {
3048 let expr =
3049 parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3050 let node = lineage("result", &expr, None, false).unwrap();
3051
3052 let names = node.downstream_names();
3053 assert!(
3054 names.iter().any(|n| n == "t.x"),
3055 "Expected t.x in downstream, got: {:?}",
3056 names
3057 );
3058 assert!(
3059 names.iter().any(|n| n == "t.name"),
3060 "Expected t.name in downstream, got: {:?}",
3061 names
3062 );
3063 }
3064
3065 #[test]
3066 fn test_lineage_substring_function() {
3067 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3068 let node = lineage("short", &expr, None, false).unwrap();
3069
3070 let names = node.downstream_names();
3071 assert!(
3072 names.iter().any(|n| n == "t.name"),
3073 "Expected t.name in downstream, got: {:?}",
3074 names
3075 );
3076 }
3077
3078 #[test]
3081 fn test_lineage_cte_select_star() {
3082 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3086 let node = lineage("a", &expr, None, false).unwrap();
3087
3088 assert_eq!(node.name, "a");
3089 assert!(
3092 !node.downstream.is_empty(),
3093 "Expected downstream nodes tracing through CTE, got none"
3094 );
3095 }
3096
3097 #[test]
3098 fn test_lineage_cte_select_star_renamed_column() {
3099 let expr =
3102 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3103 let node = lineage("customer_id", &expr, None, false).unwrap();
3104
3105 assert_eq!(node.name, "customer_id");
3106 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3108 assert!(
3109 all_names.len() >= 2,
3110 "Expected at least 2 nodes (customer_id → source), got: {:?}",
3111 all_names
3112 );
3113 }
3114
3115 #[test]
3116 fn test_lineage_cte_select_star_multiple_columns() {
3117 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3119
3120 for col in &["a", "b", "c"] {
3121 let node = lineage(col, &expr, None, false).unwrap();
3122 assert_eq!(node.name, *col);
3123 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3125 assert!(
3126 all_names.len() >= 2,
3127 "Expected at least 2 nodes for column {}, got: {:?}",
3128 col,
3129 all_names
3130 );
3131 }
3132 }
3133
3134 #[test]
3135 fn test_lineage_nested_cte_select_star() {
3136 let expr = parse(
3138 "WITH cte1 AS (SELECT a FROM t), \
3139 cte2 AS (SELECT * FROM cte1) \
3140 SELECT * FROM cte2",
3141 );
3142 let node = lineage("a", &expr, None, false).unwrap();
3143
3144 assert_eq!(node.name, "a");
3145 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3146 assert!(
3147 all_names.len() >= 3,
3148 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3149 all_names
3150 );
3151 }
3152
3153 #[test]
3154 fn test_lineage_three_level_nested_cte_star() {
3155 let expr = parse(
3157 "WITH cte1 AS (SELECT x FROM t), \
3158 cte2 AS (SELECT * FROM cte1), \
3159 cte3 AS (SELECT * FROM cte2) \
3160 SELECT * FROM cte3",
3161 );
3162 let node = lineage("x", &expr, None, false).unwrap();
3163
3164 assert_eq!(node.name, "x");
3165 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3166 assert!(
3167 all_names.len() >= 4,
3168 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3169 all_names
3170 );
3171 }
3172
3173 #[test]
3174 fn test_lineage_cte_union_star() {
3175 let expr = parse(
3177 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3178 SELECT * FROM cte",
3179 );
3180 let node = lineage("a", &expr, None, false).unwrap();
3181
3182 assert_eq!(node.name, "a");
3183 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3184 assert!(
3185 all_names.len() >= 2,
3186 "Expected at least 2 nodes for CTE union star, got: {:?}",
3187 all_names
3188 );
3189 }
3190
3191 #[test]
3192 fn test_lineage_cte_star_unknown_table() {
3193 let expr = parse(
3196 "WITH cte AS (SELECT * FROM unknown_table) \
3197 SELECT * FROM cte",
3198 );
3199 let _result = lineage("x", &expr, None, false);
3202 }
3203
3204 #[test]
3205 fn test_lineage_cte_explicit_columns() {
3206 let expr = parse(
3208 "WITH cte(x, y) AS (SELECT a, b FROM t) \
3209 SELECT * FROM cte",
3210 );
3211 let node = lineage("x", &expr, None, false).unwrap();
3212 assert_eq!(node.name, "x");
3213 }
3214
3215 #[test]
3216 fn test_lineage_cte_qualified_star() {
3217 let expr = parse(
3219 "WITH cte AS (SELECT a, b FROM t) \
3220 SELECT cte.* FROM cte",
3221 );
3222 for col in &["a", "b"] {
3223 let node = lineage(col, &expr, None, false).unwrap();
3224 assert_eq!(node.name, *col);
3225 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3226 assert!(
3227 all_names.len() >= 2,
3228 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3229 col,
3230 all_names
3231 );
3232 }
3233 }
3234
3235 #[test]
3236 fn test_lineage_subquery_select_star() {
3237 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3240 let node = lineage("x", &expr, None, false).unwrap();
3241
3242 assert_eq!(node.name, "x");
3243 assert!(
3244 !node.downstream.is_empty(),
3245 "Expected downstream nodes for subquery with SELECT *, got none"
3246 );
3247 }
3248
3249 #[test]
3250 fn test_lineage_cte_star_with_schema_external_table() {
3251 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3253SELECT * FROM orders"#;
3254 let expr = parse(sql);
3255
3256 let mut schema = MappingSchema::new();
3257 let cols = vec![
3258 ("order_id".to_string(), DataType::Unknown),
3259 ("customer_id".to_string(), DataType::Unknown),
3260 ("amount".to_string(), DataType::Unknown),
3261 ];
3262 schema.add_table("stg_orders", &cols, None).unwrap();
3263
3264 let node = lineage_with_schema(
3265 "order_id",
3266 &expr,
3267 Some(&schema as &dyn Schema),
3268 None,
3269 false,
3270 )
3271 .unwrap();
3272 assert_eq!(node.name, "order_id");
3273 }
3274
3275 #[test]
3276 fn test_lineage_cte_star_with_schema_three_part_name() {
3277 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3279SELECT * FROM orders"#;
3280 let expr = parse(sql);
3281
3282 let mut schema = MappingSchema::new();
3283 let cols = vec![
3284 ("order_id".to_string(), DataType::Unknown),
3285 ("customer_id".to_string(), DataType::Unknown),
3286 ];
3287 schema.add_table("db.schema.stg_orders", &cols, None).unwrap();
3288
3289 let node = lineage_with_schema(
3290 "customer_id",
3291 &expr,
3292 Some(&schema as &dyn Schema),
3293 None,
3294 false,
3295 )
3296 .unwrap();
3297 assert_eq!(node.name, "customer_id");
3298 }
3299
3300 #[test]
3301 fn test_lineage_cte_star_with_schema_nested() {
3302 let sql = r#"WITH
3305 raw AS (SELECT * FROM external_table),
3306 enriched AS (SELECT * FROM raw)
3307 SELECT * FROM enriched"#;
3308 let expr = parse(sql);
3309
3310 let mut schema = MappingSchema::new();
3311 let cols = vec![
3312 ("id".to_string(), DataType::Unknown),
3313 ("name".to_string(), DataType::Unknown),
3314 ];
3315 schema.add_table("external_table", &cols, None).unwrap();
3316
3317 let node = lineage_with_schema(
3318 "name",
3319 &expr,
3320 Some(&schema as &dyn Schema),
3321 None,
3322 false,
3323 )
3324 .unwrap();
3325 assert_eq!(node.name, "name");
3326 }
3327
3328 #[test]
3329 fn test_lineage_cte_qualified_star_with_schema() {
3330 let sql = r#"WITH
3333 orders AS (SELECT * FROM stg_orders),
3334 enriched AS (
3335 SELECT orders.*, 'extra' AS extra
3336 FROM orders
3337 )
3338 SELECT * FROM enriched"#;
3339 let expr = parse(sql);
3340
3341 let mut schema = MappingSchema::new();
3342 let cols = vec![
3343 ("order_id".to_string(), DataType::Unknown),
3344 ("total".to_string(), DataType::Unknown),
3345 ];
3346 schema.add_table("stg_orders", &cols, None).unwrap();
3347
3348 let node = lineage_with_schema(
3349 "order_id",
3350 &expr,
3351 Some(&schema as &dyn Schema),
3352 None,
3353 false,
3354 )
3355 .unwrap();
3356 assert_eq!(node.name, "order_id");
3357
3358 let extra = lineage_with_schema(
3360 "extra",
3361 &expr,
3362 Some(&schema as &dyn Schema),
3363 None,
3364 false,
3365 )
3366 .unwrap();
3367 assert_eq!(extra.name, "extra");
3368 }
3369
3370 #[test]
3371 fn test_lineage_cte_star_without_schema_still_works() {
3372 let sql = r#"WITH
3374 cte1 AS (SELECT id, name FROM raw_table),
3375 cte2 AS (SELECT * FROM cte1)
3376 SELECT * FROM cte2"#;
3377 let expr = parse(sql);
3378
3379 let node = lineage("id", &expr, None, false).unwrap();
3381 assert_eq!(node.name, "id");
3382 }
3383
3384 #[test]
3385 fn test_lineage_nested_cte_star_with_join_and_schema() {
3386 let sql = r#"WITH
3389base_orders AS (
3390 SELECT * FROM stg_orders
3391),
3392with_payments AS (
3393 SELECT
3394 base_orders.*,
3395 p.amount
3396 FROM base_orders
3397 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
3398),
3399final_cte AS (
3400 SELECT * FROM with_payments
3401)
3402SELECT * FROM final_cte"#;
3403 let expr = parse(sql);
3404
3405 let mut schema = MappingSchema::new();
3406 let order_cols = vec![
3407 ("order_id".to_string(), crate::expressions::DataType::Unknown),
3408 ("customer_id".to_string(), crate::expressions::DataType::Unknown),
3409 ("status".to_string(), crate::expressions::DataType::Unknown),
3410 ];
3411 let pay_cols = vec![
3412 ("payment_id".to_string(), crate::expressions::DataType::Unknown),
3413 ("order_id".to_string(), crate::expressions::DataType::Unknown),
3414 ("amount".to_string(), crate::expressions::DataType::Unknown),
3415 ];
3416 schema.add_table("stg_orders", &order_cols, None).unwrap();
3417 schema.add_table("stg_payments", &pay_cols, None).unwrap();
3418
3419 let node = lineage_with_schema(
3421 "order_id", &expr, Some(&schema as &dyn Schema), None, false,
3422 ).unwrap();
3423 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3424
3425 let has_table_qualified = all_names.iter().any(|n| n.contains('.') && n.contains("order_id"));
3427 assert!(has_table_qualified,
3428 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}", all_names);
3429
3430 let node = lineage_with_schema(
3432 "amount", &expr, Some(&schema as &dyn Schema), None, false,
3433 ).unwrap();
3434 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3435
3436 let has_table_qualified = all_names.iter().any(|n| n.contains('.') && n.contains("amount"));
3437 assert!(has_table_qualified,
3438 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}", all_names);
3439 }
3440
3441 #[test]
3442 fn test_lineage_cte_alias_resolution() {
3443 let sql = r#"WITH import_stg_items AS (
3445 SELECT item_id, name, status FROM stg_items
3446)
3447SELECT base.item_id, base.status
3448FROM import_stg_items AS base"#;
3449 let expr = parse(sql);
3450
3451 let node = lineage("item_id", &expr, None, false).unwrap();
3452 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3453 assert!(
3455 all_names.iter().any(|n| n == "stg_items.item_id"),
3456 "Expected leaf 'stg_items.item_id', got: {:?}",
3457 all_names
3458 );
3459 }
3460
3461 #[test]
3462 fn test_lineage_cte_alias_with_schema_and_star() {
3463 let sql = r#"WITH import_stg AS (
3465 SELECT * FROM stg_items
3466)
3467SELECT base.item_id, base.status
3468FROM import_stg AS base"#;
3469 let expr = parse(sql);
3470
3471 let mut schema = MappingSchema::new();
3472 schema
3473 .add_table(
3474 "stg_items",
3475 &[
3476 ("item_id".to_string(), DataType::Unknown),
3477 ("name".to_string(), DataType::Unknown),
3478 ("status".to_string(), DataType::Unknown),
3479 ],
3480 None,
3481 )
3482 .unwrap();
3483
3484 let node = lineage_with_schema(
3485 "item_id",
3486 &expr,
3487 Some(&schema as &dyn Schema),
3488 None,
3489 false,
3490 )
3491 .unwrap();
3492 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3493 assert!(
3494 all_names.iter().any(|n| n == "stg_items.item_id"),
3495 "Expected leaf 'stg_items.item_id', got: {:?}",
3496 all_names
3497 );
3498 }
3499
3500 #[test]
3501 fn test_lineage_cte_alias_with_join() {
3502 let sql = r#"WITH
3504 import_users AS (SELECT id, name FROM users),
3505 import_orders AS (SELECT id, user_id, amount FROM orders)
3506SELECT u.name, o.amount
3507FROM import_users AS u
3508LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3509 let expr = parse(sql);
3510
3511 let node = lineage("name", &expr, None, false).unwrap();
3512 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3513 assert!(
3514 all_names.iter().any(|n| n == "users.name"),
3515 "Expected leaf 'users.name', got: {:?}",
3516 all_names
3517 );
3518
3519 let node = lineage("amount", &expr, None, false).unwrap();
3520 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3521 assert!(
3522 all_names.iter().any(|n| n == "orders.amount"),
3523 "Expected leaf 'orders.amount', got: {:?}",
3524 all_names
3525 );
3526 }
3527
3528 #[test]
3533 fn test_lineage_unquoted_cte_case_insensitive() {
3534 let expr = parse(
3537 "WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE",
3538 );
3539 let node = lineage("col", &expr, None, false).unwrap();
3540 assert_eq!(node.name, "col");
3541 assert!(
3542 !node.downstream.is_empty(),
3543 "Unquoted CTE should resolve case-insensitively"
3544 );
3545 }
3546
3547 #[test]
3548 fn test_lineage_quoted_cte_case_preserved() {
3549 let expr = parse(
3551 r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#,
3552 );
3553 let node = lineage("col", &expr, None, false).unwrap();
3554 assert_eq!(node.name, "col");
3555 assert!(
3556 !node.downstream.is_empty(),
3557 "Quoted CTE with matching case should resolve"
3558 );
3559 }
3560
3561 #[test]
3562 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3563 let expr = parse(
3567 r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#,
3568 );
3569 let result = lineage("col", &expr, None, false);
3572 assert!(
3573 result.is_err(),
3574 "Quoted CTE with case mismatch should not expand star: {:?}",
3575 result
3576 );
3577 }
3578
3579 #[test]
3580 fn test_lineage_mixed_quoted_unquoted_cte() {
3581 let expr = parse(
3583 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3584 );
3585 let node = lineage("a", &expr, None, false).unwrap();
3586 assert_eq!(node.name, "a");
3587 assert!(
3588 !node.downstream.is_empty(),
3589 "Mixed quoted/unquoted CTE chain should resolve"
3590 );
3591 }
3592
3593 #[test]
3609 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3610 let expr = parse(
3621 r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#,
3622 );
3623 let node = lineage("col", &expr, None, false).unwrap();
3624 assert!(!node.downstream.is_empty());
3625 let child = &node.downstream[0];
3626 assert_eq!(
3628 child.source_name, "MyCte",
3629 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3630 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3631 );
3632 }
3633
3634 #[test]
3635 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
3636 let expr = parse(
3643 r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#,
3644 );
3645 let node = lineage("col", &expr, None, false).unwrap();
3646 assert!(!node.downstream.is_empty());
3647 let child = &node.downstream[0];
3648 assert_eq!(
3650 child.source_name, "MyCte",
3651 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3652 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3653 );
3654 }
3655
3656 #[test]
3663 #[ignore = "requires derived table star expansion (separate issue)"]
3664 fn test_node_name_doesnt_contain_comment() {
3665 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
3666 let node = lineage("x", &expr, None, false).unwrap();
3667
3668 assert_eq!(node.name, "x");
3669 assert!(!node.downstream.is_empty());
3670 }
3671
3672 #[test]
3676 fn test_comment_before_first_column_in_cte() {
3677 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
3678 let sql_without_comment = "with t as (select 1 as a) select a from t";
3679
3680 let expr_ok = parse(sql_without_comment);
3682 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
3683
3684 let expr_comment = parse(sql_with_comment);
3686 let node_comment = lineage("a", &expr_comment, None, false)
3687 .expect("with comment before first column should succeed");
3688
3689 assert_eq!(node_ok.name, node_comment.name, "node names should match");
3690 assert_eq!(
3691 node_ok.downstream_names(),
3692 node_comment.downstream_names(),
3693 "downstream lineage should be identical with or without comment"
3694 );
3695 }
3696
3697 #[test]
3699 fn test_block_comment_before_first_column() {
3700 let sql = "with t as (select 1 as a) select /* section */ a from t";
3701 let expr = parse(sql);
3702 let node = lineage("a", &expr, None, false)
3703 .expect("block comment before first column should succeed");
3704 assert_eq!(node.name, "a");
3705 assert!(
3706 !node.downstream.is_empty(),
3707 "should have downstream lineage"
3708 );
3709 }
3710
3711 #[test]
3713 fn test_comment_before_first_column_second_col_ok() {
3714 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
3715 let expr = parse(sql);
3716
3717 let node_a =
3718 lineage("a", &expr, None, false).expect("column a with comment should succeed");
3719 assert_eq!(node_a.name, "a");
3720
3721 let node_b =
3722 lineage("b", &expr, None, false).expect("column b with comment should succeed");
3723 assert_eq!(node_b.name, "b");
3724 }
3725
3726 #[test]
3728 fn test_comment_before_aliased_column() {
3729 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
3730 let expr = parse(sql);
3731 let node =
3732 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
3733 assert_eq!(node.name, "y");
3734 assert!(
3735 !node.downstream.is_empty(),
3736 "aliased column should have downstream lineage"
3737 );
3738 }
3739}