1use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// A node in a column lineage tree.
///
/// A node pairs a column (or star) name with the expression that produces it
/// and the query/source expression it was resolved in. The nodes it depends
/// on are stored in `downstream`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Name of the column this node describes. Star children created in this
    /// file are named `"<source>.*"`.
    pub name: String,
    /// Expression associated with the column (in this file: the matching
    /// select-list expression, or the whole set operation).
    pub expression: Expression,
    /// Query or source expression the column was resolved in.
    pub source: Expression,
    /// Child nodes; [`LineageNode::walk`] visits them depth-first.
    pub downstream: Vec<LineageNode>,
    /// Name of the source this node was reached through; empty until scope
    /// context is applied, and empty for the root.
    pub source_name: String,
    /// Classification of the source; `SourceKind::Unknown` until set.
    pub source_kind: SourceKind,
    /// Optional source alias. Omitted from serialized output when `None`, and
    /// defaulted when absent on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_alias: Option<String>,
    /// Name of the node that referenced this one; empty until scope context
    /// is applied.
    pub reference_node_name: String,
}
40
41impl LineageNode {
42 pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44 Self {
45 name: name.into(),
46 expression,
47 source,
48 downstream: Vec::new(),
49 source_name: String::new(),
50 source_kind: SourceKind::Unknown,
51 source_alias: None,
52 reference_node_name: String::new(),
53 }
54 }
55
56 pub fn walk(&self) -> LineageWalker<'_> {
58 LineageWalker { stack: vec![self] }
59 }
60
61 pub fn downstream_names(&self) -> Vec<String> {
63 self.downstream.iter().map(|n| n.name.clone()).collect()
64 }
65}
66
67fn source_kind_for_scope_context(
68 scope: &Scope,
69 source_name: &str,
70 reference_node_name: &str,
71) -> SourceKind {
72 if source_name.is_empty() && reference_node_name.is_empty() {
73 return SourceKind::Root;
74 }
75 if let Some(source_info) = scope.sources.get(source_name) {
76 return source_info.kind;
77 }
78 if scope.cte_sources.contains_key(source_name) {
79 return SourceKind::Cte;
80 }
81 match scope.scope_type {
82 ScopeType::Cte => SourceKind::Cte,
83 ScopeType::DerivedTable => SourceKind::DerivedTable,
84 ScopeType::Udtf => SourceKind::Virtual,
85 _ => SourceKind::Unknown,
86 }
87}
88
89fn apply_scope_context(
90 node: &mut LineageNode,
91 scope: &Scope,
92 source_name: &str,
93 reference_node_name: &str,
94) {
95 node.source_name = source_name.to_string();
96 node.reference_node_name = reference_node_name.to_string();
97 node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
/// Depth-first iterator over a [`LineageNode`] tree, created by
/// [`LineageNode::walk`].
pub struct LineageWalker<'a> {
    /// Nodes still to be yielded; the next node to visit is at the end.
    stack: Vec<&'a LineageNode>,
}
104
105impl<'a> Iterator for LineageWalker<'a> {
106 type Item = &'a LineageNode;
107
108 fn next(&mut self) -> Option<Self::Item> {
109 if let Some(node) = self.stack.pop() {
110 for child in node.downstream.iter().rev() {
112 self.stack.push(child);
113 }
114 Some(node)
115 } else {
116 None
117 }
118 }
119}
120
/// Identifies the output column whose lineage is being traced.
enum ColumnRef<'a> {
    /// Look the column up by its output name (alias or column name).
    Name(&'a str),
    /// Look the column up by its zero-based position in the select list.
    Index(usize),
}
130
131pub fn lineage(
157 column: &str,
158 sql: &Expression,
159 dialect: Option<DialectType>,
160 trim_selects: bool,
161) -> Result<LineageNode> {
162 let effective_sql = lineage_effective_expression(sql);
164 let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165 if !has_with {
166 return lineage_from_expression(column, sql, dialect, trim_selects);
167 }
168 let mut owned = sql.clone();
169 expand_cte_stars(&mut owned, None);
170 lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173pub fn lineage_with_schema(
189 column: &str,
190 sql: &Expression,
191 schema: Option<&dyn Schema>,
192 dialect: Option<DialectType>,
193 trim_selects: bool,
194) -> Result<LineageNode> {
195 let mut qualified_expression = if let Some(schema) = schema {
196 let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197 QualifyColumnsOptions::new().with_dialect(dialect_type)
198 } else {
199 QualifyColumnsOptions::new()
200 };
201
202 qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203 Error::internal(format!("Lineage qualification failed with schema: {}", e))
204 })?
205 } else {
206 sql.clone()
207 };
208
209 annotate_types(&mut qualified_expression, schema, dialect);
211
212 expand_cte_stars(&mut qualified_expression, schema);
215
216 lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220 column: &str,
221 sql: &Expression,
222 dialect: Option<DialectType>,
223 trim_selects: bool,
224) -> Result<LineageNode> {
225 let sql = lineage_effective_expression(sql);
226 let scope = build_scope(sql);
227 to_node(
228 ColumnRef::Name(column),
229 &scope,
230 dialect,
231 "",
232 "",
233 "",
234 trim_selects,
235 )
236}
237
238pub(crate) fn lineage_by_index_from_expression(
239 column_index: usize,
240 sql: &Expression,
241 dialect: Option<DialectType>,
242 trim_selects: bool,
243) -> Result<LineageNode> {
244 let sql = lineage_effective_expression(sql);
245 let scope = build_scope(sql);
246 to_node(
247 ColumnRef::Index(column_index),
248 &scope,
249 dialect,
250 "",
251 "",
252 "",
253 trim_selects,
254 )
255}
256
257fn lineage_effective_expression(sql: &Expression) -> &Expression {
258 match sql {
259 Expression::Prepare(prepare) => &prepare.statement,
260 _ => sql,
261 }
262}
263
264fn normalize_cte_name(ident: &Identifier) -> String {
274 if ident.quoted {
275 ident.name.clone()
276 } else {
277 ident.name.to_lowercase()
278 }
279}
280
281pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
293 if let Expression::Prepare(prepare) = expr {
294 expand_cte_stars(&mut prepare.statement, schema);
295 return;
296 }
297
298 let select = match expr {
299 Expression::Select(s) => s,
300 _ => return,
301 };
302
303 let with = match &mut select.with {
304 Some(w) => w,
305 None => return,
306 };
307
308 let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
309
310 for cte in &mut with.ctes {
311 let cte_name = normalize_cte_name(&cte.alias);
312
313 if !cte.columns.is_empty() {
315 let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
316 resolved_cte_columns.insert(cte_name, cols);
317 continue;
318 }
319
320 if with.recursive {
326 let is_self_referencing =
327 if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
328 let body_sources = get_select_sources(body_select);
329 body_sources.iter().any(|s| s.normalized == cte_name)
330 } else {
331 false
332 };
333 if is_self_referencing {
334 continue;
335 }
336 }
337
338 let body_select = match get_leftmost_select_mut(&mut cte.this) {
340 Some(s) => s,
341 None => continue,
342 };
343
344 let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
345 resolved_cte_columns.insert(cte_name, columns);
346 }
347
348 rewrite_stars_in_select(select, &resolved_cte_columns, schema);
350}
351
352fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
357 let mut current = expr;
358 for _ in 0..MAX_LINEAGE_DEPTH {
359 match current {
360 Expression::Select(s) => return Some(s),
361 Expression::Union(u) => current = &mut u.left,
362 Expression::Intersect(i) => current = &mut i.left,
363 Expression::Except(e) => current = &mut e.left,
364 Expression::Paren(p) => current = &mut p.this,
365 _ => return None,
366 }
367 }
368 None
369}
370
371fn rewrite_stars_in_select(
375 select: &mut Select,
376 resolved_ctes: &HashMap<String, Vec<String>>,
377 schema: Option<&dyn Schema>,
378) -> Vec<String> {
379 let has_star = select
384 .expressions
385 .iter()
386 .any(|e| matches!(e, Expression::Star(_)));
387 let has_qualified_star = select
388 .expressions
389 .iter()
390 .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
391
392 if !has_star && !has_qualified_star {
393 return select
395 .expressions
396 .iter()
397 .filter_map(get_expression_output_name)
398 .collect();
399 }
400
401 let sources = get_select_sources(select);
402 let mut new_expressions = Vec::new();
403 let mut result_columns = Vec::new();
404
405 for expr in &select.expressions {
406 match expr {
407 Expression::Star(star) => {
408 let qual = star.table.as_ref();
409 if let Some(expanded) =
410 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
411 {
412 for (src_alias, col_name) in &expanded {
413 let table_id = Identifier::new(src_alias);
414 new_expressions.push(make_column_expr(col_name, Some(&table_id)));
415 result_columns.push(col_name.clone());
416 }
417 } else {
418 new_expressions.push(expr.clone());
419 result_columns.push("*".to_string());
420 }
421 }
422 Expression::Column(c) if c.name.name == "*" => {
423 let qual = c.table.as_ref();
424 if let Some(expanded) =
425 expand_star_from_sources(qual, &sources, resolved_ctes, schema)
426 {
427 for (_src_alias, col_name) in &expanded {
428 new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
430 result_columns.push(col_name.clone());
431 }
432 } else {
433 new_expressions.push(expr.clone());
434 result_columns.push("*".to_string());
435 }
436 }
437 _ => {
438 new_expressions.push(expr.clone());
439 if let Some(name) = get_expression_output_name(expr) {
440 result_columns.push(name);
441 }
442 }
443 }
444 }
445
446 select.expressions = new_expressions;
447 result_columns
448}
449
450fn expand_star_from_sources(
455 qualifier: Option<&Identifier>,
456 sources: &[SourceInfo],
457 resolved_ctes: &HashMap<String, Vec<String>>,
458 schema: Option<&dyn Schema>,
459) -> Option<Vec<(String, String)>> {
460 let mut expanded = Vec::new();
461
462 if let Some(qual) = qualifier {
463 let qual_normalized = normalize_cte_name(qual);
465 for src in sources {
466 if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
467 if let Some(cols) = resolved_ctes.get(&src.normalized) {
469 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
470 return Some(expanded);
471 }
472 if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
474 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
475 return Some(expanded);
476 }
477 }
478 }
479 None
480 } else {
481 let mut any_expanded = false;
487 for src in sources {
488 if let Some(cols) = resolved_ctes.get(&src.normalized) {
489 expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
490 any_expanded = true;
491 } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
492 expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
493 any_expanded = true;
494 } else {
495 return None;
496 }
497 }
498 if any_expanded {
499 Some(expanded)
500 } else {
501 None
502 }
503 }
504}
505
506fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
508 let schema = schema?;
509 if fq_name.is_empty() {
510 return None;
511 }
512 schema
513 .column_names(fq_name)
514 .ok()
515 .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
516}
517
518fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
520 Expression::Column(Box::new(crate::expressions::Column {
521 name: Identifier::new(name),
522 table: table.cloned(),
523 join_mark: false,
524 trailing_comments: Vec::new(),
525 span: None,
526 inferred_type: None,
527 }))
528}
529
530fn get_expression_output_name(expr: &Expression) -> Option<String> {
532 match expr {
533 Expression::Alias(a) => Some(a.alias.name.clone()),
534 Expression::Column(c) => Some(c.name.name.clone()),
535 Expression::Identifier(id) => Some(id.name.clone()),
536 Expression::Star(_) => Some("*".to_string()),
537 _ => None,
538 }
539}
540
/// Description of one relation appearing in a `SELECT`'s FROM / JOIN /
/// lateral-view list, as collected by `get_select_sources`.
struct SourceInfo {
    /// Name the source is referred to by: its alias, or the table name when a
    /// table has no alias.
    alias: String,
    /// Whether the identifier the entry was built from was quoted.
    quoted: bool,
    /// Lookup key used to match the source against resolved CTE names. For
    /// tables this is derived from the table name, not from the alias.
    normalized: String,
    /// Name used for schema lookups: `catalog.schema.table` (present parts
    /// only) for tables, the alias for every other kind of source.
    fq_name: String,
}
556
557fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
560 let mut sources = Vec::new();
561
562 fn extract_source(expr: &Expression) -> Option<SourceInfo> {
563 fn virtual_source_info(alias: &Identifier) -> SourceInfo {
564 SourceInfo {
565 alias: alias.name.clone(),
566 quoted: alias.quoted,
567 normalized: normalize_cte_name(alias),
568 fq_name: alias.name.clone(),
569 }
570 }
571
572 fn named_virtual_source_info(alias: &str) -> SourceInfo {
573 SourceInfo {
574 alias: alias.to_string(),
575 quoted: false,
576 normalized: alias.to_lowercase(),
577 fq_name: alias.to_string(),
578 }
579 }
580
581 match expr {
582 Expression::Table(t) => {
583 let normalized = normalize_cte_name(&t.name);
584 let alias = t
585 .alias
586 .as_ref()
587 .map(|a| a.name.clone())
588 .unwrap_or_else(|| t.name.name.clone());
589 let mut parts = Vec::new();
590 if let Some(catalog) = &t.catalog {
591 parts.push(catalog.name.clone());
592 }
593 if let Some(schema) = &t.schema {
594 parts.push(schema.name.clone());
595 }
596 parts.push(t.name.name.clone());
597 let fq_name = parts.join(".");
598 Some(SourceInfo {
599 alias,
600 quoted: t.name.quoted,
601 normalized,
602 fq_name,
603 })
604 }
605 Expression::Subquery(s) => {
606 let alias_identifier = s.alias.as_ref()?;
607 let alias = alias_identifier.name.clone();
608 let normalized = alias.to_lowercase();
609 let fq_name = alias.clone();
610 Some(SourceInfo {
611 alias,
612 quoted: alias_identifier.quoted,
613 normalized,
614 fq_name,
615 })
616 }
617 Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
618 Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
619 Some(virtual_source_info(&a.alias))
620 }
621 Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
622 Expression::LateralView(lateral_view) => lateral_view
623 .table_alias
624 .as_ref()
625 .or_else(|| lateral_view.column_aliases.first())
626 .map(virtual_source_info),
627 Expression::Paren(p) => extract_source(&p.this),
628 _ => None,
629 }
630 }
631
632 if let Some(from) = &select.from {
633 for expr in &from.expressions {
634 if let Some(info) = extract_source(expr) {
635 sources.push(info);
636 }
637 }
638 }
639 for join in &select.joins {
640 if let Some(info) = extract_source(&join.this) {
641 sources.push(info);
642 }
643 }
644 for lateral_view in &select.lateral_views {
645 if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
646 {
647 sources.push(info);
648 }
649 }
650 sources
651}
652
653pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
655 let mut tables = HashSet::new();
656 collect_source_tables(node, &mut tables);
657 tables
658}
659
660pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
662 if let Expression::Table(table) = &node.source {
663 tables.insert(table.name.name.clone());
664 }
665 for child in &node.downstream {
666 collect_source_tables(child, tables);
667 }
668}
669
/// Safety limit shared by the lineage code in this file: the maximum
/// recursion depth of `to_node_inner` (exceeding it is reported as an error)
/// and the maximum number of nested expressions `get_leftmost_select_mut`
/// inspects.
const MAX_LINEAGE_DEPTH: usize = 64;
677
678fn to_node(
680 column: ColumnRef<'_>,
681 scope: &Scope,
682 dialect: Option<DialectType>,
683 scope_name: &str,
684 source_name: &str,
685 reference_node_name: &str,
686 trim_selects: bool,
687) -> Result<LineageNode> {
688 to_node_inner(
689 column,
690 scope,
691 dialect,
692 scope_name,
693 source_name,
694 reference_node_name,
695 trim_selects,
696 &[],
697 0,
698 )
699}
700
/// Recursive worker that builds the lineage node for `column` in `scope`.
///
/// * `scope_name` is only used in the depth-exceeded error message and is
///   forwarded to set-operation handling.
/// * `source_name` / `reference_node_name` are recorded on the created node
///   via `apply_scope_context`.
/// * `ancestor_cte_scopes` are CTE scopes inherited from enclosing scopes;
///   they are searched in addition to the scope's own CTE scopes.
/// * `depth` is the current recursion depth.
///
/// Set operations (also when wrapped in a CTE) are delegated to
/// `handle_set_operation`. A star projection produces one `"<source>.*"`
/// child per matching source and stops there. Otherwise children are added
/// for unaliased subqueries inside the projection and for every column
/// reference found in it.
///
/// # Errors
/// Fails when `depth` exceeds `MAX_LINEAGE_DEPTH`, when the column cannot be
/// located by `find_select_expr`, or when set-operation handling fails.
/// Failures while resolving subquery children are ignored.
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    // Bail out instead of recursing without bound.
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // CTE scopes visible from here: the scope's own first, then the inherited ones.
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }

    // For a CTE scope, work on the CTE body rather than on the CTE wrapper.
    let effective_expr = match scope_expr {
        Expression::Cte(cte) => &cte.this,
        other => other,
    };

    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        if matches!(scope_expr, Expression::Cte(_)) {
            // `handle_set_operation` reads `scope.expression`, so build a scope
            // whose expression is the set operation itself and copy the
            // remaining scope data over.
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                ancestor_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            ancestor_cte_scopes,
            depth,
        );
    }

    // Locate the projection that produces the requested column.
    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    apply_scope_context(&mut node, scope, source_name, reference_node_name);

    if let Expression::Star(star) = &select_expr {
        // A star depends on every source, or only on the matching one(s) when
        // it is qualified.
        let star_table = star
            .table
            .as_ref()
            .map(|identifier| identifier.name.as_str());
        for (name, source_info) in &scope.sources {
            if let Some(star_table) = star_table {
                // The qualifier may match the source key, its alias, or the
                // underlying table name (all compared ASCII case-insensitively).
                let table_matches = name.eq_ignore_ascii_case(star_table)
                    || source_info
                        .alias
                        .as_deref()
                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
                    || matches!(
                        &source_info.expression,
                        Expression::Table(table_ref)
                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
                    );
                if !table_matches {
                    continue;
                }
            }

            let mut child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: star.table.clone(),
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            apply_source_info_context(&mut child, name, source_info);
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // Unaliased subqueries inside the projection: follow the first column of
    // the matching subquery scope.
    let subqueries: Vec<&Expression> =
        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
    for sq_expr in subqueries {
        if let Expression::Subquery(sq) = sq_expr {
            for sq_scope in &scope.subquery_scopes {
                if sq_scope.expression == sq.this {
                    // Best effort: a subquery that cannot be resolved is skipped.
                    if let Ok(child) = to_node_inner(
                        ColumnRef::Index(0),
                        sq_scope,
                        dialect,
                        &column_name,
                        "",
                        "",
                        trim_selects,
                        ancestor_cte_scopes,
                        depth + 1,
                    ) {
                        node.downstream.push(child);
                    }
                    break;
                }
            }
        }
    }

    // Every column referenced by the projection becomes a child node.
    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
886
887fn handle_set_operation(
892 column: &ColumnRef<'_>,
893 scope: &Scope,
894 dialect: Option<DialectType>,
895 scope_name: &str,
896 source_name: &str,
897 reference_node_name: &str,
898 trim_selects: bool,
899 ancestor_cte_scopes: &[Scope],
900 depth: usize,
901) -> Result<LineageNode> {
902 let scope_expr = &scope.expression;
903
904 let col_index = match column {
906 ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
907 ColumnRef::Index(i) => *i,
908 };
909
910 let col_name = match column {
911 ColumnRef::Name(name) => name.to_string(),
912 ColumnRef::Index(_) => format!("_{col_index}"),
913 };
914
915 let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
916 apply_scope_context(&mut node, scope, source_name, reference_node_name);
917
918 for branch_scope in &scope.union_scopes {
920 if let Ok(child) = to_node_inner(
921 ColumnRef::Index(col_index),
922 branch_scope,
923 dialect,
924 scope_name,
925 "",
926 "",
927 trim_selects,
928 ancestor_cte_scopes,
929 depth + 1,
930 ) {
931 node.downstream.push(child);
932 }
933 }
934
935 Ok(node)
936}
937
938fn resolve_qualified_column(
943 node: &mut LineageNode,
944 scope: &Scope,
945 dialect: Option<DialectType>,
946 table: &str,
947 col_name: &str,
948 parent_name: &str,
949 trim_selects: bool,
950 all_cte_scopes: &[&Scope],
951 depth: usize,
952) {
953 let resolved_cte_name = resolve_cte_alias(scope, table);
956 let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
957
958 let is_cte = scope.cte_sources.contains_key(effective_table)
961 || all_cte_scopes.iter().any(
962 |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
963 );
964 if is_cte {
965 if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
966 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
968 if let Ok(child) = to_node_inner(
969 ColumnRef::Name(col_name),
970 child_scope,
971 dialect,
972 parent_name,
973 effective_table,
974 parent_name,
975 trim_selects,
976 &ancestors,
977 depth + 1,
978 ) {
979 node.downstream.push(child);
980 return;
981 }
982 }
983 }
984
985 if let Some(source_info) = scope.sources.get(table) {
987 if source_info.is_scope {
988 if let Some(child_scope) = find_child_scope(scope, table) {
989 let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
990 if let Ok(child) = to_node_inner(
991 ColumnRef::Name(col_name),
992 child_scope,
993 dialect,
994 parent_name,
995 table,
996 parent_name,
997 trim_selects,
998 &ancestors,
999 depth + 1,
1000 ) {
1001 node.downstream.push(child);
1002 return;
1003 }
1004 }
1005 }
1006 }
1007
1008 if let Some(source_info) = scope.sources.get(table) {
1011 if !source_info.is_scope {
1012 let mut child = make_table_column_node_from_source(table, col_name, source_info);
1013 if source_info.kind == SourceKind::Virtual {
1014 attach_virtual_source_dependencies(
1015 &mut child,
1016 scope,
1017 dialect,
1018 table,
1019 &source_info.expression,
1020 trim_selects,
1021 all_cte_scopes,
1022 depth,
1023 );
1024 }
1025 node.downstream.push(child);
1026 return;
1027 }
1028 }
1029
1030 node.downstream
1032 .push(make_table_column_node(table, col_name));
1033}
1034
1035fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1041 if scope.cte_sources.contains_key(name) {
1043 return None;
1044 }
1045 if let Some(source_info) = scope.sources.get(name) {
1047 if source_info.is_scope {
1048 if let Expression::Cte(cte) = &source_info.expression {
1049 let cte_name = &cte.alias.name;
1050 if scope.cte_sources.contains_key(cte_name) {
1051 return Some(cte_name.clone());
1052 }
1053 }
1054 }
1055 }
1056 None
1057}
1058
1059fn resolve_unqualified_column(
1060 node: &mut LineageNode,
1061 scope: &Scope,
1062 dialect: Option<DialectType>,
1063 col_name: &str,
1064 parent_name: &str,
1065 trim_selects: bool,
1066 all_cte_scopes: &[&Scope],
1067 depth: usize,
1068) {
1069 let from_source_names = source_names_from_from_join(scope);
1073
1074 if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1075 resolve_qualified_column(
1076 node,
1077 scope,
1078 dialect,
1079 &tbl,
1080 col_name,
1081 parent_name,
1082 trim_selects,
1083 all_cte_scopes,
1084 depth,
1085 );
1086 return;
1087 }
1088
1089 if from_source_names.len() == 1 {
1090 let tbl = &from_source_names[0];
1091 resolve_qualified_column(
1092 node,
1093 scope,
1094 dialect,
1095 tbl,
1096 col_name,
1097 parent_name,
1098 trim_selects,
1099 all_cte_scopes,
1100 depth,
1101 );
1102 return;
1103 }
1104
1105 let child = LineageNode::new(
1107 col_name.to_string(),
1108 Expression::Column(Box::new(crate::expressions::Column {
1109 name: crate::expressions::Identifier::new(col_name.to_string()),
1110 table: None,
1111 join_mark: false,
1112 trailing_comments: vec![],
1113 span: None,
1114 inferred_type: None,
1115 })),
1116 node.source.clone(),
1117 );
1118 node.downstream.push(child);
1119}
1120
1121fn unique_virtual_source_for_column(
1122 scope: &Scope,
1123 source_names: &[String],
1124 col_name: &str,
1125) -> Option<String> {
1126 let mut matches = source_names.iter().filter_map(|source_name| {
1127 let source = scope.sources.get(source_name)?;
1128 if source.kind == SourceKind::Virtual
1129 && virtual_source_output_columns(source)
1130 .any(|column| column.eq_ignore_ascii_case(col_name))
1131 {
1132 Some(source_name.clone())
1133 } else {
1134 None
1135 }
1136 });
1137
1138 let first = matches.next()?;
1139 if matches.next().is_none() {
1140 Some(first)
1141 } else {
1142 None
1143 }
1144}
1145
1146fn virtual_source_output_columns(
1147 source_info: &ScopeSourceInfo,
1148) -> Box<dyn Iterator<Item = String> + '_> {
1149 match &source_info.expression {
1150 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1151 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1152 Box::new(alias_output_columns(alias))
1153 }
1154 Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1155 Expression::LateralView(lateral_view) => {
1156 Box::new(lateral_view_output_columns(lateral_view))
1157 }
1158 _ => Box::new(source_info.alias.clone().into_iter()),
1159 }
1160}
1161
1162fn unnest_output_columns(
1163 unnest: &crate::expressions::UnnestFunc,
1164) -> impl Iterator<Item = String> + '_ {
1165 unnest
1166 .alias
1167 .iter()
1168 .map(|alias| alias.name.clone())
1169 .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1170}
1171
1172fn alias_output_columns(
1173 alias: &crate::expressions::Alias,
1174) -> Box<dyn Iterator<Item = String> + '_> {
1175 if alias.column_aliases.is_empty() {
1176 Box::new(std::iter::once(alias.alias.name.clone()))
1177 } else {
1178 Box::new(
1179 alias
1180 .column_aliases
1181 .iter()
1182 .map(|column| column.name.clone()),
1183 )
1184 }
1185}
1186
1187fn lateral_output_columns(
1188 lateral: &crate::expressions::Lateral,
1189) -> Box<dyn Iterator<Item = String> + '_> {
1190 if lateral.column_aliases.is_empty() {
1191 default_virtual_output_columns(&lateral.this)
1192 } else {
1193 Box::new(lateral.column_aliases.iter().cloned())
1194 }
1195}
1196
1197fn lateral_view_output_columns(
1198 lateral_view: &crate::expressions::LateralView,
1199) -> Box<dyn Iterator<Item = String> + '_> {
1200 Box::new(
1201 lateral_view
1202 .column_aliases
1203 .iter()
1204 .map(|column| column.name.clone()),
1205 )
1206}
1207
1208fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1209 match expr {
1210 Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1211 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1212 alias_output_columns(alias)
1213 }
1214 Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1215 Box::new(
1216 ["seq", "key", "path", "index", "value", "this"]
1217 .into_iter()
1218 .map(String::from),
1219 )
1220 }
1221 _ => Box::new(std::iter::empty()),
1222 }
1223}
1224
1225fn attach_virtual_source_dependencies(
1226 node: &mut LineageNode,
1227 scope: &Scope,
1228 dialect: Option<DialectType>,
1229 source_alias: &str,
1230 source_expr: &Expression,
1231 trim_selects: bool,
1232 all_cte_scopes: &[&Scope],
1233 depth: usize,
1234) {
1235 let parent_name = node.name.clone();
1236 let mut seen = HashSet::new();
1237 for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1238 let key = (
1239 col_ref.table.as_ref().map(|t| t.name.clone()),
1240 col_ref.column.clone(),
1241 );
1242 if !seen.insert(key) {
1243 continue;
1244 }
1245
1246 if let Some(table_id) = col_ref.table {
1247 let table = table_id.name;
1248 if table == source_alias {
1249 continue;
1250 }
1251 resolve_qualified_column(
1252 node,
1253 scope,
1254 dialect,
1255 &table,
1256 &col_ref.column,
1257 &parent_name,
1258 trim_selects,
1259 all_cte_scopes,
1260 depth + 1,
1261 );
1262 } else {
1263 let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1264 if non_virtual_sources.len() == 1 {
1265 resolve_qualified_column(
1266 node,
1267 scope,
1268 dialect,
1269 &non_virtual_sources[0],
1270 &col_ref.column,
1271 &parent_name,
1272 trim_selects,
1273 all_cte_scopes,
1274 depth + 1,
1275 );
1276 }
1277 }
1278 }
1279}
1280
1281fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1282 fn source_name(expr: &Expression) -> Option<String> {
1283 match expr {
1284 Expression::Table(table) => Some(
1285 table
1286 .alias
1287 .as_ref()
1288 .map(|a| a.name.clone())
1289 .unwrap_or_else(|| table.name.name.clone()),
1290 ),
1291 Expression::Subquery(subquery) => {
1292 subquery.alias.as_ref().map(|alias| alias.name.clone())
1293 }
1294 Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1295 Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1296 Some(alias.alias.name.clone())
1297 }
1298 Expression::Lateral(lateral) => lateral.alias.clone(),
1299 Expression::LateralView(lateral_view) => lateral_view
1300 .table_alias
1301 .as_ref()
1302 .or_else(|| lateral_view.column_aliases.first())
1303 .map(|alias| alias.name.clone()),
1304 Expression::Paren(paren) => source_name(&paren.this),
1305 _ => None,
1306 }
1307 }
1308
1309 let effective_expr = match &scope.expression {
1310 Expression::Cte(cte) => &cte.this,
1311 expr => expr,
1312 };
1313
1314 let mut names = Vec::new();
1315 let mut seen = std::collections::HashSet::new();
1316
1317 if let Expression::Select(select) = effective_expr {
1318 if let Some(from) = &select.from {
1319 for expr in &from.expressions {
1320 if let Some(name) = source_name(expr) {
1321 if !name.is_empty() && seen.insert(name.clone()) {
1322 names.push(name);
1323 }
1324 }
1325 }
1326 }
1327 for join in &select.joins {
1328 if let Some(name) = source_name(&join.this) {
1329 if !name.is_empty() && seen.insert(name.clone()) {
1330 names.push(name);
1331 }
1332 }
1333 }
1334 for lateral_view in &select.lateral_views {
1335 if let Some(name) =
1336 source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
1337 {
1338 if !name.is_empty() && seen.insert(name.clone()) {
1339 names.push(name);
1340 }
1341 }
1342 }
1343 }
1344
1345 names
1346}
1347
1348fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1349 source_names_from_from_join(scope)
1350 .into_iter()
1351 .filter(|name| {
1352 !matches!(
1353 scope.sources.get(name).map(|source| source.kind),
1354 Some(SourceKind::Virtual)
1355 )
1356 })
1357 .collect()
1358}
1359
1360fn get_alias_or_name(expr: &Expression) -> Option<String> {
1366 match expr {
1367 Expression::Alias(alias) => Some(alias.alias.name.clone()),
1368 Expression::Column(col) => Some(col.name.name.clone()),
1369 Expression::Identifier(id) => Some(id.name.clone()),
1370 Expression::Star(_) => Some("*".to_string()),
1371 Expression::Annotated(a) => get_alias_or_name(&a.this),
1374 _ => None,
1375 }
1376}
1377
1378fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1380 match column {
1381 ColumnRef::Name(n) => n.to_string(),
1382 ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1383 }
1384}
1385
1386fn find_select_expr(
1388 scope_expr: &Expression,
1389 column: &ColumnRef<'_>,
1390 dialect: Option<DialectType>,
1391) -> Result<Expression> {
1392 if let Expression::Select(ref select) = scope_expr {
1393 match column {
1394 ColumnRef::Name(name) => {
1395 let normalized_name = normalize_column_name(name, dialect);
1396 for expr in &select.expressions {
1397 if let Some(alias_or_name) = get_alias_or_name(expr) {
1398 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1399 return Ok(expr.clone());
1400 }
1401 }
1402 }
1403 if let Some(expr) = synthesize_star_passthrough_expr(select, name) {
1404 return Ok(expr);
1405 }
1406 Err(crate::error::Error::parse(
1407 format!("Cannot find column '{}' in query", name),
1408 0,
1409 0,
1410 0,
1411 0,
1412 ))
1413 }
1414 ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1415 crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1416 }),
1417 }
1418 } else {
1419 Err(crate::error::Error::parse(
1420 "Expected SELECT expression for column lookup",
1421 0,
1422 0,
1423 0,
1424 0,
1425 ))
1426 }
1427}
1428
1429fn synthesize_star_passthrough_expr(select: &Select, name: &str) -> Option<Expression> {
1430 let sources = get_select_sources(select);
1431 if sources.is_empty() {
1432 return None;
1433 }
1434
1435 let mut candidate_aliases = Vec::new();
1436 let mut seen = HashSet::new();
1437
1438 for expr in &select.expressions {
1439 let aliases = match star_passthrough_source_aliases(expr, &sources) {
1440 StarPassthroughSources::None => continue,
1441 StarPassthroughSources::Ambiguous => return None,
1442 StarPassthroughSources::Aliases(aliases) => aliases,
1443 };
1444
1445 for alias in aliases {
1446 if seen.insert(alias.clone()) {
1447 candidate_aliases.push(alias);
1448 }
1449 }
1450 }
1451
1452 match candidate_aliases.as_slice() {
1453 [alias] => {
1454 let table = Identifier::new(alias.clone());
1455 Some(make_column_expr(name, Some(&table)))
1456 }
1457 _ => None,
1458 }
1459}
1460
/// Classification of one projection with respect to star passthrough, as
/// produced by `star_passthrough_source_aliases` / `star_source_aliases`.
enum StarPassthroughSources {
    /// The projection contributes no source alias (it is not a star, or no
    /// usable source was found for it).
    None,
    /// More than one source could be meant, so no single alias can be chosen.
    Ambiguous,
    /// The source aliases the star projection draws from.
    Aliases(Vec<String>),
}
1466
1467fn star_passthrough_source_aliases(
1468 expr: &Expression,
1469 sources: &[SourceInfo],
1470) -> StarPassthroughSources {
1471 match expr {
1472 Expression::Star(star) => star_source_aliases(star.table.as_ref(), sources),
1473 Expression::Column(column) if column.name.name == "*" => {
1474 star_source_aliases(column.table.as_ref(), sources)
1475 }
1476 Expression::Annotated(annotated) => {
1477 star_passthrough_source_aliases(&annotated.this, sources)
1478 }
1479 _ => StarPassthroughSources::None,
1480 }
1481}
1482
1483fn star_source_aliases(
1484 qualifier: Option<&Identifier>,
1485 sources: &[SourceInfo],
1486) -> StarPassthroughSources {
1487 if let Some(qualifier) = qualifier {
1488 let mut aliases = Vec::new();
1489
1490 for source in sources {
1491 if source_matches_star_qualifier(source, qualifier) {
1492 aliases.push(source.alias.clone());
1493 }
1494 }
1495
1496 return match aliases.len() {
1497 0 => StarPassthroughSources::None,
1498 1 => StarPassthroughSources::Aliases(aliases),
1499 _ => StarPassthroughSources::Ambiguous,
1500 };
1501 }
1502
1503 match sources {
1504 [source] if source.quoted => StarPassthroughSources::None,
1508 [source] => StarPassthroughSources::Aliases(vec![source.alias.clone()]),
1509 [] => StarPassthroughSources::None,
1510 _ => StarPassthroughSources::Ambiguous,
1511 }
1512}
1513
1514fn source_matches_star_qualifier(source: &SourceInfo, qualifier: &Identifier) -> bool {
1515 if source.normalized == normalize_cte_name(qualifier) {
1516 return true;
1517 }
1518
1519 if qualifier.quoted {
1520 source.alias == qualifier.name
1521 } else {
1522 source.alias.eq_ignore_ascii_case(&qualifier.name)
1523 }
1524}
1525
1526fn column_to_index(
1528 set_op_expr: &Expression,
1529 name: &str,
1530 dialect: Option<DialectType>,
1531) -> Result<usize> {
1532 let normalized_name = normalize_column_name(name, dialect);
1533 let mut expr = set_op_expr;
1534 loop {
1535 match expr {
1536 Expression::Union(u) => expr = &u.left,
1537 Expression::Intersect(i) => expr = &i.left,
1538 Expression::Except(e) => expr = &e.left,
1539 Expression::Select(select) => {
1540 for (i, e) in select.expressions.iter().enumerate() {
1541 if let Some(alias_or_name) = get_alias_or_name(e) {
1542 if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1543 return Ok(i);
1544 }
1545 }
1546 }
1547 return Err(crate::error::Error::parse(
1548 format!("Cannot find column '{}' in set operation", name),
1549 0,
1550 0,
1551 0,
1552 0,
1553 ));
1554 }
1555 _ => {
1556 return Err(crate::error::Error::parse(
1557 "Expected SELECT or set operation",
1558 0,
1559 0,
1560 0,
1561 0,
1562 ))
1563 }
1564 }
1565 }
1566}
1567
/// Normalizes a column name for comparison under `dialect`.
///
/// Thin wrapper that forwards to `crate::schema::normalize_name` with the two
/// trailing flags fixed to `false` and `true`.
// NOTE(review): the meaning of the two boolean flags is defined by
// `normalize_name` and is not visible here — confirm against its signature.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    normalize_name(name, dialect, false, true)
}
1571
1572fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1574 if let Expression::Select(select) = select_expr {
1575 let mut trimmed = select.as_ref().clone();
1576 trimmed.expressions = vec![target_expr.clone()];
1577 Expression::Select(Box::new(trimmed))
1578 } else {
1579 select_expr.clone()
1580 }
1581}
1582
1583fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1585 if scope.cte_sources.contains_key(source_name) {
1587 for cte_scope in &scope.cte_scopes {
1588 if let Expression::Cte(cte) = &cte_scope.expression {
1589 if cte.alias.name == source_name {
1590 return Some(cte_scope);
1591 }
1592 }
1593 }
1594 }
1595
1596 if let Some(source_info) = scope.sources.get(source_name) {
1598 if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1599 if let Expression::Subquery(sq) = &source_info.expression {
1600 for dt_scope in &scope.derived_table_scopes {
1601 if dt_scope.expression == sq.this {
1602 return Some(dt_scope);
1603 }
1604 }
1605 }
1606 }
1607 }
1608
1609 None
1610}
1611
1612fn find_child_scope_in<'a>(
1616 all_cte_scopes: &[&'a Scope],
1617 scope: &'a Scope,
1618 source_name: &str,
1619) -> Option<&'a Scope> {
1620 for cte_scope in &scope.cte_scopes {
1622 if let Expression::Cte(cte) = &cte_scope.expression {
1623 if cte.alias.name == source_name {
1624 return Some(cte_scope);
1625 }
1626 }
1627 }
1628
1629 for cte_scope in all_cte_scopes {
1631 if let Expression::Cte(cte) = &cte_scope.expression {
1632 if cte.alias.name == source_name {
1633 return Some(cte_scope);
1634 }
1635 }
1636 }
1637
1638 if let Some(source_info) = scope.sources.get(source_name) {
1640 if source_info.is_scope {
1641 if let Expression::Subquery(sq) = &source_info.expression {
1642 for dt_scope in &scope.derived_table_scopes {
1643 if dt_scope.expression == sq.this {
1644 return Some(dt_scope);
1645 }
1646 }
1647 }
1648 }
1649 }
1650
1651 None
1652}
1653
1654fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1656 let mut node = LineageNode::new(
1657 format!("{}.{}", table, column),
1658 Expression::Column(Box::new(crate::expressions::Column {
1659 name: crate::expressions::Identifier::new(column.to_string()),
1660 table: Some(crate::expressions::Identifier::new(table.to_string())),
1661 join_mark: false,
1662 trailing_comments: vec![],
1663 span: None,
1664 inferred_type: None,
1665 })),
1666 Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1667 );
1668 node.source_name = table.to_string();
1669 node.source_kind = SourceKind::Table;
1670 node
1671}
1672
1673fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1674 let mut parts: Vec<String> = Vec::new();
1675 if let Some(catalog) = &table_ref.catalog {
1676 parts.push(catalog.name.clone());
1677 }
1678 if let Some(schema) = &table_ref.schema {
1679 parts.push(schema.name.clone());
1680 }
1681 parts.push(table_ref.name.name.clone());
1682 parts.join(".")
1683}
1684
1685fn apply_source_info_context(
1686 node: &mut LineageNode,
1687 source_key: &str,
1688 source_info: &ScopeSourceInfo,
1689) {
1690 node.source_kind = source_info.kind;
1691 node.source_name =
1692 source_info
1693 .lineage_name
1694 .clone()
1695 .unwrap_or_else(|| match &source_info.expression {
1696 Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1697 _ => source_key.to_string(),
1698 });
1699 node.source_alias = source_info.alias.clone();
1700}
1701
1702fn make_table_column_node_from_source(
1703 source_key: &str,
1704 column: &str,
1705 source_info: &ScopeSourceInfo,
1706) -> LineageNode {
1707 let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1708 let mut node = LineageNode::new(
1709 format!("{}.{}", lineage_name, column),
1710 Expression::Column(Box::new(crate::expressions::Column {
1711 name: crate::expressions::Identifier::new(column.to_string()),
1712 table: Some(crate::expressions::Identifier::new(
1713 lineage_name.to_string(),
1714 )),
1715 join_mark: false,
1716 trailing_comments: vec![],
1717 span: None,
1718 inferred_type: None,
1719 })),
1720 source_info.expression.clone(),
1721 );
1722
1723 apply_source_info_context(&mut node, source_key, source_info);
1724
1725 node
1726}
1727
/// A column reference extracted from an expression tree by
/// `collect_column_refs`.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table qualifier of the column, if the reference carried one.
    table: Option<crate::expressions::Identifier>,
    /// Name of the referenced column.
    column: String,
}
1734
1735fn find_column_refs_in_expr(
1737 expr: &Expression,
1738 dialect: Option<DialectType>,
1739) -> Vec<SimpleColumnRef> {
1740 let mut refs = Vec::new();
1741 collect_column_refs(expr, dialect, &mut refs);
1742 refs
1743}
1744
1745fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1746 match expr {
1747 Expression::Column(col) => {
1748 col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1749 }
1750 Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1751 _ => false,
1752 }
1753}
1754
1755fn collect_column_refs(
1756 expr: &Expression,
1757 dialect: Option<DialectType>,
1758 refs: &mut Vec<SimpleColumnRef>,
1759) {
1760 let mut stack: Vec<&Expression> = vec![expr];
1761
1762 while let Some(current) = stack.pop() {
1763 match current {
1764 Expression::Column(col) => {
1766 refs.push(SimpleColumnRef {
1767 table: col.table.clone(),
1768 column: col.name.name.clone(),
1769 });
1770 }
1771
1772 Expression::Subquery(_) | Expression::Exists(_) => {}
1774
1775 Expression::And(op)
1777 | Expression::Or(op)
1778 | Expression::Eq(op)
1779 | Expression::Neq(op)
1780 | Expression::Lt(op)
1781 | Expression::Lte(op)
1782 | Expression::Gt(op)
1783 | Expression::Gte(op)
1784 | Expression::Add(op)
1785 | Expression::Sub(op)
1786 | Expression::Mul(op)
1787 | Expression::Div(op)
1788 | Expression::Mod(op)
1789 | Expression::BitwiseAnd(op)
1790 | Expression::BitwiseOr(op)
1791 | Expression::BitwiseXor(op)
1792 | Expression::BitwiseLeftShift(op)
1793 | Expression::BitwiseRightShift(op)
1794 | Expression::Concat(op)
1795 | Expression::Adjacent(op)
1796 | Expression::TsMatch(op)
1797 | Expression::PropertyEQ(op)
1798 | Expression::ArrayContainsAll(op)
1799 | Expression::ArrayContainedBy(op)
1800 | Expression::ArrayOverlaps(op)
1801 | Expression::JSONBContainsAllTopKeys(op)
1802 | Expression::JSONBContainsAnyTopKeys(op)
1803 | Expression::JSONBDeleteAtPath(op)
1804 | Expression::ExtendsLeft(op)
1805 | Expression::ExtendsRight(op)
1806 | Expression::Is(op)
1807 | Expression::MemberOf(op)
1808 | Expression::NullSafeEq(op)
1809 | Expression::NullSafeNeq(op)
1810 | Expression::Glob(op)
1811 | Expression::Match(op) => {
1812 stack.push(&op.left);
1813 stack.push(&op.right);
1814 }
1815
1816 Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1818 stack.push(&u.this);
1819 }
1820
1821 Expression::Upper(f)
1823 | Expression::Lower(f)
1824 | Expression::Length(f)
1825 | Expression::LTrim(f)
1826 | Expression::RTrim(f)
1827 | Expression::Reverse(f)
1828 | Expression::Abs(f)
1829 | Expression::Sqrt(f)
1830 | Expression::Cbrt(f)
1831 | Expression::Ln(f)
1832 | Expression::Exp(f)
1833 | Expression::Sign(f)
1834 | Expression::Date(f)
1835 | Expression::Time(f)
1836 | Expression::DateFromUnixDate(f)
1837 | Expression::UnixDate(f)
1838 | Expression::UnixSeconds(f)
1839 | Expression::UnixMillis(f)
1840 | Expression::UnixMicros(f)
1841 | Expression::TimeStrToDate(f)
1842 | Expression::DateToDi(f)
1843 | Expression::DiToDate(f)
1844 | Expression::TsOrDiToDi(f)
1845 | Expression::TsOrDsToDatetime(f)
1846 | Expression::TsOrDsToTimestamp(f)
1847 | Expression::YearOfWeek(f)
1848 | Expression::YearOfWeekIso(f)
1849 | Expression::Initcap(f)
1850 | Expression::Ascii(f)
1851 | Expression::Chr(f)
1852 | Expression::Soundex(f)
1853 | Expression::ByteLength(f)
1854 | Expression::Hex(f)
1855 | Expression::LowerHex(f)
1856 | Expression::Unicode(f)
1857 | Expression::Radians(f)
1858 | Expression::Degrees(f)
1859 | Expression::Sin(f)
1860 | Expression::Cos(f)
1861 | Expression::Tan(f)
1862 | Expression::Asin(f)
1863 | Expression::Acos(f)
1864 | Expression::Atan(f)
1865 | Expression::IsNan(f)
1866 | Expression::IsInf(f)
1867 | Expression::ArrayLength(f)
1868 | Expression::ArraySize(f)
1869 | Expression::Cardinality(f)
1870 | Expression::ArrayReverse(f)
1871 | Expression::ArrayDistinct(f)
1872 | Expression::ArrayFlatten(f)
1873 | Expression::ArrayCompact(f)
1874 | Expression::Explode(f)
1875 | Expression::ExplodeOuter(f)
1876 | Expression::ToArray(f)
1877 | Expression::MapFromEntries(f)
1878 | Expression::MapKeys(f)
1879 | Expression::MapValues(f)
1880 | Expression::JsonArrayLength(f)
1881 | Expression::JsonKeys(f)
1882 | Expression::JsonType(f)
1883 | Expression::ParseJson(f)
1884 | Expression::ToJson(f)
1885 | Expression::Typeof(f)
1886 | Expression::BitwiseCount(f)
1887 | Expression::Year(f)
1888 | Expression::Month(f)
1889 | Expression::Day(f)
1890 | Expression::Hour(f)
1891 | Expression::Minute(f)
1892 | Expression::Second(f)
1893 | Expression::DayOfWeek(f)
1894 | Expression::DayOfWeekIso(f)
1895 | Expression::DayOfMonth(f)
1896 | Expression::DayOfYear(f)
1897 | Expression::WeekOfYear(f)
1898 | Expression::Quarter(f)
1899 | Expression::Epoch(f)
1900 | Expression::EpochMs(f)
1901 | Expression::TimeStrToUnix(f)
1902 | Expression::SHA(f)
1903 | Expression::SHA1Digest(f)
1904 | Expression::TimeToUnix(f)
1905 | Expression::JSONBool(f)
1906 | Expression::Int64(f)
1907 | Expression::MD5NumberLower64(f)
1908 | Expression::MD5NumberUpper64(f)
1909 | Expression::DateStrToDate(f)
1910 | Expression::DateToDateStr(f) => {
1911 stack.push(&f.this);
1912 }
1913
1914 Expression::Power(f)
1916 | Expression::NullIf(f)
1917 | Expression::IfNull(f)
1918 | Expression::Nvl(f)
1919 | Expression::UnixToTimeStr(f)
1920 | Expression::Contains(f)
1921 | Expression::StartsWith(f)
1922 | Expression::EndsWith(f)
1923 | Expression::Levenshtein(f)
1924 | Expression::ModFunc(f)
1925 | Expression::Atan2(f)
1926 | Expression::IntDiv(f)
1927 | Expression::AddMonths(f)
1928 | Expression::MonthsBetween(f)
1929 | Expression::NextDay(f)
1930 | Expression::ArrayContains(f)
1931 | Expression::ArrayPosition(f)
1932 | Expression::ArrayAppend(f)
1933 | Expression::ArrayPrepend(f)
1934 | Expression::ArrayUnion(f)
1935 | Expression::ArrayExcept(f)
1936 | Expression::ArrayRemove(f)
1937 | Expression::StarMap(f)
1938 | Expression::MapFromArrays(f)
1939 | Expression::MapContainsKey(f)
1940 | Expression::ElementAt(f)
1941 | Expression::JsonMergePatch(f)
1942 | Expression::JSONBContains(f)
1943 | Expression::JSONBExtract(f) => {
1944 stack.push(&f.this);
1945 stack.push(&f.expression);
1946 }
1947
1948 Expression::Greatest(f)
1950 | Expression::Least(f)
1951 | Expression::Coalesce(f)
1952 | Expression::ArrayConcat(f)
1953 | Expression::ArrayIntersect(f)
1954 | Expression::ArrayZip(f)
1955 | Expression::MapConcat(f)
1956 | Expression::JsonArray(f) => {
1957 for e in &f.expressions {
1958 stack.push(e);
1959 }
1960 }
1961
1962 Expression::Sum(f)
1964 | Expression::Avg(f)
1965 | Expression::Min(f)
1966 | Expression::Max(f)
1967 | Expression::ArrayAgg(f)
1968 | Expression::CountIf(f)
1969 | Expression::Stddev(f)
1970 | Expression::StddevPop(f)
1971 | Expression::StddevSamp(f)
1972 | Expression::Variance(f)
1973 | Expression::VarPop(f)
1974 | Expression::VarSamp(f)
1975 | Expression::Median(f)
1976 | Expression::Mode(f)
1977 | Expression::First(f)
1978 | Expression::Last(f)
1979 | Expression::AnyValue(f)
1980 | Expression::ApproxDistinct(f)
1981 | Expression::ApproxCountDistinct(f)
1982 | Expression::LogicalAnd(f)
1983 | Expression::LogicalOr(f)
1984 | Expression::Skewness(f)
1985 | Expression::ArrayConcatAgg(f)
1986 | Expression::ArrayUniqueAgg(f)
1987 | Expression::BoolXorAgg(f)
1988 | Expression::BitwiseAndAgg(f)
1989 | Expression::BitwiseOrAgg(f)
1990 | Expression::BitwiseXorAgg(f) => {
1991 stack.push(&f.this);
1992 if let Some(ref filter) = f.filter {
1993 stack.push(filter);
1994 }
1995 if let Some((ref expr, _)) = f.having_max {
1996 stack.push(expr);
1997 }
1998 if let Some(ref limit) = f.limit {
1999 stack.push(limit);
2000 }
2001 }
2002
2003 Expression::Function(func) => {
2005 for arg in &func.args {
2006 stack.push(arg);
2007 }
2008 }
2009 Expression::AggregateFunction(func) => {
2010 for arg in &func.args {
2011 stack.push(arg);
2012 }
2013 if let Some(ref filter) = func.filter {
2014 stack.push(filter);
2015 }
2016 if let Some(ref limit) = func.limit {
2017 stack.push(limit);
2018 }
2019 }
2020
2021 Expression::WindowFunction(wf) => {
2023 stack.push(&wf.this);
2024 }
2025
2026 Expression::Alias(a) => {
2028 stack.push(&a.this);
2029 }
2030 Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
2031 stack.push(&c.this);
2032 if let Some(ref fmt) = c.format {
2033 stack.push(fmt);
2034 }
2035 if let Some(ref def) = c.default {
2036 stack.push(def);
2037 }
2038 }
2039 Expression::Paren(p) => {
2040 stack.push(&p.this);
2041 }
2042 Expression::Annotated(a) => {
2043 stack.push(&a.this);
2044 }
2045 Expression::Case(case) => {
2046 if let Some(ref operand) = case.operand {
2047 stack.push(operand);
2048 }
2049 for (cond, result) in &case.whens {
2050 stack.push(cond);
2051 stack.push(result);
2052 }
2053 if let Some(ref else_expr) = case.else_ {
2054 stack.push(else_expr);
2055 }
2056 }
2057 Expression::Collation(c) => {
2058 stack.push(&c.this);
2059 }
2060 Expression::In(i) => {
2061 stack.push(&i.this);
2062 for e in &i.expressions {
2063 stack.push(e);
2064 }
2065 if let Some(ref q) = i.query {
2066 stack.push(q);
2067 }
2068 if let Some(ref u) = i.unnest {
2069 stack.push(u);
2070 }
2071 }
2072 Expression::Between(b) => {
2073 stack.push(&b.this);
2074 stack.push(&b.low);
2075 stack.push(&b.high);
2076 }
2077 Expression::IsNull(n) => {
2078 stack.push(&n.this);
2079 }
2080 Expression::IsTrue(t) | Expression::IsFalse(t) => {
2081 stack.push(&t.this);
2082 }
2083 Expression::IsJson(j) => {
2084 stack.push(&j.this);
2085 }
2086 Expression::Like(l) | Expression::ILike(l) => {
2087 stack.push(&l.left);
2088 stack.push(&l.right);
2089 if let Some(ref esc) = l.escape {
2090 stack.push(esc);
2091 }
2092 }
2093 Expression::SimilarTo(s) => {
2094 stack.push(&s.this);
2095 stack.push(&s.pattern);
2096 if let Some(ref esc) = s.escape {
2097 stack.push(esc);
2098 }
2099 }
2100 Expression::Ordered(o) => {
2101 stack.push(&o.this);
2102 }
2103 Expression::Array(a) => {
2104 for e in &a.expressions {
2105 stack.push(e);
2106 }
2107 }
2108 Expression::Tuple(t) => {
2109 for e in &t.expressions {
2110 stack.push(e);
2111 }
2112 }
2113 Expression::Struct(s) => {
2114 for (_, e) in &s.fields {
2115 stack.push(e);
2116 }
2117 }
2118 Expression::Subscript(s) => {
2119 stack.push(&s.this);
2120 stack.push(&s.index);
2121 }
2122 Expression::Dot(d) => {
2123 stack.push(&d.this);
2124 }
2125 Expression::MethodCall(m) => {
2126 if !matches!(dialect, Some(DialectType::BigQuery))
2127 || !is_bigquery_safe_namespace_receiver(&m.this)
2128 {
2129 stack.push(&m.this);
2130 }
2131 for arg in &m.args {
2132 stack.push(arg);
2133 }
2134 }
2135 Expression::ArraySlice(s) => {
2136 stack.push(&s.this);
2137 if let Some(ref start) = s.start {
2138 stack.push(start);
2139 }
2140 if let Some(ref end) = s.end {
2141 stack.push(end);
2142 }
2143 }
2144 Expression::Lambda(l) => {
2145 stack.push(&l.body);
2146 }
2147 Expression::NamedArgument(n) => {
2148 stack.push(&n.value);
2149 }
2150 Expression::Lateral(l) => {
2151 stack.push(&l.this);
2152 if let Some(ref view) = l.view {
2153 stack.push(view);
2154 }
2155 if let Some(ref outer) = l.outer {
2156 stack.push(outer);
2157 }
2158 if let Some(ref ordinality) = l.ordinality {
2159 stack.push(ordinality);
2160 }
2161 }
2162 Expression::LateralView(lv) => {
2163 stack.push(&lv.this);
2164 }
2165 Expression::TryCatch(t) => {
2166 for stmt in &t.try_body {
2167 stack.push(stmt);
2168 }
2169 if let Some(catch_body) = &t.catch_body {
2170 for stmt in catch_body {
2171 stack.push(stmt);
2172 }
2173 }
2174 }
2175 Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
2176 stack.push(e);
2177 }
2178
2179 Expression::Substring(f) => {
2181 stack.push(&f.this);
2182 stack.push(&f.start);
2183 if let Some(ref len) = f.length {
2184 stack.push(len);
2185 }
2186 }
2187 Expression::Trim(f) => {
2188 stack.push(&f.this);
2189 if let Some(ref chars) = f.characters {
2190 stack.push(chars);
2191 }
2192 }
2193 Expression::Replace(f) => {
2194 stack.push(&f.this);
2195 stack.push(&f.old);
2196 stack.push(&f.new);
2197 }
2198 Expression::IfFunc(f) => {
2199 stack.push(&f.condition);
2200 stack.push(&f.true_value);
2201 if let Some(ref fv) = f.false_value {
2202 stack.push(fv);
2203 }
2204 }
2205 Expression::Nvl2(f) => {
2206 stack.push(&f.this);
2207 stack.push(&f.true_value);
2208 stack.push(&f.false_value);
2209 }
2210 Expression::ConcatWs(f) => {
2211 stack.push(&f.separator);
2212 for e in &f.expressions {
2213 stack.push(e);
2214 }
2215 }
2216 Expression::Count(f) => {
2217 if let Some(ref this) = f.this {
2218 stack.push(this);
2219 }
2220 if let Some(ref filter) = f.filter {
2221 stack.push(filter);
2222 }
2223 }
2224 Expression::GroupConcat(f) => {
2225 stack.push(&f.this);
2226 if let Some(ref sep) = f.separator {
2227 stack.push(sep);
2228 }
2229 if let Some(ref filter) = f.filter {
2230 stack.push(filter);
2231 }
2232 }
2233 Expression::StringAgg(f) => {
2234 stack.push(&f.this);
2235 if let Some(ref sep) = f.separator {
2236 stack.push(sep);
2237 }
2238 if let Some(ref filter) = f.filter {
2239 stack.push(filter);
2240 }
2241 if let Some(ref limit) = f.limit {
2242 stack.push(limit);
2243 }
2244 }
2245 Expression::ListAgg(f) => {
2246 stack.push(&f.this);
2247 if let Some(ref sep) = f.separator {
2248 stack.push(sep);
2249 }
2250 if let Some(ref filter) = f.filter {
2251 stack.push(filter);
2252 }
2253 }
2254 Expression::SumIf(f) => {
2255 stack.push(&f.this);
2256 stack.push(&f.condition);
2257 if let Some(ref filter) = f.filter {
2258 stack.push(filter);
2259 }
2260 }
2261 Expression::DateAdd(f) | Expression::DateSub(f) => {
2262 stack.push(&f.this);
2263 stack.push(&f.interval);
2264 }
2265 Expression::DateDiff(f) => {
2266 stack.push(&f.this);
2267 stack.push(&f.expression);
2268 }
2269 Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
2270 stack.push(&f.this);
2271 }
2272 Expression::Extract(f) => {
2273 stack.push(&f.this);
2274 }
2275 Expression::Round(f) => {
2276 stack.push(&f.this);
2277 if let Some(ref d) = f.decimals {
2278 stack.push(d);
2279 }
2280 }
2281 Expression::Floor(f) => {
2282 stack.push(&f.this);
2283 if let Some(ref s) = f.scale {
2284 stack.push(s);
2285 }
2286 if let Some(ref t) = f.to {
2287 stack.push(t);
2288 }
2289 }
2290 Expression::Ceil(f) => {
2291 stack.push(&f.this);
2292 if let Some(ref d) = f.decimals {
2293 stack.push(d);
2294 }
2295 if let Some(ref t) = f.to {
2296 stack.push(t);
2297 }
2298 }
2299 Expression::Log(f) => {
2300 stack.push(&f.this);
2301 if let Some(ref b) = f.base {
2302 stack.push(b);
2303 }
2304 }
2305 Expression::AtTimeZone(f) => {
2306 stack.push(&f.this);
2307 stack.push(&f.zone);
2308 }
2309 Expression::Lead(f) | Expression::Lag(f) => {
2310 stack.push(&f.this);
2311 if let Some(ref off) = f.offset {
2312 stack.push(off);
2313 }
2314 if let Some(ref def) = f.default {
2315 stack.push(def);
2316 }
2317 }
2318 Expression::FirstValue(f) | Expression::LastValue(f) => {
2319 stack.push(&f.this);
2320 }
2321 Expression::NthValue(f) => {
2322 stack.push(&f.this);
2323 stack.push(&f.offset);
2324 }
2325 Expression::Position(f) => {
2326 stack.push(&f.substring);
2327 stack.push(&f.string);
2328 if let Some(ref start) = f.start {
2329 stack.push(start);
2330 }
2331 }
2332 Expression::Decode(f) => {
2333 stack.push(&f.this);
2334 for (search, result) in &f.search_results {
2335 stack.push(search);
2336 stack.push(result);
2337 }
2338 if let Some(ref def) = f.default {
2339 stack.push(def);
2340 }
2341 }
2342 Expression::CharFunc(f) => {
2343 for arg in &f.args {
2344 stack.push(arg);
2345 }
2346 }
2347 Expression::ArraySort(f) => {
2348 stack.push(&f.this);
2349 if let Some(ref cmp) = f.comparator {
2350 stack.push(cmp);
2351 }
2352 }
2353 Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
2354 stack.push(&f.this);
2355 stack.push(&f.separator);
2356 if let Some(ref nr) = f.null_replacement {
2357 stack.push(nr);
2358 }
2359 }
2360 Expression::ArrayFilter(f) => {
2361 stack.push(&f.this);
2362 stack.push(&f.filter);
2363 }
2364 Expression::ArrayTransform(f) => {
2365 stack.push(&f.this);
2366 stack.push(&f.transform);
2367 }
2368 Expression::Sequence(f)
2369 | Expression::Generate(f)
2370 | Expression::ExplodingGenerateSeries(f) => {
2371 stack.push(&f.start);
2372 stack.push(&f.stop);
2373 if let Some(ref step) = f.step {
2374 stack.push(step);
2375 }
2376 }
2377 Expression::JsonExtract(f)
2378 | Expression::JsonExtractScalar(f)
2379 | Expression::JsonQuery(f)
2380 | Expression::JsonValue(f) => {
2381 stack.push(&f.this);
2382 stack.push(&f.path);
2383 }
2384 Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
2385 stack.push(&f.this);
2386 for p in &f.paths {
2387 stack.push(p);
2388 }
2389 }
2390 Expression::JsonObject(f) => {
2391 for (k, v) in &f.pairs {
2392 stack.push(k);
2393 stack.push(v);
2394 }
2395 }
2396 Expression::JsonSet(f) | Expression::JsonInsert(f) => {
2397 stack.push(&f.this);
2398 for (path, val) in &f.path_values {
2399 stack.push(path);
2400 stack.push(val);
2401 }
2402 }
2403 Expression::Overlay(f) => {
2404 stack.push(&f.this);
2405 stack.push(&f.replacement);
2406 stack.push(&f.from);
2407 if let Some(ref len) = f.length {
2408 stack.push(len);
2409 }
2410 }
2411 Expression::Convert(f) => {
2412 stack.push(&f.this);
2413 if let Some(ref style) = f.style {
2414 stack.push(style);
2415 }
2416 }
2417 Expression::ApproxPercentile(f) => {
2418 stack.push(&f.this);
2419 stack.push(&f.percentile);
2420 if let Some(ref acc) = f.accuracy {
2421 stack.push(acc);
2422 }
2423 if let Some(ref filter) = f.filter {
2424 stack.push(filter);
2425 }
2426 }
2427 Expression::Percentile(f)
2428 | Expression::PercentileCont(f)
2429 | Expression::PercentileDisc(f) => {
2430 stack.push(&f.this);
2431 stack.push(&f.percentile);
2432 if let Some(ref filter) = f.filter {
2433 stack.push(filter);
2434 }
2435 }
2436 Expression::WithinGroup(f) => {
2437 stack.push(&f.this);
2438 }
2439 Expression::Left(f) | Expression::Right(f) => {
2440 stack.push(&f.this);
2441 stack.push(&f.length);
2442 }
2443 Expression::Repeat(f) => {
2444 stack.push(&f.this);
2445 stack.push(&f.times);
2446 }
2447 Expression::Lpad(f) | Expression::Rpad(f) => {
2448 stack.push(&f.this);
2449 stack.push(&f.length);
2450 if let Some(ref fill) = f.fill {
2451 stack.push(fill);
2452 }
2453 }
2454 Expression::Split(f) => {
2455 stack.push(&f.this);
2456 stack.push(&f.delimiter);
2457 }
2458 Expression::RegexpLike(f) => {
2459 stack.push(&f.this);
2460 stack.push(&f.pattern);
2461 if let Some(ref flags) = f.flags {
2462 stack.push(flags);
2463 }
2464 }
2465 Expression::RegexpReplace(f) => {
2466 stack.push(&f.this);
2467 stack.push(&f.pattern);
2468 stack.push(&f.replacement);
2469 if let Some(ref flags) = f.flags {
2470 stack.push(flags);
2471 }
2472 }
2473 Expression::RegexpExtract(f) => {
2474 stack.push(&f.this);
2475 stack.push(&f.pattern);
2476 if let Some(ref group) = f.group {
2477 stack.push(group);
2478 }
2479 }
2480 Expression::ToDate(f) => {
2481 stack.push(&f.this);
2482 if let Some(ref fmt) = f.format {
2483 stack.push(fmt);
2484 }
2485 }
2486 Expression::ToTimestamp(f) => {
2487 stack.push(&f.this);
2488 if let Some(ref fmt) = f.format {
2489 stack.push(fmt);
2490 }
2491 }
2492 Expression::DateFormat(f) | Expression::FormatDate(f) => {
2493 stack.push(&f.this);
2494 stack.push(&f.format);
2495 }
2496 Expression::LastDay(f) => {
2497 stack.push(&f.this);
2498 }
2499 Expression::FromUnixtime(f) => {
2500 stack.push(&f.this);
2501 if let Some(ref fmt) = f.format {
2502 stack.push(fmt);
2503 }
2504 }
2505 Expression::UnixTimestamp(f) => {
2506 if let Some(ref this) = f.this {
2507 stack.push(this);
2508 }
2509 if let Some(ref fmt) = f.format {
2510 stack.push(fmt);
2511 }
2512 }
2513 Expression::MakeDate(f) => {
2514 stack.push(&f.year);
2515 stack.push(&f.month);
2516 stack.push(&f.day);
2517 }
2518 Expression::MakeTimestamp(f) => {
2519 stack.push(&f.year);
2520 stack.push(&f.month);
2521 stack.push(&f.day);
2522 stack.push(&f.hour);
2523 stack.push(&f.minute);
2524 stack.push(&f.second);
2525 if let Some(ref tz) = f.timezone {
2526 stack.push(tz);
2527 }
2528 }
2529 Expression::TruncFunc(f) => {
2530 stack.push(&f.this);
2531 if let Some(ref d) = f.decimals {
2532 stack.push(d);
2533 }
2534 }
2535 Expression::ArrayFunc(f) => {
2536 for e in &f.expressions {
2537 stack.push(e);
2538 }
2539 }
2540 Expression::Unnest(f) => {
2541 stack.push(&f.this);
2542 for e in &f.expressions {
2543 stack.push(e);
2544 }
2545 }
2546 Expression::StructFunc(f) => {
2547 for (_, e) in &f.fields {
2548 stack.push(e);
2549 }
2550 }
2551 Expression::StructExtract(f) => {
2552 stack.push(&f.this);
2553 }
2554 Expression::NamedStruct(f) => {
2555 for (k, v) in &f.pairs {
2556 stack.push(k);
2557 stack.push(v);
2558 }
2559 }
2560 Expression::MapFunc(f) => {
2561 for k in &f.keys {
2562 stack.push(k);
2563 }
2564 for v in &f.values {
2565 stack.push(v);
2566 }
2567 }
2568 Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2569 stack.push(&f.this);
2570 stack.push(&f.transform);
2571 }
2572 Expression::JsonArrayAgg(f) => {
2573 stack.push(&f.this);
2574 if let Some(ref filter) = f.filter {
2575 stack.push(filter);
2576 }
2577 }
2578 Expression::JsonObjectAgg(f) => {
2579 stack.push(&f.key);
2580 stack.push(&f.value);
2581 if let Some(ref filter) = f.filter {
2582 stack.push(filter);
2583 }
2584 }
2585 Expression::NTile(f) => {
2586 if let Some(ref n) = f.num_buckets {
2587 stack.push(n);
2588 }
2589 }
2590 Expression::Rand(f) => {
2591 if let Some(ref s) = f.seed {
2592 stack.push(s);
2593 }
2594 if let Some(ref lo) = f.lower {
2595 stack.push(lo);
2596 }
2597 if let Some(ref hi) = f.upper {
2598 stack.push(hi);
2599 }
2600 }
2601 Expression::Any(q) | Expression::All(q) => {
2602 stack.push(&q.this);
2603 stack.push(&q.subquery);
2604 }
2605 Expression::Overlaps(o) => {
2606 if let Some(ref this) = o.this {
2607 stack.push(this);
2608 }
2609 if let Some(ref expr) = o.expression {
2610 stack.push(expr);
2611 }
2612 if let Some(ref ls) = o.left_start {
2613 stack.push(ls);
2614 }
2615 if let Some(ref le) = o.left_end {
2616 stack.push(le);
2617 }
2618 if let Some(ref rs) = o.right_start {
2619 stack.push(rs);
2620 }
2621 if let Some(ref re) = o.right_end {
2622 stack.push(re);
2623 }
2624 }
2625 Expression::Interval(i) => {
2626 if let Some(ref this) = i.this {
2627 stack.push(this);
2628 }
2629 }
2630 Expression::TimeStrToTime(f) => {
2631 stack.push(&f.this);
2632 if let Some(ref zone) = f.zone {
2633 stack.push(zone);
2634 }
2635 }
2636 Expression::JSONBExtractScalar(f) => {
2637 stack.push(&f.this);
2638 stack.push(&f.expression);
2639 if let Some(ref jt) = f.json_type {
2640 stack.push(jt);
2641 }
2642 }
2643 Expression::JSONExtract(f) => {
2644 stack.push(&f.this);
2645 stack.push(&f.expression);
2646 for e in &f.expressions {
2647 stack.push(e);
2648 }
2649 if let Some(ref option) = f.option {
2650 stack.push(option);
2651 }
2652 if let Some(ref on_condition) = f.on_condition {
2653 stack.push(on_condition);
2654 }
2655 }
2656
2657 _ => {}
2662 }
2663 }
2664}
2665
2666#[cfg(test)]
2671mod tests {
2672 use super::*;
2673 use crate::dialects::{Dialect, DialectType};
2674 use crate::expressions::DataType;
2675 use crate::optimizer::annotate_types::annotate_types;
2676 use crate::parse_one;
2677 use crate::schema::{MappingSchema, Schema};
2678
2679 fn parse(sql: &str) -> Expression {
2680 let dialect = Dialect::get(DialectType::Generic);
2681 let ast = dialect.parse(sql).unwrap();
2682 ast.into_iter().next().unwrap()
2683 }
2684
/// A bare column selected from a single table is named `a` at the root and
/// has a `t.a` node directly downstream.
#[test]
fn test_simple_lineage() {
    let query = parse("SELECT a FROM t");
    let root = lineage("a", &query, None, false).unwrap();

    assert_eq!(root.name, "a");
    assert!(!root.downstream.is_empty(), "Should have downstream nodes");

    let downstream = root.downstream_names();
    let has_source_column = downstream.iter().any(|name| name == "t.a");
    assert!(
        has_source_column,
        "Expected t.a in downstream, got: {:?}",
        downstream
    );
}
2700
/// `LineageNode::walk` visits the root first and then its single child.
#[test]
fn test_lineage_walk() {
    // Every expression slot in this hand-built tree is a NULL placeholder.
    let null = || Expression::Null(crate::expressions::Null);

    let mut root = LineageNode::new("col_a", null(), null());
    root.downstream.push(LineageNode::new("t.a", null(), null()));

    let visited: Vec<_> = root.walk().map(|node| node.name.clone()).collect();
    assert_eq!(visited, ["col_a", "t.a"]);
}
2723
/// An aliased arithmetic expression (`a + 1 AS b`) is looked up by its alias
/// and traces back to the underlying column `t.a`.
#[test]
fn test_aliased_column() {
    let expr = parse("SELECT a + 1 AS b FROM t");
    let node = lineage("b", &expr, None, false).unwrap();

    assert_eq!(node.name, "b");
    let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
    // Require the exact source column: a substring check on "a" would also be
    // satisfied by any unrelated node whose name merely contains that letter.
    assert!(
        all_names.iter().any(|n| n == "t.a"),
        "Expected to trace to column a, got: {:?}",
        all_names
    );
}
2738
/// A table-qualified select item (`t.a`) is found under its bare name and
/// points at `t.a` downstream.
#[test]
fn test_qualified_column() {
    let query = parse("SELECT t.a FROM t");
    let root = lineage("a", &query, None, false).unwrap();
    assert_eq!(root.name, "a");

    let downstream = root.downstream_names();
    let found = downstream.iter().any(|name| name == "t.a");
    assert!(found, "Expected t.a, got: {:?}", downstream);
}
2752
/// An unqualified column is attributed to the only table in scope (`t.a`).
#[test]
fn test_unqualified_column() {
    let query = parse("SELECT a FROM t");
    let downstream = lineage("a", &query, None, false)
        .unwrap()
        .downstream_names();

    let found = downstream.iter().any(|name| name == "t.a");
    assert!(found, "Expected t.a, got: {:?}", downstream);
}
2766
/// Issue 40 (per the test name): the root expression returned by
/// `lineage_with_schema` can be type-annotated against the schema, whereas the
/// root returned by plain `lineage` stays untyped.
#[test]
fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
    let bigquery = DialectType::BigQuery;
    let expr = Dialect::get(bigquery)
        .parse("SELECT name FROM users")
        .unwrap()
        .into_iter()
        .next()
        .expect("expected one expression");

    let mut schema = MappingSchema::with_dialect(bigquery);
    schema
        .add_table("users", &[(String::from("name"), DataType::Text)], None)
        .expect("schema setup");

    // Control: lineage computed without the schema.
    let mut untyped_root = lineage("name", &expr, Some(bigquery), false)
        .expect("lineage without schema")
        .expression
        .clone();
    annotate_types(&mut untyped_root, Some(&schema), Some(bigquery));
    assert_eq!(
        untyped_root.inferred_type(),
        None,
        "Expected unresolved root type without schema-aware lineage qualification"
    );

    // Subject: lineage computed with the schema.
    let mut typed_root = lineage_with_schema("name", &expr, Some(&schema), Some(bigquery), false)
        .expect("lineage with schema")
        .expression
        .clone();
    annotate_types(&mut typed_root, Some(&schema), Some(bigquery));

    assert_eq!(typed_root.inferred_type(), Some(&DataType::Text));
}
2810
/// `lineage_with_schema` succeeds on a query whose select list contains a
/// correlated scalar subquery, and the requested column keeps its name.
#[test]
fn test_lineage_with_schema_correlated_scalar_subquery() {
    let bigquery = DialectType::BigQuery;
    let sql = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
    let expr = Dialect::get(bigquery)
        .parse(sql)
        .unwrap()
        .into_iter()
        .next()
        .expect("expected one expression");

    let bigint = || DataType::BigInt { length: None };
    let mut schema = MappingSchema::with_dialect(bigquery);
    schema
        .add_table("t1", &[("id".to_string(), bigint())], None)
        .expect("schema setup");
    schema
        .add_table(
            "t2",
            &[("id".to_string(), bigint()), ("val".to_string(), bigint())],
            None,
        )
        .expect("schema setup");

    let root = lineage_with_schema("id", &expr, Some(&schema), Some(bigquery), false)
        .expect("lineage_with_schema should handle correlated scalar subqueries");

    assert_eq!(root.name, "id");
}
2852
/// `lineage_with_schema` succeeds when both joined tables expose the
/// `USING` column.
#[test]
fn test_lineage_with_schema_join_using() {
    let bigquery = DialectType::BigQuery;
    let expr = Dialect::get(bigquery)
        .parse("SELECT a FROM t1 JOIN t2 USING(a)")
        .unwrap()
        .into_iter()
        .next()
        .expect("expected one expression");

    // Both tables carry the same single BIGINT column `a`.
    let mut schema = MappingSchema::with_dialect(bigquery);
    for table in ["t1", "t2"] {
        schema
            .add_table(
                table,
                &[("a".to_string(), DataType::BigInt { length: None })],
                None,
            )
            .expect("schema setup");
    }

    let root = lineage_with_schema("a", &expr, Some(&schema), Some(bigquery), false)
        .expect("lineage_with_schema should handle JOIN USING");

    assert_eq!(root.name, "a");
}
2891
/// `lineage_with_schema` succeeds when the table is registered and referenced
/// under a dotted name (`raw.t1`).
#[test]
fn test_lineage_with_schema_qualified_table_name() {
    let bigquery = DialectType::BigQuery;
    let expr = Dialect::get(bigquery)
        .parse("SELECT a FROM raw.t1")
        .unwrap()
        .into_iter()
        .next()
        .expect("expected one expression");

    let mut schema = MappingSchema::with_dialect(bigquery);
    schema
        .add_table(
            "raw.t1",
            &[("a".to_string(), DataType::BigInt { length: None })],
            None,
        )
        .expect("schema setup");

    let root = lineage_with_schema("a", &expr, Some(&schema), Some(bigquery), false)
        .expect("lineage_with_schema should handle dotted schema.table names");

    assert_eq!(root.name, "a");
}
2923
/// With neither schema nor dialect, `lineage_with_schema` yields the same root
/// name and downstream names as plain `lineage`.
#[test]
fn test_lineage_with_schema_none_matches_lineage() {
    let query = parse("SELECT a FROM t");

    let expected = lineage("a", &query, None, false).expect("lineage baseline");
    let actual =
        lineage_with_schema("a", &query, None, None, false).expect("lineage_with_schema");

    assert_eq!(actual.name, expected.name);
    assert_eq!(actual.downstream_names(), expected.downstream_names());
}
2934
/// Issue 60 (per the test name): a BigQuery column declared as `Name` in the
/// schema shows up downstream as `teams.Name` when selected under alias `name`.
#[test]
fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
    let bigquery = DialectType::BigQuery;
    let expr = Dialect::get(bigquery)
        .parse("SELECT Name AS name FROM teams")
        .unwrap()
        .into_iter()
        .next()
        .expect("expected one expression");

    let mut schema = MappingSchema::with_dialect(bigquery);
    schema
        .add_table(
            "teams",
            &[("Name".to_string(), DataType::String { length: None })],
            None,
        )
        .expect("schema setup");

    let downstream = lineage_with_schema("name", &expr, Some(&schema), Some(bigquery), false)
        .expect("lineage_with_schema should resolve mixed-case BigQuery columns")
        .downstream_names();

    let found = downstream.iter().any(|name| name == "teams.Name");
    assert!(
        found,
        "Expected teams.Name in downstream, got: {:?}",
        downstream
    );
}
2970
/// Looking up `name` under BigQuery finds a select item aliased as `Name`,
/// and the resulting node carries the requested lower-case name.
#[test]
fn test_lineage_bigquery_mixed_case_alias_lookup() {
    let bigquery = DialectType::BigQuery;
    let expr = Dialect::get(bigquery)
        .parse("SELECT Name AS Name FROM teams")
        .unwrap()
        .into_iter()
        .next()
        .expect("expected one expression");

    let root = lineage("name", &expr, Some(bigquery), false)
        .expect("lineage should resolve mixed-case aliases in BigQuery");

    assert_eq!(root.name, "name");
}
2986
/// Issue 209 (per the test name): a column read from an aliased BigQuery
/// `UNNEST(...)` resolves to a virtual source named `_0` whose alias is the
/// UNNEST alias and whose source expression is the aliased UNNEST itself.
#[test]
fn test_lineage_bigquery_unnest_alias_source_issue_209() {
    let bigquery = DialectType::BigQuery;
    let sql = r#"
SELECT date_val AS week_start
FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
"#;
    let expr = parse_one(sql, bigquery).expect("parse");

    let root = lineage("week_start", &expr, Some(bigquery), false)
        .expect("lineage should resolve UNNEST alias as a source");
    let child = root
        .downstream
        .first()
        .expect("week_start should have downstream lineage");

    // Node metadata of the virtual source.
    assert_eq!(child.name, "_0.date_val");
    assert_eq!(child.source_name, "_0");
    assert_eq!(child.source_kind, SourceKind::Virtual);
    assert_eq!(child.source_alias.as_deref(), Some("date_val"));

    // The node expression is the column, qualified with the virtual name.
    let column = match &child.expression {
        Expression::Column(column) => column,
        other => panic!("expected downstream column expression, got {:?}", other),
    };
    assert_eq!(column.name.name, "date_val");
    let qualifier = column.table.as_ref().map(|table| table.name.as_str());
    assert_eq!(qualifier, Some("_0"));

    // The node source is `UNNEST(...) AS date_val`.
    let source_is_aliased_unnest = match &child.source {
        Expression::Alias(alias) => {
            matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"
        }
        _ => false,
    };
    assert!(
        source_is_aliased_unnest,
        "expected UNNEST source expression, got {:?}",
        child.source
    );
}
3027
/// A real table called `date_val` is reported as a `Table` source without a
/// source alias.
#[test]
fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
    let bigquery = DialectType::BigQuery;
    let expr = parse_one("SELECT date_val.id FROM date_val", bigquery).expect("parse");

    let root = lineage("id", &expr, Some(bigquery), false).expect("lineage");
    let source = root.downstream.first().expect("id should have lineage");

    assert_eq!(source.name, "date_val.id");
    assert_eq!(source.source_name, "date_val");
    assert_eq!(source.source_kind, SourceKind::Table);
    assert_eq!(source.source_alias, None);
}
3041
/// Two aliased UNNEST sources in one query are numbered `_0` and `_1`, in the
/// order checked here, regardless of which output column is requested.
#[test]
fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
    let bigquery = DialectType::BigQuery;
    let sql = r#"
SELECT a.a AS first_value, b.b AS second_value
FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
"#;
    let expr = parse_one(sql, bigquery).expect("parse");

    let first_root = lineage("first_value", &expr, Some(bigquery), false).expect("lineage");
    let second_root = lineage("second_value", &expr, Some(bigquery), false).expect("lineage");

    let from_a = first_root.downstream.first().expect("first source");
    let from_b = second_root.downstream.first().expect("second source");

    assert_eq!(from_a.name, "_0.a");
    assert_eq!(from_a.source_name, "_0");
    assert_eq!(from_a.source_alias.as_deref(), Some("a"));
    assert_eq!(from_a.source_kind, SourceKind::Virtual);

    assert_eq!(from_b.name, "_1.b");
    assert_eq!(from_b.source_name, "_1");
    assert_eq!(from_b.source_alias.as_deref(), Some("b"));
    assert_eq!(from_b.source_kind, SourceKind::Virtual);
}
3072
/// A qualified column of `UNNEST(t.items) AS item` goes through a virtual
/// node (`_0.item`) and then on to the real table column `t.items`.
#[test]
fn test_lineage_table_backed_unnest_points_to_real_source_column() {
    let bigquery = DialectType::BigQuery;
    let sql = r#"
SELECT item.item AS item
FROM t JOIN UNNEST(t.items) AS item ON TRUE
"#;
    let expr = parse_one(sql, bigquery).expect("parse");

    let root = lineage("item", &expr, Some(bigquery), false).expect("lineage");

    let unnest_node = root.downstream.first().expect("virtual item source");
    assert_eq!(unnest_node.name, "_0.item");
    assert_eq!(unnest_node.source_kind, SourceKind::Virtual);

    let table_node = unnest_node
        .downstream
        .first()
        .expect("UNNEST(t.items) should depend on t.items");
    assert_eq!(table_node.name, "t.items");
    assert_eq!(table_node.source_name, "t");
    assert_eq!(table_node.source_kind, SourceKind::Table);
}
3097
/// An unqualified reference to the UNNEST alias (`item`) also resolves to the
/// virtual `_0` source, which in turn depends on `t.items`.
#[test]
fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
    let bigquery = DialectType::BigQuery;
    let sql = r#"
SELECT item AS item
FROM t JOIN UNNEST(t.items) AS item ON TRUE
"#;
    let expr = parse_one(sql, bigquery).expect("parse");

    let root = lineage("item", &expr, Some(bigquery), false).expect("lineage");

    let unnest_node = root.downstream.first().expect("virtual item source");
    assert_eq!(unnest_node.name, "_0.item");
    assert_eq!(unnest_node.source_name, "_0");
    assert_eq!(unnest_node.source_kind, SourceKind::Virtual);
    assert_eq!(unnest_node.source_alias.as_deref(), Some("item"));

    let table_node = unnest_node
        .downstream
        .first()
        .expect("UNNEST(t.items) should depend on t.items");
    assert_eq!(table_node.name, "t.items");
    assert_eq!(table_node.source_name, "t");
    assert_eq!(table_node.source_kind, SourceKind::Table);
}
3124
/// For PostgreSQL, Presto and Trino, a column declared by `UNNEST(items) AS u(x)`
/// resolves to the virtual node `_0.x` (alias `u`), which depends on `t.items`.
#[test]
fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
    // Presto and Trino share the same statement text.
    let cross_join_unnest = "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)";
    let cases = [
        (
            DialectType::PostgreSQL,
            "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
        ),
        (DialectType::Presto, cross_join_unnest),
        (DialectType::Trino, cross_join_unnest),
    ];

    for (dialect, sql) in cases {
        let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
        let root = lineage("out", &expr, Some(dialect), false)
            .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));

        let unnest_node = root
            .downstream
            .first()
            .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
        assert_eq!(
            unnest_node.name, "_0.x",
            "unexpected virtual child for {dialect:?}"
        );
        assert_eq!(unnest_node.source_name, "_0");
        assert_eq!(unnest_node.source_kind, SourceKind::Virtual);
        assert_eq!(unnest_node.source_alias.as_deref(), Some("u"));

        let table_node = unnest_node
            .downstream
            .first()
            .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
        assert_eq!(table_node.name, "t.items");
        assert_eq!(table_node.source_kind, SourceKind::Table);
    }
}
3167
/// For Spark and Hive, a column produced by `LATERAL VIEW EXPLODE(items) u AS x`
/// resolves to the virtual node `_0.x` (alias `u`), which depends on `t.items`.
#[test]
fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
    // Both dialects are exercised with the same statement text.
    let lateral_view = "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x";
    let cases = [
        (DialectType::Spark, lateral_view),
        (DialectType::Hive, lateral_view),
    ];

    for (dialect, sql) in cases {
        let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
        let root = lineage("out", &expr, Some(dialect), false)
            .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));

        let exploded = root
            .downstream
            .first()
            .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
        assert_eq!(exploded.name, "_0.x");
        assert_eq!(exploded.source_name, "_0");
        assert_eq!(exploded.source_kind, SourceKind::Virtual);
        assert_eq!(exploded.source_alias.as_deref(), Some("u"));

        let table_node = exploded
            .downstream
            .first()
            .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
        assert_eq!(table_node.name, "t.items");
        assert_eq!(table_node.source_kind, SourceKind::Table);
    }
}
3203
/// Snowflake `LATERAL FLATTEN(...) AS f` is reported as a virtual source
/// (`_0`, alias `f`) whose input depends on `raw_events.payload`.
#[test]
fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
    let snowflake = DialectType::Snowflake;
    let sql =
        "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f";
    let expr = parse_one(sql, snowflake).expect("parse");

    let root = lineage("value", &expr, Some(snowflake), false).expect("lineage");

    let flatten_node = root.downstream.first().expect("virtual flatten source");
    assert_eq!(flatten_node.name, "_0.value");
    assert_eq!(flatten_node.source_name, "_0");
    assert_eq!(flatten_node.source_kind, SourceKind::Virtual);
    assert_eq!(flatten_node.source_alias.as_deref(), Some("f"));

    let table_node = flatten_node
        .downstream
        .first()
        .expect("FLATTEN input should depend on raw_events.payload");
    assert_eq!(table_node.name, "raw_events.payload");
    assert_eq!(table_node.source_kind, SourceKind::Table);
}
3226
/// Issue 61 (per the test name): in Snowflake `DATEDIFF(day, ...)` the bare
/// `day` date part is not reported as a lineage column; only `date_utc` is.
#[test]
fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
    let snowflake = DialectType::Snowflake;
    let sql =
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics";
    let expr = parse_one(sql, snowflake).expect("parse");

    let mut schema = MappingSchema::with_dialect(snowflake);
    schema
        .add_table(
            "fact.some_daily_metrics",
            &[(String::from("date_utc"), DataType::Date)],
            None,
        )
        .expect("schema setup");

    let downstream = lineage_with_schema("recency", &expr, Some(&schema), Some(snowflake), false)
        .expect("lineage_with_schema should not treat date part as a column")
        .downstream_names();

    let has_date_column = downstream
        .iter()
        .any(|name| name == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        downstream
    );

    let no_date_part = downstream
        .iter()
        .all(|name| !name.ends_with(".day") && name != "day");
    assert!(
        no_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        downstream
    );
}
3265
/// Snowflake `DATEDIFF(day, ...)` parses to an aliased `Expression::DateDiff`
/// whose unit is `IntervalUnit::Day`.
#[test]
fn test_snowflake_datediff_parses_to_typed_ast() {
    let expr = parse_one(
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    // Peel the AST one layer at a time: Select -> first item (Alias) -> DateDiff.
    let other = &expr;
    let Expression::Select(select) = other else {
        panic!("expected Select, got {other:?}");
    };

    let other = &select.expressions[0];
    let Expression::Alias(alias) = other else {
        panic!("expected Alias, got {other:?}");
    };

    let other = &alias.this;
    let Expression::DateDiff(f) = other else {
        panic!("expected DateDiff, got {other:?}");
    };

    assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
}
3287
/// In Snowflake `DATEADD(day, 1, date_utc)` the bare `day` date part is not
/// reported as a lineage column; only `date_utc` is.
#[test]
fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
    let snowflake = DialectType::Snowflake;
    let sql = "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics";
    let expr = parse_one(sql, snowflake).expect("parse");

    let mut schema = MappingSchema::with_dialect(snowflake);
    schema
        .add_table(
            "fact.some_daily_metrics",
            &[(String::from("date_utc"), DataType::Date)],
            None,
        )
        .expect("schema setup");

    let downstream = lineage_with_schema("next_day", &expr, Some(&schema), Some(snowflake), false)
        .expect("lineage_with_schema should not treat DATEADD date part as a column")
        .downstream_names();

    let has_date_column = downstream
        .iter()
        .any(|name| name == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        downstream
    );

    let no_date_part = downstream
        .iter()
        .all(|name| !name.ends_with(".day") && name != "day");
    assert!(
        no_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        downstream
    );
}
3326
/// In Snowflake `DATE_PART(day, date_utc)` the unquoted `day` identifier is
/// not reported as a lineage column; only `date_utc` is.
#[test]
fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
    let snowflake = DialectType::Snowflake;
    let sql = "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, snowflake).expect("parse");

    let mut schema = MappingSchema::with_dialect(snowflake);
    schema
        .add_table(
            "fact.some_daily_metrics",
            &[(String::from("date_utc"), DataType::Date)],
            None,
        )
        .expect("schema setup");

    let downstream = lineage_with_schema("day_part", &expr, Some(&schema), Some(snowflake), false)
        .expect("lineage_with_schema should not treat DATE_PART identifier as a column")
        .downstream_names();

    let has_date_column = downstream
        .iter()
        .any(|name| name == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        downstream
    );

    let no_date_part = downstream
        .iter()
        .all(|name| !name.ends_with(".day") && name != "day");
    assert!(
        no_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        downstream
    );
}
3365
/// Control case: with the date part quoted (`DATE_PART('day', ...)`), lineage
/// still reaches `some_daily_metrics.date_utc`.
#[test]
fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
    let snowflake = DialectType::Snowflake;
    let sql = "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics";
    let expr = parse_one(sql, snowflake).expect("parse");

    let mut schema = MappingSchema::with_dialect(snowflake);
    schema
        .add_table(
            "fact.some_daily_metrics",
            &[(String::from("date_utc"), DataType::Date)],
            None,
        )
        .expect("schema setup");

    let downstream = lineage_with_schema("day_part", &expr, Some(&schema), Some(snowflake), false)
        .expect("quoted DATE_PART should continue to work")
        .downstream_names();

    let has_date_column = downstream
        .iter()
        .any(|name| name == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        downstream
    );
}
3399
/// Snowflake `DATEADD(day, 1, date_utc)` parses to a generic
/// `Expression::Function` named DATEADD whose first argument is the `Var` `day`.
#[test]
fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
    let expr = parse_one(
        "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    // Peel the AST one layer at a time: Select -> first item (Alias) -> Function.
    let other = &expr;
    let Expression::Select(select) = other else {
        panic!("expected Select, got {other:?}");
    };

    let other = &select.expressions[0];
    let Expression::Alias(alias) = other else {
        panic!("expected Alias, got {other:?}");
    };

    let other = &alias.this;
    let Expression::Function(f) = other else {
        panic!("expected generic DATEADD function, got {other:?}");
    };

    assert_eq!(f.name.to_uppercase(), "DATEADD");
    assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
}
3422
/// Snowflake `DATE_PART(day, date_utc)` parses to a generic
/// `Expression::Function` named DATE_PART whose first argument is the `Var` `day`.
#[test]
fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
    let expr = parse_one(
        "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    // Peel the AST one layer at a time: Select -> first item (Alias) -> Function.
    let other = &expr;
    let Expression::Select(select) = other else {
        panic!("expected Select, got {other:?}");
    };

    let other = &select.expressions[0];
    let Expression::Alias(alias) = other else {
        panic!("expected Alias, got {other:?}");
    };

    let other = &alias.this;
    let Expression::Function(f) = other else {
        panic!("expected generic DATE_PART function, got {other:?}");
    };

    assert_eq!(f.name.to_uppercase(), "DATE_PART");
    assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
}
3445
/// Snowflake `DATE_PART('day', date_utc)` parses to a generic
/// `Expression::Function` named DATE_PART.
#[test]
fn test_snowflake_date_part_string_literal_stays_generic_function() {
    let expr = parse_one(
        "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    )
    .expect("parse");

    // Peel the AST one layer at a time: Select -> first item (Alias) -> Function.
    let other = &expr;
    let Expression::Select(select) = other else {
        panic!("expected Select, got {other:?}");
    };

    let other = &select.expressions[0];
    let Expression::Alias(alias) = other else {
        panic!("expected Alias, got {other:?}");
    };

    let other = &alias.this;
    let Expression::Function(f) = other else {
        panic!("expected generic DATE_PART function, got {other:?}");
    };

    assert_eq!(f.name.to_uppercase(), "DATE_PART");
}
3467
/// In a two-table join, each selected column is attributed to its own table.
#[test]
fn test_lineage_join() {
    let query = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");

    let names_a = lineage("a", &query, None, false)
        .unwrap()
        .downstream_names();
    let a_from_t = names_a.iter().any(|name| name == "t.a");
    assert!(a_from_t, "Expected t.a, got: {:?}", names_a);

    let names_b = lineage("b", &query, None, false)
        .unwrap()
        .downstream_names();
    let b_from_s = names_b.iter().any(|name| name == "s.b");
    assert!(b_from_s, "Expected s.b, got: {:?}", names_b);
}
3488
/// A column read through a table alias keeps the alias in the node name
/// (`t1.col1`) while `source_name` and `source` point at the real table.
#[test]
fn test_lineage_alias_leaf_has_resolved_source_name() {
    let query = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
    let root = lineage("col1", &query, None, false).unwrap();

    let downstream = root.downstream_names();
    let has_alias_edge = downstream.iter().any(|name| name == "t1.col1");
    assert!(
        has_alias_edge,
        "Expected aliased column edge t1.col1, got: {:?}",
        downstream
    );

    let leaf = root
        .downstream
        .iter()
        .find(|candidate| candidate.name == "t1.col1")
        .expect("Expected t1.col1 leaf");
    assert_eq!(leaf.source_name, "table1");

    let Expression::Table(table) = &leaf.source else {
        panic!("Expected leaf source to be a table expression");
    };
    assert_eq!(table.name.name, "table1");
}
3514
/// Lineage passes through a derived table and reaches the base column `t.a`.
#[test]
fn test_lineage_derived_table() {
    let query = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
    let root = lineage("a", &query, None, false).unwrap();
    assert_eq!(root.name, "a");

    let visited: Vec<_> = root.walk().map(|node| node.name.clone()).collect();
    let reaches_base = visited.iter().any(|name| name == "t.a");
    assert!(
        reaches_base,
        "Expected to trace through derived table to t.a, got: {:?}",
        visited
    );
}
3529
/// Lineage passes through a CTE and reaches the base column `t.a`.
#[test]
fn test_lineage_cte() {
    let query = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
    let root = lineage("a", &query, None, false).unwrap();
    assert_eq!(root.name, "a");

    let visited: Vec<_> = root.walk().map(|node| node.name.clone()).collect();
    let reaches_base = visited.iter().any(|name| name == "t.a");
    assert!(
        reaches_base,
        "Expected to trace through CTE to t.a, got: {:?}",
        visited
    );
}
3543
/// A UNION yields one downstream branch per side.
#[test]
fn test_lineage_union() {
    let query = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
    let root = lineage("a", &query, None, false).unwrap();
    assert_eq!(root.name, "a");

    let branches = root.downstream.len();
    assert_eq!(branches, 2, "Expected 2 branches for UNION, got {}", branches);
}
3558
/// A CTE whose body is a UNION produces a lineage tree of at least three nodes.
#[test]
fn test_lineage_cte_union() {
    let query = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
    let root = lineage("a", &query, None, false).unwrap();

    let visited: Vec<_> = root.walk().map(|node| node.name.clone()).collect();
    let node_count = visited.len();
    assert!(
        node_count >= 3,
        "Expected at least 3 nodes for CTE with UNION, got: {:?}",
        visited
    );
}
3572
/// Requesting `*` on `SELECT *` yields a root named `*` with downstream nodes.
#[test]
fn test_lineage_star() {
    let query = parse("SELECT * FROM t");
    let root = lineage("*", &query, None, false).unwrap();

    assert_eq!(root.name, "*");
    let has_children = !root.downstream.is_empty();
    assert!(has_children, "Star should produce downstream nodes");
}
3585
/// An aliased scalar subquery in the select list is traced into: the lineage
/// tree holds at least two nodes.
#[test]
fn test_lineage_subquery_in_select() {
    let query = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
    let root = lineage("x", &query, None, false).unwrap();
    assert_eq!(root.name, "x");

    let visited: Vec<_> = root.walk().map(|node| node.name.clone()).collect();
    let node_count = visited.len();
    assert!(
        node_count >= 2,
        "Expected tracing into scalar subquery, got: {:?}",
        visited
    );
}
3600
/// Each column of a multi-column select gets its own lineage to the
/// matching table column.
#[test]
fn test_lineage_multiple_columns() {
    let query = parse("SELECT a, b FROM t");

    let root_a = lineage("a", &query, None, false).unwrap();
    let root_b = lineage("b", &query, None, false).unwrap();

    assert_eq!(root_a.name, "a");
    assert_eq!(root_b.name, "b");

    let downstream_a = root_a.downstream_names();
    let downstream_b = root_b.downstream_names();
    assert!(downstream_a.iter().any(|name| name == "t.a"));
    assert!(downstream_b.iter().any(|name| name == "t.b"));
}
3617
/// `get_source_tables` on the lineage of `t.a` reports table `t`.
#[test]
fn test_get_source_tables() {
    let query = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
    let root = lineage("a", &query, None, false).unwrap();

    let tables = get_source_tables(&root);
    let includes_t = tables.contains("t");
    assert!(includes_t, "Expected source table 't', got: {:?}", tables);
}
3630
/// Asking for a column that the query does not output is an error.
#[test]
fn test_lineage_column_not_found() {
    let query = parse("SELECT a FROM t");
    assert!(lineage("nonexistent", &query, None, false).is_err());
}
3637
/// Lineage through two chained CTEs produces a tree of at least three nodes.
#[test]
fn test_lineage_nested_cte() {
    let sql =
        "WITH cte1 AS (SELECT a FROM t), cte2 AS (SELECT a FROM cte1) SELECT a FROM cte2";
    let query = parse(sql);
    let root = lineage("a", &query, None, false).unwrap();

    let visited: Vec<_> = root.walk().map(|node| node.name.clone()).collect();
    let node_count = visited.len();
    assert!(
        node_count >= 3,
        "Expected to trace through nested CTEs, got: {:?}",
        visited
    );
}
3655
/// With `trim_selects = true` the root's source SELECT keeps only the
/// requested projection.
#[test]
fn test_trim_selects_true() {
    let query = parse("SELECT a, b, c FROM t");
    let root = lineage("a", &query, None, true).unwrap();

    let Expression::Select(select) = &root.source else {
        panic!("Expected Select source");
    };
    let projection_count = select.expressions.len();
    assert_eq!(
        projection_count, 1,
        "Trimmed source should have 1 expression, got {}",
        projection_count
    );
}
3673
/// With `trim_selects = false` the root's source SELECT keeps all three
/// projections.
#[test]
fn test_trim_selects_false() {
    let query = parse("SELECT a, b, c FROM t");
    let root = lineage("a", &query, None, false).unwrap();

    let Expression::Select(select) = &root.source else {
        panic!("Expected Select source");
    };
    let projection_count = select.expressions.len();
    assert_eq!(
        projection_count, 3,
        "Untrimmed source should have 3 expressions"
    );
}
3690
/// A binary expression over two columns (`a + b AS c`) yields a lineage tree
/// of at least three nodes.
#[test]
fn test_lineage_expression_in_select() {
    let query = parse("SELECT a + b AS c FROM t");
    let root = lineage("c", &query, None, false).unwrap();

    let visited: Vec<_> = root.walk().map(|node| node.name.clone()).collect();
    let node_count = visited.len();
    assert!(
        node_count >= 3,
        "Expected to trace a + b to both columns, got: {:?}",
        visited
    );
}
3704
/// The two UNION sides name their column differently (`a` vs `b`); lineage
/// for `a` still has two downstream branches.
#[test]
fn test_set_operation_by_index() {
    let query = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
    let root = lineage("a", &query, None, false).unwrap();

    let branches = root.downstream.len();
    assert_eq!(branches, 2);
}
3715
3716 fn print_node(node: &LineageNode, indent: usize) {
3719 let pad = " ".repeat(indent);
3720 println!(
3721 "{pad}name={:?} source_name={:?}",
3722 node.name, node.source_name
3723 );
3724 for child in &node.downstream {
3725 print_node(child, indent + 1);
3726 }
3727 }
3728
/// Issue 18 reproduction (per the test name): a column wrapped in a function
/// call (`UPPER(name)`) traces to `users.name`. The lineage tree is also
/// printed to stdout.
#[test]
fn test_issue18_repro() {
    let query = "SELECT UPPER(name) as upper_name FROM users";
    println!("Query: {query}\n");

    let bigquery = DialectType::BigQuery;
    let parsed = Dialect::get(bigquery).parse(query).unwrap();

    let root = lineage("upper_name", &parsed[0], Some(bigquery), false).unwrap();
    println!("lineage(\"upper_name\"):");
    print_node(&root, 1);

    let downstream = root.downstream_names();
    let has_source_column = downstream.iter().any(|name| name == "users.name");
    assert!(
        has_source_column,
        "Expected users.name in downstream, got: {:?}",
        downstream
    );
}
3750
3751 #[test]
3752 fn test_lineage_bigquery_safe_namespace_issue207() {
3753 let query = r#"
3754WITH import_cte AS (
3755 SELECT timestamp, data, operation
3756 FROM `project`.`dataset`.`source_table`
3757),
3758transform_cte AS (
3759 SELECT
3760 timestamp,
3761 SAFE.PARSE_JSON(data) AS json_data
3762 FROM import_cte
3763)
3764SELECT json_data FROM transform_cte
3765"#;
3766 let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3767 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3768 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3769 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3770
3771 assert!(
3772 names.iter().any(|name| name == "source_table.data"),
3773 "expected source_table.data in lineage, got {names:?}"
3774 );
3775 assert!(
3776 !names
3777 .iter()
3778 .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3779 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3780 );
3781 }
3782
3783 #[test]
3784 fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3785 let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3786 let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3787 .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3788 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3789
3790 assert!(
3791 names.iter().any(|name| name == "t.data"),
3792 "expected t.data in lineage, got {names:?}"
3793 );
3794 assert!(
3795 !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3796 "did not expect SAFE namespace receiver in lineage, got {names:?}"
3797 );
3798 }
3799
3800 #[test]
3801 fn test_lineage_method_call_receiver_control() {
3802 let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3803 let node = lineage("out", &expr, None, false).expect("lineage");
3804 let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3805
3806 assert!(
3807 names.iter().any(|name| name == "t.obj"),
3808 "expected ordinary method receiver to remain in lineage, got {names:?}"
3809 );
3810 assert!(
3811 names.iter().any(|name| name == "t.arg"),
3812 "expected method argument in lineage, got {names:?}"
3813 );
3814 }
3815
3816 #[test]
3817 fn test_lineage_upper_function() {
3818 let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3819 let node = lineage("upper_name", &expr, None, false).unwrap();
3820
3821 let names = node.downstream_names();
3822 assert!(
3823 names.iter().any(|n| n == "users.name"),
3824 "Expected users.name in downstream, got: {:?}",
3825 names
3826 );
3827 }
3828
3829 #[test]
3830 fn test_lineage_round_function() {
3831 let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3832 let node = lineage("rounded", &expr, None, false).unwrap();
3833
3834 let names = node.downstream_names();
3835 assert!(
3836 names.iter().any(|n| n == "products.price"),
3837 "Expected products.price in downstream, got: {:?}",
3838 names
3839 );
3840 }
3841
3842 #[test]
3843 fn test_lineage_coalesce_function() {
3844 let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3845 let node = lineage("val", &expr, None, false).unwrap();
3846
3847 let names = node.downstream_names();
3848 assert!(
3849 names.iter().any(|n| n == "t.a"),
3850 "Expected t.a in downstream, got: {:?}",
3851 names
3852 );
3853 assert!(
3854 names.iter().any(|n| n == "t.b"),
3855 "Expected t.b in downstream, got: {:?}",
3856 names
3857 );
3858 }
3859
3860 #[test]
3861 fn test_lineage_count_function() {
3862 let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3863 let node = lineage("cnt", &expr, None, false).unwrap();
3864
3865 let names = node.downstream_names();
3866 assert!(
3867 names.iter().any(|n| n == "t.id"),
3868 "Expected t.id in downstream, got: {:?}",
3869 names
3870 );
3871 }
3872
3873 #[test]
3874 fn test_lineage_sum_function() {
3875 let expr = parse("SELECT SUM(amount) AS total FROM t");
3876 let node = lineage("total", &expr, None, false).unwrap();
3877
3878 let names = node.downstream_names();
3879 assert!(
3880 names.iter().any(|n| n == "t.amount"),
3881 "Expected t.amount in downstream, got: {:?}",
3882 names
3883 );
3884 }
3885
3886 #[test]
3887 fn test_lineage_case_with_nested_functions() {
3888 let expr =
3889 parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3890 let node = lineage("result", &expr, None, false).unwrap();
3891
3892 let names = node.downstream_names();
3893 assert!(
3894 names.iter().any(|n| n == "t.x"),
3895 "Expected t.x in downstream, got: {:?}",
3896 names
3897 );
3898 assert!(
3899 names.iter().any(|n| n == "t.name"),
3900 "Expected t.name in downstream, got: {:?}",
3901 names
3902 );
3903 }
3904
3905 #[test]
3906 fn test_lineage_substring_function() {
3907 let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3908 let node = lineage("short", &expr, None, false).unwrap();
3909
3910 let names = node.downstream_names();
3911 assert!(
3912 names.iter().any(|n| n == "t.name"),
3913 "Expected t.name in downstream, got: {:?}",
3914 names
3915 );
3916 }
3917
3918 #[test]
3921 fn test_lineage_cte_select_star() {
3922 let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3926 let node = lineage("a", &expr, None, false).unwrap();
3927
3928 assert_eq!(node.name, "a");
3929 assert!(
3932 !node.downstream.is_empty(),
3933 "Expected downstream nodes tracing through CTE, got none"
3934 );
3935 }
3936
3937 #[test]
3938 fn test_lineage_schema_less_cte_star_passthrough_resolves_base_column() {
3939 let expr = parse("WITH c AS (SELECT * FROM t) SELECT c.x FROM c");
3940 let node = lineage("x", &expr, None, false).unwrap();
3941
3942 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3943 assert!(
3944 all_names.iter().any(|name| name == "t.x"),
3945 "Expected schema-less CTE star passthrough to reach t.x, got: {:?}",
3946 all_names
3947 );
3948
3949 let cte_node = node
3950 .walk()
3951 .find(|child| child.source_kind == SourceKind::Cte && child.source_name == "c")
3952 .expect("expected CTE hop with source_name c");
3953 assert_eq!(cte_node.source_kind, SourceKind::Cte);
3954 assert_eq!(cte_node.source_name, "c");
3955 }
3956
3957 #[test]
3958 fn test_lineage_schema_less_cte_star_passthrough_with_aggregation() {
3959 let expr = parse(
3960 "WITH c AS (SELECT * FROM t) \
3961 SELECT SUM(c.x) AS s FROM c GROUP BY 1",
3962 );
3963 let node = lineage("s", &expr, None, false).unwrap();
3964
3965 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3966 assert!(
3967 all_names.iter().any(|name| name == "t.x"),
3968 "Expected aggregate over CTE star passthrough to reach t.x, got: {:?}",
3969 all_names
3970 );
3971 }
3972
3973 #[test]
3974 fn test_lineage_schema_less_cte_star_passthrough_with_join_and_alias() {
3975 let expr = parse(
3976 "WITH a AS (SELECT * FROM t1), b AS (SELECT * FROM t2) \
3977 SELECT SUM(b.x) AS s FROM a LEFT JOIN b ON b.id = a.id GROUP BY a.k",
3978 );
3979 let node = lineage("s", &expr, None, false).unwrap();
3980
3981 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3982 assert!(
3983 all_names.iter().any(|name| name == "t2.x"),
3984 "Expected joined CTE star passthrough to reach t2.x, got: {:?}",
3985 all_names
3986 );
3987 }
3988
3989 #[test]
3990 fn test_lineage_schema_less_chained_cte_star_passthrough() {
3991 let expr = parse(
3992 "WITH c1 AS (SELECT * FROM t), \
3993 c2 AS (SELECT * FROM c1), \
3994 c3 AS (SELECT * FROM c2) \
3995 SELECT c3.x FROM c3",
3996 );
3997 let node = lineage("x", &expr, None, false).unwrap();
3998
3999 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4000 assert!(
4001 all_names.iter().any(|name| name == "t.x"),
4002 "Expected chained CTE star passthrough to reach t.x, got: {:?}",
4003 all_names
4004 );
4005 }
4006
4007 #[test]
4008 fn test_lineage_schema_less_unqualified_star_with_multiple_sources_does_not_guess() {
4009 let expr = parse("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
4010 let result = lineage("x", &expr, None, false);
4011
4012 assert!(
4013 result.is_err(),
4014 "Unqualified star over multiple sources should remain ambiguous, got: {:?}",
4015 result
4016 );
4017 }
4018
4019 #[test]
4020 fn test_lineage_cte_select_star_renamed_column() {
4021 let expr =
4024 parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
4025 let node = lineage("customer_id", &expr, None, false).unwrap();
4026
4027 assert_eq!(node.name, "customer_id");
4028 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4030 assert!(
4031 all_names.len() >= 2,
4032 "Expected at least 2 nodes (customer_id → source), got: {:?}",
4033 all_names
4034 );
4035 }
4036
4037 #[test]
4038 fn test_lineage_cte_select_star_multiple_columns() {
4039 let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
4041
4042 for col in &["a", "b", "c"] {
4043 let node = lineage(col, &expr, None, false).unwrap();
4044 assert_eq!(node.name, *col);
4045 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4047 assert!(
4048 all_names.len() >= 2,
4049 "Expected at least 2 nodes for column {}, got: {:?}",
4050 col,
4051 all_names
4052 );
4053 }
4054 }
4055
4056 #[test]
4057 fn test_lineage_nested_cte_select_star() {
4058 let expr = parse(
4060 "WITH cte1 AS (SELECT a FROM t), \
4061 cte2 AS (SELECT * FROM cte1) \
4062 SELECT * FROM cte2",
4063 );
4064 let node = lineage("a", &expr, None, false).unwrap();
4065
4066 assert_eq!(node.name, "a");
4067 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4068 assert!(
4069 all_names.len() >= 3,
4070 "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
4071 all_names
4072 );
4073 }
4074
4075 #[test]
4076 fn test_lineage_three_level_nested_cte_star() {
4077 let expr = parse(
4079 "WITH cte1 AS (SELECT x FROM t), \
4080 cte2 AS (SELECT * FROM cte1), \
4081 cte3 AS (SELECT * FROM cte2) \
4082 SELECT * FROM cte3",
4083 );
4084 let node = lineage("x", &expr, None, false).unwrap();
4085
4086 assert_eq!(node.name, "x");
4087 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4088 assert!(
4089 all_names.len() >= 4,
4090 "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
4091 all_names
4092 );
4093 }
4094
4095 #[test]
4096 fn test_lineage_cte_union_star() {
4097 let expr = parse(
4099 "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
4100 SELECT * FROM cte",
4101 );
4102 let node = lineage("a", &expr, None, false).unwrap();
4103
4104 assert_eq!(node.name, "a");
4105 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4106 assert!(
4107 all_names.len() >= 2,
4108 "Expected at least 2 nodes for CTE union star, got: {:?}",
4109 all_names
4110 );
4111 }
4112
4113 #[test]
4114 fn test_lineage_cte_star_unknown_table() {
4115 let expr = parse(
4118 "WITH cte AS (SELECT * FROM unknown_table) \
4119 SELECT * FROM cte",
4120 );
4121 let _result = lineage("x", &expr, None, false);
4124 }
4125
4126 #[test]
4127 fn test_lineage_cte_explicit_columns() {
4128 let expr = parse(
4130 "WITH cte(x, y) AS (SELECT a, b FROM t) \
4131 SELECT * FROM cte",
4132 );
4133 let node = lineage("x", &expr, None, false).unwrap();
4134 assert_eq!(node.name, "x");
4135 }
4136
4137 #[test]
4138 fn test_lineage_cte_qualified_star() {
4139 let expr = parse(
4141 "WITH cte AS (SELECT a, b FROM t) \
4142 SELECT cte.* FROM cte",
4143 );
4144 for col in &["a", "b"] {
4145 let node = lineage(col, &expr, None, false).unwrap();
4146 assert_eq!(node.name, *col);
4147 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4148 assert!(
4149 all_names.len() >= 2,
4150 "Expected at least 2 nodes for qualified star column {}, got: {:?}",
4151 col,
4152 all_names
4153 );
4154 }
4155 }
4156
4157 #[test]
4158 fn test_lineage_subquery_select_star() {
4159 let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
4162 let node = lineage("x", &expr, None, false).unwrap();
4163
4164 assert_eq!(node.name, "x");
4165 assert!(
4166 !node.downstream.is_empty(),
4167 "Expected downstream nodes for subquery with SELECT *, got none"
4168 );
4169 }
4170
4171 #[test]
4172 fn test_lineage_cte_star_with_schema_external_table() {
4173 let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
4175SELECT * FROM orders"#;
4176 let expr = parse(sql);
4177
4178 let mut schema = MappingSchema::new();
4179 let cols = vec![
4180 ("order_id".to_string(), DataType::Unknown),
4181 ("customer_id".to_string(), DataType::Unknown),
4182 ("amount".to_string(), DataType::Unknown),
4183 ];
4184 schema.add_table("stg_orders", &cols, None).unwrap();
4185
4186 let node =
4187 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4188 .unwrap();
4189 assert_eq!(node.name, "order_id");
4190 }
4191
4192 #[test]
4193 fn test_lineage_cte_star_with_schema_three_part_name() {
4194 let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
4196SELECT * FROM orders"#;
4197 let expr = parse(sql);
4198
4199 let mut schema = MappingSchema::new();
4200 let cols = vec![
4201 ("order_id".to_string(), DataType::Unknown),
4202 ("customer_id".to_string(), DataType::Unknown),
4203 ];
4204 schema
4205 .add_table("db.schema.stg_orders", &cols, None)
4206 .unwrap();
4207
4208 let node = lineage_with_schema(
4209 "customer_id",
4210 &expr,
4211 Some(&schema as &dyn Schema),
4212 None,
4213 false,
4214 )
4215 .unwrap();
4216 assert_eq!(node.name, "customer_id");
4217 }
4218
4219 #[test]
4220 fn test_lineage_cte_star_with_schema_nested() {
4221 let sql = r#"WITH
4224 raw AS (SELECT * FROM external_table),
4225 enriched AS (SELECT * FROM raw)
4226 SELECT * FROM enriched"#;
4227 let expr = parse(sql);
4228
4229 let mut schema = MappingSchema::new();
4230 let cols = vec![
4231 ("id".to_string(), DataType::Unknown),
4232 ("name".to_string(), DataType::Unknown),
4233 ];
4234 schema.add_table("external_table", &cols, None).unwrap();
4235
4236 let node =
4237 lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4238 assert_eq!(node.name, "name");
4239 }
4240
4241 #[test]
4242 fn test_lineage_cte_qualified_star_with_schema() {
4243 let sql = r#"WITH
4246 orders AS (SELECT * FROM stg_orders),
4247 enriched AS (
4248 SELECT orders.*, 'extra' AS extra
4249 FROM orders
4250 )
4251 SELECT * FROM enriched"#;
4252 let expr = parse(sql);
4253
4254 let mut schema = MappingSchema::new();
4255 let cols = vec![
4256 ("order_id".to_string(), DataType::Unknown),
4257 ("total".to_string(), DataType::Unknown),
4258 ];
4259 schema.add_table("stg_orders", &cols, None).unwrap();
4260
4261 let node =
4262 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4263 .unwrap();
4264 assert_eq!(node.name, "order_id");
4265
4266 let extra =
4268 lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4269 assert_eq!(extra.name, "extra");
4270 }
4271
4272 #[test]
4273 fn test_lineage_cte_star_without_schema_still_works() {
4274 let sql = r#"WITH
4276 cte1 AS (SELECT id, name FROM raw_table),
4277 cte2 AS (SELECT * FROM cte1)
4278 SELECT * FROM cte2"#;
4279 let expr = parse(sql);
4280
4281 let node = lineage("id", &expr, None, false).unwrap();
4283 assert_eq!(node.name, "id");
4284 }
4285
4286 #[test]
4287 fn test_lineage_nested_cte_star_with_join_and_schema() {
4288 let sql = r#"WITH
4291base_orders AS (
4292 SELECT * FROM stg_orders
4293),
4294with_payments AS (
4295 SELECT
4296 base_orders.*,
4297 p.amount
4298 FROM base_orders
4299 LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4300),
4301final_cte AS (
4302 SELECT * FROM with_payments
4303)
4304SELECT * FROM final_cte"#;
4305 let expr = parse(sql);
4306
4307 let mut schema = MappingSchema::new();
4308 let order_cols = vec![
4309 (
4310 "order_id".to_string(),
4311 crate::expressions::DataType::Unknown,
4312 ),
4313 (
4314 "customer_id".to_string(),
4315 crate::expressions::DataType::Unknown,
4316 ),
4317 ("status".to_string(), crate::expressions::DataType::Unknown),
4318 ];
4319 let pay_cols = vec![
4320 (
4321 "payment_id".to_string(),
4322 crate::expressions::DataType::Unknown,
4323 ),
4324 (
4325 "order_id".to_string(),
4326 crate::expressions::DataType::Unknown,
4327 ),
4328 ("amount".to_string(), crate::expressions::DataType::Unknown),
4329 ];
4330 schema.add_table("stg_orders", &order_cols, None).unwrap();
4331 schema.add_table("stg_payments", &pay_cols, None).unwrap();
4332
4333 let node =
4335 lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4336 .unwrap();
4337 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4338
4339 let has_table_qualified = all_names
4341 .iter()
4342 .any(|n| n.contains('.') && n.contains("order_id"));
4343 assert!(
4344 has_table_qualified,
4345 "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4346 all_names
4347 );
4348
4349 let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4351 .unwrap();
4352 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4353
4354 let has_table_qualified = all_names
4355 .iter()
4356 .any(|n| n.contains('.') && n.contains("amount"));
4357 assert!(
4358 has_table_qualified,
4359 "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4360 all_names
4361 );
4362 }
4363
4364 #[test]
4365 fn test_lineage_cte_alias_resolution() {
4366 let sql = r#"WITH import_stg_items AS (
4368 SELECT item_id, name, status FROM stg_items
4369)
4370SELECT base.item_id, base.status
4371FROM import_stg_items AS base"#;
4372 let expr = parse(sql);
4373
4374 let node = lineage("item_id", &expr, None, false).unwrap();
4375 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4376 assert!(
4378 all_names.iter().any(|n| n == "stg_items.item_id"),
4379 "Expected leaf 'stg_items.item_id', got: {:?}",
4380 all_names
4381 );
4382 }
4383
4384 #[test]
4385 fn test_lineage_cte_alias_with_schema_and_star() {
4386 let sql = r#"WITH import_stg AS (
4388 SELECT * FROM stg_items
4389)
4390SELECT base.item_id, base.status
4391FROM import_stg AS base"#;
4392 let expr = parse(sql);
4393
4394 let mut schema = MappingSchema::new();
4395 schema
4396 .add_table(
4397 "stg_items",
4398 &[
4399 ("item_id".to_string(), DataType::Unknown),
4400 ("name".to_string(), DataType::Unknown),
4401 ("status".to_string(), DataType::Unknown),
4402 ],
4403 None,
4404 )
4405 .unwrap();
4406
4407 let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
4408 .unwrap();
4409 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4410 assert!(
4411 all_names.iter().any(|n| n == "stg_items.item_id"),
4412 "Expected leaf 'stg_items.item_id', got: {:?}",
4413 all_names
4414 );
4415 }
4416
4417 #[test]
4418 fn test_lineage_cte_alias_with_join() {
4419 let sql = r#"WITH
4421 import_users AS (SELECT id, name FROM users),
4422 import_orders AS (SELECT id, user_id, amount FROM orders)
4423SELECT u.name, o.amount
4424FROM import_users AS u
4425LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
4426 let expr = parse(sql);
4427
4428 let node = lineage("name", &expr, None, false).unwrap();
4429 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4430 assert!(
4431 all_names.iter().any(|n| n == "users.name"),
4432 "Expected leaf 'users.name', got: {:?}",
4433 all_names
4434 );
4435
4436 let node = lineage("amount", &expr, None, false).unwrap();
4437 let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4438 assert!(
4439 all_names.iter().any(|n| n == "orders.amount"),
4440 "Expected leaf 'orders.amount', got: {:?}",
4441 all_names
4442 );
4443 }
4444
4445 #[test]
4450 fn test_lineage_unquoted_cte_case_insensitive() {
4451 let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
4454 let node = lineage("col", &expr, None, false).unwrap();
4455 assert_eq!(node.name, "col");
4456 assert!(
4457 !node.downstream.is_empty(),
4458 "Unquoted CTE should resolve case-insensitively"
4459 );
4460 }
4461
4462 #[test]
4463 fn test_lineage_quoted_cte_case_preserved() {
4464 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
4466 let node = lineage("col", &expr, None, false).unwrap();
4467 assert_eq!(node.name, "col");
4468 assert!(
4469 !node.downstream.is_empty(),
4470 "Quoted CTE with matching case should resolve"
4471 );
4472 }
4473
4474 #[test]
4475 fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
4476 let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
4480 let result = lineage("col", &expr, None, false);
4483 assert!(
4484 result.is_err(),
4485 "Quoted CTE with case mismatch should not expand star: {:?}",
4486 result
4487 );
4488 }
4489
4490 #[test]
4491 fn test_lineage_mixed_quoted_unquoted_cte() {
4492 let expr = parse(
4494 r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
4495 );
4496 let node = lineage("a", &expr, None, false).unwrap();
4497 assert_eq!(node.name, "a");
4498 assert!(
4499 !node.downstream.is_empty(),
4500 "Mixed quoted/unquoted CTE chain should resolve"
4501 );
4502 }
4503
4504 #[test]
4520 fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
4521 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
4532 let node = lineage("col", &expr, None, false).unwrap();
4533 assert!(!node.downstream.is_empty());
4534 let child = &node.downstream[0];
4535 assert_eq!(
4537 child.source_name, "MyCte",
4538 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4539 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4540 );
4541 }
4542
4543 #[test]
4544 fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4545 let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4552 let node = lineage("col", &expr, None, false).unwrap();
4553 assert!(!node.downstream.is_empty());
4554 let child = &node.downstream[0];
4555 assert_eq!(
4557 child.source_name, "MyCte",
4558 "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4559 If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4560 );
4561 }
4562
4563 #[test]
4570 #[ignore = "requires derived table star expansion (separate issue)"]
4571 fn test_node_name_doesnt_contain_comment() {
4572 let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4573 let node = lineage("x", &expr, None, false).unwrap();
4574
4575 assert_eq!(node.name, "x");
4576 assert!(!node.downstream.is_empty());
4577 }
4578
4579 #[test]
4583 fn test_comment_before_first_column_in_cte() {
4584 let sql_with_comment = "with t as (select 1 as a) select\n -- comment\n a from t";
4585 let sql_without_comment = "with t as (select 1 as a) select a from t";
4586
4587 let expr_ok = parse(sql_without_comment);
4589 let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4590
4591 let expr_comment = parse(sql_with_comment);
4593 let node_comment = lineage("a", &expr_comment, None, false)
4594 .expect("with comment before first column should succeed");
4595
4596 assert_eq!(node_ok.name, node_comment.name, "node names should match");
4597 assert_eq!(
4598 node_ok.downstream_names(),
4599 node_comment.downstream_names(),
4600 "downstream lineage should be identical with or without comment"
4601 );
4602 }
4603
4604 #[test]
4606 fn test_block_comment_before_first_column() {
4607 let sql = "with t as (select 1 as a) select /* section */ a from t";
4608 let expr = parse(sql);
4609 let node = lineage("a", &expr, None, false)
4610 .expect("block comment before first column should succeed");
4611 assert_eq!(node.name, "a");
4612 assert!(
4613 !node.downstream.is_empty(),
4614 "should have downstream lineage"
4615 );
4616 }
4617
4618 #[test]
4620 fn test_comment_before_first_column_second_col_ok() {
4621 let sql = "with t as (select 1 as a, 2 as b) select\n -- comment\n a, b from t";
4622 let expr = parse(sql);
4623
4624 let node_a =
4625 lineage("a", &expr, None, false).expect("column a with comment should succeed");
4626 assert_eq!(node_a.name, "a");
4627
4628 let node_b =
4629 lineage("b", &expr, None, false).expect("column b with comment should succeed");
4630 assert_eq!(node_b.name, "b");
4631 }
4632
4633 #[test]
4635 fn test_comment_before_aliased_column() {
4636 let sql = "with t as (select 1 as x) select\n -- renamed\n x as y from t";
4637 let expr = parse(sql);
4638 let node =
4639 lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4640 assert_eq!(node.name, "y");
4641 assert!(
4642 !node.downstream.is_empty(),
4643 "aliased column should have downstream lineage"
4644 );
4645 }
4646}