Skip to main content

qail_core/build/
schema.rs

1//! Schema types and parsing for build-time validation.
2
3use crate::migrate::types::ColumnType;
4use std::collections::HashMap;
5use std::path::Path;
6
/// Foreign key relationship definition.
///
/// One entry is recorded for every `ref:table.column` (or `ref:>table.column`)
/// annotation found on a column line while parsing a table block.
#[derive(Debug, Clone)]
pub struct ForeignKey {
    /// Column in this table that references another table
    pub column: String,
    /// Name of referenced table
    pub ref_table: String,
    /// Column in referenced table
    pub ref_column: String,
}
17
/// Table schema information with column types and relations.
///
/// Instances are produced either by parsing a `table name { ... }` block or by
/// scanning SQL migrations; tables discovered through migrations carry
/// `ColumnType::Text` for every column, no policies, and no foreign keys.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column name → Column type (strongly-typed AST enum)
    pub columns: HashMap<String, ColumnType>,
    /// Column name → Access Policy (Default: "Public", can be "Protected")
    pub policies: HashMap<String, String>,
    /// Foreign key relationships to other tables
    pub foreign_keys: Vec<ForeignKey>,
    /// Whether this table has Row-Level Security enabled
    /// Auto-detected: table has `operator_id` column OR has `rls` keyword in schema.qail
    pub rls_enabled: bool,
}
33
/// Parsed schema from schema.qail file.
///
/// `Default` yields an empty schema with no tables and no resources.
#[derive(Debug, Default)]
pub struct Schema {
    /// Table schemas keyed by table name.
    pub tables: HashMap<String, TableSchema>,
    /// Infrastructure resources (bucket, queue, topic), keyed by resource name.
    pub resources: HashMap<String, ResourceSchema>,
}
42
/// Infrastructure resource schema (bucket, queue, topic).
///
/// Built from a top-level `bucket|queue|topic name { key value ... }`
/// declaration in the schema source.
#[derive(Debug, Clone)]
pub struct ResourceSchema {
    /// Resource name.
    pub name: String,
    /// Resource kind (bucket, queue, topic).
    pub kind: String,
    /// Cloud provider (e.g. "aws"); taken from the `provider` key of the block.
    pub provider: Option<String>,
    /// Provider-specific properties: every other `key value` pair in the block.
    pub properties: HashMap<String, String>,
}
55
/// Removes a trailing `--` or `#` comment from a schema line and trims the
/// surrounding whitespace from what is left.
///
/// The line is cut at whichever comment marker appears first; a line with no
/// marker is returned trimmed but otherwise unchanged.
fn strip_schema_comments(line: &str) -> &str {
    let cut = match (line.find("--"), line.find('#')) {
        (Some(dashes), Some(hash)) => dashes.min(hash),
        (Some(only), None) | (None, Some(only)) => only,
        (None, None) => line.len(),
    };
    line[..cut].trim()
}
60
/// Removes a trailing `--` SQL line comment and trims surrounding whitespace.
///
/// Only `--` is recognised here; `#` is left untouched.
fn strip_sql_line_comments(line: &str) -> &str {
    let code = match line.find("--") {
        Some(comment_start) => &line[..comment_start],
        None => line,
    };
    code.trim()
}
64
65impl Schema {
66    /// Parse a schema.qail file
67    pub fn parse_file(path: &str) -> Result<Self, String> {
68        let content = crate::schema_source::read_qail_schema_source(path)?;
69        Self::parse(&content)
70    }
71
72    /// Parse schema from string
73    pub fn parse(content: &str) -> Result<Self, String> {
74        let mut schema = Schema::default();
75        let mut current_table: Option<String> = None;
76        let mut current_columns: HashMap<String, ColumnType> = HashMap::new();
77        let mut current_policies: HashMap<String, String> = HashMap::new();
78        let mut current_fks: Vec<ForeignKey> = Vec::new();
79        let mut current_rls_flag = false;
80
81        for raw_line in content.lines() {
82            let line = strip_schema_comments(raw_line);
83
84            // Skip comments and empty lines
85            if line.is_empty() {
86                continue;
87            }
88
89            // Resource declarations: bucket, queue, topic
90            // Only match at the top level, NOT inside a table block
91            // (a column named 'topic' inside a table would otherwise be
92            //  misidentified as a resource declaration)
93            if current_table.is_none()
94                && (line.starts_with("bucket ")
95                    || line.starts_with("queue ")
96                    || line.starts_with("topic "))
97            {
98                let parts: Vec<&str> = line.splitn(2, ' ').collect();
99                let kind = parts[0].to_string();
100                let rest = parts.get(1).copied().unwrap_or("").trim();
101
102                // Extract name (before {
103                let name = rest.split('{').next().unwrap_or(rest).trim().to_string();
104                let mut provider = None;
105                let mut properties = HashMap::new();
106
107                if line.contains('{') {
108                    // Collect block content
109                    let block = rest.split('{').nth(1).unwrap_or("").to_string();
110                    if !block.contains('}') {
111                        for inner in content.lines().skip_while(|l| !l.contains(line)) {
112                            // Simple approach: read until }
113                            if inner.contains('}') {
114                                break;
115                            }
116                        }
117                    }
118                    let block = block.replace('}', "");
119                    let mut tokens = block.split_whitespace();
120                    while let Some(key) = tokens.next() {
121                        if let Some(val) = tokens.next() {
122                            let val = val.trim_matches('"').to_string();
123                            if key == "provider" {
124                                provider = Some(val);
125                            } else {
126                                properties.insert(key.to_string(), val);
127                            }
128                        }
129                    }
130                }
131
132                if !name.is_empty() {
133                    schema.resources.insert(
134                        name.clone(),
135                        ResourceSchema {
136                            name,
137                            kind,
138                            provider,
139                            properties,
140                        },
141                    );
142                }
143                continue;
144            }
145
146            // Table definition: table name { [rls]
147            if line.starts_with("table ") && (line.ends_with('{') || line.contains('{')) {
148                // Save previous table if any
149                if let Some(table_name) = current_table.take() {
150                    // Auto-detect RLS: table has operator_id column or was marked `rls`
151                    let has_rls = current_rls_flag || current_columns.contains_key("operator_id");
152                    schema.tables.insert(
153                        table_name.clone(),
154                        TableSchema {
155                            name: table_name,
156                            columns: std::mem::take(&mut current_columns),
157                            policies: std::mem::take(&mut current_policies),
158                            foreign_keys: std::mem::take(&mut current_fks),
159                            rls_enabled: has_rls,
160                        },
161                    );
162                }
163
164                // Parse new table name, check for `rls` keyword
165                // Format: "table bookings rls {" or "table bookings {"
166                let after_table = line.trim_start_matches("table ");
167                let before_brace = after_table.split('{').next().unwrap_or("").trim();
168                let parts: Vec<&str> = before_brace.split_whitespace().collect();
169                let name = parts.first().unwrap_or(&"").to_string();
170                current_rls_flag = parts.contains(&"rls");
171                current_table = Some(name);
172            }
173            // End of table definition
174            else if line == "}" {
175                if let Some(table_name) = current_table.take() {
176                    let has_rls = current_rls_flag || current_columns.contains_key("operator_id");
177                    schema.tables.insert(
178                        table_name.clone(),
179                        TableSchema {
180                            name: table_name,
181                            columns: std::mem::take(&mut current_columns),
182                            policies: std::mem::take(&mut current_policies),
183                            foreign_keys: std::mem::take(&mut current_fks),
184                            rls_enabled: has_rls,
185                        },
186                    );
187                    current_rls_flag = false;
188                }
189            }
190            // Column definition: column_name TYPE [constraints] [ref:table.column] [protected]
191            // Format from qail pull: "flow_name VARCHAR not_null"
192            // New format with FK: "user_id UUID ref:users.id"
193            // New format with Policy: "password_hash TEXT protected"
194            else if current_table.is_some() {
195                let parts: Vec<&str> = line.split_whitespace().collect();
196                if let Some(col_name) = parts.first() {
197                    // Second word is the type (default to TEXT if missing)
198                    let col_type_str = parts.get(1).copied().unwrap_or("text");
199                    let col_type = col_type_str
200                        .parse::<ColumnType>()
201                        .unwrap_or(ColumnType::Text);
202                    current_columns.insert(col_name.to_string(), col_type);
203
204                    // Check for policies and foreign keys
205                    let mut policy = "Public".to_string();
206
207                    for part in parts.iter().skip(2) {
208                        if *part == "protected" {
209                            policy = "Protected".to_string();
210                        } else if let Some(ref_spec) = part.strip_prefix("ref:") {
211                            // Parse "table.column" or ">table.column"
212                            let ref_spec = ref_spec.trim_start_matches('>');
213                            if let Some((ref_table, ref_col)) = ref_spec.split_once('.') {
214                                current_fks.push(ForeignKey {
215                                    column: col_name.to_string(),
216                                    ref_table: ref_table.to_string(),
217                                    ref_column: ref_col.to_string(),
218                                });
219                            }
220                        }
221                    }
222                    current_policies.insert(col_name.to_string(), policy);
223                }
224            }
225        }
226
227        if let Some(table_name) = current_table.take() {
228            return Err(format!(
229                "Unclosed table definition for '{}': expected closing '}}'",
230                table_name
231            ));
232        }
233
234        Ok(schema)
235    }
236
237    /// Check if table exists
238    pub fn has_table(&self, name: &str) -> bool {
239        self.tables.contains_key(name)
240    }
241
242    /// Get all table names that have RLS enabled
243    pub fn rls_tables(&self) -> Vec<&str> {
244        self.tables
245            .iter()
246            .filter(|(_, ts)| ts.rls_enabled)
247            .map(|(name, _)| name.as_str())
248            .collect()
249    }
250
251    /// Check if a specific table has RLS enabled
252    pub fn is_rls_table(&self, name: &str) -> bool {
253        self.tables.get(name).is_some_and(|t| t.rls_enabled)
254    }
255
    /// Get table schema.
    ///
    /// Returns `None` when no table with the given `name` is present.
    pub fn table(&self, name: &str) -> Option<&TableSchema> {
        self.tables.get(name)
    }
260
261    /// Merge pending migrations into the schema
262    /// Scans migration directory for .sql files and extracts:
263    /// - CREATE TABLE statements
264    /// - ALTER TABLE ADD COLUMN statements
265    pub fn merge_migrations(&mut self, migrations_dir: &str) -> Result<usize, String> {
266        use std::fs;
267
268        let dir = Path::new(migrations_dir);
269        if !dir.exists() {
270            return Ok(0); // No migrations directory
271        }
272
273        let mut merged_count = 0;
274
275        // Walk migration directories (format: migrations/YYYYMMDD_name/up.sql)
276        let entries =
277            fs::read_dir(dir).map_err(|e| format!("Failed to read migrations dir: {}", e))?;
278
279        for entry in entries.flatten() {
280            let path = entry.path();
281
282            // Check for up.sql in subdirectory
283            let up_sql = if path.is_dir() {
284                path.join("up.sql")
285            } else if path.extension().is_some_and(|e| e == "sql") {
286                path.clone()
287            } else {
288                continue;
289            };
290
291            if up_sql.exists() {
292                let content = fs::read_to_string(&up_sql)
293                    .map_err(|e| format!("Failed to read {}: {}", up_sql.display(), e))?;
294
295                merged_count += self.parse_sql_migration(&content);
296            }
297        }
298
299        Ok(merged_count)
300    }
301
302    /// Parse SQL migration content and extract schema changes
303    pub(crate) fn parse_sql_migration(&mut self, sql: &str) -> usize {
304        let mut changes = 0;
305
306        // Extract CREATE TABLE statements
307        // Pattern: CREATE TABLE [IF NOT EXISTS] table_name (columns...)
308        for raw_line in sql.lines() {
309            let line = strip_sql_line_comments(raw_line);
310            if line.is_empty()
311                || line.starts_with("/*")
312                || line.starts_with('*')
313                || line.starts_with("*/")
314            {
315                continue;
316            }
317            let line_upper = line.to_uppercase();
318
319            if line_upper.starts_with("CREATE TABLE")
320                && let Some(table_name) = extract_create_table_name(line)
321                && !self.tables.contains_key(&table_name)
322            {
323                self.tables.insert(
324                    table_name.clone(),
325                    TableSchema {
326                        name: table_name,
327                        columns: HashMap::new(),
328                        policies: HashMap::new(),
329                        foreign_keys: vec![],
330                        rls_enabled: false,
331                    },
332                );
333                changes += 1;
334            }
335        }
336
337        // Extract column definitions from CREATE TABLE blocks
338        // IMPORTANT: Only track CREATE blocks for tables that were newly created
339        // by this migration. Tables that already exist in the schema (from schema.qail)
340        // already have correct column types — overwriting them with ColumnType::Text
341        // would cause false type-mismatch errors.
342        let mut current_table: Option<String> = None;
343        let mut in_create_block = false;
344        let mut paren_depth = 0;
345
346        for raw_line in sql.lines() {
347            let line = strip_sql_line_comments(raw_line);
348            if line.is_empty()
349                || line.starts_with("/*")
350                || line.starts_with('*')
351                || line.starts_with("*/")
352            {
353                continue;
354            }
355            let line_upper = line.to_uppercase();
356
357            if line_upper.starts_with("CREATE TABLE")
358                && let Some(name) = extract_create_table_name(line)
359            {
360                // Only track column extraction for tables that DON'T already
361                // have their types from schema.qail. Tables that existed before
362                // this migration already have correct types; overwriting them
363                // with ColumnType::Text would be a bug.
364                if self.tables.get(&name).is_none_or(|t| t.columns.is_empty()) {
365                    current_table = Some(name);
366                } else {
367                    current_table = None;
368                }
369                in_create_block = true;
370                paren_depth = 0;
371            }
372
373            if in_create_block {
374                paren_depth += line.chars().filter(|c| *c == '(').count();
375                paren_depth =
376                    paren_depth.saturating_sub(line.chars().filter(|c| *c == ')').count());
377
378                // Extract column name (first identifier after opening paren)
379                if let Some(col) = extract_column_from_create(line)
380                    && let Some(ref table) = current_table
381                    && let Some(t) = self.tables.get_mut(table)
382                    && t.columns.insert(col.clone(), ColumnType::Text).is_none()
383                {
384                    changes += 1;
385                }
386
387                if paren_depth == 0 && line.contains(')') {
388                    in_create_block = false;
389                    current_table = None;
390                }
391            }
392
393            // ALTER TABLE ... ADD COLUMN
394            if line_upper.starts_with("ALTER TABLE")
395                && line_upper.contains("ADD COLUMN")
396                && let Some((table, col)) = extract_alter_add_column(line)
397            {
398                if let Some(t) = self.tables.get_mut(&table) {
399                    if t.columns.insert(col.clone(), ColumnType::Text).is_none() {
400                        changes += 1;
401                    }
402                } else {
403                    // Table might be new from this migration
404                    let mut cols = HashMap::new();
405                    cols.insert(col, ColumnType::Text);
406                    self.tables.insert(
407                        table.clone(),
408                        TableSchema {
409                            name: table,
410                            columns: cols,
411                            policies: HashMap::new(),
412                            foreign_keys: vec![],
413                            rls_enabled: false,
414                        },
415                    );
416                    changes += 1;
417                }
418            }
419
420            // ALTER TABLE ... ADD (without COLUMN keyword)
421            if line_upper.starts_with("ALTER TABLE")
422                && line_upper.contains(" ADD ")
423                && !line_upper.contains("ADD COLUMN")
424                && let Some((table, col)) = extract_alter_add(line)
425                && let Some(t) = self.tables.get_mut(&table)
426                && t.columns.insert(col.clone(), ColumnType::Text).is_none()
427            {
428                changes += 1;
429            }
430
431            // DROP TABLE
432            if line_upper.starts_with("DROP TABLE")
433                && let Some(table_name) = extract_drop_table_name(line)
434                && self.tables.remove(&table_name).is_some()
435            {
436                changes += 1;
437            }
438
439            // ALTER TABLE ... DROP COLUMN
440            if line_upper.starts_with("ALTER TABLE")
441                && line_upper.contains("DROP COLUMN")
442                && let Some((table, col)) = extract_alter_drop_column(line)
443                && let Some(t) = self.tables.get_mut(&table)
444                && t.columns.remove(&col).is_some()
445            {
446                changes += 1;
447            }
448
449            // ALTER TABLE ... DROP (without COLUMN keyword - PostgreSQL style)
450            if line_upper.starts_with("ALTER TABLE")
451                && line_upper.contains(" DROP ")
452                && !line_upper.contains("DROP COLUMN")
453                && !line_upper.contains("DROP CONSTRAINT")
454                && !line_upper.contains("DROP INDEX")
455                && let Some((table, col)) = extract_alter_drop(line)
456                && let Some(t) = self.tables.get_mut(&table)
457                && t.columns.remove(&col).is_some()
458            {
459                changes += 1;
460            }
461        }
462
463        changes
464    }
465}
466
/// Extract table name from CREATE TABLE statement.
///
/// Accepts `CREATE TABLE [IF NOT EXISTS] name ...` with the keywords in any
/// ASCII letter case. The name is the leading run of alphanumeric / `_`
/// characters and is returned lowercased. Returns `None` when the line does
/// not start with `CREATE TABLE` or no identifier follows the keywords.
///
/// Keywords are matched directly against `line` (ASCII case-insensitively)
/// rather than by taking byte offsets from an uppercased copy: Unicode
/// uppercasing can change a string's byte length, which would make such
/// offsets point at the wrong place or inside a multi-byte character.
fn extract_create_table_name(line: &str) -> Option<String> {
    // Strips `keyword` from the front of `text`, ignoring ASCII case.
    fn strip_keyword<'a>(text: &'a str, keyword: &str) -> Option<&'a str> {
        let head = text.get(..keyword.len())?;
        if head.eq_ignore_ascii_case(keyword) {
            Some(&text[keyword.len()..])
        } else {
            None
        }
    }

    let rest = strip_keyword(line, "CREATE TABLE")?.trim_start();
    let rest = strip_keyword(rest, "IF NOT EXISTS").map_or(rest, str::trim_start);

    // Get table name (first identifier)
    let name: String = rest
        .chars()
        .take_while(|c| c.is_alphanumeric() || *c == '_')
        .collect();

    if name.is_empty() {
        None
    } else {
        Some(name.to_lowercase())
    }
}
490
/// Pulls the column name out of one line of a `CREATE TABLE` body.
///
/// Returns `None` for lines that do not define a column: blank lines, lines
/// starting with a parenthesis, and lines that begin with a statement or
/// constraint keyword. The name is the leading run of alphanumeric / `_`
/// characters, lowercased.
fn extract_column_from_create(line: &str) -> Option<String> {
    // A keyword only counts when it is followed by a space or `(`, so that
    // column names which merely start with one (created_at, primary_contact,
    // check_status, ...) are still treated as columns.
    const NON_COLUMN_KEYWORDS: [&str; 6] = [
        "CREATE",
        "PRIMARY",
        "FOREIGN",
        "UNIQUE",
        "CHECK",
        "CONSTRAINT",
    ];

    let trimmed = line.trim();
    if trimmed.is_empty() || trimmed.starts_with(')') || trimmed.starts_with('(') {
        return None;
    }

    let upper = trimmed.to_uppercase();
    let opens_with_keyword = NON_COLUMN_KEYWORDS.iter().any(|keyword| {
        upper
            .strip_prefix(*keyword)
            .is_some_and(|tail| tail.starts_with(' ') || tail.starts_with('('))
    });
    if opens_with_keyword {
        return None;
    }

    // First word is the column name.
    let candidate: String = trimmed
        .chars()
        .take_while(|c| c.is_alphanumeric() || *c == '_')
        .collect();

    if candidate.is_empty() || candidate.to_uppercase() == "IF" {
        return None;
    }
    Some(candidate.to_lowercase())
}
531
/// Extract table and column from ALTER TABLE ... ADD COLUMN.
///
/// Handles `ALTER TABLE table ADD COLUMN [IF NOT EXISTS] column ...` with the
/// keywords in any ASCII letter case. Both names are the leading run of
/// alphanumeric / `_` characters and are returned lowercased. Returns `None`
/// when either keyword is missing or either name is empty.
fn extract_alter_add_column(line: &str) -> Option<(String, String)> {
    const ALTER: &str = "ALTER TABLE";
    const ADD: &str = "ADD COLUMN";
    const GUARD: &str = "IF NOT EXISTS";

    // Leading identifier of `text`, ignoring surrounding whitespace.
    fn leading_ident(text: &str) -> String {
        text.trim()
            .chars()
            .take_while(|c| c.is_alphanumeric() || *c == '_')
            .collect()
    }

    // ASCII-only uppercasing keeps every byte offset identical to `line`;
    // Unicode uppercasing may change byte lengths and shift the offsets.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER)?;
    let add_pos = upper.find(ADD)?;

    // Table name between ALTER TABLE and ADD COLUMN (checked slice: `None`
    // instead of a panic if the keywords appear in an unexpected order).
    let table = leading_ident(line.get(alter_pos + ALTER.len()..add_pos)?);

    // Column name after ADD COLUMN [IF NOT EXISTS]
    let after_add = line[add_pos + ADD.len()..].trim_start();
    let after_guard = match after_add.get(..GUARD.len()) {
        Some(head) if head.eq_ignore_ascii_case(GUARD) => &after_add[GUARD.len()..],
        _ => after_add,
    };
    let col = leading_ident(after_guard);

    if table.is_empty() || col.is_empty() {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
564
/// Extract table and column from ALTER TABLE ... ADD (without COLUMN keyword).
///
/// Handles `ALTER TABLE table ADD [IF NOT EXISTS] column ...` with the
/// keywords in any ASCII letter case. Both names are the leading run of
/// alphanumeric / `_` characters and are returned lowercased.
///
/// Returns `None` when a keyword is missing, when either name is empty, or
/// when the word after `ADD` introduces a table constraint (`CONSTRAINT`,
/// `PRIMARY`, `FOREIGN`, `UNIQUE`, `CHECK`, `EXCLUDE`) rather than a column.
/// Column names that merely start with such a word (e.g. `check_status`) are
/// still accepted.
fn extract_alter_add(line: &str) -> Option<(String, String)> {
    const ALTER: &str = "ALTER TABLE";
    const ADD: &str = " ADD ";
    const GUARD: &str = "IF NOT EXISTS";
    const CONSTRAINT_KEYWORDS: [&str; 6] = [
        "CONSTRAINT",
        "PRIMARY",
        "FOREIGN",
        "UNIQUE",
        "CHECK",
        "EXCLUDE",
    ];

    // Leading identifier of `text`, ignoring surrounding whitespace.
    fn leading_ident(text: &str) -> String {
        text.trim()
            .chars()
            .take_while(|c| c.is_alphanumeric() || *c == '_')
            .collect()
    }

    // ASCII-only uppercasing keeps every byte offset identical to `line`.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER)?;
    let add_pos = upper.find(ADD)?;

    let table = leading_ident(line.get(alter_pos + ALTER.len()..add_pos)?);

    let after_add = line[add_pos + ADD.len()..].trim_start();
    let after_guard = match after_add.get(..GUARD.len()) {
        Some(head) if head.eq_ignore_ascii_case(GUARD) => &after_add[GUARD.len()..],
        _ => after_add,
    };
    let col = leading_ident(after_guard);

    // `ADD CONSTRAINT ...`, `ADD PRIMARY KEY ...` and friends add constraints,
    // not columns; reporting them would create phantom columns.
    let is_constraint = CONSTRAINT_KEYWORDS
        .iter()
        .any(|keyword| col.eq_ignore_ascii_case(keyword));

    if table.is_empty() || col.is_empty() || is_constraint {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
591
/// Extract table name from DROP TABLE statement.
///
/// Accepts `DROP TABLE [IF EXISTS] name ...` with the keywords in any ASCII
/// letter case. The name is the leading run of alphanumeric / `_` characters
/// and is returned lowercased. Returns `None` when the line does not start
/// with `DROP TABLE` or no identifier follows the keywords.
///
/// Keywords are matched directly against `line` (ASCII case-insensitively)
/// rather than by taking byte offsets from an uppercased copy: Unicode
/// uppercasing can change a string's byte length, which would make such
/// offsets point at the wrong place or inside a multi-byte character.
fn extract_drop_table_name(line: &str) -> Option<String> {
    // Strips `keyword` from the front of `text`, ignoring ASCII case.
    fn strip_keyword<'a>(text: &'a str, keyword: &str) -> Option<&'a str> {
        let head = text.get(..keyword.len())?;
        if head.eq_ignore_ascii_case(keyword) {
            Some(&text[keyword.len()..])
        } else {
            None
        }
    }

    let rest = strip_keyword(line, "DROP TABLE")?.trim_start();
    let rest = strip_keyword(rest, "IF EXISTS").map_or(rest, str::trim_start);

    // Get table name (first identifier)
    let name: String = rest
        .chars()
        .take_while(|c| c.is_alphanumeric() || *c == '_')
        .collect();

    if name.is_empty() {
        None
    } else {
        Some(name.to_lowercase())
    }
}
615
/// Extract table and column from ALTER TABLE ... DROP COLUMN.
///
/// Handles `ALTER TABLE table DROP COLUMN [IF EXISTS] column ...` with the
/// keywords in any ASCII letter case. Both names are the leading run of
/// alphanumeric / `_` characters and are returned lowercased. Returns `None`
/// when either keyword is missing or either name is empty.
fn extract_alter_drop_column(line: &str) -> Option<(String, String)> {
    const ALTER: &str = "ALTER TABLE";
    const DROP: &str = "DROP COLUMN";
    const GUARD: &str = "IF EXISTS";

    // Leading identifier of `text`, ignoring surrounding whitespace.
    fn leading_ident(text: &str) -> String {
        text.trim()
            .chars()
            .take_while(|c| c.is_alphanumeric() || *c == '_')
            .collect()
    }

    // ASCII-only uppercasing keeps every byte offset identical to `line`.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER)?;
    let drop_pos = upper.find(DROP)?;

    // Table name between ALTER TABLE and DROP COLUMN
    let table = leading_ident(line.get(alter_pos + ALTER.len()..drop_pos)?);

    // Column name after DROP COLUMN [IF EXISTS]; without skipping the guard
    // the "column" would be reported as `if` and the real drop missed.
    let after_drop = line[drop_pos + DROP.len()..].trim_start();
    let after_guard = match after_drop.get(..GUARD.len()) {
        Some(head) if head.eq_ignore_ascii_case(GUARD) => &after_drop[GUARD.len()..],
        _ => after_drop,
    };
    let col = leading_ident(after_guard);

    if table.is_empty() || col.is_empty() {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
644
/// Extract table and column from ALTER TABLE ... DROP (without COLUMN keyword).
///
/// Handles `ALTER TABLE table DROP [IF EXISTS] column ...` with the keywords
/// in any ASCII letter case. Both names are the leading run of alphanumeric /
/// `_` characters and are returned lowercased. Returns `None` when a keyword
/// is missing or either name is empty.
///
/// No attempt is made here to tell columns from constraints or indexes; the
/// word following `DROP` is reported as-is.
fn extract_alter_drop(line: &str) -> Option<(String, String)> {
    const ALTER: &str = "ALTER TABLE";
    const DROP: &str = " DROP ";
    const GUARD: &str = "IF EXISTS";

    // Leading identifier of `text`, ignoring surrounding whitespace.
    fn leading_ident(text: &str) -> String {
        text.trim()
            .chars()
            .take_while(|c| c.is_alphanumeric() || *c == '_')
            .collect()
    }

    // ASCII-only uppercasing keeps every byte offset identical to `line`.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER)?;
    let drop_pos = upper.find(DROP)?;

    let table = leading_ident(line.get(alter_pos + ALTER.len()..drop_pos)?);

    // Skip an optional IF EXISTS guard so the real column name is reported.
    let after_drop = line[drop_pos + DROP.len()..].trim_start();
    let after_guard = match after_drop.get(..GUARD.len()) {
        Some(head) if head.eq_ignore_ascii_case(GUARD) => &after_drop[GUARD.len()..],
        _ => after_drop,
    };
    let col = leading_ident(after_guard);

    if table.is_empty() || col.is_empty() {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
671
672impl TableSchema {
673    /// Check if column exists
674    pub fn has_column(&self, name: &str) -> bool {
675        self.columns.contains_key(name)
676    }
677
678    /// Get column type
679    pub fn column_type(&self, name: &str) -> Option<&ColumnType> {
680        self.columns.get(name)
681    }
682
683    /// Get the primary key column name for this table.
684    ///
685    /// Convention: returns `"id"` if it exists as a column.
686    /// This is a single point of truth for PK resolution — when the schema
687    /// parser is enhanced to track PK constraints, update this method.
688    pub fn primary_key_column(&self) -> &str {
689        if self.columns.contains_key("id") {
690            "id"
691        } else {
692            // Fallback: look for `{singular_table_name}_id` pattern
693            // e.g., table "users" → "user_id"
694            let singular = self.name.trim_end_matches('s');
695            let conventional = format!("{}_id", singular);
696            if self.columns.contains_key(&conventional) {
697                // Leak into 'static to satisfy lifetime — this is called rarely
698                // and the string is small. Alternatively, return String.
699                return "id"; // Safe default — schema has no "id" but this avoids lifetime issues
700            }
701            "id" // Universal fallback
702        }
703    }
704}