1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, JoinKind, NamedWindow, Select};
10use crate::generator::Generator;
11use crate::optimizer::annotate_types::annotate_types;
12use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
13use crate::schema::{normalize_name, Schema};
14use crate::scope::{
15 build_scope, find_all_in_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind,
16};
17use crate::{Error, Result};
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, HashSet};
20
/// A single node in a column-lineage tree.
///
/// A node pairs one column (or star) expression with the query/table
/// expression it was found in, and lists the nodes it depends on.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Name of this node (for select-list lookups, the resolved column name).
    pub name: String,
    /// The expression this node stands for, e.g. the matching select-list item.
    pub expression: Expression,
    /// The query or source expression that `expression` was taken from.
    pub source: Expression,
    /// Nodes this node depends on.
    pub downstream: Vec<LineageNode>,
    /// Name of the source this node was reached through; empty when none was set.
    pub source_name: String,
    /// Classification of the source; `SourceKind::Unknown` until scope context is applied.
    pub source_kind: SourceKind,
    /// Optional alias of the source. Omitted from serialized output when `None`
    /// and defaulted when absent on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_alias: Option<String>,
    /// Name of the node that referenced this one; empty when none was set.
    pub reference_node_name: String,
}
42
43impl LineageNode {
44 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
46 Self {
47 name: name.into(),
48 expression,
49 source,
50 downstream: Vec::new(),
51 source_name: String::new(),
52 source_kind: SourceKind::Unknown,
53 source_alias: None,
54 reference_node_name: String::new(),
55 }
56 }
57
58 pub fn walk(&self) -> LineageWalker<'_> {
60 LineageWalker { stack: vec![self] }
61 }
62
63 pub fn downstream_names(&self) -> Vec<String> {
65 self.downstream.iter().map(|n| n.name.clone()).collect()
66 }
67}
68
69fn source_kind_for_scope_context(
70 scope: &Scope,
71 source_name: &str,
72 reference_node_name: &str,
73) -> SourceKind {
74 if source_name.is_empty() && reference_node_name.is_empty() {
75 return SourceKind::Root;
76 }
77 if let Some(source_info) = scope.sources.get(source_name) {
78 return source_info.kind;
79 }
80 if scope.cte_sources.contains_key(source_name) {
81 return SourceKind::Cte;
82 }
83 match scope.scope_type {
84 ScopeType::Cte => SourceKind::Cte,
85 ScopeType::DerivedTable => SourceKind::DerivedTable,
86 ScopeType::Udtf => SourceKind::Virtual,
87 _ => SourceKind::Unknown,
88 }
89}
90
91fn apply_scope_context(
92 node: &mut LineageNode,
93 scope: &Scope,
94 source_name: &str,
95 reference_node_name: &str,
96) {
97 node.source_name = source_name.to_string();
98 node.reference_node_name = reference_node_name.to_string();
99 node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
100}
101
/// Iterator over a [`LineageNode`] and all nodes below it, created by
/// [`LineageNode::walk`].
pub struct LineageWalker<'a> {
    /// Nodes still to be yielded; the next node to yield is at the end.
    stack: Vec<&'a LineageNode>,
}
106
107impl<'a> Iterator for LineageWalker<'a> {
108 type Item = &'a LineageNode;
109
110 fn next(&mut self) -> Option<Self::Item> {
111 if let Some(node) = self.stack.pop() {
112 for child in node.downstream.iter().rev() {
114 self.stack.push(child);
115 }
116 Some(node)
117 } else {
118 None
119 }
120 }
121}
122
/// Identifies the output column whose lineage is being traced.
enum ColumnRef<'a> {
    /// The column is identified by name.
    Name(&'a str),
    /// The column is identified by an index (used for set-operation branches
    /// and for subqueries, which are traced via index 0).
    Index(usize),
}
132
133pub fn lineage(
159 column: &str,
160 sql: &Expression,
161 dialect: Option<DialectType>,
162 trim_selects: bool,
163) -> Result<LineageNode> {
164 let mut owned = lineage_normalized_expression(sql);
165 if has_lineage_with_clause(&owned) {
167 expand_cte_stars(&mut owned, None);
168 }
169 lineage_from_expression(column, &owned, dialect, trim_selects)
170}
171
172pub fn lineage_with_schema(
188 column: &str,
189 sql: &Expression,
190 schema: Option<&dyn Schema>,
191 dialect: Option<DialectType>,
192 trim_selects: bool,
193) -> Result<LineageNode> {
194 let normalized_expression = lineage_normalized_expression(sql);
195 let mut qualified_expression = if let Some(schema) = schema {
196 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197 QualifyColumnsOptions::new()
198 .with_dialect(dialect_type)
199 .with_allow_partial(true)
200 } else {
201 QualifyColumnsOptions::new().with_allow_partial(true)
202 };
203
204 qualify_columns(normalized_expression.clone(), schema, &options).map_err(|e| {
205 Error::internal(format!("Lineage qualification failed with schema: {}", e))
206 })?
207 } else {
208 normalized_expression
209 };
210
211 annotate_types(&mut qualified_expression, schema, dialect);
213
214 expand_cte_stars(&mut qualified_expression, schema);
217
218 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
219}
220
221fn lineage_from_expression(
222 column: &str,
223 sql: &Expression,
224 dialect: Option<DialectType>,
225 trim_selects: bool,
226) -> Result<LineageNode> {
227 let scope = build_scope(sql);
228 to_node(
229 ColumnRef::Name(column),
230 &scope,
231 dialect,
232 "",
233 "",
234 "",
235 trim_selects,
236 )
237}
238
239pub(crate) fn lineage_by_index_from_expression(
240 column_index: usize,
241 sql: &Expression,
242 dialect: Option<DialectType>,
243 trim_selects: bool,
244) -> Result<LineageNode> {
245 let normalized = lineage_normalized_expression(sql);
246 let scope = build_scope(&normalized);
247 to_node(
248 ColumnRef::Index(column_index),
249 &scope,
250 dialect,
251 "",
252 "",
253 "",
254 trim_selects,
255 )
256}
257
258fn lineage_normalized_expression(sql: &Expression) -> Expression {
259 match sql {
260 Expression::Prepare(prepare) => lineage_normalized_expression(&prepare.statement),
261 Expression::CreateTable(create) => create
262 .as_select
263 .as_ref()
264 .map(|query| attach_with_to_query(query.clone(), create.with_cte.clone()))
265 .unwrap_or_else(|| sql.clone()),
266 Expression::CreateView(create) => lineage_normalized_expression(&create.query),
267 Expression::Insert(insert) => insert
268 .query
269 .as_ref()
270 .map(|query| attach_with_to_query(query.clone(), insert.with.clone()))
271 .unwrap_or_else(|| sql.clone()),
272 _ => sql.clone(),
273 }
274}
275
276fn attach_with_to_query(
277 mut query: Expression,
278 with: Option<crate::expressions::With>,
279) -> Expression {
280 if let Some(with) = with {
281 attach_with_to_query_mut(&mut query, with);
282 }
283 query
284}
285
286fn attach_with_to_query_mut(query: &mut Expression, with: crate::expressions::With) {
287 match query {
288 Expression::Select(select) => {
289 if select.with.is_none() {
290 select.with = Some(with);
291 }
292 }
293 Expression::Union(union) => {
294 if union.with.is_none() {
295 union.with = Some(with);
296 }
297 }
298 Expression::Intersect(intersect) => {
299 if intersect.with.is_none() {
300 intersect.with = Some(with);
301 }
302 }
303 Expression::Except(except) => {
304 if except.with.is_none() {
305 except.with = Some(with);
306 }
307 }
308 Expression::Paren(paren) => attach_with_to_query_mut(&mut paren.this, with),
309 _ => {}
310 }
311}
312
313fn has_lineage_with_clause(expr: &Expression) -> bool {
314 match expr {
315 Expression::Select(select) => select.with.is_some(),
316 Expression::Union(union) => {
317 union.with.is_some()
318 || has_lineage_with_clause(&union.left)
319 || has_lineage_with_clause(&union.right)
320 }
321 Expression::Intersect(intersect) => {
322 intersect.with.is_some()
323 || has_lineage_with_clause(&intersect.left)
324 || has_lineage_with_clause(&intersect.right)
325 }
326 Expression::Except(except) => {
327 except.with.is_some()
328 || has_lineage_with_clause(&except.left)
329 || has_lineage_with_clause(&except.right)
330 }
331 Expression::Paren(paren) => has_lineage_with_clause(&paren.this),
332 _ => false,
333 }
334}
335
336fn normalize_cte_name(ident: &Identifier) -> String {
346 if ident.quoted {
347 ident.name.clone()
348 } else {
349 ident.name.to_lowercase()
350 }
351}
352
353pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
365 if let Expression::Prepare(prepare) = expr {
366 expand_cte_stars(&mut prepare.statement, schema);
367 return;
368 }
369
370 let select = match expr {
371 Expression::Select(s) => s,
372 _ => return,
373 };
374
375 let with = match &mut select.with {
376 Some(w) => w,
377 None => return,
378 };
379
380 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
381
382 for cte in &mut with.ctes {
383 let cte_name = normalize_cte_name(&cte.alias);
384
385 if !cte.columns.is_empty() {
387 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
388 resolved_cte_columns.insert(cte_name, cols);
389 continue;
390 }
391
392 if with.recursive {
398 let is_self_referencing =
399 if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
400 let body_sources = get_select_sources(body_select);
401 body_sources.iter().any(|s| s.normalized == cte_name)
402 } else {
403 false
404 };
405 if is_self_referencing {
406 continue;
407 }
408 }
409
410 let body_select = match get_leftmost_select_mut(&mut cte.this) {
412 Some(s) => s,
413 None => continue,
414 };
415
416 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
417 resolved_cte_columns.insert(cte_name, columns);
418 }
419
420 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
422}
423
/// Returns a mutable reference to the leftmost `SELECT` of a query.
///
/// Descends through the left operand of `UNION` / `INTERSECT` / `EXCEPT` and
/// through parentheses. Returns `None` when something other than those
/// expressions is reached, or when no `SELECT` is found within
/// `MAX_LINEAGE_DEPTH` steps.
fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
    let mut current = expr;
    // Bounded loop rather than recursion: at most MAX_LINEAGE_DEPTH levels are unwrapped.
    for _ in 0..MAX_LINEAGE_DEPTH {
        match current {
            Expression::Select(s) => return Some(s),
            Expression::Union(u) => current = &mut u.left,
            Expression::Intersect(i) => current = &mut i.left,
            Expression::Except(e) => current = &mut e.left,
            Expression::Paren(p) => current = &mut p.this,
            _ => return None,
        }
    }
    None
}
442
443fn rewrite_stars_in_select(
447 select: &mut Select,
448 resolved_ctes: &HashMap<String, Vec<String>>,
449 schema: Option<&dyn Schema>,
450) -> Vec<String> {
451 let has_star = select
456 .expressions
457 .iter()
458 .any(|e| matches!(e, Expression::Star(_)));
459 let has_qualified_star = select
460 .expressions
461 .iter()
462 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
463
464 if !has_star && !has_qualified_star {
465 return select
467 .expressions
468 .iter()
469 .filter_map(get_expression_output_name)
470 .collect();
471 }
472
473 let sources = get_select_sources(select);
474 let mut new_expressions = Vec::new();
475 let mut result_columns = Vec::new();
476
477 for expr in &select.expressions {
478 match expr {
479 Expression::Star(star) => {
480 let qual = star.table.as_ref();
481 if let Some(expanded) =
482 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
483 {
484 for (src_alias, col_name) in &expanded {
485 let table_id = Identifier::new(src_alias);
486 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
487 result_columns.push(col_name.clone());
488 }
489 } else {
490 new_expressions.push(expr.clone());
491 result_columns.push("*".to_string());
492 }
493 }
494 Expression::Column(c) if c.name.name == "*" => {
495 let qual = c.table.as_ref();
496 if let Some(expanded) =
497 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
498 {
499 for (_src_alias, col_name) in &expanded {
500 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
502 result_columns.push(col_name.clone());
503 }
504 } else {
505 new_expressions.push(expr.clone());
506 result_columns.push("*".to_string());
507 }
508 }
509 _ => {
510 new_expressions.push(expr.clone());
511 if let Some(name) = get_expression_output_name(expr) {
512 result_columns.push(name);
513 }
514 }
515 }
516 }
517
518 select.expressions = new_expressions;
519 result_columns
520}
521
522fn expand_star_from_sources(
527 qualifier: Option<&Identifier>,
528 sources: &[SourceInfo],
529 resolved_ctes: &HashMap<String, Vec<String>>,
530 schema: Option<&dyn Schema>,
531) -> Option<Vec<(String, String)>> {
532 let mut expanded = Vec::new();
533
534 if let Some(qual) = qualifier {
535 let qual_normalized = normalize_cte_name(qual);
537 for src in sources {
538 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
539 if let Some(cols) = resolved_ctes.get(&src.normalized) {
541 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
542 return Some(expanded);
543 }
544 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
546 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
547 return Some(expanded);
548 }
549 }
550 }
551 None
552 } else {
553 let mut any_expanded = false;
559 for src in sources {
560 if let Some(cols) = resolved_ctes.get(&src.normalized) {
561 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
562 any_expanded = true;
563 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
564 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
565 any_expanded = true;
566 } else {
567 return None;
568 }
569 }
570 if any_expanded {
571 Some(expanded)
572 } else {
573 None
574 }
575 }
576}
577
578fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
580 let schema = schema?;
581 if fq_name.is_empty() {
582 return None;
583 }
584 schema
585 .column_names(fq_name)
586 .ok()
587 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
588}
589
590fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
592 Expression::Column(Box::new(crate::expressions::Column {
593 name: Identifier::new(name),
594 table: table.cloned(),
595 join_mark: false,
596 trailing_comments: Vec::new(),
597 span: None,
598 inferred_type: None,
599 }))
600}
601
602fn get_expression_output_name(expr: &Expression) -> Option<String> {
604 match expr {
605 Expression::Alias(a) => Some(a.alias.name.clone()),
606 Expression::Column(c) => Some(c.name.name.clone()),
607 Expression::Identifier(id) => Some(id.name.clone()),
608 Expression::Star(_) => Some("*".to_string()),
609 _ => None,
610 }
611}
612
/// A relation referenced in the FROM / JOIN / LATERAL VIEW part of a SELECT,
/// as collected by `get_select_sources`.
struct SourceInfo {
    /// Name the source is referred to by: its alias when it has one, otherwise
    /// (for tables) the table name.
    alias: String,
    /// Whether the identifier the source was built from was quoted.
    quoted: bool,
    /// Key used to match the source against CTE names and star qualifiers.
    normalized: String,
    /// Name used for schema lookups: the dot-joined catalog/schema/table parts
    /// for tables, the alias for other kinds of source.
    fq_name: String,
}
628
629fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
632 let mut sources = Vec::new();
633
634 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
635 fn virtual_source_info(alias: &Identifier) -> SourceInfo {
636 SourceInfo {
637 alias: alias.name.clone(),
638 quoted: alias.quoted,
639 normalized: normalize_cte_name(alias),
640 fq_name: alias.name.clone(),
641 }
642 }
643
644 fn named_virtual_source_info(alias: &str) -> SourceInfo {
645 SourceInfo {
646 alias: alias.to_string(),
647 quoted: false,
648 normalized: alias.to_lowercase(),
649 fq_name: alias.to_string(),
650 }
651 }
652
653 match expr {
654 Expression::Table(t) => {
655 let normalized = normalize_cte_name(&t.name);
656 let alias = t
657 .alias
658 .as_ref()
659 .map(|a| a.name.clone())
660 .unwrap_or_else(|| t.name.name.clone());
661 let mut parts = Vec::new();
662 if let Some(catalog) = &t.catalog {
663 parts.push(catalog.name.clone());
664 }
665 if let Some(schema) = &t.schema {
666 parts.push(schema.name.clone());
667 }
668 parts.push(t.name.name.clone());
669 let fq_name = parts.join(".");
670 Some(SourceInfo {
671 alias,
672 quoted: t.name.quoted,
673 normalized,
674 fq_name,
675 })
676 }
677 Expression::Subquery(s) => {
678 let alias_identifier = s.alias.as_ref()?;
679 let alias = alias_identifier.name.clone();
680 let normalized = alias.to_lowercase();
681 let fq_name = alias.clone();
682 Some(SourceInfo {
683 alias,
684 quoted: alias_identifier.quoted,
685 normalized,
686 fq_name,
687 })
688 }
689 Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
690 Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
691 Some(virtual_source_info(&a.alias))
692 }
693 Expression::Alias(a) if is_query_like_relation(&a.this) => {
694 Some(virtual_source_info(&a.alias))
695 }
696 Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
697 Expression::LateralView(lateral_view) => lateral_view
698 .table_alias
699 .as_ref()
700 .or_else(|| lateral_view.column_aliases.first())
701 .map(virtual_source_info),
702 Expression::Pivot(pivot) => {
703 let alias = pivot_lineage_source_name(
704 &pivot.this,
705 pivot.alias.as_ref().map(|alias| alias.name.as_str()),
706 );
707 Some(SourceInfo {
708 alias: alias.clone(),
709 quoted: false,
710 normalized: alias.to_lowercase(),
711 fq_name: alias,
712 })
713 }
714 Expression::Unpivot(unpivot) => {
715 let alias = pivot_lineage_source_name(
716 &unpivot.this,
717 unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
718 );
719 Some(SourceInfo {
720 alias: alias.clone(),
721 quoted: false,
722 normalized: alias.to_lowercase(),
723 fq_name: alias,
724 })
725 }
726 Expression::Paren(p) => extract_source(&p.this),
727 _ => None,
728 }
729 }
730
731 if let Some(from) = &select.from {
732 for expr in &from.expressions {
733 if let Some(info) = extract_source(expr) {
734 sources.push(info);
735 }
736 }
737 }
738 for join in &select.joins {
739 if is_semi_or_anti_join_kind(join.kind) {
740 continue;
741 }
742 if let Some(info) = extract_source(&join.this) {
743 sources.push(info);
744 }
745 }
746 for lateral_view in &select.lateral_views {
747 if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
748 {
749 sources.push(info);
750 }
751 }
752 sources
753}
754
755fn pivot_lineage_source_name(source: &Expression, explicit_alias: Option<&str>) -> String {
756 if let Some(alias) = explicit_alias {
757 return alias.to_string();
758 }
759
760 match source {
761 Expression::Table(table) => table
762 .alias
763 .as_ref()
764 .map(|alias| alias.name.clone())
765 .unwrap_or_else(|| table.name.name.clone()),
766 Expression::Subquery(subquery) => subquery
767 .alias
768 .as_ref()
769 .map(|alias| alias.name.clone())
770 .unwrap_or_else(|| "_0".to_string()),
771 Expression::Paren(paren) => pivot_lineage_source_name(&paren.this, explicit_alias),
772 _ => "_0".to_string(),
773 }
774}
775
776pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
778 let mut tables = HashSet::new();
779 collect_source_tables(node, &mut tables);
780 tables
781}
782
783pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
785 if let Expression::Table(table) = &node.source {
786 tables.insert(table.name.name.clone());
787 }
788 for child in &node.downstream {
789 collect_source_tables(child, tables);
790 }
791}
792
/// Upper bound on nesting handled by the lineage code: recursion deeper than
/// this in `to_node_inner` is reported as an error, and
/// `get_leftmost_select_mut` unwraps at most this many levels.
const MAX_LINEAGE_DEPTH: usize = 64;
800
801fn to_node(
803 column: ColumnRef<'_>,
804 scope: &Scope,
805 dialect: Option<DialectType>,
806 scope_name: &str,
807 source_name: &str,
808 reference_node_name: &str,
809 trim_selects: bool,
810) -> Result<LineageNode> {
811 to_node_inner(
812 column,
813 scope,
814 dialect,
815 scope_name,
816 source_name,
817 reference_node_name,
818 trim_selects,
819 &[],
820 0,
821 )
822}
823
/// Recursively builds the lineage node for `column` within `scope`.
///
/// * `scope_name` is only used in the depth-exceeded error message here and is
///   forwarded to set-operation handling.
/// * `source_name` / `reference_node_name` are recorded on the created node.
/// * `ancestor_cte_scopes` are CTE scopes inherited from enclosing scopes; they
///   are searched in addition to the CTE scopes of `scope` itself.
/// * `depth` is the current recursion depth.
///
/// # Errors
/// Returns an internal error when `depth` exceeds `MAX_LINEAGE_DEPTH`, and
/// propagates the error from locating the select expression. Failures while
/// tracing scalar/predicate subqueries are ignored (the child is just omitted).
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    // Guard against runaway recursion.
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // CTE scopes visible from here: this scope's own, followed by the inherited ones.
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }
    // Owned copies of the visible CTE scopes, excluding any whose expression
    // equals the current scope's; passed down as ancestors to child calls.
    let descendant_cte_scopes = descendant_cte_scope_clones(&all_cte_scopes, scope);

    // Look through CTE / subquery / parenthesis wrappers to the actual query.
    let effective_expr = effective_scope_expression(scope_expr);

    // Set operations are traced branch by branch.
    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        if matches!(scope_expr, Expression::Cte(_)) {
            // The scope expression is the CTE wrapper: build a scope whose
            // expression is the set operation itself, carrying over the
            // original scope's child scopes and sources.
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                &descendant_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            &descendant_cte_scopes,
            depth,
        );
    }

    // Locate the select-list item for the requested column and its name.
    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    // The node's source is either the trimmed query or the whole query.
    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    apply_scope_context(&mut node, scope, source_name, reference_node_name);

    // A star item: attach one `<source>.*` child per source of the scope
    // (restricted to the matching source when the star is qualified) and stop.
    if let Expression::Star(star) = &select_expr {
        let star_table = star
            .table
            .as_ref()
            .map(|identifier| identifier.name.as_str());
        for (name, source_info) in &scope.sources {
            if let Some(star_table) = star_table {
                // Case-insensitive match against the source key, its alias, or
                // the underlying table's name.
                let table_matches = name.eq_ignore_ascii_case(star_table)
                    || source_info
                        .alias
                        .as_deref()
                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
                    || matches!(
                        &source_info.expression,
                        Expression::Table(table_ref)
                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
                    );
                if !table_matches {
                    continue;
                }
            }

            let mut child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: star.table.clone(),
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            apply_source_info_context(&mut child, name, source_info);
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // Subqueries used inside the select expression: trace the first output
    // column of the matching subquery scope.
    for query in query_expressions_in_scope(&select_expr) {
        for sq_scope in &scope.subquery_scopes {
            if sq_scope.expression == *query {
                // Best effort: a failing subquery simply contributes no child.
                if let Ok(child) = to_node_inner(
                    ColumnRef::Index(0),
                    sq_scope,
                    dialect,
                    &column_name,
                    "",
                    "",
                    trim_selects,
                    &descendant_cte_scopes,
                    depth + 1,
                ) {
                    node.downstream.push(child);
                }
                // Only the first scope with an equal expression is used.
                break;
            }
        }
    }

    // Column references inside the select expression.
    let col_refs = find_column_refs_in_expr_with_select(&select_expr, effective_expr, dialect);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            // An unqualified name may refer to an alias of a prior select item;
            // if so, trace the columns of that aliased expression instead.
            if let Some(alias_expr) =
                find_prior_select_alias_expr(effective_expr, &select_expr, col_name, dialect)
            {
                for alias_ref in
                    find_column_refs_in_expr_with_select(&alias_expr, effective_expr, dialect)
                {
                    if let Some(ref table_id) = alias_ref.table {
                        resolve_qualified_column(
                            &mut node,
                            scope,
                            dialect,
                            &table_id.name,
                            &alias_ref.column,
                            &column_name,
                            trim_selects,
                            &all_cte_scopes,
                            depth,
                        );
                    } else {
                        resolve_unqualified_column(
                            &mut node,
                            scope,
                            dialect,
                            &alias_ref.column,
                            &column_name,
                            trim_selects,
                            &all_cte_scopes,
                            depth,
                        );
                    }
                }
                continue;
            }

            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
1037
1038fn descendant_cte_scope_clones(all_cte_scopes: &[&Scope], current_scope: &Scope) -> Vec<Scope> {
1039 all_cte_scopes
1040 .iter()
1041 .filter(|scope| scope.expression != current_scope.expression)
1042 .map(|scope| (*scope).clone())
1043 .collect()
1044}
1045
1046fn effective_scope_expression(expr: &Expression) -> &Expression {
1047 match expr {
1048 Expression::Cte(cte) => effective_scope_expression(&cte.this),
1049 Expression::Subquery(subquery) => effective_scope_expression(&subquery.this),
1050 Expression::Paren(paren) => effective_scope_expression(&paren.this),
1051 other => other,
1052 }
1053}
1054
1055fn query_expressions_in_scope(expr: &Expression) -> Vec<&Expression> {
1056 let mut queries = Vec::new();
1057 let mut seen = HashSet::new();
1058
1059 for node in find_all_in_scope(
1060 expr,
1061 |node| {
1062 matches!(
1063 node,
1064 Expression::Subquery(subquery) if subquery.alias.is_none()
1065 ) || matches!(
1066 node,
1067 Expression::Exists(_) | Expression::In(_) | Expression::Any(_) | Expression::All(_)
1068 )
1069 },
1070 false,
1071 ) {
1072 let query = match node {
1073 Expression::Subquery(subquery) if subquery.alias.is_none() => Some(&subquery.this),
1074 Expression::Exists(exists) => Some(&exists.this),
1075 Expression::In(in_expr) => in_expr.query.as_ref(),
1076 Expression::Any(quantified) | Expression::All(quantified) => Some(&quantified.subquery),
1077 _ => None,
1078 };
1079
1080 if let Some(query) = query {
1081 let key = query as *const Expression as usize;
1082 if seen.insert(key) {
1083 queries.push(query);
1084 }
1085 }
1086 }
1087
1088 queries
1089}
1090
1091fn handle_set_operation(
1096 column: &ColumnRef<'_>,
1097 scope: &Scope,
1098 dialect: Option<DialectType>,
1099 scope_name: &str,
1100 source_name: &str,
1101 reference_node_name: &str,
1102 trim_selects: bool,
1103 ancestor_cte_scopes: &[Scope],
1104 depth: usize,
1105) -> Result<LineageNode> {
1106 let scope_expr = &scope.expression;
1107
1108 let col_index = match column {
1110 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
1111 ColumnRef::Index(i) => *i,
1112 };
1113
1114 let col_name = match column {
1115 ColumnRef::Name(name) => name.to_string(),
1116 ColumnRef::Index(_) => format!("_{col_index}"),
1117 };
1118
1119 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
1120 apply_scope_context(&mut node, scope, source_name, reference_node_name);
1121
1122 for branch_scope in &scope.union_scopes {
1124 if let Ok(child) = to_node_inner(
1125 ColumnRef::Index(col_index),
1126 branch_scope,
1127 dialect,
1128 scope_name,
1129 "",
1130 "",
1131 trim_selects,
1132 ancestor_cte_scopes,
1133 depth + 1,
1134 ) {
1135 node.downstream.push(child);
1136 }
1137 }
1138
1139 Ok(node)
1140}
1141
/// Resolves the qualified column `table.col_name` and appends the resulting
/// child node to `node.downstream`.
///
/// Resolution is attempted in this order, stopping at the first success:
/// 1. PIVOT / UNPIVOT sources, via their column mappings;
/// 2. CTEs (recursing into the CTE's scope, or falling back to a plain node
///    built from the CTE source);
/// 3. other sources that are scopes (recursing into the child scope);
/// 4. non-scope sources such as tables (virtual sources also get their own
///    dependencies attached);
/// 5. a bare table-column node when nothing else matched.
///
/// `parent_name` is passed to recursive calls as both scope name and
/// reference-node name. Recursive failures are not reported; resolution simply
/// falls through to the next strategy.
fn resolve_qualified_column(
    node: &mut LineageNode,
    scope: &Scope,
    dialect: Option<DialectType>,
    table: &str,
    col_name: &str,
    parent_name: &str,
    trim_selects: bool,
    all_cte_scopes: &[&Scope],
    depth: usize,
) {
    // `table` may be an alias of a CTE; prefer the resolved CTE name when there is one.
    let resolved_cte_name = resolve_cte_alias(scope, table);
    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);

    // 1. PIVOT / UNPIVOT sources.
    if let Some(source_info) = scope
        .sources
        .get(table)
        .or_else(|| scope.sources.get(effective_table))
    {
        match &source_info.expression {
            Expression::Pivot(pivot) => {
                if attach_pivot_dependencies(
                    node,
                    scope,
                    dialect,
                    pivot,
                    col_name,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                ) {
                    return;
                }
            }
            Expression::Unpivot(unpivot) => {
                if attach_unpivot_dependencies(
                    node,
                    scope,
                    dialect,
                    unpivot,
                    col_name,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                ) {
                    return;
                }
            }
            _ => {}
        }
    }

    // 2. CTEs: known either as a CTE source of this scope or among the visible CTE scopes.
    let is_cte = scope.cte_sources.contains_key(effective_table)
        || all_cte_scopes.iter().any(
            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
        );
    if is_cte {
        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
            // The recursive call takes owned scopes, so the visible CTE scopes are cloned.
            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
            if let Ok(child) = to_node_inner(
                ColumnRef::Name(col_name),
                child_scope,
                dialect,
                parent_name,
                effective_table,
                parent_name,
                trim_selects,
                &ancestors,
                depth + 1,
            ) {
                node.downstream.push(child);
                return;
            }
        }

        // The CTE scope was not found or could not be traced: fall back to a
        // plain node built from the CTE source, if the scope has one.
        if let Some(source_info) = scope
            .sources
            .get(table)
            .or_else(|| scope.sources.get(effective_table))
            .filter(|source_info| source_info.kind == SourceKind::Cte)
        {
            node.downstream.push(make_table_column_node_from_source(
                effective_table,
                col_name,
                source_info,
            ));
            return;
        }
    }

    // 3. Sources that are scopes themselves: recurse into the child scope.
    if let Some(source_info) = scope.sources.get(table) {
        if source_info.is_scope {
            if let Some(child_scope) = find_child_scope(scope, table) {
                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
                if let Ok(child) = to_node_inner(
                    ColumnRef::Name(col_name),
                    child_scope,
                    dialect,
                    parent_name,
                    table,
                    parent_name,
                    trim_selects,
                    &ancestors,
                    depth + 1,
                ) {
                    node.downstream.push(child);
                    return;
                }
            }
        }
    }

    // 4. Non-scope sources.
    if let Some(source_info) = scope.sources.get(table) {
        if !source_info.is_scope {
            let mut child = make_table_column_node_from_source(table, col_name, source_info);
            if source_info.kind == SourceKind::Virtual {
                attach_virtual_source_dependencies(
                    &mut child,
                    scope,
                    dialect,
                    table,
                    &source_info.expression,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                );
            }
            node.downstream.push(child);
            return;
        }
    }

    // 5. Nothing matched: record the reference as a plain table column.
    node.downstream
        .push(make_table_column_node(table, col_name));
}
1290
1291fn attach_pivot_dependencies(
1292 node: &mut LineageNode,
1293 scope: &Scope,
1294 dialect: Option<DialectType>,
1295 pivot: &crate::expressions::Pivot,
1296 col_name: &str,
1297 trim_selects: bool,
1298 all_cte_scopes: &[&Scope],
1299 depth: usize,
1300) -> bool {
1301 if pivot.unpivot {
1302 return false;
1303 }
1304
1305 let mapping = pivot_lineage_column_mapping(pivot, scope, dialect);
1306 let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1307 if pivot_implicit_source_column(pivot, col_name) {
1308 let col_ref = SimpleColumnRef {
1309 table: None,
1310 column: col_name.to_string(),
1311 };
1312 attach_pivot_input_column(
1313 node,
1314 scope,
1315 dialect,
1316 &pivot.this,
1317 &col_ref,
1318 trim_selects,
1319 all_cte_scopes,
1320 depth,
1321 );
1322 return true;
1323 }
1324 return false;
1325 };
1326
1327 for col_ref in input_columns {
1328 attach_pivot_input_column(
1329 node,
1330 scope,
1331 dialect,
1332 &pivot.this,
1333 col_ref,
1334 trim_selects,
1335 all_cte_scopes,
1336 depth,
1337 );
1338 }
1339 true
1340}
1341
1342fn attach_unpivot_dependencies(
1343 node: &mut LineageNode,
1344 scope: &Scope,
1345 dialect: Option<DialectType>,
1346 unpivot: &crate::expressions::Unpivot,
1347 col_name: &str,
1348 trim_selects: bool,
1349 all_cte_scopes: &[&Scope],
1350 depth: usize,
1351) -> bool {
1352 let mapping = unpivot_column_mapping(unpivot, dialect);
1353 let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1354 return false;
1355 };
1356
1357 for col_ref in input_columns {
1358 attach_pivot_input_column(
1359 node,
1360 scope,
1361 dialect,
1362 &unpivot.this,
1363 col_ref,
1364 trim_selects,
1365 all_cte_scopes,
1366 depth,
1367 );
1368 }
1369 true
1370}
1371
1372fn pivot_column_mapping(
1373 pivot: &crate::expressions::Pivot,
1374 dialect: Option<DialectType>,
1375) -> HashMap<String, Vec<SimpleColumnRef>> {
1376 let aggregations = pivot_aggregation_expressions(pivot);
1377 let output_columns = pivot_generated_output_columns(pivot, dialect);
1378 if aggregations.is_empty() || output_columns.is_empty() {
1379 return HashMap::new();
1380 }
1381
1382 let mut mapping = HashMap::new();
1383 for (agg_index, agg) in aggregations.iter().enumerate() {
1384 let input_columns = find_column_refs_in_expr(agg, dialect);
1385 if input_columns.is_empty() {
1386 continue;
1387 }
1388 for col_index in (agg_index..output_columns.len()).step_by(aggregations.len()) {
1389 mapping.insert(
1390 normalize_column_name(&output_columns[col_index], dialect),
1391 input_columns.clone(),
1392 );
1393 }
1394 }
1395 mapping
1396}
1397
1398fn pivot_lineage_column_mapping(
1399 pivot: &crate::expressions::Pivot,
1400 scope: &Scope,
1401 dialect: Option<DialectType>,
1402) -> HashMap<String, Vec<SimpleColumnRef>> {
1403 let mut mapping = pivot_column_mapping(pivot, dialect);
1404 let Some(pre_pivot_columns) = pre_pivot_output_columns(&pivot.this, scope) else {
1405 return mapping;
1406 };
1407
1408 let output_columns = pivot_output_columns(pivot, &pre_pivot_columns, dialect);
1409 if output_columns.is_empty() {
1410 return mapping;
1411 }
1412
1413 let base_mapping = mapping.clone();
1414 for (post_name, pre_name) in output_columns {
1415 let normalized_pre = normalize_column_name(&pre_name, dialect);
1416 let normalized_post = normalize_column_name(&post_name, dialect);
1417
1418 if let Some(input_columns) = base_mapping.get(&normalized_pre) {
1419 mapping.insert(normalized_post, input_columns.clone());
1420 } else {
1421 mapping.insert(
1422 normalized_post,
1423 vec![SimpleColumnRef {
1424 table: None,
1425 column: pre_name,
1426 }],
1427 );
1428 }
1429 }
1430
1431 mapping
1432}
1433
1434fn pre_pivot_output_columns(source: &Expression, scope: &Scope) -> Option<Vec<String>> {
1435 match source {
1436 Expression::Subquery(subquery) => known_output_columns(&subquery.this),
1437 Expression::Table(table) if table.schema.is_none() && table.catalog.is_none() => scope
1438 .cte_sources
1439 .get(&table.name.name)
1440 .and_then(|source| known_output_columns(&source.expression)),
1441 Expression::Paren(paren) => pre_pivot_output_columns(&paren.this, scope),
1442 _ => None,
1443 }
1444}
1445
1446fn known_output_columns(expression: &Expression) -> Option<Vec<String>> {
1447 let expression = match expression {
1448 Expression::Cte(cte) => &cte.this,
1449 Expression::Subquery(subquery) => &subquery.this,
1450 other => other,
1451 };
1452 let columns = crate::ast_transforms::get_output_column_names(expression);
1453 if columns.is_empty() || columns.iter().any(|column| column == "*") {
1454 None
1455 } else {
1456 Some(columns)
1457 }
1458}
1459
1460fn pivot_output_columns(
1461 pivot: &crate::expressions::Pivot,
1462 pre_pivot_columns: &[String],
1463 dialect: Option<DialectType>,
1464) -> Vec<(String, String)> {
1465 let generated_outputs = pivot_generated_output_columns(pivot, dialect);
1466 let excluded = pivot_excluded_source_columns(pivot, dialect);
1467
1468 if excluded.is_empty() || generated_outputs.is_empty() {
1469 return Vec::new();
1470 }
1471
1472 let mut pre_rename: Vec<String> = pre_pivot_columns
1473 .iter()
1474 .filter(|column| !excluded.contains(&normalize_column_name(column, dialect)))
1475 .cloned()
1476 .collect();
1477 pre_rename.extend(generated_outputs);
1478
1479 let post_rename = if pivot.alias_columns.is_empty() {
1480 pre_rename.clone()
1481 } else {
1482 let mut names: Vec<String> = pivot
1483 .alias_columns
1484 .iter()
1485 .map(|column| column.name.clone())
1486 .collect();
1487 names.extend(pre_rename.iter().skip(names.len()).cloned());
1488 names
1489 };
1490
1491 post_rename.into_iter().zip(pre_rename).collect()
1492}
1493
1494fn pivot_excluded_source_columns(
1495 pivot: &crate::expressions::Pivot,
1496 dialect: Option<DialectType>,
1497) -> HashSet<String> {
1498 pivot
1499 .fields
1500 .iter()
1501 .chain(pivot.expressions.iter())
1502 .chain(pivot.using.iter())
1503 .flat_map(|expr| find_column_refs_in_expr(expr, dialect))
1504 .map(|column| normalize_column_name(&column.column, dialect))
1505 .collect()
1506}
1507
1508fn pivot_generated_output_columns(
1509 pivot: &crate::expressions::Pivot,
1510 _dialect: Option<DialectType>,
1511) -> Vec<String> {
1512 let fields = pivot_field_output_names(pivot);
1513 if fields.is_empty() {
1514 return Vec::new();
1515 }
1516
1517 let aggregations = pivot_aggregation_expressions(pivot);
1518 if aggregations.is_empty() {
1519 return Vec::new();
1520 }
1521
1522 let needs_suffix = aggregations.len() > 1;
1523 let mut outputs = Vec::new();
1524 for field in fields {
1525 for aggregation in aggregations {
1526 if let Some(suffix) = pivot_aggregation_output_suffix(aggregation, needs_suffix) {
1527 outputs.push(format!("{field}_{suffix}"));
1528 } else {
1529 outputs.push(field.clone());
1530 }
1531 }
1532 }
1533 outputs
1534}
1535
1536fn pivot_aggregation_expressions(pivot: &crate::expressions::Pivot) -> &[Expression] {
1537 if pivot.using.is_empty() {
1538 &pivot.expressions
1539 } else {
1540 &pivot.using
1541 }
1542}
1543
1544fn pivot_aggregation_output_suffix(expr: &Expression, needs_suffix: bool) -> Option<String> {
1545 match expr {
1546 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1547 _ if needs_suffix => Generator::sql(expr).ok().map(|sql| sql.to_lowercase()),
1548 _ => None,
1549 }
1550}
1551
1552fn pivot_field_output_names(pivot: &crate::expressions::Pivot) -> Vec<String> {
1553 let mut names = Vec::new();
1554 for field in &pivot.fields {
1555 if let Expression::In(in_expr) = field {
1556 for expr in &in_expr.expressions {
1557 if let Some(name) = pivot_expr_output_name(expr) {
1558 names.push(name);
1559 }
1560 }
1561 }
1562 }
1563 names
1564}
1565
1566fn pivot_expr_output_name(expr: &Expression) -> Option<String> {
1567 match expr {
1568 Expression::PivotAlias(alias) => pivot_expr_output_name(&alias.alias),
1569 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1570 Expression::Identifier(identifier) => Some(identifier.name.clone()),
1571 Expression::Column(column) => Some(column.name.name.clone()),
1572 Expression::Literal(literal) => Some(literal.value_str().to_string()),
1573 Expression::Var(var) => Some(var.this.clone()),
1574 Expression::Tuple(tuple) => tuple.expressions.first().and_then(pivot_expr_output_name),
1575 _ => None,
1576 }
1577}
1578
1579fn pivot_implicit_source_column(pivot: &crate::expressions::Pivot, col_name: &str) -> bool {
1580 let pivot_columns: HashSet<String> = pivot
1581 .fields
1582 .iter()
1583 .filter_map(|field| match field {
1584 Expression::In(in_expr) => Some(&in_expr.this),
1585 _ => None,
1586 })
1587 .flat_map(|expr| find_column_refs_in_expr(expr, None))
1588 .map(|col| col.column.to_lowercase())
1589 .collect();
1590 let aggregation_columns: HashSet<String> = pivot
1591 .expressions
1592 .iter()
1593 .flat_map(|expr| find_column_refs_in_expr(expr, None))
1594 .map(|col| col.column.to_lowercase())
1595 .collect();
1596
1597 let normalized = col_name.to_lowercase();
1598 !pivot_columns.contains(&normalized) && !aggregation_columns.contains(&normalized)
1599}
1600
1601fn unpivot_column_mapping(
1602 unpivot: &crate::expressions::Unpivot,
1603 dialect: Option<DialectType>,
1604) -> HashMap<String, Vec<SimpleColumnRef>> {
1605 let value_columns: Vec<String> = std::iter::once(unpivot.value_column.name.clone())
1606 .chain(
1607 unpivot
1608 .extra_value_columns
1609 .iter()
1610 .map(|column| column.name.clone()),
1611 )
1612 .collect();
1613 let mut all_input_columns = Vec::new();
1614 let mut value_input_columns: Vec<Vec<SimpleColumnRef>> = vec![Vec::new(); value_columns.len()];
1615
1616 for entry in &unpivot.columns {
1617 let columns = unpivot_entry_columns(entry);
1618 all_input_columns.extend(columns.clone());
1619 if columns.len() == value_columns.len() {
1620 for (idx, col_ref) in columns.into_iter().enumerate() {
1621 value_input_columns[idx].push(col_ref);
1622 }
1623 } else {
1624 for inputs in &mut value_input_columns {
1625 inputs.extend(columns.clone());
1626 }
1627 }
1628 }
1629
1630 let mut mapping = HashMap::new();
1631 mapping.insert(
1632 normalize_column_name(&unpivot.name_column.name, dialect),
1633 all_input_columns.clone(),
1634 );
1635 for (idx, value_column) in value_columns.iter().enumerate() {
1636 mapping.insert(
1637 normalize_column_name(value_column, dialect),
1638 value_input_columns.get(idx).cloned().unwrap_or_default(),
1639 );
1640 }
1641 mapping
1642}
1643
1644fn unpivot_entry_columns(expr: &Expression) -> Vec<SimpleColumnRef> {
1645 match expr {
1646 Expression::PivotAlias(alias) => unpivot_entry_columns(&alias.this),
1647 Expression::Tuple(tuple) => tuple
1648 .expressions
1649 .iter()
1650 .flat_map(unpivot_entry_columns)
1651 .collect(),
1652 Expression::Column(column) => vec![SimpleColumnRef {
1653 table: column.table.clone(),
1654 column: column.name.name.clone(),
1655 }],
1656 Expression::Identifier(identifier) => vec![SimpleColumnRef {
1657 table: None,
1658 column: identifier.name.clone(),
1659 }],
1660 _ => find_column_refs_in_expr(expr, None),
1661 }
1662}
1663
1664fn attach_pivot_input_column(
1665 node: &mut LineageNode,
1666 scope: &Scope,
1667 dialect: Option<DialectType>,
1668 source_expr: &Expression,
1669 col_ref: &SimpleColumnRef,
1670 trim_selects: bool,
1671 all_cte_scopes: &[&Scope],
1672 depth: usize,
1673) {
1674 match source_expr {
1675 Expression::Table(table) => {
1676 let table_name = col_ref
1677 .table
1678 .as_ref()
1679 .map(|identifier| identifier.name.as_str())
1680 .unwrap_or(table.name.name.as_str());
1681 if scope.cte_sources.contains_key(table_name) {
1682 resolve_qualified_column(
1683 node,
1684 scope,
1685 dialect,
1686 table_name,
1687 &col_ref.column,
1688 &node.name.clone(),
1689 trim_selects,
1690 all_cte_scopes,
1691 depth + 1,
1692 );
1693 } else {
1694 let mut source = ScopeSourceInfo::new(
1695 Expression::Table(Box::new(table.as_ref().clone())),
1696 false,
1697 SourceKind::Table,
1698 );
1699 if let Some(alias) = &table.alias {
1700 source = source.with_alias(alias.name.clone());
1701 }
1702 let source_key = table
1703 .alias
1704 .as_ref()
1705 .map(|alias| alias.name.as_str())
1706 .unwrap_or(table.name.name.as_str());
1707 node.downstream.push(make_table_column_node_from_source(
1708 source_key,
1709 &col_ref.column,
1710 &source,
1711 ));
1712 }
1713 }
1714 Expression::Subquery(subquery) => {
1715 let source_scope = build_scope(&subquery.this);
1716 let child = if let Some(table) = &col_ref.table {
1717 let mut child_node = LineageNode::new(
1718 &col_ref.column,
1719 subquery.this.clone(),
1720 subquery.this.clone(),
1721 );
1722 resolve_qualified_column(
1723 &mut child_node,
1724 &source_scope,
1725 dialect,
1726 &table.name,
1727 &col_ref.column,
1728 &node.name.clone(),
1729 trim_selects,
1730 all_cte_scopes,
1731 depth + 1,
1732 );
1733 Ok(child_node)
1734 } else {
1735 to_node_inner(
1736 ColumnRef::Name(&col_ref.column),
1737 &source_scope,
1738 dialect,
1739 "",
1740 "",
1741 "",
1742 trim_selects,
1743 &all_cte_scopes
1744 .iter()
1745 .map(|scope| (*scope).clone())
1746 .collect::<Vec<_>>(),
1747 depth + 1,
1748 )
1749 };
1750 if let Ok(child) = child {
1751 node.downstream.push(child);
1752 }
1753 }
1754 Expression::Paren(paren) => attach_pivot_input_column(
1755 node,
1756 scope,
1757 dialect,
1758 &paren.this,
1759 col_ref,
1760 trim_selects,
1761 all_cte_scopes,
1762 depth,
1763 ),
1764 _ => {
1765 if let Some(table) = &col_ref.table {
1766 resolve_qualified_column(
1767 node,
1768 scope,
1769 dialect,
1770 &table.name,
1771 &col_ref.column,
1772 &node.name.clone(),
1773 trim_selects,
1774 all_cte_scopes,
1775 depth + 1,
1776 );
1777 } else {
1778 node.downstream
1779 .push(make_table_column_node("_", &col_ref.column));
1780 }
1781 }
1782 }
1783}
1784
1785fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1791 if scope.cte_sources.contains_key(name) {
1793 return None;
1794 }
1795 if let Some(source_info) = scope.sources.get(name) {
1797 if source_info.is_scope {
1798 if let Expression::Cte(cte) = &source_info.expression {
1799 let cte_name = &cte.alias.name;
1800 if scope.cte_sources.contains_key(cte_name) {
1801 return Some(cte_name.clone());
1802 }
1803 }
1804 }
1805 }
1806 None
1807}
1808
1809fn resolve_unqualified_column(
1810 node: &mut LineageNode,
1811 scope: &Scope,
1812 dialect: Option<DialectType>,
1813 col_name: &str,
1814 parent_name: &str,
1815 trim_selects: bool,
1816 all_cte_scopes: &[&Scope],
1817 depth: usize,
1818) {
1819 let from_source_names = source_names_from_from_join(scope);
1823
1824 if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1825 resolve_qualified_column(
1826 node,
1827 scope,
1828 dialect,
1829 &tbl,
1830 col_name,
1831 parent_name,
1832 trim_selects,
1833 all_cte_scopes,
1834 depth,
1835 );
1836 return;
1837 }
1838
1839 if from_source_names.len() == 1 {
1840 let tbl = &from_source_names[0];
1841 resolve_qualified_column(
1842 node,
1843 scope,
1844 dialect,
1845 tbl,
1846 col_name,
1847 parent_name,
1848 trim_selects,
1849 all_cte_scopes,
1850 depth,
1851 );
1852 return;
1853 }
1854
1855 let child = LineageNode::new(
1857 col_name.to_string(),
1858 Expression::Column(Box::new(crate::expressions::Column {
1859 name: crate::expressions::Identifier::new(col_name.to_string()),
1860 table: None,
1861 join_mark: false,
1862 trailing_comments: vec![],
1863 span: None,
1864 inferred_type: None,
1865 })),
1866 node.source.clone(),
1867 );
1868 node.downstream.push(child);
1869}
1870
1871fn unique_virtual_source_for_column(
1872 scope: &Scope,
1873 source_names: &[String],
1874 col_name: &str,
1875) -> Option<String> {
1876 let mut matches = source_names.iter().filter_map(|source_name| {
1877 let source = scope.sources.get(source_name)?;
1878 if source.kind == SourceKind::Virtual
1879 && virtual_source_output_columns(source)
1880 .any(|column| column.eq_ignore_ascii_case(col_name))
1881 {
1882 Some(source_name.clone())
1883 } else {
1884 None
1885 }
1886 });
1887
1888 let first = matches.next()?;
1889 if matches.next().is_none() {
1890 Some(first)
1891 } else {
1892 None
1893 }
1894}
1895
1896fn virtual_source_output_columns(
1897 source_info: &ScopeSourceInfo,
1898) -> Box<dyn Iterator<Item = String> + '_> {
1899 match &source_info.expression {
1900 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1901 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1902 Box::new(alias_output_columns(alias))
1903 }
1904 Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1905 Expression::LateralView(lateral_view) => {
1906 Box::new(lateral_view_output_columns(lateral_view))
1907 }
1908 _ => Box::new(source_info.alias.clone().into_iter()),
1909 }
1910}
1911
1912fn unnest_output_columns(
1913 unnest: &crate::expressions::UnnestFunc,
1914) -> impl Iterator<Item = String> + '_ {
1915 unnest
1916 .alias
1917 .iter()
1918 .map(|alias| alias.name.clone())
1919 .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1920}
1921
1922fn alias_output_columns(
1923 alias: &crate::expressions::Alias,
1924) -> Box<dyn Iterator<Item = String> + '_> {
1925 if alias.column_aliases.is_empty() {
1926 Box::new(std::iter::once(alias.alias.name.clone()))
1927 } else {
1928 Box::new(
1929 alias
1930 .column_aliases
1931 .iter()
1932 .map(|column| column.name.clone()),
1933 )
1934 }
1935}
1936
1937fn lateral_output_columns(
1938 lateral: &crate::expressions::Lateral,
1939) -> Box<dyn Iterator<Item = String> + '_> {
1940 if lateral.column_aliases.is_empty() {
1941 default_virtual_output_columns(&lateral.this)
1942 } else {
1943 Box::new(lateral.column_aliases.iter().cloned())
1944 }
1945}
1946
1947fn lateral_view_output_columns(
1948 lateral_view: &crate::expressions::LateralView,
1949) -> Box<dyn Iterator<Item = String> + '_> {
1950 Box::new(
1951 lateral_view
1952 .column_aliases
1953 .iter()
1954 .map(|column| column.name.clone()),
1955 )
1956}
1957
1958fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1959 match expr {
1960 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1961 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1962 alias_output_columns(alias)
1963 }
1964 Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1965 Box::new(
1966 ["seq", "key", "path", "index", "value", "this"]
1967 .into_iter()
1968 .map(String::from),
1969 )
1970 }
1971 _ => Box::new(std::iter::empty()),
1972 }
1973}
1974
1975fn attach_virtual_source_dependencies(
1976 node: &mut LineageNode,
1977 scope: &Scope,
1978 dialect: Option<DialectType>,
1979 source_alias: &str,
1980 source_expr: &Expression,
1981 trim_selects: bool,
1982 all_cte_scopes: &[&Scope],
1983 depth: usize,
1984) {
1985 let parent_name = node.name.clone();
1986 let mut seen = HashSet::new();
1987 for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1988 let key = (
1989 col_ref.table.as_ref().map(|t| t.name.clone()),
1990 col_ref.column.clone(),
1991 );
1992 if !seen.insert(key) {
1993 continue;
1994 }
1995
1996 if let Some(table_id) = col_ref.table {
1997 let table = table_id.name;
1998 if table == source_alias {
1999 continue;
2000 }
2001 resolve_qualified_column(
2002 node,
2003 scope,
2004 dialect,
2005 &table,
2006 &col_ref.column,
2007 &parent_name,
2008 trim_selects,
2009 all_cte_scopes,
2010 depth + 1,
2011 );
2012 } else {
2013 let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
2014 if non_virtual_sources.len() == 1 {
2015 resolve_qualified_column(
2016 node,
2017 scope,
2018 dialect,
2019 &non_virtual_sources[0],
2020 &col_ref.column,
2021 &parent_name,
2022 trim_selects,
2023 all_cte_scopes,
2024 depth + 1,
2025 );
2026 }
2027 }
2028 }
2029}
2030
2031fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
2032 fn source_name(expr: &Expression) -> Option<String> {
2033 match expr {
2034 Expression::Table(table) => Some(
2035 table
2036 .alias
2037 .as_ref()
2038 .map(|a| a.name.clone())
2039 .unwrap_or_else(|| table.name.name.clone()),
2040 ),
2041 Expression::Subquery(subquery) => {
2042 subquery.alias.as_ref().map(|alias| alias.name.clone())
2043 }
2044 Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
2045 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
2046 Some(alias.alias.name.clone())
2047 }
2048 Expression::Alias(alias) if is_query_like_relation(&alias.this) => {
2049 Some(alias.alias.name.clone())
2050 }
2051 Expression::Lateral(lateral) => lateral.alias.clone(),
2052 Expression::LateralView(lateral_view) => lateral_view
2053 .table_alias
2054 .as_ref()
2055 .or_else(|| lateral_view.column_aliases.first())
2056 .map(|alias| alias.name.clone()),
2057 Expression::Pivot(pivot) => Some(pivot_lineage_source_name(
2058 &pivot.this,
2059 pivot.alias.as_ref().map(|alias| alias.name.as_str()),
2060 )),
2061 Expression::Unpivot(unpivot) => Some(pivot_lineage_source_name(
2062 &unpivot.this,
2063 unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
2064 )),
2065 Expression::Paren(paren) => source_name(&paren.this),
2066 _ => None,
2067 }
2068 }
2069
2070 let effective_expr = match &scope.expression {
2071 Expression::Cte(cte) => &cte.this,
2072 expr => expr,
2073 };
2074
2075 let mut names = Vec::new();
2076 let mut seen = std::collections::HashSet::new();
2077
2078 if let Expression::Select(select) = effective_expr {
2079 if let Some(from) = &select.from {
2080 for expr in &from.expressions {
2081 if let Some(name) = source_name(expr) {
2082 if !name.is_empty() && seen.insert(name.clone()) {
2083 names.push(name);
2084 }
2085 }
2086 }
2087 }
2088 for join in &select.joins {
2089 if is_semi_or_anti_join_kind(join.kind) {
2090 continue;
2091 }
2092 if let Some(name) = source_name(&join.this) {
2093 if !name.is_empty() && seen.insert(name.clone()) {
2094 names.push(name);
2095 }
2096 }
2097 }
2098 for lateral_view in &select.lateral_views {
2099 if let Some(name) =
2100 source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
2101 {
2102 if !name.is_empty() && seen.insert(name.clone()) {
2103 names.push(name);
2104 }
2105 }
2106 }
2107 }
2108
2109 names
2110}
2111
2112fn is_semi_or_anti_join_kind(kind: JoinKind) -> bool {
2113 matches!(
2114 kind,
2115 JoinKind::Semi
2116 | JoinKind::Anti
2117 | JoinKind::LeftSemi
2118 | JoinKind::LeftAnti
2119 | JoinKind::RightSemi
2120 | JoinKind::RightAnti
2121 )
2122}
2123
2124fn is_query_like_relation(expr: &Expression) -> bool {
2125 match expr {
2126 Expression::Select(_)
2127 | Expression::Subquery(_)
2128 | Expression::Union(_)
2129 | Expression::Intersect(_)
2130 | Expression::Except(_) => true,
2131 Expression::Paren(paren) => is_query_like_relation(&paren.this),
2132 _ => false,
2133 }
2134}
2135
2136fn derived_source_query(expr: &Expression) -> Option<&Expression> {
2137 match expr {
2138 Expression::Subquery(subquery) => Some(&subquery.this),
2139 Expression::Alias(alias) if is_query_like_relation(&alias.this) => Some(&alias.this),
2140 Expression::Select(_)
2141 | Expression::Union(_)
2142 | Expression::Intersect(_)
2143 | Expression::Except(_) => Some(expr),
2144 Expression::Paren(paren) => derived_source_query(&paren.this),
2145 _ => None,
2146 }
2147}
2148
2149fn expressions_equivalent_after_wrappers(left: &Expression, right: &Expression) -> bool {
2150 left == right || effective_scope_expression(left) == effective_scope_expression(right)
2151}
2152
2153fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
2154 source_names_from_from_join(scope)
2155 .into_iter()
2156 .filter(|name| {
2157 !matches!(
2158 scope.sources.get(name).map(|source| source.kind),
2159 Some(SourceKind::Virtual)
2160 )
2161 })
2162 .collect()
2163}
2164
2165fn get_alias_or_name(expr: &Expression) -> Option<String> {
2171 match expr {
2172 Expression::Alias(alias) => Some(alias.alias.name.clone()),
2173 Expression::Column(col) => Some(col.name.name.clone()),
2174 Expression::Identifier(id) => Some(id.name.clone()),
2175 Expression::Star(_) => Some("*".to_string()),
2176 Expression::Annotated(a) => get_alias_or_name(&a.this),
2179 _ => None,
2180 }
2181}
2182
2183fn find_prior_select_alias_expr(
2184 scope_expr: &Expression,
2185 target_expr: &Expression,
2186 alias_name: &str,
2187 dialect: Option<DialectType>,
2188) -> Option<Expression> {
2189 let Expression::Select(select) = scope_expr else {
2190 return None;
2191 };
2192
2193 let normalized_alias = normalize_column_name(alias_name, dialect);
2194 for expr in &select.expressions {
2195 if expr == target_expr {
2196 return None;
2197 }
2198
2199 if let Expression::Alias(alias) = expr {
2200 if normalize_column_name(&alias.alias.name, dialect) == normalized_alias {
2201 return Some(alias.this.clone());
2202 }
2203 }
2204 }
2205
2206 None
2207}
2208
2209fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
2211 match column {
2212 ColumnRef::Name(n) => n.to_string(),
2213 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
2214 }
2215}
2216
2217fn find_select_expr(
2219 scope_expr: &Expression,
2220 column: &ColumnRef<'_>,
2221 dialect: Option<DialectType>,
2222) -> Result<Expression> {
2223 if let Expression::Select(ref select) = scope_expr {
2224 match column {
2225 ColumnRef::Name(name) => {
2226 let normalized_name = normalize_column_name(name, dialect);
2227 for expr in &select.expressions {
2228 if let Some(alias_or_name) = get_alias_or_name(expr) {
2229 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2230 return Ok(expr.clone());
2231 }
2232 }
2233 }
2234 if let Some(expr) = synthesize_star_passthrough_expr(select, name) {
2235 return Ok(expr);
2236 }
2237 Err(crate::error::Error::parse(
2238 format!("Cannot find column '{}' in query", name),
2239 0,
2240 0,
2241 0,
2242 0,
2243 ))
2244 }
2245 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
2246 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
2247 }),
2248 }
2249 } else {
2250 Err(crate::error::Error::parse(
2251 "Expected SELECT expression for column lookup",
2252 0,
2253 0,
2254 0,
2255 0,
2256 ))
2257 }
2258}
2259
2260fn synthesize_star_passthrough_expr(select: &Select, name: &str) -> Option<Expression> {
2261 let sources = get_select_sources(select);
2262 if sources.is_empty() {
2263 return None;
2264 }
2265
2266 let mut candidate_aliases = Vec::new();
2267 let mut seen = HashSet::new();
2268
2269 for expr in &select.expressions {
2270 let aliases = match star_passthrough_source_aliases(expr, &sources) {
2271 StarPassthroughSources::None => continue,
2272 StarPassthroughSources::Ambiguous => return None,
2273 StarPassthroughSources::Aliases(aliases) => aliases,
2274 };
2275
2276 for alias in aliases {
2277 if seen.insert(alias.clone()) {
2278 candidate_aliases.push(alias);
2279 }
2280 }
2281 }
2282
2283 match candidate_aliases.as_slice() {
2284 [alias] => {
2285 let table = Identifier::new(alias.clone());
2286 Some(make_column_expr(name, Some(&table)))
2287 }
2288 _ => None,
2289 }
2290}
2291
/// Outcome of asking which sources a select-list star expands.
enum StarPassthroughSources {
    /// The expression is not a star, or the star matches no usable source.
    None,
    /// The star could expand more than one source.
    Ambiguous,
    /// The star expands exactly these source aliases.
    Aliases(Vec<String>),
}
2297
2298fn star_passthrough_source_aliases(
2299 expr: &Expression,
2300 sources: &[SourceInfo],
2301) -> StarPassthroughSources {
2302 match expr {
2303 Expression::Star(star) => star_source_aliases(star.table.as_ref(), sources),
2304 Expression::Column(column) if column.name.name == "*" => {
2305 star_source_aliases(column.table.as_ref(), sources)
2306 }
2307 Expression::Annotated(annotated) => {
2308 star_passthrough_source_aliases(&annotated.this, sources)
2309 }
2310 _ => StarPassthroughSources::None,
2311 }
2312}
2313
2314fn star_source_aliases(
2315 qualifier: Option<&Identifier>,
2316 sources: &[SourceInfo],
2317) -> StarPassthroughSources {
2318 if let Some(qualifier) = qualifier {
2319 let mut aliases = Vec::new();
2320
2321 for source in sources {
2322 if source_matches_star_qualifier(source, qualifier) {
2323 aliases.push(source.alias.clone());
2324 }
2325 }
2326
2327 return match aliases.len() {
2328 0 => StarPassthroughSources::None,
2329 1 => StarPassthroughSources::Aliases(aliases),
2330 _ => StarPassthroughSources::Ambiguous,
2331 };
2332 }
2333
2334 match sources {
2335 [source] if source.quoted => StarPassthroughSources::None,
2339 [source] => StarPassthroughSources::Aliases(vec![source.alias.clone()]),
2340 [] => StarPassthroughSources::None,
2341 _ => StarPassthroughSources::Ambiguous,
2342 }
2343}
2344
2345fn source_matches_star_qualifier(source: &SourceInfo, qualifier: &Identifier) -> bool {
2346 if source.normalized == normalize_cte_name(qualifier) {
2347 return true;
2348 }
2349
2350 if qualifier.quoted {
2351 source.alias == qualifier.name
2352 } else {
2353 source.alias.eq_ignore_ascii_case(&qualifier.name)
2354 }
2355}
2356
2357fn column_to_index(
2359 set_op_expr: &Expression,
2360 name: &str,
2361 dialect: Option<DialectType>,
2362) -> Result<usize> {
2363 let normalized_name = normalize_column_name(name, dialect);
2364 let mut expr = set_op_expr;
2365 loop {
2366 match expr {
2367 Expression::Union(u) => expr = &u.left,
2368 Expression::Intersect(i) => expr = &i.left,
2369 Expression::Except(e) => expr = &e.left,
2370 Expression::Subquery(subquery) => expr = &subquery.this,
2371 Expression::Cte(cte) => expr = &cte.this,
2372 Expression::Paren(paren) => expr = &paren.this,
2373 Expression::Select(select) => {
2374 for (i, e) in select.expressions.iter().enumerate() {
2375 if let Some(alias_or_name) = get_alias_or_name(e) {
2376 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2377 return Ok(i);
2378 }
2379 }
2380 }
2381 return Err(crate::error::Error::parse(
2382 format!("Cannot find column '{}' in set operation", name),
2383 0,
2384 0,
2385 0,
2386 0,
2387 ));
2388 }
2389 _ => {
2390 return Err(crate::error::Error::parse(
2391 "Expected SELECT or set operation",
2392 0,
2393 0,
2394 0,
2395 0,
2396 ))
2397 }
2398 }
2399 }
2400}
2401
2402fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
2403 normalize_name(name, dialect, false, true)
2404}
2405
2406fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
2408 if let Expression::Select(select) = select_expr {
2409 let mut trimmed = select.as_ref().clone();
2410 trimmed.expressions = vec![target_expr.clone()];
2411 Expression::Select(Box::new(trimmed))
2412 } else {
2413 select_expr.clone()
2414 }
2415}
2416
2417fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
2419 if scope.cte_sources.contains_key(source_name) {
2421 for cte_scope in &scope.cte_scopes {
2422 if let Expression::Cte(cte) = &cte_scope.expression {
2423 if cte.alias.name == source_name {
2424 return Some(cte_scope);
2425 }
2426 }
2427 }
2428 }
2429
2430 if let Some(source_info) = scope.sources.get(source_name) {
2432 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
2433 if let Some(query) = derived_source_query(&source_info.expression) {
2434 for dt_scope in &scope.derived_table_scopes {
2435 if expressions_equivalent_after_wrappers(&dt_scope.expression, query) {
2436 return Some(dt_scope);
2437 }
2438 }
2439 }
2440 }
2441 }
2442
2443 None
2444}
2445
2446fn find_child_scope_in<'a>(
2450 all_cte_scopes: &[&'a Scope],
2451 scope: &'a Scope,
2452 source_name: &str,
2453) -> Option<&'a Scope> {
2454 for cte_scope in &scope.cte_scopes {
2456 if let Expression::Cte(cte) = &cte_scope.expression {
2457 if cte.alias.name == source_name {
2458 return Some(cte_scope);
2459 }
2460 }
2461 }
2462
2463 for cte_scope in all_cte_scopes {
2465 if let Expression::Cte(cte) = &cte_scope.expression {
2466 if cte.alias.name == source_name {
2467 return Some(cte_scope);
2468 }
2469 }
2470 }
2471
2472 if let Some(source_info) = scope.sources.get(source_name) {
2474 if source_info.is_scope {
2475 if let Some(query) = derived_source_query(&source_info.expression) {
2476 for dt_scope in &scope.derived_table_scopes {
2477 if expressions_equivalent_after_wrappers(&dt_scope.expression, query) {
2478 return Some(dt_scope);
2479 }
2480 }
2481 }
2482 }
2483 }
2484
2485 None
2486}
2487
2488fn make_table_column_node(table: &str, column: &str) -> LineageNode {
2490 let mut node = LineageNode::new(
2491 format!("{}.{}", table, column),
2492 Expression::Column(Box::new(crate::expressions::Column {
2493 name: crate::expressions::Identifier::new(column.to_string()),
2494 table: Some(crate::expressions::Identifier::new(table.to_string())),
2495 join_mark: false,
2496 trailing_comments: vec![],
2497 span: None,
2498 inferred_type: None,
2499 })),
2500 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
2501 );
2502 node.source_name = table.to_string();
2503 node.source_kind = SourceKind::Table;
2504 node
2505}
2506
2507fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
2508 let mut parts: Vec<String> = Vec::new();
2509 if let Some(catalog) = &table_ref.catalog {
2510 parts.push(catalog.name.clone());
2511 }
2512 if let Some(schema) = &table_ref.schema {
2513 parts.push(schema.name.clone());
2514 }
2515 parts.push(table_ref.name.name.clone());
2516 parts.join(".")
2517}
2518
2519fn apply_source_info_context(
2520 node: &mut LineageNode,
2521 source_key: &str,
2522 source_info: &ScopeSourceInfo,
2523) {
2524 node.source_kind = source_info.kind;
2525 node.source_name =
2526 source_info
2527 .lineage_name
2528 .clone()
2529 .unwrap_or_else(|| match &source_info.expression {
2530 Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
2531 _ => source_key.to_string(),
2532 });
2533 node.source_alias = source_info.alias.clone();
2534}
2535
2536fn make_table_column_node_from_source(
2537 source_key: &str,
2538 column: &str,
2539 source_info: &ScopeSourceInfo,
2540) -> LineageNode {
2541 let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
2542 let mut node = LineageNode::new(
2543 format!("{}.{}", lineage_name, column),
2544 Expression::Column(Box::new(crate::expressions::Column {
2545 name: crate::expressions::Identifier::new(column.to_string()),
2546 table: Some(crate::expressions::Identifier::new(
2547 lineage_name.to_string(),
2548 )),
2549 join_mark: false,
2550 trailing_comments: vec![],
2551 span: None,
2552 inferred_type: None,
2553 })),
2554 source_info.expression.clone(),
2555 );
2556
2557 apply_source_info_context(&mut node, source_key, source_info);
2558
2559 node
2560}
2561
/// Minimal description of a column reference found inside an expression.
///
/// Produced by `collect_column_refs`, which creates one value per
/// `Expression::Column` it visits.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table qualifier of the reference, cloned from the column's `table`
    /// field; `None` when the column was written without a qualifier.
    table: Option<crate::expressions::Identifier>,
    /// Column name as a plain string (the `name` of the column's identifier).
    column: String,
}
2568
2569fn find_column_refs_in_expr(
2571 expr: &Expression,
2572 dialect: Option<DialectType>,
2573) -> Vec<SimpleColumnRef> {
2574 let mut refs = Vec::new();
2575 collect_column_refs(expr, dialect, &mut refs, None);
2576 refs
2577}
2578
2579fn find_column_refs_in_expr_with_select(
2580 expr: &Expression,
2581 select_expr: &Expression,
2582 dialect: Option<DialectType>,
2583) -> Vec<SimpleColumnRef> {
2584 let named_windows = match select_expr {
2585 Expression::Select(select) => select.windows.as_deref(),
2586 _ => None,
2587 };
2588 let mut refs = Vec::new();
2589 collect_column_refs(expr, dialect, &mut refs, named_windows);
2590 refs
2591}
2592
2593fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
2594 match expr {
2595 Expression::Column(col) => {
2596 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
2597 }
2598 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
2599 _ => false,
2600 }
2601}
2602
/// Collects the column references reachable from `expr` into `refs`.
///
/// The traversal is iterative: it uses an explicit stack instead of
/// recursion. Only the expression variants listed in the `match` below are
/// descended into; any other variant reaches the final catch-all arm and is
/// ignored, so columns nested inside it are not reported.
///
/// # Parameters
/// * `expr` - root expression to scan.
/// * `dialect` - only consulted in the `Expression::MethodCall` arm, where a
///   bare `SAFE` receiver is skipped for BigQuery.
/// * `refs` - output vector; one `SimpleColumnRef` is appended for every
///   `Expression::Column` visited (duplicates are kept).
/// * `named_windows` - named window definitions used to resolve a window
///   function's `window_name`; pass `None` when none are available.
///
/// Children are pushed onto a LIFO stack, so `refs` is filled in pop order,
/// which is not necessarily left-to-right source order.
fn collect_column_refs(
    expr: &Expression,
    dialect: Option<DialectType>,
    refs: &mut Vec<SimpleColumnRef>,
    named_windows: Option<&[NamedWindow]>,
) {
    let mut stack: Vec<&Expression> = vec![expr];

    while let Some(current) = stack.pop() {
        match current {
            // Leaf: record the column reference itself.
            Expression::Column(col) => {
                refs.push(SimpleColumnRef {
                    table: col.table.clone(),
                    column: col.name.name.clone(),
                });
            }

            // Nested queries are deliberately not descended into here.
            // NOTE(review): presumably they are handled as their own scope
            // elsewhere — confirm against the callers.
            Expression::Subquery(_) | Expression::Exists(_) => {}

            // Binary operators: visit both operands.
            Expression::And(op)
            | Expression::Or(op)
            | Expression::Eq(op)
            | Expression::Neq(op)
            | Expression::Lt(op)
            | Expression::Lte(op)
            | Expression::Gt(op)
            | Expression::Gte(op)
            | Expression::Add(op)
            | Expression::Sub(op)
            | Expression::Mul(op)
            | Expression::Div(op)
            | Expression::Mod(op)
            | Expression::BitwiseAnd(op)
            | Expression::BitwiseOr(op)
            | Expression::BitwiseXor(op)
            | Expression::BitwiseLeftShift(op)
            | Expression::BitwiseRightShift(op)
            | Expression::Concat(op)
            | Expression::Adjacent(op)
            | Expression::TsMatch(op)
            | Expression::PropertyEQ(op)
            | Expression::ArrayContainsAll(op)
            | Expression::ArrayContainedBy(op)
            | Expression::ArrayOverlaps(op)
            | Expression::JSONBContainsAllTopKeys(op)
            | Expression::JSONBContainsAnyTopKeys(op)
            | Expression::JSONBDeleteAtPath(op)
            | Expression::ExtendsLeft(op)
            | Expression::ExtendsRight(op)
            | Expression::Is(op)
            | Expression::MemberOf(op)
            | Expression::NullSafeEq(op)
            | Expression::NullSafeNeq(op)
            | Expression::Glob(op)
            | Expression::Match(op) => {
                stack.push(&op.left);
                stack.push(&op.right);
            }

            // Unary operators: visit the single operand.
            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
                stack.push(&u.this);
            }

            // Single-argument functions: visit `this`.
            Expression::Upper(f)
            | Expression::Lower(f)
            | Expression::Length(f)
            | Expression::LTrim(f)
            | Expression::RTrim(f)
            | Expression::Reverse(f)
            | Expression::Abs(f)
            | Expression::Sqrt(f)
            | Expression::Cbrt(f)
            | Expression::Ln(f)
            | Expression::Exp(f)
            | Expression::Sign(f)
            | Expression::Date(f)
            | Expression::Time(f)
            | Expression::DateFromUnixDate(f)
            | Expression::UnixDate(f)
            | Expression::UnixSeconds(f)
            | Expression::UnixMillis(f)
            | Expression::UnixMicros(f)
            | Expression::TimeStrToDate(f)
            | Expression::DateToDi(f)
            | Expression::DiToDate(f)
            | Expression::TsOrDiToDi(f)
            | Expression::TsOrDsToDatetime(f)
            | Expression::TsOrDsToTimestamp(f)
            | Expression::YearOfWeek(f)
            | Expression::YearOfWeekIso(f)
            | Expression::Initcap(f)
            | Expression::Ascii(f)
            | Expression::Chr(f)
            | Expression::Soundex(f)
            | Expression::ByteLength(f)
            | Expression::Hex(f)
            | Expression::LowerHex(f)
            | Expression::Unicode(f)
            | Expression::Radians(f)
            | Expression::Degrees(f)
            | Expression::Sin(f)
            | Expression::Cos(f)
            | Expression::Tan(f)
            | Expression::Asin(f)
            | Expression::Acos(f)
            | Expression::Atan(f)
            | Expression::IsNan(f)
            | Expression::IsInf(f)
            | Expression::ArrayLength(f)
            | Expression::ArraySize(f)
            | Expression::Cardinality(f)
            | Expression::ArrayReverse(f)
            | Expression::ArrayDistinct(f)
            | Expression::ArrayFlatten(f)
            | Expression::ArrayCompact(f)
            | Expression::Explode(f)
            | Expression::ExplodeOuter(f)
            | Expression::ToArray(f)
            | Expression::MapFromEntries(f)
            | Expression::MapKeys(f)
            | Expression::MapValues(f)
            | Expression::JsonArrayLength(f)
            | Expression::JsonKeys(f)
            | Expression::JsonType(f)
            | Expression::ParseJson(f)
            | Expression::ToJson(f)
            | Expression::Typeof(f)
            | Expression::BitwiseCount(f)
            | Expression::Year(f)
            | Expression::Month(f)
            | Expression::Day(f)
            | Expression::Hour(f)
            | Expression::Minute(f)
            | Expression::Second(f)
            | Expression::DayOfWeek(f)
            | Expression::DayOfWeekIso(f)
            | Expression::DayOfMonth(f)
            | Expression::DayOfYear(f)
            | Expression::WeekOfYear(f)
            | Expression::Quarter(f)
            | Expression::Epoch(f)
            | Expression::EpochMs(f)
            | Expression::TimeStrToUnix(f)
            | Expression::SHA(f)
            | Expression::SHA1Digest(f)
            | Expression::TimeToUnix(f)
            | Expression::JSONBool(f)
            | Expression::Int64(f)
            | Expression::MD5NumberLower64(f)
            | Expression::MD5NumberUpper64(f)
            | Expression::DateStrToDate(f)
            | Expression::DateToDateStr(f) => {
                stack.push(&f.this);
            }

            // Two-argument functions: visit `this` and `expression`.
            Expression::Power(f)
            | Expression::NullIf(f)
            | Expression::IfNull(f)
            | Expression::Nvl(f)
            | Expression::UnixToTimeStr(f)
            | Expression::Contains(f)
            | Expression::StartsWith(f)
            | Expression::EndsWith(f)
            | Expression::Levenshtein(f)
            | Expression::ModFunc(f)
            | Expression::Atan2(f)
            | Expression::IntDiv(f)
            | Expression::AddMonths(f)
            | Expression::MonthsBetween(f)
            | Expression::NextDay(f)
            | Expression::ArrayContains(f)
            | Expression::ArrayPosition(f)
            | Expression::ArrayAppend(f)
            | Expression::ArrayPrepend(f)
            | Expression::ArrayUnion(f)
            | Expression::ArrayExcept(f)
            | Expression::ArrayRemove(f)
            | Expression::StarMap(f)
            | Expression::MapFromArrays(f)
            | Expression::MapContainsKey(f)
            | Expression::ElementAt(f)
            | Expression::JsonMergePatch(f)
            | Expression::JSONBContains(f)
            | Expression::JSONBExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }

            // Variadic functions: visit every element of `expressions`.
            Expression::Greatest(f)
            | Expression::Least(f)
            | Expression::Coalesce(f)
            | Expression::ArrayConcat(f)
            | Expression::ArrayIntersect(f)
            | Expression::ArrayZip(f)
            | Expression::MapConcat(f)
            | Expression::JsonArray(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }

            // Aggregates sharing one shape: visit the argument plus the
            // optional filter, `having_max` expression and limit.
            Expression::Sum(f)
            | Expression::Avg(f)
            | Expression::Min(f)
            | Expression::Max(f)
            | Expression::ArrayAgg(f)
            | Expression::CountIf(f)
            | Expression::Stddev(f)
            | Expression::StddevPop(f)
            | Expression::StddevSamp(f)
            | Expression::Variance(f)
            | Expression::VarPop(f)
            | Expression::VarSamp(f)
            | Expression::Median(f)
            | Expression::Mode(f)
            | Expression::First(f)
            | Expression::Last(f)
            | Expression::AnyValue(f)
            | Expression::ApproxDistinct(f)
            | Expression::ApproxCountDistinct(f)
            | Expression::LogicalAnd(f)
            | Expression::LogicalOr(f)
            | Expression::Skewness(f)
            | Expression::ArrayConcatAgg(f)
            | Expression::ArrayUniqueAgg(f)
            | Expression::BoolXorAgg(f)
            | Expression::BitwiseAndAgg(f)
            | Expression::BitwiseOrAgg(f)
            | Expression::BitwiseXorAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some((ref expr, _)) = f.having_max {
                    stack.push(expr);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }

            // Generic (untyped) function calls: visit every argument.
            Expression::Function(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
            }
            Expression::AggregateFunction(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
                if let Some(ref filter) = func.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = func.limit {
                    stack.push(limit);
                }
            }

            // Window functions: visit the wrapped function, the inline
            // PARTITION BY / ORDER BY expressions and any KEEP ordering.
            Expression::WindowFunction(wf) => {
                stack.push(&wf.this);
                for e in &wf.over.partition_by {
                    stack.push(e);
                }
                for e in &wf.over.order_by {
                    stack.push(&e.this);
                }
                if let Some(keep) = &wf.keep {
                    for e in &keep.order_by {
                        stack.push(&e.this);
                    }
                }
                // When OVER refers to a window by name, also visit the
                // PARTITION BY / ORDER BY of every supplied named window whose
                // name matches (ASCII case-insensitive).
                if let (Some(window_name), Some(named_windows)) =
                    (&wf.over.window_name, named_windows)
                {
                    for named_window in named_windows {
                        if named_window
                            .name
                            .name
                            .eq_ignore_ascii_case(&window_name.name)
                        {
                            for e in &named_window.spec.partition_by {
                                stack.push(e);
                            }
                            for e in &named_window.spec.order_by {
                                stack.push(&e.this);
                            }
                        }
                    }
                }
            }

            // Wrappers and structural nodes: unwrap and visit the children.
            Expression::Alias(a) => {
                stack.push(&a.this);
            }
            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
                stack.push(&c.this);
                if let Some(ref fmt) = c.format {
                    stack.push(fmt);
                }
                if let Some(ref def) = c.default {
                    stack.push(def);
                }
            }
            Expression::Paren(p) => {
                stack.push(&p.this);
            }
            Expression::Annotated(a) => {
                stack.push(&a.this);
            }
            Expression::Case(case) => {
                if let Some(ref operand) = case.operand {
                    stack.push(operand);
                }
                for (cond, result) in &case.whens {
                    stack.push(cond);
                    stack.push(result);
                }
                if let Some(ref else_expr) = case.else_ {
                    stack.push(else_expr);
                }
            }
            Expression::Collation(c) => {
                stack.push(&c.this);
            }
            Expression::In(i) => {
                stack.push(&i.this);
                for e in &i.expressions {
                    stack.push(e);
                }
                // The query is pushed like any other child; if it is an
                // `Expression::Subquery` the arm above will skip it.
                if let Some(ref q) = i.query {
                    stack.push(q);
                }
                if let Some(ref u) = i.unnest {
                    stack.push(u);
                }
            }
            Expression::Between(b) => {
                stack.push(&b.this);
                stack.push(&b.low);
                stack.push(&b.high);
            }
            Expression::IsNull(n) => {
                stack.push(&n.this);
            }
            Expression::IsTrue(t) | Expression::IsFalse(t) => {
                stack.push(&t.this);
            }
            Expression::IsJson(j) => {
                stack.push(&j.this);
            }
            Expression::Like(l) | Expression::ILike(l) => {
                stack.push(&l.left);
                stack.push(&l.right);
                if let Some(ref esc) = l.escape {
                    stack.push(esc);
                }
            }
            Expression::SimilarTo(s) => {
                stack.push(&s.this);
                stack.push(&s.pattern);
                if let Some(ref esc) = s.escape {
                    stack.push(esc);
                }
            }
            Expression::Ordered(o) => {
                stack.push(&o.this);
            }
            Expression::Array(a) => {
                for e in &a.expressions {
                    stack.push(e);
                }
            }
            Expression::Tuple(t) => {
                for e in &t.expressions {
                    stack.push(e);
                }
            }
            Expression::Struct(s) => {
                for (_, e) in &s.fields {
                    stack.push(e);
                }
            }
            Expression::Subscript(s) => {
                stack.push(&s.this);
                stack.push(&s.index);
            }
            Expression::Dot(d) => {
                stack.push(&d.this);
            }
            Expression::MethodCall(m) => {
                // Skip the receiver only when the dialect is BigQuery AND the
                // receiver is a bare unquoted `SAFE`; the arguments are always
                // visited.
                // NOTE(review): presumably this keeps BigQuery's `SAFE.`
                // function prefix from being reported as a column — confirm.
                if !matches!(dialect, Some(DialectType::BigQuery))
                    || !is_bigquery_safe_namespace_receiver(&m.this)
                {
                    stack.push(&m.this);
                }
                for arg in &m.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySlice(s) => {
                stack.push(&s.this);
                if let Some(ref start) = s.start {
                    stack.push(start);
                }
                if let Some(ref end) = s.end {
                    stack.push(end);
                }
            }
            Expression::Lambda(l) => {
                stack.push(&l.body);
            }
            Expression::NamedArgument(n) => {
                stack.push(&n.value);
            }
            Expression::Lateral(l) => {
                stack.push(&l.this);
                if let Some(ref view) = l.view {
                    stack.push(view);
                }
                if let Some(ref outer) = l.outer {
                    stack.push(outer);
                }
                if let Some(ref ordinality) = l.ordinality {
                    stack.push(ordinality);
                }
            }
            Expression::LateralView(lv) => {
                stack.push(&lv.this);
            }
            Expression::TryCatch(t) => {
                for stmt in &t.try_body {
                    stack.push(stmt);
                }
                if let Some(catch_body) = &t.catch_body {
                    for stmt in catch_body {
                        stack.push(stmt);
                    }
                }
            }
            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
                stack.push(e);
            }

            // Functions with bespoke field layouts: visit each expression
            // field, including the optional ones when present.
            Expression::Substring(f) => {
                stack.push(&f.this);
                stack.push(&f.start);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Trim(f) => {
                stack.push(&f.this);
                if let Some(ref chars) = f.characters {
                    stack.push(chars);
                }
            }
            Expression::Replace(f) => {
                stack.push(&f.this);
                stack.push(&f.old);
                stack.push(&f.new);
            }
            Expression::IfFunc(f) => {
                stack.push(&f.condition);
                stack.push(&f.true_value);
                if let Some(ref fv) = f.false_value {
                    stack.push(fv);
                }
            }
            Expression::Nvl2(f) => {
                stack.push(&f.this);
                stack.push(&f.true_value);
                stack.push(&f.false_value);
            }
            Expression::ConcatWs(f) => {
                stack.push(&f.separator);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Count(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::GroupConcat(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::StringAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }
            Expression::ListAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::SumIf(f) => {
                stack.push(&f.this);
                stack.push(&f.condition);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::DateAdd(f) | Expression::DateSub(f) => {
                stack.push(&f.this);
                stack.push(&f.interval);
            }
            Expression::DateDiff(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }
            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
                stack.push(&f.this);
            }
            Expression::Extract(f) => {
                stack.push(&f.this);
            }
            Expression::Round(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::Floor(f) => {
                stack.push(&f.this);
                if let Some(ref s) = f.scale {
                    stack.push(s);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Ceil(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Log(f) => {
                stack.push(&f.this);
                if let Some(ref b) = f.base {
                    stack.push(b);
                }
            }
            Expression::AtTimeZone(f) => {
                stack.push(&f.this);
                stack.push(&f.zone);
            }
            Expression::Lead(f) | Expression::Lag(f) => {
                stack.push(&f.this);
                if let Some(ref off) = f.offset {
                    stack.push(off);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::FirstValue(f) | Expression::LastValue(f) => {
                stack.push(&f.this);
            }
            Expression::NthValue(f) => {
                stack.push(&f.this);
                stack.push(&f.offset);
            }
            Expression::Position(f) => {
                stack.push(&f.substring);
                stack.push(&f.string);
                if let Some(ref start) = f.start {
                    stack.push(start);
                }
            }
            Expression::Decode(f) => {
                stack.push(&f.this);
                for (search, result) in &f.search_results {
                    stack.push(search);
                    stack.push(result);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::CharFunc(f) => {
                for arg in &f.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySort(f) => {
                stack.push(&f.this);
                if let Some(ref cmp) = f.comparator {
                    stack.push(cmp);
                }
            }
            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
                stack.push(&f.this);
                stack.push(&f.separator);
                if let Some(ref nr) = f.null_replacement {
                    stack.push(nr);
                }
            }
            Expression::ArrayFilter(f) => {
                stack.push(&f.this);
                stack.push(&f.filter);
            }
            Expression::ArrayTransform(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::Sequence(f)
            | Expression::Generate(f)
            | Expression::ExplodingGenerateSeries(f) => {
                stack.push(&f.start);
                stack.push(&f.stop);
                if let Some(ref step) = f.step {
                    stack.push(step);
                }
            }
            Expression::JsonExtract(f)
            | Expression::JsonExtractScalar(f)
            | Expression::JsonQuery(f)
            | Expression::JsonValue(f) => {
                stack.push(&f.this);
                stack.push(&f.path);
            }
            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
                stack.push(&f.this);
                for p in &f.paths {
                    stack.push(p);
                }
            }
            Expression::JsonObject(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
                stack.push(&f.this);
                for (path, val) in &f.path_values {
                    stack.push(path);
                    stack.push(val);
                }
            }
            Expression::Overlay(f) => {
                stack.push(&f.this);
                stack.push(&f.replacement);
                stack.push(&f.from);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Convert(f) => {
                stack.push(&f.this);
                if let Some(ref style) = f.style {
                    stack.push(style);
                }
            }
            Expression::ApproxPercentile(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref acc) = f.accuracy {
                    stack.push(acc);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::Percentile(f)
            | Expression::PercentileCont(f)
            | Expression::PercentileDisc(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::WithinGroup(f) => {
                stack.push(&f.this);
                for e in &f.order_by {
                    stack.push(&e.this);
                }
            }
            Expression::Left(f) | Expression::Right(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
            }
            Expression::Repeat(f) => {
                stack.push(&f.this);
                stack.push(&f.times);
            }
            Expression::Lpad(f) | Expression::Rpad(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
                if let Some(ref fill) = f.fill {
                    stack.push(fill);
                }
            }
            Expression::Split(f) => {
                stack.push(&f.this);
                stack.push(&f.delimiter);
            }
            Expression::RegexpLike(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpReplace(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                stack.push(&f.replacement);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref group) = f.group {
                    stack.push(group);
                }
            }
            Expression::ToDate(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::ToTimestamp(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::DateFormat(f) | Expression::FormatDate(f) => {
                stack.push(&f.this);
                stack.push(&f.format);
            }
            Expression::LastDay(f) => {
                stack.push(&f.this);
            }
            Expression::FromUnixtime(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::UnixTimestamp(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::MakeDate(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
            }
            Expression::MakeTimestamp(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
                stack.push(&f.hour);
                stack.push(&f.minute);
                stack.push(&f.second);
                if let Some(ref tz) = f.timezone {
                    stack.push(tz);
                }
            }
            Expression::TruncFunc(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::ArrayFunc(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Unnest(f) => {
                stack.push(&f.this);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::StructFunc(f) => {
                for (_, e) in &f.fields {
                    stack.push(e);
                }
            }
            Expression::StructExtract(f) => {
                stack.push(&f.this);
            }
            Expression::NamedStruct(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::MapFunc(f) => {
                for k in &f.keys {
                    stack.push(k);
                }
                for v in &f.values {
                    stack.push(v);
                }
            }
            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::JsonArrayAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::JsonObjectAgg(f) => {
                stack.push(&f.key);
                stack.push(&f.value);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::NTile(f) => {
                if let Some(ref n) = f.num_buckets {
                    stack.push(n);
                }
            }
            Expression::Rand(f) => {
                if let Some(ref s) = f.seed {
                    stack.push(s);
                }
                if let Some(ref lo) = f.lower {
                    stack.push(lo);
                }
                if let Some(ref hi) = f.upper {
                    stack.push(hi);
                }
            }
            Expression::Any(q) | Expression::All(q) => {
                stack.push(&q.this);
                stack.push(&q.subquery);
            }
            Expression::Overlaps(o) => {
                if let Some(ref this) = o.this {
                    stack.push(this);
                }
                if let Some(ref expr) = o.expression {
                    stack.push(expr);
                }
                if let Some(ref ls) = o.left_start {
                    stack.push(ls);
                }
                if let Some(ref le) = o.left_end {
                    stack.push(le);
                }
                if let Some(ref rs) = o.right_start {
                    stack.push(rs);
                }
                if let Some(ref re) = o.right_end {
                    stack.push(re);
                }
            }
            Expression::Interval(i) => {
                if let Some(ref this) = i.this {
                    stack.push(this);
                }
            }
            Expression::TimeStrToTime(f) => {
                stack.push(&f.this);
                if let Some(ref zone) = f.zone {
                    stack.push(zone);
                }
            }
            Expression::JSONBExtractScalar(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                if let Some(ref jt) = f.json_type {
                    stack.push(jt);
                }
            }
            Expression::JSONExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                for e in &f.expressions {
                    stack.push(e);
                }
                if let Some(ref option) = f.option {
                    stack.push(option);
                }
                if let Some(ref on_condition) = f.on_condition {
                    stack.push(on_condition);
                }
            }

            // Every other variant is ignored: it contributes no references
            // and its children (if any) are not visited.
            _ => {}
        }
    }
}
3546
3547#[cfg(test)]
3552mod tests {
3553 use super::*;
3554 use crate::dialects::{Dialect, DialectType};
3555 use crate::expressions::DataType;
3556 use crate::optimizer::annotate_types::annotate_types;
3557 use crate::parse_one;
3558 use crate::schema::{MappingSchema, Schema};
3559
3560 fn parse(sql: &str) -> Expression {
3561 let dialect = Dialect::get(DialectType::Generic);
3562 let ast = dialect.parse(sql).unwrap();
3563 ast.into_iter().next().unwrap()
3564 }
3565
3566 fn parse_dialect(sql: &str, dialect_type: DialectType) -> Expression {
3567 let dialect = Dialect::get(dialect_type);
3568 let ast = dialect.parse(sql).unwrap();
3569 ast.into_iter().next().unwrap()
3570 }
3571
3572 fn lineage_names(node: &LineageNode) -> Vec<String> {
3573 node.walk().map(|n| n.name.clone()).collect()
3574 }
3575
3576 fn assert_lineage_contains(node: &LineageNode, expected: &str) {
3577 let names = lineage_names(node);
3578 assert!(
3579 names.iter().any(|name| name == expected),
3580 "expected {expected} in lineage, got {names:?}"
3581 );
3582 }
3583
3584 #[test]
3585 fn test_simple_lineage() {
3586 let expr = parse("SELECT a FROM t");
3587 let node = lineage("a", &expr, None, false).unwrap();
3588
3589 assert_eq!(node.name, "a");
3590 assert!(!node.downstream.is_empty(), "Should have downstream nodes");
3591 let names = node.downstream_names();
3593 assert!(
3594 names.iter().any(|n| n == "t.a"),
3595 "Expected t.a in downstream, got: {:?}",
3596 names
3597 );
3598 }
3599
3600 #[test]
3601 fn test_lineage_walk() {
3602 let root = LineageNode {
3603 name: "col_a".to_string(),
3604 expression: Expression::Null(crate::expressions::Null),
3605 source: Expression::Null(crate::expressions::Null),
3606 downstream: vec![LineageNode::new(
3607 "t.a",
3608 Expression::Null(crate::expressions::Null),
3609 Expression::Null(crate::expressions::Null),
3610 )],
3611 source_name: String::new(),
3612 source_kind: SourceKind::Unknown,
3613 source_alias: None,
3614 reference_node_name: String::new(),
3615 };
3616
3617 let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
3618 assert_eq!(names.len(), 2);
3619 assert_eq!(names[0], "col_a");
3620 assert_eq!(names[1], "t.a");
3621 }
3622
3623 #[test]
3624 fn test_aliased_column() {
3625 let expr = parse("SELECT a + 1 AS b FROM t");
3626 let node = lineage("b", &expr, None, false).unwrap();
3627
3628 assert_eq!(node.name, "b");
3629 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3631 assert!(
3632 all_names.iter().any(|n| n.contains("a")),
3633 "Expected to trace to column a, got: {:?}",
3634 all_names
3635 );
3636 }
3637
3638 #[test]
3639 fn test_qualified_column() {
3640 let expr = parse("SELECT t.a FROM t");
3641 let node = lineage("a", &expr, None, false).unwrap();
3642
3643 assert_eq!(node.name, "a");
3644 let names = node.downstream_names();
3645 assert!(
3646 names.iter().any(|n| n == "t.a"),
3647 "Expected t.a, got: {:?}",
3648 names
3649 );
3650 }
3651
3652 #[test]
3653 fn test_unqualified_column() {
3654 let expr = parse("SELECT a FROM t");
3655 let node = lineage("a", &expr, None, false).unwrap();
3656
3657 let names = node.downstream_names();
3659 assert!(
3660 names.iter().any(|n| n == "t.a"),
3661 "Expected t.a, got: {:?}",
3662 names
3663 );
3664 }
3665
3666 #[test]
3667 fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
3668 let query = "SELECT name FROM users";
3669 let dialect = Dialect::get(DialectType::BigQuery);
3670 let expr = dialect
3671 .parse(query)
3672 .unwrap()
3673 .into_iter()
3674 .next()
3675 .expect("expected one expression");
3676
3677 let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3678 schema
3679 .add_table("users", &[("name".into(), DataType::Text)], None)
3680 .expect("schema setup");
3681
3682 let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
3683 .expect("lineage without schema");
3684 let mut expr_without = node_without_schema.expression.clone();
3685 annotate_types(
3686 &mut expr_without,
3687 Some(&schema),
3688 Some(DialectType::BigQuery),
3689 );
3690 assert_eq!(
3691 expr_without.inferred_type(),
3692 None,
3693 "Expected unresolved root type without schema-aware lineage qualification"
3694 );
3695
3696 let node_with_schema = lineage_with_schema(
3697 "name",
3698 &expr,
3699 Some(&schema),
3700 Some(DialectType::BigQuery),
3701 false,
3702 )
3703 .expect("lineage with schema");
3704 let mut expr_with = node_with_schema.expression.clone();
3705 annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
3706
3707 assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
3708 }
3709
3710 #[test]
3711 fn test_lineage_with_schema_tolerates_partial_schema_for_known_column() {
3712 let expr = parse_dialect("SELECT order_id, amount FROM t", DialectType::DuckDB);
3713 let mut schema = MappingSchema::with_dialect(DialectType::DuckDB);
3714 schema
3715 .add_table(
3716 "t",
3717 &[("amount".into(), DataType::BigInt { length: None })],
3718 None,
3719 )
3720 .expect("schema setup");
3721
3722 let node = lineage_with_schema(
3723 "amount",
3724 &expr,
3725 Some(&schema),
3726 Some(DialectType::DuckDB),
3727 false,
3728 )
3729 .expect("lineage_with_schema should tolerate unrelated unknown columns");
3730
3731 assert_lineage_contains(&node, "t.amount");
3732 }
3733
3734 #[test]
3735 fn test_lineage_with_schema_tolerates_partial_schema_for_unknown_column() {
3736 let expr = parse_dialect("SELECT order_id, amount FROM t", DialectType::DuckDB);
3737 let mut schema = MappingSchema::with_dialect(DialectType::DuckDB);
3738 schema
3739 .add_table(
3740 "t",
3741 &[("amount".into(), DataType::BigInt { length: None })],
3742 None,
3743 )
3744 .expect("schema setup");
3745
3746 let node = lineage_with_schema(
3747 "order_id",
3748 &expr,
3749 Some(&schema),
3750 Some(DialectType::DuckDB),
3751 false,
3752 )
3753 .expect("lineage_with_schema should keep unknown selected columns");
3754
3755 assert_lineage_contains(&node, "t.order_id");
3756 }
3757
3758 #[test]
3759 fn test_lineage_with_schema_tolerates_partial_schema_for_join_conditions() {
3760 let expr = parse_dialect(
3761 "SELECT a.order_id, b.amount FROM t a JOIN u b ON a.id = b.id",
3762 DialectType::DuckDB,
3763 );
3764 let mut schema = MappingSchema::with_dialect(DialectType::DuckDB);
3765 schema
3766 .add_table(
3767 "t",
3768 &[("order_id".into(), DataType::BigInt { length: None })],
3769 None,
3770 )
3771 .expect("schema setup");
3772 schema
3773 .add_table(
3774 "u",
3775 &[("amount".into(), DataType::BigInt { length: None })],
3776 None,
3777 )
3778 .expect("schema setup");
3779
3780 let node = lineage_with_schema(
3781 "amount",
3782 &expr,
3783 Some(&schema),
3784 Some(DialectType::DuckDB),
3785 false,
3786 )
3787 .expect("lineage_with_schema should tolerate unknown join keys");
3788
3789 assert_lineage_contains(&node, "b.amount");
3790 }
3791
/// Lineage with a schema must succeed when the projection holds a correlated scalar subquery.
#[test]
fn test_lineage_with_schema_correlated_scalar_subquery() {
    let dialect_type = DialectType::BigQuery;
    let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
    let dialect = Dialect::get(dialect_type);
    let mut statements = dialect.parse(query).unwrap().into_iter();
    let expr = statements.next().expect("expected one expression");

    let t1_columns = [(String::from("id"), DataType::BigInt { length: None })];
    let t2_columns = [
        (String::from("id"), DataType::BigInt { length: None }),
        (String::from("val"), DataType::BigInt { length: None }),
    ];
    let mut schema = MappingSchema::with_dialect(dialect_type);
    schema
        .add_table("t1", &t1_columns, None)
        .expect("schema setup");
    schema
        .add_table("t2", &t2_columns, None)
        .expect("schema setup");

    let root = lineage_with_schema("id", &expr, Some(&schema), Some(dialect_type), false)
        .expect("lineage_with_schema should handle correlated scalar subqueries");

    assert_eq!(root.name, "id");
}
3833
/// Lineage with a schema must succeed for a `JOIN ... USING(...)` query.
#[test]
fn test_lineage_with_schema_join_using() {
    let dialect_type = DialectType::BigQuery;
    let query = "SELECT a FROM t1 JOIN t2 USING(a)";
    let dialect = Dialect::get(dialect_type);
    let mut statements = dialect.parse(query).unwrap().into_iter();
    let expr = statements.next().expect("expected one expression");

    // Both joined tables expose the same single column `a`.
    let t1_columns = [(String::from("a"), DataType::BigInt { length: None })];
    let t2_columns = [(String::from("a"), DataType::BigInt { length: None })];
    let mut schema = MappingSchema::with_dialect(dialect_type);
    schema
        .add_table("t1", &t1_columns, None)
        .expect("schema setup");
    schema
        .add_table("t2", &t2_columns, None)
        .expect("schema setup");

    let root = lineage_with_schema("a", &expr, Some(&schema), Some(dialect_type), false)
        .expect("lineage_with_schema should handle JOIN USING");

    assert_eq!(root.name, "a");
}
3872
/// Lineage with a schema must succeed when the table is registered under a dotted name.
#[test]
fn test_lineage_with_schema_qualified_table_name() {
    let dialect_type = DialectType::BigQuery;
    let query = "SELECT a FROM raw.t1";
    let dialect = Dialect::get(dialect_type);
    let mut statements = dialect.parse(query).unwrap().into_iter();
    let expr = statements.next().expect("expected one expression");

    let columns = [(String::from("a"), DataType::BigInt { length: None })];
    let mut schema = MappingSchema::with_dialect(dialect_type);
    schema
        .add_table("raw.t1", &columns, None)
        .expect("schema setup");

    let root = lineage_with_schema("a", &expr, Some(&schema), Some(dialect_type), false)
        .expect("lineage_with_schema should handle dotted schema.table names");

    assert_eq!(root.name, "a");
}
3904
/// Without a schema or dialect, `lineage_with_schema` must agree with plain `lineage`.
#[test]
fn test_lineage_with_schema_none_matches_lineage() {
    let parsed = parse("SELECT a FROM t");

    let expected = lineage("a", &parsed, None, false).expect("lineage baseline");
    let actual =
        lineage_with_schema("a", &parsed, None, None, false).expect("lineage_with_schema");

    assert_eq!(actual.name, expected.name);
    assert_eq!(actual.downstream_names(), expected.downstream_names());
}
3915
/// A mixed-case BigQuery column must resolve against a schema that declares it with the same casing.
#[test]
fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
    let dialect_type = DialectType::BigQuery;
    let dialect = Dialect::get(dialect_type);
    let mut statements = dialect
        .parse("SELECT Name AS name FROM teams")
        .unwrap()
        .into_iter();
    let expr = statements.next().expect("expected one expression");

    let team_columns = [(String::from("Name"), DataType::String { length: None })];
    let mut schema = MappingSchema::with_dialect(dialect_type);
    schema
        .add_table("teams", &team_columns, None)
        .expect("schema setup");

    let root = lineage_with_schema("name", &expr, Some(&schema), Some(dialect_type), false)
        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");

    let downstream = root.downstream_names();
    let has_source_column = downstream.iter().any(|entry| entry == "teams.Name");
    assert!(
        has_source_column,
        "Expected teams.Name in downstream, got: {:?}",
        downstream
    );
}
3951
/// Looking up a lower-case name must find a mixed-case alias under the BigQuery dialect.
#[test]
fn test_lineage_bigquery_mixed_case_alias_lookup() {
    let dialect_type = DialectType::BigQuery;
    let dialect = Dialect::get(dialect_type);
    let mut statements = dialect
        .parse("SELECT Name AS Name FROM teams")
        .unwrap()
        .into_iter();
    let expr = statements.next().expect("expected one expression");

    let root = lineage("name", &expr, Some(dialect_type), false)
        .expect("lineage should resolve mixed-case aliases in BigQuery");

    assert_eq!(root.name, "name");
}
3967
/// A column selected from an aliased BigQuery `UNNEST` must resolve to a virtual source.
#[test]
fn test_lineage_bigquery_unnest_alias_source_issue_209() {
    let sql = r#"
SELECT date_val AS week_start
FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
"#;
    let expr = parse_one(sql, DialectType::BigQuery).expect("parse");

    let root = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage should resolve UNNEST alias as a source");
    let child = root
        .downstream
        .first()
        .expect("week_start should have downstream lineage");

    // The virtual source is named `_0` and remembers the user-facing alias.
    assert_eq!(child.name, "_0.date_val");
    assert_eq!(child.source_name, "_0");
    assert_eq!(child.source_kind, SourceKind::Virtual);
    assert_eq!(child.source_alias.as_deref(), Some("date_val"));

    let column = match &child.expression {
        Expression::Column(column) => column,
        unexpected => panic!(
            "expected downstream column expression, got {:?}",
            unexpected
        ),
    };
    assert_eq!(column.name.name, "date_val");
    let qualifier = column.table.as_ref().map(|table| table.name.as_str());
    assert_eq!(qualifier, Some("_0"));

    let source_is_unnest_alias = matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val");
    assert!(
        source_is_unnest_alias,
        "expected UNNEST source expression, got {:?}",
        child.source
    );
}
4008
/// A real table that happens to be called `date_val` must be reported as a table, not a virtual source.
#[test]
fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
    let dialect_type = DialectType::BigQuery;
    let expr = parse_one("SELECT date_val.id FROM date_val", dialect_type).expect("parse");

    let root = lineage("id", &expr, Some(dialect_type), false).expect("lineage");
    let child = root.downstream.first().expect("id should have lineage");

    assert_eq!(child.name, "date_val.id");
    assert_eq!(child.source_name, "date_val");
    assert_eq!(child.source_kind, SourceKind::Table);
    assert_eq!(child.source_alias, None);
}
4022
/// Two `UNNEST` sources in one query must be numbered `_0` and `_1` regardless of which column is traced.
#[test]
fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
    let dialect_type = DialectType::BigQuery;
    let sql = r#"
SELECT a.a AS first_value, b.b AS second_value
FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
"#;
    let expr = parse_one(sql, dialect_type).expect("parse");

    let first = lineage("first_value", &expr, Some(dialect_type), false).expect("lineage");
    let second = lineage("second_value", &expr, Some(dialect_type), false).expect("lineage");

    let first_source = first.downstream.first().expect("first source");
    let second_source = second.downstream.first().expect("second source");

    assert_eq!(first_source.name, "_0.a");
    assert_eq!(first_source.source_name, "_0");
    assert_eq!(first_source.source_alias.as_deref(), Some("a"));
    assert_eq!(first_source.source_kind, SourceKind::Virtual);

    assert_eq!(second_source.name, "_1.b");
    assert_eq!(second_source.source_name, "_1");
    assert_eq!(second_source.source_alias.as_deref(), Some("b"));
    assert_eq!(second_source.source_kind, SourceKind::Virtual);
}
4053
/// An `UNNEST` over a table column must chain from the virtual source down to that table column.
#[test]
fn test_lineage_table_backed_unnest_points_to_real_source_column() {
    let dialect_type = DialectType::BigQuery;
    let sql = r#"
SELECT item.item AS item
FROM t JOIN UNNEST(t.items) AS item ON TRUE
"#;
    let expr = parse_one(sql, dialect_type).expect("parse");

    let root = lineage("item", &expr, Some(dialect_type), false).expect("lineage");

    // First hop: the virtual source created for the UNNEST.
    let virtual_hop = root.downstream.first().expect("virtual item source");
    assert_eq!(virtual_hop.name, "_0.item");
    assert_eq!(virtual_hop.source_kind, SourceKind::Virtual);

    // Second hop: the real column the UNNEST reads from.
    let table_hop = virtual_hop
        .downstream
        .first()
        .expect("UNNEST(t.items) should depend on t.items");
    assert_eq!(table_hop.name, "t.items");
    assert_eq!(table_hop.source_name, "t");
    assert_eq!(table_hop.source_kind, SourceKind::Table);
}
4078
/// An unqualified reference to an `UNNEST` alias must resolve to the virtual source, then to the table column.
#[test]
fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
    let dialect_type = DialectType::BigQuery;
    let sql = r#"
SELECT item AS item
FROM t JOIN UNNEST(t.items) AS item ON TRUE
"#;
    let expr = parse_one(sql, dialect_type).expect("parse");

    let root = lineage("item", &expr, Some(dialect_type), false).expect("lineage");

    // First hop: the virtual source created for the UNNEST.
    let virtual_hop = root.downstream.first().expect("virtual item source");
    assert_eq!(virtual_hop.name, "_0.item");
    assert_eq!(virtual_hop.source_name, "_0");
    assert_eq!(virtual_hop.source_kind, SourceKind::Virtual);
    assert_eq!(virtual_hop.source_alias.as_deref(), Some("item"));

    // Second hop: the real column the UNNEST reads from.
    let table_hop = virtual_hop
        .downstream
        .first()
        .expect("UNNEST(t.items) should depend on t.items");
    assert_eq!(table_hop.name, "t.items");
    assert_eq!(table_hop.source_name, "t");
    assert_eq!(table_hop.source_kind, SourceKind::Table);
}
4105
/// Column aliases of `UNNEST ... AS u(x)` must resolve to a virtual source in each listed dialect.
#[test]
fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
    const PRESTO_STYLE_SQL: &str = "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)";
    let cases = [
        (
            DialectType::PostgreSQL,
            "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
        ),
        (DialectType::Presto, PRESTO_STYLE_SQL),
        (DialectType::Trino, PRESTO_STYLE_SQL),
    ];

    for (dialect, sql) in cases {
        let expr = match parse_one(sql, dialect) {
            Ok(parsed) => parsed,
            Err(e) => panic!("parse {dialect:?}: {e}"),
        };
        let root = match lineage("out", &expr, Some(dialect), false) {
            Ok(found) => found,
            Err(e) => panic!("lineage {dialect:?}: {e}"),
        };

        // First hop: the virtual source created for the UNNEST.
        let Some(virtual_hop) = root.downstream.first() else {
            panic!("expected virtual child for {dialect:?}");
        };
        assert_eq!(
            virtual_hop.name, "_0.x",
            "unexpected virtual child for {dialect:?}"
        );
        assert_eq!(virtual_hop.source_name, "_0");
        assert_eq!(virtual_hop.source_kind, SourceKind::Virtual);
        assert_eq!(virtual_hop.source_alias.as_deref(), Some("u"));

        // Second hop: the table column feeding the UNNEST.
        let Some(table_hop) = virtual_hop.downstream.first() else {
            panic!("expected table dependency for {dialect:?}");
        };
        assert_eq!(table_hop.name, "t.items");
        assert_eq!(table_hop.source_kind, SourceKind::Table);
    }
}
4148
/// Columns produced by `LATERAL VIEW EXPLODE` must resolve to a virtual source in Spark and Hive.
#[test]
fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
    const LATERAL_VIEW_SQL: &str = "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x";
    let cases = [
        (DialectType::Spark, LATERAL_VIEW_SQL),
        (DialectType::Hive, LATERAL_VIEW_SQL),
    ];

    for (dialect, sql) in cases {
        let expr = match parse_one(sql, dialect) {
            Ok(parsed) => parsed,
            Err(e) => panic!("parse {dialect:?}: {e}"),
        };
        let root = match lineage("out", &expr, Some(dialect), false) {
            Ok(found) => found,
            Err(e) => panic!("lineage {dialect:?}: {e}"),
        };

        // First hop: the virtual source created for the lateral view.
        let Some(virtual_hop) = root.downstream.first() else {
            panic!("expected virtual child for {dialect:?}");
        };
        assert_eq!(virtual_hop.name, "_0.x");
        assert_eq!(virtual_hop.source_name, "_0");
        assert_eq!(virtual_hop.source_kind, SourceKind::Virtual);
        assert_eq!(virtual_hop.source_alias.as_deref(), Some("u"));

        // Second hop: the table column being exploded.
        let Some(table_hop) = virtual_hop.downstream.first() else {
            panic!("expected table dependency for {dialect:?}");
        };
        assert_eq!(table_hop.name, "t.items");
        assert_eq!(table_hop.source_kind, SourceKind::Table);
    }
}
4184
/// Snowflake `LATERAL FLATTEN` must appear as a virtual source that depends on its input column.
#[test]
fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
    let dialect_type = DialectType::Snowflake;
    let sql =
        "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f";
    let expr = parse_one(sql, dialect_type).expect("parse");

    let root = lineage("value", &expr, Some(dialect_type), false).expect("lineage");

    // First hop: the virtual source created for the FLATTEN call.
    let virtual_hop = root.downstream.first().expect("virtual flatten source");
    assert_eq!(virtual_hop.name, "_0.value");
    assert_eq!(virtual_hop.source_name, "_0");
    assert_eq!(virtual_hop.source_kind, SourceKind::Virtual);
    assert_eq!(virtual_hop.source_alias.as_deref(), Some("f"));

    // Second hop: the table column passed as FLATTEN input.
    let table_hop = virtual_hop
        .downstream
        .first()
        .expect("FLATTEN input should depend on raw_events.payload");
    assert_eq!(table_hop.name, "raw_events.payload");
    assert_eq!(table_hop.source_kind, SourceKind::Table);
}
4207
/// The bare date part in Snowflake `DATEDIFF(day, ...)` must not show up as a lineage column.
#[test]
fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
    let dialect_type = DialectType::Snowflake;
    let expr = parse_one(
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
        dialect_type,
    )
    .expect("parse");

    let metric_columns = [("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(dialect_type);
    schema
        .add_table("fact.some_daily_metrics", &metric_columns, None)
        .expect("schema setup");

    let root = lineage_with_schema("recency", &expr, Some(&schema), Some(dialect_type), false)
        .expect("lineage_with_schema should not treat date part as a column");

    let downstream = root.downstream_names();
    let has_date_column = downstream
        .iter()
        .any(|entry| entry == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        downstream
    );

    let date_part_leaked = downstream
        .iter()
        .any(|entry| entry.ends_with(".day") || entry == "day");
    assert!(
        !date_part_leaked,
        "Did not expect date part to appear as lineage column, got: {:?}",
        downstream
    );
}
4246
/// Snowflake `DATEDIFF(day, ...)` must parse into a typed `DateDiff` node whose unit is `Day`.
#[test]
fn test_snowflake_datediff_parses_to_typed_ast() {
    let expr = parse_one(
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    // Peel the AST one layer at a time: Select -> first projection (Alias) -> DateDiff.
    let select = match expr {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::DateDiff(f) => {
            assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
        }
        other => panic!("expected DateDiff, got {other:?}"),
    }
}
4268
/// The bare date part in Snowflake `DATEADD(day, ...)` must not show up as a lineage column.
#[test]
fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
    let dialect_type = DialectType::Snowflake;
    let expr = parse_one(
        "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
        dialect_type,
    )
    .expect("parse");

    let metric_columns = [("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(dialect_type);
    schema
        .add_table("fact.some_daily_metrics", &metric_columns, None)
        .expect("schema setup");

    let root = lineage_with_schema("next_day", &expr, Some(&schema), Some(dialect_type), false)
        .expect("lineage_with_schema should not treat DATEADD date part as a column");

    let downstream = root.downstream_names();
    let has_date_column = downstream
        .iter()
        .any(|entry| entry == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        downstream
    );

    let date_part_leaked = downstream
        .iter()
        .any(|entry| entry.ends_with(".day") || entry == "day");
    assert!(
        !date_part_leaked,
        "Did not expect date part to appear as lineage column, got: {:?}",
        downstream
    );
}
4307
/// The bare identifier in Snowflake `DATE_PART(day, ...)` must not show up as a lineage column.
#[test]
fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
    let dialect_type = DialectType::Snowflake;
    let expr = parse_one(
        "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
        dialect_type,
    )
    .expect("parse");

    let metric_columns = [("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(dialect_type);
    schema
        .add_table("fact.some_daily_metrics", &metric_columns, None)
        .expect("schema setup");

    let root = lineage_with_schema("day_part", &expr, Some(&schema), Some(dialect_type), false)
        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");

    let downstream = root.downstream_names();
    let has_date_column = downstream
        .iter()
        .any(|entry| entry == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        downstream
    );

    let date_part_leaked = downstream
        .iter()
        .any(|entry| entry.ends_with(".day") || entry == "day");
    assert!(
        !date_part_leaked,
        "Did not expect date part to appear as lineage column, got: {:?}",
        downstream
    );
}
4346
/// Control case: `DATE_PART` with a quoted part must still trace to the date column.
#[test]
fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
    let dialect_type = DialectType::Snowflake;
    let expr = parse_one(
        "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
        dialect_type,
    )
    .expect("parse");

    let metric_columns = [("date_utc".to_string(), DataType::Date)];
    let mut schema = MappingSchema::with_dialect(dialect_type);
    schema
        .add_table("fact.some_daily_metrics", &metric_columns, None)
        .expect("schema setup");

    let root = lineage_with_schema("day_part", &expr, Some(&schema), Some(dialect_type), false)
        .expect("quoted DATE_PART should continue to work");

    let downstream = root.downstream_names();
    let has_date_column = downstream
        .iter()
        .any(|entry| entry == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        downstream
    );
}
4380
/// Snowflake `DATEADD(day, ...)` must parse as a generic function whose first argument is a `Var`.
#[test]
fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
    let expr = parse_one(
        "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    // Peel the AST one layer at a time: Select -> first projection (Alias) -> Function.
    let select = match expr {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::Function(f) => {
            assert_eq!(f.name.to_uppercase(), "DATEADD");
            assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
        }
        other => panic!("expected generic DATEADD function, got {other:?}"),
    }
}
4403
/// Snowflake `DATE_PART(day, ...)` must parse as a generic function whose first argument is a `Var`.
#[test]
fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
    let expr = parse_one(
        "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    // Peel the AST one layer at a time: Select -> first projection (Alias) -> Function.
    let select = match expr {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::Function(f) => {
            assert_eq!(f.name.to_uppercase(), "DATE_PART");
            assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
        }
        other => panic!("expected generic DATE_PART function, got {other:?}"),
    }
}
4426
/// Snowflake `DATE_PART('day', ...)` with a quoted part must parse as a generic function.
#[test]
fn test_snowflake_date_part_string_literal_stays_generic_function() {
    let expr = parse_one(
        "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    // Peel the AST one layer at a time: Select -> first projection (Alias) -> Function.
    let select = match expr {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };
    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };
    match &alias.this {
        Expression::Function(f) => {
            assert_eq!(f.name.to_uppercase(), "DATE_PART");
        }
        other => panic!("expected generic DATE_PART function, got {other:?}"),
    }
}
4448
/// Each projected column of a two-table join must trace back to its own table.
#[test]
fn test_lineage_join() {
    let parsed = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");

    let downstream_of_a = lineage("a", &parsed, None, false)
        .unwrap()
        .downstream_names();
    let a_from_t = downstream_of_a.iter().any(|entry| entry == "t.a");
    assert!(a_from_t, "Expected t.a, got: {:?}", downstream_of_a);

    let downstream_of_b = lineage("b", &parsed, None, false)
        .unwrap()
        .downstream_names();
    let b_from_s = downstream_of_b.iter().any(|entry| entry == "s.b");
    assert!(b_from_s, "Expected s.b, got: {:?}", downstream_of_b);
}
4469
/// A leaf reached through a table alias keeps the alias in its name but reports the real table as source.
#[test]
fn test_lineage_alias_leaf_has_resolved_source_name() {
    let parsed = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
    let root = lineage("col1", &parsed, None, false).unwrap();

    let downstream = root.downstream_names();
    let has_aliased_edge = downstream.iter().any(|entry| entry == "t1.col1");
    assert!(
        has_aliased_edge,
        "Expected aliased column edge t1.col1, got: {:?}",
        downstream
    );

    let leaf = root
        .downstream
        .iter()
        .find(|candidate| candidate.name == "t1.col1")
        .expect("Expected t1.col1 leaf");
    assert_eq!(leaf.source_name, "table1");

    let Expression::Table(table) = &leaf.source else {
        panic!("Expected leaf source to be a table expression");
    };
    assert_eq!(table.name.name, "table1");
}
4495
/// Lineage must pass through a derived table and reach the base column.
#[test]
fn test_lineage_derived_table() {
    let parsed = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
    let root = lineage("a", &parsed, None, false).unwrap();

    assert_eq!(root.name, "a");

    let visited: Vec<_> = root.walk().map(|visited_node| visited_node.name.clone()).collect();
    let reaches_base = visited.iter().any(|entry| entry == "t.a");
    assert!(
        reaches_base,
        "Expected to trace through derived table to t.a, got: {:?}",
        visited
    );
}
4510
/// Lineage must pass through a CTE and reach the base column.
#[test]
fn test_lineage_cte() {
    let parsed = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
    let root = lineage("a", &parsed, None, false).unwrap();

    assert_eq!(root.name, "a");

    let visited: Vec<_> = root.walk().map(|visited_node| visited_node.name.clone()).collect();
    let reaches_base = visited.iter().any(|entry| entry == "t.a");
    assert!(
        reaches_base,
        "Expected to trace through CTE to t.a, got: {:?}",
        visited
    );
}
4524
/// A `UNION` of two selects must yield one downstream branch per side.
#[test]
fn test_lineage_union() {
    let parsed = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
    let root = lineage("a", &parsed, None, false).unwrap();

    assert_eq!(root.name, "a");

    let branch_count = root.downstream.len();
    assert_eq!(
        branch_count, 2,
        "Expected 2 branches for UNION, got {}",
        branch_count
    );
}
4539
/// A CTE whose body is a `UNION` must produce at least three nodes when walked.
#[test]
fn test_lineage_cte_union() {
    let parsed =
        parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
    let root = lineage("a", &parsed, None, false).unwrap();

    let visited: Vec<_> = root.walk().map(|visited_node| visited_node.name.clone()).collect();
    let deep_enough = visited.len() >= 3;
    assert!(
        deep_enough,
        "Expected at least 3 nodes for CTE with UNION, got: {:?}",
        visited
    );
}
4553
/// Asking for the lineage of `*` must return a node named `*` with at least one downstream node.
#[test]
fn test_lineage_star() {
    let parsed = parse("SELECT * FROM t");
    let root = lineage("*", &parsed, None, false).unwrap();

    assert_eq!(root.name, "*");

    let has_children = !root.downstream.is_empty();
    assert!(has_children, "Star should produce downstream nodes");
}
4566
/// A scalar subquery in the projection must contribute at least one extra node to the walk.
#[test]
fn test_lineage_subquery_in_select() {
    let parsed = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
    let root = lineage("x", &parsed, None, false).unwrap();

    assert_eq!(root.name, "x");

    let visited: Vec<_> = root.walk().map(|visited_node| visited_node.name.clone()).collect();
    let traced_into_subquery = visited.len() >= 2;
    assert!(
        traced_into_subquery,
        "Expected tracing into scalar subquery, got: {:?}",
        visited
    );
}
4581
/// Two columns of the same select must each trace to their own source column.
#[test]
fn test_lineage_multiple_columns() {
    let parsed = parse("SELECT a, b FROM t");

    // Trace column `a` first.
    let node_a = lineage("a", &parsed, None, false).unwrap();
    assert_eq!(node_a.name, "a");

    // Then trace column `b` from the same parsed query.
    let node_b = lineage("b", &parsed, None, false).unwrap();
    assert_eq!(node_b.name, "b");

    let names_a = node_a.downstream_names();
    let names_b = node_b.downstream_names();
    assert!(names_a.iter().any(|n| n == "t.a"));
    assert!(names_b.iter().any(|n| n == "t.b"));
}
4598
/// `get_source_tables` must report the table that the traced column comes from.
#[test]
fn test_get_source_tables() {
    let parsed = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
    let root = lineage("a", &parsed, None, false).unwrap();

    let tables = get_source_tables(&root);
    let includes_t = tables.contains("t");
    assert!(includes_t, "Expected source table 't', got: {:?}", tables);
}
4611
/// Requesting a column that the query does not project must return an error.
#[test]
fn test_lineage_column_not_found() {
    let parsed = parse("SELECT a FROM t");

    let result = lineage("nonexistent", &parsed, None, false);

    assert!(result.is_err());
}
4618
/// A CTE that reads from another CTE must produce at least three nodes when walked.
#[test]
fn test_lineage_nested_cte() {
    let sql = "WITH cte1 AS (SELECT a FROM t), \
               cte2 AS (SELECT a FROM cte1) \
               SELECT a FROM cte2";
    let parsed = parse(sql);
    let root = lineage("a", &parsed, None, false).unwrap();

    let visited: Vec<_> = root.walk().map(|visited_node| visited_node.name.clone()).collect();
    let deep_enough = visited.len() >= 3;
    assert!(
        deep_enough,
        "Expected to trace through nested CTEs, got: {:?}",
        visited
    );
}
4636
/// With `trim_selects` enabled, the root source select keeps only the traced expression.
#[test]
fn test_trim_selects_true() {
    let parsed = parse("SELECT a, b, c FROM t");
    let root = lineage("a", &parsed, None, true).unwrap();

    let Expression::Select(select) = &root.source else {
        panic!("Expected Select source");
    };

    let projection_count = select.expressions.len();
    assert_eq!(
        projection_count, 1,
        "Trimmed source should have 1 expression, got {}",
        projection_count
    );
}
4654
/// With `trim_selects` disabled, the root source select keeps all three projections.
#[test]
fn test_trim_selects_false() {
    let parsed = parse("SELECT a, b, c FROM t");
    let root = lineage("a", &parsed, None, false).unwrap();

    let Expression::Select(select) = &root.source else {
        panic!("Expected Select source");
    };

    let projection_count = select.expressions.len();
    assert_eq!(
        projection_count, 3,
        "Untrimmed source should have 3 expressions"
    );
}
4671
/// An aliased binary expression must produce at least three nodes when walked.
#[test]
fn test_lineage_expression_in_select() {
    let parsed = parse("SELECT a + b AS c FROM t");
    let root = lineage("c", &parsed, None, false).unwrap();

    let visited: Vec<_> = root.walk().map(|visited_node| visited_node.name.clone()).collect();
    let covers_both_operands = visited.len() >= 3;
    assert!(
        covers_both_operands,
        "Expected to trace a + b to both columns, got: {:?}",
        visited
    );
}
4685
/// A `UNION` whose branches name their column differently must still yield two downstream branches.
#[test]
fn test_set_operation_by_index() {
    // The right-hand branch projects `b`, not `a`.
    let parsed = parse("SELECT a FROM t1 UNION SELECT b FROM t2");

    let root = lineage("a", &parsed, None, false).unwrap();

    assert_eq!(root.downstream.len(), 2);
}
4696
4697 fn print_node(node: &LineageNode, indent: usize) {
4700 let pad = " ".repeat(indent);
4701 println!(
4702 "{pad}name={:?} source_name={:?}",
4703 node.name, node.source_name
4704 );
4705 for child in &node.downstream {
4706 print_node(child, indent + 1);
4707 }
4708 }
4709
/// Reproduction for issue 18: `UPPER(name)` under BigQuery must trace to `users.name`.
///
/// Also prints the query and the resulting lineage tree.
#[test]
fn test_issue18_repro() {
    let dialect_type = DialectType::BigQuery;
    let query = "SELECT UPPER(name) as upper_name FROM users";
    println!("Query: {query}\n");

    let dialect = crate::dialects::Dialect::get(dialect_type);
    let statements = dialect.parse(query).unwrap();
    let first_statement = &statements[0];

    let root = lineage("upper_name", first_statement, Some(dialect_type), false).unwrap();
    println!("lineage(\"upper_name\"):");
    print_node(&root, 1);

    let downstream = root.downstream_names();
    let has_source_column = downstream.iter().any(|entry| entry == "users.name");
    assert!(
        has_source_column,
        "Expected users.name in downstream, got: {:?}",
        downstream
    );
}
4731
/// `SAFE.PARSE_JSON(data)` inside chained CTEs must trace to `source_table.data`
/// and must not report `SAFE` as a column of the upstream CTE.
#[test]
fn test_lineage_bigquery_safe_namespace_issue207() {
    let dialect_type = DialectType::BigQuery;
    let query = r#"
WITH import_cte AS (
    SELECT timestamp, data, operation
    FROM `project`.`dataset`.`source_table`
),
transform_cte AS (
    SELECT
        timestamp,
        SAFE.PARSE_JSON(data) AS json_data
    FROM import_cte
)
SELECT json_data FROM transform_cte
"#;
    let expr = parse_one(query, dialect_type).expect("parse");
    let root = lineage("json_data", &expr, Some(dialect_type), false)
        .expect("lineage should resolve SAFE.PARSE_JSON arguments");
    let names: Vec<_> = root.walk().map(|visited_node| visited_node.name.clone()).collect();

    let reaches_data = names.iter().any(|name| name == "source_table.data");
    assert!(
        reaches_data,
        "expected source_table.data in lineage, got {names:?}"
    );

    let safe_reported = names
        .iter()
        .any(|name| name.eq_ignore_ascii_case("import_cte.safe"));
    assert!(
        !safe_reported,
        "did not expect SAFE namespace receiver in lineage, got {names:?}"
    );
}
4763
/// `SAFE.PARSE_JSON(data)` over a plain table must trace to `t.data` and never to `t.safe`.
#[test]
fn test_lineage_bigquery_safe_namespace_method_call_guard() {
    let parsed = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
    let root = lineage("json_data", &parsed, Some(DialectType::BigQuery), false)
        .expect("lineage should resolve SAFE.PARSE_JSON arguments");
    let names: Vec<_> = root.walk().map(|visited_node| visited_node.name.clone()).collect();

    let reaches_data = names.iter().any(|name| name == "t.data");
    assert!(reaches_data, "expected t.data in lineage, got {names:?}");

    let safe_reported = names.iter().any(|name| name.eq_ignore_ascii_case("t.safe"));
    assert!(
        !safe_reported,
        "did not expect SAFE namespace receiver in lineage, got {names:?}"
    );
}
4780
/// Control case: for an ordinary `obj.METHOD(arg)` call, both receiver and argument stay in the lineage.
#[test]
fn test_lineage_method_call_receiver_control() {
    let parsed = parse("SELECT obj.METHOD(arg) AS out FROM t");
    let root = lineage("out", &parsed, None, false).expect("lineage");
    let names: Vec<_> = root.walk().map(|visited_node| visited_node.name.clone()).collect();

    let has_receiver = names.iter().any(|name| name == "t.obj");
    assert!(
        has_receiver,
        "expected ordinary method receiver to remain in lineage, got {names:?}"
    );

    let has_argument = names.iter().any(|name| name == "t.arg");
    assert!(
        has_argument,
        "expected method argument in lineage, got {names:?}"
    );
}
4796
/// `UPPER(name)` must trace to its argument column.
#[test]
fn test_lineage_upper_function() {
    let parsed = parse("SELECT UPPER(name) AS upper_name FROM users");
    let root = lineage("upper_name", &parsed, None, false).unwrap();

    let downstream = root.downstream_names();
    let has_argument_column = downstream.iter().any(|entry| entry == "users.name");
    assert!(
        has_argument_column,
        "Expected users.name in downstream, got: {:?}",
        downstream
    );
}
4809
/// `ROUND(price, 2)` must trace to its column argument.
#[test]
fn test_lineage_round_function() {
    let parsed = parse("SELECT ROUND(price, 2) AS rounded FROM products");
    let root = lineage("rounded", &parsed, None, false).unwrap();

    let downstream = root.downstream_names();
    let has_argument_column = downstream.iter().any(|entry| entry == "products.price");
    assert!(
        has_argument_column,
        "Expected products.price in downstream, got: {:?}",
        downstream
    );
}
4822
/// `COALESCE(a, b)` must trace to both of its column arguments.
#[test]
fn test_lineage_coalesce_function() {
    let parsed = parse("SELECT COALESCE(a, b) AS val FROM t");
    let root = lineage("val", &parsed, None, false).unwrap();

    let downstream = root.downstream_names();

    let has_first_argument = downstream.iter().any(|entry| entry == "t.a");
    assert!(
        has_first_argument,
        "Expected t.a in downstream, got: {:?}",
        downstream
    );

    let has_second_argument = downstream.iter().any(|entry| entry == "t.b");
    assert!(
        has_second_argument,
        "Expected t.b in downstream, got: {:?}",
        downstream
    );
}
4840
/// `COUNT(id)` must trace to its argument column.
#[test]
fn test_lineage_count_function() {
    let parsed = parse("SELECT COUNT(id) AS cnt FROM t");
    let root = lineage("cnt", &parsed, None, false).unwrap();

    let downstream = root.downstream_names();
    let has_argument_column = downstream.iter().any(|entry| entry == "t.id");
    assert!(
        has_argument_column,
        "Expected t.id in downstream, got: {:?}",
        downstream
    );
}
4853
/// `SUM(amount)` must trace to its argument column.
#[test]
fn test_lineage_sum_function() {
    let parsed = parse("SELECT SUM(amount) AS total FROM t");
    let root = lineage("total", &parsed, None, false).unwrap();

    let downstream = root.downstream_names();
    let has_argument_column = downstream.iter().any(|entry| entry == "t.amount");
    assert!(
        has_argument_column,
        "Expected t.amount in downstream, got: {:?}",
        downstream
    );
}
4866
/// A `CASE` expression must trace to its condition column and to columns inside branch functions.
#[test]
fn test_lineage_case_with_nested_functions() {
    let sql = "SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t";
    let parsed = parse(sql);
    let root = lineage("result", &parsed, None, false).unwrap();

    let downstream = root.downstream_names();

    let has_condition_column = downstream.iter().any(|entry| entry == "t.x");
    assert!(
        has_condition_column,
        "Expected t.x in downstream, got: {:?}",
        downstream
    );

    let has_branch_column = downstream.iter().any(|entry| entry == "t.name");
    assert!(
        has_branch_column,
        "Expected t.name in downstream, got: {:?}",
        downstream
    );
}
4885
4886 #[test]
4887 fn test_lineage_substring_function() {
4888 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
4889 let node = lineage("short", &expr, None, false).unwrap();
4890
4891 let names = node.downstream_names();
4892 assert!(
4893 names.iter().any(|n| n == "t.name"),
4894 "Expected t.name in downstream, got: {:?}",
4895 names
4896 );
4897 }
4898
4899 #[test]
4902 fn test_lineage_cte_select_star() {
4903 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
4907 let node = lineage("a", &expr, None, false).unwrap();
4908
4909 assert_eq!(node.name, "a");
4910 assert!(
4913 !node.downstream.is_empty(),
4914 "Expected downstream nodes tracing through CTE, got none"
4915 );
4916 }
4917
4918 #[test]
4919 fn test_lineage_schema_less_cte_star_passthrough_resolves_base_column() {
4920 let expr = parse("WITH c AS (SELECT * FROM t) SELECT c.x FROM c");
4921 let node = lineage("x", &expr, None, false).unwrap();
4922
4923 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4924 assert!(
4925 all_names.iter().any(|name| name == "t.x"),
4926 "Expected schema-less CTE star passthrough to reach t.x, got: {:?}",
4927 all_names
4928 );
4929
4930 let cte_node = node
4931 .walk()
4932 .find(|child| child.source_kind == SourceKind::Cte && child.source_name == "c")
4933 .expect("expected CTE hop with source_name c");
4934 assert_eq!(cte_node.source_kind, SourceKind::Cte);
4935 assert_eq!(cte_node.source_name, "c");
4936 }
4937
4938 #[test]
4939 fn test_lineage_schema_less_cte_star_passthrough_with_aggregation() {
4940 let expr = parse(
4941 "WITH c AS (SELECT * FROM t) \
4942 SELECT SUM(c.x) AS s FROM c GROUP BY 1",
4943 );
4944 let node = lineage("s", &expr, None, false).unwrap();
4945
4946 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4947 assert!(
4948 all_names.iter().any(|name| name == "t.x"),
4949 "Expected aggregate over CTE star passthrough to reach t.x, got: {:?}",
4950 all_names
4951 );
4952 }
4953
4954 #[test]
4955 fn test_lineage_schema_less_cte_star_passthrough_with_join_and_alias() {
4956 let expr = parse(
4957 "WITH a AS (SELECT * FROM t1), b AS (SELECT * FROM t2) \
4958 SELECT SUM(b.x) AS s FROM a LEFT JOIN b ON b.id = a.id GROUP BY a.k",
4959 );
4960 let node = lineage("s", &expr, None, false).unwrap();
4961
4962 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4963 assert!(
4964 all_names.iter().any(|name| name == "t2.x"),
4965 "Expected joined CTE star passthrough to reach t2.x, got: {:?}",
4966 all_names
4967 );
4968 }
4969
4970 #[test]
4971 fn test_lineage_schema_less_chained_cte_star_passthrough() {
4972 let expr = parse(
4973 "WITH c1 AS (SELECT * FROM t), \
4974 c2 AS (SELECT * FROM c1), \
4975 c3 AS (SELECT * FROM c2) \
4976 SELECT c3.x FROM c3",
4977 );
4978 let node = lineage("x", &expr, None, false).unwrap();
4979
4980 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4981 assert!(
4982 all_names.iter().any(|name| name == "t.x"),
4983 "Expected chained CTE star passthrough to reach t.x, got: {:?}",
4984 all_names
4985 );
4986 }
4987
4988 #[test]
4989 fn test_lineage_schema_less_unqualified_star_with_multiple_sources_does_not_guess() {
4990 let expr = parse("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
4991 let result = lineage("x", &expr, None, false);
4992
4993 assert!(
4994 result.is_err(),
4995 "Unqualified star over multiple sources should remain ambiguous, got: {:?}",
4996 result
4997 );
4998 }
4999
5000 #[test]
5001 fn test_lineage_cte_select_star_renamed_column() {
5002 let expr =
5005 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
5006 let node = lineage("customer_id", &expr, None, false).unwrap();
5007
5008 assert_eq!(node.name, "customer_id");
5009 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5011 assert!(
5012 all_names.len() >= 2,
5013 "Expected at least 2 nodes (customer_id → source), got: {:?}",
5014 all_names
5015 );
5016 }
5017
5018 #[test]
5019 fn test_lineage_cte_select_star_multiple_columns() {
5020 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
5022
5023 for col in &["a", "b", "c"] {
5024 let node = lineage(col, &expr, None, false).unwrap();
5025 assert_eq!(node.name, *col);
5026 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5028 assert!(
5029 all_names.len() >= 2,
5030 "Expected at least 2 nodes for column {}, got: {:?}",
5031 col,
5032 all_names
5033 );
5034 }
5035 }
5036
5037 #[test]
5038 fn test_lineage_nested_cte_select_star() {
5039 let expr = parse(
5041 "WITH cte1 AS (SELECT a FROM t), \
5042 cte2 AS (SELECT * FROM cte1) \
5043 SELECT * FROM cte2",
5044 );
5045 let node = lineage("a", &expr, None, false).unwrap();
5046
5047 assert_eq!(node.name, "a");
5048 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5049 assert!(
5050 all_names.len() >= 3,
5051 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
5052 all_names
5053 );
5054 }
5055
5056 #[test]
5057 fn test_lineage_three_level_nested_cte_star() {
5058 let expr = parse(
5060 "WITH cte1 AS (SELECT x FROM t), \
5061 cte2 AS (SELECT * FROM cte1), \
5062 cte3 AS (SELECT * FROM cte2) \
5063 SELECT * FROM cte3",
5064 );
5065 let node = lineage("x", &expr, None, false).unwrap();
5066
5067 assert_eq!(node.name, "x");
5068 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5069 assert!(
5070 all_names.len() >= 4,
5071 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
5072 all_names
5073 );
5074 }
5075
5076 #[test]
5077 fn test_lineage_cte_union_star() {
5078 let expr = parse(
5080 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
5081 SELECT * FROM cte",
5082 );
5083 let node = lineage("a", &expr, None, false).unwrap();
5084
5085 assert_eq!(node.name, "a");
5086 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5087 assert!(
5088 all_names.len() >= 2,
5089 "Expected at least 2 nodes for CTE union star, got: {:?}",
5090 all_names
5091 );
5092 }
5093
5094 #[test]
5095 fn test_lineage_cte_star_unknown_table() {
5096 let expr = parse(
5099 "WITH cte AS (SELECT * FROM unknown_table) \
5100 SELECT * FROM cte",
5101 );
5102 let _result = lineage("x", &expr, None, false);
5105 }
5106
5107 #[test]
5108 fn test_lineage_cte_explicit_columns() {
5109 let expr = parse(
5111 "WITH cte(x, y) AS (SELECT a, b FROM t) \
5112 SELECT * FROM cte",
5113 );
5114 let node = lineage("x", &expr, None, false).unwrap();
5115 assert_eq!(node.name, "x");
5116 }
5117
5118 #[test]
5119 fn test_lineage_cte_qualified_star() {
5120 let expr = parse(
5122 "WITH cte AS (SELECT a, b FROM t) \
5123 SELECT cte.* FROM cte",
5124 );
5125 for col in &["a", "b"] {
5126 let node = lineage(col, &expr, None, false).unwrap();
5127 assert_eq!(node.name, *col);
5128 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5129 assert!(
5130 all_names.len() >= 2,
5131 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
5132 col,
5133 all_names
5134 );
5135 }
5136 }
5137
5138 #[test]
5139 fn test_lineage_subquery_select_star() {
5140 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
5143 let node = lineage("x", &expr, None, false).unwrap();
5144
5145 assert_eq!(node.name, "x");
5146 assert!(
5147 !node.downstream.is_empty(),
5148 "Expected downstream nodes for subquery with SELECT *, got none"
5149 );
5150 }
5151
5152 #[test]
5153 fn test_lineage_cte_star_with_schema_external_table() {
5154 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
5156SELECT * FROM orders"#;
5157 let expr = parse(sql);
5158
5159 let mut schema = MappingSchema::new();
5160 let cols = vec![
5161 ("order_id".to_string(), DataType::Unknown),
5162 ("customer_id".to_string(), DataType::Unknown),
5163 ("amount".to_string(), DataType::Unknown),
5164 ];
5165 schema.add_table("stg_orders", &cols, None).unwrap();
5166
5167 let node =
5168 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5169 .unwrap();
5170 assert_eq!(node.name, "order_id");
5171 }
5172
5173 #[test]
5174 fn test_lineage_cte_star_with_schema_three_part_name() {
5175 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
5177SELECT * FROM orders"#;
5178 let expr = parse(sql);
5179
5180 let mut schema = MappingSchema::new();
5181 let cols = vec![
5182 ("order_id".to_string(), DataType::Unknown),
5183 ("customer_id".to_string(), DataType::Unknown),
5184 ];
5185 schema
5186 .add_table("db.schema.stg_orders", &cols, None)
5187 .unwrap();
5188
5189 let node = lineage_with_schema(
5190 "customer_id",
5191 &expr,
5192 Some(&schema as &dyn Schema),
5193 None,
5194 false,
5195 )
5196 .unwrap();
5197 assert_eq!(node.name, "customer_id");
5198 }
5199
5200 #[test]
5201 fn test_lineage_cte_star_with_schema_nested() {
5202 let sql = r#"WITH
5205 raw AS (SELECT * FROM external_table),
5206 enriched AS (SELECT * FROM raw)
5207 SELECT * FROM enriched"#;
5208 let expr = parse(sql);
5209
5210 let mut schema = MappingSchema::new();
5211 let cols = vec![
5212 ("id".to_string(), DataType::Unknown),
5213 ("name".to_string(), DataType::Unknown),
5214 ];
5215 schema.add_table("external_table", &cols, None).unwrap();
5216
5217 let node =
5218 lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
5219 assert_eq!(node.name, "name");
5220 }
5221
5222 #[test]
5223 fn test_lineage_cte_qualified_star_with_schema() {
5224 let sql = r#"WITH
5227 orders AS (SELECT * FROM stg_orders),
5228 enriched AS (
5229 SELECT orders.*, 'extra' AS extra
5230 FROM orders
5231 )
5232 SELECT * FROM enriched"#;
5233 let expr = parse(sql);
5234
5235 let mut schema = MappingSchema::new();
5236 let cols = vec![
5237 ("order_id".to_string(), DataType::Unknown),
5238 ("total".to_string(), DataType::Unknown),
5239 ];
5240 schema.add_table("stg_orders", &cols, None).unwrap();
5241
5242 let node =
5243 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5244 .unwrap();
5245 assert_eq!(node.name, "order_id");
5246
5247 let extra =
5249 lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
5250 assert_eq!(extra.name, "extra");
5251 }
5252
5253 #[test]
5254 fn test_lineage_cte_star_without_schema_still_works() {
5255 let sql = r#"WITH
5257 cte1 AS (SELECT id, name FROM raw_table),
5258 cte2 AS (SELECT * FROM cte1)
5259 SELECT * FROM cte2"#;
5260 let expr = parse(sql);
5261
5262 let node = lineage("id", &expr, None, false).unwrap();
5264 assert_eq!(node.name, "id");
5265 }
5266
5267 #[test]
5268 fn test_lineage_nested_cte_star_with_join_and_schema() {
5269 let sql = r#"WITH
5272base_orders AS (
5273 SELECT * FROM stg_orders
5274),
5275with_payments AS (
5276 SELECT
5277 base_orders.*,
5278 p.amount
5279 FROM base_orders
5280 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
5281),
5282final_cte AS (
5283 SELECT * FROM with_payments
5284)
5285SELECT * FROM final_cte"#;
5286 let expr = parse(sql);
5287
5288 let mut schema = MappingSchema::new();
5289 let order_cols = vec![
5290 (
5291 "order_id".to_string(),
5292 crate::expressions::DataType::Unknown,
5293 ),
5294 (
5295 "customer_id".to_string(),
5296 crate::expressions::DataType::Unknown,
5297 ),
5298 ("status".to_string(), crate::expressions::DataType::Unknown),
5299 ];
5300 let pay_cols = vec![
5301 (
5302 "payment_id".to_string(),
5303 crate::expressions::DataType::Unknown,
5304 ),
5305 (
5306 "order_id".to_string(),
5307 crate::expressions::DataType::Unknown,
5308 ),
5309 ("amount".to_string(), crate::expressions::DataType::Unknown),
5310 ];
5311 schema.add_table("stg_orders", &order_cols, None).unwrap();
5312 schema.add_table("stg_payments", &pay_cols, None).unwrap();
5313
5314 let node =
5316 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5317 .unwrap();
5318 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5319
5320 let has_table_qualified = all_names
5322 .iter()
5323 .any(|n| n.contains('.') && n.contains("order_id"));
5324 assert!(
5325 has_table_qualified,
5326 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
5327 all_names
5328 );
5329
5330 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
5332 .unwrap();
5333 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5334
5335 let has_table_qualified = all_names
5336 .iter()
5337 .any(|n| n.contains('.') && n.contains("amount"));
5338 assert!(
5339 has_table_qualified,
5340 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
5341 all_names
5342 );
5343 }
5344
5345 #[test]
5346 fn test_lineage_cte_alias_resolution() {
5347 let sql = r#"WITH import_stg_items AS (
5349 SELECT item_id, name, status FROM stg_items
5350)
5351SELECT base.item_id, base.status
5352FROM import_stg_items AS base"#;
5353 let expr = parse(sql);
5354
5355 let node = lineage("item_id", &expr, None, false).unwrap();
5356 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5357 assert!(
5359 all_names.iter().any(|n| n == "stg_items.item_id"),
5360 "Expected leaf 'stg_items.item_id', got: {:?}",
5361 all_names
5362 );
5363 }
5364
5365 #[test]
5366 fn test_lineage_cte_alias_with_schema_and_star() {
5367 let sql = r#"WITH import_stg AS (
5369 SELECT * FROM stg_items
5370)
5371SELECT base.item_id, base.status
5372FROM import_stg AS base"#;
5373 let expr = parse(sql);
5374
5375 let mut schema = MappingSchema::new();
5376 schema
5377 .add_table(
5378 "stg_items",
5379 &[
5380 ("item_id".to_string(), DataType::Unknown),
5381 ("name".to_string(), DataType::Unknown),
5382 ("status".to_string(), DataType::Unknown),
5383 ],
5384 None,
5385 )
5386 .unwrap();
5387
5388 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
5389 .unwrap();
5390 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5391 assert!(
5392 all_names.iter().any(|n| n == "stg_items.item_id"),
5393 "Expected leaf 'stg_items.item_id', got: {:?}",
5394 all_names
5395 );
5396 }
5397
5398 #[test]
5399 fn test_lineage_cte_alias_with_join() {
5400 let sql = r#"WITH
5402 import_users AS (SELECT id, name FROM users),
5403 import_orders AS (SELECT id, user_id, amount FROM orders)
5404SELECT u.name, o.amount
5405FROM import_users AS u
5406LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
5407 let expr = parse(sql);
5408
5409 let node = lineage("name", &expr, None, false).unwrap();
5410 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5411 assert!(
5412 all_names.iter().any(|n| n == "users.name"),
5413 "Expected leaf 'users.name', got: {:?}",
5414 all_names
5415 );
5416
5417 let node = lineage("amount", &expr, None, false).unwrap();
5418 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5419 assert!(
5420 all_names.iter().any(|n| n == "orders.amount"),
5421 "Expected leaf 'orders.amount', got: {:?}",
5422 all_names
5423 );
5424 }
5425
5426 #[test]
5431 fn test_lineage_unquoted_cte_case_insensitive() {
5432 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
5435 let node = lineage("col", &expr, None, false).unwrap();
5436 assert_eq!(node.name, "col");
5437 assert!(
5438 !node.downstream.is_empty(),
5439 "Unquoted CTE should resolve case-insensitively"
5440 );
5441 }
5442
5443 #[test]
5444 fn test_lineage_quoted_cte_case_preserved() {
5445 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
5447 let node = lineage("col", &expr, None, false).unwrap();
5448 assert_eq!(node.name, "col");
5449 assert!(
5450 !node.downstream.is_empty(),
5451 "Quoted CTE with matching case should resolve"
5452 );
5453 }
5454
5455 #[test]
5456 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
5457 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
5461 let result = lineage("col", &expr, None, false);
5464 assert!(
5465 result.is_err(),
5466 "Quoted CTE with case mismatch should not expand star: {:?}",
5467 result
5468 );
5469 }
5470
5471 #[test]
5472 fn test_lineage_mixed_quoted_unquoted_cte() {
5473 let expr = parse(
5475 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
5476 );
5477 let node = lineage("a", &expr, None, false).unwrap();
5478 assert_eq!(node.name, "a");
5479 assert!(
5480 !node.downstream.is_empty(),
5481 "Mixed quoted/unquoted CTE chain should resolve"
5482 );
5483 }
5484
5485 #[test]
5501 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
5502 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
5513 let node = lineage("col", &expr, None, false).unwrap();
5514 assert!(!node.downstream.is_empty());
5515 let child = &node.downstream[0];
5516 assert_eq!(
5518 child.source_name, "MyCte",
5519 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5520 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5521 );
5522 }
5523
5524 #[test]
5525 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
5526 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
5533 let node = lineage("col", &expr, None, false).unwrap();
5534 assert!(!node.downstream.is_empty());
5535 let child = &node.downstream[0];
5536 assert_eq!(
5538 child.source_name, "MyCte",
5539 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5540 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5541 );
5542 }
5543
5544 #[test]
5545 fn test_lineage_recursive_cte_terminates_at_base_case() {
5546 let expr = parse_dialect(
5547 "WITH RECURSIVE nums AS (\
5548 SELECT 1 AS n \
5549 UNION ALL \
5550 SELECT n + 1 FROM nums WHERE n < 5\
5551 ) SELECT n FROM nums",
5552 DialectType::DuckDB,
5553 );
5554 let node = lineage("n", &expr, Some(DialectType::DuckDB), false).unwrap();
5555 let names = lineage_names(&node);
5556
5557 assert!(
5558 names.len() <= 12,
5559 "recursive CTE lineage should not unroll repeatedly, got {names:?}"
5560 );
5561 assert!(
5562 node.walk()
5563 .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "nums"),
5564 "expected recursive source to be marked as a CTE, got {names:?}"
5565 );
5566 }
5567
5568 #[test]
5569 fn test_lineage_window_partition_and_order_columns() {
5570 let expr = parse(
5571 "WITH c AS (SELECT user_id, ts FROM events) \
5572 SELECT ROW_NUMBER() OVER (PARTITION BY c.user_id ORDER BY c.ts) AS out FROM c",
5573 );
5574 let node = lineage("out", &expr, None, false).unwrap();
5575
5576 assert_lineage_contains(&node, "events.user_id");
5577 assert_lineage_contains(&node, "events.ts");
5578 }
5579
5580 #[test]
5581 fn test_lineage_window_aggregate_order_column() {
5582 let expr = parse(
5583 "WITH c AS (SELECT amount, d FROM txns) \
5584 SELECT SUM(c.amount) OVER (ORDER BY c.d) AS running FROM c",
5585 );
5586 let node = lineage("running", &expr, None, false).unwrap();
5587
5588 assert_lineage_contains(&node, "txns.amount");
5589 assert_lineage_contains(&node, "txns.d");
5590 }
5591
5592 #[test]
5593 fn test_lineage_named_window_columns() {
5594 let expr = parse(
5595 "SELECT ROW_NUMBER() OVER w AS out \
5596 FROM events \
5597 WINDOW w AS (PARTITION BY user_id ORDER BY ts)",
5598 );
5599 let node = lineage("out", &expr, None, false).unwrap();
5600
5601 assert_lineage_contains(&node, "events.user_id");
5602 assert_lineage_contains(&node, "events.ts");
5603 }
5604
5605 #[test]
5606 fn test_lineage_within_group_order_column() {
5607 let expr =
5608 parse("SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS p FROM txns");
5609 let node = lineage("p", &expr, None, false).unwrap();
5610
5611 assert_lineage_contains(&node, "txns.amount");
5612 }
5613
5614 #[test]
5615 fn test_lineage_query_wrappers_resolve_inner_select() {
5616 for sql in [
5617 "CREATE TABLE tgt AS SELECT x FROM src",
5618 "CREATE VIEW v AS SELECT x FROM src",
5619 "INSERT INTO tgt SELECT x FROM src",
5620 ] {
5621 let expr = parse(sql);
5622 let node = lineage("x", &expr, None, false).unwrap();
5623 assert_lineage_contains(&node, "src.x");
5624 }
5625 }
5626
5627 #[test]
5628 fn test_lineage_scalar_subquery_through_cte_reaches_base_table() {
5629 let expr = parse(
5630 "WITH c AS (SELECT x FROM t) \
5631 SELECT (SELECT SUM(x) FROM c) AS s FROM c LIMIT 1",
5632 );
5633 let node = lineage("s", &expr, None, false).unwrap();
5634
5635 assert_lineage_contains(&node, "t.x");
5636 assert!(
5637 node.walk()
5638 .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "c"),
5639 "expected scalar subquery CTE hop in lineage, got {:?}",
5640 lineage_names(&node)
5641 );
5642 }
5643
5644 #[test]
5645 fn test_lineage_scalar_subqueries_inside_expression_wrappers() {
5646 for sql in [
5647 "WITH c AS (SELECT a, b FROM t) \
5648 SELECT CASE WHEN c.a > 0 THEN c.b ELSE (SELECT MAX(z) FROM o) END AS r FROM c",
5649 "WITH c AS (SELECT a FROM t) \
5650 SELECT COALESCE(c.a, (SELECT MAX(z) FROM o)) AS r FROM c",
5651 "WITH c AS (SELECT a FROM t) \
5652 SELECT CAST((SELECT MAX(z) FROM o) AS INT) + c.a AS r FROM c",
5653 "WITH c AS (SELECT a FROM t) \
5654 SELECT CASE WHEN c.a BETWEEN 0 AND (SELECT MAX(z) FROM o) THEN c.a END AS r FROM c",
5655 ] {
5656 let expr = parse_dialect(sql, DialectType::DuckDB);
5657 let node = lineage("r", &expr, Some(DialectType::DuckDB), false)
5658 .unwrap_or_else(|error| panic!("lineage failed for {sql}: {error}"));
5659
5660 assert_lineage_contains(&node, "o.z");
5661 assert_lineage_contains(&node, "t.a");
5662 }
5663 }
5664
5665 #[test]
5666 fn test_lineage_nested_set_operation_inside_derived_table() {
5667 let expr = parse_dialect(
5668 "SELECT v FROM ((SELECT v FROM t1 UNION ALL SELECT v FROM t2) \
5669 UNION ALL SELECT v FROM t3) u",
5670 DialectType::DuckDB,
5671 );
5672 let node = lineage("v", &expr, Some(DialectType::DuckDB), false).unwrap();
5673
5674 assert_lineage_contains(&node, "t1.v");
5675 assert_lineage_contains(&node, "t2.v");
5676 assert_lineage_contains(&node, "t3.v");
5677 }
5678
5679 #[test]
5680 fn test_lineage_select_alias_reference_resolves_to_alias_source() {
5681 let expr = parse_dialect(
5682 "WITH c AS (SELECT x FROM t) SELECT c.x AS a, a + 1 AS b FROM c",
5683 DialectType::DuckDB,
5684 );
5685 let node = lineage("b", &expr, Some(DialectType::DuckDB), false).unwrap();
5686
5687 assert_lineage_contains(&node, "t.x");
5688 }
5689
5690 #[test]
5691 fn test_lineage_pivot_output_resolves_aggregation_input() {
5692 let expr = parse_dialect(
5693 "SELECT * FROM (SELECT region, q, amt FROM sales) \
5694 PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5695 DialectType::DuckDB,
5696 );
5697 let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5698
5699 assert_lineage_contains(&node, "sales.amt");
5700 }
5701
5702 #[test]
5703 fn test_lineage_pivot_multi_aggregate_and_alias_columns() {
5704 let multi = parse_dialect(
5705 "SELECT * FROM (SELECT category, value, price FROM t) \
5706 PIVOT(SUM(value) AS value_sum, MAX(price) FOR category IN ('a' AS cat_a, 'b'))",
5707 DialectType::DuckDB,
5708 );
5709 let value_sum =
5710 lineage("cat_a_value_sum", &multi, Some(DialectType::DuckDB), false).unwrap();
5711 assert_lineage_contains(&value_sum, "t.value");
5712
5713 let max_price =
5714 lineage("cat_a_max(price)", &multi, Some(DialectType::DuckDB), false).unwrap();
5715 assert_lineage_contains(&max_price, "t.price");
5716
5717 let aliased = parse_dialect(
5718 "SELECT * FROM (SELECT region, q, amt FROM sales) \
5719 PIVOT(SUM(amt) FOR q IN ('Q1')) AS p(region2, p1)",
5720 DialectType::DuckDB,
5721 );
5722 let region = lineage("region2", &aliased, Some(DialectType::DuckDB), false).unwrap();
5723 assert_lineage_contains(®ion, "sales.region");
5724
5725 let pivot_value = lineage("p1", &aliased, Some(DialectType::DuckDB), false).unwrap();
5726 assert_lineage_contains(&pivot_value, "sales.amt");
5727 }
5728
5729 #[test]
5730 fn test_lineage_pivot_through_cte_resolves_aggregation_input() {
5731 let expr = parse_dialect(
5732 "WITH src AS (SELECT region, q, amt FROM sales) \
5733 SELECT q1 FROM src PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5734 DialectType::DuckDB,
5735 );
5736 let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5737
5738 assert_lineage_contains(&node, "sales.amt");
5739 }
5740
5741 #[test]
5742 fn test_lineage_unpivot_value_resolves_input_columns() {
5743 let expr = parse_dialect(
5744 "SELECT name, val FROM t UNPIVOT(val FOR col IN (a, b, c))",
5745 DialectType::DuckDB,
5746 );
5747 let node = lineage("val", &expr, Some(DialectType::DuckDB), false).unwrap();
5748
5749 assert_lineage_contains(&node, "t.a");
5750 assert_lineage_contains(&node, "t.b");
5751 assert_lineage_contains(&node, "t.c");
5752 }
5753
5754 #[test]
5755 fn test_lineage_unpivot_multi_value_columns_resolve_positionally() {
5756 let expr = parse_dialect(
5757 "SELECT first_half_sales, second_half_sales, semester \
5758 FROM produce \
5759 UNPIVOT((first_half_sales, second_half_sales) \
5760 FOR semester IN ((q1, q2) AS 'semester_1', (q3, q4) AS 'semester_2'))",
5761 DialectType::BigQuery,
5762 );
5763
5764 let first = lineage(
5765 "first_half_sales",
5766 &expr,
5767 Some(DialectType::BigQuery),
5768 false,
5769 )
5770 .unwrap();
5771 assert_lineage_contains(&first, "produce.q1");
5772 assert_lineage_contains(&first, "produce.q3");
5773
5774 let second = lineage(
5775 "second_half_sales",
5776 &expr,
5777 Some(DialectType::BigQuery),
5778 false,
5779 )
5780 .unwrap();
5781 assert_lineage_contains(&second, "produce.q2");
5782 assert_lineage_contains(&second, "produce.q4");
5783 }
5784
5785 #[test]
5786 fn test_lineage_top_level_union_over_ctes_reaches_base_tables() {
5787 let expr = parse(
5788 "WITH a AS (SELECT x FROM t1), b AS (SELECT x FROM t2) \
5789 SELECT x FROM a UNION SELECT x FROM b",
5790 );
5791 let node = lineage("x", &expr, None, false).unwrap();
5792
5793 assert_lineage_contains(&node, "t1.x");
5794 assert_lineage_contains(&node, "t2.x");
5795 }
5796
5797 #[test]
5798 fn test_lineage_star_excludes_semi_join_rhs_source() {
5799 let expr = parse_dialect(
5800 "SELECT * FROM orders LEFT SEMI JOIN customers ON orders.customer_id = customers.id",
5801 DialectType::DuckDB,
5802 );
5803 let node = lineage("customer_id", &expr, Some(DialectType::DuckDB), false).unwrap();
5804
5805 assert_lineage_contains(&node, "orders.customer_id");
5806 }
5807
5808 #[test]
5815 #[ignore = "requires derived table star expansion (separate issue)"]
5816 fn test_node_name_doesnt_contain_comment() {
5817 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
5818 let node = lineage("x", &expr, None, false).unwrap();
5819
5820 assert_eq!(node.name, "x");
5821 assert!(!node.downstream.is_empty());
5822 }
5823
5824 #[test]
5828 fn test_comment_before_first_column_in_cte() {
5829 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
5830 let sql_without_comment = "with t as (select 1 as a) select a from t";
5831
5832 let expr_ok = parse(sql_without_comment);
5834 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
5835
5836 let expr_comment = parse(sql_with_comment);
5838 let node_comment = lineage("a", &expr_comment, None, false)
5839 .expect("with comment before first column should succeed");
5840
5841 assert_eq!(node_ok.name, node_comment.name, "node names should match");
5842 assert_eq!(
5843 node_ok.downstream_names(),
5844 node_comment.downstream_names(),
5845 "downstream lineage should be identical with or without comment"
5846 );
5847 }
5848
5849 #[test]
5851 fn test_block_comment_before_first_column() {
5852 let sql = "with t as (select 1 as a) select /* section */ a from t";
5853 let expr = parse(sql);
5854 let node = lineage("a", &expr, None, false)
5855 .expect("block comment before first column should succeed");
5856 assert_eq!(node.name, "a");
5857 assert!(
5858 !node.downstream.is_empty(),
5859 "should have downstream lineage"
5860 );
5861 }
5862
5863 #[test]
5865 fn test_comment_before_first_column_second_col_ok() {
5866 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
5867 let expr = parse(sql);
5868
5869 let node_a =
5870 lineage("a", &expr, None, false).expect("column a with comment should succeed");
5871 assert_eq!(node_a.name, "a");
5872
5873 let node_b =
5874 lineage("b", &expr, None, false).expect("column b with comment should succeed");
5875 assert_eq!(node_b.name, "b");
5876 }
5877
5878 #[test]
5880 fn test_comment_before_aliased_column() {
5881 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
5882 let expr = parse(sql);
5883 let node =
5884 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
5885 assert_eq!(node.name, "y");
5886 assert!(
5887 !node.downstream.is_empty(),
5888 "aliased column should have downstream lineage"
5889 );
5890 }
5891}