flowscope_core/analyzer/helpers/
constraints.rs

1//! Constraint extraction utilities for DDL statements.
2//!
3//! This module provides shared logic for extracting primary key, foreign key,
4//! and other constraints from SQL DDL statements.
5
6use crate::types::{ColumnSchema, ConstraintType, ForeignKeyRef, TableConstraintInfo};
7use sqlparser::ast::{ColumnDef, ColumnOption, ColumnOptionDef, TableConstraint};
8use std::collections::HashSet;
9
10/// Extract constraint info from column options (inline PRIMARY KEY, FOREIGN KEY).
11///
12/// Returns a tuple of (is_primary_key, foreign_key_ref).
13/// The foreign_key_ref is only created if the referenced column can be determined.
14pub fn extract_column_constraints(
15    options: &[ColumnOptionDef],
16) -> (Option<bool>, Option<ForeignKeyRef>) {
17    let mut is_pk = None;
18    let mut fk_ref = None;
19
20    for opt in options {
21        match &opt.option {
22            ColumnOption::Unique { is_primary, .. } if *is_primary => {
23                is_pk = Some(true);
24            }
25            ColumnOption::ForeignKey {
26                foreign_table,
27                referred_columns,
28                ..
29            } => {
30                // Only create FK ref if we have a referenced column.
31                // If referred_columns is empty (e.g., `REFERENCES orders`), we skip
32                // creating the FK reference since we can't determine the target column.
33                if let Some(col) = referred_columns.first() {
34                    fk_ref = Some(ForeignKeyRef {
35                        table: foreign_table.to_string(),
36                        column: col.value.clone(),
37                    });
38                }
39            }
40            _ => {}
41        }
42    }
43
44    (is_pk, fk_ref)
45}
46
47/// Extract table-level constraints (composite PRIMARY KEY, FOREIGN KEY, UNIQUE).
48///
49/// Returns a tuple of (pk_column_names, constraint_infos).
50/// The pk_column_names is useful for marking individual columns as part of a composite PK.
51pub fn extract_table_constraints(
52    constraints: &[TableConstraint],
53) -> (Vec<String>, Vec<TableConstraintInfo>) {
54    let mut pk_columns = Vec::new();
55    let mut constraint_infos = Vec::new();
56
57    for constraint in constraints {
58        match constraint {
59            TableConstraint::PrimaryKey { columns, .. } => {
60                // IndexColumn has column: OrderByExpr, we extract the column name from expr
61                let col_names: Vec<String> =
62                    columns.iter().map(|c| c.column.expr.to_string()).collect();
63                pk_columns.extend(col_names.clone());
64                constraint_infos.push(TableConstraintInfo {
65                    constraint_type: ConstraintType::PrimaryKey,
66                    columns: col_names,
67                    referenced_table: None,
68                    referenced_columns: None,
69                });
70            }
71            TableConstraint::ForeignKey {
72                columns,
73                foreign_table,
74                referred_columns,
75                ..
76            } => {
77                // FK columns are Vec<Ident>
78                let col_names: Vec<String> = columns.iter().map(|c| c.value.clone()).collect();
79                let ref_col_names: Vec<String> =
80                    referred_columns.iter().map(|c| c.value.clone()).collect();
81                constraint_infos.push(TableConstraintInfo {
82                    constraint_type: ConstraintType::ForeignKey,
83                    columns: col_names,
84                    referenced_table: Some(foreign_table.to_string()),
85                    referenced_columns: Some(ref_col_names),
86                });
87            }
88            TableConstraint::Unique { columns, .. } => {
89                // IndexColumn has column: OrderByExpr
90                let col_names: Vec<String> =
91                    columns.iter().map(|c| c.column.expr.to_string()).collect();
92                constraint_infos.push(TableConstraintInfo {
93                    constraint_type: ConstraintType::Unique,
94                    columns: col_names,
95                    referenced_table: None,
96                    referenced_columns: None,
97                });
98            }
99            _ => {}
100        }
101    }
102
103    (pk_columns, constraint_infos)
104}
105
106/// Build column schemas with constraint information from DDL column definitions.
107///
108/// This function consolidates the logic for extracting column schema and constraint
109/// information from CREATE TABLE statements, used in both DDL pre-collection and
110/// main analysis passes.
111///
112/// # Arguments
113/// * `columns` - Column definitions from the CREATE TABLE statement
114/// * `table_constraints` - Table-level constraints (composite PKs, FKs, etc.)
115///
116/// # Returns
117/// A tuple of (column_schemas, table_constraint_infos) ready for schema registration.
118pub fn build_column_schemas_with_constraints(
119    columns: &[ColumnDef],
120    table_constraints: &[TableConstraint],
121) -> (Vec<ColumnSchema>, Vec<TableConstraintInfo>) {
122    // Extract table-level constraints and build a set of PK column names for O(1) lookup.
123    let (pk_column_names, table_constraint_infos) = extract_table_constraints(table_constraints);
124    let pk_columns_set: HashSet<&str> = pk_column_names.iter().map(|s| s.as_str()).collect();
125
126    let column_schemas = columns
127        .iter()
128        .map(|c| {
129            let (is_pk, fk_ref) = extract_column_constraints(&c.options);
130            // Column is PK if either inline constraint or in table-level PK
131            let is_primary_key =
132                if is_pk.unwrap_or(false) || pk_columns_set.contains(c.name.value.as_str()) {
133                    Some(true)
134                } else {
135                    None
136                };
137            ColumnSchema {
138                name: c.name.value.clone(),
139                data_type: Some(c.data_type.to_string()),
140                is_primary_key,
141                foreign_key: fk_ref,
142            }
143        })
144        .collect();
145
146    (column_schemas, table_constraint_infos)
147}