use crate::ast::Expr;
use crate::migrate::types::ColumnType;
use crate::parser::grammar::ddl::parse_column_definition;
use std::collections::{HashMap, HashSet};
use std::path::Path;
/// A foreign-key reference declared on a table column.
///
/// `Schema::parse` creates one of these for every `ref:table.column`
/// annotation found on a column line.
#[derive(Debug, Clone)]
pub struct ForeignKey {
/// Name of the column that carries the reference.
pub column: String,
/// Name of the referenced table.
pub ref_table: String,
/// Name of the referenced column inside `ref_table`.
pub ref_column: String,
}
/// Parsed description of a single table.
#[derive(Debug, Clone)]
pub struct TableSchema {
/// Table name; also used as the key in `Schema::tables`.
pub name: String,
/// Column name -> column type.
pub columns: HashMap<String, ColumnType>,
/// Column name -> policy label. `Schema::parse` stores `"Public"` or
/// `"Protected"` here.
pub policies: HashMap<String, String>,
/// Foreign keys declared on this table's columns.
pub foreign_keys: Vec<ForeignKey>,
/// Whether the table is treated as RLS-enabled. `Schema::parse` sets this
/// when the table header carries `rls` or the table has a `tenant_id` column.
pub rls_enabled: bool,
}
/// In-memory schema: tables, views and declared resources.
#[derive(Debug, Default)]
pub struct Schema {
/// Table name -> table description.
pub tables: HashMap<String, TableSchema>,
/// Names of declared views (`view` and `materialized view` lines).
pub views: HashSet<String>,
/// Resource name -> resource description (`bucket`, `queue`, `topic`).
pub resources: HashMap<String, ResourceSchema>,
}
/// A non-table resource declared in the schema file.
#[derive(Debug, Clone)]
pub struct ResourceSchema {
/// Resource name; also used as the key in `Schema::resources`.
pub name: String,
/// Declaration keyword the resource was introduced with; `Schema::parse`
/// recognises `bucket`, `queue` and `topic`.
pub kind: String,
/// Value of the `provider` key from the resource block, if present.
pub provider: Option<String>,
/// All other `key value` pairs from the resource block.
pub properties: HashMap<String, String>,
}
/// Removes trailing `--` and `#` comments from a schema line and trims the
/// surrounding whitespace.
///
/// The `--` marker is cut first; `#` is then searched only in what remains.
fn strip_schema_comments(line: &str) -> &str {
    let without_dashes = match line.find("--") {
        Some(at) => &line[..at],
        None => line,
    };
    let without_hash = match without_dashes.find('#') {
        Some(at) => &without_dashes[..at],
        None => without_dashes,
    };
    without_hash.trim()
}
/// Removes a trailing `--` SQL line comment and trims surrounding whitespace.
fn strip_sql_line_comments(line: &str) -> &str {
    let code = match line.find("--") {
        Some(at) => &line[..at],
        None => line,
    };
    code.trim()
}
impl Schema {
/// Loads schema text through `crate::schema_source::read_qail_schema_source`
/// and parses it with [`Schema::parse`].
///
/// # Errors
/// Returns the loader's error, or the parse error, as a `String`.
pub fn parse_file(path: &str) -> Result<Self, String> {
    let source = match crate::schema_source::read_qail_schema_source(path) {
        Ok(source) => source,
        Err(err) => return Err(err.into()),
    };
    Self::parse(&source)
}
pub fn parse(content: &str) -> Result<Self, String> {
let mut schema = Schema::default();
let mut current_table: Option<String> = None;
let mut current_columns: HashMap<String, ColumnType> = HashMap::new();
let mut current_policies: HashMap<String, String> = HashMap::new();
let mut current_fks: Vec<ForeignKey> = Vec::new();
let mut current_rls_flag = false;
for raw_line in content.lines() {
let line = strip_schema_comments(raw_line);
if line.is_empty() {
continue;
}
if current_table.is_none()
&& (line.starts_with("bucket ")
|| line.starts_with("queue ")
|| line.starts_with("topic "))
{
let parts: Vec<&str> = line.splitn(2, ' ').collect();
let kind = parts[0].to_string();
let rest = parts.get(1).copied().unwrap_or("").trim();
let name = rest.split('{').next().unwrap_or(rest).trim().to_string();
let mut provider = None;
let mut properties = HashMap::new();
if line.contains('{') {
let block = rest.split('{').nth(1).unwrap_or("").to_string();
if !block.contains('}') {
for inner in content.lines().skip_while(|l| !l.contains(line)) {
if inner.contains('}') {
break;
}
}
}
let block = block.replace('}', "");
let mut tokens = block.split_whitespace();
while let Some(key) = tokens.next() {
if let Some(val) = tokens.next() {
let val = val.trim_matches('"').to_string();
if key == "provider" {
provider = Some(val);
} else {
properties.insert(key.to_string(), val);
}
}
}
}
if !name.is_empty() {
schema.resources.insert(
name.clone(),
ResourceSchema {
name,
kind,
provider,
properties,
},
);
}
continue;
}
if current_table.is_none()
&& let Some(view_name) = extract_view_name(line)
{
schema.views.insert(view_name.to_string());
continue;
}
if line.starts_with("table ") && (line.ends_with('{') || line.contains('{')) {
if let Some(table_name) = current_table.take() {
let has_rls = current_rls_flag || current_columns.contains_key("tenant_id");
schema.tables.insert(
table_name.clone(),
TableSchema {
name: table_name,
columns: std::mem::take(&mut current_columns),
policies: std::mem::take(&mut current_policies),
foreign_keys: std::mem::take(&mut current_fks),
rls_enabled: has_rls,
},
);
}
let after_table = line.trim_start_matches("table ");
let before_brace = after_table.split('{').next().unwrap_or("").trim();
let parts: Vec<&str> = before_brace.split_whitespace().collect();
let name = parts.first().unwrap_or(&"").to_string();
current_rls_flag = parts.contains(&"rls");
current_table = Some(name);
}
else if line == "}" {
if let Some(table_name) = current_table.take() {
let has_rls = current_rls_flag || current_columns.contains_key("tenant_id");
schema.tables.insert(
table_name.clone(),
TableSchema {
name: table_name,
columns: std::mem::take(&mut current_columns),
policies: std::mem::take(&mut current_policies),
foreign_keys: std::mem::take(&mut current_fks),
rls_enabled: has_rls,
},
);
current_rls_flag = false;
}
}
else if current_table.is_some() {
let parts: Vec<&str> = line.split_whitespace().collect();
if let Some(col_name) = parts.first() {
let col_type_str = parts.get(1).copied().unwrap_or("text");
let col_type = col_type_str
.parse::<ColumnType>()
.unwrap_or(ColumnType::Text);
current_columns.insert(col_name.to_string(), col_type);
let mut policy = "Public".to_string();
for part in parts.iter().skip(2) {
if *part == "protected" {
policy = "Protected".to_string();
} else if let Some(ref_spec) = part.strip_prefix("ref:") {
let ref_spec = ref_spec.trim_start_matches('>');
if let Some((ref_table, ref_col)) = ref_spec.split_once('.') {
current_fks.push(ForeignKey {
column: col_name.to_string(),
ref_table: ref_table.to_string(),
ref_column: ref_col.to_string(),
});
}
}
}
current_policies.insert(col_name.to_string(), policy);
}
}
}
if let Some(table_name) = current_table.take() {
return Err(format!(
"Unclosed table definition for '{}': expected closing '}}'",
table_name
));
}
Ok(schema)
}
/// Returns `true` when `name` is a known table or a known view.
pub fn has_table(&self, name: &str) -> bool {
    match self.tables.get(name) {
        Some(_) => true,
        None => self.views.contains(name),
    }
}
/// Returns the names of all tables flagged as RLS-enabled.
///
/// The order follows the iteration order of the underlying `HashMap`.
pub fn rls_tables(&self) -> Vec<&str> {
    let mut names = Vec::new();
    for (name, table) in &self.tables {
        if table.rls_enabled {
            names.push(name.as_str());
        }
    }
    names
}
/// Returns `true` when `name` is a known table that is flagged RLS-enabled.
pub fn is_rls_table(&self, name: &str) -> bool {
    matches!(self.tables.get(name), Some(table) if table.rls_enabled)
}
pub fn table(&self, name: &str) -> Option<&TableSchema> {
self.tables.get(name)
}
/// Applies every migration found in `migrations_dir` to this schema and
/// returns the number of changes made.
///
/// Each directory entry is either a directory containing `up.qail`
/// (preferred) or `up.sql`, or a plain `*.qail` / `*.sql` file. Other
/// entries are skipped. A missing directory is not an error and yields
/// `Ok(0)`.
///
/// Entries are applied in sorted path order. `fs::read_dir` makes no
/// ordering guarantee, and migrations are order-sensitive (a later
/// migration may drop what an earlier one added), so relying on the raw
/// directory order would make the merged schema platform-dependent.
///
/// # Errors
/// Returns an error when the directory or a migration file cannot be read,
/// or when a `.qail` migration fails to parse.
pub fn merge_migrations(&mut self, migrations_dir: &str) -> Result<usize, String> {
    use std::fs;

    let dir = Path::new(migrations_dir);
    if !dir.exists() {
        return Ok(0);
    }
    let entries =
        fs::read_dir(dir).map_err(|e| format!("Failed to read migrations dir: {}", e))?;

    // Entries that cannot be read are skipped (best effort), as before.
    let mut entry_paths: Vec<_> = entries.flatten().map(|entry| entry.path()).collect();
    entry_paths.sort();

    let mut merged_count = 0;
    for path in entry_paths {
        let migration_file = if path.is_dir() {
            let up_qail = path.join("up.qail");
            let up_sql = path.join("up.sql");
            if up_qail.exists() {
                up_qail
            } else if up_sql.exists() {
                up_sql
            } else {
                continue;
            }
        } else if path.extension().is_some_and(|e| e == "qail" || e == "sql") {
            path
        } else {
            continue;
        };
        // Guards against entries such as dangling symlinks.
        if !migration_file.exists() {
            continue;
        }

        let content = fs::read_to_string(&migration_file)
            .map_err(|e| format!("Failed to read {}: {}", migration_file.display(), e))?;
        if migration_file.extension().is_some_and(|ext| ext == "qail") {
            merged_count += self.parse_qail_migration(&content).map_err(|e| {
                format!(
                    "Failed to parse native migration {}: {}",
                    migration_file.display(),
                    e
                )
            })?;
        } else {
            merged_count += self.parse_sql_migration(&content);
        }
    }
    Ok(merged_count)
}
pub(crate) fn parse_qail_migration(&mut self, qail: &str) -> Result<usize, String> {
let parsed = Schema::parse(qail)?;
let mut changes = 0usize;
for (table_name, parsed_table) in parsed.tables {
if let Some(existing) = self.tables.get_mut(&table_name) {
for (col_name, col_type) in parsed_table.columns {
if existing
.columns
.insert(col_name.clone(), col_type)
.is_none()
{
changes += 1;
}
}
for (col_name, policy) in parsed_table.policies {
if existing.policies.insert(col_name, policy).is_none() {
changes += 1;
}
}
for fk in parsed_table.foreign_keys {
let duplicate = existing.foreign_keys.iter().any(|existing_fk| {
existing_fk.column == fk.column
&& existing_fk.ref_table == fk.ref_table
&& existing_fk.ref_column == fk.ref_column
});
if !duplicate {
existing.foreign_keys.push(fk);
changes += 1;
}
}
if parsed_table.rls_enabled && !existing.rls_enabled {
existing.rls_enabled = true;
changes += 1;
}
} else {
changes += 1 + parsed_table.columns.len();
self.tables.insert(table_name, parsed_table);
}
}
for view_name in parsed.views {
if self.views.insert(view_name) {
changes += 1;
}
}
for (resource_name, resource) in parsed.resources {
if self.resources.insert(resource_name, resource).is_none() {
changes += 1;
}
}
changes += self.parse_explicit_qail_apply_commands(qail)?;
Ok(changes)
}
/// Applies every `alter <table> add <column...>` line of a native migration.
///
/// Lines that do not start with `alter ` after comment stripping are
/// ignored. A new column on an existing table counts as one change; an
/// `alter` for an unknown table creates that table and counts as two.
///
/// # Errors
/// Returns `Line N: ...` (1-based) when an `alter` line cannot be parsed.
fn parse_explicit_qail_apply_commands(&mut self, qail: &str) -> Result<usize, String> {
    let mut changes = 0usize;
    let alter_lines = qail
        .lines()
        .enumerate()
        .map(|(idx, raw)| (idx + 1, strip_schema_comments(raw)))
        .filter(|(_, line)| !line.is_empty() && line.starts_with("alter "));

    for (line_number, line) in alter_lines {
        let (table, column_name, column_type) =
            match parse_explicit_alter_add_column_line(line) {
                Ok(parsed) => parsed,
                Err(err) => return Err(format!("Line {}: {}", line_number, err)),
            };

        match self.tables.get_mut(&table) {
            Some(existing) => {
                if existing.columns.insert(column_name, column_type).is_none() {
                    changes += 1;
                }
            }
            None => {
                let columns = HashMap::from([(column_name, column_type)]);
                let created = TableSchema {
                    name: table.clone(),
                    columns,
                    policies: HashMap::new(),
                    foreign_keys: vec![],
                    rls_enabled: false,
                };
                self.tables.insert(table, created);
                changes += 2;
            }
        }
    }
    Ok(changes)
}
/// Applies a raw SQL migration to this schema and returns the number of
/// changes made.
///
/// The SQL is scanned line by line (not parsed as full statements) in two
/// passes. Pass 1 registers every `CREATE TABLE` name that is not yet known.
/// Pass 2 collects column names inside `CREATE TABLE (...)` blocks and
/// applies `ALTER TABLE ... ADD/DROP [COLUMN]` and `DROP TABLE` lines.
/// Every column discovered here is recorded as `ColumnType::Text`.
pub(crate) fn parse_sql_migration(&mut self, sql: &str) -> usize {
let mut changes = 0;
// Pass 1: register tables so that later lines can refer to them.
for raw_line in sql.lines() {
let line = strip_sql_line_comments(raw_line);
// Skip blanks and lines that look like part of a `/* ... */` comment.
if line.is_empty()
|| line.starts_with("/*")
|| line.starts_with('*')
|| line.starts_with("*/")
{
continue;
}
let line_upper = line.to_uppercase();
if line_upper.starts_with("CREATE TABLE")
&& let Some(table_name) = extract_create_table_name(line)
&& !self.tables.contains_key(&table_name)
{
self.tables.insert(
table_name.clone(),
TableSchema {
name: table_name,
columns: HashMap::new(),
policies: HashMap::new(),
foreign_keys: vec![],
rls_enabled: false,
},
);
changes += 1;
}
}
// Pass 2 state: the table whose CREATE block is being read (if its columns
// should be collected), whether we are inside such a block, and the current
// parenthesis nesting within it.
let mut current_table: Option<String> = None;
let mut in_create_block = false;
let mut paren_depth = 0;
for raw_line in sql.lines() {
let line = strip_sql_line_comments(raw_line);
if line.is_empty()
|| line.starts_with("/*")
|| line.starts_with('*')
|| line.starts_with("*/")
{
continue;
}
let line_upper = line.to_uppercase();
if line_upper.starts_with("CREATE TABLE")
&& let Some(name) = extract_create_table_name(line)
{
// Only collect columns for tables that have none yet; a table that
// already has columns keeps them untouched.
if self.tables.get(&name).is_none_or(|t| t.columns.is_empty()) {
current_table = Some(name);
} else {
current_table = None;
}
in_create_block = true;
paren_depth = 0;
}
if in_create_block {
// Track nesting so that a `)` belonging to a type or constraint does not
// end the block early.
paren_depth += line.chars().filter(|c| *c == '(').count();
paren_depth =
paren_depth.saturating_sub(line.chars().filter(|c| *c == ')').count());
if let Some(col) = extract_column_from_create(line)
&& let Some(ref table) = current_table
&& let Some(t) = self.tables.get_mut(table)
&& t.columns.insert(col.clone(), ColumnType::Text).is_none()
{
changes += 1;
}
// The block ends on the line where the nesting returns to zero.
if paren_depth == 0 && line.contains(')') {
in_create_block = false;
current_table = None;
}
}
// `ALTER TABLE <t> ADD COLUMN <c>`: creates the table if it is unknown.
if line_upper.starts_with("ALTER TABLE")
&& line_upper.contains("ADD COLUMN")
&& let Some((table, col)) = extract_alter_add_column(line)
{
if let Some(t) = self.tables.get_mut(&table) {
if t.columns.insert(col.clone(), ColumnType::Text).is_none() {
changes += 1;
}
} else {
let mut cols = HashMap::new();
cols.insert(col, ColumnType::Text);
self.tables.insert(
table.clone(),
TableSchema {
name: table,
columns: cols,
policies: HashMap::new(),
foreign_keys: vec![],
rls_enabled: false,
},
);
changes += 1;
}
}
// `ALTER TABLE <t> ADD <c>` without the COLUMN keyword: only applied to
// tables that already exist.
if line_upper.starts_with("ALTER TABLE")
&& line_upper.contains(" ADD ")
&& !line_upper.contains("ADD COLUMN")
&& let Some((table, col)) = extract_alter_add(line)
&& let Some(t) = self.tables.get_mut(&table)
&& t.columns.insert(col.clone(), ColumnType::Text).is_none()
{
changes += 1;
}
// `DROP TABLE <t>`.
if line_upper.starts_with("DROP TABLE")
&& let Some(table_name) = extract_drop_table_name(line)
&& self.tables.remove(&table_name).is_some()
{
changes += 1;
}
// `ALTER TABLE <t> DROP COLUMN <c>`.
if line_upper.starts_with("ALTER TABLE")
&& line_upper.contains("DROP COLUMN")
&& let Some((table, col)) = extract_alter_drop_column(line)
&& let Some(t) = self.tables.get_mut(&table)
&& t.columns.remove(&col).is_some()
{
changes += 1;
}
// `ALTER TABLE <t> DROP <c>` without the COLUMN keyword; constraint and
// index drops are excluded because they do not name a column.
if line_upper.starts_with("ALTER TABLE")
&& line_upper.contains(" DROP ")
&& !line_upper.contains("DROP COLUMN")
&& !line_upper.contains("DROP CONSTRAINT")
&& !line_upper.contains("DROP INDEX")
&& let Some((table, col)) = extract_alter_drop(line)
&& let Some(t) = self.tables.get_mut(&table)
&& t.columns.remove(&col).is_some()
{
changes += 1;
}
}
changes
}
}
/// Parses one `alter <table> add <column:type[:constraints]>` line.
///
/// Returns `(table, column name, column type)`. A data type that does not
/// parse as a `ColumnType` falls back to `ColumnType::Text`.
///
/// # Errors
/// Returns a description of the first part of the line that is missing or
/// malformed, including trailing content after the column definition.
fn parse_explicit_alter_add_column_line(
    line: &str,
) -> Result<(String, String, ColumnType), String> {
    let Some(rest) = line.strip_prefix("alter ") else {
        return Err("expected 'alter <table> add <column:type[:constraints]>'".to_string());
    };
    let rest = rest.trim();

    // Split into the table name and whatever follows the first whitespace.
    let (table, remainder) = match rest.split_once(char::is_whitespace) {
        Some((table, remainder)) => (table.trim(), Some(remainder.trim())),
        None => (rest, None),
    };
    if table.is_empty() {
        return Err("expected table name after 'alter'".to_string());
    }

    let Some(column_def) = remainder.and_then(|text| text.strip_prefix("add ")) else {
        return Err("expected 'add <column:type[:constraints]>' after table name".to_string());
    };
    let column_def = column_def.trim();
    if column_def.is_empty() {
        return Err("expected column definition after 'add'".to_string());
    }

    let (remaining, column_expr) = match parse_column_definition(column_def) {
        Ok(parsed) => parsed,
        Err(_) => return Err(format!("invalid column definition '{}'", column_def)),
    };
    let leftover = remaining.trim();
    if !leftover.is_empty() {
        return Err(format!(
            "unexpected trailing content after column definition: '{}'",
            leftover
        ));
    }

    let Expr::Def {
        name, data_type, ..
    } = column_expr
    else {
        return Err("expected column definition after 'add'".to_string());
    };
    let column_type = data_type.parse::<ColumnType>().unwrap_or(ColumnType::Text);
    Ok((table.to_string(), name, column_type))
}
/// Returns the view name from a `view <name> ...` or
/// `materialized view <name> ...` line, or `None` when the line is neither
/// form or carries no name.
fn extract_view_name(line: &str) -> Option<&str> {
    line.strip_prefix("view ")
        .or_else(|| line.strip_prefix("materialized view "))?
        .split_whitespace()
        .next()
}
/// Extracts the lower-cased table name from a `CREATE TABLE [IF NOT EXISTS]
/// <name> ...` line.
///
/// Returns `None` when the line does not start with `CREATE TABLE` followed
/// by whitespace (so `CREATE TABLESPACE` is rejected) or when no identifier
/// follows. Matching is ASCII case-insensitive.
fn extract_create_table_name(line: &str) -> Option<String> {
    const KEYWORD: &str = "CREATE TABLE";
    const GUARD: &str = "IF NOT EXISTS";

    // ASCII upper-casing keeps every byte offset identical to `line`.
    // Unicode `to_uppercase` can change the byte length (e.g. 'ı' -> 'I'),
    // which would shift the slice taken from the original line below.
    let upper = line.to_ascii_uppercase();
    let after_keyword = upper.strip_prefix(KEYWORD)?;
    if !after_keyword.is_empty() && !after_keyword.starts_with(char::is_whitespace) {
        return None;
    }
    let after_keyword = after_keyword.trim_start();
    let after_guard = after_keyword
        .strip_prefix(GUARD)
        .map_or(after_keyword, str::trim_start);

    let name_start = line.len() - after_guard.len();
    let name: String = line
        .get(name_start..)?
        .chars()
        .take_while(|c| c.is_alphanumeric() || *c == '_')
        .collect();
    if name.is_empty() {
        None
    } else {
        Some(name.to_lowercase())
    }
}
/// Extracts the lower-cased column name from one line inside a
/// `CREATE TABLE (...)` body.
///
/// Returns `None` for blank lines, lines that begin with a parenthesis,
/// lines that start with `CREATE` or a table-constraint keyword followed by
/// a space or `(`, and lines whose leading identifier is empty or `IF`.
fn extract_column_from_create(line: &str) -> Option<String> {
    const NON_COLUMN_KEYWORDS: [&str; 6] = [
        "CREATE",
        "PRIMARY",
        "FOREIGN",
        "UNIQUE",
        "CHECK",
        "CONSTRAINT",
    ];

    let trimmed = line.trim();
    if trimmed.is_empty() || trimmed.starts_with(['(', ')']) {
        return None;
    }

    let upper = trimmed.to_uppercase();
    let is_clause = NON_COLUMN_KEYWORDS.iter().any(|kw| {
        upper
            .strip_prefix(*kw)
            .is_some_and(|tail| tail.starts_with([' ', '(']))
    });
    if is_clause {
        return None;
    }

    // The column name is the leading run of identifier characters.
    let ident_len = trimmed
        .find(|c: char| !(c.is_alphanumeric() || c == '_'))
        .unwrap_or(trimmed.len());
    let name = &trimmed[..ident_len];
    if name.is_empty() || name.to_uppercase() == "IF" {
        return None;
    }
    Some(name.to_lowercase())
}
/// Extracts `(table, column)`, both lower-cased, from an
/// `ALTER TABLE <table> ADD COLUMN [IF NOT EXISTS] <column> ...` line.
///
/// Returns `None` when either keyword is missing, when they appear in the
/// wrong order, or when the table or column identifier is empty. Matching
/// is ASCII case-insensitive.
fn extract_alter_add_column(line: &str) -> Option<(String, String)> {
    const ALTER: &str = "ALTER TABLE";
    const ADD: &str = "ADD COLUMN";
    const GUARD: &str = "IF NOT EXISTS";
    let is_ident = |c: &char| c.is_alphanumeric() || *c == '_';

    // ASCII upper-casing keeps byte offsets valid for indexing `line`;
    // Unicode `to_uppercase` may change the length and misplace (or split a
    // character at) the offsets found below.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER)?;
    let add_pos = upper.find(ADD)?;

    // `get` yields `None` instead of panicking on a reversed range.
    let table: String = line
        .get(alter_pos + ALTER.len()..add_pos)?
        .trim()
        .chars()
        .take_while(is_ident)
        .collect();

    let after_add = line.get(add_pos + ADD.len()..)?.trim_start();
    let after_guard = match after_add.get(..GUARD.len()) {
        Some(prefix) if prefix.eq_ignore_ascii_case(GUARD) => &after_add[GUARD.len()..],
        _ => after_add,
    };
    let col: String = after_guard.trim().chars().take_while(is_ident).collect();

    if table.is_empty() || col.is_empty() {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
/// Extracts `(table, column)`, both lower-cased, from an
/// `ALTER TABLE <table> ADD [IF NOT EXISTS] <column> ...` line (the form
/// without the `COLUMN` keyword).
///
/// Returns `None` when the keywords are missing, when an identifier is
/// empty, or when `ADD` introduces a table constraint (`CONSTRAINT`,
/// `PRIMARY`, `FOREIGN`, `UNIQUE`, `CHECK`) rather than a column — the same
/// keywords `extract_column_from_create` refuses to treat as column names.
/// Matching is ASCII case-insensitive.
fn extract_alter_add(line: &str) -> Option<(String, String)> {
    const ALTER: &str = "ALTER TABLE";
    const ADD: &str = " ADD ";
    const GUARD: &str = "IF NOT EXISTS";
    const NON_COLUMN_CLAUSES: [&str; 5] = ["CONSTRAINT", "PRIMARY", "FOREIGN", "UNIQUE", "CHECK"];
    let is_ident = |c: &char| c.is_alphanumeric() || *c == '_';

    // ASCII upper-casing keeps byte offsets valid for indexing `line`.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER)?;
    let add_pos = upper.find(ADD)?;

    let table: String = line
        .get(alter_pos + ALTER.len()..add_pos)?
        .trim()
        .chars()
        .take_while(is_ident)
        .collect();

    let after_add = line.get(add_pos + ADD.len()..)?.trim_start();
    let after_guard = match after_add.get(..GUARD.len()) {
        Some(prefix) if prefix.eq_ignore_ascii_case(GUARD) => &after_add[GUARD.len()..],
        _ => after_add,
    };
    let col: String = after_guard.trim().chars().take_while(is_ident).collect();

    if table.is_empty() || col.is_empty() {
        return None;
    }
    // `ADD CONSTRAINT ...` and friends add no column.
    if NON_COLUMN_CLAUSES
        .iter()
        .any(|kw| col.eq_ignore_ascii_case(*kw))
    {
        return None;
    }
    Some((table.to_lowercase(), col.to_lowercase()))
}
/// Extracts the lower-cased table name from a `DROP TABLE [IF EXISTS]
/// <name> ...` line.
///
/// Returns `None` when the line does not start with `DROP TABLE` followed by
/// whitespace (so `DROP TABLESPACE` is rejected) or when no identifier
/// follows. Matching is ASCII case-insensitive.
fn extract_drop_table_name(line: &str) -> Option<String> {
    const KEYWORD: &str = "DROP TABLE";
    const GUARD: &str = "IF EXISTS";

    // ASCII upper-casing keeps every byte offset identical to `line`.
    // Unicode `to_uppercase` can change the byte length, which would shift
    // the slice taken from the original line below.
    let upper = line.to_ascii_uppercase();
    let after_keyword = upper.strip_prefix(KEYWORD)?;
    if !after_keyword.is_empty() && !after_keyword.starts_with(char::is_whitespace) {
        return None;
    }
    let after_keyword = after_keyword.trim_start();
    let after_guard = after_keyword
        .strip_prefix(GUARD)
        .map_or(after_keyword, str::trim_start);

    let name_start = line.len() - after_guard.len();
    let name: String = line
        .get(name_start..)?
        .chars()
        .take_while(|c| c.is_alphanumeric() || *c == '_')
        .collect();
    if name.is_empty() {
        None
    } else {
        Some(name.to_lowercase())
    }
}
/// Extracts `(table, column)`, both lower-cased, from an
/// `ALTER TABLE <table> DROP COLUMN [IF EXISTS] <column> ...` line.
///
/// The optional `IF EXISTS` guard is skipped, mirroring how
/// `extract_alter_add_column` skips `IF NOT EXISTS`; without that the guard
/// word `IF` would be reported as the column. Returns `None` when a keyword
/// is missing or an identifier is empty. Matching is ASCII case-insensitive.
fn extract_alter_drop_column(line: &str) -> Option<(String, String)> {
    const ALTER: &str = "ALTER TABLE";
    const DROP: &str = "DROP COLUMN";
    const GUARD: &str = "IF EXISTS";
    let is_ident = |c: &char| c.is_alphanumeric() || *c == '_';

    // ASCII upper-casing keeps byte offsets valid for indexing `line`.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER)?;
    let drop_pos = upper.find(DROP)?;

    // `get` yields `None` instead of panicking on a reversed range.
    let table: String = line
        .get(alter_pos + ALTER.len()..drop_pos)?
        .trim()
        .chars()
        .take_while(is_ident)
        .collect();

    let after_drop = line.get(drop_pos + DROP.len()..)?.trim_start();
    let after_guard = match after_drop.get(..GUARD.len()) {
        Some(prefix) if prefix.eq_ignore_ascii_case(GUARD) => &after_drop[GUARD.len()..],
        _ => after_drop,
    };
    let col: String = after_guard.trim().chars().take_while(is_ident).collect();

    if table.is_empty() || col.is_empty() {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
/// Extracts `(table, column)`, both lower-cased, from an
/// `ALTER TABLE <table> DROP [IF EXISTS] <column> ...` line (the form
/// without the `COLUMN` keyword).
///
/// The optional `IF EXISTS` guard is skipped so that `IF` is not reported as
/// the column. Returns `None` when a keyword is missing or an identifier is
/// empty. Matching is ASCII case-insensitive.
fn extract_alter_drop(line: &str) -> Option<(String, String)> {
    const ALTER: &str = "ALTER TABLE";
    const DROP: &str = " DROP ";
    const GUARD: &str = "IF EXISTS";
    let is_ident = |c: &char| c.is_alphanumeric() || *c == '_';

    // ASCII upper-casing keeps byte offsets valid for indexing `line`;
    // Unicode `to_uppercase` may change the length and misplace them.
    let upper = line.to_ascii_uppercase();
    let alter_pos = upper.find(ALTER)?;
    let drop_pos = upper.find(DROP)?;

    let table: String = line
        .get(alter_pos + ALTER.len()..drop_pos)?
        .trim()
        .chars()
        .take_while(is_ident)
        .collect();

    let after_drop = line.get(drop_pos + DROP.len()..)?.trim_start();
    let after_guard = match after_drop.get(..GUARD.len()) {
        Some(prefix) if prefix.eq_ignore_ascii_case(GUARD) => &after_drop[GUARD.len()..],
        _ => after_drop,
    };
    let col: String = after_guard.trim().chars().take_while(is_ident).collect();

    if table.is_empty() || col.is_empty() {
        None
    } else {
        Some((table.to_lowercase(), col.to_lowercase()))
    }
}
impl TableSchema {
    /// Returns `true` when the table has a column called `name`.
    pub fn has_column(&self, name: &str) -> bool {
        self.columns.contains_key(name)
    }

    /// Returns the type of column `name`, or `None` when it does not exist.
    pub fn column_type(&self, name: &str) -> Option<&ColumnType> {
        self.columns.get(name)
    }

    /// Returns the name of the column assumed to be the primary key.
    ///
    /// Resolution order:
    /// 1. `id`, when the table has such a column;
    /// 2. `<singular>_id`, where `<singular>` is the table name with trailing
    ///    `s` characters removed (e.g. `users` -> `user_id`), when that
    ///    column exists;
    /// 3. `id` as the fallback.
    pub fn primary_key_column(&self) -> &str {
        if self.columns.contains_key("id") {
            return "id";
        }
        let singular = self.name.trim_end_matches('s');
        let conventional = format!("{}_id", singular);
        // Borrow the key stored in the map so the returned name lives as long
        // as `self`; `conventional` itself is a local and cannot be returned.
        self.columns
            .get_key_value(conventional.as_str())
            .map_or("id", |(key, _)| key.as_str())
    }
}