1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// One node of a column-lineage tree.
///
/// The root node describes the requested output column; `downstream` holds the
/// nodes (columns, tables, star sources) that this node's expression depends on.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Column name this node represents (for star fan-out children: `"<source>.*"`).
    pub name: String,
    /// Expression that produces this column (e.g. the select-list item, or the
    /// whole set-operation expression for UNION/INTERSECT/EXCEPT nodes).
    pub expression: Expression,
    /// Query or table the expression is evaluated against.
    pub source: Expression,
    /// Nodes this node depends on.
    pub downstream: Vec<LineageNode>,
    /// Name of the source (CTE, derived table, ...) this node was reached through;
    /// empty for the root.
    pub source_name: String,
    /// Classification of `source_name`; `SourceKind::Root` for the root node.
    pub source_kind: SourceKind,
    /// Optional alias of the source; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_alias: Option<String>,
    /// Name of the parent node whose expression referenced this one; empty for the root.
    pub reference_node_name: String,
}
40
41impl LineageNode {
42 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44 Self {
45 name: name.into(),
46 expression,
47 source,
48 downstream: Vec::new(),
49 source_name: String::new(),
50 source_kind: SourceKind::Unknown,
51 source_alias: None,
52 reference_node_name: String::new(),
53 }
54 }
55
56 pub fn walk(&self) -> LineageWalker<'_> {
58 LineageWalker { stack: vec![self] }
59 }
60
61 pub fn downstream_names(&self) -> Vec<String> {
63 self.downstream.iter().map(|n| n.name.clone()).collect()
64 }
65}
66
67fn source_kind_for_scope_context(
68 scope: &Scope,
69 source_name: &str,
70 reference_node_name: &str,
71) -> SourceKind {
72 if source_name.is_empty() && reference_node_name.is_empty() {
73 return SourceKind::Root;
74 }
75 if let Some(source_info) = scope.sources.get(source_name) {
76 return source_info.kind;
77 }
78 if scope.cte_sources.contains_key(source_name) {
79 return SourceKind::Cte;
80 }
81 match scope.scope_type {
82 ScopeType::Cte => SourceKind::Cte,
83 ScopeType::DerivedTable => SourceKind::DerivedTable,
84 ScopeType::Udtf => SourceKind::Virtual,
85 _ => SourceKind::Unknown,
86 }
87}
88
89fn apply_scope_context(
90 node: &mut LineageNode,
91 scope: &Scope,
92 source_name: &str,
93 reference_node_name: &str,
94) {
95 node.source_name = source_name.to_string();
96 node.reference_node_name = reference_node_name.to_string();
97 node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
/// Pre-order, depth-first iterator over a [`LineageNode`] tree; created by
/// [`LineageNode::walk`].
pub struct LineageWalker<'a> {
    /// Nodes still to visit; the top of the stack is yielded next.
    stack: Vec<&'a LineageNode>,
}
104
105impl<'a> Iterator for LineageWalker<'a> {
106 type Item = &'a LineageNode;
107
108 fn next(&mut self) -> Option<Self::Item> {
109 if let Some(node) = self.stack.pop() {
110 for child in node.downstream.iter().rev() {
112 self.stack.push(child);
113 }
114 Some(node)
115 } else {
116 None
117 }
118 }
119}
120
/// Identifies which output column of a scope's select list to trace.
enum ColumnRef<'a> {
    /// Match by output name (alias or column name), compared after
    /// `normalize_column_name` under the active dialect.
    Name(&'a str),
    /// Match by zero-based position in the select list.
    Index(usize),
}
130
131pub fn lineage(
157 column: &str,
158 sql: &Expression,
159 dialect: Option<DialectType>,
160 trim_selects: bool,
161) -> Result<LineageNode> {
162 let effective_sql = lineage_effective_expression(sql);
164 let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165 if !has_with {
166 return lineage_from_expression(column, sql, dialect, trim_selects);
167 }
168 let mut owned = sql.clone();
169 expand_cte_stars(&mut owned, None);
170 lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173pub fn lineage_with_schema(
189 column: &str,
190 sql: &Expression,
191 schema: Option<&dyn Schema>,
192 dialect: Option<DialectType>,
193 trim_selects: bool,
194) -> Result<LineageNode> {
195 let mut qualified_expression = if let Some(schema) = schema {
196 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197 QualifyColumnsOptions::new().with_dialect(dialect_type)
198 } else {
199 QualifyColumnsOptions::new()
200 };
201
202 qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203 Error::internal(format!("Lineage qualification failed with schema: {}", e))
204 })?
205 } else {
206 sql.clone()
207 };
208
209 annotate_types(&mut qualified_expression, schema, dialect);
211
212 expand_cte_stars(&mut qualified_expression, schema);
215
216 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220 column: &str,
221 sql: &Expression,
222 dialect: Option<DialectType>,
223 trim_selects: bool,
224) -> Result<LineageNode> {
225 let sql = lineage_effective_expression(sql);
226 let scope = build_scope(sql);
227 to_node(
228 ColumnRef::Name(column),
229 &scope,
230 dialect,
231 "",
232 "",
233 "",
234 trim_selects,
235 )
236}
237
238pub(crate) fn lineage_by_index_from_expression(
239 column_index: usize,
240 sql: &Expression,
241 dialect: Option<DialectType>,
242 trim_selects: bool,
243) -> Result<LineageNode> {
244 let sql = lineage_effective_expression(sql);
245 let scope = build_scope(sql);
246 to_node(
247 ColumnRef::Index(column_index),
248 &scope,
249 dialect,
250 "",
251 "",
252 "",
253 trim_selects,
254 )
255}
256
257fn lineage_effective_expression(sql: &Expression) -> &Expression {
258 match sql {
259 Expression::Prepare(prepare) => &prepare.statement,
260 _ => sql,
261 }
262}
263
264fn normalize_cte_name(ident: &Identifier) -> String {
274 if ident.quoted {
275 ident.name.clone()
276 } else {
277 ident.name.to_lowercase()
278 }
279}
280
281pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
293 if let Expression::Prepare(prepare) = expr {
294 expand_cte_stars(&mut prepare.statement, schema);
295 return;
296 }
297
298 let select = match expr {
299 Expression::Select(s) => s,
300 _ => return,
301 };
302
303 let with = match &mut select.with {
304 Some(w) => w,
305 None => return,
306 };
307
308 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
309
310 for cte in &mut with.ctes {
311 let cte_name = normalize_cte_name(&cte.alias);
312
313 if !cte.columns.is_empty() {
315 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
316 resolved_cte_columns.insert(cte_name, cols);
317 continue;
318 }
319
320 if with.recursive {
326 let is_self_referencing =
327 if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
328 let body_sources = get_select_sources(body_select);
329 body_sources.iter().any(|s| s.normalized == cte_name)
330 } else {
331 false
332 };
333 if is_self_referencing {
334 continue;
335 }
336 }
337
338 let body_select = match get_leftmost_select_mut(&mut cte.this) {
340 Some(s) => s,
341 None => continue,
342 };
343
344 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
345 resolved_cte_columns.insert(cte_name, columns);
346 }
347
348 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
350}
351
352fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
357 let mut current = expr;
358 for _ in 0..MAX_LINEAGE_DEPTH {
359 match current {
360 Expression::Select(s) => return Some(s),
361 Expression::Union(u) => current = &mut u.left,
362 Expression::Intersect(i) => current = &mut i.left,
363 Expression::Except(e) => current = &mut e.left,
364 Expression::Paren(p) => current = &mut p.this,
365 _ => return None,
366 }
367 }
368 None
369}
370
371fn rewrite_stars_in_select(
375 select: &mut Select,
376 resolved_ctes: &HashMap<String, Vec<String>>,
377 schema: Option<&dyn Schema>,
378) -> Vec<String> {
379 let has_star = select
384 .expressions
385 .iter()
386 .any(|e| matches!(e, Expression::Star(_)));
387 let has_qualified_star = select
388 .expressions
389 .iter()
390 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
391
392 if !has_star && !has_qualified_star {
393 return select
395 .expressions
396 .iter()
397 .filter_map(get_expression_output_name)
398 .collect();
399 }
400
401 let sources = get_select_sources(select);
402 let mut new_expressions = Vec::new();
403 let mut result_columns = Vec::new();
404
405 for expr in &select.expressions {
406 match expr {
407 Expression::Star(star) => {
408 let qual = star.table.as_ref();
409 if let Some(expanded) =
410 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
411 {
412 for (src_alias, col_name) in &expanded {
413 let table_id = Identifier::new(src_alias);
414 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
415 result_columns.push(col_name.clone());
416 }
417 } else {
418 new_expressions.push(expr.clone());
419 result_columns.push("*".to_string());
420 }
421 }
422 Expression::Column(c) if c.name.name == "*" => {
423 let qual = c.table.as_ref();
424 if let Some(expanded) =
425 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
426 {
427 for (_src_alias, col_name) in &expanded {
428 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
430 result_columns.push(col_name.clone());
431 }
432 } else {
433 new_expressions.push(expr.clone());
434 result_columns.push("*".to_string());
435 }
436 }
437 _ => {
438 new_expressions.push(expr.clone());
439 if let Some(name) = get_expression_output_name(expr) {
440 result_columns.push(name);
441 }
442 }
443 }
444 }
445
446 select.expressions = new_expressions;
447 result_columns
448}
449
450fn expand_star_from_sources(
455 qualifier: Option<&Identifier>,
456 sources: &[SourceInfo],
457 resolved_ctes: &HashMap<String, Vec<String>>,
458 schema: Option<&dyn Schema>,
459) -> Option<Vec<(String, String)>> {
460 let mut expanded = Vec::new();
461
462 if let Some(qual) = qualifier {
463 let qual_normalized = normalize_cte_name(qual);
465 for src in sources {
466 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
467 if let Some(cols) = resolved_ctes.get(&src.normalized) {
469 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
470 return Some(expanded);
471 }
472 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
474 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
475 return Some(expanded);
476 }
477 }
478 }
479 None
480 } else {
481 let mut any_expanded = false;
487 for src in sources {
488 if let Some(cols) = resolved_ctes.get(&src.normalized) {
489 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
490 any_expanded = true;
491 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
492 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
493 any_expanded = true;
494 } else {
495 return None;
496 }
497 }
498 if any_expanded {
499 Some(expanded)
500 } else {
501 None
502 }
503 }
504}
505
506fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
508 let schema = schema?;
509 if fq_name.is_empty() {
510 return None;
511 }
512 schema
513 .column_names(fq_name)
514 .ok()
515 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
516}
517
518fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
520 Expression::Column(Box::new(crate::expressions::Column {
521 name: Identifier::new(name),
522 table: table.cloned(),
523 join_mark: false,
524 trailing_comments: Vec::new(),
525 span: None,
526 inferred_type: None,
527 }))
528}
529
530fn get_expression_output_name(expr: &Expression) -> Option<String> {
532 match expr {
533 Expression::Alias(a) => Some(a.alias.name.clone()),
534 Expression::Column(c) => Some(c.name.name.clone()),
535 Expression::Identifier(id) => Some(id.name.clone()),
536 Expression::Star(_) => Some("*".to_string()),
537 _ => None,
538 }
539}
540
/// A FROM / JOIN / LATERAL VIEW source of one SELECT, as used for star expansion.
struct SourceInfo {
    /// Name the query uses for the source: its alias if present, otherwise its name.
    alias: String,
    /// Key into the resolved-CTE column map: the table name via `normalize_cte_name`
    /// for tables, the lowercased alias for subqueries and virtual sources (quoted
    /// virtual aliases keep their spelling).
    normalized: String,
    /// Dotted `catalog.schema.name` for tables, used for schema lookups; the alias for
    /// subqueries and virtual sources.
    fq_name: String,
}
549
550fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
553 let mut sources = Vec::new();
554
555 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
556 fn virtual_source_info(alias: &Identifier) -> SourceInfo {
557 SourceInfo {
558 alias: alias.name.clone(),
559 normalized: normalize_cte_name(alias),
560 fq_name: alias.name.clone(),
561 }
562 }
563
564 fn named_virtual_source_info(alias: &str) -> SourceInfo {
565 SourceInfo {
566 alias: alias.to_string(),
567 normalized: alias.to_lowercase(),
568 fq_name: alias.to_string(),
569 }
570 }
571
572 match expr {
573 Expression::Table(t) => {
574 let normalized = normalize_cte_name(&t.name);
575 let alias = t
576 .alias
577 .as_ref()
578 .map(|a| a.name.clone())
579 .unwrap_or_else(|| t.name.name.clone());
580 let mut parts = Vec::new();
581 if let Some(catalog) = &t.catalog {
582 parts.push(catalog.name.clone());
583 }
584 if let Some(schema) = &t.schema {
585 parts.push(schema.name.clone());
586 }
587 parts.push(t.name.name.clone());
588 let fq_name = parts.join(".");
589 Some(SourceInfo {
590 alias,
591 normalized,
592 fq_name,
593 })
594 }
595 Expression::Subquery(s) => {
596 let alias = s.alias.as_ref()?.name.clone();
597 let normalized = alias.to_lowercase();
598 let fq_name = alias.clone();
599 Some(SourceInfo {
600 alias,
601 normalized,
602 fq_name,
603 })
604 }
605 Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
606 Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
607 Some(virtual_source_info(&a.alias))
608 }
609 Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
610 Expression::LateralView(lateral_view) => lateral_view
611 .table_alias
612 .as_ref()
613 .or_else(|| lateral_view.column_aliases.first())
614 .map(virtual_source_info),
615 Expression::Paren(p) => extract_source(&p.this),
616 _ => None,
617 }
618 }
619
620 if let Some(from) = &select.from {
621 for expr in &from.expressions {
622 if let Some(info) = extract_source(expr) {
623 sources.push(info);
624 }
625 }
626 }
627 for join in &select.joins {
628 if let Some(info) = extract_source(&join.this) {
629 sources.push(info);
630 }
631 }
632 for lateral_view in &select.lateral_views {
633 if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
634 {
635 sources.push(info);
636 }
637 }
638 sources
639}
640
641pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
643 let mut tables = HashSet::new();
644 collect_source_tables(node, &mut tables);
645 tables
646}
647
648pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
650 if let Expression::Table(table) = &node.source {
651 tables.insert(table.name.name.clone());
652 }
653 for child in &node.downstream {
654 collect_source_tables(child, tables);
655 }
656}
657
/// Upper bound on lineage recursion depth. Also bounds the number of set-operation
/// and parenthesis hops taken when locating a leftmost SELECT. Exceeding it in
/// `to_node_inner` produces an error whose message suggests a circular CTE reference.
const MAX_LINEAGE_DEPTH: usize = 64;
665
666fn to_node(
668 column: ColumnRef<'_>,
669 scope: &Scope,
670 dialect: Option<DialectType>,
671 scope_name: &str,
672 source_name: &str,
673 reference_node_name: &str,
674 trim_selects: bool,
675) -> Result<LineageNode> {
676 to_node_inner(
677 column,
678 scope,
679 dialect,
680 scope_name,
681 source_name,
682 reference_node_name,
683 trim_selects,
684 &[],
685 0,
686 )
687}
688
/// Recursively builds the lineage node for `column` within `scope`.
///
/// - Set operations (UNION/INTERSECT/EXCEPT), including a CTE whose body is one,
///   are delegated to `handle_set_operation`.
/// - Otherwise the column is located in the SELECT list. A `*` projection fans out
///   to one child per matching source.
/// - Other projections gain children for un-aliased scalar subqueries and for
///   every column reference they contain.
///
/// `ancestor_cte_scopes` carries CTE scopes visible from enclosing scopes.
/// `depth` guards against runaway recursion.
///
/// # Errors
/// - Returns an error when `depth` exceeds `MAX_LINEAGE_DEPTH`.
/// - Returns an error when the column cannot be found via `find_select_expr` or
///   `column_to_index`.
///
/// Errors from child recursions into subqueries are ignored; the child is simply
/// omitted.
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // CTE scopes visible here: this scope's own, followed by those inherited from ancestors.
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }

    // A CTE scope wraps its body; analyse the body itself.
    let effective_expr = match scope_expr {
        Expression::Cte(cte) => &cte.this,
        other => other,
    };

    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        if matches!(scope_expr, Expression::Cte(_)) {
            // Build a scope rooted at the set-operation body, copying over the CTE
            // scope's child scopes and sources. Scope fields not copied here keep
            // whatever `Scope::new` initialises them to.
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                ancestor_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            ancestor_cte_scopes,
            depth,
        );
    }

    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    // With `trim_selects`, the node's source keeps only the traced projection.
    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    apply_scope_context(&mut node, scope, source_name, reference_node_name);

    if let Expression::Star(star) = &select_expr {
        // `*` fans out to every source; `t.*` only to sources whose name, alias or
        // table name equals `t` (ASCII case-insensitive).
        let star_table = star
            .table
            .as_ref()
            .map(|identifier| identifier.name.as_str());
        // NOTE(review): child order follows iteration order of `scope.sources`;
        // if that is a hash map, the order is not deterministic — confirm.
        for (name, source_info) in &scope.sources {
            if let Some(star_table) = star_table {
                let table_matches = name.eq_ignore_ascii_case(star_table)
                    || source_info
                        .alias
                        .as_deref()
                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
                    || matches!(
                        &source_info.expression,
                        Expression::Table(table_ref)
                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
                    );
                if !table_matches {
                    continue;
                }
            }

            let mut child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: star.table.clone(),
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            apply_source_info_context(&mut child, name, source_info);
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // Un-aliased subqueries inside the projection (scalar subqueries): trace their
    // first output column in the matching subquery scope, found by expression equality.
    let subqueries: Vec<&Expression> =
        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
    for sq_expr in subqueries {
        if let Expression::Subquery(sq) = sq_expr {
            for sq_scope in &scope.subquery_scopes {
                if sq_scope.expression == sq.this {
                    if let Ok(child) = to_node_inner(
                        ColumnRef::Index(0),
                        sq_scope,
                        dialect,
                        &column_name,
                        "",
                        "",
                        trim_selects,
                        ancestor_cte_scopes,
                        depth + 1,
                    ) {
                        node.downstream.push(child);
                    }
                    break;
                }
            }
        }
    }

    // Resolve every column referenced by the projection to its source.
    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
874
875fn handle_set_operation(
880 column: &ColumnRef<'_>,
881 scope: &Scope,
882 dialect: Option<DialectType>,
883 scope_name: &str,
884 source_name: &str,
885 reference_node_name: &str,
886 trim_selects: bool,
887 ancestor_cte_scopes: &[Scope],
888 depth: usize,
889) -> Result<LineageNode> {
890 let scope_expr = &scope.expression;
891
892 let col_index = match column {
894 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
895 ColumnRef::Index(i) => *i,
896 };
897
898 let col_name = match column {
899 ColumnRef::Name(name) => name.to_string(),
900 ColumnRef::Index(_) => format!("_{col_index}"),
901 };
902
903 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
904 apply_scope_context(&mut node, scope, source_name, reference_node_name);
905
906 for branch_scope in &scope.union_scopes {
908 if let Ok(child) = to_node_inner(
909 ColumnRef::Index(col_index),
910 branch_scope,
911 dialect,
912 scope_name,
913 "",
914 "",
915 trim_selects,
916 ancestor_cte_scopes,
917 depth + 1,
918 ) {
919 node.downstream.push(child);
920 }
921 }
922
923 Ok(node)
924}
925
/// Resolves the reference `table.col_name` seen in `scope` and appends the
/// resulting child to `node`.
///
/// Resolution order (first success wins):
/// 1. `table` (or the CTE it aliases, via `resolve_cte_alias`) names a visible
///    CTE: recurse into that CTE's scope.
/// 2. `table` is a scope-backed source (derived table, ...): recurse into its child scope.
/// 3. `table` is a non-scope source: emit a table-column node. Virtual sources
///    (UNNEST, LATERAL, ...) also get the columns they read attached.
/// 4. Fallback: emit a bare table-column node for `table.col_name`.
///
/// A failed recursion in steps 1-2 falls through to the next step.
fn resolve_qualified_column(
    node: &mut LineageNode,
    scope: &Scope,
    dialect: Option<DialectType>,
    table: &str,
    col_name: &str,
    parent_name: &str,
    trim_selects: bool,
    all_cte_scopes: &[&Scope],
    depth: usize,
) {
    // An alias of a CTE (e.g. `FROM cte AS c`) resolves to the CTE's own name.
    let resolved_cte_name = resolve_cte_alias(scope, table);
    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);

    let is_cte = scope.cte_sources.contains_key(effective_table)
        || all_cte_scopes.iter().any(
            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
        );
    if is_cte {
        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
            // NOTE(review): deep-clones every visible CTE scope on each hop.
            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
            if let Ok(child) = to_node_inner(
                ColumnRef::Name(col_name),
                child_scope,
                dialect,
                parent_name,
                effective_table,
                parent_name,
                trim_selects,
                &ancestors,
                depth + 1,
            ) {
                node.downstream.push(child);
                return;
            }
        }
    }

    if let Some(source_info) = scope.sources.get(table) {
        if source_info.is_scope {
            if let Some(child_scope) = find_child_scope(scope, table) {
                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
                if let Ok(child) = to_node_inner(
                    ColumnRef::Name(col_name),
                    child_scope,
                    dialect,
                    parent_name,
                    table,
                    parent_name,
                    trim_selects,
                    &ancestors,
                    depth + 1,
                ) {
                    node.downstream.push(child);
                    return;
                }
            }
        }
    }

    if let Some(source_info) = scope.sources.get(table) {
        if !source_info.is_scope {
            let mut child = make_table_column_node_from_source(table, col_name, source_info);
            if source_info.kind == SourceKind::Virtual {
                // Virtual sources derive from other columns (e.g. the array
                // being unnested); trace those too.
                attach_virtual_source_dependencies(
                    &mut child,
                    scope,
                    dialect,
                    table,
                    &source_info.expression,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                );
            }
            node.downstream.push(child);
            return;
        }
    }

    node.downstream.push(make_table_column_node(table, col_name));
}
1022
1023fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1029 if scope.cte_sources.contains_key(name) {
1031 return None;
1032 }
1033 if let Some(source_info) = scope.sources.get(name) {
1035 if source_info.is_scope {
1036 if let Expression::Cte(cte) = &source_info.expression {
1037 let cte_name = &cte.alias.name;
1038 if scope.cte_sources.contains_key(cte_name) {
1039 return Some(cte_name.clone());
1040 }
1041 }
1042 }
1043 }
1044 None
1045}
1046
1047fn resolve_unqualified_column(
1048 node: &mut LineageNode,
1049 scope: &Scope,
1050 dialect: Option<DialectType>,
1051 col_name: &str,
1052 parent_name: &str,
1053 trim_selects: bool,
1054 all_cte_scopes: &[&Scope],
1055 depth: usize,
1056) {
1057 let from_source_names = source_names_from_from_join(scope);
1061
1062 if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1063 resolve_qualified_column(
1064 node,
1065 scope,
1066 dialect,
1067 &tbl,
1068 col_name,
1069 parent_name,
1070 trim_selects,
1071 all_cte_scopes,
1072 depth,
1073 );
1074 return;
1075 }
1076
1077 if from_source_names.len() == 1 {
1078 let tbl = &from_source_names[0];
1079 resolve_qualified_column(
1080 node,
1081 scope,
1082 dialect,
1083 tbl,
1084 col_name,
1085 parent_name,
1086 trim_selects,
1087 all_cte_scopes,
1088 depth,
1089 );
1090 return;
1091 }
1092
1093 let child = LineageNode::new(
1095 col_name.to_string(),
1096 Expression::Column(Box::new(crate::expressions::Column {
1097 name: crate::expressions::Identifier::new(col_name.to_string()),
1098 table: None,
1099 join_mark: false,
1100 trailing_comments: vec![],
1101 span: None,
1102 inferred_type: None,
1103 })),
1104 node.source.clone(),
1105 );
1106 node.downstream.push(child);
1107}
1108
1109fn unique_virtual_source_for_column(
1110 scope: &Scope,
1111 source_names: &[String],
1112 col_name: &str,
1113) -> Option<String> {
1114 let mut matches = source_names.iter().filter_map(|source_name| {
1115 let source = scope.sources.get(source_name)?;
1116 if source.kind == SourceKind::Virtual
1117 && virtual_source_output_columns(source)
1118 .any(|column| column.eq_ignore_ascii_case(col_name))
1119 {
1120 Some(source_name.clone())
1121 } else {
1122 None
1123 }
1124 });
1125
1126 let first = matches.next()?;
1127 if matches.next().is_none() {
1128 Some(first)
1129 } else {
1130 None
1131 }
1132}
1133
1134fn virtual_source_output_columns(
1135 source_info: &ScopeSourceInfo,
1136) -> Box<dyn Iterator<Item = String> + '_> {
1137 match &source_info.expression {
1138 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1139 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1140 Box::new(alias_output_columns(alias))
1141 }
1142 Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1143 Expression::LateralView(lateral_view) => {
1144 Box::new(lateral_view_output_columns(lateral_view))
1145 }
1146 _ => Box::new(source_info.alias.clone().into_iter()),
1147 }
1148}
1149
1150fn unnest_output_columns(
1151 unnest: &crate::expressions::UnnestFunc,
1152) -> impl Iterator<Item = String> + '_ {
1153 unnest
1154 .alias
1155 .iter()
1156 .map(|alias| alias.name.clone())
1157 .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1158}
1159
1160fn alias_output_columns(
1161 alias: &crate::expressions::Alias,
1162) -> Box<dyn Iterator<Item = String> + '_> {
1163 if alias.column_aliases.is_empty() {
1164 Box::new(std::iter::once(alias.alias.name.clone()))
1165 } else {
1166 Box::new(
1167 alias
1168 .column_aliases
1169 .iter()
1170 .map(|column| column.name.clone()),
1171 )
1172 }
1173}
1174
1175fn lateral_output_columns(
1176 lateral: &crate::expressions::Lateral,
1177) -> Box<dyn Iterator<Item = String> + '_> {
1178 if lateral.column_aliases.is_empty() {
1179 default_virtual_output_columns(&lateral.this)
1180 } else {
1181 Box::new(lateral.column_aliases.iter().cloned())
1182 }
1183}
1184
1185fn lateral_view_output_columns(
1186 lateral_view: &crate::expressions::LateralView,
1187) -> Box<dyn Iterator<Item = String> + '_> {
1188 Box::new(
1189 lateral_view
1190 .column_aliases
1191 .iter()
1192 .map(|column| column.name.clone()),
1193 )
1194}
1195
1196fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1197 match expr {
1198 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1199 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1200 alias_output_columns(alias)
1201 }
1202 Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1203 Box::new(
1204 ["seq", "key", "path", "index", "value", "this"]
1205 .into_iter()
1206 .map(String::from),
1207 )
1208 }
1209 _ => Box::new(std::iter::empty()),
1210 }
1211}
1212
1213fn attach_virtual_source_dependencies(
1214 node: &mut LineageNode,
1215 scope: &Scope,
1216 dialect: Option<DialectType>,
1217 source_alias: &str,
1218 source_expr: &Expression,
1219 trim_selects: bool,
1220 all_cte_scopes: &[&Scope],
1221 depth: usize,
1222) {
1223 let parent_name = node.name.clone();
1224 let mut seen = HashSet::new();
1225 for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1226 let key = (
1227 col_ref.table.as_ref().map(|t| t.name.clone()),
1228 col_ref.column.clone(),
1229 );
1230 if !seen.insert(key) {
1231 continue;
1232 }
1233
1234 if let Some(table_id) = col_ref.table {
1235 let table = table_id.name;
1236 if table == source_alias {
1237 continue;
1238 }
1239 resolve_qualified_column(
1240 node,
1241 scope,
1242 dialect,
1243 &table,
1244 &col_ref.column,
1245 &parent_name,
1246 trim_selects,
1247 all_cte_scopes,
1248 depth + 1,
1249 );
1250 } else {
1251 let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1252 if non_virtual_sources.len() == 1 {
1253 resolve_qualified_column(
1254 node,
1255 scope,
1256 dialect,
1257 &non_virtual_sources[0],
1258 &col_ref.column,
1259 &parent_name,
1260 trim_selects,
1261 all_cte_scopes,
1262 depth + 1,
1263 );
1264 }
1265 }
1266 }
1267}
1268
1269fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1270 fn source_name(expr: &Expression) -> Option<String> {
1271 match expr {
1272 Expression::Table(table) => Some(
1273 table
1274 .alias
1275 .as_ref()
1276 .map(|a| a.name.clone())
1277 .unwrap_or_else(|| table.name.name.clone()),
1278 ),
1279 Expression::Subquery(subquery) => {
1280 subquery.alias.as_ref().map(|alias| alias.name.clone())
1281 }
1282 Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1283 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1284 Some(alias.alias.name.clone())
1285 }
1286 Expression::Lateral(lateral) => lateral.alias.clone(),
1287 Expression::LateralView(lateral_view) => lateral_view
1288 .table_alias
1289 .as_ref()
1290 .or_else(|| lateral_view.column_aliases.first())
1291 .map(|alias| alias.name.clone()),
1292 Expression::Paren(paren) => source_name(&paren.this),
1293 _ => None,
1294 }
1295 }
1296
1297 let effective_expr = match &scope.expression {
1298 Expression::Cte(cte) => &cte.this,
1299 expr => expr,
1300 };
1301
1302 let mut names = Vec::new();
1303 let mut seen = std::collections::HashSet::new();
1304
1305 if let Expression::Select(select) = effective_expr {
1306 if let Some(from) = &select.from {
1307 for expr in &from.expressions {
1308 if let Some(name) = source_name(expr) {
1309 if !name.is_empty() && seen.insert(name.clone()) {
1310 names.push(name);
1311 }
1312 }
1313 }
1314 }
1315 for join in &select.joins {
1316 if let Some(name) = source_name(&join.this) {
1317 if !name.is_empty() && seen.insert(name.clone()) {
1318 names.push(name);
1319 }
1320 }
1321 }
1322 for lateral_view in &select.lateral_views {
1323 if let Some(name) =
1324 source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
1325 {
1326 if !name.is_empty() && seen.insert(name.clone()) {
1327 names.push(name);
1328 }
1329 }
1330 }
1331 }
1332
1333 names
1334}
1335
1336fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1337 source_names_from_from_join(scope)
1338 .into_iter()
1339 .filter(|name| {
1340 !matches!(
1341 scope.sources.get(name).map(|source| source.kind),
1342 Some(SourceKind::Virtual)
1343 )
1344 })
1345 .collect()
1346}
1347
1348fn get_alias_or_name(expr: &Expression) -> Option<String> {
1354 match expr {
1355 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1356 Expression::Column(col) => Some(col.name.name.clone()),
1357 Expression::Identifier(id) => Some(id.name.clone()),
1358 Expression::Star(_) => Some("*".to_string()),
1359 Expression::Annotated(a) => get_alias_or_name(&a.this),
1362 _ => None,
1363 }
1364}
1365
1366fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1368 match column {
1369 ColumnRef::Name(n) => n.to_string(),
1370 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1371 }
1372}
1373
1374fn find_select_expr(
1376 scope_expr: &Expression,
1377 column: &ColumnRef<'_>,
1378 dialect: Option<DialectType>,
1379) -> Result<Expression> {
1380 if let Expression::Select(ref select) = scope_expr {
1381 match column {
1382 ColumnRef::Name(name) => {
1383 let normalized_name = normalize_column_name(name, dialect);
1384 for expr in &select.expressions {
1385 if let Some(alias_or_name) = get_alias_or_name(expr) {
1386 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1387 return Ok(expr.clone());
1388 }
1389 }
1390 }
1391 Err(crate::error::Error::parse(
1392 format!("Cannot find column '{}' in query", name),
1393 0,
1394 0,
1395 0,
1396 0,
1397 ))
1398 }
1399 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1400 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1401 }),
1402 }
1403 } else {
1404 Err(crate::error::Error::parse(
1405 "Expected SELECT expression for column lookup",
1406 0,
1407 0,
1408 0,
1409 0,
1410 ))
1411 }
1412}
1413
1414fn column_to_index(
1416 set_op_expr: &Expression,
1417 name: &str,
1418 dialect: Option<DialectType>,
1419) -> Result<usize> {
1420 let normalized_name = normalize_column_name(name, dialect);
1421 let mut expr = set_op_expr;
1422 loop {
1423 match expr {
1424 Expression::Union(u) => expr = &u.left,
1425 Expression::Intersect(i) => expr = &i.left,
1426 Expression::Except(e) => expr = &e.left,
1427 Expression::Select(select) => {
1428 for (i, e) in select.expressions.iter().enumerate() {
1429 if let Some(alias_or_name) = get_alias_or_name(e) {
1430 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1431 return Ok(i);
1432 }
1433 }
1434 }
1435 return Err(crate::error::Error::parse(
1436 format!("Cannot find column '{}' in set operation", name),
1437 0,
1438 0,
1439 0,
1440 0,
1441 ));
1442 }
1443 _ => {
1444 return Err(crate::error::Error::parse(
1445 "Expected SELECT or set operation",
1446 0,
1447 0,
1448 0,
1449 0,
1450 ))
1451 }
1452 }
1453 }
1454}
1455
/// Normalizes a column identifier so names can be compared under `dialect`.
///
/// Delegates to `normalize_name` with the fixed flag arguments `false, true`.
/// NOTE(review): presumably these flags select column-style (rather than
/// table-style) normalization; confirm against `normalize_name`'s signature.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    normalize_name(name, dialect, false, true)
}
1459
1460fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1462 if let Expression::Select(select) = select_expr {
1463 let mut trimmed = select.as_ref().clone();
1464 trimmed.expressions = vec![target_expr.clone()];
1465 Expression::Select(Box::new(trimmed))
1466 } else {
1467 select_expr.clone()
1468 }
1469}
1470
1471fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1473 if scope.cte_sources.contains_key(source_name) {
1475 for cte_scope in &scope.cte_scopes {
1476 if let Expression::Cte(cte) = &cte_scope.expression {
1477 if cte.alias.name == source_name {
1478 return Some(cte_scope);
1479 }
1480 }
1481 }
1482 }
1483
1484 if let Some(source_info) = scope.sources.get(source_name) {
1486 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1487 if let Expression::Subquery(sq) = &source_info.expression {
1488 for dt_scope in &scope.derived_table_scopes {
1489 if dt_scope.expression == sq.this {
1490 return Some(dt_scope);
1491 }
1492 }
1493 }
1494 }
1495 }
1496
1497 None
1498}
1499
1500fn find_child_scope_in<'a>(
1504 all_cte_scopes: &[&'a Scope],
1505 scope: &'a Scope,
1506 source_name: &str,
1507) -> Option<&'a Scope> {
1508 for cte_scope in &scope.cte_scopes {
1510 if let Expression::Cte(cte) = &cte_scope.expression {
1511 if cte.alias.name == source_name {
1512 return Some(cte_scope);
1513 }
1514 }
1515 }
1516
1517 for cte_scope in all_cte_scopes {
1519 if let Expression::Cte(cte) = &cte_scope.expression {
1520 if cte.alias.name == source_name {
1521 return Some(cte_scope);
1522 }
1523 }
1524 }
1525
1526 if let Some(source_info) = scope.sources.get(source_name) {
1528 if source_info.is_scope {
1529 if let Expression::Subquery(sq) = &source_info.expression {
1530 for dt_scope in &scope.derived_table_scopes {
1531 if dt_scope.expression == sq.this {
1532 return Some(dt_scope);
1533 }
1534 }
1535 }
1536 }
1537 }
1538
1539 None
1540}
1541
1542fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1544 let mut node = LineageNode::new(
1545 format!("{}.{}", table, column),
1546 Expression::Column(Box::new(crate::expressions::Column {
1547 name: crate::expressions::Identifier::new(column.to_string()),
1548 table: Some(crate::expressions::Identifier::new(table.to_string())),
1549 join_mark: false,
1550 trailing_comments: vec![],
1551 span: None,
1552 inferred_type: None,
1553 })),
1554 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1555 );
1556 node.source_name = table.to_string();
1557 node.source_kind = SourceKind::Table;
1558 node
1559}
1560
1561fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1562 let mut parts: Vec<String> = Vec::new();
1563 if let Some(catalog) = &table_ref.catalog {
1564 parts.push(catalog.name.clone());
1565 }
1566 if let Some(schema) = &table_ref.schema {
1567 parts.push(schema.name.clone());
1568 }
1569 parts.push(table_ref.name.name.clone());
1570 parts.join(".")
1571}
1572
1573fn apply_source_info_context(
1574 node: &mut LineageNode,
1575 source_key: &str,
1576 source_info: &ScopeSourceInfo,
1577) {
1578 node.source_kind = source_info.kind;
1579 node.source_name =
1580 source_info
1581 .lineage_name
1582 .clone()
1583 .unwrap_or_else(|| match &source_info.expression {
1584 Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1585 _ => source_key.to_string(),
1586 });
1587 node.source_alias = source_info.alias.clone();
1588}
1589
1590fn make_table_column_node_from_source(
1591 source_key: &str,
1592 column: &str,
1593 source_info: &ScopeSourceInfo,
1594) -> LineageNode {
1595 let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1596 let mut node = LineageNode::new(
1597 format!("{}.{}", lineage_name, column),
1598 Expression::Column(Box::new(crate::expressions::Column {
1599 name: crate::expressions::Identifier::new(column.to_string()),
1600 table: Some(crate::expressions::Identifier::new(
1601 lineage_name.to_string(),
1602 )),
1603 join_mark: false,
1604 trailing_comments: vec![],
1605 span: None,
1606 inferred_type: None,
1607 })),
1608 source_info.expression.clone(),
1609 );
1610
1611 apply_source_info_context(&mut node, source_key, source_info);
1612
1613 node
1614}
1615
1616#[derive(Debug, Clone)]
1618struct SimpleColumnRef {
1619 table: Option<crate::expressions::Identifier>,
1620 column: String,
1621}
1622
1623fn find_column_refs_in_expr(
1625 expr: &Expression,
1626 dialect: Option<DialectType>,
1627) -> Vec<SimpleColumnRef> {
1628 let mut refs = Vec::new();
1629 collect_column_refs(expr, dialect, &mut refs);
1630 refs
1631}
1632
1633fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1634 match expr {
1635 Expression::Column(col) => {
1636 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1637 }
1638 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1639 _ => false,
1640 }
1641}
1642
/// Appends to `refs` every `Column` node reachable from `expr`.
///
/// The walk is iterative: an explicit stack of borrowed nodes replaces
/// recursion. It only descends through the variants matched below.
///
/// Nodes that contribute nothing:
/// * `Subquery` and `Exists` nodes;
/// * every variant not listed, which falls into the final catch-all arm.
///
/// As a result, columns inside nested subqueries are not collected. Children are
/// pushed and then popped LIFO, so the order of the entries appended to `refs`
/// follows this traversal rather than the textual order of the SQL.
///
/// `dialect` is consulted only in the `MethodCall` arm. Under BigQuery, a bare
/// `SAFE` receiver (`SAFE.fn(...)`) is skipped instead of being reported as a
/// column.
fn collect_column_refs(
    expr: &Expression,
    dialect: Option<DialectType>,
    refs: &mut Vec<SimpleColumnRef>,
) {
    let mut stack: Vec<&Expression> = vec![expr];

    while let Some(current) = stack.pop() {
        match current {
            // Leaf: record the qualifier (if any) and the raw column name.
            Expression::Column(col) => {
                refs.push(SimpleColumnRef {
                    table: col.table.clone(),
                    column: col.name.name.clone(),
                });
            }

            // Deliberately not descended into: columns inside nested queries are
            // not reported.
            Expression::Subquery(_) | Expression::Exists(_) => {}

            // Binary operators: visit both operands.
            Expression::And(op)
            | Expression::Or(op)
            | Expression::Eq(op)
            | Expression::Neq(op)
            | Expression::Lt(op)
            | Expression::Lte(op)
            | Expression::Gt(op)
            | Expression::Gte(op)
            | Expression::Add(op)
            | Expression::Sub(op)
            | Expression::Mul(op)
            | Expression::Div(op)
            | Expression::Mod(op)
            | Expression::BitwiseAnd(op)
            | Expression::BitwiseOr(op)
            | Expression::BitwiseXor(op)
            | Expression::BitwiseLeftShift(op)
            | Expression::BitwiseRightShift(op)
            | Expression::Concat(op)
            | Expression::Adjacent(op)
            | Expression::TsMatch(op)
            | Expression::PropertyEQ(op)
            | Expression::ArrayContainsAll(op)
            | Expression::ArrayContainedBy(op)
            | Expression::ArrayOverlaps(op)
            | Expression::JSONBContainsAllTopKeys(op)
            | Expression::JSONBContainsAnyTopKeys(op)
            | Expression::JSONBDeleteAtPath(op)
            | Expression::ExtendsLeft(op)
            | Expression::ExtendsRight(op)
            | Expression::Is(op)
            | Expression::MemberOf(op)
            | Expression::NullSafeEq(op)
            | Expression::NullSafeNeq(op)
            | Expression::Glob(op)
            | Expression::Match(op) => {
                stack.push(&op.left);
                stack.push(&op.right);
            }

            // Unary operators.
            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
                stack.push(&u.this);
            }

            // Single-argument functions: only `this` carries column references.
            Expression::Upper(f)
            | Expression::Lower(f)
            | Expression::Length(f)
            | Expression::LTrim(f)
            | Expression::RTrim(f)
            | Expression::Reverse(f)
            | Expression::Abs(f)
            | Expression::Sqrt(f)
            | Expression::Cbrt(f)
            | Expression::Ln(f)
            | Expression::Exp(f)
            | Expression::Sign(f)
            | Expression::Date(f)
            | Expression::Time(f)
            | Expression::DateFromUnixDate(f)
            | Expression::UnixDate(f)
            | Expression::UnixSeconds(f)
            | Expression::UnixMillis(f)
            | Expression::UnixMicros(f)
            | Expression::TimeStrToDate(f)
            | Expression::DateToDi(f)
            | Expression::DiToDate(f)
            | Expression::TsOrDiToDi(f)
            | Expression::TsOrDsToDatetime(f)
            | Expression::TsOrDsToTimestamp(f)
            | Expression::YearOfWeek(f)
            | Expression::YearOfWeekIso(f)
            | Expression::Initcap(f)
            | Expression::Ascii(f)
            | Expression::Chr(f)
            | Expression::Soundex(f)
            | Expression::ByteLength(f)
            | Expression::Hex(f)
            | Expression::LowerHex(f)
            | Expression::Unicode(f)
            | Expression::Radians(f)
            | Expression::Degrees(f)
            | Expression::Sin(f)
            | Expression::Cos(f)
            | Expression::Tan(f)
            | Expression::Asin(f)
            | Expression::Acos(f)
            | Expression::Atan(f)
            | Expression::IsNan(f)
            | Expression::IsInf(f)
            | Expression::ArrayLength(f)
            | Expression::ArraySize(f)
            | Expression::Cardinality(f)
            | Expression::ArrayReverse(f)
            | Expression::ArrayDistinct(f)
            | Expression::ArrayFlatten(f)
            | Expression::ArrayCompact(f)
            | Expression::Explode(f)
            | Expression::ExplodeOuter(f)
            | Expression::ToArray(f)
            | Expression::MapFromEntries(f)
            | Expression::MapKeys(f)
            | Expression::MapValues(f)
            | Expression::JsonArrayLength(f)
            | Expression::JsonKeys(f)
            | Expression::JsonType(f)
            | Expression::ParseJson(f)
            | Expression::ToJson(f)
            | Expression::Typeof(f)
            | Expression::BitwiseCount(f)
            | Expression::Year(f)
            | Expression::Month(f)
            | Expression::Day(f)
            | Expression::Hour(f)
            | Expression::Minute(f)
            | Expression::Second(f)
            | Expression::DayOfWeek(f)
            | Expression::DayOfWeekIso(f)
            | Expression::DayOfMonth(f)
            | Expression::DayOfYear(f)
            | Expression::WeekOfYear(f)
            | Expression::Quarter(f)
            | Expression::Epoch(f)
            | Expression::EpochMs(f)
            | Expression::TimeStrToUnix(f)
            | Expression::SHA(f)
            | Expression::SHA1Digest(f)
            | Expression::TimeToUnix(f)
            | Expression::JSONBool(f)
            | Expression::Int64(f)
            | Expression::MD5NumberLower64(f)
            | Expression::MD5NumberUpper64(f)
            | Expression::DateStrToDate(f)
            | Expression::DateToDateStr(f) => {
                stack.push(&f.this);
            }

            // Two-argument functions shaped as `this` + `expression`.
            Expression::Power(f)
            | Expression::NullIf(f)
            | Expression::IfNull(f)
            | Expression::Nvl(f)
            | Expression::UnixToTimeStr(f)
            | Expression::Contains(f)
            | Expression::StartsWith(f)
            | Expression::EndsWith(f)
            | Expression::Levenshtein(f)
            | Expression::ModFunc(f)
            | Expression::Atan2(f)
            | Expression::IntDiv(f)
            | Expression::AddMonths(f)
            | Expression::MonthsBetween(f)
            | Expression::NextDay(f)
            | Expression::ArrayContains(f)
            | Expression::ArrayPosition(f)
            | Expression::ArrayAppend(f)
            | Expression::ArrayPrepend(f)
            | Expression::ArrayUnion(f)
            | Expression::ArrayExcept(f)
            | Expression::ArrayRemove(f)
            | Expression::StarMap(f)
            | Expression::MapFromArrays(f)
            | Expression::MapContainsKey(f)
            | Expression::ElementAt(f)
            | Expression::JsonMergePatch(f)
            | Expression::JSONBContains(f)
            | Expression::JSONBExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }

            // Variadic functions: visit every argument.
            Expression::Greatest(f)
            | Expression::Least(f)
            | Expression::Coalesce(f)
            | Expression::ArrayConcat(f)
            | Expression::ArrayIntersect(f)
            | Expression::ArrayZip(f)
            | Expression::MapConcat(f)
            | Expression::JsonArray(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }

            // Aggregates sharing one shape: the argument plus the optional
            // FILTER, HAVING MAX/MIN expression and LIMIT.
            Expression::Sum(f)
            | Expression::Avg(f)
            | Expression::Min(f)
            | Expression::Max(f)
            | Expression::ArrayAgg(f)
            | Expression::CountIf(f)
            | Expression::Stddev(f)
            | Expression::StddevPop(f)
            | Expression::StddevSamp(f)
            | Expression::Variance(f)
            | Expression::VarPop(f)
            | Expression::VarSamp(f)
            | Expression::Median(f)
            | Expression::Mode(f)
            | Expression::First(f)
            | Expression::Last(f)
            | Expression::AnyValue(f)
            | Expression::ApproxDistinct(f)
            | Expression::ApproxCountDistinct(f)
            | Expression::LogicalAnd(f)
            | Expression::LogicalOr(f)
            | Expression::Skewness(f)
            | Expression::ArrayConcatAgg(f)
            | Expression::ArrayUniqueAgg(f)
            | Expression::BoolXorAgg(f)
            | Expression::BitwiseAndAgg(f)
            | Expression::BitwiseOrAgg(f)
            | Expression::BitwiseXorAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some((ref expr, _)) = f.having_max {
                    stack.push(expr);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }

            // Generic (unrecognized-name) function calls: visit all arguments.
            Expression::Function(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
            }
            Expression::AggregateFunction(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
                if let Some(ref filter) = func.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = func.limit {
                    stack.push(limit);
                }
            }

            // Only the windowed function itself is visited; any other fields of
            // `wf` are not. NOTE(review): if PARTITION BY / ORDER BY live on `wf`,
            // their columns are not reported; confirm this is intended.
            Expression::WindowFunction(wf) => {
                stack.push(&wf.this);
            }

            // Wrappers and casts: look through to the wrapped expression.
            Expression::Alias(a) => {
                stack.push(&a.this);
            }
            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
                stack.push(&c.this);
                if let Some(ref fmt) = c.format {
                    stack.push(fmt);
                }
                if let Some(ref def) = c.default {
                    stack.push(def);
                }
            }
            Expression::Paren(p) => {
                stack.push(&p.this);
            }
            Expression::Annotated(a) => {
                stack.push(&a.this);
            }
            Expression::Case(case) => {
                if let Some(ref operand) = case.operand {
                    stack.push(operand);
                }
                for (cond, result) in &case.whens {
                    stack.push(cond);
                    stack.push(result);
                }
                if let Some(ref else_expr) = case.else_ {
                    stack.push(else_expr);
                }
            }
            Expression::Collation(c) => {
                stack.push(&c.this);
            }
            // The `query` operand is pushed, but a `Subquery` contributes
            // nothing (see the arm above), and unlisted variants fall into
            // the catch-all.
            Expression::In(i) => {
                stack.push(&i.this);
                for e in &i.expressions {
                    stack.push(e);
                }
                if let Some(ref q) = i.query {
                    stack.push(q);
                }
                if let Some(ref u) = i.unnest {
                    stack.push(u);
                }
            }
            Expression::Between(b) => {
                stack.push(&b.this);
                stack.push(&b.low);
                stack.push(&b.high);
            }
            Expression::IsNull(n) => {
                stack.push(&n.this);
            }
            Expression::IsTrue(t) | Expression::IsFalse(t) => {
                stack.push(&t.this);
            }
            Expression::IsJson(j) => {
                stack.push(&j.this);
            }
            Expression::Like(l) | Expression::ILike(l) => {
                stack.push(&l.left);
                stack.push(&l.right);
                if let Some(ref esc) = l.escape {
                    stack.push(esc);
                }
            }
            Expression::SimilarTo(s) => {
                stack.push(&s.this);
                stack.push(&s.pattern);
                if let Some(ref esc) = s.escape {
                    stack.push(esc);
                }
            }
            Expression::Ordered(o) => {
                stack.push(&o.this);
            }
            Expression::Array(a) => {
                for e in &a.expressions {
                    stack.push(e);
                }
            }
            Expression::Tuple(t) => {
                for e in &t.expressions {
                    stack.push(e);
                }
            }
            Expression::Struct(s) => {
                for (_, e) in &s.fields {
                    stack.push(e);
                }
            }
            Expression::Subscript(s) => {
                stack.push(&s.this);
                stack.push(&s.index);
            }
            // Only the receiver side of `a.b` is visited.
            Expression::Dot(d) => {
                stack.push(&d.this);
            }
            // Under BigQuery, `SAFE.fn(...)` parses with `SAFE` as the receiver;
            // skip it so it is not reported as a column named `SAFE`.
            Expression::MethodCall(m) => {
                if !matches!(dialect, Some(DialectType::BigQuery))
                    || !is_bigquery_safe_namespace_receiver(&m.this)
                {
                    stack.push(&m.this);
                }
                for arg in &m.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySlice(s) => {
                stack.push(&s.this);
                if let Some(ref start) = s.start {
                    stack.push(start);
                }
                if let Some(ref end) = s.end {
                    stack.push(end);
                }
            }
            // Only the body is visited. NOTE(review): lambda parameters used in
            // the body may be reported as columns if they parse as `Column`;
            // confirm.
            Expression::Lambda(l) => {
                stack.push(&l.body);
            }
            Expression::NamedArgument(n) => {
                stack.push(&n.value);
            }
            Expression::Lateral(l) => {
                stack.push(&l.this);
                if let Some(ref view) = l.view {
                    stack.push(view);
                }
                if let Some(ref outer) = l.outer {
                    stack.push(outer);
                }
                if let Some(ref ordinality) = l.ordinality {
                    stack.push(ordinality);
                }
            }
            Expression::LateralView(lv) => {
                stack.push(&lv.this);
            }
            Expression::TryCatch(t) => {
                for stmt in &t.try_body {
                    stack.push(stmt);
                }
                if let Some(catch_body) = &t.catch_body {
                    for stmt in catch_body {
                        stack.push(stmt);
                    }
                }
            }
            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
                stack.push(e);
            }

            // Functions with dedicated argument structs: visit each field that
            // holds an expression, including the optional ones.
            Expression::Substring(f) => {
                stack.push(&f.this);
                stack.push(&f.start);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Trim(f) => {
                stack.push(&f.this);
                if let Some(ref chars) = f.characters {
                    stack.push(chars);
                }
            }
            Expression::Replace(f) => {
                stack.push(&f.this);
                stack.push(&f.old);
                stack.push(&f.new);
            }
            Expression::IfFunc(f) => {
                stack.push(&f.condition);
                stack.push(&f.true_value);
                if let Some(ref fv) = f.false_value {
                    stack.push(fv);
                }
            }
            Expression::Nvl2(f) => {
                stack.push(&f.this);
                stack.push(&f.true_value);
                stack.push(&f.false_value);
            }
            Expression::ConcatWs(f) => {
                stack.push(&f.separator);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Count(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::GroupConcat(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::StringAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }
            Expression::ListAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::SumIf(f) => {
                stack.push(&f.this);
                stack.push(&f.condition);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::DateAdd(f) | Expression::DateSub(f) => {
                stack.push(&f.this);
                stack.push(&f.interval);
            }
            Expression::DateDiff(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }
            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
                stack.push(&f.this);
            }
            Expression::Extract(f) => {
                stack.push(&f.this);
            }
            Expression::Round(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::Floor(f) => {
                stack.push(&f.this);
                if let Some(ref s) = f.scale {
                    stack.push(s);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Ceil(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Log(f) => {
                stack.push(&f.this);
                if let Some(ref b) = f.base {
                    stack.push(b);
                }
            }
            Expression::AtTimeZone(f) => {
                stack.push(&f.this);
                stack.push(&f.zone);
            }
            Expression::Lead(f) | Expression::Lag(f) => {
                stack.push(&f.this);
                if let Some(ref off) = f.offset {
                    stack.push(off);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::FirstValue(f) | Expression::LastValue(f) => {
                stack.push(&f.this);
            }
            Expression::NthValue(f) => {
                stack.push(&f.this);
                stack.push(&f.offset);
            }
            Expression::Position(f) => {
                stack.push(&f.substring);
                stack.push(&f.string);
                if let Some(ref start) = f.start {
                    stack.push(start);
                }
            }
            Expression::Decode(f) => {
                stack.push(&f.this);
                for (search, result) in &f.search_results {
                    stack.push(search);
                    stack.push(result);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::CharFunc(f) => {
                for arg in &f.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySort(f) => {
                stack.push(&f.this);
                if let Some(ref cmp) = f.comparator {
                    stack.push(cmp);
                }
            }
            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
                stack.push(&f.this);
                stack.push(&f.separator);
                if let Some(ref nr) = f.null_replacement {
                    stack.push(nr);
                }
            }
            Expression::ArrayFilter(f) => {
                stack.push(&f.this);
                stack.push(&f.filter);
            }
            Expression::ArrayTransform(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::Sequence(f)
            | Expression::Generate(f)
            | Expression::ExplodingGenerateSeries(f) => {
                stack.push(&f.start);
                stack.push(&f.stop);
                if let Some(ref step) = f.step {
                    stack.push(step);
                }
            }
            Expression::JsonExtract(f)
            | Expression::JsonExtractScalar(f)
            | Expression::JsonQuery(f)
            | Expression::JsonValue(f) => {
                stack.push(&f.this);
                stack.push(&f.path);
            }
            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
                stack.push(&f.this);
                for p in &f.paths {
                    stack.push(p);
                }
            }
            Expression::JsonObject(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
                stack.push(&f.this);
                for (path, val) in &f.path_values {
                    stack.push(path);
                    stack.push(val);
                }
            }
            Expression::Overlay(f) => {
                stack.push(&f.this);
                stack.push(&f.replacement);
                stack.push(&f.from);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Convert(f) => {
                stack.push(&f.this);
                if let Some(ref style) = f.style {
                    stack.push(style);
                }
            }
            Expression::ApproxPercentile(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref acc) = f.accuracy {
                    stack.push(acc);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::Percentile(f)
            | Expression::PercentileCont(f)
            | Expression::PercentileDisc(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::WithinGroup(f) => {
                stack.push(&f.this);
            }
            Expression::Left(f) | Expression::Right(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
            }
            Expression::Repeat(f) => {
                stack.push(&f.this);
                stack.push(&f.times);
            }
            Expression::Lpad(f) | Expression::Rpad(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
                if let Some(ref fill) = f.fill {
                    stack.push(fill);
                }
            }
            Expression::Split(f) => {
                stack.push(&f.this);
                stack.push(&f.delimiter);
            }
            Expression::RegexpLike(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpReplace(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                stack.push(&f.replacement);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref group) = f.group {
                    stack.push(group);
                }
            }
            Expression::ToDate(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::ToTimestamp(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::DateFormat(f) | Expression::FormatDate(f) => {
                stack.push(&f.this);
                stack.push(&f.format);
            }
            Expression::LastDay(f) => {
                stack.push(&f.this);
            }
            Expression::FromUnixtime(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::UnixTimestamp(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::MakeDate(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
            }
            Expression::MakeTimestamp(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
                stack.push(&f.hour);
                stack.push(&f.minute);
                stack.push(&f.second);
                if let Some(ref tz) = f.timezone {
                    stack.push(tz);
                }
            }
            Expression::TruncFunc(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::ArrayFunc(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Unnest(f) => {
                stack.push(&f.this);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::StructFunc(f) => {
                for (_, e) in &f.fields {
                    stack.push(e);
                }
            }
            Expression::StructExtract(f) => {
                stack.push(&f.this);
            }
            Expression::NamedStruct(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::MapFunc(f) => {
                for k in &f.keys {
                    stack.push(k);
                }
                for v in &f.values {
                    stack.push(v);
                }
            }
            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::JsonArrayAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::JsonObjectAgg(f) => {
                stack.push(&f.key);
                stack.push(&f.value);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::NTile(f) => {
                if let Some(ref n) = f.num_buckets {
                    stack.push(n);
                }
            }
            Expression::Rand(f) => {
                if let Some(ref s) = f.seed {
                    stack.push(s);
                }
                if let Some(ref lo) = f.lower {
                    stack.push(lo);
                }
                if let Some(ref hi) = f.upper {
                    stack.push(hi);
                }
            }
            // The `subquery` operand is pushed. It yields nothing if it is a
            // `Subquery`/`Exists` node or an unlisted variant.
            Expression::Any(q) | Expression::All(q) => {
                stack.push(&q.this);
                stack.push(&q.subquery);
            }
            Expression::Overlaps(o) => {
                if let Some(ref this) = o.this {
                    stack.push(this);
                }
                if let Some(ref expr) = o.expression {
                    stack.push(expr);
                }
                if let Some(ref ls) = o.left_start {
                    stack.push(ls);
                }
                if let Some(ref le) = o.left_end {
                    stack.push(le);
                }
                if let Some(ref rs) = o.right_start {
                    stack.push(rs);
                }
                if let Some(ref re) = o.right_end {
                    stack.push(re);
                }
            }
            Expression::Interval(i) => {
                if let Some(ref this) = i.this {
                    stack.push(this);
                }
            }
            Expression::TimeStrToTime(f) => {
                stack.push(&f.this);
                if let Some(ref zone) = f.zone {
                    stack.push(zone);
                }
            }
            Expression::JSONBExtractScalar(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                if let Some(ref jt) = f.json_type {
                    stack.push(jt);
                }
            }
            Expression::JSONExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                for e in &f.expressions {
                    stack.push(e);
                }
                if let Some(ref option) = f.option {
                    stack.push(option);
                }
                if let Some(ref on_condition) = f.on_condition {
                    stack.push(on_condition);
                }
            }

            // Any other variant (literals, stars, statements, queries, ...) is
            // treated as containing no column references.
            _ => {}
        }
    }
}
2553
2554#[cfg(test)]
2559mod tests {
2560 use super::*;
2561 use crate::dialects::{Dialect, DialectType};
2562 use crate::expressions::DataType;
2563 use crate::optimizer::annotate_types::annotate_types;
2564 use crate::parse_one;
2565 use crate::schema::{MappingSchema, Schema};
2566
2567 fn parse(sql: &str) -> Expression {
2568 let dialect = Dialect::get(DialectType::Generic);
2569 let ast = dialect.parse(sql).unwrap();
2570 ast.into_iter().next().unwrap()
2571 }
2572
2573 #[test]
2574 fn test_simple_lineage() {
2575 let expr = parse("SELECT a FROM t");
2576 let node = lineage("a", &expr, None, false).unwrap();
2577
2578 assert_eq!(node.name, "a");
2579 assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2580 let names = node.downstream_names();
2582 assert!(
2583 names.iter().any(|n| n == "t.a"),
2584 "Expected t.a in downstream, got: {:?}",
2585 names
2586 );
2587 }
2588
2589 #[test]
2590 fn test_lineage_walk() {
2591 let root = LineageNode {
2592 name: "col_a".to_string(),
2593 expression: Expression::Null(crate::expressions::Null),
2594 source: Expression::Null(crate::expressions::Null),
2595 downstream: vec![LineageNode::new(
2596 "t.a",
2597 Expression::Null(crate::expressions::Null),
2598 Expression::Null(crate::expressions::Null),
2599 )],
2600 source_name: String::new(),
2601 source_kind: SourceKind::Unknown,
2602 source_alias: None,
2603 reference_node_name: String::new(),
2604 };
2605
2606 let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2607 assert_eq!(names.len(), 2);
2608 assert_eq!(names[0], "col_a");
2609 assert_eq!(names[1], "t.a");
2610 }
2611
2612 #[test]
2613 fn test_aliased_column() {
2614 let expr = parse("SELECT a + 1 AS b FROM t");
2615 let node = lineage("b", &expr, None, false).unwrap();
2616
2617 assert_eq!(node.name, "b");
2618 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2620 assert!(
2621 all_names.iter().any(|n| n.contains("a")),
2622 "Expected to trace to column a, got: {:?}",
2623 all_names
2624 );
2625 }
2626
2627 #[test]
2628 fn test_qualified_column() {
2629 let expr = parse("SELECT t.a FROM t");
2630 let node = lineage("a", &expr, None, false).unwrap();
2631
2632 assert_eq!(node.name, "a");
2633 let names = node.downstream_names();
2634 assert!(
2635 names.iter().any(|n| n == "t.a"),
2636 "Expected t.a, got: {:?}",
2637 names
2638 );
2639 }
2640
2641 #[test]
2642 fn test_unqualified_column() {
2643 let expr = parse("SELECT a FROM t");
2644 let node = lineage("a", &expr, None, false).unwrap();
2645
2646 let names = node.downstream_names();
2648 assert!(
2649 names.iter().any(|n| n == "t.a"),
2650 "Expected t.a, got: {:?}",
2651 names
2652 );
2653 }
2654
2655 #[test]
2656 fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2657 let query = "SELECT name FROM users";
2658 let dialect = Dialect::get(DialectType::BigQuery);
2659 let expr = dialect
2660 .parse(query)
2661 .unwrap()
2662 .into_iter()
2663 .next()
2664 .expect("expected one expression");
2665
2666 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2667 schema
2668 .add_table("users", &[("name".into(), DataType::Text)], None)
2669 .expect("schema setup");
2670
2671 let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2672 .expect("lineage without schema");
2673 let mut expr_without = node_without_schema.expression.clone();
2674 annotate_types(
2675 &mut expr_without,
2676 Some(&schema),
2677 Some(DialectType::BigQuery),
2678 );
2679 assert_eq!(
2680 expr_without.inferred_type(),
2681 None,
2682 "Expected unresolved root type without schema-aware lineage qualification"
2683 );
2684
2685 let node_with_schema = lineage_with_schema(
2686 "name",
2687 &expr,
2688 Some(&schema),
2689 Some(DialectType::BigQuery),
2690 false,
2691 )
2692 .expect("lineage with schema");
2693 let mut expr_with = node_with_schema.expression.clone();
2694 annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2695
2696 assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2697 }
2698
2699 #[test]
2700 fn test_lineage_with_schema_correlated_scalar_subquery() {
2701 let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2702 let dialect = Dialect::get(DialectType::BigQuery);
2703 let expr = dialect
2704 .parse(query)
2705 .unwrap()
2706 .into_iter()
2707 .next()
2708 .expect("expected one expression");
2709
2710 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2711 schema
2712 .add_table(
2713 "t1",
2714 &[("id".into(), DataType::BigInt { length: None })],
2715 None,
2716 )
2717 .expect("schema setup");
2718 schema
2719 .add_table(
2720 "t2",
2721 &[
2722 ("id".into(), DataType::BigInt { length: None }),
2723 ("val".into(), DataType::BigInt { length: None }),
2724 ],
2725 None,
2726 )
2727 .expect("schema setup");
2728
2729 let node = lineage_with_schema(
2730 "id",
2731 &expr,
2732 Some(&schema),
2733 Some(DialectType::BigQuery),
2734 false,
2735 )
2736 .expect("lineage_with_schema should handle correlated scalar subqueries");
2737
2738 assert_eq!(node.name, "id");
2739 }
2740
2741 #[test]
2742 fn test_lineage_with_schema_join_using() {
2743 let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2744 let dialect = Dialect::get(DialectType::BigQuery);
2745 let expr = dialect
2746 .parse(query)
2747 .unwrap()
2748 .into_iter()
2749 .next()
2750 .expect("expected one expression");
2751
2752 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2753 schema
2754 .add_table(
2755 "t1",
2756 &[("a".into(), DataType::BigInt { length: None })],
2757 None,
2758 )
2759 .expect("schema setup");
2760 schema
2761 .add_table(
2762 "t2",
2763 &[("a".into(), DataType::BigInt { length: None })],
2764 None,
2765 )
2766 .expect("schema setup");
2767
2768 let node = lineage_with_schema(
2769 "a",
2770 &expr,
2771 Some(&schema),
2772 Some(DialectType::BigQuery),
2773 false,
2774 )
2775 .expect("lineage_with_schema should handle JOIN USING");
2776
2777 assert_eq!(node.name, "a");
2778 }
2779
2780 #[test]
2781 fn test_lineage_with_schema_qualified_table_name() {
2782 let query = "SELECT a FROM raw.t1";
2783 let dialect = Dialect::get(DialectType::BigQuery);
2784 let expr = dialect
2785 .parse(query)
2786 .unwrap()
2787 .into_iter()
2788 .next()
2789 .expect("expected one expression");
2790
2791 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2792 schema
2793 .add_table(
2794 "raw.t1",
2795 &[("a".into(), DataType::BigInt { length: None })],
2796 None,
2797 )
2798 .expect("schema setup");
2799
2800 let node = lineage_with_schema(
2801 "a",
2802 &expr,
2803 Some(&schema),
2804 Some(DialectType::BigQuery),
2805 false,
2806 )
2807 .expect("lineage_with_schema should handle dotted schema.table names");
2808
2809 assert_eq!(node.name, "a");
2810 }
2811
2812 #[test]
2813 fn test_lineage_with_schema_none_matches_lineage() {
2814 let expr = parse("SELECT a FROM t");
2815 let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2816 let with_none =
2817 lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2818
2819 assert_eq!(with_none.name, baseline.name);
2820 assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2821 }
2822
2823 #[test]
2824 fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2825 let dialect = Dialect::get(DialectType::BigQuery);
2826 let expr = dialect
2827 .parse("SELECT Name AS name FROM teams")
2828 .unwrap()
2829 .into_iter()
2830 .next()
2831 .expect("expected one expression");
2832
2833 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2834 schema
2835 .add_table(
2836 "teams",
2837 &[("Name".into(), DataType::String { length: None })],
2838 None,
2839 )
2840 .expect("schema setup");
2841
2842 let node = lineage_with_schema(
2843 "name",
2844 &expr,
2845 Some(&schema),
2846 Some(DialectType::BigQuery),
2847 false,
2848 )
2849 .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2850
2851 let names = node.downstream_names();
2852 assert!(
2853 names.iter().any(|n| n == "teams.Name"),
2854 "Expected teams.Name in downstream, got: {:?}",
2855 names
2856 );
2857 }
2858
2859 #[test]
2860 fn test_lineage_bigquery_mixed_case_alias_lookup() {
2861 let dialect = Dialect::get(DialectType::BigQuery);
2862 let expr = dialect
2863 .parse("SELECT Name AS Name FROM teams")
2864 .unwrap()
2865 .into_iter()
2866 .next()
2867 .expect("expected one expression");
2868
2869 let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2870 .expect("lineage should resolve mixed-case aliases in BigQuery");
2871
2872 assert_eq!(node.name, "name");
2873 }
2874
2875 #[test]
2876 fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2877 let expr = parse_one(
2878 r#"
2879SELECT date_val AS week_start
2880FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2881"#,
2882 DialectType::BigQuery,
2883 )
2884 .expect("parse");
2885
2886 let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2887 .expect("lineage should resolve UNNEST alias as a source");
2888 let child = node
2889 .downstream
2890 .first()
2891 .expect("week_start should have downstream lineage");
2892
2893 assert_eq!(child.name, "_0.date_val");
2894 assert_eq!(child.source_name, "_0");
2895 assert_eq!(child.source_kind, SourceKind::Virtual);
2896 assert_eq!(child.source_alias.as_deref(), Some("date_val"));
2897
2898 let Expression::Column(column) = &child.expression else {
2899 panic!(
2900 "expected downstream column expression, got {:?}",
2901 child.expression
2902 );
2903 };
2904 assert_eq!(column.name.name, "date_val");
2905 assert_eq!(
2906 column.table.as_ref().map(|table| table.name.as_str()),
2907 Some("_0")
2908 );
2909 assert!(
2910 matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
2911 "expected UNNEST source expression, got {:?}",
2912 child.source
2913 );
2914 }
2915
2916 #[test]
2917 fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
2918 let expr =
2919 parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
2920
2921 let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2922 let child = node.downstream.first().expect("id should have lineage");
2923
2924 assert_eq!(child.name, "date_val.id");
2925 assert_eq!(child.source_name, "date_val");
2926 assert_eq!(child.source_kind, SourceKind::Table);
2927 assert_eq!(child.source_alias, None);
2928 }
2929
2930 #[test]
2931 fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
2932 let expr = parse_one(
2933 r#"
2934SELECT a.a AS first_value, b.b AS second_value
2935FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
2936JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
2937"#,
2938 DialectType::BigQuery,
2939 )
2940 .expect("parse");
2941
2942 let first =
2943 lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2944 let second =
2945 lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2946
2947 let first_child = first.downstream.first().expect("first source");
2948 let second_child = second.downstream.first().expect("second source");
2949
2950 assert_eq!(first_child.name, "_0.a");
2951 assert_eq!(first_child.source_name, "_0");
2952 assert_eq!(first_child.source_alias.as_deref(), Some("a"));
2953 assert_eq!(first_child.source_kind, SourceKind::Virtual);
2954
2955 assert_eq!(second_child.name, "_1.b");
2956 assert_eq!(second_child.source_name, "_1");
2957 assert_eq!(second_child.source_alias.as_deref(), Some("b"));
2958 assert_eq!(second_child.source_kind, SourceKind::Virtual);
2959 }
2960
2961 #[test]
2962 fn test_lineage_table_backed_unnest_points_to_real_source_column() {
2963 let expr = parse_one(
2964 r#"
2965SELECT item.item AS item
2966FROM t JOIN UNNEST(t.items) AS item ON TRUE
2967"#,
2968 DialectType::BigQuery,
2969 )
2970 .expect("parse");
2971
2972 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2973 let virtual_child = node.downstream.first().expect("virtual item source");
2974 assert_eq!(virtual_child.name, "_0.item");
2975 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2976
2977 let real_child = virtual_child
2978 .downstream
2979 .first()
2980 .expect("UNNEST(t.items) should depend on t.items");
2981 assert_eq!(real_child.name, "t.items");
2982 assert_eq!(real_child.source_name, "t");
2983 assert_eq!(real_child.source_kind, SourceKind::Table);
2984 }
2985
2986 #[test]
2987 fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
2988 let expr = parse_one(
2989 r#"
2990SELECT item AS item
2991FROM t JOIN UNNEST(t.items) AS item ON TRUE
2992"#,
2993 DialectType::BigQuery,
2994 )
2995 .expect("parse");
2996
2997 let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2998 let virtual_child = node.downstream.first().expect("virtual item source");
2999 assert_eq!(virtual_child.name, "_0.item");
3000 assert_eq!(virtual_child.source_name, "_0");
3001 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3002 assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
3003
3004 let real_child = virtual_child
3005 .downstream
3006 .first()
3007 .expect("UNNEST(t.items) should depend on t.items");
3008 assert_eq!(real_child.name, "t.items");
3009 assert_eq!(real_child.source_name, "t");
3010 assert_eq!(real_child.source_kind, SourceKind::Table);
3011 }
3012
3013 #[test]
3014 fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
3015 let cases = [
3016 (
3017 DialectType::PostgreSQL,
3018 "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
3019 ),
3020 (
3021 DialectType::Presto,
3022 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3023 ),
3024 (
3025 DialectType::Trino,
3026 "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3027 ),
3028 ];
3029
3030 for (dialect, sql) in cases {
3031 let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3032 let node = lineage("out", &expr, Some(dialect), false)
3033 .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3034 let virtual_child = node
3035 .downstream
3036 .first()
3037 .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3038
3039 assert_eq!(
3040 virtual_child.name, "_0.x",
3041 "unexpected virtual child for {dialect:?}"
3042 );
3043 assert_eq!(virtual_child.source_name, "_0");
3044 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3045 assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3046
3047 let real_child = virtual_child
3048 .downstream
3049 .first()
3050 .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3051 assert_eq!(real_child.name, "t.items");
3052 assert_eq!(real_child.source_kind, SourceKind::Table);
3053 }
3054 }
3055
3056 #[test]
3057 fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
3058 let cases = [
3059 (
3060 DialectType::Spark,
3061 "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3062 ),
3063 (
3064 DialectType::Hive,
3065 "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3066 ),
3067 ];
3068
3069 for (dialect, sql) in cases {
3070 let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3071 let node = lineage("out", &expr, Some(dialect), false)
3072 .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3073 let virtual_child = node
3074 .downstream
3075 .first()
3076 .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3077
3078 assert_eq!(virtual_child.name, "_0.x");
3079 assert_eq!(virtual_child.source_name, "_0");
3080 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3081 assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3082
3083 let real_child = virtual_child
3084 .downstream
3085 .first()
3086 .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3087 assert_eq!(real_child.name, "t.items");
3088 assert_eq!(real_child.source_kind, SourceKind::Table);
3089 }
3090 }
3091
3092 #[test]
3093 fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
3094 let expr = parse_one(
3095 "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
3096 DialectType::Snowflake,
3097 )
3098 .expect("parse");
3099
3100 let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
3101 let virtual_child = node.downstream.first().expect("virtual flatten source");
3102 assert_eq!(virtual_child.name, "_0.value");
3103 assert_eq!(virtual_child.source_name, "_0");
3104 assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3105 assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
3106
3107 let real_child = virtual_child
3108 .downstream
3109 .first()
3110 .expect("FLATTEN input should depend on raw_events.payload");
3111 assert_eq!(real_child.name, "raw_events.payload");
3112 assert_eq!(real_child.source_kind, SourceKind::Table);
3113 }
3114
3115 #[test]
3116 fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
3117 let expr = parse_one(
3118 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3119 DialectType::Snowflake,
3120 )
3121 .expect("parse");
3122
3123 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3124 schema
3125 .add_table(
3126 "fact.some_daily_metrics",
3127 &[("date_utc".to_string(), DataType::Date)],
3128 None,
3129 )
3130 .expect("schema setup");
3131
3132 let node = lineage_with_schema(
3133 "recency",
3134 &expr,
3135 Some(&schema),
3136 Some(DialectType::Snowflake),
3137 false,
3138 )
3139 .expect("lineage_with_schema should not treat date part as a column");
3140
3141 let names = node.downstream_names();
3142 assert!(
3143 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3144 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3145 names
3146 );
3147 assert!(
3148 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3149 "Did not expect date part to appear as lineage column, got: {:?}",
3150 names
3151 );
3152 }
3153
3154 #[test]
3155 fn test_snowflake_datediff_parses_to_typed_ast() {
3156 let expr = parse_one(
3157 "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3158 DialectType::Snowflake,
3159 )
3160 .expect("parse");
3161
3162 match expr {
3163 Expression::Select(select) => match &select.expressions[0] {
3164 Expression::Alias(alias) => match &alias.this {
3165 Expression::DateDiff(f) => {
3166 assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
3167 }
3168 other => panic!("expected DateDiff, got {other:?}"),
3169 },
3170 other => panic!("expected Alias, got {other:?}"),
3171 },
3172 other => panic!("expected Select, got {other:?}"),
3173 }
3174 }
3175
3176 #[test]
3177 fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
3178 let expr = parse_one(
3179 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3180 DialectType::Snowflake,
3181 )
3182 .expect("parse");
3183
3184 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3185 schema
3186 .add_table(
3187 "fact.some_daily_metrics",
3188 &[("date_utc".to_string(), DataType::Date)],
3189 None,
3190 )
3191 .expect("schema setup");
3192
3193 let node = lineage_with_schema(
3194 "next_day",
3195 &expr,
3196 Some(&schema),
3197 Some(DialectType::Snowflake),
3198 false,
3199 )
3200 .expect("lineage_with_schema should not treat DATEADD date part as a column");
3201
3202 let names = node.downstream_names();
3203 assert!(
3204 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3205 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3206 names
3207 );
3208 assert!(
3209 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3210 "Did not expect date part to appear as lineage column, got: {:?}",
3211 names
3212 );
3213 }
3214
3215 #[test]
3216 fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
3217 let expr = parse_one(
3218 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3219 DialectType::Snowflake,
3220 )
3221 .expect("parse");
3222
3223 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3224 schema
3225 .add_table(
3226 "fact.some_daily_metrics",
3227 &[("date_utc".to_string(), DataType::Date)],
3228 None,
3229 )
3230 .expect("schema setup");
3231
3232 let node = lineage_with_schema(
3233 "day_part",
3234 &expr,
3235 Some(&schema),
3236 Some(DialectType::Snowflake),
3237 false,
3238 )
3239 .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
3240
3241 let names = node.downstream_names();
3242 assert!(
3243 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3244 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3245 names
3246 );
3247 assert!(
3248 !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3249 "Did not expect date part to appear as lineage column, got: {:?}",
3250 names
3251 );
3252 }
3253
3254 #[test]
3255 fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
3256 let expr = parse_one(
3257 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3258 DialectType::Snowflake,
3259 )
3260 .expect("parse");
3261
3262 let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3263 schema
3264 .add_table(
3265 "fact.some_daily_metrics",
3266 &[("date_utc".to_string(), DataType::Date)],
3267 None,
3268 )
3269 .expect("schema setup");
3270
3271 let node = lineage_with_schema(
3272 "day_part",
3273 &expr,
3274 Some(&schema),
3275 Some(DialectType::Snowflake),
3276 false,
3277 )
3278 .expect("quoted DATE_PART should continue to work");
3279
3280 let names = node.downstream_names();
3281 assert!(
3282 names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3283 "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3284 names
3285 );
3286 }
3287
3288 #[test]
3289 fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
3290 let expr = parse_one(
3291 "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3292 DialectType::Snowflake,
3293 )
3294 .expect("parse");
3295
3296 match expr {
3297 Expression::Select(select) => match &select.expressions[0] {
3298 Expression::Alias(alias) => match &alias.this {
3299 Expression::Function(f) => {
3300 assert_eq!(f.name.to_uppercase(), "DATEADD");
3301 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3302 }
3303 other => panic!("expected generic DATEADD function, got {other:?}"),
3304 },
3305 other => panic!("expected Alias, got {other:?}"),
3306 },
3307 other => panic!("expected Select, got {other:?}"),
3308 }
3309 }
3310
3311 #[test]
3312 fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
3313 let expr = parse_one(
3314 "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3315 DialectType::Snowflake,
3316 )
3317 .expect("parse");
3318
3319 match expr {
3320 Expression::Select(select) => match &select.expressions[0] {
3321 Expression::Alias(alias) => match &alias.this {
3322 Expression::Function(f) => {
3323 assert_eq!(f.name.to_uppercase(), "DATE_PART");
3324 assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3325 }
3326 other => panic!("expected generic DATE_PART function, got {other:?}"),
3327 },
3328 other => panic!("expected Alias, got {other:?}"),
3329 },
3330 other => panic!("expected Select, got {other:?}"),
3331 }
3332 }
3333
3334 #[test]
3335 fn test_snowflake_date_part_string_literal_stays_generic_function() {
3336 let expr = parse_one(
3337 "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3338 DialectType::Snowflake,
3339 )
3340 .expect("parse");
3341
3342 match expr {
3343 Expression::Select(select) => match &select.expressions[0] {
3344 Expression::Alias(alias) => match &alias.this {
3345 Expression::Function(f) => {
3346 assert_eq!(f.name.to_uppercase(), "DATE_PART");
3347 }
3348 other => panic!("expected generic DATE_PART function, got {other:?}"),
3349 },
3350 other => panic!("expected Alias, got {other:?}"),
3351 },
3352 other => panic!("expected Select, got {other:?}"),
3353 }
3354 }
3355
3356 #[test]
3357 fn test_lineage_join() {
3358 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3359
3360 let node_a = lineage("a", &expr, None, false).unwrap();
3361 let names_a = node_a.downstream_names();
3362 assert!(
3363 names_a.iter().any(|n| n == "t.a"),
3364 "Expected t.a, got: {:?}",
3365 names_a
3366 );
3367
3368 let node_b = lineage("b", &expr, None, false).unwrap();
3369 let names_b = node_b.downstream_names();
3370 assert!(
3371 names_b.iter().any(|n| n == "s.b"),
3372 "Expected s.b, got: {:?}",
3373 names_b
3374 );
3375 }
3376
3377 #[test]
3378 fn test_lineage_alias_leaf_has_resolved_source_name() {
3379 let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
3380 let node = lineage("col1", &expr, None, false).unwrap();
3381
3382 let names = node.downstream_names();
3384 assert!(
3385 names.iter().any(|n| n == "t1.col1"),
3386 "Expected aliased column edge t1.col1, got: {:?}",
3387 names
3388 );
3389
3390 let leaf = node
3392 .downstream
3393 .iter()
3394 .find(|n| n.name == "t1.col1")
3395 .expect("Expected t1.col1 leaf");
3396 assert_eq!(leaf.source_name, "table1");
3397 match &leaf.source {
3398 Expression::Table(table) => assert_eq!(table.name.name, "table1"),
3399 _ => panic!("Expected leaf source to be a table expression"),
3400 }
3401 }
3402
3403 #[test]
3404 fn test_lineage_derived_table() {
3405 let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
3406 let node = lineage("a", &expr, None, false).unwrap();
3407
3408 assert_eq!(node.name, "a");
3409 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3411 assert!(
3412 all_names.iter().any(|n| n == "t.a"),
3413 "Expected to trace through derived table to t.a, got: {:?}",
3414 all_names
3415 );
3416 }
3417
3418 #[test]
3419 fn test_lineage_cte() {
3420 let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
3421 let node = lineage("a", &expr, None, false).unwrap();
3422
3423 assert_eq!(node.name, "a");
3424 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3425 assert!(
3426 all_names.iter().any(|n| n == "t.a"),
3427 "Expected to trace through CTE to t.a, got: {:?}",
3428 all_names
3429 );
3430 }
3431
3432 #[test]
3433 fn test_lineage_union() {
3434 let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
3435 let node = lineage("a", &expr, None, false).unwrap();
3436
3437 assert_eq!(node.name, "a");
3438 assert_eq!(
3440 node.downstream.len(),
3441 2,
3442 "Expected 2 branches for UNION, got {}",
3443 node.downstream.len()
3444 );
3445 }
3446
3447 #[test]
3448 fn test_lineage_cte_union() {
3449 let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
3450 let node = lineage("a", &expr, None, false).unwrap();
3451
3452 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3454 assert!(
3455 all_names.len() >= 3,
3456 "Expected at least 3 nodes for CTE with UNION, got: {:?}",
3457 all_names
3458 );
3459 }
3460
3461 #[test]
3462 fn test_lineage_star() {
3463 let expr = parse("SELECT * FROM t");
3464 let node = lineage("*", &expr, None, false).unwrap();
3465
3466 assert_eq!(node.name, "*");
3467 assert!(
3469 !node.downstream.is_empty(),
3470 "Star should produce downstream nodes"
3471 );
3472 }
3473
3474 #[test]
3475 fn test_lineage_subquery_in_select() {
3476 let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
3477 let node = lineage("x", &expr, None, false).unwrap();
3478
3479 assert_eq!(node.name, "x");
3480 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3482 assert!(
3483 all_names.len() >= 2,
3484 "Expected tracing into scalar subquery, got: {:?}",
3485 all_names
3486 );
3487 }
3488
3489 #[test]
3490 fn test_lineage_multiple_columns() {
3491 let expr = parse("SELECT a, b FROM t");
3492
3493 let node_a = lineage("a", &expr, None, false).unwrap();
3494 let node_b = lineage("b", &expr, None, false).unwrap();
3495
3496 assert_eq!(node_a.name, "a");
3497 assert_eq!(node_b.name, "b");
3498
3499 let names_a = node_a.downstream_names();
3501 let names_b = node_b.downstream_names();
3502 assert!(names_a.iter().any(|n| n == "t.a"));
3503 assert!(names_b.iter().any(|n| n == "t.b"));
3504 }
3505
3506 #[test]
3507 fn test_get_source_tables() {
3508 let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3509 let node = lineage("a", &expr, None, false).unwrap();
3510
3511 let tables = get_source_tables(&node);
3512 assert!(
3513 tables.contains("t"),
3514 "Expected source table 't', got: {:?}",
3515 tables
3516 );
3517 }
3518
3519 #[test]
3520 fn test_lineage_column_not_found() {
3521 let expr = parse("SELECT a FROM t");
3522 let result = lineage("nonexistent", &expr, None, false);
3523 assert!(result.is_err());
3524 }
3525
3526 #[test]
3527 fn test_lineage_nested_cte() {
3528 let expr = parse(
3529 "WITH cte1 AS (SELECT a FROM t), \
3530 cte2 AS (SELECT a FROM cte1) \
3531 SELECT a FROM cte2",
3532 );
3533 let node = lineage("a", &expr, None, false).unwrap();
3534
3535 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3537 assert!(
3538 all_names.len() >= 3,
3539 "Expected to trace through nested CTEs, got: {:?}",
3540 all_names
3541 );
3542 }
3543
3544 #[test]
3545 fn test_trim_selects_true() {
3546 let expr = parse("SELECT a, b, c FROM t");
3547 let node = lineage("a", &expr, None, true).unwrap();
3548
3549 if let Expression::Select(select) = &node.source {
3551 assert_eq!(
3552 select.expressions.len(),
3553 1,
3554 "Trimmed source should have 1 expression, got {}",
3555 select.expressions.len()
3556 );
3557 } else {
3558 panic!("Expected Select source");
3559 }
3560 }
3561
3562 #[test]
3563 fn test_trim_selects_false() {
3564 let expr = parse("SELECT a, b, c FROM t");
3565 let node = lineage("a", &expr, None, false).unwrap();
3566
3567 if let Expression::Select(select) = &node.source {
3569 assert_eq!(
3570 select.expressions.len(),
3571 3,
3572 "Untrimmed source should have 3 expressions"
3573 );
3574 } else {
3575 panic!("Expected Select source");
3576 }
3577 }
3578
3579 #[test]
3580 fn test_lineage_expression_in_select() {
3581 let expr = parse("SELECT a + b AS c FROM t");
3582 let node = lineage("c", &expr, None, false).unwrap();
3583
3584 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3586 assert!(
3587 all_names.len() >= 3,
3588 "Expected to trace a + b to both columns, got: {:?}",
3589 all_names
3590 );
3591 }
3592
3593 #[test]
3594 fn test_set_operation_by_index() {
3595 let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3596
3597 let node = lineage("a", &expr, None, false).unwrap();
3599
3600 assert_eq!(node.downstream.len(), 2);
3602 }
3603
3604 fn print_node(node: &LineageNode, indent: usize) {
3607 let pad = " ".repeat(indent);
3608 println!(
3609 "{pad}name={:?} source_name={:?}",
3610 node.name, node.source_name
3611 );
3612 for child in &node.downstream {
3613 print_node(child, indent + 1);
3614 }
3615 }
3616
3617 #[test]
3618 fn test_issue18_repro() {
3619 let query = "SELECT UPPER(name) as upper_name FROM users";
3621 println!("Query: {query}\n");
3622
3623 let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3624 let exprs = dialect.parse(query).unwrap();
3625 let expr = &exprs[0];
3626
3627 let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3628 println!("lineage(\"upper_name\"):");
3629 print_node(&node, 1);
3630
3631 let names = node.downstream_names();
3632 assert!(
3633 names.iter().any(|n| n == "users.name"),
3634 "Expected users.name in downstream, got: {:?}",
3635 names
3636 );
3637 }
3638
3639 #[test]
3640 fn test_lineage_bigquery_safe_namespace_issue207() {
3641 let query = r#"
3642WITH import_cte AS (
3643 SELECT timestamp, data, operation
3644 FROM `project`.`dataset`.`source_table`
3645),
3646transform_cte AS (
3647 SELECT
3648 timestamp,
3649 SAFE.PARSE_JSON(data) AS json_data
3650 FROM import_cte
3651)
3652SELECT json_data FROM transform_cte
3653"#;
3654 let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3655 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3656 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3657 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3658
3659 assert!(
3660 names.iter().any(|name| name == "source_table.data"),
3661 "expected source_table.data in lineage, got {names:?}"
3662 );
3663 assert!(
3664 !names
3665 .iter()
3666 .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3667 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3668 );
3669 }
3670
3671 #[test]
3672 fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3673 let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3674 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3675 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3676 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3677
3678 assert!(
3679 names.iter().any(|name| name == "t.data"),
3680 "expected t.data in lineage, got {names:?}"
3681 );
3682 assert!(
3683 !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3684 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3685 );
3686 }
3687
3688 #[test]
3689 fn test_lineage_method_call_receiver_control() {
3690 let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3691 let node = lineage("out", &expr, None, false).expect("lineage");
3692 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3693
3694 assert!(
3695 names.iter().any(|name| name == "t.obj"),
3696 "expected ordinary method receiver to remain in lineage, got {names:?}"
3697 );
3698 assert!(
3699 names.iter().any(|name| name == "t.arg"),
3700 "expected method argument in lineage, got {names:?}"
3701 );
3702 }
3703
3704 #[test]
3705 fn test_lineage_upper_function() {
3706 let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3707 let node = lineage("upper_name", &expr, None, false).unwrap();
3708
3709 let names = node.downstream_names();
3710 assert!(
3711 names.iter().any(|n| n == "users.name"),
3712 "Expected users.name in downstream, got: {:?}",
3713 names
3714 );
3715 }
3716
3717 #[test]
3718 fn test_lineage_round_function() {
3719 let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3720 let node = lineage("rounded", &expr, None, false).unwrap();
3721
3722 let names = node.downstream_names();
3723 assert!(
3724 names.iter().any(|n| n == "products.price"),
3725 "Expected products.price in downstream, got: {:?}",
3726 names
3727 );
3728 }
3729
3730 #[test]
3731 fn test_lineage_coalesce_function() {
3732 let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3733 let node = lineage("val", &expr, None, false).unwrap();
3734
3735 let names = node.downstream_names();
3736 assert!(
3737 names.iter().any(|n| n == "t.a"),
3738 "Expected t.a in downstream, got: {:?}",
3739 names
3740 );
3741 assert!(
3742 names.iter().any(|n| n == "t.b"),
3743 "Expected t.b in downstream, got: {:?}",
3744 names
3745 );
3746 }
3747
3748 #[test]
3749 fn test_lineage_count_function() {
3750 let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3751 let node = lineage("cnt", &expr, None, false).unwrap();
3752
3753 let names = node.downstream_names();
3754 assert!(
3755 names.iter().any(|n| n == "t.id"),
3756 "Expected t.id in downstream, got: {:?}",
3757 names
3758 );
3759 }
3760
3761 #[test]
3762 fn test_lineage_sum_function() {
3763 let expr = parse("SELECT SUM(amount) AS total FROM t");
3764 let node = lineage("total", &expr, None, false).unwrap();
3765
3766 let names = node.downstream_names();
3767 assert!(
3768 names.iter().any(|n| n == "t.amount"),
3769 "Expected t.amount in downstream, got: {:?}",
3770 names
3771 );
3772 }
3773
3774 #[test]
3775 fn test_lineage_case_with_nested_functions() {
3776 let expr =
3777 parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3778 let node = lineage("result", &expr, None, false).unwrap();
3779
3780 let names = node.downstream_names();
3781 assert!(
3782 names.iter().any(|n| n == "t.x"),
3783 "Expected t.x in downstream, got: {:?}",
3784 names
3785 );
3786 assert!(
3787 names.iter().any(|n| n == "t.name"),
3788 "Expected t.name in downstream, got: {:?}",
3789 names
3790 );
3791 }
3792
3793 #[test]
3794 fn test_lineage_substring_function() {
3795 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3796 let node = lineage("short", &expr, None, false).unwrap();
3797
3798 let names = node.downstream_names();
3799 assert!(
3800 names.iter().any(|n| n == "t.name"),
3801 "Expected t.name in downstream, got: {:?}",
3802 names
3803 );
3804 }
3805
3806 #[test]
3809 fn test_lineage_cte_select_star() {
3810 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3814 let node = lineage("a", &expr, None, false).unwrap();
3815
3816 assert_eq!(node.name, "a");
3817 assert!(
3820 !node.downstream.is_empty(),
3821 "Expected downstream nodes tracing through CTE, got none"
3822 );
3823 }
3824
3825 #[test]
3826 fn test_lineage_cte_select_star_renamed_column() {
3827 let expr =
3830 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3831 let node = lineage("customer_id", &expr, None, false).unwrap();
3832
3833 assert_eq!(node.name, "customer_id");
3834 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3836 assert!(
3837 all_names.len() >= 2,
3838 "Expected at least 2 nodes (customer_id → source), got: {:?}",
3839 all_names
3840 );
3841 }
3842
3843 #[test]
3844 fn test_lineage_cte_select_star_multiple_columns() {
3845 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3847
3848 for col in &["a", "b", "c"] {
3849 let node = lineage(col, &expr, None, false).unwrap();
3850 assert_eq!(node.name, *col);
3851 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3853 assert!(
3854 all_names.len() >= 2,
3855 "Expected at least 2 nodes for column {}, got: {:?}",
3856 col,
3857 all_names
3858 );
3859 }
3860 }
3861
3862 #[test]
3863 fn test_lineage_nested_cte_select_star() {
3864 let expr = parse(
3866 "WITH cte1 AS (SELECT a FROM t), \
3867 cte2 AS (SELECT * FROM cte1) \
3868 SELECT * FROM cte2",
3869 );
3870 let node = lineage("a", &expr, None, false).unwrap();
3871
3872 assert_eq!(node.name, "a");
3873 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3874 assert!(
3875 all_names.len() >= 3,
3876 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3877 all_names
3878 );
3879 }
3880
3881 #[test]
3882 fn test_lineage_three_level_nested_cte_star() {
3883 let expr = parse(
3885 "WITH cte1 AS (SELECT x FROM t), \
3886 cte2 AS (SELECT * FROM cte1), \
3887 cte3 AS (SELECT * FROM cte2) \
3888 SELECT * FROM cte3",
3889 );
3890 let node = lineage("x", &expr, None, false).unwrap();
3891
3892 assert_eq!(node.name, "x");
3893 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3894 assert!(
3895 all_names.len() >= 4,
3896 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3897 all_names
3898 );
3899 }
3900
3901 #[test]
3902 fn test_lineage_cte_union_star() {
3903 let expr = parse(
3905 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3906 SELECT * FROM cte",
3907 );
3908 let node = lineage("a", &expr, None, false).unwrap();
3909
3910 assert_eq!(node.name, "a");
3911 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3912 assert!(
3913 all_names.len() >= 2,
3914 "Expected at least 2 nodes for CTE union star, got: {:?}",
3915 all_names
3916 );
3917 }
3918
3919 #[test]
3920 fn test_lineage_cte_star_unknown_table() {
3921 let expr = parse(
3924 "WITH cte AS (SELECT * FROM unknown_table) \
3925 SELECT * FROM cte",
3926 );
3927 let _result = lineage("x", &expr, None, false);
3930 }
3931
3932 #[test]
3933 fn test_lineage_cte_explicit_columns() {
3934 let expr = parse(
3936 "WITH cte(x, y) AS (SELECT a, b FROM t) \
3937 SELECT * FROM cte",
3938 );
3939 let node = lineage("x", &expr, None, false).unwrap();
3940 assert_eq!(node.name, "x");
3941 }
3942
3943 #[test]
3944 fn test_lineage_cte_qualified_star() {
3945 let expr = parse(
3947 "WITH cte AS (SELECT a, b FROM t) \
3948 SELECT cte.* FROM cte",
3949 );
3950 for col in &["a", "b"] {
3951 let node = lineage(col, &expr, None, false).unwrap();
3952 assert_eq!(node.name, *col);
3953 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3954 assert!(
3955 all_names.len() >= 2,
3956 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3957 col,
3958 all_names
3959 );
3960 }
3961 }
3962
3963 #[test]
3964 fn test_lineage_subquery_select_star() {
3965 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3968 let node = lineage("x", &expr, None, false).unwrap();
3969
3970 assert_eq!(node.name, "x");
3971 assert!(
3972 !node.downstream.is_empty(),
3973 "Expected downstream nodes for subquery with SELECT *, got none"
3974 );
3975 }
3976
3977 #[test]
3978 fn test_lineage_cte_star_with_schema_external_table() {
3979 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3981SELECT * FROM orders"#;
3982 let expr = parse(sql);
3983
3984 let mut schema = MappingSchema::new();
3985 let cols = vec![
3986 ("order_id".to_string(), DataType::Unknown),
3987 ("customer_id".to_string(), DataType::Unknown),
3988 ("amount".to_string(), DataType::Unknown),
3989 ];
3990 schema.add_table("stg_orders", &cols, None).unwrap();
3991
3992 let node =
3993 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3994 .unwrap();
3995 assert_eq!(node.name, "order_id");
3996 }
3997
3998 #[test]
3999 fn test_lineage_cte_star_with_schema_three_part_name() {
4000 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
4002SELECT * FROM orders"#;
4003 let expr = parse(sql);
4004
4005 let mut schema = MappingSchema::new();
4006 let cols = vec![
4007 ("order_id".to_string(), DataType::Unknown),
4008 ("customer_id".to_string(), DataType::Unknown),
4009 ];
4010 schema
4011 .add_table("db.schema.stg_orders", &cols, None)
4012 .unwrap();
4013
4014 let node = lineage_with_schema(
4015 "customer_id",
4016 &expr,
4017 Some(&schema as &dyn Schema),
4018 None,
4019 false,
4020 )
4021 .unwrap();
4022 assert_eq!(node.name, "customer_id");
4023 }
4024
4025 #[test]
4026 fn test_lineage_cte_star_with_schema_nested() {
4027 let sql = r#"WITH
4030 raw AS (SELECT * FROM external_table),
4031 enriched AS (SELECT * FROM raw)
4032 SELECT * FROM enriched"#;
4033 let expr = parse(sql);
4034
4035 let mut schema = MappingSchema::new();
4036 let cols = vec![
4037 ("id".to_string(), DataType::Unknown),
4038 ("name".to_string(), DataType::Unknown),
4039 ];
4040 schema.add_table("external_table", &cols, None).unwrap();
4041
4042 let node =
4043 lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4044 assert_eq!(node.name, "name");
4045 }
4046
4047 #[test]
4048 fn test_lineage_cte_qualified_star_with_schema() {
4049 let sql = r#"WITH
4052 orders AS (SELECT * FROM stg_orders),
4053 enriched AS (
4054 SELECT orders.*, 'extra' AS extra
4055 FROM orders
4056 )
4057 SELECT * FROM enriched"#;
4058 let expr = parse(sql);
4059
4060 let mut schema = MappingSchema::new();
4061 let cols = vec![
4062 ("order_id".to_string(), DataType::Unknown),
4063 ("total".to_string(), DataType::Unknown),
4064 ];
4065 schema.add_table("stg_orders", &cols, None).unwrap();
4066
4067 let node =
4068 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4069 .unwrap();
4070 assert_eq!(node.name, "order_id");
4071
4072 let extra =
4074 lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4075 assert_eq!(extra.name, "extra");
4076 }
4077
4078 #[test]
4079 fn test_lineage_cte_star_without_schema_still_works() {
4080 let sql = r#"WITH
4082 cte1 AS (SELECT id, name FROM raw_table),
4083 cte2 AS (SELECT * FROM cte1)
4084 SELECT * FROM cte2"#;
4085 let expr = parse(sql);
4086
4087 let node = lineage("id", &expr, None, false).unwrap();
4089 assert_eq!(node.name, "id");
4090 }
4091
4092 #[test]
4093 fn test_lineage_nested_cte_star_with_join_and_schema() {
4094 let sql = r#"WITH
4097base_orders AS (
4098 SELECT * FROM stg_orders
4099),
4100with_payments AS (
4101 SELECT
4102 base_orders.*,
4103 p.amount
4104 FROM base_orders
4105 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4106),
4107final_cte AS (
4108 SELECT * FROM with_payments
4109)
4110SELECT * FROM final_cte"#;
4111 let expr = parse(sql);
4112
4113 let mut schema = MappingSchema::new();
4114 let order_cols = vec![
4115 (
4116 "order_id".to_string(),
4117 crate::expressions::DataType::Unknown,
4118 ),
4119 (
4120 "customer_id".to_string(),
4121 crate::expressions::DataType::Unknown,
4122 ),
4123 ("status".to_string(), crate::expressions::DataType::Unknown),
4124 ];
4125 let pay_cols = vec![
4126 (
4127 "payment_id".to_string(),
4128 crate::expressions::DataType::Unknown,
4129 ),
4130 (
4131 "order_id".to_string(),
4132 crate::expressions::DataType::Unknown,
4133 ),
4134 ("amount".to_string(), crate::expressions::DataType::Unknown),
4135 ];
4136 schema.add_table("stg_orders", &order_cols, None).unwrap();
4137 schema.add_table("stg_payments", &pay_cols, None).unwrap();
4138
4139 let node =
4141 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4142 .unwrap();
4143 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4144
4145 let has_table_qualified = all_names
4147 .iter()
4148 .any(|n| n.contains('.') && n.contains("order_id"));
4149 assert!(
4150 has_table_qualified,
4151 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4152 all_names
4153 );
4154
4155 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4157 .unwrap();
4158 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4159
4160 let has_table_qualified = all_names
4161 .iter()
4162 .any(|n| n.contains('.') && n.contains("amount"));
4163 assert!(
4164 has_table_qualified,
4165 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4166 all_names
4167 );
4168 }
4169
4170 #[test]
4171 fn test_lineage_cte_alias_resolution() {
4172 let sql = r#"WITH import_stg_items AS (
4174 SELECT item_id, name, status FROM stg_items
4175)
4176SELECT base.item_id, base.status
4177FROM import_stg_items AS base"#;
4178 let expr = parse(sql);
4179
4180 let node = lineage("item_id", &expr, None, false).unwrap();
4181 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4182 assert!(
4184 all_names.iter().any(|n| n == "stg_items.item_id"),
4185 "Expected leaf 'stg_items.item_id', got: {:?}",
4186 all_names
4187 );
4188 }
4189
4190 #[test]
4191 fn test_lineage_cte_alias_with_schema_and_star() {
4192 let sql = r#"WITH import_stg AS (
4194 SELECT * FROM stg_items
4195)
4196SELECT base.item_id, base.status
4197FROM import_stg AS base"#;
4198 let expr = parse(sql);
4199
4200 let mut schema = MappingSchema::new();
4201 schema
4202 .add_table(
4203 "stg_items",
4204 &[
4205 ("item_id".to_string(), DataType::Unknown),
4206 ("name".to_string(), DataType::Unknown),
4207 ("status".to_string(), DataType::Unknown),
4208 ],
4209 None,
4210 )
4211 .unwrap();
4212
4213 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
4214 .unwrap();
4215 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4216 assert!(
4217 all_names.iter().any(|n| n == "stg_items.item_id"),
4218 "Expected leaf 'stg_items.item_id', got: {:?}",
4219 all_names
4220 );
4221 }
4222
4223 #[test]
4224 fn test_lineage_cte_alias_with_join() {
4225 let sql = r#"WITH
4227 import_users AS (SELECT id, name FROM users),
4228 import_orders AS (SELECT id, user_id, amount FROM orders)
4229SELECT u.name, o.amount
4230FROM import_users AS u
4231LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
4232 let expr = parse(sql);
4233
4234 let node = lineage("name", &expr, None, false).unwrap();
4235 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4236 assert!(
4237 all_names.iter().any(|n| n == "users.name"),
4238 "Expected leaf 'users.name', got: {:?}",
4239 all_names
4240 );
4241
4242 let node = lineage("amount", &expr, None, false).unwrap();
4243 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4244 assert!(
4245 all_names.iter().any(|n| n == "orders.amount"),
4246 "Expected leaf 'orders.amount', got: {:?}",
4247 all_names
4248 );
4249 }
4250
4251 #[test]
4256 fn test_lineage_unquoted_cte_case_insensitive() {
4257 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
4260 let node = lineage("col", &expr, None, false).unwrap();
4261 assert_eq!(node.name, "col");
4262 assert!(
4263 !node.downstream.is_empty(),
4264 "Unquoted CTE should resolve case-insensitively"
4265 );
4266 }
4267
4268 #[test]
4269 fn test_lineage_quoted_cte_case_preserved() {
4270 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
4272 let node = lineage("col", &expr, None, false).unwrap();
4273 assert_eq!(node.name, "col");
4274 assert!(
4275 !node.downstream.is_empty(),
4276 "Quoted CTE with matching case should resolve"
4277 );
4278 }
4279
4280 #[test]
4281 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
4282 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
4286 let result = lineage("col", &expr, None, false);
4289 assert!(
4290 result.is_err(),
4291 "Quoted CTE with case mismatch should not expand star: {:?}",
4292 result
4293 );
4294 }
4295
4296 #[test]
4297 fn test_lineage_mixed_quoted_unquoted_cte() {
4298 let expr = parse(
4300 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
4301 );
4302 let node = lineage("a", &expr, None, false).unwrap();
4303 assert_eq!(node.name, "a");
4304 assert!(
4305 !node.downstream.is_empty(),
4306 "Mixed quoted/unquoted CTE chain should resolve"
4307 );
4308 }
4309
4310 #[test]
4326 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
4327 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
4338 let node = lineage("col", &expr, None, false).unwrap();
4339 assert!(!node.downstream.is_empty());
4340 let child = &node.downstream[0];
4341 assert_eq!(
4343 child.source_name, "MyCte",
4344 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4345 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4346 );
4347 }
4348
4349 #[test]
4350 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4351 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4358 let node = lineage("col", &expr, None, false).unwrap();
4359 assert!(!node.downstream.is_empty());
4360 let child = &node.downstream[0];
4361 assert_eq!(
4363 child.source_name, "MyCte",
4364 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4365 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4366 );
4367 }
4368
4369 #[test]
4376 #[ignore = "requires derived table star expansion (separate issue)"]
4377 fn test_node_name_doesnt_contain_comment() {
4378 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4379 let node = lineage("x", &expr, None, false).unwrap();
4380
4381 assert_eq!(node.name, "x");
4382 assert!(!node.downstream.is_empty());
4383 }
4384
4385 #[test]
4389 fn test_comment_before_first_column_in_cte() {
4390 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
4391 let sql_without_comment = "with t as (select 1 as a) select a from t";
4392
4393 let expr_ok = parse(sql_without_comment);
4395 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4396
4397 let expr_comment = parse(sql_with_comment);
4399 let node_comment = lineage("a", &expr_comment, None, false)
4400 .expect("with comment before first column should succeed");
4401
4402 assert_eq!(node_ok.name, node_comment.name, "node names should match");
4403 assert_eq!(
4404 node_ok.downstream_names(),
4405 node_comment.downstream_names(),
4406 "downstream lineage should be identical with or without comment"
4407 );
4408 }
4409
4410 #[test]
4412 fn test_block_comment_before_first_column() {
4413 let sql = "with t as (select 1 as a) select /* section */ a from t";
4414 let expr = parse(sql);
4415 let node = lineage("a", &expr, None, false)
4416 .expect("block comment before first column should succeed");
4417 assert_eq!(node.name, "a");
4418 assert!(
4419 !node.downstream.is_empty(),
4420 "should have downstream lineage"
4421 );
4422 }
4423
4424 #[test]
4426 fn test_comment_before_first_column_second_col_ok() {
4427 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
4428 let expr = parse(sql);
4429
4430 let node_a =
4431 lineage("a", &expr, None, false).expect("column a with comment should succeed");
4432 assert_eq!(node_a.name, "a");
4433
4434 let node_b =
4435 lineage("b", &expr, None, false).expect("column b with comment should succeed");
4436 assert_eq!(node_b.name, "b");
4437 }
4438
4439 #[test]
4441 fn test_comment_before_aliased_column() {
4442 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
4443 let expr = parse(sql);
4444 let node =
4445 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4446 assert_eq!(node.name, "y");
4447 assert!(
4448 !node.downstream.is_empty(),
4449 "aliased column should have downstream lineage"
4450 );
4451 }
4452}