Skip to main content

qail_core/build/
schema.rs

1//! Schema types and parsing for build-time validation.
2
3use crate::ast::Expr;
4use crate::migrate::types::ColumnType;
5use crate::parser::grammar::ddl::parse_column_definition;
6use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
/// Foreign key relationship definition.
///
/// The schema parser creates one of these for every `ref:table.column`
/// (or `ref:>table.column`) marker found on a column line.
#[derive(Debug, Clone)]
pub struct ForeignKey {
    /// Column in this table that references another table
    pub column: String,
    /// Name of referenced table
    pub ref_table: String,
    /// Column in referenced table
    pub ref_column: String,
}
19
/// Table schema information with column types and relations.
///
/// Built either from a `table name { ... }` block in a schema file or
/// synthesised while merging migrations.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column name → Column type (strongly-typed AST enum)
    pub columns: HashMap<String, ColumnType>,
    /// Column name → Access Policy (Default: "Public", can be "Protected")
    pub policies: HashMap<String, String>,
    /// Foreign key relationships to other tables
    pub foreign_keys: Vec<ForeignKey>,
    /// Whether this table has Row-Level Security enabled
    /// Auto-detected: table has `tenant_id` column or explicit `rls` keyword.
    pub rls_enabled: bool,
}
35
/// Parsed schema from schema.qail file.
///
/// `Default` yields an empty schema (no tables, views or resources).
#[derive(Debug, Default)]
pub struct Schema {
    /// Table schemas keyed by table name.
    pub tables: HashMap<String, TableSchema>,
    /// SQL view names (column-level typing is not available in build schema parser).
    pub views: HashSet<String>,
    /// Infrastructure resources (bucket, queue, topic), keyed by resource name.
    pub resources: HashMap<String, ResourceSchema>,
}
46
/// Infrastructure resource schema (bucket, queue, topic).
#[derive(Debug, Clone)]
pub struct ResourceSchema {
    /// Resource name.
    pub name: String,
    /// Resource kind (bucket, queue, topic) — the leading keyword of the declaration.
    pub kind: String,
    /// Cloud provider (e.g. "aws"); taken from the `provider` key of the block, if present.
    pub provider: Option<String>,
    /// Provider-specific properties: every other `key value` pair of the block.
    pub properties: HashMap<String, String>,
}
59
/// Remove trailing comments from a schema line and trim the remainder.
///
/// Everything from the first `--` onwards is dropped, then everything from
/// the first `#` onwards; surrounding whitespace is trimmed last.
fn strip_schema_comments(line: &str) -> &str {
    let without_dashes = match line.find("--") {
        Some(idx) => &line[..idx],
        None => line,
    };
    let without_hash = match without_dashes.find('#') {
        Some(idx) => &without_dashes[..idx],
        None => without_dashes,
    };
    without_hash.trim()
}
64
/// Drop a trailing `--` SQL line comment and trim surrounding whitespace.
fn strip_sql_line_comments(line: &str) -> &str {
    // `split` always yields at least one piece: the text before the first `--`.
    let code = line.split("--").next().unwrap_or(line);
    code.trim()
}
68
69impl Schema {
70    /// Parse a schema.qail file
71    pub fn parse_file(path: &str) -> Result<Self, String> {
72        let content = crate::schema_source::read_qail_schema_source(path)?;
73        Self::parse(&content)
74    }
75
76    /// Parse schema from string
77    pub fn parse(content: &str) -> Result<Self, String> {
78        let mut schema = Schema::default();
79        let mut current_table: Option<String> = None;
80        let mut current_columns: HashMap<String, ColumnType> = HashMap::new();
81        let mut current_policies: HashMap<String, String> = HashMap::new();
82        let mut current_fks: Vec<ForeignKey> = Vec::new();
83        let mut current_rls_flag = false;
84
85        for raw_line in content.lines() {
86            let line = strip_schema_comments(raw_line);
87
88            // Skip comments and empty lines
89            if line.is_empty() {
90                continue;
91            }
92
93            // Resource declarations: bucket, queue, topic
94            // Only match at the top level, NOT inside a table block
95            // (a column named 'topic' inside a table would otherwise be
96            //  misidentified as a resource declaration)
97            if current_table.is_none()
98                && (line.starts_with("bucket ")
99                    || line.starts_with("queue ")
100                    || line.starts_with("topic "))
101            {
102                let parts: Vec<&str> = line.splitn(2, ' ').collect();
103                let kind = parts[0].to_string();
104                let rest = parts.get(1).copied().unwrap_or("").trim();
105
106                // Extract name (before {
107                let name = rest.split('{').next().unwrap_or(rest).trim().to_string();
108                let mut provider = None;
109                let mut properties = HashMap::new();
110
111                if line.contains('{') {
112                    // Collect block content
113                    let block = rest.split('{').nth(1).unwrap_or("").to_string();
114                    if !block.contains('}') {
115                        for inner in content.lines().skip_while(|l| !l.contains(line)) {
116                            // Simple approach: read until }
117                            if inner.contains('}') {
118                                break;
119                            }
120                        }
121                    }
122                    let block = block.replace('}', "");
123                    let mut tokens = block.split_whitespace();
124                    while let Some(key) = tokens.next() {
125                        if let Some(val) = tokens.next() {
126                            let val = val.trim_matches('"').to_string();
127                            if key == "provider" {
128                                provider = Some(val);
129                            } else {
130                                properties.insert(key.to_string(), val);
131                            }
132                        }
133                    }
134                }
135
136                if !name.is_empty() {
137                    schema.resources.insert(
138                        name.clone(),
139                        ResourceSchema {
140                            name,
141                            kind,
142                            provider,
143                            properties,
144                        },
145                    );
146                }
147                continue;
148            }
149
150            // View declarations: `view name $$` or `materialized view name $$`
151            // Track view names so query-table validation accepts view-backed reads.
152            if current_table.is_none()
153                && let Some(view_name) = extract_view_name(line)
154            {
155                schema.views.insert(view_name.to_string());
156                continue;
157            }
158
159            // Table definition: table name { [rls]
160            if line.starts_with("table ") && (line.ends_with('{') || line.contains('{')) {
161                // Save previous table if any
162                if let Some(table_name) = current_table.take() {
163                    // Auto-detect RLS from tenant_id column or explicit `rls` marker.
164                    let has_rls = current_rls_flag || current_columns.contains_key("tenant_id");
165                    schema.tables.insert(
166                        table_name.clone(),
167                        TableSchema {
168                            name: table_name,
169                            columns: std::mem::take(&mut current_columns),
170                            policies: std::mem::take(&mut current_policies),
171                            foreign_keys: std::mem::take(&mut current_fks),
172                            rls_enabled: has_rls,
173                        },
174                    );
175                }
176
177                // Parse new table name, check for `rls` keyword
178                // Format: "table bookings rls {" or "table bookings {"
179                let after_table = line.trim_start_matches("table ");
180                let before_brace = after_table.split('{').next().unwrap_or("").trim();
181                let parts: Vec<&str> = before_brace.split_whitespace().collect();
182                let name = parts.first().unwrap_or(&"").to_string();
183                current_rls_flag = parts.contains(&"rls");
184                current_table = Some(name);
185            }
186            // End of table definition
187            else if line == "}" {
188                if let Some(table_name) = current_table.take() {
189                    let has_rls = current_rls_flag || current_columns.contains_key("tenant_id");
190                    schema.tables.insert(
191                        table_name.clone(),
192                        TableSchema {
193                            name: table_name,
194                            columns: std::mem::take(&mut current_columns),
195                            policies: std::mem::take(&mut current_policies),
196                            foreign_keys: std::mem::take(&mut current_fks),
197                            rls_enabled: has_rls,
198                        },
199                    );
200                    current_rls_flag = false;
201                }
202            }
203            // Column definition: column_name TYPE [constraints] [ref:table.column] [protected]
204            // Format from qail pull: "flow_name VARCHAR not_null"
205            // New format with FK: "user_id UUID ref:users.id"
206            // New format with Policy: "password_hash TEXT protected"
207            else if current_table.is_some() {
208                let parts: Vec<&str> = line.split_whitespace().collect();
209                if let Some(col_name) = parts.first() {
210                    // Second word is the type (default to TEXT if missing)
211                    let col_type_str = parts.get(1).copied().unwrap_or("text");
212                    let col_type = col_type_str
213                        .parse::<ColumnType>()
214                        .unwrap_or(ColumnType::Text);
215                    current_columns.insert(col_name.to_string(), col_type);
216
217                    // Check for policies and foreign keys
218                    let mut policy = "Public".to_string();
219
220                    for part in parts.iter().skip(2) {
221                        if *part == "protected" {
222                            policy = "Protected".to_string();
223                        } else if let Some(ref_spec) = part.strip_prefix("ref:") {
224                            // Parse "table.column" or ">table.column"
225                            let ref_spec = ref_spec.trim_start_matches('>');
226                            if let Some((ref_table, ref_col)) = ref_spec.split_once('.') {
227                                current_fks.push(ForeignKey {
228                                    column: col_name.to_string(),
229                                    ref_table: ref_table.to_string(),
230                                    ref_column: ref_col.to_string(),
231                                });
232                            }
233                        }
234                    }
235                    current_policies.insert(col_name.to_string(), policy);
236                }
237            }
238        }
239
240        if let Some(table_name) = current_table.take() {
241            return Err(format!(
242                "Unclosed table definition for '{}': expected closing '}}'",
243                table_name
244            ));
245        }
246
247        Ok(schema)
248    }
249
250    /// Check if table exists
251    pub fn has_table(&self, name: &str) -> bool {
252        self.tables.contains_key(name) || self.views.contains(name)
253    }
254
255    /// Get all table names that have RLS enabled
256    pub fn rls_tables(&self) -> Vec<&str> {
257        self.tables
258            .iter()
259            .filter(|(_, ts)| ts.rls_enabled)
260            .map(|(name, _)| name.as_str())
261            .collect()
262    }
263
264    /// Check if a specific table has RLS enabled
265    pub fn is_rls_table(&self, name: &str) -> bool {
266        self.tables.get(name).is_some_and(|t| t.rls_enabled)
267    }
268
    /// Get table schema.
    ///
    /// Returns `None` when no table named `name` is registered; views are
    /// tracked separately and are not looked up here.
    pub fn table(&self, name: &str) -> Option<&TableSchema> {
        self.tables.get(name)
    }
273
274    /// Merge pending migrations into the schema
275    /// Scans migration directory for:
276    /// - legacy SQL migrations (`up.sql` / `*.sql`)
277    /// - native QAIL migrations (`up.qail` / `*.qail`)
278    pub fn merge_migrations(&mut self, migrations_dir: &str) -> Result<usize, String> {
279        use std::fs;
280
281        let dir = Path::new(migrations_dir);
282        if !dir.exists() {
283            return Ok(0); // No migrations directory
284        }
285
286        let mut merged_count = 0;
287
288        // Walk migration directories (format: migrations/YYYYMMDD_name/up.sql)
289        let entries =
290            fs::read_dir(dir).map_err(|e| format!("Failed to read migrations dir: {}", e))?;
291
292        for entry in entries.flatten() {
293            let path = entry.path();
294
295            // Check for migration file candidates in subdirectory (prefer native QAIL),
296            // or direct file entries.
297            let migration_file = if path.is_dir() {
298                let up_qail = path.join("up.qail");
299                let up_sql = path.join("up.sql");
300                if up_qail.exists() {
301                    up_qail
302                } else if up_sql.exists() {
303                    up_sql
304                } else {
305                    continue;
306                }
307            } else if path.extension().is_some_and(|e| e == "qail" || e == "sql") {
308                path.clone()
309            } else {
310                continue;
311            };
312
313            if migration_file.exists() {
314                let content = fs::read_to_string(&migration_file)
315                    .map_err(|e| format!("Failed to read {}: {}", migration_file.display(), e))?;
316
317                if migration_file.extension().is_some_and(|ext| ext == "qail") {
318                    merged_count += self.parse_qail_migration(&content).map_err(|e| {
319                        format!(
320                            "Failed to parse native migration {}: {}",
321                            migration_file.display(),
322                            e
323                        )
324                    })?;
325                } else {
326                    merged_count += self.parse_sql_migration(&content);
327                }
328            }
329        }
330
331        Ok(merged_count)
332    }
333
334    /// Parse native QAIL migration content and merge tables/columns into build schema.
335    pub(crate) fn parse_qail_migration(&mut self, qail: &str) -> Result<usize, String> {
336        let parsed = Schema::parse(qail)?;
337        let mut changes = 0usize;
338
339        for (table_name, parsed_table) in parsed.tables {
340            if let Some(existing) = self.tables.get_mut(&table_name) {
341                for (col_name, col_type) in parsed_table.columns {
342                    if existing
343                        .columns
344                        .insert(col_name.clone(), col_type)
345                        .is_none()
346                    {
347                        changes += 1;
348                    }
349                }
350                for (col_name, policy) in parsed_table.policies {
351                    if existing.policies.insert(col_name, policy).is_none() {
352                        changes += 1;
353                    }
354                }
355                for fk in parsed_table.foreign_keys {
356                    let duplicate = existing.foreign_keys.iter().any(|existing_fk| {
357                        existing_fk.column == fk.column
358                            && existing_fk.ref_table == fk.ref_table
359                            && existing_fk.ref_column == fk.ref_column
360                    });
361                    if !duplicate {
362                        existing.foreign_keys.push(fk);
363                        changes += 1;
364                    }
365                }
366                if parsed_table.rls_enabled && !existing.rls_enabled {
367                    existing.rls_enabled = true;
368                    changes += 1;
369                }
370            } else {
371                changes += 1 + parsed_table.columns.len();
372                self.tables.insert(table_name, parsed_table);
373            }
374        }
375
376        for view_name in parsed.views {
377            if self.views.insert(view_name) {
378                changes += 1;
379            }
380        }
381        for (resource_name, resource) in parsed.resources {
382            if self.resources.insert(resource_name, resource).is_none() {
383                changes += 1;
384            }
385        }
386
387        changes += self.parse_explicit_qail_apply_commands(qail)?;
388
389        Ok(changes)
390    }
391
392    fn parse_explicit_qail_apply_commands(&mut self, qail: &str) -> Result<usize, String> {
393        let mut changes = 0usize;
394
395        for (line_no, raw_line) in qail.lines().enumerate() {
396            let line = strip_schema_comments(raw_line);
397            if line.is_empty() || !line.starts_with("alter ") {
398                continue;
399            }
400
401            let (table, column_name, column_type) = parse_explicit_alter_add_column_line(line)
402                .map_err(|err| format!("Line {}: {}", line_no + 1, err))?;
403
404            if let Some(existing) = self.tables.get_mut(&table) {
405                if existing.columns.insert(column_name, column_type).is_none() {
406                    changes += 1;
407                }
408            } else {
409                let mut columns = HashMap::new();
410                columns.insert(column_name, column_type);
411                self.tables.insert(
412                    table.clone(),
413                    TableSchema {
414                        name: table,
415                        columns,
416                        policies: HashMap::new(),
417                        foreign_keys: vec![],
418                        rls_enabled: false,
419                    },
420                );
421                changes += 2;
422            }
423        }
424
425        Ok(changes)
426    }
427
    /// Parse SQL migration content and extract schema changes.
    ///
    /// This is a line-oriented heuristic, not a SQL parser. Each line is
    /// inspected on its own after `--` comments are stripped; lines that start
    /// with `/*`, `*` or `*/` are skipped.
    ///
    /// The text is scanned twice:
    /// 1. every `CREATE TABLE` naming an unknown table registers an empty table;
    /// 2. column lines inside `CREATE TABLE (...)` blocks and
    ///    `ALTER TABLE ... ADD/DROP ...` / `DROP TABLE` statements are applied
    ///    in file order.
    ///
    /// Columns discovered here carry no type information and are always
    /// recorded as `ColumnType::Text`.
    ///
    /// Returns the number of changes applied (tables/columns added or removed).
    pub(crate) fn parse_sql_migration(&mut self, sql: &str) -> usize {
        let mut changes = 0;

        // Extract CREATE TABLE statements
        // Pattern: CREATE TABLE [IF NOT EXISTS] table_name (columns...)
        for raw_line in sql.lines() {
            let line = strip_sql_line_comments(raw_line);
            // Skip blank lines and lines that look like part of a block comment.
            if line.is_empty()
                || line.starts_with("/*")
                || line.starts_with('*')
                || line.starts_with("*/")
            {
                continue;
            }
            let line_upper = line.to_uppercase();

            if line_upper.starts_with("CREATE TABLE")
                && let Some(table_name) = extract_create_table_name(line)
                && !self.tables.contains_key(&table_name)
            {
                self.tables.insert(
                    table_name.clone(),
                    TableSchema {
                        name: table_name,
                        columns: HashMap::new(),
                        policies: HashMap::new(),
                        foreign_keys: vec![],
                        rls_enabled: false,
                    },
                );
                changes += 1;
            }
        }

        // Extract column definitions from CREATE TABLE blocks
        // IMPORTANT: Only track CREATE blocks for tables that were newly created
        // by this migration. Tables that already exist in the schema (from schema.qail)
        // already have correct column types — overwriting them with ColumnType::Text
        // would cause false type-mismatch errors.
        let mut current_table: Option<String> = None;
        let mut in_create_block = false;
        let mut paren_depth = 0;

        for raw_line in sql.lines() {
            let line = strip_sql_line_comments(raw_line);
            if line.is_empty()
                || line.starts_with("/*")
                || line.starts_with('*')
                || line.starts_with("*/")
            {
                continue;
            }
            let line_upper = line.to_uppercase();

            if line_upper.starts_with("CREATE TABLE")
                && let Some(name) = extract_create_table_name(line)
            {
                // Only track column extraction for tables that DON'T already
                // have their types from schema.qail. Tables that existed before
                // this migration already have correct types; overwriting them
                // with ColumnType::Text would be a bug.
                if self.tables.get(&name).is_none_or(|t| t.columns.is_empty()) {
                    current_table = Some(name);
                } else {
                    current_table = None;
                }
                // The block is tracked even when its columns are ignored, so
                // that its closing parenthesis is still recognised.
                in_create_block = true;
                paren_depth = 0;
            }

            if in_create_block {
                // Track nesting so that parentheses in column definitions such
                // as `VARCHAR(255)` do not end the block prematurely.
                paren_depth += line.chars().filter(|c| *c == '(').count();
                paren_depth =
                    paren_depth.saturating_sub(line.chars().filter(|c| *c == ')').count());

                // Extract column name (first identifier after opening paren)
                if let Some(col) = extract_column_from_create(line)
                    && let Some(ref table) = current_table
                    && let Some(t) = self.tables.get_mut(table)
                    && t.columns.insert(col.clone(), ColumnType::Text).is_none()
                {
                    changes += 1;
                }

                // Depth back at zero on a line containing `)` closes the block.
                if paren_depth == 0 && line.contains(')') {
                    in_create_block = false;
                    current_table = None;
                }
            }

            // ALTER TABLE ... ADD COLUMN
            if line_upper.starts_with("ALTER TABLE")
                && line_upper.contains("ADD COLUMN")
                && let Some((table, col)) = extract_alter_add_column(line)
            {
                if let Some(t) = self.tables.get_mut(&table) {
                    if t.columns.insert(col.clone(), ColumnType::Text).is_none() {
                        changes += 1;
                    }
                } else {
                    // Table might be new from this migration
                    let mut cols = HashMap::new();
                    cols.insert(col, ColumnType::Text);
                    self.tables.insert(
                        table.clone(),
                        TableSchema {
                            name: table,
                            columns: cols,
                            policies: HashMap::new(),
                            foreign_keys: vec![],
                            rls_enabled: false,
                        },
                    );
                    changes += 1;
                }
            }

            // ALTER TABLE ... ADD (without COLUMN keyword)
            // Unlike the ADD COLUMN case above, an unknown table is ignored here.
            if line_upper.starts_with("ALTER TABLE")
                && line_upper.contains(" ADD ")
                && !line_upper.contains("ADD COLUMN")
                && let Some((table, col)) = extract_alter_add(line)
                && let Some(t) = self.tables.get_mut(&table)
                && t.columns.insert(col.clone(), ColumnType::Text).is_none()
            {
                changes += 1;
            }

            // DROP TABLE
            if line_upper.starts_with("DROP TABLE")
                && let Some(table_name) = extract_drop_table_name(line)
                && self.tables.remove(&table_name).is_some()
            {
                changes += 1;
            }

            // ALTER TABLE ... DROP COLUMN
            if line_upper.starts_with("ALTER TABLE")
                && line_upper.contains("DROP COLUMN")
                && let Some((table, col)) = extract_alter_drop_column(line)
                && let Some(t) = self.tables.get_mut(&table)
                && t.columns.remove(&col).is_some()
            {
                changes += 1;
            }

            // ALTER TABLE ... DROP (without COLUMN keyword - PostgreSQL style)
            // Constraint and index drops are excluded so they are not mistaken
            // for column drops.
            if line_upper.starts_with("ALTER TABLE")
                && line_upper.contains(" DROP ")
                && !line_upper.contains("DROP COLUMN")
                && !line_upper.contains("DROP CONSTRAINT")
                && !line_upper.contains("DROP INDEX")
                && let Some((table, col)) = extract_alter_drop(line)
                && let Some(t) = self.tables.get_mut(&table)
                && t.columns.remove(&col).is_some()
            {
                changes += 1;
            }
        }

        changes
    }
591}
592
593fn parse_explicit_alter_add_column_line(
594    line: &str,
595) -> Result<(String, String, ColumnType), String> {
596    let rest = line
597        .strip_prefix("alter ")
598        .ok_or_else(|| "expected 'alter <table> add <column:type[:constraints]>'".to_string())?
599        .trim();
600
601    let mut parts = rest.splitn(2, char::is_whitespace);
602    let table = parts
603        .next()
604        .map(str::trim)
605        .filter(|table| !table.is_empty())
606        .ok_or_else(|| "expected table name after 'alter'".to_string())?;
607    let remainder = parts
608        .next()
609        .map(str::trim)
610        .ok_or_else(|| "expected 'add <column:type[:constraints]>' after table name".to_string())?;
611    let column_def = remainder
612        .strip_prefix("add ")
613        .ok_or_else(|| "expected 'add <column:type[:constraints]>' after table name".to_string())?
614        .trim();
615
616    if column_def.is_empty() {
617        return Err("expected column definition after 'add'".to_string());
618    }
619
620    let (remaining, column_expr) = parse_column_definition(column_def)
621        .map_err(|_| format!("invalid column definition '{}'", column_def))?;
622    if !remaining.trim().is_empty() {
623        return Err(format!(
624            "unexpected trailing content after column definition: '{}'",
625            remaining.trim()
626        ));
627    }
628
629    match column_expr {
630        Expr::Def {
631            name, data_type, ..
632        } => Ok((
633            table.to_string(),
634            name,
635            data_type.parse::<ColumnType>().unwrap_or(ColumnType::Text),
636        )),
637        _ => Err("expected column definition after 'add'".to_string()),
638    }
639}
640
/// Return the view name from a `view <name> ...` or
/// `materialized view <name> ...` declaration line.
///
/// Yields `None` when the line is not a view declaration or no name follows
/// the keyword.
fn extract_view_name(line: &str) -> Option<&str> {
    let declaration = line
        .strip_prefix("view ")
        .or_else(|| line.strip_prefix("materialized view "))?;

    // `split_whitespace` never yields empty items, so a missing name is `None`.
    declaration.split_whitespace().next()
}
651
/// Extract table name from a `CREATE TABLE [IF NOT EXISTS] name ...` line.
///
/// Keyword matching is ASCII case-insensitive. The name is the run of
/// alphanumeric/underscore characters following the keywords, returned in
/// lower case. Quoted identifiers yield `None`, and for a schema-qualified
/// name only the part before the `.` is returned.
fn extract_create_table_name(line: &str) -> Option<String> {
    // ASCII upper-casing keeps every byte offset identical to `line`, so the
    // suffix length computed below can safely be used to slice `line`.
    // (Unicode upper-casing may change the byte length of the string.)
    let upper = line.to_ascii_uppercase();
    let rest = upper.strip_prefix("CREATE TABLE")?.trim_start();
    let rest = rest
        .strip_prefix("IF NOT EXISTS")
        .map_or(rest, str::trim_start);

    // Get table name (first identifier), read from the original-case line.
    let name: String = line[line.len() - rest.len()..]
        .chars()
        .take_while(|c| c.is_alphanumeric() || *c == '_')
        .collect();

    if name.is_empty() {
        None
    } else {
        Some(name.to_lowercase())
    }
}
675
/// Pull the column name out of one line of a `CREATE TABLE` body.
///
/// Lines that open with a table-level keyword (`CREATE`, `PRIMARY`,
/// `FOREIGN`, `UNIQUE`, `CHECK`, `CONSTRAINT`) or with a parenthesis are not
/// column definitions and yield `None`. The keyword must be followed by a
/// space or `(` so that columns such as `created_at` or `check_status` are
/// still recognised. The name is returned in lower case.
fn extract_column_from_create(line: &str) -> Option<String> {
    const NON_COLUMN_KEYWORDS: [&str; 6] = [
        "CREATE",
        "PRIMARY",
        "FOREIGN",
        "UNIQUE",
        "CHECK",
        "CONSTRAINT",
    ];

    let trimmed = line.trim();
    if trimmed.is_empty() || trimmed.starts_with('(') || trimmed.starts_with(')') {
        return None;
    }

    // Word-boundary check: keyword must be directly followed by ' ' or '('.
    let upper = trimmed.to_uppercase();
    let opens_with_keyword = NON_COLUMN_KEYWORDS.iter().any(|keyword| {
        upper
            .strip_prefix(keyword)
            .is_some_and(|tail| tail.starts_with([' ', '(']))
    });
    if opens_with_keyword {
        return None;
    }

    // First word is the column name.
    let mut column = String::new();
    for ch in trimmed.chars() {
        if !(ch.is_alphanumeric() || ch == '_') {
            break;
        }
        column.push(ch);
    }

    if column.is_empty() || column.to_uppercase() == "IF" {
        return None;
    }
    Some(column.to_lowercase())
}
716
/// Extract table and column from `ALTER TABLE t ADD COLUMN [IF NOT EXISTS] c ...`.
///
/// Keyword matching is ASCII case-insensitive. Both names are the leading
/// run of alphanumeric/underscore characters and are returned in lower case.
/// Returns `None` when either keyword is missing, the keywords appear in the
/// wrong order, or a name is empty.
fn extract_alter_add_column(line: &str) -> Option<(String, String)> {
    const ALTER_KW: &str = "ALTER TABLE";
    const ADD_KW: &str = "ADD COLUMN";
    const IF_NOT_EXISTS: &str = "IF NOT EXISTS";

    let identifier = |text: &str| -> String {
        text.trim()
            .chars()
            .take_while(|c| c.is_alphanumeric() || *c == '_')
            .collect()
    };

    // ASCII upper-casing preserves byte offsets, so positions found in
    // `upper` are valid slice indices into `line` even with non-ASCII names.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER_KW)?;
    let add_pos = upper.find(ADD_KW)?;

    // Table name between ALTER TABLE and ADD COLUMN (checked slice: `None`
    // instead of a panic when the keywords are out of order).
    let table = identifier(line.get(alter_pos + ALTER_KW.len()..add_pos)?);

    // Column name after ADD COLUMN [IF NOT EXISTS]
    let tail_upper = upper[add_pos + ADD_KW.len()..].trim_start();
    let skip = if tail_upper.starts_with(IF_NOT_EXISTS) {
        IF_NOT_EXISTS.len()
    } else {
        0
    };
    let col = identifier(&line[line.len() - tail_upper.len() + skip..]);

    if table.is_empty() || col.is_empty() {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
749
/// Extract table and column from `ALTER TABLE t ADD [IF NOT EXISTS] c ...`
/// (without the `COLUMN` keyword).
///
/// Keyword matching is ASCII case-insensitive; names are returned in lower
/// case. Returns `None` when the statement adds a table constraint rather
/// than a column (`ADD CONSTRAINT`, `ADD PRIMARY KEY`, `ADD FOREIGN KEY`,
/// `ADD UNIQUE`, `ADD CHECK`), so constraint keywords are never reported as
/// column names.
fn extract_alter_add(line: &str) -> Option<(String, String)> {
    const ALTER_KW: &str = "ALTER TABLE";
    const ADD_KW: &str = " ADD ";
    const IF_NOT_EXISTS: &str = "IF NOT EXISTS";
    // Words that may follow ADD but introduce a constraint, not a column.
    const CONSTRAINT_KEYWORDS: [&str; 5] = ["CONSTRAINT", "PRIMARY", "FOREIGN", "UNIQUE", "CHECK"];

    let identifier = |text: &str| -> String {
        text.trim()
            .chars()
            .take_while(|c| c.is_alphanumeric() || *c == '_')
            .collect()
    };

    // ASCII upper-casing preserves byte offsets, so positions found in
    // `upper` are valid slice indices into `line` even with non-ASCII names.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER_KW)?;
    let add_pos = upper.find(ADD_KW)?;

    let table = identifier(line.get(alter_pos + ALTER_KW.len()..add_pos)?);

    let tail_upper = upper[add_pos + ADD_KW.len()..].trim_start();
    let skip = if tail_upper.starts_with(IF_NOT_EXISTS) {
        IF_NOT_EXISTS.len()
    } else {
        0
    };
    let col = identifier(&line[line.len() - tail_upper.len() + skip..]);

    let is_constraint = CONSTRAINT_KEYWORDS
        .iter()
        .any(|keyword| col.eq_ignore_ascii_case(keyword));

    if table.is_empty() || col.is_empty() || is_constraint {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
776
/// Extract table name from a `DROP TABLE [IF EXISTS] name ...` line.
///
/// Keyword matching is ASCII case-insensitive. The name is the run of
/// alphanumeric/underscore characters following the keywords, returned in
/// lower case; `None` when no such name is present.
fn extract_drop_table_name(line: &str) -> Option<String> {
    // ASCII upper-casing keeps every byte offset identical to `line`, so the
    // suffix length computed below can safely be used to slice `line`.
    let upper = line.to_ascii_uppercase();
    let rest = upper.strip_prefix("DROP TABLE")?.trim_start();
    let rest = rest.strip_prefix("IF EXISTS").map_or(rest, str::trim_start);

    // Get table name (first identifier), read from the original-case line.
    let name: String = line[line.len() - rest.len()..]
        .chars()
        .take_while(|c| c.is_alphanumeric() || *c == '_')
        .collect();

    if name.is_empty() {
        None
    } else {
        Some(name.to_lowercase())
    }
}
800
/// Extract table and column from `ALTER TABLE t DROP COLUMN [IF EXISTS] c ...`.
///
/// Keyword matching is ASCII case-insensitive. Both names are the leading
/// run of alphanumeric/underscore characters and are returned in lower case.
/// Returns `None` when either keyword is missing, the keywords appear in the
/// wrong order, or a name is empty.
fn extract_alter_drop_column(line: &str) -> Option<(String, String)> {
    const ALTER_KW: &str = "ALTER TABLE";
    const DROP_KW: &str = "DROP COLUMN";
    const IF_EXISTS: &str = "IF EXISTS";

    let identifier = |text: &str| -> String {
        text.trim()
            .chars()
            .take_while(|c| c.is_alphanumeric() || *c == '_')
            .collect()
    };

    // ASCII upper-casing preserves byte offsets, so positions found in
    // `upper` are valid slice indices into `line` even with non-ASCII names.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER_KW)?;
    let drop_pos = upper.find(DROP_KW)?;

    // Table name between ALTER TABLE and DROP COLUMN
    let table = identifier(line.get(alter_pos + ALTER_KW.len()..drop_pos)?);

    // Column name after DROP COLUMN [IF EXISTS]; without skipping the
    // optional clause the column would be reported as "if".
    let tail_upper = upper[drop_pos + DROP_KW.len()..].trim_start();
    let skip = if tail_upper.starts_with(IF_EXISTS) {
        IF_EXISTS.len()
    } else {
        0
    };
    let col = identifier(&line[line.len() - tail_upper.len() + skip..]);

    if table.is_empty() || col.is_empty() {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
829
/// Extract table and column from `ALTER TABLE t DROP [IF EXISTS] c ...`
/// (without the `COLUMN` keyword).
///
/// Keyword matching is ASCII case-insensitive; names are returned in lower
/// case. The function does not itself distinguish column drops from
/// `DROP CONSTRAINT` / `DROP INDEX`; callers are expected to filter those
/// statements out beforehand.
fn extract_alter_drop(line: &str) -> Option<(String, String)> {
    const ALTER_KW: &str = "ALTER TABLE";
    const DROP_KW: &str = " DROP ";
    const IF_EXISTS: &str = "IF EXISTS";

    let identifier = |text: &str| -> String {
        text.trim()
            .chars()
            .take_while(|c| c.is_alphanumeric() || *c == '_')
            .collect()
    };

    // ASCII upper-casing preserves byte offsets, so positions found in
    // `upper` are valid slice indices into `line` even with non-ASCII names.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER_KW)?;
    let drop_pos = upper.find(DROP_KW)?;

    let table = identifier(line.get(alter_pos + ALTER_KW.len()..drop_pos)?);

    // Skip the optional IF EXISTS clause so it is not mistaken for the column.
    let tail_upper = upper[drop_pos + DROP_KW.len()..].trim_start();
    let skip = if tail_upper.starts_with(IF_EXISTS) {
        IF_EXISTS.len()
    } else {
        0
    };
    let col = identifier(&line[line.len() - tail_upper.len() + skip..]);

    if table.is_empty() || col.is_empty() {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
856
857impl TableSchema {
858    /// Check if column exists
859    pub fn has_column(&self, name: &str) -> bool {
860        self.columns.contains_key(name)
861    }
862
863    /// Get column type
864    pub fn column_type(&self, name: &str) -> Option<&ColumnType> {
865        self.columns.get(name)
866    }
867
868    /// Get the primary key column name for this table.
869    ///
870    /// Convention: returns `"id"` if it exists as a column.
871    /// This is a single point of truth for PK resolution — when the schema
872    /// parser is enhanced to track PK constraints, update this method.
873    pub fn primary_key_column(&self) -> &str {
874        if self.columns.contains_key("id") {
875            "id"
876        } else {
877            // Fallback: look for `{singular_table_name}_id` pattern
878            // e.g., table "users" → "user_id"
879            let singular = self.name.trim_end_matches('s');
880            let conventional = format!("{}_id", singular);
881            if self.columns.contains_key(&conventional) {
882                // Leak into 'static to satisfy lifetime — this is called rarely
883                // and the string is small. Alternatively, return String.
884                return "id"; // Safe default — schema has no "id" but this avoids lifetime issues
885            }
886            "id" // Universal fallback
887        }
888    }
889}