Skip to main content

qail_core/build/
schema.rs

1//! Schema types and parsing for build-time validation.
2
3use crate::ast::Expr;
4use crate::migrate::types::ColumnType;
5use crate::parser::grammar::ddl::parse_column_definition;
6use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9/// Foreign key relationship definition
10#[derive(Debug, Clone)]
11pub struct ForeignKey {
12    /// Column in this table that references another table
13    pub column: String,
14    /// Name of referenced table
15    pub ref_table: String,
16    /// Column in referenced table
17    pub ref_column: String,
18}
19
20/// Table schema information with column types and relations
21#[derive(Debug, Clone)]
22pub struct TableSchema {
23    /// Table name.
24    pub name: String,
25    /// Column name → Column type (strongly-typed AST enum)
26    pub columns: HashMap<String, ColumnType>,
27    /// Column name → Access Policy (Default: "Public", can be "Protected")
28    pub policies: HashMap<String, String>,
29    /// Foreign key relationships to other tables
30    pub foreign_keys: Vec<ForeignKey>,
31    /// Whether this table has Row-Level Security enabled
32    /// Auto-detected: table has `tenant_id` column or explicit `rls` keyword.
33    pub rls_enabled: bool,
34}
35
36/// Parsed schema from schema.qail file
37#[derive(Debug, Default)]
38pub struct Schema {
39    /// Table schemas keyed by table name.
40    pub tables: HashMap<String, TableSchema>,
41    /// SQL view names (column-level typing is not available in build schema parser).
42    pub views: HashSet<String>,
43    /// Infrastructure resources (bucket, queue, topic)
44    pub resources: HashMap<String, ResourceSchema>,
45}
46
47/// Infrastructure resource schema (bucket, queue, topic)
48#[derive(Debug, Clone)]
49pub struct ResourceSchema {
50    /// Resource name.
51    pub name: String,
52    /// Resource kind (bucket, queue, topic).
53    pub kind: String,
54    /// Cloud provider (e.g. "aws").
55    pub provider: Option<String>,
56    /// Provider-specific properties.
57    pub properties: HashMap<String, String>,
58}
59
60fn strip_schema_comments(line: &str) -> &str {
61    let Some(idx) = schema_comment_start(line, true) else {
62        return line.trim();
63    };
64    line[..idx].trim()
65}
66
67#[cfg(test)]
68fn strip_sql_line_comments(line: &str) -> &str {
69    let Some(idx) = schema_comment_start(line, false) else {
70        return line.trim();
71    };
72    line[..idx].trim()
73}
74
75fn strip_sql_migration_comments(
76    line: &str,
77    in_block_comment: &mut bool,
78    dollar_quote: &mut Option<String>,
79) -> String {
80    let mut out = String::new();
81    let mut in_single = false;
82    let mut in_double = false;
83    let mut suppress_dollar_content = dollar_quote.is_some();
84    let mut i = 0usize;
85
86    while i < line.len() {
87        if *in_block_comment {
88            if line[i..].starts_with("*/") {
89                i += 2;
90                *in_block_comment = false;
91            } else {
92                i += line[i..].chars().next().map(char::len_utf8).unwrap_or(1);
93            }
94            continue;
95        }
96
97        if let Some(delim) = dollar_quote.as_deref() {
98            if line[i..].starts_with(delim) {
99                out.push_str(delim);
100                i += delim.len();
101                *dollar_quote = None;
102                suppress_dollar_content = false;
103            } else if let Some(ch) = line[i..].chars().next() {
104                if !suppress_dollar_content {
105                    out.push(ch);
106                }
107                i += ch.len_utf8();
108            }
109            continue;
110        }
111
112        let Some(ch) = line[i..].chars().next() else {
113            break;
114        };
115
116        if in_single {
117            out.push(ch);
118            if ch == '\'' {
119                if line[i + ch.len_utf8()..].starts_with('\'') {
120                    out.push('\'');
121                    i += ch.len_utf8() + 1;
122                } else {
123                    i += ch.len_utf8();
124                    in_single = false;
125                }
126            } else {
127                i += ch.len_utf8();
128            }
129            continue;
130        }
131
132        if in_double {
133            out.push(ch);
134            if ch == '"' {
135                if line[i + ch.len_utf8()..].starts_with('"') {
136                    out.push('"');
137                    i += ch.len_utf8() + 1;
138                } else {
139                    i += ch.len_utf8();
140                    in_double = false;
141                }
142            } else {
143                i += ch.len_utf8();
144            }
145            continue;
146        }
147
148        match ch {
149            '\'' => {
150                in_single = true;
151                out.push(ch);
152                i += ch.len_utf8();
153            }
154            '"' => {
155                in_double = true;
156                out.push(ch);
157                i += ch.len_utf8();
158            }
159            '$' => {
160                let Some(delim) = sql_dollar_quote_delimiter_at(line, i) else {
161                    out.push(ch);
162                    i += ch.len_utf8();
163                    continue;
164                };
165                out.push_str(delim);
166                i += delim.len();
167                *dollar_quote = Some(delim.to_string());
168            }
169            '-' if line[i + ch.len_utf8()..].starts_with('-') => break,
170            '/' if line[i + ch.len_utf8()..].starts_with('*') => {
171                i += ch.len_utf8() + 1;
172                *in_block_comment = true;
173            }
174            _ => {
175                out.push(ch);
176                i += ch.len_utf8();
177            }
178        }
179    }
180
181    out.trim().to_string()
182}
183
184fn schema_comment_start(line: &str, hash_comments: bool) -> Option<usize> {
185    let bytes = line.as_bytes();
186    let mut in_single = false;
187    let mut in_double = false;
188    let mut i = 0usize;
189
190    while i < bytes.len() {
191        match bytes[i] {
192            b'\'' if !in_double => {
193                if in_single && bytes.get(i + 1) == Some(&b'\'') {
194                    i += 2;
195                    continue;
196                }
197                in_single = !in_single;
198            }
199            b'"' if !in_single => {
200                if in_double && bytes.get(i + 1) == Some(&b'"') {
201                    i += 2;
202                    continue;
203                }
204                in_double = !in_double;
205            }
206            b'-' if !in_single && !in_double && bytes.get(i + 1) == Some(&b'-') => {
207                return Some(i);
208            }
209            b'#' if hash_comments && !in_single && !in_double => return Some(i),
210            _ => {}
211        }
212        i += 1;
213    }
214
215    None
216}
217
218fn sql_dollar_quote_delimiter_at(raw: &str, idx: usize) -> Option<&str> {
219    let bytes = raw.as_bytes();
220    if bytes.get(idx) != Some(&b'$') {
221        return None;
222    }
223
224    let mut end = idx + 1;
225    while end < bytes.len() {
226        match bytes[end] {
227            b'$' => return Some(&raw[idx..=end]),
228            b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' => end += 1,
229            _ => return None,
230        }
231    }
232
233    None
234}
235
236impl Schema {
237    /// Parse a schema.qail file
238    pub fn parse_file(path: &str) -> Result<Self, String> {
239        let content = crate::schema_source::read_qail_schema_source(path)?;
240        Self::parse(&content)
241    }
242
243    /// Parse schema from string
244    pub fn parse(content: &str) -> Result<Self, String> {
245        let mut schema = Schema::default();
246        let mut current_table: Option<String> = None;
247        let mut current_columns: HashMap<String, ColumnType> = HashMap::new();
248        let mut current_policies: HashMap<String, String> = HashMap::new();
249        let mut current_fks: Vec<ForeignKey> = Vec::new();
250        let mut current_rls_flag = false;
251        let mut enum_types: HashMap<String, Vec<String>> = HashMap::new();
252
253        let mut lines = content.lines().peekable();
254        while let Some(raw_line) = lines.next() {
255            let line = strip_schema_comments(raw_line);
256
257            // Skip comments and empty lines
258            if line.is_empty() {
259                continue;
260            }
261
262            if current_table.is_none() && line.starts_with("enum ") {
263                let (name, values) = parse_build_enum_declaration(line, &mut lines)?;
264                if enum_types.insert(name.clone(), values).is_some() {
265                    return Err(format!("duplicate enum declaration '{}'", name));
266                }
267                continue;
268            }
269
270            // Resource declarations: bucket, queue, topic
271            // Only match at the top level, NOT inside a table block
272            // (a column named 'topic' inside a table would otherwise be
273            //  misidentified as a resource declaration)
274            if current_table.is_none()
275                && (line.starts_with("bucket ")
276                    || line.starts_with("queue ")
277                    || line.starts_with("topic "))
278            {
279                let parts: Vec<&str> = line.splitn(2, ' ').collect();
280                let kind = parts[0].to_string();
281                let rest = parts.get(1).copied().unwrap_or("").trim();
282
283                // Extract name (before {
284                let has_block = line.contains('{');
285                let (name, block_start) = if has_block {
286                    let (name, block) = rest.split_once('{').unwrap_or((rest, ""));
287                    (name.trim().to_string(), Some(block.to_string()))
288                } else {
289                    let mut parts = rest.split_whitespace();
290                    let name = parts.next().unwrap_or("").to_string();
291                    if parts.next().is_some() {
292                        return Err(format!("Trailing content after {} resource name", kind));
293                    }
294                    (name, None)
295                };
296                if name.is_empty() {
297                    return Err(format!("Missing name for {} declaration", kind));
298                }
299                if !is_build_identifier(&name) {
300                    return Err(format!("Invalid {} resource name '{}'", kind, name));
301                }
302                let mut provider = None;
303                let mut properties = HashMap::new();
304
305                if let Some(mut block) = block_start {
306                    let mut block_content = None;
307                    while block_content.is_none() {
308                        block_content = resource_block_content_before_closing(&block)?;
309                        if block_content.is_some() {
310                            break;
311                        }
312                        let Some(next_line) = lines.next() else {
313                            return Err(format!(
314                                "Unclosed {} resource definition for '{}': expected closing '}}'",
315                                kind, name
316                            ));
317                        };
318                        let inner = strip_schema_comments(next_line);
319                        block.push(' ');
320                        block.push_str(inner);
321                    }
322                    let block = block_content.unwrap_or_default();
323                    let tokens = split_resource_tokens(block.trim())?;
324                    let mut tokens = tokens.iter();
325                    let mut seen_keys = HashSet::new();
326                    while let Some(key) = tokens.next() {
327                        if !seen_keys.insert(key) {
328                            return Err(format!(
329                                "Duplicate resource property '{}' in '{}'",
330                                key, name
331                            ));
332                        }
333                        let Some(val) = tokens.next() else {
334                            return Err(format!(
335                                "Resource property '{}' in '{}' requires a value",
336                                key, name
337                            ));
338                        };
339                        if key == "provider" {
340                            provider = Some(val.to_string());
341                        } else {
342                            properties.insert(key.to_string(), val.to_string());
343                        }
344                    }
345                }
346
347                if schema.resources.contains_key(&name) {
348                    return Err(format!("duplicate resource declaration '{}'", name));
349                }
350                schema.resources.insert(
351                    name.clone(),
352                    ResourceSchema {
353                        name,
354                        kind,
355                        provider,
356                        properties,
357                    },
358                );
359                continue;
360            }
361
362            // View declarations: `view name $$` or `materialized view name $$`
363            // Track view names so query-table validation accepts view-backed reads.
364            if current_table.is_none()
365                && let Some(view_name) = extract_view_name(line)
366            {
367                if !is_build_table_ref(view_name) {
368                    return Err(format!("Invalid view name '{}'", view_name));
369                }
370                if !schema.views.insert(view_name.to_string()) {
371                    return Err(format!("duplicate view declaration '{}'", view_name));
372                }
373                continue;
374            }
375
376            // Table definition: table name { [rls]
377            if line.starts_with("table ") && (line.ends_with('{') || line.contains('{')) {
378                if let Some(table_name) = current_table.as_deref() {
379                    return Err(format!(
380                        "Table declaration encountered before closing table '{}'",
381                        table_name
382                    ));
383                }
384
385                // Parse new table name, check for `rls` keyword
386                // Format: "table bookings rls {" or "table bookings {"
387                let after_table = line.trim_start_matches("table ");
388                let (before_brace, after_brace) = after_table
389                    .split_once('{')
390                    .ok_or_else(|| format!("Invalid table definition: {}", line))?;
391                if !after_brace.trim().is_empty() {
392                    return Err(format!(
393                        "Trailing content after table opening brace for '{}'",
394                        before_brace
395                            .split_whitespace()
396                            .next()
397                            .unwrap_or("<missing>")
398                    ));
399                }
400                let before_brace = before_brace.trim();
401                let parts: Vec<&str> = before_brace.split_whitespace().collect();
402                let Some(name) = parts.first().filter(|name| !name.is_empty()) else {
403                    return Err("Missing name for table declaration".to_string());
404                };
405                if !is_build_table_ref(name) {
406                    return Err(format!("Invalid table name '{}'", name));
407                }
408                let mut seen_rls_option = false;
409                for option in parts.iter().skip(1) {
410                    if *option != "rls" {
411                        return Err(format!("Unknown table option '{}' for '{}'", option, name));
412                    }
413                    if seen_rls_option {
414                        return Err(format!("Duplicate table option 'rls' for '{}'", name));
415                    }
416                    seen_rls_option = true;
417                }
418                current_rls_flag = parts.contains(&"rls");
419                current_table = Some((*name).to_string());
420            }
421            // End of table definition
422            else if let Some(after_brace) = line.strip_prefix('}') {
423                let Some(table_name) = current_table.take() else {
424                    return Err("Unexpected table closing brace".to_string());
425                };
426                if !after_brace.trim().is_empty() {
427                    return Err(format!(
428                        "Trailing content after table closing brace for '{}'",
429                        table_name
430                    ));
431                }
432                if schema.tables.contains_key(&table_name) {
433                    return Err(format!("duplicate table declaration '{}'", table_name));
434                }
435                let has_rls = current_rls_flag || current_columns.contains_key("tenant_id");
436                schema.tables.insert(
437                    table_name.clone(),
438                    TableSchema {
439                        name: table_name,
440                        columns: std::mem::take(&mut current_columns),
441                        policies: std::mem::take(&mut current_policies),
442                        foreign_keys: std::mem::take(&mut current_fks),
443                        rls_enabled: has_rls,
444                    },
445                );
446                current_rls_flag = false;
447            }
448            // Column definition: column_name TYPE [constraints] [ref:table.column] [protected]
449            // Format from qail pull: "flow_name VARCHAR not_null"
450            // New format with FK: "user_id UUID ref:users.id"
451            // New format with Policy: "password_hash TEXT protected"
452            else if current_table.is_some() {
453                if matches!(line, "enable_rls" | "force_rls") {
454                    current_rls_flag = true;
455                    continue;
456                }
457
458                let parts: Vec<&str> = line.split_whitespace().collect();
459                if let Some(col_name) = parts.first() {
460                    if !is_build_identifier(col_name) {
461                        let table_name = current_table.as_deref().unwrap_or("<unknown>");
462                        return Err(format!(
463                            "Invalid column name '{}' in table '{}'",
464                            col_name, table_name
465                        ));
466                    }
467                    if current_columns.contains_key(*col_name) {
468                        let table_name = current_table.as_deref().unwrap_or("<unknown>");
469                        return Err(format!(
470                            "duplicate column '{}' in table '{}'",
471                            col_name, table_name
472                        ));
473                    }
474                    let table_name = current_table.as_deref().unwrap_or("<unknown>");
475                    let Some((col_type, type_end)) =
476                        parse_build_column_type_prefix(&parts, &enum_types)
477                    else {
478                        let Some(col_type_str) = parts.get(1).copied() else {
479                            return Err(format!(
480                                "Missing type for column '{}' in table '{}'",
481                                col_name, table_name
482                            ));
483                        };
484                        return Err(format!(
485                            "Unknown column type '{}' for column '{}' in table '{}'",
486                            col_type_str, col_name, table_name
487                        ));
488                    };
489                    current_columns.insert(col_name.to_string(), col_type);
490
491                    // Check for policies and foreign keys
492                    let mut policy = "Public".to_string();
493                    let mut seen_protected = false;
494                    let mut seen_column_options = HashSet::new();
495                    let mut nullability_option: Option<&str> = None;
496                    let mut generated_option: Option<&str> = None;
497                    let mut has_foreign_key = false;
498                    let mut seen_fk_actions = HashSet::new();
499
500                    let mut i = type_end;
501                    while i < parts.len() {
502                        let part = parts[i];
503                        if part == "protected" {
504                            if seen_protected {
505                                return Err(format!(
506                                    "duplicate protected option for column '{}' in table '{}'",
507                                    col_name, table_name
508                                ));
509                            }
510                            seen_protected = true;
511                            policy = "Protected".to_string();
512                        } else if matches!(
513                            part,
514                            "primary_key"
515                                | "not_null"
516                                | "nullable"
517                                | "unique"
518                                | "generated_identity"
519                                | "generated_by_default_identity"
520                        ) {
521                            if !seen_column_options.insert(part) {
522                                return Err(format!(
523                                    "duplicate column option '{}' for column '{}' in table '{}'",
524                                    part, col_name, table_name
525                                ));
526                            }
527                            if matches!(part, "not_null" | "nullable") {
528                                if let Some(existing) = nullability_option {
529                                    return Err(format!(
530                                        "conflicting nullability options '{}' and '{}' for column '{}' in table '{}'",
531                                        existing, part, col_name, table_name
532                                    ));
533                                }
534                                nullability_option = Some(part);
535                            }
536                            if matches!(
537                                part,
538                                "generated_identity" | "generated_by_default_identity"
539                            ) {
540                                if let Some(existing) = generated_option {
541                                    return Err(format!(
542                                        "conflicting generated options '{}' and '{}' for column '{}' in table '{}'",
543                                        existing, part, col_name, table_name
544                                    ));
545                                }
546                                generated_option = Some(part);
547                            }
548                            // Build-time validation only needs shape, type, policy, and relations.
549                        } else if part == "default" {
550                            if i + 1 >= parts.len() {
551                                return Err(format!(
552                                    "default requires a value for column '{}' in table '{}'",
553                                    col_name, table_name
554                                ));
555                            }
556                            break;
557                        } else if part.starts_with("default=")
558                            || part.starts_with("default:")
559                            || part.starts_with("generated_stored(")
560                            || part.starts_with("check(")
561                        {
562                            break;
563                        } else if let Some(ref_spec) = part.strip_prefix("ref:") {
564                            // Parse "table.column" or ">table.column"
565                            let (ref_table, ref_column) =
566                                parse_build_ref_spec(ref_spec, col_name, table_name)?;
567                            push_build_foreign_key(
568                                &mut current_fks,
569                                col_name,
570                                ref_table,
571                                ref_column,
572                                table_name,
573                            )?;
574                            has_foreign_key = true;
575                        } else if part == "references" {
576                            if i + 1 >= parts.len() {
577                                return Err(format!(
578                                    "foreign key reference target is required for column '{}' in table '{}'",
579                                    col_name, table_name
580                                ));
581                            }
582                            i += 1;
583                            let (ref_table, ref_column) =
584                                parse_build_references_target(parts[i], col_name, table_name)?;
585                            push_build_foreign_key(
586                                &mut current_fks,
587                                col_name,
588                                ref_table,
589                                ref_column,
590                                table_name,
591                            )?;
592                            has_foreign_key = true;
593                        } else if let Some(ref_target) = part.strip_prefix("references") {
594                            let (ref_table, ref_column) =
595                                parse_build_references_target(ref_target, col_name, table_name)?;
596                            push_build_foreign_key(
597                                &mut current_fks,
598                                col_name,
599                                ref_table,
600                                ref_column,
601                                table_name,
602                            )?;
603                            has_foreign_key = true;
604                        } else if matches!(part, "on_delete" | "on_update") {
605                            if !has_foreign_key {
606                                return Err(format!(
607                                    "{} requires a preceding foreign key for column '{}' in table '{}'",
608                                    part, col_name, table_name
609                                ));
610                            }
611                            if !seen_fk_actions.insert(part) {
612                                return Err(format!(
613                                    "duplicate {} action for column '{}' in table '{}'",
614                                    part, col_name, table_name
615                                ));
616                            }
617                            if i + 1 >= parts.len() {
618                                return Err(format!(
619                                    "{} requires a foreign key action for column '{}' in table '{}'",
620                                    part, col_name, table_name
621                                ));
622                            }
623                            i += 1;
624                            if !is_build_fk_action(parts[i]) {
625                                return Err(format!(
626                                    "unknown foreign key action '{}' for column '{}' in table '{}'",
627                                    parts[i], col_name, table_name
628                                ));
629                            }
630                        } else if part == "check_name" {
631                            if i + 1 >= parts.len() {
632                                return Err(format!(
633                                    "check_name requires a name for column '{}' in table '{}'",
634                                    col_name, table_name
635                                ));
636                            }
637                            i += 1;
638                        } else {
639                            return Err(format!(
640                                "Unknown column option '{}' for column '{}' in table '{}'",
641                                part, col_name, table_name
642                            ));
643                        }
644                        i += 1;
645                    }
646                    current_policies.insert(col_name.to_string(), policy);
647                }
648            }
649        }
650
651        if let Some(table_name) = current_table.take() {
652            return Err(format!(
653                "Unclosed table definition for '{}': expected closing '}}'",
654                table_name
655            ));
656        }
657
658        Ok(schema)
659    }
660
661    /// Check if table exists
662    pub fn has_table(&self, name: &str) -> bool {
663        self.tables.contains_key(name) || self.views.contains(name)
664    }
665
666    /// Get all table names that have RLS enabled
667    pub fn rls_tables(&self) -> Vec<&str> {
668        self.tables
669            .iter()
670            .filter(|(_, ts)| ts.rls_enabled)
671            .map(|(name, _)| name.as_str())
672            .collect()
673    }
674
675    /// Check if a specific table has RLS enabled
676    pub fn is_rls_table(&self, name: &str) -> bool {
677        self.tables.get(name).is_some_and(|t| t.rls_enabled)
678    }
679
680    /// Get table schema
681    pub fn table(&self, name: &str) -> Option<&TableSchema> {
682        self.tables.get(name)
683    }
684
685    /// Merge pending migrations into the schema
686    /// Scans migration directory for:
687    /// - legacy SQL migrations (`up.sql` / `*.sql`)
688    /// - native QAIL migrations (`up.qail` / `*.qail`)
689    pub fn merge_migrations(&mut self, migrations_dir: &str) -> Result<usize, String> {
690        use std::fs;
691
692        let dir = Path::new(migrations_dir);
693        if !dir.exists() {
694            return Ok(0); // No migrations directory
695        }
696
697        let mut merged_count = 0;
698
699        // Walk migration directories (format: migrations/YYYYMMDD_name/up.sql)
700        let entries =
701            fs::read_dir(dir).map_err(|e| format!("Failed to read migrations dir: {}", e))?;
702
703        for entry in entries.flatten() {
704            let path = entry.path();
705
706            // Check for migration file candidates in subdirectory (prefer native QAIL),
707            // or direct file entries.
708            let migration_file = if path.is_dir() {
709                let up_qail = path.join("up.qail");
710                let up_sql = path.join("up.sql");
711                if up_qail.exists() {
712                    up_qail
713                } else if up_sql.exists() {
714                    up_sql
715                } else {
716                    continue;
717                }
718            } else if path.extension().is_some_and(|e| e == "qail" || e == "sql") {
719                path.clone()
720            } else {
721                continue;
722            };
723
724            if migration_file.exists() {
725                let content = fs::read_to_string(&migration_file)
726                    .map_err(|e| format!("Failed to read {}: {}", migration_file.display(), e))?;
727
728                if migration_file.extension().is_some_and(|ext| ext == "qail") {
729                    merged_count += self.parse_qail_migration(&content).map_err(|e| {
730                        format!(
731                            "Failed to parse native migration {}: {}",
732                            migration_file.display(),
733                            e
734                        )
735                    })?;
736                } else {
737                    merged_count += self.parse_sql_migration(&content);
738                }
739            }
740        }
741
742        Ok(merged_count)
743    }
744
745    /// Parse native QAIL migration content and merge tables/columns into build schema.
746    pub(crate) fn parse_qail_migration(&mut self, qail: &str) -> Result<usize, String> {
747        let parsed = Schema::parse(qail)?;
748        let mut changes = 0usize;
749
750        for (table_name, parsed_table) in parsed.tables {
751            if let Some(existing) = self.tables.get_mut(&table_name) {
752                for (col_name, col_type) in parsed_table.columns {
753                    if let Some(existing_type) = existing.columns.get(&col_name) {
754                        if existing_type != &col_type {
755                            return Err(format!(
756                                "conflicting column type for '{}.{}': existing {:?}, migration {:?}",
757                                table_name, col_name, existing_type, col_type
758                            ));
759                        }
760                    } else {
761                        existing.columns.insert(col_name.clone(), col_type);
762                        changes += 1;
763                    }
764                }
765                for (col_name, policy) in parsed_table.policies {
766                    if existing.policies.insert(col_name, policy).is_none() {
767                        changes += 1;
768                    }
769                }
770                for fk in parsed_table.foreign_keys {
771                    let duplicate = existing.foreign_keys.iter().any(|existing_fk| {
772                        existing_fk.column == fk.column
773                            && existing_fk.ref_table == fk.ref_table
774                            && existing_fk.ref_column == fk.ref_column
775                    });
776                    if !duplicate {
777                        existing.foreign_keys.push(fk);
778                        changes += 1;
779                    }
780                }
781                if parsed_table.rls_enabled && !existing.rls_enabled {
782                    existing.rls_enabled = true;
783                    changes += 1;
784                }
785            } else {
786                changes += 1 + parsed_table.columns.len();
787                self.tables.insert(table_name, parsed_table);
788            }
789        }
790
791        for view_name in parsed.views {
792            if self.views.insert(view_name) {
793                changes += 1;
794            }
795        }
796        for (resource_name, resource) in parsed.resources {
797            if self.resources.insert(resource_name, resource).is_none() {
798                changes += 1;
799            }
800        }
801
802        changes += self.parse_explicit_qail_apply_commands(qail)?;
803
804        Ok(changes)
805    }
806
807    fn parse_explicit_qail_apply_commands(&mut self, qail: &str) -> Result<usize, String> {
808        let mut changes = 0usize;
809
810        for (line_no, raw_line) in qail.lines().enumerate() {
811            let line = strip_schema_comments(raw_line);
812            if line.is_empty() || !line.starts_with("alter ") {
813                continue;
814            }
815
816            let (table, column_name, column_type) = parse_explicit_alter_add_column_line(line)
817                .map_err(|err| format!("Line {}: {}", line_no + 1, err))?;
818
819            if let Some(existing) = self.tables.get_mut(&table) {
820                if let Some(existing_type) = existing.columns.get(&column_name) {
821                    if existing_type != &column_type {
822                        return Err(format!(
823                            "conflicting column type for '{}.{}': existing {:?}, migration {:?}",
824                            table, column_name, existing_type, column_type
825                        ));
826                    }
827                } else {
828                    existing.columns.insert(column_name, column_type);
829                    changes += 1;
830                }
831            } else {
832                let mut columns = HashMap::new();
833                columns.insert(column_name, column_type);
834                self.tables.insert(
835                    table.clone(),
836                    TableSchema {
837                        name: table,
838                        columns,
839                        policies: HashMap::new(),
840                        foreign_keys: vec![],
841                        rls_enabled: false,
842                    },
843                );
844                changes += 2;
845            }
846        }
847
848        Ok(changes)
849    }
850
851    /// Parse SQL migration content and extract schema changes
852    pub(crate) fn parse_sql_migration(&mut self, sql: &str) -> usize {
853        let mut changes = 0;
854
855        for statement in sql_migration_statements(sql) {
856            let line = statement.as_str();
857            let line_upper = line.to_uppercase();
858
859            if let Some((name, after_table_name)) = extract_create_table_name_with_tail(line) {
860                let table_existed = self.tables.contains_key(&name);
861                if !table_existed {
862                    self.tables.insert(
863                        name.clone(),
864                        TableSchema {
865                            name: name.clone(),
866                            columns: HashMap::new(),
867                            policies: HashMap::new(),
868                            foreign_keys: vec![],
869                            rls_enabled: false,
870                        },
871                    );
872                    changes += 1;
873                }
874
875                let after_table_name = after_table_name.trim_start();
876                let has_column_block =
877                    after_table_name.is_empty() || after_table_name.starts_with('(');
878                // Only track column extraction for tables that DON'T already
879                // have their types from schema.qail. Tables that existed before
880                // this migration already have correct types; overwriting them
881                // with ColumnType::Text would be a bug.
882                if has_column_block
883                    && (!table_existed
884                        || self.tables.get(&name).is_some_and(|t| t.columns.is_empty()))
885                {
886                    for col in extract_inline_create_columns(line) {
887                        if let Some(t) = self.tables.get_mut(&name)
888                            && t.columns.insert(col, ColumnType::Text).is_none()
889                        {
890                            changes += 1;
891                        }
892                    }
893                }
894                continue;
895            }
896
897            // ALTER TABLE ... ADD [COLUMN] ...
898            for (table, col) in extract_alter_add_columns(line) {
899                if let Some(t) = self.tables.get_mut(&table) {
900                    if t.columns.insert(col.clone(), ColumnType::Text).is_none() {
901                        changes += 1;
902                    }
903                } else {
904                    // Table might be new from this migration
905                    let mut cols = HashMap::new();
906                    cols.insert(col, ColumnType::Text);
907                    self.tables.insert(
908                        table.clone(),
909                        TableSchema {
910                            name: table,
911                            columns: cols,
912                            policies: HashMap::new(),
913                            foreign_keys: vec![],
914                            rls_enabled: false,
915                        },
916                    );
917                    changes += 1;
918                }
919            }
920
921            // DROP TABLE
922            if line_upper.starts_with("DROP TABLE") {
923                for table_name in extract_drop_table_names(line) {
924                    if self.tables.remove(&table_name).is_some() {
925                        changes += 1;
926                    }
927                }
928            }
929
930            // ALTER TABLE ... DROP [COLUMN] ...
931            for (table, col) in extract_alter_drop_columns(line) {
932                if let Some(t) = self.tables.get_mut(&table)
933                    && t.columns.remove(&col).is_some()
934                {
935                    changes += 1;
936                }
937            }
938
939            // ALTER TABLE ... RENAME COLUMN old TO new
940            if line_upper.starts_with("ALTER TABLE")
941                && let Some((table, old_col, new_col)) = extract_alter_rename_column(line)
942                && let Some(t) = self.tables.get_mut(&table)
943            {
944                let old_type = t.columns.remove(&old_col);
945                if old_type.is_some() {
946                    changes += 1;
947                }
948                if t.columns
949                    .insert(new_col, old_type.unwrap_or(ColumnType::Text))
950                    .is_none()
951                {
952                    changes += 1;
953                }
954            }
955
956            // ALTER TABLE ... RENAME TO new_table
957            if line_upper.starts_with("ALTER TABLE")
958                && let Some((old_table, new_table)) = extract_alter_rename_table(line)
959                && !self.tables.contains_key(&new_table)
960                && let Some(mut table) = self.tables.remove(&old_table)
961            {
962                table.name = new_table.clone();
963                self.tables.insert(new_table, table);
964                changes += 1;
965            }
966        }
967
968        changes
969    }
970}
971
972fn sql_migration_statements(sql: &str) -> Vec<String> {
973    let mut cleaned = String::new();
974    let mut in_block_comment = false;
975    let mut dollar_quote = None;
976
977    for raw_line in sql.lines() {
978        let line = strip_sql_migration_comments(raw_line, &mut in_block_comment, &mut dollar_quote);
979        if line.is_empty() {
980            continue;
981        }
982        cleaned.push_str(&line);
983        cleaned.push('\n');
984    }
985
986    split_sql_statements(&cleaned)
987}
988
989fn parse_build_column_type_prefix(
990    parts: &[&str],
991    enum_types: &HashMap<String, Vec<String>>,
992) -> Option<(ColumnType, usize)> {
993    let max_end = parts.len().min(5);
994    for end in (2..=max_end).rev() {
995        let type_str = parts[1..end].join(" ");
996        if let Ok(column_type) = type_str.parse::<ColumnType>() {
997            return Some((column_type, end));
998        }
999        if let Some(values) = enum_types.get(&type_str) {
1000            return Some((
1001                ColumnType::Enum {
1002                    name: type_str,
1003                    values: values.clone(),
1004                },
1005                end,
1006            ));
1007        }
1008    }
1009    None
1010}
1011
1012fn parse_build_enum_declaration<'a, I: Iterator<Item = &'a str>>(
1013    first_line: &str,
1014    lines: &mut std::iter::Peekable<I>,
1015) -> Result<(String, Vec<String>), String> {
1016    let rest = first_line
1017        .strip_prefix("enum ")
1018        .ok_or_else(|| "Expected 'enum' prefix".to_string())?
1019        .trim();
1020    let (name, body_start) = rest
1021        .split_once('{')
1022        .ok_or_else(|| "enum definition requires { values }".to_string())?;
1023    let name = name.trim();
1024    if name.is_empty() {
1025        return Err("enum name is missing before '{'".to_string());
1026    }
1027    if !is_build_table_ref(name) {
1028        return Err(format!("Invalid enum name '{}'", name));
1029    }
1030
1031    let mut body = body_start.to_string();
1032    while build_enum_body_before_closing_brace(&body)?.is_none() {
1033        let Some(next_line) = lines.next() else {
1034            return Err(format!("enum '{}' is missing closing '}}'", name));
1035        };
1036        let inner = strip_schema_comments(next_line);
1037        body.push(' ');
1038        body.push_str(inner);
1039    }
1040
1041    let body = build_enum_body_before_closing_brace(&body)?
1042        .ok_or_else(|| format!("enum '{}' is missing closing '}}'", name))?;
1043    let values = parse_build_enum_values(body)?;
1044    if values.is_empty() {
1045        return Err(format!("enum '{}' must have at least one value", name));
1046    }
1047
1048    Ok((name.to_string(), values))
1049}
1050
1051fn build_enum_body_before_closing_brace(raw: &str) -> Result<Option<&str>, String> {
1052    let mut quote: Option<char> = None;
1053    let mut chars = raw.char_indices().peekable();
1054
1055    while let Some((idx, ch)) = chars.next() {
1056        if let Some(q) = quote {
1057            if ch == q {
1058                if chars.peek().is_some_and(|(_, next)| *next == q) {
1059                    chars.next();
1060                } else {
1061                    quote = None;
1062                }
1063            }
1064            continue;
1065        }
1066
1067        match ch {
1068            '\'' | '"' => quote = Some(ch),
1069            '}' => {
1070                let rest = &raw[idx + ch.len_utf8()..];
1071                if !rest.trim().is_empty() {
1072                    return Err("trailing content after enum block".to_string());
1073                }
1074                return Ok(Some(&raw[..idx]));
1075            }
1076            _ => {}
1077        }
1078    }
1079
1080    Ok(None)
1081}
1082
1083fn parse_build_enum_values(raw: &str) -> Result<Vec<String>, String> {
1084    let mut values = Vec::new();
1085    let mut quote: Option<char> = None;
1086    let mut start = 0;
1087    let mut chars = raw.char_indices().peekable();
1088
1089    while let Some((idx, ch)) = chars.next() {
1090        if let Some(q) = quote {
1091            if ch == q {
1092                if chars.peek().is_some_and(|(_, next)| *next == q) {
1093                    chars.next();
1094                } else {
1095                    quote = None;
1096                }
1097            }
1098            continue;
1099        }
1100
1101        match ch {
1102            '\'' | '"' => quote = Some(ch),
1103            ',' => {
1104                push_build_enum_value(&mut values, &raw[start..idx])?;
1105                start = idx + ch.len_utf8();
1106            }
1107            _ => {}
1108        }
1109    }
1110
1111    if quote.is_some() {
1112        return Err("unterminated quoted enum value".to_string());
1113    }
1114
1115    push_build_enum_value(&mut values, &raw[start..])?;
1116    let mut seen = HashSet::new();
1117    for value in &values {
1118        if !seen.insert(value) {
1119            return Err(format!("duplicate enum value '{}'", value));
1120        }
1121    }
1122
1123    Ok(values)
1124}
1125
1126fn push_build_enum_value(values: &mut Vec<String>, raw: &str) -> Result<(), String> {
1127    let was_quoted = raw
1128        .trim()
1129        .chars()
1130        .next()
1131        .is_some_and(|ch| matches!(ch, '\'' | '"'));
1132    let value = parse_build_enum_value(raw)?;
1133    if value.is_empty() && !was_quoted {
1134        return Err("enum value is empty".to_string());
1135    }
1136    values.push(value);
1137    Ok(())
1138}
1139
1140fn parse_build_enum_value(raw: &str) -> Result<String, String> {
1141    let trimmed = raw.trim();
1142    if trimmed.is_empty() {
1143        return Ok(String::new());
1144    }
1145
1146    if let Some(quote) = trimmed.chars().next().filter(|ch| matches!(ch, '"' | '\'')) {
1147        let mut value = String::new();
1148        let mut chars = trimmed.char_indices();
1149        chars.next();
1150        let mut chars = chars.peekable();
1151
1152        while let Some((idx, ch)) = chars.next() {
1153            if ch == quote {
1154                if chars.peek().is_some_and(|(_, next)| *next == quote) {
1155                    value.push(quote);
1156                    chars.next();
1157                    continue;
1158                }
1159
1160                let after = idx + ch.len_utf8();
1161                if !trimmed[after..].trim().is_empty() {
1162                    return Err(format!("invalid enum value token '{}'", trimmed));
1163                }
1164                return Ok(value);
1165            }
1166
1167            value.push(ch);
1168        }
1169
1170        return Err("unterminated quoted enum value".to_string());
1171    }
1172
1173    if trimmed
1174        .chars()
1175        .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
1176    {
1177        return Ok(trimmed.to_string());
1178    }
1179
1180    Err(format!("invalid enum value token '{}'", trimmed))
1181}
1182
1183fn parse_build_references_target(
1184    target: &str,
1185    col_name: &str,
1186    table_name: &str,
1187) -> Result<(String, String), String> {
1188    let target = target.trim();
1189    let (ref_table, ref_column) = target.split_once('(').ok_or_else(|| {
1190        format!(
1191            "Invalid foreign key reference target '{}' for column '{}' in table '{}'",
1192            target, col_name, table_name
1193        )
1194    })?;
1195    let ref_column = ref_column.strip_suffix(')').ok_or_else(|| {
1196        format!(
1197            "Invalid foreign key reference target '{}' for column '{}' in table '{}'",
1198            target, col_name, table_name
1199        )
1200    })?;
1201    let ref_table = ref_table.trim();
1202    let ref_column = ref_column.trim();
1203    if !is_build_table_ref(ref_table) || !is_build_identifier(ref_column) {
1204        return Err(format!(
1205            "Invalid foreign key reference target '{}' for column '{}' in table '{}'",
1206            target, col_name, table_name
1207        ));
1208    }
1209
1210    Ok((ref_table.to_string(), ref_column.to_string()))
1211}
1212
1213fn parse_build_ref_spec(
1214    ref_spec: &str,
1215    col_name: &str,
1216    table_name: &str,
1217) -> Result<(String, String), String> {
1218    let ref_spec = ref_spec.trim_start_matches('>');
1219    let (ref_table, ref_column) = ref_spec.split_once('.').ok_or_else(|| {
1220        format!(
1221            "Invalid ref target '{}' for column '{}' in table '{}'",
1222            ref_spec, col_name, table_name
1223        )
1224    })?;
1225    let ref_table = ref_table.trim();
1226    let ref_column = ref_column.trim();
1227    if !is_build_table_ref(ref_table) || !is_build_identifier(ref_column) {
1228        return Err(format!(
1229            "Invalid ref target '{}' for column '{}' in table '{}'",
1230            ref_spec, col_name, table_name
1231        ));
1232    }
1233
1234    Ok((ref_table.to_string(), ref_column.to_string()))
1235}
1236
1237fn push_build_foreign_key(
1238    foreign_keys: &mut Vec<ForeignKey>,
1239    column: &str,
1240    ref_table: String,
1241    ref_column: String,
1242    table_name: &str,
1243) -> Result<(), String> {
1244    if foreign_keys
1245        .iter()
1246        .any(|fk| fk.column == column && fk.ref_table == ref_table && fk.ref_column == ref_column)
1247    {
1248        return Err(format!(
1249            "duplicate foreign key '{}.{} -> {}.{}'",
1250            table_name, column, ref_table, ref_column
1251        ));
1252    }
1253
1254    foreign_keys.push(ForeignKey {
1255        column: column.to_string(),
1256        ref_table,
1257        ref_column,
1258    });
1259    Ok(())
1260}
1261
1262fn is_build_table_ref(value: &str) -> bool {
1263    let mut parts = value.split('.');
1264    let Some(first) = parts.next() else {
1265        return false;
1266    };
1267    !first.is_empty() && is_build_identifier(first) && parts.all(is_build_identifier)
1268}
1269
1270fn is_build_identifier(value: &str) -> bool {
1271    !value.is_empty()
1272        && value
1273            .chars()
1274            .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
1275}
1276
1277fn is_build_fk_action(value: &str) -> bool {
1278    matches!(
1279        value,
1280        "cascade" | "set_null" | "set_default" | "restrict" | "no_action"
1281    )
1282}
1283
1284fn resource_block_content_before_closing(content: &str) -> Result<Option<String>, String> {
1285    let mut quote: Option<char> = None;
1286    let mut escaped = false;
1287
1288    for (idx, ch) in content.char_indices() {
1289        if escaped {
1290            escaped = false;
1291            continue;
1292        }
1293
1294        match quote {
1295            Some(q) => match ch {
1296                '\\' => escaped = true,
1297                c if c == q => quote = None,
1298                _ => {}
1299            },
1300            None => match ch {
1301                '"' | '\'' => quote = Some(ch),
1302                '}' => {
1303                    let rest = &content[idx + ch.len_utf8()..];
1304                    if !rest.trim().is_empty() {
1305                        return Err("Trailing content after resource definition".to_string());
1306                    }
1307                    return Ok(Some(content[..idx].trim().to_string()));
1308                }
1309                _ => {}
1310            },
1311        }
1312    }
1313
1314    Ok(None)
1315}
1316
1317fn split_resource_tokens(content: &str) -> Result<Vec<String>, String> {
1318    let mut tokens = Vec::new();
1319    let mut current = String::new();
1320    let mut quote: Option<char> = None;
1321    let mut escaped = false;
1322
1323    for ch in content.chars() {
1324        if escaped {
1325            current.push(ch);
1326            escaped = false;
1327            continue;
1328        }
1329
1330        match quote {
1331            Some(q) => match ch {
1332                '\\' => escaped = true,
1333                c if c == q => quote = None,
1334                c => current.push(c),
1335            },
1336            None => match ch {
1337                '"' | '\'' => quote = Some(ch),
1338                c if c.is_whitespace() => {
1339                    if !current.is_empty() {
1340                        tokens.push(std::mem::take(&mut current));
1341                    }
1342                }
1343                c => current.push(c),
1344            },
1345        }
1346    }
1347
1348    if escaped {
1349        current.push('\\');
1350    }
1351    if quote.is_some() {
1352        return Err("Unterminated quoted resource value".to_string());
1353    }
1354    if !current.is_empty() {
1355        tokens.push(current);
1356    }
1357
1358    Ok(tokens)
1359}
1360
1361fn parse_explicit_alter_add_column_line(
1362    line: &str,
1363) -> Result<(String, String, ColumnType), String> {
1364    let rest = line
1365        .strip_prefix("alter ")
1366        .ok_or_else(|| "expected 'alter <table> add <column:type[:constraints]>'".to_string())?
1367        .trim();
1368
1369    let mut parts = rest.splitn(2, char::is_whitespace);
1370    let table = parts
1371        .next()
1372        .map(str::trim)
1373        .filter(|table| !table.is_empty())
1374        .ok_or_else(|| "expected table name after 'alter'".to_string())?;
1375    if !is_build_table_ref(table) {
1376        return Err(format!("invalid alter table name '{}'", table));
1377    }
1378    let remainder = parts
1379        .next()
1380        .map(str::trim)
1381        .ok_or_else(|| "expected 'add <column:type[:constraints]>' after table name".to_string())?;
1382    let column_def = remainder
1383        .strip_prefix("add ")
1384        .ok_or_else(|| "expected 'add <column:type[:constraints]>' after table name".to_string())?
1385        .trim();
1386
1387    if column_def.is_empty() {
1388        return Err("expected column definition after 'add'".to_string());
1389    }
1390
1391    let (remaining, column_expr) = parse_column_definition(column_def)
1392        .map_err(|_| format!("invalid column definition '{}'", column_def))?;
1393    if !remaining.trim().is_empty() {
1394        return Err(format!(
1395            "unexpected trailing content after column definition: '{}'",
1396            remaining.trim()
1397        ));
1398    }
1399
1400    match column_expr {
1401        Expr::Def {
1402            name, data_type, ..
1403        } => {
1404            let column_type = data_type.parse::<ColumnType>().map_err(|_| {
1405                format!(
1406                    "unknown column type '{}' for column '{}' in alter '{}'",
1407                    data_type, name, table
1408                )
1409            })?;
1410            Ok((table.to_string(), name, column_type))
1411        }
1412        _ => Err("expected column definition after 'add'".to_string()),
1413    }
1414}
1415
1416fn extract_view_name(line: &str) -> Option<&str> {
1417    let rest = if let Some(r) = line.strip_prefix("view ") {
1418        r
1419    } else {
1420        line.strip_prefix("materialized view ")?
1421    };
1422
1423    let name = rest.split_whitespace().next().unwrap_or_default().trim();
1424    if name.is_empty() { None } else { Some(name) }
1425}
1426
1427fn extract_create_table_name_with_tail(line: &str) -> Option<(String, &str)> {
1428    let rest = extract_create_table_target_start(line)?;
1429    let rest = strip_sql_if_not_exists(rest).unwrap_or(rest);
1430
1431    extract_sql_table_ref_with_tail(rest)
1432}
1433
1434fn extract_create_table_target_start(line: &str) -> Option<&str> {
1435    let mut rest = strip_sql_keyword(line, "CREATE")?;
1436
1437    if let Some(after_unlogged) = strip_sql_keyword(rest, "UNLOGGED") {
1438        rest = after_unlogged;
1439    } else if strip_sql_keyword(rest, "TEMP")
1440        .or_else(|| strip_sql_keyword(rest, "TEMPORARY"))
1441        .is_some()
1442    {
1443        return None;
1444    }
1445
1446    strip_sql_keyword(rest, "TABLE")
1447}
1448
1449fn strip_sql_keyword<'a>(raw: &'a str, keyword: &str) -> Option<&'a str> {
1450    let rest = raw.trim_start();
1451    let tail = rest.get(keyword.len()..)?;
1452    if rest[..keyword.len()].eq_ignore_ascii_case(keyword)
1453        && (tail.is_empty() || tail.starts_with(char::is_whitespace))
1454    {
1455        Some(tail.trim_start())
1456    } else {
1457        None
1458    }
1459}
1460
1461fn strip_sql_if_exists(raw: &str) -> Option<&str> {
1462    let after_if = strip_sql_keyword(raw, "IF")?;
1463    strip_sql_keyword(after_if, "EXISTS")
1464}
1465
1466fn strip_sql_if_not_exists(raw: &str) -> Option<&str> {
1467    let after_if = strip_sql_keyword(raw, "IF")?;
1468    let after_not = strip_sql_keyword(after_if, "NOT")?;
1469    strip_sql_keyword(after_not, "EXISTS")
1470}
1471
1472/// Extract column name from a line inside CREATE TABLE block
1473fn extract_column_from_create(line: &str) -> Option<String> {
1474    let line = line.trim();
1475
1476    // Skip keywords and constraints
1477    // IMPORTANT: Must check for word boundaries to avoid matching column names
1478    // that happen to start with a keyword (e.g., created_at starts with CREATE,
1479    // primary_contact starts with PRIMARY, check_status starts with CHECK, etc.)
1480    let line_upper = line.to_uppercase();
1481    let starts_with_keyword = |kw: &str| -> bool {
1482        line_upper.starts_with(kw) && line_upper[kw.len()..].starts_with([' ', '('])
1483    };
1484
1485    if starts_with_keyword("CREATE")
1486        || starts_with_keyword("PRIMARY")
1487        || starts_with_keyword("FOREIGN")
1488        || starts_with_keyword("UNIQUE")
1489        || starts_with_keyword("CHECK")
1490        || starts_with_keyword("CONSTRAINT")
1491        || starts_with_keyword("EXCLUDE")
1492        || starts_with_keyword("LIKE")
1493        || line_upper.starts_with(")")
1494        || line_upper.starts_with("(")
1495        || line.is_empty()
1496    {
1497        return None;
1498    }
1499
1500    extract_sql_column_ref(line.trim_start_matches('(').trim())
1501}
1502
1503fn extract_inline_create_columns(line: &str) -> Vec<String> {
1504    let Some(open_idx) = line.find('(') else {
1505        return Vec::new();
1506    };
1507    let Some(close_idx) = find_matching_sql_paren(line, open_idx) else {
1508        return Vec::new();
1509    };
1510    let body = &line[open_idx + 1..close_idx];
1511    split_sql_top_level_csv(body)
1512        .into_iter()
1513        .filter_map(extract_column_from_create)
1514        .collect()
1515}
1516
1517fn find_matching_sql_paren(raw: &str, open_idx: usize) -> Option<usize> {
1518    let mut depth = 0usize;
1519    let mut in_single = false;
1520    let mut in_double = false;
1521    let mut dollar_quote: Option<String> = None;
1522    let mut i = open_idx;
1523
1524    while i < raw.len() {
1525        if let Some(delim) = dollar_quote.as_deref() {
1526            if raw[i..].starts_with(delim) {
1527                i += delim.len();
1528                dollar_quote = None;
1529            } else {
1530                i += raw[i..].chars().next().map(char::len_utf8).unwrap_or(1);
1531            }
1532            continue;
1533        }
1534
1535        let ch = raw[i..].chars().next()?;
1536        match ch {
1537            '\'' if !in_double => {
1538                if in_single && raw[i + ch.len_utf8()..].starts_with('\'') {
1539                    i += 2;
1540                    continue;
1541                }
1542                in_single = !in_single;
1543            }
1544            '"' if !in_single => {
1545                if in_double && raw[i + ch.len_utf8()..].starts_with('"') {
1546                    i += 2;
1547                    continue;
1548                }
1549                in_double = !in_double;
1550            }
1551            '$' if !in_single && !in_double => {
1552                if let Some(delim) = sql_dollar_quote_delimiter_at(raw, i) {
1553                    dollar_quote = Some(delim.to_string());
1554                    i += delim.len();
1555                    continue;
1556                }
1557            }
1558            '(' if !in_single && !in_double => depth += 1,
1559            ')' if !in_single && !in_double => {
1560                depth = depth.checked_sub(1)?;
1561                if depth == 0 {
1562                    return Some(i);
1563                }
1564            }
1565            _ => {}
1566        }
1567        i += ch.len_utf8();
1568    }
1569
1570    None
1571}
1572
1573fn split_sql_top_level_csv(raw: &str) -> Vec<&str> {
1574    let mut pieces = Vec::new();
1575    let mut start = 0usize;
1576    let mut depth = 0usize;
1577    let mut in_single = false;
1578    let mut in_double = false;
1579    let mut dollar_quote: Option<String> = None;
1580    let mut i = 0usize;
1581
1582    while i < raw.len() {
1583        if let Some(delim) = dollar_quote.as_deref() {
1584            if raw[i..].starts_with(delim) {
1585                i += delim.len();
1586                dollar_quote = None;
1587            } else {
1588                i += raw[i..].chars().next().map(char::len_utf8).unwrap_or(1);
1589            }
1590            continue;
1591        }
1592
1593        let Some(ch) = raw[i..].chars().next() else {
1594            break;
1595        };
1596        match ch {
1597            '\'' if !in_double => {
1598                if in_single && raw[i + ch.len_utf8()..].starts_with('\'') {
1599                    i += 2;
1600                    continue;
1601                }
1602                in_single = !in_single;
1603            }
1604            '"' if !in_single => {
1605                if in_double && raw[i + ch.len_utf8()..].starts_with('"') {
1606                    i += 2;
1607                    continue;
1608                }
1609                in_double = !in_double;
1610            }
1611            '$' if !in_single && !in_double => {
1612                if let Some(delim) = sql_dollar_quote_delimiter_at(raw, i) {
1613                    dollar_quote = Some(delim.to_string());
1614                    i += delim.len();
1615                    continue;
1616                }
1617            }
1618            '(' if !in_single && !in_double => depth += 1,
1619            ')' if !in_single && !in_double => depth = depth.saturating_sub(1),
1620            ',' if depth == 0 => {
1621                pieces.push(raw[start..i].trim());
1622                start = i + ch.len_utf8();
1623            }
1624            _ => {}
1625        }
1626        i += ch.len_utf8();
1627    }
1628
1629    pieces.push(raw[start..].trim());
1630    pieces
1631}
1632
1633fn split_sql_statements(raw: &str) -> Vec<String> {
1634    let mut statements = Vec::new();
1635    let mut start = 0usize;
1636    let mut in_single = false;
1637    let mut in_double = false;
1638    let mut dollar_quote: Option<String> = None;
1639    let mut i = 0usize;
1640
1641    while i < raw.len() {
1642        if let Some(delim) = dollar_quote.as_deref() {
1643            if raw[i..].starts_with(delim) {
1644                i += delim.len();
1645                dollar_quote = None;
1646            } else {
1647                i += raw[i..].chars().next().map(char::len_utf8).unwrap_or(1);
1648            }
1649            continue;
1650        }
1651
1652        let Some(ch) = raw[i..].chars().next() else {
1653            break;
1654        };
1655        match ch {
1656            '\'' if !in_double => {
1657                if in_single && raw[i + ch.len_utf8()..].starts_with('\'') {
1658                    i += 2;
1659                    continue;
1660                }
1661                in_single = !in_single;
1662            }
1663            '"' if !in_single => {
1664                if in_double && raw[i + ch.len_utf8()..].starts_with('"') {
1665                    i += 2;
1666                    continue;
1667                }
1668                in_double = !in_double;
1669            }
1670            '$' if !in_single && !in_double => {
1671                if let Some(delim) = sql_dollar_quote_delimiter_at(raw, i) {
1672                    dollar_quote = Some(delim.to_string());
1673                    i += delim.len();
1674                    continue;
1675                }
1676            }
1677            ';' if !in_single && !in_double => {
1678                let statement = raw[start..i].trim();
1679                if !statement.is_empty() {
1680                    statements.push(statement.to_string());
1681                }
1682                start = i + ch.len_utf8();
1683            }
1684            _ => {}
1685        }
1686        i += ch.len_utf8();
1687    }
1688
1689    let tail = raw[start..].trim();
1690    if !tail.is_empty() {
1691        statements.push(tail.to_string());
1692    }
1693
1694    statements
1695}
1696
1697/// Extract table and columns from ALTER TABLE ... ADD [COLUMN] actions.
1698fn extract_alter_add_columns(line: &str) -> Vec<(String, String)> {
1699    let line_upper = line.to_uppercase();
1700    if !line_upper.starts_with("ALTER TABLE") {
1701        return Vec::new();
1702    }
1703    let Some((table, actions_part)) = extract_alter_table_ref_with_tail(&line[11..]) else {
1704        return Vec::new();
1705    };
1706
1707    split_sql_top_level_csv(actions_part)
1708        .into_iter()
1709        .filter_map(|action| {
1710            extract_alter_add_column_action(action).map(|col| (table.clone(), col))
1711        })
1712        .collect()
1713}
1714
1715fn extract_alter_add_column_action(action: &str) -> Option<String> {
1716    let mut col_part = strip_sql_keyword(action, "ADD")?;
1717    col_part = strip_sql_keyword(col_part, "COLUMN").unwrap_or(col_part);
1718    col_part = strip_sql_if_not_exists(col_part).unwrap_or(col_part);
1719
1720    let col_upper = col_part.trim_start().to_uppercase();
1721    if [
1722        "CONSTRAINT",
1723        "PRIMARY",
1724        "UNIQUE",
1725        "CHECK",
1726        "FOREIGN",
1727        "EXCLUDE",
1728    ]
1729    .iter()
1730    .any(|keyword| {
1731        col_upper.starts_with(keyword) && col_upper[keyword.len()..].starts_with([' ', '('])
1732    }) {
1733        return None;
1734    }
1735
1736    extract_sql_column_ref(col_part.trim())
1737}
1738
1739/// Extract table names from DROP TABLE statement
1740fn extract_drop_table_names(line: &str) -> Vec<String> {
1741    let line_upper = line.to_uppercase();
1742    let Some(rest) = line_upper.strip_prefix("DROP TABLE") else {
1743        return Vec::new();
1744    };
1745    let rest = rest.trim_start();
1746    let rest = if rest.starts_with("IF EXISTS") {
1747        match rest.strip_prefix("IF EXISTS") {
1748            Some(rest) => rest.trim_start(),
1749            None => return Vec::new(),
1750        }
1751    } else {
1752        rest
1753    };
1754
1755    split_sql_top_level_csv(&line[line.len() - rest.len()..])
1756        .into_iter()
1757        .filter_map(extract_sql_table_ref)
1758        .collect()
1759}
1760
1761/// Extract table and columns from ALTER TABLE ... DROP [COLUMN] actions.
1762fn extract_alter_drop_columns(line: &str) -> Vec<(String, String)> {
1763    let line_upper = line.to_uppercase();
1764    if !line_upper.starts_with("ALTER TABLE") {
1765        return Vec::new();
1766    }
1767    let Some((table, actions_part)) = extract_alter_table_ref_with_tail(&line[11..]) else {
1768        return Vec::new();
1769    };
1770
1771    split_sql_top_level_csv(actions_part)
1772        .into_iter()
1773        .filter_map(|action| {
1774            extract_alter_drop_column_action(action).map(|col| (table.clone(), col))
1775        })
1776        .collect()
1777}
1778
1779fn extract_alter_drop_column_action(action: &str) -> Option<String> {
1780    let mut col_part = strip_sql_keyword(action, "DROP")?;
1781    col_part = strip_sql_keyword(col_part, "COLUMN").unwrap_or(col_part);
1782    col_part = strip_sql_if_exists(col_part).unwrap_or(col_part);
1783
1784    let col_upper = col_part.trim_start().to_uppercase();
1785    if ["CONSTRAINT", "INDEX"].iter().any(|keyword| {
1786        col_upper.starts_with(keyword)
1787            && col_upper[keyword.len()..].starts_with(char::is_whitespace)
1788    }) {
1789        return None;
1790    }
1791
1792    extract_sql_column_ref(col_part.trim())
1793}
1794
1795fn extract_alter_rename_column(line: &str) -> Option<(String, String, String)> {
1796    let line_upper = line.to_uppercase();
1797    if !line_upper.starts_with("ALTER TABLE") {
1798        return None;
1799    }
1800    let (table, actions_part) = extract_alter_table_ref_with_tail(&line[11..])?;
1801    let actions_upper = actions_part.to_uppercase();
1802    let (rename_pos, rename_len) = if let Some(pos) = actions_upper.find("RENAME COLUMN") {
1803        (pos, "RENAME COLUMN".len())
1804    } else {
1805        (actions_upper.find("RENAME ")?, "RENAME".len())
1806    };
1807    let to_pos = actions_upper[rename_pos + rename_len..].find(" TO ")? + rename_pos + rename_len;
1808
1809    let old_part = &actions_part[rename_pos + rename_len..to_pos];
1810    let new_part = &actions_part[to_pos + 4..];
1811    let old_col = extract_sql_column_ref(old_part.trim())?;
1812    let new_col = extract_sql_column_ref(new_part.trim())?;
1813
1814    Some((table, old_col, new_col))
1815}
1816
1817fn extract_alter_rename_table(line: &str) -> Option<(String, String)> {
1818    let line_upper = line.to_uppercase();
1819    if !line_upper.starts_with("ALTER TABLE") {
1820        return None;
1821    }
1822    let (old_table, actions_part) = extract_alter_table_ref_with_tail(&line[11..])?;
1823    let actions_upper = actions_part.to_uppercase();
1824    let rename_pos = actions_upper.find("RENAME TO ")?;
1825
1826    let new_part = &actions_part[rename_pos + "RENAME TO ".len()..];
1827    let new_ref = extract_sql_table_ref(new_part.trim())?;
1828    let new_table = if new_ref.contains('.') {
1829        new_ref
1830    } else if let Some((schema, _)) = old_table.rsplit_once('.') {
1831        format!("{schema}.{new_ref}")
1832    } else {
1833        new_ref
1834    };
1835
1836    Some((old_table, new_table))
1837}
1838
1839fn extract_sql_table_ref(raw: &str) -> Option<String> {
1840    extract_sql_table_ref_with_tail(raw).map(|(name, _)| name)
1841}
1842
1843fn extract_sql_table_ref_with_tail(raw: &str) -> Option<(String, &str)> {
1844    let mut rest = raw.trim_start();
1845    let mut parts = Vec::new();
1846
1847    loop {
1848        let (part, tail, _) = parse_sql_identifier_segment(rest)?;
1849        parts.push(part.to_ascii_lowercase());
1850        rest = tail.trim_start();
1851        if let Some(tail) = rest.strip_prefix('.') {
1852            rest = tail.trim_start();
1853        } else {
1854            break;
1855        }
1856    }
1857
1858    let name = parts.join(".");
1859    is_build_table_ref(&name).then_some((name, rest))
1860}
1861
1862fn extract_sql_column_ref(raw: &str) -> Option<String> {
1863    let (name, rest, quoted) = parse_sql_identifier_segment(raw)?;
1864    if rest.trim_start().starts_with('.') {
1865        return None;
1866    }
1867    let name = name.to_ascii_lowercase();
1868    if name.is_empty() || !is_build_identifier(&name) || (!quoted && name == "if") {
1869        None
1870    } else {
1871        Some(name)
1872    }
1873}
1874
1875fn parse_sql_identifier_segment(raw: &str) -> Option<(String, &str, bool)> {
1876    let rest = raw.trim_start();
1877    if let Some(quoted) = rest.strip_prefix('"') {
1878        let mut out = String::new();
1879        let mut chars = quoted.char_indices().peekable();
1880        while let Some((idx, ch)) = chars.next() {
1881            if ch == '"' {
1882                if chars.peek().is_some_and(|(_, next)| *next == '"') {
1883                    out.push('"');
1884                    chars.next();
1885                    continue;
1886                }
1887                let consumed = 1 + idx + ch.len_utf8();
1888                return Some((out, &rest[consumed..], true));
1889            }
1890            out.push(ch);
1891        }
1892        return None;
1893    }
1894
1895    let name: String = rest
1896        .chars()
1897        .take_while(|c| c.is_ascii_alphanumeric() || *c == '_')
1898        .collect();
1899    if name.is_empty() {
1900        return None;
1901    }
1902    let tail = &rest[name.len()..];
1903    Some((name, tail, false))
1904}
1905
1906fn extract_alter_table_ref_with_tail(raw: &str) -> Option<(String, &str)> {
1907    let mut rest = raw.trim_start();
1908    let upper = rest.to_uppercase();
1909    if upper.starts_with("IF EXISTS")
1910        && rest
1911            .get("IF EXISTS".len()..)
1912            .is_some_and(|tail| tail.starts_with(char::is_whitespace))
1913    {
1914        rest = rest.get("IF EXISTS".len()..)?.trim_start();
1915    }
1916    let upper = rest.to_uppercase();
1917    if upper.starts_with("ONLY")
1918        && rest
1919            .get("ONLY".len()..)
1920            .is_some_and(|tail| tail.starts_with(char::is_whitespace))
1921    {
1922        rest = rest.get("ONLY".len()..)?.trim_start();
1923    }
1924    let (table, tail) = extract_sql_table_ref_with_tail(rest)?;
1925    Some((table, tail.trim_start()))
1926}
1927
1928impl TableSchema {
1929    /// Check if column exists
1930    pub fn has_column(&self, name: &str) -> bool {
1931        self.columns.contains_key(name)
1932    }
1933
1934    /// Get column type
1935    pub fn column_type(&self, name: &str) -> Option<&ColumnType> {
1936        self.columns.get(name)
1937    }
1938
1939    /// Get the primary key column name for this table.
1940    ///
1941    /// Convention: returns `"id"` if it exists as a column.
1942    /// This is a single point of truth for PK resolution — when the schema
1943    /// parser is enhanced to track PK constraints, update this method.
1944    pub fn primary_key_column(&self) -> &str {
1945        if self.columns.contains_key("id") {
1946            "id"
1947        } else {
1948            // Fallback: look for `{singular_table_name}_id` pattern
1949            // e.g., table "users" → "user_id"
1950            let singular = self.name.trim_end_matches('s');
1951            let conventional = format!("{}_id", singular);
1952            if self.columns.contains_key(&conventional) {
1953                // Leak into 'static to satisfy lifetime — this is called rarely
1954                // and the string is small. Alternatively, return String.
1955                return "id"; // Safe default — schema has no "id" but this avoids lifetime issues
1956            }
1957            "id" // Universal fallback
1958        }
1959    }
1960}
1961
1962#[cfg(test)]
1963mod comment_tests {
1964    use super::{ColumnType, Schema, strip_schema_comments, strip_sql_line_comments};
1965
1966    #[test]
1967    fn schema_comment_stripping_ignores_markers_inside_quotes() {
1968        assert_eq!(
1969            strip_schema_comments(r#"status TEXT default 'draft--internal#tag' # comment"#),
1970            r#"status TEXT default 'draft--internal#tag'"#
1971        );
1972        assert_eq!(
1973            strip_schema_comments(r#"status TEXT default "draft--internal#tag" -- comment"#),
1974            r#"status TEXT default "draft--internal#tag""#
1975        );
1976    }
1977
1978    #[test]
1979    fn sql_comment_stripping_ignores_double_dash_inside_strings() {
1980        assert_eq!(
1981            strip_sql_line_comments("CREATE TABLE logs (message text DEFAULT 'a--b'); -- comment"),
1982            "CREATE TABLE logs (message text DEFAULT 'a--b');"
1983        );
1984        assert_eq!(
1985            strip_sql_line_comments("CREATE TABLE tags (name text DEFAULT '#not-comment');"),
1986            "CREATE TABLE tags (name text DEFAULT '#not-comment');"
1987        );
1988    }
1989
1990    #[test]
1991    fn sql_migration_paren_depth_ignores_string_literals() {
1992        let mut schema = Schema::default();
1993        schema.parse_sql_migration(
1994            r#"
1995CREATE TABLE logs (
1996  message text DEFAULT ')',
1997  tag text DEFAULT '(',
1998  level text
1999);
2000"#,
2001        );
2002
2003        let logs = schema.table("logs").expect("logs table should parse");
2004        assert!(logs.has_column("message"));
2005        assert!(logs.has_column("tag"));
2006        assert!(logs.has_column("level"));
2007    }
2008
2009    #[test]
2010    fn schema_parse_accepts_pulled_rls_directives() {
2011        let schema = Schema::parse(
2012            r#"
2013table agents {
2014  id UUID
2015  tenant_id UUID
2016  enable_rls
2017  force_rls
2018}
2019"#,
2020        )
2021        .expect("pulled schema RLS directives should parse");
2022
2023        let agents = schema.table("agents").expect("agents table should parse");
2024        assert!(agents.has_column("id"));
2025        assert!(agents.rls_enabled);
2026        assert!(!agents.has_column("enable_rls"));
2027        assert!(!agents.has_column("force_rls"));
2028    }
2029
2030    #[test]
2031    fn schema_parse_accepts_multi_word_column_types() {
2032        let schema = Schema::parse(
2033            r#"
2034table car_fullday_reseller_pricing {
2035  percentage_markup DOUBLE PRECISION
2036  starts_at TIMESTAMP WITH TIME ZONE
2037}
2038"#,
2039        )
2040        .expect("pulled schema multi-word types should parse");
2041
2042        let pricing = schema
2043            .table("car_fullday_reseller_pricing")
2044            .expect("pricing table should parse");
2045        assert_eq!(
2046            pricing.column_type("percentage_markup"),
2047            Some(&ColumnType::Float)
2048        );
2049        assert_eq!(
2050            pricing.column_type("starts_at"),
2051            Some(&ColumnType::Timestamptz)
2052        );
2053    }
2054
2055    #[test]
2056    fn sql_migration_ignores_multiline_block_comments() {
2057        let mut schema = Schema::default();
2058        schema.parse_sql_migration(
2059            r#"
2060CREATE TABLE users (
2061  id uuid
2062);
2063
2064/*
2065ALTER TABLE users ADD COLUMN hidden text;
2066CREATE TABLE hidden_table (
2067  id uuid
2068);
2069*/
2070"#,
2071        );
2072
2073        let users = schema.table("users").expect("users table should parse");
2074        assert!(users.has_column("id"));
2075        assert!(!users.has_column("hidden"));
2076        assert!(!schema.has_table("hidden_table"));
2077    }
2078
2079    #[test]
2080    fn sql_migration_preserves_schema_qualified_table_names() {
2081        let mut schema = Schema::default();
2082        schema.parse_sql_migration(
2083            r#"
2084CREATE TABLE app.users (
2085  id uuid
2086);
2087
2088ALTER TABLE app.users ADD COLUMN email text;
2089"#,
2090        );
2091
2092        assert!(!schema.has_table("app"));
2093        let users = schema
2094            .table("app.users")
2095            .expect("schema-qualified table should parse");
2096        assert!(users.has_column("id"));
2097        assert!(users.has_column("email"));
2098    }
2099
2100    #[test]
2101    fn sql_migration_extracts_inline_create_table_columns() {
2102        let mut schema = Schema::default();
2103        schema.parse_sql_migration(
2104            "CREATE TABLE users (id uuid, email text DEFAULT 'a,b', CHECK (length(email) > 3));",
2105        );
2106
2107        let users = schema.table("users").expect("users table should parse");
2108        assert!(users.has_column("id"));
2109        assert!(users.has_column("email"));
2110        assert!(!users.has_column("check"));
2111    }
2112
2113    #[test]
2114    fn sql_migration_drops_multiple_tables() {
2115        let mut schema = Schema::default();
2116        schema.parse_sql_migration(
2117            r#"
2118CREATE TABLE app.users (id uuid);
2119CREATE TABLE app.posts (id uuid);
2120DROP TABLE IF EXISTS app.users, app.posts CASCADE;
2121"#,
2122        );
2123
2124        assert!(!schema.has_table("app.users"));
2125        assert!(!schema.has_table("app.posts"));
2126    }
2127
2128    #[test]
2129    fn sql_migration_ignores_create_table_non_column_clauses() {
2130        let mut schema = Schema::default();
2131        schema.parse_sql_migration(
2132            r#"
2133CREATE TABLE bookings (
2134  id uuid,
2135  EXCLUDE USING gist (room WITH =),
2136  LIKE booking_template INCLUDING ALL
2137);
2138"#,
2139        );
2140
2141        let bookings = schema
2142            .table("bookings")
2143            .expect("bookings table should parse");
2144        assert!(bookings.has_column("id"));
2145        assert!(!bookings.has_column("exclude"));
2146        assert!(!bookings.has_column("like"));
2147    }
2148
2149    #[test]
2150    fn sql_migration_ignores_alter_add_constraints() {
2151        let mut schema = Schema::default();
2152        schema.parse_sql_migration(
2153            r#"
2154CREATE TABLE users (id uuid, email text);
2155ALTER TABLE users ADD CONSTRAINT users_email_key UNIQUE (email);
2156ALTER TABLE users ADD PRIMARY KEY (id);
2157"#,
2158        );
2159
2160        let users = schema.table("users").expect("users table should parse");
2161        assert!(users.has_column("id"));
2162        assert!(users.has_column("email"));
2163        assert!(!users.has_column("constraint"));
2164        assert!(!users.has_column("primary"));
2165    }
2166
2167    #[test]
2168    fn sql_migration_handles_alter_table_modifiers() {
2169        let mut schema = Schema::default();
2170        schema.parse_sql_migration(
2171            r#"
2172CREATE TABLE users (id uuid);
2173ALTER TABLE ONLY users ADD COLUMN email text;
2174ALTER TABLE IF EXISTS users DROP COLUMN id;
2175"#,
2176        );
2177
2178        assert!(!schema.has_table("only"));
2179        assert!(!schema.has_table("if"));
2180        let users = schema.table("users").expect("users table should parse");
2181        assert!(!users.has_column("id"));
2182        assert!(users.has_column("email"));
2183    }
2184
2185    #[test]
2186    fn sql_migration_handles_drop_column_if_exists() {
2187        let mut schema = Schema::default();
2188        schema.parse_sql_migration(
2189            r#"
2190CREATE TABLE users (id uuid, old_email text, old_name text);
2191ALTER TABLE users DROP COLUMN IF EXISTS old_email;
2192ALTER TABLE users DROP IF EXISTS old_name;
2193"#,
2194        );
2195
2196        let users = schema.table("users").expect("users table should parse");
2197        assert!(users.has_column("id"));
2198        assert!(!users.has_column("old_email"));
2199        assert!(!users.has_column("old_name"));
2200        assert!(!users.has_column("if"));
2201    }
2202
2203    #[test]
2204    fn sql_migration_handles_quoted_table_and_column_identifiers() {
2205        let mut schema = Schema::default();
2206        schema.parse_sql_migration(
2207            r#"
2208CREATE TABLE "app"."order" ("id" uuid, "select" text);
2209ALTER TABLE "app"."order" ADD COLUMN "from" text;
2210ALTER TABLE "app"."order" DROP COLUMN "select";
2211"#,
2212        );
2213
2214        let orders = schema
2215            .table("app.order")
2216            .expect("quoted schema-qualified table should parse");
2217        assert!(orders.has_column("id"));
2218        assert!(orders.has_column("from"));
2219        assert!(!orders.has_column("select"));
2220    }
2221
2222    #[test]
2223    fn sql_migration_ignores_dollar_quoted_default_syntax() {
2224        let mut schema = Schema::default();
2225        schema.parse_sql_migration(
2226            r#"
2227CREATE TABLE logs (id uuid, body text DEFAULT $$a,b)--not-comment$$, tag text);
2228"#,
2229        );
2230
2231        let logs = schema.table("logs").expect("logs table should parse");
2232        assert!(logs.has_column("id"));
2233        assert!(logs.has_column("body"));
2234        assert!(logs.has_column("tag"));
2235        assert!(!logs.has_column("b"));
2236        assert!(!logs.has_column("not"));
2237    }
2238
2239    #[test]
2240    fn sql_migration_ignores_multiline_dollar_quoted_bodies() {
2241        let mut schema = Schema::default();
2242        schema.parse_sql_migration(
2243            r#"
2244CREATE TABLE users (id uuid);
2245CREATE FUNCTION rebuild_hidden() RETURNS void AS $$
2246BEGIN
2247  CREATE TABLE hidden_from_function (id uuid);
2248END;
2249$$ LANGUAGE plpgsql;
2250"#,
2251        );
2252
2253        assert!(schema.has_table("users"));
2254        assert!(!schema.has_table("hidden_from_function"));
2255    }
2256
2257    #[test]
2258    fn sql_migration_handles_unlogged_create_tables() {
2259        let mut schema = Schema::default();
2260        schema.parse_sql_migration(
2261            r#"
2262CREATE UNLOGGED TABLE IF NOT EXISTS jobs (id uuid, status text);
2263CREATE TEMP TABLE scratch_jobs (id uuid);
2264"#,
2265        );
2266
2267        let jobs = schema.table("jobs").expect("unlogged table should parse");
2268        assert!(jobs.has_column("id"));
2269        assert!(jobs.has_column("status"));
2270        assert!(!schema.has_table("scratch_jobs"));
2271    }
2272
2273    #[test]
2274    fn sql_migration_tracks_column_renames() {
2275        let mut schema = Schema::default();
2276        schema.parse_sql_migration(
2277            r#"
2278CREATE TABLE users (id uuid, old_email text);
2279ALTER TABLE users RENAME COLUMN old_email TO email;
2280"#,
2281        );
2282
2283        let users = schema.table("users").expect("users table should parse");
2284        assert!(users.has_column("id"));
2285        assert!(users.has_column("email"));
2286        assert!(!users.has_column("old_email"));
2287    }
2288
2289    #[test]
2290    fn sql_migration_tracks_table_renames() {
2291        let mut schema = Schema::default();
2292        schema.parse_sql_migration(
2293            r#"
2294CREATE TABLE app.users (id uuid, email text);
2295ALTER TABLE app.users RENAME TO customers;
2296"#,
2297        );
2298
2299        assert!(!schema.has_table("app.users"));
2300        let customers = schema
2301            .table("app.customers")
2302            .expect("schema-qualified table rename should parse");
2303        assert!(customers.has_column("id"));
2304        assert!(customers.has_column("email"));
2305    }
2306
2307    #[test]
2308    fn sql_migration_handles_add_if_not_exists_without_column_keyword() {
2309        let mut schema = Schema::default();
2310        schema.parse_sql_migration(
2311            r#"
2312CREATE TABLE users (id uuid);
2313ALTER TABLE users ADD IF NOT EXISTS email text;
2314"#,
2315        );
2316
2317        let users = schema.table("users").expect("users table should parse");
2318        assert!(users.has_column("id"));
2319        assert!(users.has_column("email"));
2320        assert!(!users.has_column("if"));
2321    }
2322
2323    #[test]
2324    fn sql_migration_tracks_column_renames_without_column_keyword() {
2325        let mut schema = Schema::default();
2326        schema.parse_sql_migration(
2327            r#"
2328CREATE TABLE users (id uuid, old_email text);
2329ALTER TABLE users RENAME old_email TO email;
2330"#,
2331        );
2332
2333        let users = schema.table("users").expect("users table should parse");
2334        assert!(users.has_column("email"));
2335        assert!(!users.has_column("old_email"));
2336    }
2337
2338    #[test]
2339    fn sql_migration_does_not_treat_create_table_as_select_as_column_block() {
2340        let mut schema = Schema::default();
2341        schema.parse_sql_migration(
2342            r#"
2343CREATE TABLE reports AS SELECT id FROM users;
2344ALTER TABLE reports ADD COLUMN status text;
2345"#,
2346        );
2347
2348        let reports = schema.table("reports").expect("reports table should parse");
2349        assert!(reports.has_column("status"));
2350        assert!(!reports.has_column("alter"));
2351    }
2352
2353    #[test]
2354    fn sql_migration_handles_multiple_alter_add_actions() {
2355        let mut schema = Schema::default();
2356        schema.parse_sql_migration(
2357            r#"
2358CREATE TABLE users (id uuid);
2359ALTER TABLE users ADD COLUMN email text, ADD IF NOT EXISTS name text;
2360"#,
2361        );
2362
2363        let users = schema.table("users").expect("users table should parse");
2364        assert!(users.has_column("email"));
2365        assert!(users.has_column("name"));
2366    }
2367
2368    #[test]
2369    fn sql_migration_handles_multiple_alter_drop_actions() {
2370        let mut schema = Schema::default();
2371        schema.parse_sql_migration(
2372            r#"
2373CREATE TABLE users (id uuid, old_email text, old_name text);
2374ALTER TABLE users DROP COLUMN old_email, DROP IF EXISTS old_name;
2375"#,
2376        );
2377
2378        let users = schema.table("users").expect("users table should parse");
2379        assert!(users.has_column("id"));
2380        assert!(!users.has_column("old_email"));
2381        assert!(!users.has_column("old_name"));
2382    }
2383
2384    #[test]
2385    fn sql_migration_handles_multiline_mixed_alter_actions() {
2386        let mut schema = Schema::default();
2387        schema.parse_sql_migration(
2388            r#"
2389CREATE TABLE users (id uuid, old_email text, old_name text);
2390ALTER TABLE users
2391  ADD COLUMN email text,
2392  DROP COLUMN old_email,
2393  RENAME COLUMN old_name TO legacy_name;
2394"#,
2395        );
2396
2397        let users = schema.table("users").expect("users table should parse");
2398        assert!(users.has_column("id"));
2399        assert!(users.has_column("email"));
2400        assert!(users.has_column("legacy_name"));
2401        assert!(!users.has_column("old_email"));
2402        assert!(!users.has_column("old_name"));
2403    }
2404
2405    #[test]
2406    fn sql_migration_handles_drop_then_recreate_order() {
2407        let mut schema = Schema::default();
2408        schema.parse_sql_migration(
2409            r#"
2410CREATE TABLE users (stale text);
2411DROP TABLE users;
2412CREATE TABLE users (id uuid, email text);
2413"#,
2414        );
2415
2416        let users = schema
2417            .table("users")
2418            .expect("recreated table should remain in schema");
2419        assert!(users.has_column("id"));
2420        assert!(users.has_column("email"));
2421        assert!(!users.has_column("stale"));
2422    }
2423
2424    #[test]
2425    fn sql_migration_allows_alter_add_columns_with_constraint_prefixes() {
2426        let mut schema = Schema::default();
2427        schema.parse_sql_migration(
2428            r#"
2429CREATE TABLE users (id uuid);
2430ALTER TABLE users ADD COLUMN primary_contact text, ADD check_status text;
2431"#,
2432        );
2433
2434        let users = schema.table("users").expect("users table should parse");
2435        assert!(users.has_column("primary_contact"));
2436        assert!(users.has_column("check_status"));
2437    }
2438
2439    #[test]
2440    fn sql_migration_handles_create_table_paren_on_next_line() {
2441        let mut schema = Schema::default();
2442        schema.parse_sql_migration(
2443            r#"
2444CREATE TABLE users
2445(
2446  id uuid,
2447  email text
2448);
2449"#,
2450        );
2451
2452        let users = schema.table("users").expect("users table should parse");
2453        assert!(users.has_column("id"));
2454        assert!(users.has_column("email"));
2455    }
2456
2457    #[test]
2458    fn sql_migration_does_not_treat_alter_column_drop_as_column_drop() {
2459        let mut schema = Schema::default();
2460        schema.parse_sql_migration(
2461            r#"
2462CREATE TABLE users (id uuid, email text, not text);
2463ALTER TABLE users ALTER COLUMN email DROP NOT NULL;
2464"#,
2465        );
2466
2467        let users = schema.table("users").expect("users table should parse");
2468        assert!(users.has_column("email"));
2469        assert!(users.has_column("not"));
2470    }
2471
2472    #[test]
2473    fn sql_migration_chaos_mixed_postgres_syntax() {
2474        let mut schema = Schema::default();
2475        schema.parse_sql_migration(
2476            r#"
2477CREATE SCHEMA app;
2478CREATE UNLOGGED TABLE IF NOT EXISTS "app"."users"
2479(
2480  id uuid,
2481  old_email text,
2482  old_name text,
2483  "select" text,
2484  "not" text
2485);
2486CREATE TEMP TABLE scratch_jobs (id uuid);
2487ALTER TABLE ONLY "app"."users" ADD COLUMN primary_contact text, ADD check_status text;
2488ALTER TABLE "app"."users" ADD IF NOT EXISTS guarded text;
2489ALTER TABLE "app"."users" DROP COLUMN "select", DROP IF EXISTS guarded, DROP COLUMN IF EXISTS old_name;
2490ALTER TABLE "app"."users" RENAME old_email TO email;
2491ALTER TABLE "app"."users" ALTER COLUMN email DROP NOT NULL;
2492ALTER TABLE "app"."users" RENAME TO customers;
2493
2494CREATE TABLE app.logs (id uuid, body text DEFAULT $$a,b)--not-comment$$, tag text);
2495CREATE FUNCTION app.rebuild_hidden() RETURNS void AS $$
2496BEGIN
2497  CREATE TABLE hidden_from_function (id uuid);
2498END;
2499$$ LANGUAGE plpgsql;
2500CREATE TABLE app.reports AS SELECT id FROM app.customers;
2501ALTER TABLE app.reports ADD COLUMN status text;
2502"#,
2503        );
2504
2505        assert!(!schema.has_table("scratch_jobs"));
2506        assert!(!schema.has_table("app.users"));
2507        assert!(!schema.has_table("hidden_from_function"));
2508
2509        let customers = schema
2510            .table("app.customers")
2511            .expect("renamed schema-qualified table should parse");
2512        assert!(customers.has_column("id"));
2513        assert!(customers.has_column("email"));
2514        assert!(customers.has_column("not"));
2515        assert!(customers.has_column("primary_contact"));
2516        assert!(customers.has_column("check_status"));
2517        assert!(!customers.has_column("old_email"));
2518        assert!(!customers.has_column("old_name"));
2519        assert!(!customers.has_column("select"));
2520        assert!(!customers.has_column("guarded"));
2521
2522        let logs = schema.table("app.logs").expect("logs table should parse");
2523        assert!(logs.has_column("id"));
2524        assert!(logs.has_column("body"));
2525        assert!(logs.has_column("tag"));
2526        assert!(!logs.has_column("b"));
2527
2528        let reports = schema
2529            .table("app.reports")
2530            .expect("ctas table should parse");
2531        assert!(reports.has_column("status"));
2532        assert!(!reports.has_column("alter"));
2533    }
2534}