1use std::collections::{HashMap, HashSet};
27
28use crate::ast::*;
29use crate::dialects::Dialect;
30use crate::errors::SqlglotError;
31use crate::schema::{MappingSchema, Schema};
32
/// Errors reported by lineage analysis.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LineageError {
    /// The requested column is not produced by the analyzed query.
    ColumnNotFound(String),
    /// A column reference that cannot be attributed to a single source.
    AmbiguousColumn(String),
    /// The statement is not one lineage can be computed for.
    InvalidQuery(String),
    /// SQL text could not be parsed; carries the parser's message.
    ParseError(String),
}

impl std::fmt::Display for LineageError {
    /// Renders a human-readable message: a fixed prefix per variant followed
    /// by the payload string.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ColumnNotFound(column) => {
                write!(out, "Column not found in output: {column}")
            }
            Self::AmbiguousColumn(column) => {
                write!(out, "Ambiguous column reference: {column}")
            }
            Self::InvalidQuery(reason) => {
                write!(out, "Invalid query for lineage: {reason}")
            }
            Self::ParseError(reason) => write!(out, "Parse error: {reason}"),
        }
    }
}

// No underlying cause is tracked, so the default `Error` methods suffice.
impl std::error::Error for LineageError {}
62
63impl From<LineageError> for SqlglotError {
64 fn from(e: LineageError) -> Self {
65 SqlglotError::Internal(e.to_string())
66 }
67}
68
69pub type LineageResult<T> = std::result::Result<T, LineageError>;
71
72#[derive(Debug, Clone)]
78pub struct LineageConfig {
79 pub dialect: Dialect,
81 pub trim_qualifiers: bool,
83 pub sources: HashMap<String, String>,
86}
87
88impl Default for LineageConfig {
89 fn default() -> Self {
90 Self {
91 dialect: Dialect::Ansi,
92 trim_qualifiers: true,
93 sources: HashMap::new(),
94 }
95 }
96}
97
98impl LineageConfig {
99 #[must_use]
101 pub fn new(dialect: Dialect) -> Self {
102 Self {
103 dialect,
104 ..Default::default()
105 }
106 }
107
108 #[must_use]
110 pub fn with_sources(mut self, sources: HashMap<String, String>) -> Self {
111 self.sources = sources;
112 self
113 }
114
115 #[must_use]
117 pub fn with_trim_qualifiers(mut self, trim: bool) -> Self {
118 self.trim_qualifiers = trim;
119 self
120 }
121}
122
123#[derive(Debug, Clone)]
129pub struct LineageNode {
130 pub name: String,
132 pub expression: Option<Expr>,
134 pub source_name: Option<String>,
136 pub source: Option<Expr>,
138 pub downstream: Vec<LineageNode>,
140 pub alias: Option<String>,
142 pub depth: usize,
144}
145
146impl LineageNode {
147 #[must_use]
149 pub fn new(name: String) -> Self {
150 Self {
151 name,
152 expression: None,
153 source_name: None,
154 source: None,
155 downstream: Vec::new(),
156 alias: None,
157 depth: 0,
158 }
159 }
160
161 #[must_use]
163 pub fn with_source(mut self, source_name: String) -> Self {
164 self.source_name = Some(source_name);
165 self
166 }
167
168 #[must_use]
170 pub fn with_expression(mut self, expr: Expr) -> Self {
171 self.expression = Some(expr);
172 self
173 }
174
175 #[must_use]
177 #[allow(dead_code)]
178 pub fn with_alias(mut self, alias: String) -> Self {
179 self.alias = Some(alias);
180 self
181 }
182
183 #[must_use]
185 pub fn with_depth(mut self, depth: usize) -> Self {
186 self.depth = depth;
187 self
188 }
189
190 #[allow(dead_code)]
192 pub fn add_downstream(&mut self, node: LineageNode) {
193 self.downstream.push(node);
194 }
195
196 pub fn walk<F>(&self, visitor: &mut F)
198 where
199 F: FnMut(&LineageNode),
200 {
201 visitor(self);
202 for child in &self.downstream {
203 child.walk(visitor);
204 }
205 }
206
207 #[must_use]
209 pub fn iter(&self) -> LineageIterator<'_> {
210 LineageIterator {
211 stack: vec![self],
212 }
213 }
214
215 #[must_use]
217 #[allow(dead_code)]
218 pub fn source_columns(&self) -> Vec<&LineageNode> {
219 self.iter().filter(|n| n.downstream.is_empty()).collect()
220 }
221
222 #[must_use]
224 pub fn source_tables(&self) -> Vec<String> {
225 let mut tables = HashSet::new();
226 for node in self.iter() {
227 if let Some(ref source) = node.source_name {
228 tables.insert(source.clone());
229 }
230 }
231 tables.into_iter().collect()
232 }
233
234 #[must_use]
236 pub fn to_dot(&self) -> String {
237 let mut dot = String::from("digraph lineage {\n");
238 dot.push_str(" rankdir=BT;\n");
239 dot.push_str(" node [shape=box];\n");
240
241 let mut node_id = 0;
242 let mut node_ids = HashMap::new();
243
244 self.walk(&mut |node| {
246 let id = format!("n{}", node_id);
247 let label = if let Some(ref src) = node.source_name {
248 format!("{}.{}", src, node.name)
249 } else {
250 node.name.clone()
251 };
252 dot.push_str(&format!(" {} [label=\"{}\"];\n", id, escape_dot(&label)));
253 node_ids.insert(node as *const _ as usize, id);
254 node_id += 1;
255 });
256
257 self.walk(&mut |node| {
259 let parent_id = node_ids.get(&(node as *const _ as usize)).unwrap();
260 for child in &node.downstream {
261 let child_id = node_ids.get(&(child as *const _ as usize)).unwrap();
262 dot.push_str(&format!(" {} -> {};\n", child_id, parent_id));
263 }
264 });
265
266 dot.push_str("}\n");
267 dot
268 }
269
270 #[must_use]
272 pub fn to_mermaid(&self) -> String {
273 let mut mermaid = String::from("flowchart BT\n");
274
275 let mut node_id = 0;
276 let mut node_ids = HashMap::new();
277
278 self.walk(&mut |node| {
280 let id = format!("n{}", node_id);
281 let label = if let Some(ref src) = node.source_name {
282 format!("{}.{}", src, node.name)
283 } else {
284 node.name.clone()
285 };
286 mermaid.push_str(&format!(" {}[\"{}\"]\n", id, escape_mermaid(&label)));
287 node_ids.insert(node as *const _ as usize, id);
288 node_id += 1;
289 });
290
291 self.walk(&mut |node| {
293 let parent_id = node_ids.get(&(node as *const _ as usize)).unwrap();
294 for child in &node.downstream {
295 let child_id = node_ids.get(&(child as *const _ as usize)).unwrap();
296 mermaid.push_str(&format!(" {} --> {}\n", child_id, parent_id));
297 }
298 });
299
300 mermaid
301 }
302}
303
304pub struct LineageIterator<'a> {
306 stack: Vec<&'a LineageNode>,
307}
308
309impl<'a> Iterator for LineageIterator<'a> {
310 type Item = &'a LineageNode;
311
312 fn next(&mut self) -> Option<Self::Item> {
313 self.stack.pop().map(|node| {
314 for child in node.downstream.iter().rev() {
316 self.stack.push(child);
317 }
318 node
319 })
320 }
321}
322
323#[derive(Debug, Clone)]
329pub struct LineageGraph {
330 pub node: LineageNode,
332 pub sql: Option<String>,
334 pub dialect: Dialect,
336}
337
338impl LineageGraph {
339 #[must_use]
341 pub fn new(node: LineageNode, dialect: Dialect) -> Self {
342 Self {
343 node,
344 sql: None,
345 dialect,
346 }
347 }
348
349 #[must_use]
351 #[allow(dead_code)]
352 pub fn with_sql(mut self, sql: String) -> Self {
353 self.sql = Some(sql);
354 self
355 }
356
357 #[must_use]
359 pub fn source_tables(&self) -> Vec<String> {
360 self.node.source_tables()
361 }
362
363 #[must_use]
365 #[allow(dead_code)]
366 pub fn source_columns(&self) -> Vec<&LineageNode> {
367 self.node.source_columns()
368 }
369
370 #[allow(dead_code)]
372 pub fn walk<F>(&self, visitor: &mut F)
373 where
374 F: FnMut(&LineageNode),
375 {
376 self.node.walk(visitor);
377 }
378
379 #[must_use]
381 pub fn to_dot(&self) -> String {
382 self.node.to_dot()
383 }
384
385 #[must_use]
387 pub fn to_mermaid(&self) -> String {
388 self.node.to_mermaid()
389 }
390}
391
392struct LineageContext {
398 schema: MappingSchema,
400 config: LineageConfig,
402 depth: usize,
404 ctes: HashMap<String, Statement>,
406 sources: HashMap<String, SourceInfo>,
408 external_sources: HashMap<String, Statement>,
410 visiting: HashSet<String>,
412}
413
414#[derive(Debug, Clone)]
416struct SourceInfo {
417 kind: SourceKind,
419 columns: Option<Vec<SelectItem>>,
421 statement: Option<Statement>,
423}
424
/// Category of a registered source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
enum SourceKind {
    /// A source with no defining statement: a plain table, and also aliased
    /// table functions and `UNNEST` sources.
    Table,
    /// A common table expression from a `WITH` clause.
    Cte,
    /// An aliased subquery, or the alias of a `PIVOT`/`UNPIVOT`.
    DerivedTable,
    /// A source whose definition comes from the externally supplied sources.
    External,
}
433
434impl LineageContext {
435 fn new(schema: &MappingSchema, config: &LineageConfig) -> Self {
436 Self {
437 schema: schema.clone(),
438 config: config.clone(),
439 depth: 0,
440 ctes: HashMap::new(),
441 sources: HashMap::new(),
442 external_sources: HashMap::new(),
443 visiting: HashSet::new(),
444 }
445 }
446
447 fn with_depth(&self, depth: usize) -> Self {
448 Self {
449 schema: self.schema.clone(),
450 config: self.config.clone(),
451 depth,
452 ctes: self.ctes.clone(),
453 sources: self.sources.clone(),
454 external_sources: self.external_sources.clone(),
455 visiting: self.visiting.clone(),
456 }
457 }
458
459 #[allow(dead_code)]
460 fn resolve_source(&self, name: &str) -> Option<&SourceInfo> {
461 let normalized = normalize_name(name, self.config.dialect);
462 self.sources.get(&normalized)
463 }
464}
465
466pub fn lineage(
504 column: &str,
505 statement: &Statement,
506 schema: &MappingSchema,
507 config: &LineageConfig,
508) -> LineageResult<LineageGraph> {
509 let mut ctx = LineageContext::new(schema, config);
511
512 for (name, sql) in &config.sources {
513 match crate::parser::parse(sql, config.dialect) {
514 Ok(stmt) => {
515 ctx.external_sources.insert(normalize_name(name, config.dialect), stmt);
516 }
517 Err(e) => return Err(LineageError::ParseError(e.to_string())),
518 }
519 }
520
521 let node = build_lineage_for_column(column, statement, &mut ctx)?;
523
524 Ok(LineageGraph::new(node, config.dialect))
525}
526
527pub fn lineage_sql(
545 column: &str,
546 sql: &str,
547 schema: &MappingSchema,
548 config: &LineageConfig,
549) -> LineageResult<LineageGraph> {
550 let statement = crate::parser::parse(sql, config.dialect)
551 .map_err(|e| LineageError::ParseError(e.to_string()))?;
552
553 let mut graph = lineage(column, &statement, schema, config)?;
554 graph.sql = Some(sql.to_string());
555 Ok(graph)
556}
557
558fn build_lineage_for_column(
564 column: &str,
565 statement: &Statement,
566 ctx: &mut LineageContext,
567) -> LineageResult<LineageNode> {
568 match statement {
569 Statement::Select(sel) => build_lineage_for_select_column(column, sel, ctx),
570 Statement::SetOperation(set_op) => build_lineage_for_set_operation(column, set_op, ctx),
571 Statement::CreateView(cv) => build_lineage_for_column(column, &cv.query, ctx),
572 _ => Err(LineageError::InvalidQuery(
573 "Lineage analysis requires a SELECT or set operation statement".to_string(),
574 )),
575 }
576}
577
578fn build_lineage_for_select_column(
580 column: &str,
581 sel: &SelectStatement,
582 ctx: &mut LineageContext,
583) -> LineageResult<LineageNode> {
584 for cte in &sel.ctes {
586 let cte_name = normalize_name(&cte.name, ctx.config.dialect);
587 ctx.ctes.insert(cte_name.clone(), (*cte.query).clone());
588 ctx.sources.insert(
589 cte_name,
590 SourceInfo {
591 kind: SourceKind::Cte,
592 columns: extract_select_columns(&cte.query),
593 statement: Some((*cte.query).clone()),
594 },
595 );
596 }
597
598 if let Some(from) = &sel.from {
600 register_table_source(&from.source, ctx);
601 }
602
603 for join in &sel.joins {
605 register_table_source(&join.table, ctx);
606 }
607
608 let (col_name, table_qual) = parse_column_ref(column);
610
611 for item in &sel.columns {
612 match item {
613 SelectItem::Expr { expr, alias } => {
614 let item_name = alias.as_ref().map(String::as_str).unwrap_or_else(|| {
615 expr_output_name(expr)
616 });
617
618 if matches_column_name(item_name, &col_name) {
619 return build_lineage_for_expr(expr, alias.clone(), ctx);
620 }
621 }
622 SelectItem::Wildcard => {
623 for (source_name, source_info) in ctx.sources.clone() {
625 if let Some(cols) = &source_info.columns {
626 for col_item in cols {
627 if let SelectItem::Expr { expr, alias } = col_item {
628 let item_name = alias.as_ref().map(String::as_str).unwrap_or_else(|| {
629 expr_output_name(expr)
630 });
631 if matches_column_name(item_name, &col_name) {
632 return build_lineage_for_expr(expr, alias.clone(), ctx);
633 }
634 }
635 }
636 } else if source_info.kind == SourceKind::Table {
637 if let Ok(schema_cols) = ctx.schema.column_names(&[&source_name]) {
639 if schema_cols.iter().any(|c| matches_column_name(c, &col_name)) {
640 let mut node = LineageNode::new(col_name.clone())
642 .with_source(source_name.clone())
643 .with_depth(ctx.depth);
644 node.expression = Some(Expr::Column {
645 table: Some(source_name.clone()),
646 name: col_name.clone(),
647 quote_style: QuoteStyle::None,
648 table_quote_style: QuoteStyle::None,
649 });
650 return Ok(node);
651 }
652 }
653 }
654 }
655 }
656 SelectItem::QualifiedWildcard { table } => {
657 if table_qual.as_ref().is_some_and(|t| matches_column_name(t, table)) {
658 if let Some(source_info) = ctx.sources.get(table).cloned() {
660 if let Some(cols) = &source_info.columns {
661 for col_item in cols {
662 if let SelectItem::Expr { expr, alias } = col_item {
663 let item_name = alias.as_ref().map(String::as_str).unwrap_or_else(|| {
664 expr_output_name(expr)
665 });
666 if matches_column_name(item_name, &col_name) {
667 return build_lineage_for_expr(expr, alias.clone(), ctx);
668 }
669 }
670 }
671 }
672 }
673 }
674 }
675 }
676 }
677
678 Err(LineageError::ColumnNotFound(column.to_string()))
679}
680
681fn build_lineage_for_set_operation(
683 column: &str,
684 set_op: &SetOperationStatement,
685 ctx: &mut LineageContext,
686) -> LineageResult<LineageNode> {
687 let mut root = LineageNode::new(column.to_string()).with_depth(ctx.depth);
688
689 let mut child_ctx = ctx.with_depth(ctx.depth + 1);
691
692 let left_lineage = build_lineage_for_column(column, &set_op.left, &mut child_ctx)?;
693 let right_lineage = build_lineage_for_column(column, &set_op.right, &mut child_ctx)?;
694
695 root.downstream.push(left_lineage);
696 root.downstream.push(right_lineage);
697
698 Ok(root)
699}
700
701fn build_lineage_for_expr(
703 expr: &Expr,
704 alias: Option<String>,
705 ctx: &mut LineageContext,
706) -> LineageResult<LineageNode> {
707 let name = alias.clone().unwrap_or_else(|| expr_to_name(expr, ctx.config.trim_qualifiers));
708 let mut node = LineageNode::new(name.clone())
709 .with_expression(expr.clone())
710 .with_depth(ctx.depth);
711
712 if let Some(a) = alias {
713 node.alias = Some(a);
714 }
715
716 let columns = collect_expr_columns(expr);
718
719 let mut child_ctx = ctx.with_depth(ctx.depth + 1);
720
721 for col_ref in columns {
722 let child_node = resolve_column_lineage(&col_ref, &mut child_ctx)?;
723 node.downstream.push(child_node);
724 }
725
726 Ok(node)
727}
728
729fn resolve_column_lineage(
731 col: &ColumnReference,
732 ctx: &mut LineageContext,
733) -> LineageResult<LineageNode> {
734 let name = if ctx.config.trim_qualifiers {
735 col.name.clone()
736 } else {
737 col.qualified_name()
738 };
739
740 if let Some(ref table) = col.table {
742 let normalized_table = normalize_name(table, ctx.config.dialect);
743
744 if let Some(source_info) = ctx.sources.get(&normalized_table).cloned() {
745 match source_info.kind {
746 SourceKind::Table => {
747 let node = LineageNode::new(name)
749 .with_source(normalized_table)
750 .with_depth(ctx.depth);
751 return Ok(node);
752 }
753 SourceKind::Cte | SourceKind::DerivedTable => {
754 if !ctx.visiting.contains(&normalized_table) {
756 if let Some(stmt) = source_info.statement {
757 ctx.visiting.insert(normalized_table.clone());
758 let result = build_lineage_for_column(&col.name, &stmt, ctx);
759 ctx.visiting.remove(&normalized_table);
760 return result;
761 }
762 }
763 let node = LineageNode::new(name)
765 .with_source(normalized_table)
766 .with_depth(ctx.depth);
767 return Ok(node);
768 }
769 SourceKind::External => {
770 if let Some(stmt) = ctx.external_sources.get(&normalized_table).cloned() {
772 return build_lineage_for_column(&col.name, &stmt, ctx);
773 }
774 }
775 }
776 }
777 }
778
779 for (source_name, source_info) in ctx.sources.clone() {
781 match source_info.kind {
782 SourceKind::Table => {
783 if ctx.schema.has_column(&[&source_name], &col.name) {
785 let node = LineageNode::new(name)
786 .with_source(source_name.clone())
787 .with_depth(ctx.depth);
788 return Ok(node);
789 }
790 }
791 SourceKind::Cte | SourceKind::DerivedTable => {
792 if ctx.visiting.contains(&source_name) {
794 continue;
795 }
796 if let Some(ref columns) = source_info.columns {
798 if columns.iter().any(|c| select_item_has_column(c, &col.name)) {
799 if let Some(stmt) = source_info.statement {
800 ctx.visiting.insert(source_name.clone());
801 let result = build_lineage_for_column(&col.name, &stmt, ctx);
802 ctx.visiting.remove(&source_name);
803 return result;
804 }
805 }
806 }
807 }
808 SourceKind::External => {}
809 }
810 }
811
812 let node = LineageNode::new(name).with_depth(ctx.depth);
814 Ok(node)
815}
816
817fn register_table_source(source: &TableSource, ctx: &mut LineageContext) {
819 match source {
820 TableSource::Table(table_ref) => {
821 let key = table_ref.alias.as_ref().unwrap_or(&table_ref.name).clone();
822 let normalized = normalize_name(&key, ctx.config.dialect);
823 if !ctx.sources.contains_key(&normalized) {
825 ctx.sources.insert(
826 normalized,
827 SourceInfo {
828 kind: SourceKind::Table,
829 columns: None,
830 statement: None,
831 },
832 );
833 }
834 }
835 TableSource::Subquery { query, alias } => {
836 if let Some(alias) = alias {
837 let normalized = normalize_name(alias, ctx.config.dialect);
838 ctx.sources.insert(
839 normalized,
840 SourceInfo {
841 kind: SourceKind::DerivedTable,
842 columns: extract_select_columns(query),
843 statement: Some((**query).clone()),
844 },
845 );
846 }
847 }
848 TableSource::Lateral { source } => {
849 register_table_source(source, ctx);
850 }
851 TableSource::Pivot { source, alias, .. } | TableSource::Unpivot { source, alias, .. } => {
852 register_table_source(source, ctx);
853 if let Some(alias) = alias {
855 let normalized = normalize_name(alias, ctx.config.dialect);
856 ctx.sources.insert(
857 normalized,
858 SourceInfo {
859 kind: SourceKind::DerivedTable,
860 columns: None,
861 statement: None,
862 },
863 );
864 }
865 }
866 TableSource::TableFunction { alias, .. } => {
867 if let Some(alias) = alias {
868 let normalized = normalize_name(alias, ctx.config.dialect);
869 ctx.sources.insert(
870 normalized,
871 SourceInfo {
872 kind: SourceKind::Table,
873 columns: None,
874 statement: None,
875 },
876 );
877 }
878 }
879 TableSource::Unnest { alias, .. } => {
880 if let Some(alias) = alias {
881 let normalized = normalize_name(alias, ctx.config.dialect);
882 ctx.sources.insert(
883 normalized,
884 SourceInfo {
885 kind: SourceKind::Table,
886 columns: None,
887 statement: None,
888 },
889 );
890 }
891 }
892 }
893}
894
/// A column mentioned inside an expression, with its optional qualifier.
#[derive(Debug, Clone)]
struct ColumnReference {
    /// Table qualifier as written in the expression, if any.
    table: Option<String>,
    /// Column name.
    name: String,
}

impl ColumnReference {
    /// Returns `table.name` when a qualifier is present, otherwise `name`.
    fn qualified_name(&self) -> String {
        match &self.table {
            Some(qualifier) => format!("{}.{}", qualifier, self.name),
            None => self.name.clone(),
        }
    }
}
915
916fn collect_expr_columns(expr: &Expr) -> Vec<ColumnReference> {
918 let mut columns = Vec::new();
919
920 expr.walk(&mut |e| {
921 if let Expr::Column { table, name, .. } = e {
922 columns.push(ColumnReference {
923 table: table.clone(),
924 name: name.clone(),
925 });
926 return false; }
928 !matches!(e, Expr::Subquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. })
930 });
931
932 columns
933}
934
935fn extract_select_columns(stmt: &Statement) -> Option<Vec<SelectItem>> {
937 match stmt {
938 Statement::Select(sel) => Some(sel.columns.clone()),
939 Statement::SetOperation(set_op) => extract_select_columns(&set_op.left),
940 Statement::CreateView(cv) => extract_select_columns(&cv.query),
941 _ => None,
942 }
943}
944
945fn expr_output_name(expr: &Expr) -> &str {
947 match expr {
948 Expr::Column { name, .. } => name,
949 Expr::Alias { name, .. } => name,
950 _ => "",
951 }
952}
953
954fn expr_to_name(expr: &Expr, trim_qualifiers: bool) -> String {
956 match expr {
957 Expr::Column { table, name, .. } => {
958 if trim_qualifiers {
959 name.clone()
960 } else if let Some(t) = table {
961 format!("{}.{}", t, name)
962 } else {
963 name.clone()
964 }
965 }
966 Expr::Alias { name, .. } => name.clone(),
967 Expr::Function { name, .. } => format!("{}(...)", name),
968 Expr::BinaryOp { op, .. } => format!("({:?})", op),
969 Expr::Cast { data_type, .. } => format!("CAST AS {:?}", data_type),
970 _ => "expr".to_string(),
971 }
972}
973
/// Splits a column reference at its last `.` into `(name, qualifier)`.
///
/// `"t.a"` yields `("a", Some("t"))`, `"db.t.a"` yields
/// `("a", Some("db.t"))`, and a reference without a dot yields the whole
/// string as the name with no qualifier.
fn parse_column_ref(column: &str) -> (String, Option<String>) {
    match column.rsplit_once('.') {
        Some((qualifier, name)) => (name.to_owned(), Some(qualifier.to_owned())),
        None => (column.to_owned(), None),
    }
}
984
/// Returns `true` when `item` and `target` are equal ignoring ASCII case.
fn matches_column_name(item: &str, target: &str) -> bool {
    str::eq_ignore_ascii_case(item, target)
}
989
990fn normalize_name(name: &str, dialect: Dialect) -> String {
992 crate::schema::normalize_identifier(name, dialect)
993}
994
995fn select_item_has_column(item: &SelectItem, name: &str) -> bool {
997 match item {
998 SelectItem::Expr { expr, alias } => {
999 let item_name = alias.as_ref().map(String::as_str).unwrap_or_else(|| {
1000 expr_output_name(expr)
1001 });
1002 matches_column_name(item_name, name)
1003 }
1004 SelectItem::Wildcard => true, SelectItem::QualifiedWildcard { .. } => true,
1006 }
1007}
1008
/// Escapes a label for use inside a double-quoted DOT string: backslashes
/// and double quotes get a leading backslash, and line breaks become the
/// two characters `\n`.
fn escape_dot(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1015
/// Makes a label safe for a quoted Mermaid node: double quotes become single
/// quotes, line breaks become spaces, and square brackets become
/// parentheses.
fn escape_mermaid(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\n' => ' ',
            '[' => '(',
            ']' => ')',
            other => other,
        })
        .collect()
}
1023
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::parse;

    /// Default configuration used by most tests (ANSI dialect).
    fn test_config() -> LineageConfig {
        LineageConfig::new(Dialect::Ansi)
    }

    /// Schema with three tables: `t(a, b, c)`, `users(id, name, email)` and
    /// `orders(id, user_id, amount)`.
    fn test_schema() -> MappingSchema {
        let mut schema = MappingSchema::new(Dialect::Ansi);

        let t_columns = ["a", "b", "c"]
            .iter()
            .map(|column| (column.to_string(), DataType::Int))
            .collect::<Vec<_>>();
        schema.add_table(&["t"], t_columns).unwrap();

        let users_columns = vec![
            ("id".to_string(), DataType::Int),
            ("name".to_string(), DataType::Varchar(Some(255))),
            ("email".to_string(), DataType::Text),
        ];
        schema.add_table(&["users"], users_columns).unwrap();

        let orders_columns = vec![
            ("id".to_string(), DataType::Int),
            ("user_id".to_string(), DataType::Int),
            (
                "amount".to_string(),
                DataType::Decimal {
                    precision: Some(10),
                    scale: Some(2),
                },
            ),
        ];
        schema.add_table(&["orders"], orders_columns).unwrap();

        schema
    }

    /// Parses `sql` (ANSI) and runs `lineage` for `column` against the test
    /// schema with the given configuration.
    fn trace_with(column: &str, sql: &str, config: &LineageConfig) -> LineageResult<LineageGraph> {
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        lineage(column, &ast, &schema, config)
    }

    /// Like `trace_with`, using the default test configuration and
    /// unwrapping the result.
    fn trace(column: &str, sql: &str) -> LineageGraph {
        trace_with(column, sql, &test_config()).unwrap()
    }

    #[test]
    fn test_simple_column_lineage() {
        let graph = trace("a", "SELECT a FROM t");

        assert_eq!(graph.node.name, "a");
        assert_eq!(graph.node.depth, 0);
        assert_eq!(graph.node.downstream.len(), 1);
        assert_eq!(graph.node.downstream[0].source_name, Some("t".to_string()));
    }

    #[test]
    fn test_aliased_column_lineage() {
        let graph = trace("col_a", "SELECT a AS col_a FROM t");

        assert_eq!(graph.node.name, "col_a");
        assert_eq!(graph.node.alias, Some("col_a".to_string()));
    }

    #[test]
    fn test_expression_lineage() {
        let graph = trace("sum", "SELECT a + b AS sum FROM t");

        assert_eq!(graph.node.name, "sum");
        assert_eq!(graph.node.downstream.len(), 2);
    }

    #[test]
    fn test_cte_lineage() {
        let graph = trace("a", "WITH cte AS (SELECT a FROM t) SELECT a FROM cte");

        assert_eq!(graph.node.name, "a");
        assert!(graph.source_tables().contains(&"t".to_string()));
    }

    #[test]
    fn test_join_lineage() {
        let sql = "SELECT u.name, o.amount FROM users u JOIN orders o ON u.id = o.user_id";

        let graph = trace("name", sql);
        assert_eq!(graph.node.name, "name");

        let graph2 = trace("amount", sql);
        assert_eq!(graph2.node.name, "amount");
    }

    #[test]
    fn test_union_lineage() {
        let graph = trace("a", "SELECT a FROM t UNION SELECT b AS a FROM t");

        assert_eq!(graph.node.name, "a");
        assert_eq!(graph.node.downstream.len(), 2);
    }

    #[test]
    fn test_column_not_found() {
        let result = trace_with("nonexistent", "SELECT a FROM t", &test_config());
        assert!(matches!(result, Err(LineageError::ColumnNotFound(_))));
    }

    #[test]
    fn test_derived_table_lineage() {
        let graph = trace("x", "SELECT x FROM (SELECT a AS x FROM t) sub");

        assert_eq!(graph.node.name, "x");
        assert!(graph.source_tables().contains(&"t".to_string()));
    }

    #[test]
    fn test_function_lineage() {
        let graph = trace("total", "SELECT SUM(a) AS total FROM t");

        assert_eq!(graph.node.name, "total");
        assert_eq!(graph.node.downstream.len(), 1);
    }

    #[test]
    fn test_lineage_sql_convenience() {
        let schema = test_schema();
        let config = test_config();

        let graph = lineage_sql("b", "SELECT a, b FROM t", &schema, &config).unwrap();

        assert_eq!(graph.node.name, "b");
        assert_eq!(graph.sql, Some("SELECT a, b FROM t".to_string()));
    }

    #[test]
    fn test_source_tables() {
        let graph = trace(
            "name",
            "SELECT u.name, o.amount FROM users u JOIN orders o ON u.id = o.user_id",
        );
        let tables = graph.source_tables();

        assert!(tables.contains(&"u".to_string()));
    }

    #[test]
    fn test_to_dot() {
        let dot = trace("col", "SELECT a AS col FROM t").to_dot();

        assert!(dot.contains("digraph lineage"));
        assert!(dot.contains("rankdir=BT"));
    }

    #[test]
    fn test_to_mermaid() {
        let mermaid = trace("col", "SELECT a AS col FROM t").to_mermaid();

        assert!(mermaid.contains("flowchart BT"));
    }

    #[test]
    fn test_case_expression_lineage() {
        let graph = trace(
            "result",
            "SELECT CASE WHEN a > 0 THEN b ELSE c END AS result FROM t",
        );

        assert_eq!(graph.node.name, "result");
        assert!(graph.node.downstream.len() >= 2);
    }

    #[test]
    fn test_coalesce_lineage() {
        let graph = trace("val", "SELECT COALESCE(a, b, c) AS val FROM t");

        assert_eq!(graph.node.name, "val");
        assert_eq!(graph.node.downstream.len(), 3);
    }

    #[test]
    fn test_nested_cte_lineage() {
        let sql = r#"
 WITH cte1 AS (SELECT a, b FROM t),
 cte2 AS (SELECT a + b AS sum FROM cte1)
 SELECT sum FROM cte2
 "#;
        let graph = trace("sum", sql);

        assert_eq!(graph.node.name, "sum");
        let sources = graph.source_tables();
        assert!(sources.contains(&"t".to_string()));
    }

    #[test]
    fn test_lineage_iterator() {
        let graph = trace("sum", "SELECT a + b AS sum FROM t");

        let nodes: Vec<_> = graph.node.iter().collect();
        assert!(!nodes.is_empty());
        assert_eq!(nodes[0].name, "sum");
    }

    #[test]
    fn test_external_sources() {
        let schema = test_schema();
        let sources = HashMap::from([("view1".to_string(), "SELECT a FROM t".to_string())]);
        let config = LineageConfig::new(Dialect::Ansi).with_sources(sources);

        let result = lineage_sql("a", "SELECT a FROM view1", &schema, &config);
        assert!(result.is_ok() || matches!(result, Err(LineageError::ColumnNotFound(_))));
    }

    #[test]
    fn test_qualified_column() {
        let config = LineageConfig::new(Dialect::Ansi).with_trim_qualifiers(false);

        let graph = trace_with("a", "SELECT t.a FROM t", &config).unwrap();

        assert!(graph.node.name.contains('a'));
    }
}