Skip to main content

sqlmodel_schema/
introspect.rs

1//! Database introspection.
2//!
3//! This module provides comprehensive schema introspection for SQLite, PostgreSQL, and MySQL.
4//! It extracts metadata about tables, columns, constraints, and indexes.
5
6use asupersync::{Cx, Outcome};
7use sqlmodel_core::{Connection, Error};
8use std::collections::HashMap;
9
10#[cfg(test)]
11use sqlmodel_core::sanitize_identifier;
12
13// ============================================================================
14// Schema Types
15// ============================================================================
16
17/// Complete representation of a database schema.
18#[derive(Debug, Clone, Default)]
19pub struct DatabaseSchema {
20    /// All tables in the schema, keyed by table name
21    pub tables: HashMap<String, TableInfo>,
22    /// Database dialect
23    pub dialect: Dialect,
24}
25
26impl DatabaseSchema {
27    /// Create a new empty schema for the given dialect.
28    pub fn new(dialect: Dialect) -> Self {
29        Self {
30            tables: HashMap::new(),
31            dialect,
32        }
33    }
34
35    /// Get a table by name.
36    pub fn table(&self, name: &str) -> Option<&TableInfo> {
37        self.tables.get(name)
38    }
39
40    /// Get all table names.
41    pub fn table_names(&self) -> Vec<&str> {
42        self.tables.keys().map(|s| s.as_str()).collect()
43    }
44}
45
/// Parsed SQL type with extracted metadata.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ParsedSqlType {
    /// Base type name (e.g., VARCHAR, INTEGER, DECIMAL)
    pub base_type: String,
    /// Length for character types (e.g., VARCHAR(255) -> 255)
    pub length: Option<u32>,
    /// Precision for numeric types (e.g., DECIMAL(10,2) -> 10)
    pub precision: Option<u32>,
    /// Scale for numeric types (e.g., DECIMAL(10,2) -> 2)
    pub scale: Option<u32>,
    /// Whether the type is unsigned (MySQL)
    pub unsigned: bool,
    /// Whether this is an array type (PostgreSQL)
    pub array: bool,
}

impl ParsedSqlType {
    /// Parse a SQL type string into structured metadata.
    ///
    /// The input is trimmed and upper-cased before parsing, so `base_type`
    /// is always upper case. A trailing `[]` marks an array type and a
    /// trailing ` UNSIGNED` marks an unsigned type; both are removed from the
    /// base type.
    ///
    /// Parameters are read from the text between the first `(` and the first
    /// `)` that follows it. A single parameter is stored as `length`; a
    /// comma-separated list stores its first two entries as `precision` and
    /// `scale`. Parameters that are not valid `u32` values yield `None`.
    ///
    /// Malformed input never panics: a missing `)` means "everything after
    /// the `(`", and text after the `)` (e.g. `WITH TIME ZONE`) is ignored.
    ///
    /// # Examples
    /// - `VARCHAR(255)` -> base_type: "VARCHAR", length: 255
    /// - `DECIMAL(10,2)` -> base_type: "DECIMAL", precision: 10, scale: 2
    /// - `INT UNSIGNED` -> base_type: "INT", unsigned: true
    /// - `TEXT[]` -> base_type: "TEXT", array: true
    /// - `TIMESTAMP(6) WITH TIME ZONE` -> base_type: "TIMESTAMP", length: 6
    pub fn parse(type_str: &str) -> Self {
        let upper = type_str.trim().to_uppercase();

        // Check for array suffix (PostgreSQL)
        let (rest, array) = if upper.ends_with("[]") {
            (upper.trim_end_matches("[]"), true)
        } else {
            (upper.as_str(), false)
        };

        // Check for UNSIGNED suffix (MySQL)
        let (rest, unsigned) = if rest.ends_with(" UNSIGNED") {
            (rest.trim_end_matches(" UNSIGNED"), true)
        } else {
            (rest, false)
        };

        // No parameter list: the whole remainder is the base type.
        let Some(open) = rest.find('(') else {
            return Self {
                base_type: rest.to_string(),
                length: None,
                precision: None,
                scale: None,
                unsigned,
                array,
            };
        };

        let base_type = rest[..open].trim().to_string();

        // Locate the closing parenthesis explicitly instead of assuming it is
        // the last byte of the string; that assumption panicked on `"X("` and
        // mangled the parameters whenever anything followed the `)`.
        let after_open = &rest[open + 1..];
        let params = match after_open.find(')') {
            Some(close) => &after_open[..close],
            None => after_open,
        };

        // Check if it's precision,scale or just length
        if params.contains(',') {
            let mut parts = params.split(',');
            let precision = parts.next().and_then(|s| s.trim().parse().ok());
            let scale = parts.next().and_then(|s| s.trim().parse().ok());
            Self {
                base_type,
                length: None,
                precision,
                scale,
                unsigned,
                array,
            }
        } else {
            Self {
                base_type,
                length: params.trim().parse().ok(),
                precision: None,
                scale: None,
                unsigned,
                array,
            }
        }
    }

    /// Check if this is a text/string type.
    pub fn is_text(&self) -> bool {
        matches!(
            self.base_type.as_str(),
            "VARCHAR" | "CHAR" | "TEXT" | "CLOB" | "NVARCHAR" | "NCHAR" | "NTEXT"
        )
    }

    /// Check if this is a numeric type.
    pub fn is_numeric(&self) -> bool {
        matches!(
            self.base_type.as_str(),
            "INT"
                | "INTEGER"
                | "BIGINT"
                | "SMALLINT"
                | "TINYINT"
                | "MEDIUMINT"
                | "DECIMAL"
                | "NUMERIC"
                | "FLOAT"
                | "DOUBLE"
                | "REAL"
                | "DOUBLE PRECISION"
        )
    }

    /// Check if this is a date/time type.
    pub fn is_datetime(&self) -> bool {
        matches!(
            self.base_type.as_str(),
            "DATE" | "TIME" | "DATETIME" | "TIMESTAMP" | "TIMESTAMPTZ" | "TIMETZ"
        )
    }
}
164
/// Description of a UNIQUE constraint on a table.
#[derive(Clone, Debug)]
pub struct UniqueConstraintInfo {
    /// Name of the constraint, when one is known.
    pub name: Option<String>,
    /// Names of the columns covered by the constraint.
    pub columns: Vec<String>,
}
173
/// Description of a CHECK constraint on a table.
#[derive(Clone, Debug)]
pub struct CheckConstraintInfo {
    /// Name of the constraint, when one is known.
    pub name: Option<String>,
    /// The expression the constraint checks.
    pub expression: String,
}
182
183/// Information about a database table.
184#[derive(Debug, Clone)]
185pub struct TableInfo {
186    /// Table name
187    pub name: String,
188    /// Columns in the table
189    pub columns: Vec<ColumnInfo>,
190    /// Primary key column names
191    pub primary_key: Vec<String>,
192    /// Foreign key constraints
193    pub foreign_keys: Vec<ForeignKeyInfo>,
194    /// Unique constraints
195    pub unique_constraints: Vec<UniqueConstraintInfo>,
196    /// Check constraints
197    pub check_constraints: Vec<CheckConstraintInfo>,
198    /// Indexes on the table
199    pub indexes: Vec<IndexInfo>,
200    /// Table comment (if any)
201    pub comment: Option<String>,
202}
203
204impl TableInfo {
205    /// Get a column by name.
206    pub fn column(&self, name: &str) -> Option<&ColumnInfo> {
207        self.columns.iter().find(|c| c.name == name)
208    }
209
210    /// Check if this table has a single-column auto-increment primary key.
211    pub fn has_auto_pk(&self) -> bool {
212        self.primary_key.len() == 1
213            && self
214                .column(&self.primary_key[0])
215                .is_some_and(|c| c.auto_increment)
216    }
217}
218
219/// Information about a table column.
220#[derive(Debug, Clone)]
221pub struct ColumnInfo {
222    /// Column name
223    pub name: String,
224    /// SQL type as raw string
225    pub sql_type: String,
226    /// Parsed SQL type with extracted metadata
227    pub parsed_type: ParsedSqlType,
228    /// Whether the column is nullable
229    pub nullable: bool,
230    /// Default value expression
231    pub default: Option<String>,
232    /// Whether this is part of the primary key
233    pub primary_key: bool,
234    /// Whether this column auto-increments
235    pub auto_increment: bool,
236    /// Column comment (if any)
237    pub comment: Option<String>,
238}
239
/// Description of a foreign key constraint.
#[derive(Clone, Debug)]
pub struct ForeignKeyInfo {
    /// Name of the constraint, when one is known.
    pub name: Option<String>,
    /// Column in the owning table.
    pub column: String,
    /// Table the key points to.
    pub foreign_table: String,
    /// Column in the referenced table.
    pub foreign_column: String,
    /// Action taken ON DELETE, if recorded.
    pub on_delete: Option<String>,
    /// Action taken ON UPDATE, if recorded.
    pub on_update: Option<String>,
}
256
/// Description of an index.
#[derive(Clone, Debug)]
pub struct IndexInfo {
    /// Name of the index.
    pub name: String,
    /// Names of the indexed columns.
    pub columns: Vec<String>,
    /// `true` when the index enforces uniqueness.
    pub unique: bool,
    /// Kind of index (BTREE, HASH, GIN, GIST, etc.), if known.
    pub index_type: Option<String>,
    /// `true` when this index backs the primary key.
    pub primary: bool,
}
271
// Scratch state for a single index while it is being assembled.
// NOTE(review): the code using this type is not in view here; the `i64` in
// `columns` is presumably the column's position within the index — confirm.
#[derive(Default)]
struct MySqlIndexAccumulator {
    // Column entries collected so far, each paired with an integer key.
    columns: Vec<(i64, String)>,
    // Whether the index is unique.
    unique: bool,
    // Index type, when known.
    index_type: Option<String>,
    // Whether the index is the primary key index.
    primary: bool,
}
279
280/// Database introspector.
281pub struct Introspector {
282    /// Database type for dialect-specific queries
283    dialect: Dialect,
284}
285
/// The database dialects the introspector knows how to query.
///
/// `Sqlite` is the [`Default`] value.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Dialect {
    /// SQLite (the default dialect)
    #[default]
    Sqlite,
    /// PostgreSQL
    Postgres,
    /// MySQL/MariaDB
    Mysql,
}
297
298impl Introspector {
299    /// Create a new introspector for the given dialect.
300    pub fn new(dialect: Dialect) -> Self {
301        Self { dialect }
302    }
303
304    /// List all table names in the database.
305    pub async fn table_names<C: Connection>(
306        &self,
307        cx: &Cx,
308        conn: &C,
309    ) -> Outcome<Vec<String>, Error> {
310        let sql = match self.dialect {
311            Dialect::Sqlite => {
312                "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
313            }
314            Dialect::Postgres => {
315                "SELECT table_name
316                                   FROM information_schema.tables
317                                   WHERE table_schema = current_schema()
318                                     AND table_type = 'BASE TABLE'"
319            }
320            Dialect::Mysql => "SHOW TABLES",
321        };
322
323        let rows = match conn.query(cx, sql, &[]).await {
324            Outcome::Ok(rows) => rows,
325            Outcome::Err(e) => return Outcome::Err(e),
326            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
327            Outcome::Panicked(p) => return Outcome::Panicked(p),
328        };
329
330        let names: Vec<String> = rows
331            .iter()
332            .filter_map(|row| row.get(0).and_then(|v| v.as_str().map(String::from)))
333            .collect();
334
335        Outcome::Ok(names)
336    }
337
338    /// Get detailed information about a table.
339    pub async fn table_info<C: Connection>(
340        &self,
341        cx: &Cx,
342        conn: &C,
343        table_name: &str,
344    ) -> Outcome<TableInfo, Error> {
345        let columns = match self.columns(cx, conn, table_name).await {
346            Outcome::Ok(cols) => cols,
347            Outcome::Err(e) => return Outcome::Err(e),
348            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
349            Outcome::Panicked(p) => return Outcome::Panicked(p),
350        };
351
352        let primary_key: Vec<String> = columns
353            .iter()
354            .filter(|c| c.primary_key)
355            .map(|c| c.name.clone())
356            .collect();
357
358        let foreign_keys = match self.foreign_keys(cx, conn, table_name).await {
359            Outcome::Ok(fks) => fks,
360            Outcome::Err(e) => return Outcome::Err(e),
361            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
362            Outcome::Panicked(p) => return Outcome::Panicked(p),
363        };
364
365        let indexes = match self.indexes(cx, conn, table_name).await {
366            Outcome::Ok(idxs) => idxs,
367            Outcome::Err(e) => return Outcome::Err(e),
368            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
369            Outcome::Panicked(p) => return Outcome::Panicked(p),
370        };
371
372        let unique_constraints = match self.dialect {
373            Dialect::Postgres => match self.postgres_unique_constraints(cx, conn, table_name).await
374            {
375                Outcome::Ok(uks) => uks,
376                Outcome::Err(e) => return Outcome::Err(e),
377                Outcome::Cancelled(r) => return Outcome::Cancelled(r),
378                Outcome::Panicked(p) => return Outcome::Panicked(p),
379            },
380            Dialect::Sqlite | Dialect::Mysql => {
381                // For SQLite/MySQL, UNIQUE constraints are represented by UNIQUE indexes.
382                // We normalize them into `unique_constraints` and remove them from `indexes`
383                // so the diff engine does not try to DROP/CREATE constraint-backed indexes.
384                Vec::new()
385            }
386        };
387
388        // SQLite/MySQL: derive unique_constraints from indexes (unique && !primary).
389        // PostgreSQL: unique_constraints already queried from pg_constraint; indexes already
390        // exclude constraint-backed indexes (see postgres_indexes()).
391        let (unique_constraints, indexes) = match self.dialect {
392            Dialect::Sqlite | Dialect::Mysql => {
393                let mut uks = Vec::new();
394                let mut idxs = Vec::new();
395                for idx in indexes {
396                    if idx.unique && !idx.primary {
397                        uks.push(UniqueConstraintInfo {
398                            name: Some(idx.name.clone()),
399                            columns: idx.columns.clone(),
400                        });
401                    } else {
402                        idxs.push(idx);
403                    }
404                }
405                (uks, idxs)
406            }
407            Dialect::Postgres => (unique_constraints, indexes),
408        };
409
410        let check_constraints = match self.check_constraints(cx, conn, table_name).await {
411            Outcome::Ok(checks) => checks,
412            Outcome::Err(e) => return Outcome::Err(e),
413            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
414            Outcome::Panicked(p) => return Outcome::Panicked(p),
415        };
416
417        let comment = match self.table_comment(cx, conn, table_name).await {
418            Outcome::Ok(comment) => comment,
419            Outcome::Err(e) => return Outcome::Err(e),
420            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
421            Outcome::Panicked(p) => return Outcome::Panicked(p),
422        };
423
424        Outcome::Ok(TableInfo {
425            name: table_name.to_string(),
426            columns,
427            primary_key,
428            foreign_keys,
429            unique_constraints,
430            check_constraints,
431            indexes,
432            comment,
433        })
434    }
435
436    async fn postgres_unique_constraints<C: Connection>(
437        &self,
438        cx: &Cx,
439        conn: &C,
440        table_name: &str,
441    ) -> Outcome<Vec<UniqueConstraintInfo>, Error> {
442        debug_assert!(self.dialect == Dialect::Postgres);
443
444        let sql = "SELECT
445                       c.conname AS constraint_name,
446                       a.attname AS column_name,
447                       u.ord AS ordinal
448                   FROM pg_constraint c
449                   JOIN pg_class t ON t.oid = c.conrelid
450                   JOIN pg_namespace n ON n.oid = t.relnamespace
451                   JOIN LATERAL unnest(c.conkey) WITH ORDINALITY AS u(attnum, ord) ON true
452                   JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = u.attnum
453                   WHERE t.relname = $1
454                     AND n.nspname = current_schema()
455                     AND c.contype = 'u'
456                   ORDER BY c.conname, u.ord";
457
458        let rows = match conn
459            .query(
460                cx,
461                sql,
462                &[sqlmodel_core::Value::Text(table_name.to_string())],
463            )
464            .await
465        {
466            Outcome::Ok(rows) => rows,
467            Outcome::Err(e) => return Outcome::Err(e),
468            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
469            Outcome::Panicked(p) => return Outcome::Panicked(p),
470        };
471
472        let mut map: HashMap<String, Vec<(i64, String)>> = HashMap::new();
473        for row in &rows {
474            let Ok(name) = row.get_named::<String>("constraint_name") else {
475                continue;
476            };
477            let Ok(col) = row.get_named::<String>("column_name") else {
478                continue;
479            };
480            let ord = row.get_named::<i64>("ordinal").ok().unwrap_or(0);
481            map.entry(name.clone())
482                .and_modify(|cols| cols.push((ord, col.clone())))
483                .or_insert_with(|| vec![(ord, col)]);
484        }
485
486        let mut out = Vec::new();
487        for (name, mut cols) in map {
488            cols.sort_by_key(|(ord, _)| *ord);
489            out.push(UniqueConstraintInfo {
490                name: Some(name),
491                columns: cols.into_iter().map(|(_, c)| c).collect(),
492            });
493        }
494        out.sort_by(|a, b| a.name.cmp(&b.name));
495
496        Outcome::Ok(out)
497    }
498
499    /// Introspect the entire database schema.
500    pub async fn introspect_all<C: Connection>(
501        &self,
502        cx: &Cx,
503        conn: &C,
504    ) -> Outcome<DatabaseSchema, Error> {
505        let table_names = match self.table_names(cx, conn).await {
506            Outcome::Ok(names) => names,
507            Outcome::Err(e) => return Outcome::Err(e),
508            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
509            Outcome::Panicked(p) => return Outcome::Panicked(p),
510        };
511
512        let mut schema = DatabaseSchema::new(self.dialect);
513
514        for name in table_names {
515            let info = match self.table_info(cx, conn, &name).await {
516                Outcome::Ok(info) => info,
517                Outcome::Err(e) => return Outcome::Err(e),
518                Outcome::Cancelled(r) => return Outcome::Cancelled(r),
519                Outcome::Panicked(p) => return Outcome::Panicked(p),
520            };
521            schema.tables.insert(name, info);
522        }
523
524        Outcome::Ok(schema)
525    }
526
527    /// Get column information for a table.
528    async fn columns<C: Connection>(
529        &self,
530        cx: &Cx,
531        conn: &C,
532        table_name: &str,
533    ) -> Outcome<Vec<ColumnInfo>, Error> {
534        match self.dialect {
535            Dialect::Sqlite => self.sqlite_columns(cx, conn, table_name).await,
536            Dialect::Postgres => self.postgres_columns(cx, conn, table_name).await,
537            Dialect::Mysql => self.mysql_columns(cx, conn, table_name).await,
538        }
539    }
540
541    async fn sqlite_columns<C: Connection>(
542        &self,
543        cx: &Cx,
544        conn: &C,
545        table_name: &str,
546    ) -> Outcome<Vec<ColumnInfo>, Error> {
547        let sql = format!("PRAGMA table_info({})", quote_sqlite_identifier(table_name));
548        let rows = match conn.query(cx, &sql, &[]).await {
549            Outcome::Ok(rows) => rows,
550            Outcome::Err(e) => return Outcome::Err(e),
551            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
552            Outcome::Panicked(p) => return Outcome::Panicked(p),
553        };
554
555        let columns: Vec<ColumnInfo> = rows
556            .iter()
557            .filter_map(|row| {
558                let name = row.get_named::<String>("name").ok()?;
559                let sql_type = row.get_named::<String>("type").ok()?;
560                let notnull = row.get_named::<i64>("notnull").ok().unwrap_or(0);
561                let dflt_value = row.get_named::<String>("dflt_value").ok();
562                let pk = row.get_named::<i64>("pk").ok().unwrap_or(0);
563                let parsed_type = ParsedSqlType::parse(&sql_type);
564
565                Some(ColumnInfo {
566                    name,
567                    sql_type,
568                    parsed_type,
569                    nullable: notnull == 0,
570                    default: dflt_value,
571                    primary_key: pk > 0,
572                    auto_increment: false, // SQLite doesn't report this via PRAGMA
573                    comment: None,         // SQLite doesn't support column comments
574                })
575            })
576            .collect();
577
578        Outcome::Ok(columns)
579    }
580
581    async fn postgres_columns<C: Connection>(
582        &self,
583        cx: &Cx,
584        conn: &C,
585        table_name: &str,
586    ) -> Outcome<Vec<ColumnInfo>, Error> {
587        // Use a more comprehensive query to get full type info
588        let sql = "SELECT
589                       c.column_name,
590                       c.data_type,
591                       c.udt_name,
592                       c.character_maximum_length,
593                       c.numeric_precision,
594                       c.numeric_scale,
595                       c.is_nullable,
596                       c.column_default,
597                       COALESCE(d.description, '') as column_comment
598                   FROM information_schema.columns c
599                   LEFT JOIN pg_catalog.pg_statio_all_tables st
600                       ON c.table_schema = st.schemaname AND c.table_name = st.relname
601                   LEFT JOIN pg_catalog.pg_description d
602                       ON d.objoid = st.relid AND d.objsubid = c.ordinal_position
603                   WHERE c.table_name = $1 AND c.table_schema = current_schema()
604                   ORDER BY c.ordinal_position";
605
606        let rows = match conn
607            .query(
608                cx,
609                sql,
610                &[sqlmodel_core::Value::Text(table_name.to_string())],
611            )
612            .await
613        {
614            Outcome::Ok(rows) => rows,
615            Outcome::Err(e) => return Outcome::Err(e),
616            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
617            Outcome::Panicked(p) => return Outcome::Panicked(p),
618        };
619
620        let columns: Vec<ColumnInfo> = rows
621            .iter()
622            .filter_map(|row| {
623                let name = row.get_named::<String>("column_name").ok()?;
624                let data_type = row.get_named::<String>("data_type").ok()?;
625                let udt_name = row.get_named::<String>("udt_name").ok().unwrap_or_default();
626                let char_len = row.get_named::<i64>("character_maximum_length").ok();
627                let precision = row.get_named::<i64>("numeric_precision").ok();
628                let scale = row.get_named::<i64>("numeric_scale").ok();
629                let nullable_str = row.get_named::<String>("is_nullable").ok()?;
630                let default = row.get_named::<String>("column_default").ok();
631                let comment = row.get_named::<String>("column_comment").ok();
632
633                // Build a complete SQL type string
634                let sql_type =
635                    build_postgres_type(&data_type, &udt_name, char_len, precision, scale);
636                let parsed_type = ParsedSqlType::parse(&sql_type);
637
638                // Check if auto-increment by looking at default (nextval)
639                let auto_increment = default.as_ref().is_some_and(|d| d.starts_with("nextval("));
640
641                Some(ColumnInfo {
642                    name,
643                    sql_type,
644                    parsed_type,
645                    nullable: nullable_str == "YES",
646                    default,
647                    primary_key: false, // Determined via separate index query
648                    auto_increment,
649                    comment: comment.filter(|s| !s.is_empty()),
650                })
651            })
652            .collect();
653
654        Outcome::Ok(columns)
655    }
656
657    async fn mysql_columns<C: Connection>(
658        &self,
659        cx: &Cx,
660        conn: &C,
661        table_name: &str,
662    ) -> Outcome<Vec<ColumnInfo>, Error> {
663        // Use SHOW FULL COLUMNS to get comments
664        let sql = format!(
665            "SHOW FULL COLUMNS FROM {}",
666            quote_mysql_identifier(table_name)
667        );
668        let rows = match conn.query(cx, &sql, &[]).await {
669            Outcome::Ok(rows) => rows,
670            Outcome::Err(e) => return Outcome::Err(e),
671            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
672            Outcome::Panicked(p) => return Outcome::Panicked(p),
673        };
674
675        let columns: Vec<ColumnInfo> = rows
676            .iter()
677            .filter_map(|row| {
678                let name = row.get_named::<String>("Field").ok()?;
679                let sql_type = row.get_named::<String>("Type").ok()?;
680                let null = row.get_named::<String>("Null").ok()?;
681                let key = row.get_named::<String>("Key").ok()?;
682                let default = row.get_named::<String>("Default").ok();
683                let extra = row.get_named::<String>("Extra").ok().unwrap_or_default();
684                let comment = row.get_named::<String>("Comment").ok();
685                let parsed_type = ParsedSqlType::parse(&sql_type);
686
687                Some(ColumnInfo {
688                    name,
689                    sql_type,
690                    parsed_type,
691                    nullable: null == "YES",
692                    default,
693                    primary_key: key == "PRI",
694                    auto_increment: extra.contains("auto_increment"),
695                    comment: comment.filter(|s| !s.is_empty()),
696                })
697            })
698            .collect();
699
700        Outcome::Ok(columns)
701    }
702
703    // ========================================================================
704    // Foreign Key Introspection
705    // ========================================================================
706
707    async fn check_constraints<C: Connection>(
708        &self,
709        cx: &Cx,
710        conn: &C,
711        table_name: &str,
712    ) -> Outcome<Vec<CheckConstraintInfo>, Error> {
713        match self.dialect {
714            Dialect::Sqlite => self.sqlite_check_constraints(cx, conn, table_name).await,
715            Dialect::Postgres => self.postgres_check_constraints(cx, conn, table_name).await,
716            Dialect::Mysql => self.mysql_check_constraints(cx, conn, table_name).await,
717        }
718    }
719
720    async fn sqlite_check_constraints<C: Connection>(
721        &self,
722        cx: &Cx,
723        conn: &C,
724        table_name: &str,
725    ) -> Outcome<Vec<CheckConstraintInfo>, Error> {
726        let sql = "SELECT sql FROM sqlite_master WHERE type='table' AND name=?1";
727        let rows = match conn
728            .query(
729                cx,
730                sql,
731                &[sqlmodel_core::Value::Text(table_name.to_string())],
732            )
733            .await
734        {
735            Outcome::Ok(rows) => rows,
736            Outcome::Err(e) => return Outcome::Err(e),
737            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
738            Outcome::Panicked(p) => return Outcome::Panicked(p),
739        };
740
741        let create_sql = rows.iter().find_map(|row| {
742            row.get_named::<String>("sql").ok().or_else(|| {
743                row.get(0)
744                    .and_then(|value| value.as_str().map(ToString::to_string))
745            })
746        });
747
748        match create_sql {
749            Some(sql) => Outcome::Ok(extract_sqlite_check_constraints(&sql)),
750            None => Outcome::Ok(Vec::new()),
751        }
752    }
753
754    async fn postgres_check_constraints<C: Connection>(
755        &self,
756        cx: &Cx,
757        conn: &C,
758        table_name: &str,
759    ) -> Outcome<Vec<CheckConstraintInfo>, Error> {
760        let sql = "SELECT
761                       c.conname AS constraint_name,
762                       pg_get_constraintdef(c.oid, true) AS constraint_definition
763                   FROM pg_constraint c
764                   JOIN pg_class t ON t.oid = c.conrelid
765                   JOIN pg_namespace n ON n.oid = t.relnamespace
766                   WHERE t.relname = $1
767                     AND n.nspname = current_schema()
768                     AND c.contype = 'c'
769                   ORDER BY c.conname";
770
771        let rows = match conn
772            .query(
773                cx,
774                sql,
775                &[sqlmodel_core::Value::Text(table_name.to_string())],
776            )
777            .await
778        {
779            Outcome::Ok(rows) => rows,
780            Outcome::Err(e) => return Outcome::Err(e),
781            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
782            Outcome::Panicked(p) => return Outcome::Panicked(p),
783        };
784
785        let checks = rows
786            .iter()
787            .filter_map(|row| {
788                let definition = row.get_named::<String>("constraint_definition").ok()?;
789                let expression = normalize_check_expression(&definition);
790                if expression.is_empty() {
791                    return None;
792                }
793                Some(CheckConstraintInfo {
794                    name: row
795                        .get_named::<String>("constraint_name")
796                        .ok()
797                        .filter(|s| !s.is_empty()),
798                    expression,
799                })
800            })
801            .collect();
802
803        Outcome::Ok(checks)
804    }
805
806    async fn mysql_check_constraints<C: Connection>(
807        &self,
808        cx: &Cx,
809        conn: &C,
810        table_name: &str,
811    ) -> Outcome<Vec<CheckConstraintInfo>, Error> {
812        let sql = "SELECT
813                 tc.CONSTRAINT_NAME AS constraint_name,
814                 cc.CHECK_CLAUSE AS check_clause
815             FROM information_schema.TABLE_CONSTRAINTS tc
816             JOIN information_schema.CHECK_CONSTRAINTS cc
817               ON tc.CONSTRAINT_SCHEMA = cc.CONSTRAINT_SCHEMA
818              AND tc.CONSTRAINT_NAME = cc.CONSTRAINT_NAME
819             WHERE tc.CONSTRAINT_TYPE = 'CHECK'
820               AND tc.TABLE_SCHEMA = DATABASE()
821               AND tc.TABLE_NAME = ?
822             ORDER BY tc.CONSTRAINT_NAME";
823
824        let rows = match conn
825            .query(
826                cx,
827                sql,
828                &[sqlmodel_core::Value::Text(table_name.to_string())],
829            )
830            .await
831        {
832            Outcome::Ok(rows) => rows,
833            Outcome::Err(e) => return Outcome::Err(e),
834            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
835            Outcome::Panicked(p) => return Outcome::Panicked(p),
836        };
837
838        let checks = rows
839            .iter()
840            .filter_map(|row| {
841                let definition = row.get_named::<String>("check_clause").ok()?;
842                let expression = normalize_check_expression(&definition);
843                if expression.is_empty() {
844                    return None;
845                }
846                Some(CheckConstraintInfo {
847                    name: row
848                        .get_named::<String>("constraint_name")
849                        .ok()
850                        .filter(|s| !s.is_empty()),
851                    expression,
852                })
853            })
854            .collect();
855
856        Outcome::Ok(checks)
857    }
858
859    async fn table_comment<C: Connection>(
860        &self,
861        cx: &Cx,
862        conn: &C,
863        table_name: &str,
864    ) -> Outcome<Option<String>, Error> {
865        match self.dialect {
866            Dialect::Sqlite => Outcome::Ok(None),
867            Dialect::Postgres => self.postgres_table_comment(cx, conn, table_name).await,
868            Dialect::Mysql => self.mysql_table_comment(cx, conn, table_name).await,
869        }
870    }
871
872    async fn postgres_table_comment<C: Connection>(
873        &self,
874        cx: &Cx,
875        conn: &C,
876        table_name: &str,
877    ) -> Outcome<Option<String>, Error> {
878        let sql = "SELECT
879                       COALESCE(obj_description(c.oid, 'pg_class'), '') AS table_comment
880                   FROM pg_class c
881                   JOIN pg_namespace n ON n.oid = c.relnamespace
882                   WHERE c.relname = $1
883                     AND n.nspname = current_schema()
884                   LIMIT 1";
885
886        let rows = match conn
887            .query(
888                cx,
889                sql,
890                &[sqlmodel_core::Value::Text(table_name.to_string())],
891            )
892            .await
893        {
894            Outcome::Ok(rows) => rows,
895            Outcome::Err(e) => return Outcome::Err(e),
896            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
897            Outcome::Panicked(p) => return Outcome::Panicked(p),
898        };
899
900        let comment = rows.iter().find_map(|row| {
901            row.get_named::<String>("table_comment")
902                .ok()
903                .filter(|s| !s.is_empty())
904        });
905        Outcome::Ok(comment)
906    }
907
908    async fn mysql_table_comment<C: Connection>(
909        &self,
910        cx: &Cx,
911        conn: &C,
912        table_name: &str,
913    ) -> Outcome<Option<String>, Error> {
914        let sql = "SELECT TABLE_COMMENT AS table_comment
915             FROM information_schema.TABLES
916             WHERE TABLE_SCHEMA = DATABASE()
917               AND TABLE_NAME = ?
918             LIMIT 1";
919
920        let rows = match conn
921            .query(
922                cx,
923                sql,
924                &[sqlmodel_core::Value::Text(table_name.to_string())],
925            )
926            .await
927        {
928            Outcome::Ok(rows) => rows,
929            Outcome::Err(e) => return Outcome::Err(e),
930            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
931            Outcome::Panicked(p) => return Outcome::Panicked(p),
932        };
933
934        let comment = rows.iter().find_map(|row| {
935            row.get_named::<String>("table_comment")
936                .ok()
937                .filter(|s| !s.is_empty())
938        });
939        Outcome::Ok(comment)
940    }
941
942    /// Get foreign key constraints for a table.
943    async fn foreign_keys<C: Connection>(
944        &self,
945        cx: &Cx,
946        conn: &C,
947        table_name: &str,
948    ) -> Outcome<Vec<ForeignKeyInfo>, Error> {
949        match self.dialect {
950            Dialect::Sqlite => self.sqlite_foreign_keys(cx, conn, table_name).await,
951            Dialect::Postgres => self.postgres_foreign_keys(cx, conn, table_name).await,
952            Dialect::Mysql => self.mysql_foreign_keys(cx, conn, table_name).await,
953        }
954    }
955
956    async fn sqlite_foreign_keys<C: Connection>(
957        &self,
958        cx: &Cx,
959        conn: &C,
960        table_name: &str,
961    ) -> Outcome<Vec<ForeignKeyInfo>, Error> {
962        let sql = format!(
963            "PRAGMA foreign_key_list({})",
964            quote_sqlite_identifier(table_name)
965        );
966        let rows = match conn.query(cx, &sql, &[]).await {
967            Outcome::Ok(rows) => rows,
968            Outcome::Err(e) => return Outcome::Err(e),
969            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
970            Outcome::Panicked(p) => return Outcome::Panicked(p),
971        };
972
973        let fks: Vec<ForeignKeyInfo> = rows
974            .iter()
975            .filter_map(|row| {
976                let table = row.get_named::<String>("table").ok()?;
977                let from = row.get_named::<String>("from").ok()?;
978                let to = row.get_named::<String>("to").ok()?;
979                let on_update = row.get_named::<String>("on_update").ok();
980                let on_delete = row.get_named::<String>("on_delete").ok();
981
982                Some(ForeignKeyInfo {
983                    name: None, // SQLite doesn't name FK constraints in PRAGMA output
984                    column: from,
985                    foreign_table: table,
986                    foreign_column: to,
987                    on_delete: on_delete.filter(|s| s != "NO ACTION"),
988                    on_update: on_update.filter(|s| s != "NO ACTION"),
989                })
990            })
991            .collect();
992
993        Outcome::Ok(fks)
994    }
995
996    async fn postgres_foreign_keys<C: Connection>(
997        &self,
998        cx: &Cx,
999        conn: &C,
1000        table_name: &str,
1001    ) -> Outcome<Vec<ForeignKeyInfo>, Error> {
1002        let sql = "SELECT
1003                       tc.constraint_name,
1004                       kcu.column_name,
1005                       ccu.table_name AS foreign_table_name,
1006                       ccu.column_name AS foreign_column_name,
1007                       rc.delete_rule,
1008                       rc.update_rule
1009                   FROM information_schema.table_constraints AS tc
1010                   JOIN information_schema.key_column_usage AS kcu
1011                       ON tc.constraint_name = kcu.constraint_name
1012                       AND tc.table_schema = kcu.table_schema
1013                   JOIN information_schema.constraint_column_usage AS ccu
1014                       ON ccu.constraint_name = tc.constraint_name
1015                       AND ccu.table_schema = tc.table_schema
1016                   JOIN information_schema.referential_constraints AS rc
1017                       ON rc.constraint_name = tc.constraint_name
1018                       AND rc.constraint_schema = tc.table_schema
1019                   WHERE tc.constraint_type = 'FOREIGN KEY'
1020                       AND tc.table_name = $1
1021                       AND tc.table_schema = current_schema()";
1022
1023        let rows = match conn
1024            .query(
1025                cx,
1026                sql,
1027                &[sqlmodel_core::Value::Text(table_name.to_string())],
1028            )
1029            .await
1030        {
1031            Outcome::Ok(rows) => rows,
1032            Outcome::Err(e) => return Outcome::Err(e),
1033            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1034            Outcome::Panicked(p) => return Outcome::Panicked(p),
1035        };
1036
1037        let fks: Vec<ForeignKeyInfo> = rows
1038            .iter()
1039            .filter_map(|row| {
1040                let name = row.get_named::<String>("constraint_name").ok();
1041                let column = row.get_named::<String>("column_name").ok()?;
1042                let foreign_table = row.get_named::<String>("foreign_table_name").ok()?;
1043                let foreign_column = row.get_named::<String>("foreign_column_name").ok()?;
1044                let on_delete = row.get_named::<String>("delete_rule").ok();
1045                let on_update = row.get_named::<String>("update_rule").ok();
1046
1047                Some(ForeignKeyInfo {
1048                    name,
1049                    column,
1050                    foreign_table,
1051                    foreign_column,
1052                    on_delete: on_delete.filter(|s| s != "NO ACTION"),
1053                    on_update: on_update.filter(|s| s != "NO ACTION"),
1054                })
1055            })
1056            .collect();
1057
1058        Outcome::Ok(fks)
1059    }
1060
1061    async fn mysql_foreign_keys<C: Connection>(
1062        &self,
1063        cx: &Cx,
1064        conn: &C,
1065        table_name: &str,
1066    ) -> Outcome<Vec<ForeignKeyInfo>, Error> {
1067        let sql = "SELECT
1068                       kcu.constraint_name,
1069                       kcu.column_name,
1070                       kcu.referenced_table_name,
1071                       kcu.referenced_column_name,
1072                       rc.delete_rule,
1073                       rc.update_rule
1074                   FROM information_schema.key_column_usage AS kcu
1075                   JOIN information_schema.referential_constraints AS rc
1076                       ON rc.constraint_name = kcu.constraint_name
1077                       AND rc.constraint_schema = kcu.constraint_schema
1078                   WHERE kcu.table_schema = DATABASE()
1079                       AND kcu.table_name = ?
1080                       AND kcu.referenced_table_name IS NOT NULL";
1081
1082        let rows = match conn
1083            .query(
1084                cx,
1085                sql,
1086                &[sqlmodel_core::Value::Text(table_name.to_string())],
1087            )
1088            .await
1089        {
1090            Outcome::Ok(rows) => rows,
1091            Outcome::Err(e) => return Outcome::Err(e),
1092            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1093            Outcome::Panicked(p) => return Outcome::Panicked(p),
1094        };
1095
1096        let fks: Vec<ForeignKeyInfo> = rows
1097            .iter()
1098            .filter_map(|row| {
1099                let name = row.get_named::<String>("constraint_name").ok();
1100                let column = row.get_named::<String>("column_name").ok()?;
1101                let foreign_table = row.get_named::<String>("referenced_table_name").ok()?;
1102                let foreign_column = row.get_named::<String>("referenced_column_name").ok()?;
1103                let on_delete = row.get_named::<String>("delete_rule").ok();
1104                let on_update = row.get_named::<String>("update_rule").ok();
1105
1106                Some(ForeignKeyInfo {
1107                    name,
1108                    column,
1109                    foreign_table,
1110                    foreign_column,
1111                    on_delete: on_delete.filter(|s| s != "NO ACTION"),
1112                    on_update: on_update.filter(|s| s != "NO ACTION"),
1113                })
1114            })
1115            .collect();
1116
1117        Outcome::Ok(fks)
1118    }
1119
1120    // ========================================================================
1121    // Index Introspection
1122    // ========================================================================
1123
1124    /// Get indexes for a table.
1125    async fn indexes<C: Connection>(
1126        &self,
1127        cx: &Cx,
1128        conn: &C,
1129        table_name: &str,
1130    ) -> Outcome<Vec<IndexInfo>, Error> {
1131        match self.dialect {
1132            Dialect::Sqlite => self.sqlite_indexes(cx, conn, table_name).await,
1133            Dialect::Postgres => self.postgres_indexes(cx, conn, table_name).await,
1134            Dialect::Mysql => self.mysql_indexes(cx, conn, table_name).await,
1135        }
1136    }
1137
1138    async fn sqlite_indexes<C: Connection>(
1139        &self,
1140        cx: &Cx,
1141        conn: &C,
1142        table_name: &str,
1143    ) -> Outcome<Vec<IndexInfo>, Error> {
1144        let sql = format!("PRAGMA index_list({})", quote_sqlite_identifier(table_name));
1145        let rows = match conn.query(cx, &sql, &[]).await {
1146            Outcome::Ok(rows) => rows,
1147            Outcome::Err(e) => return Outcome::Err(e),
1148            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1149            Outcome::Panicked(p) => return Outcome::Panicked(p),
1150        };
1151
1152        let mut indexes = Vec::new();
1153
1154        for row in &rows {
1155            let Ok(name) = row.get_named::<String>("name") else {
1156                continue;
1157            };
1158            let unique = row.get_named::<i64>("unique").ok().unwrap_or(0) == 1;
1159            let origin = row.get_named::<String>("origin").ok().unwrap_or_default();
1160            let primary = origin == "pk";
1161
1162            // Get column info for this index
1163            let info_sql = format!("PRAGMA index_info({})", quote_sqlite_identifier(&name));
1164            let info_rows = match conn.query(cx, &info_sql, &[]).await {
1165                Outcome::Ok(r) => r,
1166                Outcome::Err(_) => continue,
1167                Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1168                Outcome::Panicked(p) => return Outcome::Panicked(p),
1169            };
1170
1171            let columns: Vec<String> = info_rows
1172                .iter()
1173                .filter_map(|r| r.get_named::<String>("name").ok())
1174                .collect();
1175
1176            indexes.push(IndexInfo {
1177                name,
1178                columns,
1179                unique,
1180                index_type: None, // SQLite doesn't expose index type
1181                primary,
1182            });
1183        }
1184
1185        Outcome::Ok(indexes)
1186    }
1187
1188    async fn postgres_indexes<C: Connection>(
1189        &self,
1190        cx: &Cx,
1191        conn: &C,
1192        table_name: &str,
1193    ) -> Outcome<Vec<IndexInfo>, Error> {
1194        // Exclude indexes backing PRIMARY KEY / UNIQUE constraints; those are represented
1195        // via TableInfo.primary_key and TableInfo.unique_constraints so the diff engine
1196        // doesn't try to DROP/CREATE constraint-backed indexes.
1197        let sql = "SELECT
1198                       i.relname AS index_name,
1199                       a.attname AS column_name,
1200                       k.ord AS column_ord,
1201                       ix.indisunique AS is_unique,
1202                       ix.indisprimary AS is_primary,
1203                       am.amname AS index_type
1204                   FROM pg_class t
1205                   JOIN pg_namespace n ON n.oid = t.relnamespace
1206                   JOIN pg_index ix ON t.oid = ix.indrelid
1207                   JOIN LATERAL unnest(ix.indkey) WITH ORDINALITY AS k(attnum, ord) ON true
1208                   JOIN pg_class i ON i.oid = ix.indexrelid
1209                   JOIN pg_am am ON i.relam = am.oid
1210                   JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = k.attnum
1211                   WHERE t.relname = $1
1212                       AND n.nspname = current_schema()
1213                       AND t.relkind = 'r'
1214                       AND NOT EXISTS (
1215                           SELECT 1
1216                           FROM pg_constraint c
1217                           WHERE c.conrelid = t.oid
1218                             AND c.conindid = i.oid
1219                             AND c.contype IN ('p', 'u')
1220                       )
1221                   ORDER BY i.relname, k.ord";
1222
1223        let rows = match conn
1224            .query(
1225                cx,
1226                sql,
1227                &[sqlmodel_core::Value::Text(table_name.to_string())],
1228            )
1229            .await
1230        {
1231            Outcome::Ok(rows) => rows,
1232            Outcome::Err(e) => return Outcome::Err(e),
1233            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1234            Outcome::Panicked(p) => return Outcome::Panicked(p),
1235        };
1236
1237        // Group by index name
1238        let mut index_map: HashMap<String, IndexInfo> = HashMap::new();
1239
1240        for row in &rows {
1241            let Ok(name) = row.get_named::<String>("index_name") else {
1242                continue;
1243            };
1244            let Ok(column) = row.get_named::<String>("column_name") else {
1245                continue;
1246            };
1247            let unique = row.get_named::<bool>("is_unique").ok().unwrap_or(false);
1248            let primary = row.get_named::<bool>("is_primary").ok().unwrap_or(false);
1249            let index_type = row.get_named::<String>("index_type").ok();
1250
1251            index_map
1252                .entry(name.clone())
1253                .and_modify(|idx| idx.columns.push(column.clone()))
1254                .or_insert_with(|| IndexInfo {
1255                    name,
1256                    columns: vec![column],
1257                    unique,
1258                    index_type,
1259                    primary,
1260                });
1261        }
1262
1263        Outcome::Ok(index_map.into_values().collect())
1264    }
1265
1266    async fn mysql_indexes<C: Connection>(
1267        &self,
1268        cx: &Cx,
1269        conn: &C,
1270        table_name: &str,
1271    ) -> Outcome<Vec<IndexInfo>, Error> {
1272        let sql = format!("SHOW INDEX FROM {}", quote_mysql_identifier(table_name));
1273        let rows = match conn.query(cx, &sql, &[]).await {
1274            Outcome::Ok(rows) => rows,
1275            Outcome::Err(e) => return Outcome::Err(e),
1276            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1277            Outcome::Panicked(p) => return Outcome::Panicked(p),
1278        };
1279
1280        // Group by index name, preserving declared key order via Seq_in_index.
1281        let mut index_map: HashMap<String, MySqlIndexAccumulator> = HashMap::new();
1282
1283        for row in &rows {
1284            let Ok(name) = row.get_named::<String>("Key_name") else {
1285                continue;
1286            };
1287            let Ok(column) = row.get_named::<String>("Column_name") else {
1288                continue;
1289            };
1290            let seq_in_index = row
1291                .get_named::<i64>("Seq_in_index")
1292                .ok()
1293                .unwrap_or(i64::MAX);
1294            let non_unique = row.get_named::<i64>("Non_unique").ok().unwrap_or(1);
1295            let index_type = row.get_named::<String>("Index_type").ok();
1296            let primary = name == "PRIMARY";
1297
1298            index_map
1299                .entry(name.clone())
1300                .and_modify(|idx| idx.columns.push((seq_in_index, column.clone())))
1301                .or_insert_with(|| MySqlIndexAccumulator {
1302                    columns: vec![(seq_in_index, column)],
1303                    unique: non_unique == 0,
1304                    index_type: index_type.clone(),
1305                    primary,
1306                });
1307        }
1308
1309        let indexes = index_map
1310            .into_iter()
1311            .map(|(name, mut acc)| {
1312                acc.columns.sort_by_key(|(seq, _)| *seq);
1313                IndexInfo {
1314                    name,
1315                    columns: acc.columns.into_iter().map(|(_, col)| col).collect(),
1316                    unique: acc.unique,
1317                    index_type: acc.index_type,
1318                    primary: acc.primary,
1319                }
1320            })
1321            .collect();
1322
1323        Outcome::Ok(indexes)
1324    }
1325}
1326
1327// ============================================================================
1328// Helper Functions
1329// ============================================================================
1330
/// Wrap `name` in double quotes for use as a SQLite identifier, doubling every
/// embedded `"` so the result stays a single quoted token.
fn quote_sqlite_identifier(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
1335
/// Wrap `name` in backticks for use as a MySQL identifier, doubling every embedded
/// backtick so the result stays a single quoted token.
fn quote_mysql_identifier(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('`');
    for ch in name.chars() {
        if ch == '`' {
            quoted.push('`');
        }
        quoted.push(ch);
    }
    quoted.push('`');
    quoted
}
1340
/// Assemble a PostgreSQL type string from `information_schema` column metadata.
///
/// Rules, applied in order:
/// 1. `data_type == "ARRAY"`: the element type is `udt_name` with its leading
///    underscores removed, followed by `[]`.
/// 2. A character length is present: upper-cased `data_type` followed by `(len)`.
/// 3. `data_type == "numeric"` with both precision and scale: `NUMERIC(p,s)`.
/// 4. Otherwise: the upper-cased `data_type` on its own.
fn build_postgres_type(
    data_type: &str,
    udt_name: &str,
    char_len: Option<i64>,
    precision: Option<i64>,
    scale: Option<i64>,
) -> String {
    match (data_type, char_len, precision, scale) {
        ("ARRAY", ..) => format!("{}[]", udt_name.trim_start_matches('_')),
        (_, Some(len), ..) => format!("{}({})", data_type.to_uppercase(), len),
        ("numeric", None, Some(p), Some(s)) => format!("NUMERIC({},{})", p, s),
        _ => data_type.to_uppercase(),
    }
}
1369
1370fn normalize_check_expression(definition: &str) -> String {
1371    let trimmed = definition.trim();
1372    let check_positions = keyword_positions_outside_quotes(trimmed, "CHECK");
1373    if let Some(check_pos) = check_positions.first().copied() {
1374        let mut cursor = check_pos + "CHECK".len();
1375        while cursor < trimmed.len() && trimmed.as_bytes()[cursor].is_ascii_whitespace() {
1376            cursor += 1;
1377        }
1378        if cursor < trimmed.len()
1379            && trimmed.as_bytes()[cursor] == b'('
1380            && let Some((expr, _)) = extract_parenthesized(trimmed, cursor)
1381        {
1382            return expr;
1383        }
1384    }
1385    trimmed.to_string()
1386}
1387
1388fn extract_sqlite_check_constraints(create_table_sql: &str) -> Vec<CheckConstraintInfo> {
1389    let Some(definitions) = sqlite_table_definitions(create_table_sql) else {
1390        return Vec::new();
1391    };
1392
1393    let mut checks = Vec::new();
1394    for definition in split_sqlite_definitions(definitions) {
1395        let constraint_positions = keyword_positions_outside_quotes(definition, "CONSTRAINT");
1396        let check_positions = keyword_positions_outside_quotes(definition, "CHECK");
1397
1398        for check_pos in check_positions {
1399            let mut cursor = check_pos + "CHECK".len();
1400            while cursor < definition.len() && definition.as_bytes()[cursor].is_ascii_whitespace() {
1401                cursor += 1;
1402            }
1403
1404            if cursor >= definition.len() || definition.as_bytes()[cursor] != b'(' {
1405                continue;
1406            }
1407
1408            let Some((expression, _end_pos)) = extract_parenthesized(definition, cursor) else {
1409                continue;
1410            };
1411
1412            checks.push(CheckConstraintInfo {
1413                name: sqlite_constraint_name_for_check(
1414                    definition,
1415                    check_pos,
1416                    &constraint_positions,
1417                ),
1418                expression,
1419            });
1420        }
1421    }
1422
1423    checks
1424}
1425
/// Return the text between the outermost parentheses of a `CREATE TABLE` statement,
/// i.e. the comma-separated list of column and constraint definitions.
///
/// Parentheses are only counted outside of quoted regions, so a `(` inside a quoted
/// table name or a `)` inside a string literal (for example `DEFAULT ')'`) no longer
/// truncates or hides the definition list. Recognized quoting:
/// `'...'` (with `''` as an escaped quote), `"..."`, `` `...` ``, and `[...]`.
///
/// Returns `None` when the statement has no opening parenthesis or the outermost
/// group is never closed.
fn sqlite_table_definitions(create_table_sql: &str) -> Option<&str> {
    let bytes = create_table_sql.as_bytes();
    let mut start = None;
    let mut depth = 0usize;
    // Closing delimiter of the quoted region currently being skipped, if any.
    let mut closing: Option<u8> = None;
    let mut i = 0usize;

    while i < bytes.len() {
        let byte = bytes[i];

        if let Some(terminator) = closing {
            if byte == terminator {
                // Inside a string literal, '' is an escaped quote, not the end.
                if terminator == b'\'' && bytes.get(i + 1) == Some(&b'\'') {
                    i += 2;
                    continue;
                }
                closing = None;
            }
            i += 1;
            continue;
        }

        match byte {
            b'\'' | b'"' | b'`' => closing = Some(byte),
            b'[' => closing = Some(b']'),
            b'(' => {
                if start.is_none() {
                    start = Some(i + 1);
                }
                depth += 1;
            }
            b')' if depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    // Both bounds sit next to ASCII parentheses, so slicing is safe.
                    return start.map(|s| &create_table_sql[s..i]);
                }
            }
            _ => {}
        }
        i += 1;
    }

    None
}
1450
/// Split a `CREATE TABLE` definition list into its top-level, comma-separated items.
///
/// A comma separates items only when it is outside every parenthesized group and
/// outside every quoted region (`'...'` with `''` as an escaped quote, `"..."`,
/// `` `...` ``, `[...]`). Each item is trimmed, and items that are empty after
/// trimming are dropped.
fn split_sqlite_definitions(definitions: &str) -> Vec<&str> {
    let bytes = definitions.as_bytes();
    let mut pieces = Vec::new();
    let mut nesting = 0usize;
    let mut piece_start = 0usize;
    // Closing delimiter of the quoted region currently being skipped, if any.
    let mut closing: Option<u8> = None;
    let mut pos = 0usize;

    while let Some(&byte) = bytes.get(pos) {
        if let Some(terminator) = closing {
            if byte == terminator {
                // Inside a string literal, '' is an escaped quote, not the end.
                if terminator == b'\'' && bytes.get(pos + 1) == Some(&b'\'') {
                    pos += 2;
                    continue;
                }
                closing = None;
            }
            pos += 1;
            continue;
        }

        match byte {
            b'\'' | b'"' | b'`' => closing = Some(byte),
            b'[' => closing = Some(b']'),
            b'(' => nesting += 1,
            // An unmatched ')' is ignored rather than driving the depth negative.
            b')' => nesting = nesting.saturating_sub(1),
            b',' if nesting == 0 => {
                pieces.push(definitions[piece_start..pos].trim());
                piece_start = pos + 1;
            }
            _ => {}
        }
        pos += 1;
    }

    pieces.push(definitions[piece_start..].trim());
    pieces.retain(|piece| !piece.is_empty());
    pieces
}
1524
1525fn keyword_positions_outside_quotes(input: &str, keyword: &str) -> Vec<usize> {
1526    if keyword.is_empty() || input.len() < keyword.len() {
1527        return Vec::new();
1528    }
1529
1530    let bytes = input.as_bytes();
1531    let keyword_bytes = keyword.as_bytes();
1532    let mut positions = Vec::new();
1533    let mut i = 0usize;
1534    let mut single_quote = false;
1535    let mut double_quote = false;
1536    let mut backtick_quote = false;
1537    let mut bracket_quote = false;
1538
1539    while i < bytes.len() {
1540        let b = bytes[i];
1541        if single_quote {
1542            if b == b'\'' {
1543                if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
1544                    i += 2;
1545                    continue;
1546                }
1547                single_quote = false;
1548            }
1549            i += 1;
1550            continue;
1551        }
1552        if double_quote {
1553            if b == b'"' {
1554                double_quote = false;
1555            }
1556            i += 1;
1557            continue;
1558        }
1559        if backtick_quote {
1560            if b == b'`' {
1561                backtick_quote = false;
1562            }
1563            i += 1;
1564            continue;
1565        }
1566        if bracket_quote {
1567            if b == b']' {
1568                bracket_quote = false;
1569            }
1570            i += 1;
1571            continue;
1572        }
1573
1574        match b {
1575            b'\'' => {
1576                single_quote = true;
1577                i += 1;
1578                continue;
1579            }
1580            b'"' => {
1581                double_quote = true;
1582                i += 1;
1583                continue;
1584            }
1585            b'`' => {
1586                backtick_quote = true;
1587                i += 1;
1588                continue;
1589            }
1590            b'[' => {
1591                bracket_quote = true;
1592                i += 1;
1593                continue;
1594            }
1595            _ => {}
1596        }
1597
1598        if i + keyword_bytes.len() <= bytes.len()
1599            && bytes[i..i + keyword_bytes.len()].eq_ignore_ascii_case(keyword_bytes)
1600            && (i == 0 || !is_identifier_byte(bytes[i - 1]))
1601            && (i + keyword_bytes.len() == bytes.len()
1602                || !is_identifier_byte(bytes[i + keyword_bytes.len()]))
1603        {
1604            positions.push(i);
1605            i += keyword_bytes.len();
1606            continue;
1607        }
1608
1609        i += 1;
1610    }
1611
1612    positions
1613}
1614
1615fn sqlite_constraint_name_for_check(
1616    definition: &str,
1617    check_pos: usize,
1618    constraint_positions: &[usize],
1619) -> Option<String> {
1620    let constraint_pos = constraint_positions
1621        .iter()
1622        .copied()
1623        .rfind(|pos| *pos < check_pos)?;
1624
1625    let mut cursor = constraint_pos + "CONSTRAINT".len();
1626    while cursor < definition.len() && definition.as_bytes()[cursor].is_ascii_whitespace() {
1627        cursor += 1;
1628    }
1629    if cursor >= definition.len() {
1630        return None;
1631    }
1632
1633    let (name, _next) = parse_sqlite_identifier_token(definition, cursor)?;
1634    Some(name)
1635}
1636
/// Parse one identifier token from `input` beginning at byte offset `start`.
///
/// Supported forms:
/// * `"name"` and `` `name` `` — a doubled quote character inside the token stands for
///   one literal quote character and is un-escaped in the returned name;
/// * `[name]` — runs to the first `]`, no escaping;
/// * bare token — runs to the next ASCII whitespace byte or the end of input.
///
/// Returns the identifier text together with the byte offset just past the token.
/// Returns `None` when `start` is out of range, a quoted form is unterminated, or a
/// bare token would be empty (i.e. `start` points at whitespace).
fn parse_sqlite_identifier_token(input: &str, start: usize) -> Option<(String, usize)> {
    let bytes = input.as_bytes();
    match *bytes.get(start)? {
        quote @ (b'"' | b'`') => {
            let body_start = start + 1;
            let mut end = body_start;
            loop {
                // Jump to the next quote character; none left means unterminated.
                end += bytes.get(end..)?.iter().position(|&b| b == quote)?;
                if bytes.get(end + 1) == Some(&quote) {
                    // Doubled quote: part of the name, keep scanning after it.
                    end += 2;
                } else {
                    break;
                }
            }
            let single = char::from(quote).to_string();
            let doubled = single.repeat(2);
            let name = input[body_start..end].replace(doubled.as_str(), single.as_str());
            Some((name, end + 1))
        }
        b'[' => {
            let body_start = start + 1;
            let len = bytes[body_start..].iter().position(|&b| b == b']')?;
            let end = body_start + len;
            Some((input[body_start..end].to_string(), end + 1))
        }
        _ => {
            let len = bytes[start..]
                .iter()
                .take_while(|b| !b.is_ascii_whitespace())
                .count();
            if len == 0 {
                None
            } else {
                let end = start + len;
                Some((input[start..end].to_string(), end))
            }
        }
    }
}
1695
/// Extract the contents of the balanced parenthesized group that opens at byte
/// offset `open_paren_pos`.
///
/// Parentheses inside quoted regions (`'...'` with `''` as an escaped quote, `"..."`,
/// `` `...` ``, `[...]`) are ignored when balancing. On success the trimmed text
/// between the outer parentheses is returned together with the byte offset of the
/// matching `)`.
///
/// Returns `None` when the byte at `open_paren_pos` is not `(` (including an
/// out-of-range offset) or when the group is never closed.
fn extract_parenthesized(input: &str, open_paren_pos: usize) -> Option<(String, usize)> {
    let bytes = input.as_bytes();
    if bytes.get(open_paren_pos) != Some(&b'(') {
        return None;
    }

    let mut nesting = 0usize;
    // Closing delimiter of the quoted region currently being skipped, if any.
    let mut closing: Option<u8> = None;
    let mut pos = open_paren_pos;

    while let Some(&byte) = bytes.get(pos) {
        if let Some(terminator) = closing {
            if byte == terminator {
                // Inside a string literal, '' is an escaped quote, not the end.
                if terminator == b'\'' && bytes.get(pos + 1) == Some(&b'\'') {
                    pos += 2;
                    continue;
                }
                closing = None;
            }
            pos += 1;
            continue;
        }

        match byte {
            b'\'' | b'"' | b'`' => closing = Some(byte),
            b'[' => closing = Some(b']'),
            b'(' => nesting += 1,
            b')' => {
                // A ')' with nothing open is malformed input: give up.
                nesting = nesting.checked_sub(1)?;
                if nesting == 0 {
                    let inner = input[open_paren_pos + 1..pos].trim().to_string();
                    return Some((inner, pos));
                }
            }
            _ => {}
        }
        pos += 1;
    }

    None
}
1767
/// Report whether `b` is an ASCII letter, an ASCII digit, or an underscore.
fn is_identifier_byte(b: u8) -> bool {
    match b {
        b'_' => true,
        other => other.is_ascii_alphabetic() || other.is_ascii_digit(),
    }
}
1771
1772// ============================================================================
1773// Unit Tests
1774// ============================================================================
1775
#[cfg(test)]
mod tests {
    //! Unit tests for SQL type parsing, identifier sanitizing/quoting,
    //! PostgreSQL type rendering, and CHECK-constraint extraction.

    use super::*;

    /// `VARCHAR(n)` yields a base type and a length, nothing else.
    #[test]
    fn test_parsed_sql_type_varchar() {
        let parsed = ParsedSqlType::parse("VARCHAR(255)");
        assert_eq!(parsed.base_type, "VARCHAR");
        assert_eq!(
            (parsed.length, parsed.precision, parsed.scale),
            (Some(255), None, None)
        );
        assert!(!parsed.unsigned);
        assert!(!parsed.array);
    }

    /// `DECIMAL(p,s)` yields precision and scale but no length.
    #[test]
    fn test_parsed_sql_type_decimal() {
        let parsed = ParsedSqlType::parse("DECIMAL(10,2)");
        assert_eq!(parsed.base_type, "DECIMAL");
        assert_eq!(
            (parsed.length, parsed.precision, parsed.scale),
            (None, Some(10), Some(2))
        );
    }

    /// The `UNSIGNED` modifier sets the flag and is not part of the base type.
    #[test]
    fn test_parsed_sql_type_unsigned() {
        let parsed = ParsedSqlType::parse("INT UNSIGNED");
        assert_eq!(parsed.base_type, "INT");
        assert!(parsed.unsigned);
    }

    /// A trailing `[]` marks the type as an array.
    #[test]
    fn test_parsed_sql_type_array() {
        let parsed = ParsedSqlType::parse("TEXT[]");
        assert_eq!(parsed.base_type, "TEXT");
        assert!(parsed.array);
    }

    /// A bare type name carries no length and no modifiers.
    #[test]
    fn test_parsed_sql_type_simple() {
        let parsed = ParsedSqlType::parse("INTEGER");
        assert_eq!(parsed.base_type, "INTEGER");
        assert_eq!(parsed.length, None);
        assert!(!parsed.unsigned);
        assert!(!parsed.array);
    }

    /// Character types are text; `INTEGER` is not.
    #[test]
    fn test_parsed_sql_type_is_text() {
        for sql in ["VARCHAR(100)", "TEXT", "CHAR(1)"] {
            assert!(ParsedSqlType::parse(sql).is_text(), "{sql}");
        }
        assert!(!ParsedSqlType::parse("INTEGER").is_text());
    }

    /// Integer and decimal types are numeric; `TEXT` is not.
    #[test]
    fn test_parsed_sql_type_is_numeric() {
        for sql in ["INTEGER", "BIGINT", "DECIMAL(10,2)"] {
            assert!(ParsedSqlType::parse(sql).is_numeric(), "{sql}");
        }
        assert!(!ParsedSqlType::parse("TEXT").is_numeric());
    }

    /// Date and timestamp types are datetime; `TEXT` is not.
    #[test]
    fn test_parsed_sql_type_is_datetime() {
        for sql in ["DATE", "TIMESTAMP", "TIMESTAMPTZ"] {
            assert!(ParsedSqlType::parse(sql).is_datetime(), "{sql}");
        }
        assert!(!ParsedSqlType::parse("TEXT").is_datetime());
    }

    /// A freshly created schema has no tables and keeps the given dialect.
    #[test]
    fn test_database_schema_new() {
        let empty = DatabaseSchema::new(Dialect::Postgres);
        assert!(empty.tables.is_empty());
        assert_eq!(empty.dialect, Dialect::Postgres);
    }

    /// Column lookup by name, plus auto-increment primary key detection.
    #[test]
    fn test_table_info_column() {
        let id_column = ColumnInfo {
            name: "id".to_string(),
            sql_type: "INTEGER".to_string(),
            parsed_type: ParsedSqlType::parse("INTEGER"),
            nullable: false,
            default: None,
            primary_key: true,
            auto_increment: true,
            comment: None,
        };
        let table = TableInfo {
            name: "test".to_string(),
            columns: vec![id_column],
            primary_key: vec!["id".to_string()],
            foreign_keys: Vec::new(),
            unique_constraints: Vec::new(),
            check_constraints: Vec::new(),
            indexes: Vec::new(),
            comment: None,
        };

        assert!(table.column("id").is_some());
        assert!(table.column("nonexistent").is_none());
        assert!(table.has_auto_pk());
    }

    /// An `ARRAY` data type with udt name `_text` renders as `text[]`.
    #[test]
    fn test_build_postgres_type_array() {
        assert_eq!(
            build_postgres_type("ARRAY", "_text", None, None, None),
            "text[]"
        );
    }

    /// A character length is appended in parentheses to the upper-cased name.
    #[test]
    fn test_build_postgres_type_varchar() {
        assert_eq!(
            build_postgres_type("character varying", "", Some(100), None, None),
            "CHARACTER VARYING(100)"
        );
    }

    /// Precision and scale are appended as `(p,s)`.
    #[test]
    fn test_build_postgres_type_numeric() {
        assert_eq!(
            build_postgres_type("numeric", "", None, Some(10), Some(2)),
            "NUMERIC(10,2)"
        );
    }

    /// Plain identifiers pass through unchanged.
    #[test]
    fn test_sanitize_identifier_normal() {
        for name in ["users", "my_table", "Table123"] {
            assert_eq!(sanitize_identifier(name), name);
        }
    }

    #[test]
    fn test_sanitize_identifier_sql_injection() {
        // SQL injection attempts should be sanitized
        let cases = [
            ("users; DROP TABLE--", "usersDROPTABLE"),
            ("table`; malicious", "tablemalicious"),
            ("users'--", "users"),
            ("table\"); DROP TABLE users;--", "tableDROPTABLEusers"),
        ];
        for (hostile, cleaned) in cases {
            assert_eq!(sanitize_identifier(hostile), cleaned);
        }
    }

    #[test]
    fn test_sanitize_identifier_special_chars() {
        // Various special characters should be stripped
        for raw in ["table-name", "table.name", "table name", "table\nname"] {
            assert_eq!(sanitize_identifier(raw), "tablename");
        }
    }

    /// SQLite quoting wraps in double quotes and doubles embedded ones.
    #[test]
    fn test_quote_sqlite_identifier_preserves_special_chars() {
        let cases = [
            ("my table", "\"my table\""),
            ("my\"table", "\"my\"\"table\""),
        ];
        for (raw, quoted) in cases {
            assert_eq!(quote_sqlite_identifier(raw), quoted);
        }
    }

    /// MySQL quoting wraps in backticks and doubles embedded ones.
    #[test]
    fn test_quote_mysql_identifier_preserves_special_chars() {
        let cases = [("my-table", "`my-table`"), ("my`table", "`my``table`")];
        for (raw, quoted) in cases {
            assert_eq!(quote_mysql_identifier(raw), quoted);
        }
    }

    /// The `CHECK ( ... )` wrapper is removed; inner grouping is kept.
    #[test]
    fn test_normalize_check_expression_wrapped_check() {
        let normalized = normalize_check_expression("CHECK ((age >= 0) AND (age <= 150))");
        assert_eq!(normalized, "(age >= 0) AND (age <= 150)");
    }

    /// A clause without the `CHECK` keyword is returned as given.
    #[test]
    fn test_normalize_check_expression_raw_clause() {
        let normalized = normalize_check_expression("(score > 0)");
        assert_eq!(normalized, "(score > 0)");
    }

    /// Commas inside string literals do not disturb the extraction.
    #[test]
    fn test_normalize_check_expression_with_quoted_commas() {
        let normalized = normalize_check_expression("CHECK (kind IN ('A,B', 'C'))");
        assert_eq!(normalized, "kind IN ('A,B', 'C')");
    }

    /// Table-level constraints: one named via `CONSTRAINT`, one anonymous.
    #[test]
    fn test_extract_sqlite_check_constraints_named_and_unnamed() {
        let create_sql = r"
            CREATE TABLE heroes (
                id INTEGER PRIMARY KEY,
                age INTEGER,
                CONSTRAINT age_non_negative CHECK (age >= 0),
                CHECK (age <= 150)
            )
        ";

        let found = extract_sqlite_check_constraints(create_sql);
        let expressions: Vec<&str> = found.iter().map(|c| c.expression.as_str()).collect();
        let names: Vec<Option<&str>> = found.iter().map(|c| c.name.as_deref()).collect();

        assert_eq!(expressions, ["age >= 0", "age <= 150"]);
        assert_eq!(names, [Some("age_non_negative"), None]);
    }

    /// Column-level constraints, nested parentheses, and two checks on one column.
    #[test]
    fn test_extract_sqlite_check_constraints_column_level_and_nested() {
        let create_sql = r"
            CREATE TABLE heroes (
                age INTEGER CONSTRAINT age_positive CHECK (age > 0),
                score INTEGER CHECK ((score >= 0) AND (score <= 100)),
                level INTEGER CHECK (level > 0) CHECK (level < 10)
            )
        ";

        let found = extract_sqlite_check_constraints(create_sql);
        let expressions: Vec<&str> = found.iter().map(|c| c.expression.as_str()).collect();
        assert_eq!(
            expressions,
            [
                "age > 0",
                "(score >= 0) AND (score <= 100)",
                "level > 0",
                "level < 10",
            ]
        );

        // Only the first two constraint names are pinned by this test.
        assert_eq!(found[0].name.as_deref(), Some("age_positive"));
        assert_eq!(found[1].name, None);
    }

    /// A comma inside a string literal must not split the constraint.
    #[test]
    fn test_extract_sqlite_check_constraints_handles_quoted_commas() {
        let create_sql = r"
            CREATE TABLE heroes (
                kind TEXT CHECK (kind IN ('A,B', 'C')),
                note TEXT
            )
        ";

        let found = extract_sqlite_check_constraints(create_sql);
        let expressions: Vec<&str> = found.iter().map(|c| c.expression.as_str()).collect();
        assert_eq!(expressions, ["kind IN ('A,B', 'C')"]);
    }
}