1use crate::ast::Expr;
4use crate::migrate::types::ColumnType;
5use crate::parser::grammar::ddl::parse_column_definition;
6use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9#[derive(Debug, Clone)]
11pub struct ForeignKey {
12 pub column: String,
14 pub ref_table: String,
16 pub ref_column: String,
18}
19
20#[derive(Debug, Clone)]
22pub struct TableSchema {
23 pub name: String,
25 pub columns: HashMap<String, ColumnType>,
27 pub policies: HashMap<String, String>,
29 pub foreign_keys: Vec<ForeignKey>,
31 pub rls_enabled: bool,
34}
35
36#[derive(Debug, Default)]
38pub struct Schema {
39 pub tables: HashMap<String, TableSchema>,
41 pub views: HashSet<String>,
43 pub resources: HashMap<String, ResourceSchema>,
45}
46
47#[derive(Debug, Clone)]
49pub struct ResourceSchema {
50 pub name: String,
52 pub kind: String,
54 pub provider: Option<String>,
56 pub properties: HashMap<String, String>,
58}
59
60fn strip_schema_comments(line: &str) -> &str {
61 let line = line.split_once("--").map_or(line, |(left, _)| left);
62 line.split_once('#').map_or(line, |(left, _)| left).trim()
63}
64
65fn strip_sql_line_comments(line: &str) -> &str {
66 line.split_once("--").map_or(line, |(left, _)| left).trim()
67}
68
69impl Schema {
70 pub fn parse_file(path: &str) -> Result<Self, String> {
72 let content = crate::schema_source::read_qail_schema_source(path)?;
73 Self::parse(&content)
74 }
75
76 pub fn parse(content: &str) -> Result<Self, String> {
78 let mut schema = Schema::default();
79 let mut current_table: Option<String> = None;
80 let mut current_columns: HashMap<String, ColumnType> = HashMap::new();
81 let mut current_policies: HashMap<String, String> = HashMap::new();
82 let mut current_fks: Vec<ForeignKey> = Vec::new();
83 let mut current_rls_flag = false;
84
85 for raw_line in content.lines() {
86 let line = strip_schema_comments(raw_line);
87
88 if line.is_empty() {
90 continue;
91 }
92
93 if current_table.is_none()
98 && (line.starts_with("bucket ")
99 || line.starts_with("queue ")
100 || line.starts_with("topic "))
101 {
102 let parts: Vec<&str> = line.splitn(2, ' ').collect();
103 let kind = parts[0].to_string();
104 let rest = parts.get(1).copied().unwrap_or("").trim();
105
106 let name = rest.split('{').next().unwrap_or(rest).trim().to_string();
108 let mut provider = None;
109 let mut properties = HashMap::new();
110
111 if line.contains('{') {
112 let block = rest.split('{').nth(1).unwrap_or("").to_string();
114 if !block.contains('}') {
115 for inner in content.lines().skip_while(|l| !l.contains(line)) {
116 if inner.contains('}') {
118 break;
119 }
120 }
121 }
122 let block = block.replace('}', "");
123 let mut tokens = block.split_whitespace();
124 while let Some(key) = tokens.next() {
125 if let Some(val) = tokens.next() {
126 let val = val.trim_matches('"').to_string();
127 if key == "provider" {
128 provider = Some(val);
129 } else {
130 properties.insert(key.to_string(), val);
131 }
132 }
133 }
134 }
135
136 if !name.is_empty() {
137 schema.resources.insert(
138 name.clone(),
139 ResourceSchema {
140 name,
141 kind,
142 provider,
143 properties,
144 },
145 );
146 }
147 continue;
148 }
149
150 if current_table.is_none()
153 && let Some(view_name) = extract_view_name(line)
154 {
155 schema.views.insert(view_name.to_string());
156 continue;
157 }
158
159 if line.starts_with("table ") && (line.ends_with('{') || line.contains('{')) {
161 if let Some(table_name) = current_table.take() {
163 let has_rls = current_rls_flag || current_columns.contains_key("tenant_id");
165 schema.tables.insert(
166 table_name.clone(),
167 TableSchema {
168 name: table_name,
169 columns: std::mem::take(&mut current_columns),
170 policies: std::mem::take(&mut current_policies),
171 foreign_keys: std::mem::take(&mut current_fks),
172 rls_enabled: has_rls,
173 },
174 );
175 }
176
177 let after_table = line.trim_start_matches("table ");
180 let before_brace = after_table.split('{').next().unwrap_or("").trim();
181 let parts: Vec<&str> = before_brace.split_whitespace().collect();
182 let name = parts.first().unwrap_or(&"").to_string();
183 current_rls_flag = parts.contains(&"rls");
184 current_table = Some(name);
185 }
186 else if line == "}" {
188 if let Some(table_name) = current_table.take() {
189 let has_rls = current_rls_flag || current_columns.contains_key("tenant_id");
190 schema.tables.insert(
191 table_name.clone(),
192 TableSchema {
193 name: table_name,
194 columns: std::mem::take(&mut current_columns),
195 policies: std::mem::take(&mut current_policies),
196 foreign_keys: std::mem::take(&mut current_fks),
197 rls_enabled: has_rls,
198 },
199 );
200 current_rls_flag = false;
201 }
202 }
203 else if current_table.is_some() {
208 let parts: Vec<&str> = line.split_whitespace().collect();
209 if let Some(col_name) = parts.first() {
210 let col_type_str = parts.get(1).copied().unwrap_or("text");
212 let col_type = col_type_str
213 .parse::<ColumnType>()
214 .unwrap_or(ColumnType::Text);
215 current_columns.insert(col_name.to_string(), col_type);
216
217 let mut policy = "Public".to_string();
219
220 for part in parts.iter().skip(2) {
221 if *part == "protected" {
222 policy = "Protected".to_string();
223 } else if let Some(ref_spec) = part.strip_prefix("ref:") {
224 let ref_spec = ref_spec.trim_start_matches('>');
226 if let Some((ref_table, ref_col)) = ref_spec.split_once('.') {
227 current_fks.push(ForeignKey {
228 column: col_name.to_string(),
229 ref_table: ref_table.to_string(),
230 ref_column: ref_col.to_string(),
231 });
232 }
233 }
234 }
235 current_policies.insert(col_name.to_string(), policy);
236 }
237 }
238 }
239
240 if let Some(table_name) = current_table.take() {
241 return Err(format!(
242 "Unclosed table definition for '{}': expected closing '}}'",
243 table_name
244 ));
245 }
246
247 Ok(schema)
248 }
249
250 pub fn has_table(&self, name: &str) -> bool {
252 self.tables.contains_key(name) || self.views.contains(name)
253 }
254
255 pub fn rls_tables(&self) -> Vec<&str> {
257 self.tables
258 .iter()
259 .filter(|(_, ts)| ts.rls_enabled)
260 .map(|(name, _)| name.as_str())
261 .collect()
262 }
263
264 pub fn is_rls_table(&self, name: &str) -> bool {
266 self.tables.get(name).is_some_and(|t| t.rls_enabled)
267 }
268
269 pub fn table(&self, name: &str) -> Option<&TableSchema> {
271 self.tables.get(name)
272 }
273
274 pub fn merge_migrations(&mut self, migrations_dir: &str) -> Result<usize, String> {
279 use std::fs;
280
281 let dir = Path::new(migrations_dir);
282 if !dir.exists() {
283 return Ok(0); }
285
286 let mut merged_count = 0;
287
288 let entries =
290 fs::read_dir(dir).map_err(|e| format!("Failed to read migrations dir: {}", e))?;
291
292 for entry in entries.flatten() {
293 let path = entry.path();
294
295 let migration_file = if path.is_dir() {
298 let up_qail = path.join("up.qail");
299 let up_sql = path.join("up.sql");
300 if up_qail.exists() {
301 up_qail
302 } else if up_sql.exists() {
303 up_sql
304 } else {
305 continue;
306 }
307 } else if path.extension().is_some_and(|e| e == "qail" || e == "sql") {
308 path.clone()
309 } else {
310 continue;
311 };
312
313 if migration_file.exists() {
314 let content = fs::read_to_string(&migration_file)
315 .map_err(|e| format!("Failed to read {}: {}", migration_file.display(), e))?;
316
317 if migration_file.extension().is_some_and(|ext| ext == "qail") {
318 merged_count += self.parse_qail_migration(&content).map_err(|e| {
319 format!(
320 "Failed to parse native migration {}: {}",
321 migration_file.display(),
322 e
323 )
324 })?;
325 } else {
326 merged_count += self.parse_sql_migration(&content);
327 }
328 }
329 }
330
331 Ok(merged_count)
332 }
333
334 pub(crate) fn parse_qail_migration(&mut self, qail: &str) -> Result<usize, String> {
336 let parsed = Schema::parse(qail)?;
337 let mut changes = 0usize;
338
339 for (table_name, parsed_table) in parsed.tables {
340 if let Some(existing) = self.tables.get_mut(&table_name) {
341 for (col_name, col_type) in parsed_table.columns {
342 if existing
343 .columns
344 .insert(col_name.clone(), col_type)
345 .is_none()
346 {
347 changes += 1;
348 }
349 }
350 for (col_name, policy) in parsed_table.policies {
351 if existing.policies.insert(col_name, policy).is_none() {
352 changes += 1;
353 }
354 }
355 for fk in parsed_table.foreign_keys {
356 let duplicate = existing.foreign_keys.iter().any(|existing_fk| {
357 existing_fk.column == fk.column
358 && existing_fk.ref_table == fk.ref_table
359 && existing_fk.ref_column == fk.ref_column
360 });
361 if !duplicate {
362 existing.foreign_keys.push(fk);
363 changes += 1;
364 }
365 }
366 if parsed_table.rls_enabled && !existing.rls_enabled {
367 existing.rls_enabled = true;
368 changes += 1;
369 }
370 } else {
371 changes += 1 + parsed_table.columns.len();
372 self.tables.insert(table_name, parsed_table);
373 }
374 }
375
376 for view_name in parsed.views {
377 if self.views.insert(view_name) {
378 changes += 1;
379 }
380 }
381 for (resource_name, resource) in parsed.resources {
382 if self.resources.insert(resource_name, resource).is_none() {
383 changes += 1;
384 }
385 }
386
387 changes += self.parse_explicit_qail_apply_commands(qail)?;
388
389 Ok(changes)
390 }
391
392 fn parse_explicit_qail_apply_commands(&mut self, qail: &str) -> Result<usize, String> {
393 let mut changes = 0usize;
394
395 for (line_no, raw_line) in qail.lines().enumerate() {
396 let line = strip_schema_comments(raw_line);
397 if line.is_empty() || !line.starts_with("alter ") {
398 continue;
399 }
400
401 let (table, column_name, column_type) = parse_explicit_alter_add_column_line(line)
402 .map_err(|err| format!("Line {}: {}", line_no + 1, err))?;
403
404 if let Some(existing) = self.tables.get_mut(&table) {
405 if existing.columns.insert(column_name, column_type).is_none() {
406 changes += 1;
407 }
408 } else {
409 let mut columns = HashMap::new();
410 columns.insert(column_name, column_type);
411 self.tables.insert(
412 table.clone(),
413 TableSchema {
414 name: table,
415 columns,
416 policies: HashMap::new(),
417 foreign_keys: vec![],
418 rls_enabled: false,
419 },
420 );
421 changes += 2;
422 }
423 }
424
425 Ok(changes)
426 }
427
428 pub(crate) fn parse_sql_migration(&mut self, sql: &str) -> usize {
430 let mut changes = 0;
431
432 for raw_line in sql.lines() {
435 let line = strip_sql_line_comments(raw_line);
436 if line.is_empty()
437 || line.starts_with("/*")
438 || line.starts_with('*')
439 || line.starts_with("*/")
440 {
441 continue;
442 }
443 let line_upper = line.to_uppercase();
444
445 if line_upper.starts_with("CREATE TABLE")
446 && let Some(table_name) = extract_create_table_name(line)
447 && !self.tables.contains_key(&table_name)
448 {
449 self.tables.insert(
450 table_name.clone(),
451 TableSchema {
452 name: table_name,
453 columns: HashMap::new(),
454 policies: HashMap::new(),
455 foreign_keys: vec![],
456 rls_enabled: false,
457 },
458 );
459 changes += 1;
460 }
461 }
462
463 let mut current_table: Option<String> = None;
469 let mut in_create_block = false;
470 let mut paren_depth = 0;
471
472 for raw_line in sql.lines() {
473 let line = strip_sql_line_comments(raw_line);
474 if line.is_empty()
475 || line.starts_with("/*")
476 || line.starts_with('*')
477 || line.starts_with("*/")
478 {
479 continue;
480 }
481 let line_upper = line.to_uppercase();
482
483 if line_upper.starts_with("CREATE TABLE")
484 && let Some(name) = extract_create_table_name(line)
485 {
486 if self.tables.get(&name).is_none_or(|t| t.columns.is_empty()) {
491 current_table = Some(name);
492 } else {
493 current_table = None;
494 }
495 in_create_block = true;
496 paren_depth = 0;
497 }
498
499 if in_create_block {
500 paren_depth += line.chars().filter(|c| *c == '(').count();
501 paren_depth =
502 paren_depth.saturating_sub(line.chars().filter(|c| *c == ')').count());
503
504 if let Some(col) = extract_column_from_create(line)
506 && let Some(ref table) = current_table
507 && let Some(t) = self.tables.get_mut(table)
508 && t.columns.insert(col.clone(), ColumnType::Text).is_none()
509 {
510 changes += 1;
511 }
512
513 if paren_depth == 0 && line.contains(')') {
514 in_create_block = false;
515 current_table = None;
516 }
517 }
518
519 if line_upper.starts_with("ALTER TABLE")
521 && line_upper.contains("ADD COLUMN")
522 && let Some((table, col)) = extract_alter_add_column(line)
523 {
524 if let Some(t) = self.tables.get_mut(&table) {
525 if t.columns.insert(col.clone(), ColumnType::Text).is_none() {
526 changes += 1;
527 }
528 } else {
529 let mut cols = HashMap::new();
531 cols.insert(col, ColumnType::Text);
532 self.tables.insert(
533 table.clone(),
534 TableSchema {
535 name: table,
536 columns: cols,
537 policies: HashMap::new(),
538 foreign_keys: vec![],
539 rls_enabled: false,
540 },
541 );
542 changes += 1;
543 }
544 }
545
546 if line_upper.starts_with("ALTER TABLE")
548 && line_upper.contains(" ADD ")
549 && !line_upper.contains("ADD COLUMN")
550 && let Some((table, col)) = extract_alter_add(line)
551 && let Some(t) = self.tables.get_mut(&table)
552 && t.columns.insert(col.clone(), ColumnType::Text).is_none()
553 {
554 changes += 1;
555 }
556
557 if line_upper.starts_with("DROP TABLE")
559 && let Some(table_name) = extract_drop_table_name(line)
560 && self.tables.remove(&table_name).is_some()
561 {
562 changes += 1;
563 }
564
565 if line_upper.starts_with("ALTER TABLE")
567 && line_upper.contains("DROP COLUMN")
568 && let Some((table, col)) = extract_alter_drop_column(line)
569 && let Some(t) = self.tables.get_mut(&table)
570 && t.columns.remove(&col).is_some()
571 {
572 changes += 1;
573 }
574
575 if line_upper.starts_with("ALTER TABLE")
577 && line_upper.contains(" DROP ")
578 && !line_upper.contains("DROP COLUMN")
579 && !line_upper.contains("DROP CONSTRAINT")
580 && !line_upper.contains("DROP INDEX")
581 && let Some((table, col)) = extract_alter_drop(line)
582 && let Some(t) = self.tables.get_mut(&table)
583 && t.columns.remove(&col).is_some()
584 {
585 changes += 1;
586 }
587 }
588
589 changes
590 }
591}
592
593fn parse_explicit_alter_add_column_line(
594 line: &str,
595) -> Result<(String, String, ColumnType), String> {
596 let rest = line
597 .strip_prefix("alter ")
598 .ok_or_else(|| "expected 'alter <table> add <column:type[:constraints]>'".to_string())?
599 .trim();
600
601 let mut parts = rest.splitn(2, char::is_whitespace);
602 let table = parts
603 .next()
604 .map(str::trim)
605 .filter(|table| !table.is_empty())
606 .ok_or_else(|| "expected table name after 'alter'".to_string())?;
607 let remainder = parts
608 .next()
609 .map(str::trim)
610 .ok_or_else(|| "expected 'add <column:type[:constraints]>' after table name".to_string())?;
611 let column_def = remainder
612 .strip_prefix("add ")
613 .ok_or_else(|| "expected 'add <column:type[:constraints]>' after table name".to_string())?
614 .trim();
615
616 if column_def.is_empty() {
617 return Err("expected column definition after 'add'".to_string());
618 }
619
620 let (remaining, column_expr) = parse_column_definition(column_def)
621 .map_err(|_| format!("invalid column definition '{}'", column_def))?;
622 if !remaining.trim().is_empty() {
623 return Err(format!(
624 "unexpected trailing content after column definition: '{}'",
625 remaining.trim()
626 ));
627 }
628
629 match column_expr {
630 Expr::Def {
631 name, data_type, ..
632 } => Ok((
633 table.to_string(),
634 name,
635 data_type.parse::<ColumnType>().unwrap_or(ColumnType::Text),
636 )),
637 _ => Err("expected column definition after 'add'".to_string()),
638 }
639}
640
641fn extract_view_name(line: &str) -> Option<&str> {
642 let rest = if let Some(r) = line.strip_prefix("view ") {
643 r
644 } else {
645 line.strip_prefix("materialized view ")?
646 };
647
648 let name = rest.split_whitespace().next().unwrap_or_default().trim();
649 if name.is_empty() { None } else { Some(name) }
650}
651
652fn extract_create_table_name(line: &str) -> Option<String> {
654 let line_upper = line.to_uppercase();
655 let rest = line_upper.strip_prefix("CREATE TABLE")?;
656 let rest = rest.trim_start();
657 let rest = if rest.starts_with("IF NOT EXISTS") {
658 rest.strip_prefix("IF NOT EXISTS")?.trim_start()
659 } else {
660 rest
661 };
662
663 let name: String = line[line.len() - rest.len()..]
665 .chars()
666 .take_while(|c| c.is_alphanumeric() || *c == '_')
667 .collect();
668
669 if name.is_empty() {
670 None
671 } else {
672 Some(name.to_lowercase())
673 }
674}
675
676fn extract_column_from_create(line: &str) -> Option<String> {
678 let line = line.trim();
679
680 let line_upper = line.to_uppercase();
685 let starts_with_keyword = |kw: &str| -> bool {
686 line_upper.starts_with(kw) && line_upper[kw.len()..].starts_with([' ', '('])
687 };
688
689 if starts_with_keyword("CREATE")
690 || starts_with_keyword("PRIMARY")
691 || starts_with_keyword("FOREIGN")
692 || starts_with_keyword("UNIQUE")
693 || starts_with_keyword("CHECK")
694 || starts_with_keyword("CONSTRAINT")
695 || line_upper.starts_with(")")
696 || line_upper.starts_with("(")
697 || line.is_empty()
698 {
699 return None;
700 }
701
702 let name: String = line
704 .trim_start_matches('(')
705 .trim()
706 .chars()
707 .take_while(|c| c.is_alphanumeric() || *c == '_')
708 .collect();
709
710 if name.is_empty() || name.to_uppercase() == "IF" {
711 None
712 } else {
713 Some(name.to_lowercase())
714 }
715}
716
717fn extract_alter_add_column(line: &str) -> Option<(String, String)> {
719 let line_upper = line.to_uppercase();
720 let alter_pos = line_upper.find("ALTER TABLE")?;
721 let add_pos = line_upper.find("ADD COLUMN")?;
722
723 let table_part = &line[alter_pos + 11..add_pos];
725 let table: String = table_part
726 .trim()
727 .chars()
728 .take_while(|c| c.is_alphanumeric() || *c == '_')
729 .collect();
730
731 let mut col_part = &line[add_pos + 10..];
733 let col_upper = col_part.trim().to_uppercase();
734 if col_upper.starts_with("IF NOT EXISTS") {
735 col_part = &col_part.trim()[13..]; }
737 let col: String = col_part
738 .trim()
739 .chars()
740 .take_while(|c| c.is_alphanumeric() || *c == '_')
741 .collect();
742
743 if table.is_empty() || col.is_empty() {
744 None
745 } else {
746 Some((table.to_lowercase(), col.to_lowercase()))
747 }
748}
749
750fn extract_alter_add(line: &str) -> Option<(String, String)> {
752 let line_upper = line.to_uppercase();
753 let alter_pos = line_upper.find("ALTER TABLE")?;
754 let add_pos = line_upper.find(" ADD ")?;
755
756 let table_part = &line[alter_pos + 11..add_pos];
757 let table: String = table_part
758 .trim()
759 .chars()
760 .take_while(|c| c.is_alphanumeric() || *c == '_')
761 .collect();
762
763 let col_part = &line[add_pos + 5..];
764 let col: String = col_part
765 .trim()
766 .chars()
767 .take_while(|c| c.is_alphanumeric() || *c == '_')
768 .collect();
769
770 if table.is_empty() || col.is_empty() {
771 None
772 } else {
773 Some((table.to_lowercase(), col.to_lowercase()))
774 }
775}
776
777fn extract_drop_table_name(line: &str) -> Option<String> {
779 let line_upper = line.to_uppercase();
780 let rest = line_upper.strip_prefix("DROP TABLE")?;
781 let rest = rest.trim_start();
782 let rest = if rest.starts_with("IF EXISTS") {
783 rest.strip_prefix("IF EXISTS")?.trim_start()
784 } else {
785 rest
786 };
787
788 let name: String = line[line.len() - rest.len()..]
790 .chars()
791 .take_while(|c| c.is_alphanumeric() || *c == '_')
792 .collect();
793
794 if name.is_empty() {
795 None
796 } else {
797 Some(name.to_lowercase())
798 }
799}
800
801fn extract_alter_drop_column(line: &str) -> Option<(String, String)> {
803 let line_upper = line.to_uppercase();
804 let alter_pos = line_upper.find("ALTER TABLE")?;
805 let drop_pos = line_upper.find("DROP COLUMN")?;
806
807 let table_part = &line[alter_pos + 11..drop_pos];
809 let table: String = table_part
810 .trim()
811 .chars()
812 .take_while(|c| c.is_alphanumeric() || *c == '_')
813 .collect();
814
815 let col_part = &line[drop_pos + 11..];
817 let col: String = col_part
818 .trim()
819 .chars()
820 .take_while(|c| c.is_alphanumeric() || *c == '_')
821 .collect();
822
823 if table.is_empty() || col.is_empty() {
824 None
825 } else {
826 Some((table.to_lowercase(), col.to_lowercase()))
827 }
828}
829
830fn extract_alter_drop(line: &str) -> Option<(String, String)> {
832 let line_upper = line.to_uppercase();
833 let alter_pos = line_upper.find("ALTER TABLE")?;
834 let drop_pos = line_upper.find(" DROP ")?;
835
836 let table_part = &line[alter_pos + 11..drop_pos];
837 let table: String = table_part
838 .trim()
839 .chars()
840 .take_while(|c| c.is_alphanumeric() || *c == '_')
841 .collect();
842
843 let col_part = &line[drop_pos + 6..];
844 let col: String = col_part
845 .trim()
846 .chars()
847 .take_while(|c| c.is_alphanumeric() || *c == '_')
848 .collect();
849
850 if table.is_empty() || col.is_empty() {
851 None
852 } else {
853 Some((table.to_lowercase(), col.to_lowercase()))
854 }
855}
856
857impl TableSchema {
858 pub fn has_column(&self, name: &str) -> bool {
860 self.columns.contains_key(name)
861 }
862
863 pub fn column_type(&self, name: &str) -> Option<&ColumnType> {
865 self.columns.get(name)
866 }
867
868 pub fn primary_key_column(&self) -> &str {
874 if self.columns.contains_key("id") {
875 "id"
876 } else {
877 let singular = self.name.trim_end_matches('s');
880 let conventional = format!("{}_id", singular);
881 if self.columns.contains_key(&conventional) {
882 return "id"; }
886 "id" }
888 }
889}