use crate::api::gateway::contracts::{
D1MigrationAction, D1MigrationDiagnostic, D1MigrationPlanEntry,
};
use crate::api::query::d1_migration::parser::{split_top_level_csv, statement_keyword_prefix};
use crate::api::query::d1_migration::{RawStatement, warnings::add_warning};
/// Result of converting a batch of raw PostgreSQL statements for D1.
///
/// Produced by `rewrite_statements`; blank statements contribute nothing, every other
/// statement contributes exactly one entry.
#[derive(Debug)]
pub struct RewritePlan {
/// Plan entries in input order, one per non-empty statement.
pub entries: Vec<D1MigrationPlanEntry>,
/// Non-fatal diagnostics about lossy, downgraded or unsupported conversions.
pub warnings: Vec<D1MigrationDiagnostic>,
/// Fatal diagnostics; only populated when conversion runs in strict mode.
pub errors: Vec<D1MigrationDiagnostic>,
}
/// Converts parsed PostgreSQL statements into a D1 (SQLite) migration plan.
///
/// Each non-empty statement contributes exactly one plan entry, in input order; blank
/// statements are ignored. With `strict` set, constructs D1 cannot represent are reported
/// in `errors` and their entries are marked `Skipped`; otherwise they are reported in
/// `warnings` and conversion continues on a best-effort basis.
pub fn rewrite_statements(raw: &[RawStatement], strict: bool) -> RewritePlan {
    let mut entries = Vec::new();
    let mut warnings = Vec::new();
    let mut errors = Vec::new();
    for statement in raw {
        let statement_sql = statement.source_sql.trim();
        if statement_sql.is_empty() {
            continue;
        }
        let prefixes = statement_keyword_prefix(statement_sql);
        let first = prefixes.first().map(|value| value.as_str()).unwrap_or("");
        let second = prefixes.get(1).map(|value| value.as_str()).unwrap_or("");
        match first {
            "create" => match second {
                "table" => match rewrite_create_table(statement, strict, &mut warnings) {
                    Ok(entry) => entries.push(entry),
                    Err(mut diagnostics) => {
                        if strict {
                            errors.append(&mut diagnostics);
                            entries.push(create_plan_entry(
                                statement,
                                D1MigrationAction::Skipped,
                                "",
                            ));
                        } else {
                            // Keep the concrete reason(s) next to the generic downgrade notice;
                            // previously they were silently discarded in best-effort mode.
                            warnings.append(&mut diagnostics);
                            warnings.push(add_warning(
                                statement,
                                "unsupported.create_table",
                                "CREATE TABLE could not be fully converted; best-effort mode keeps this statement as warning.",
                            ));
                            entries.push(create_plan_entry(
                                statement,
                                D1MigrationAction::Warning,
                                "",
                            ));
                        }
                    }
                },
                "index" | "unique" => entries.push(rewrite_create_index(statement)),
                "extension" | "publication" | "rule" | "trigger" | "type" | "sequence"
                | "schema" | "event" | "policy" | "function" | "procedure" | "view" | "role"
                | "domain" => {
                    push_unsupported(
                        &mut errors,
                        statement,
                        strict,
                        &mut warnings,
                        format!("unsupported.{second}"),
                        format!("PostgreSQL {second} statements are not supported by D1"),
                    );
                    entries.push(create_plan_entry(statement, D1MigrationAction::Skipped, ""));
                }
                _ => {
                    push_unsupported(
                        &mut errors,
                        statement,
                        strict,
                        &mut warnings,
                        "unsupported.create".to_string(),
                        "Unsupported CREATE variant".to_string(),
                    );
                    entries.push(create_plan_entry(statement, D1MigrationAction::Skipped, ""));
                }
            },
            "alter" => {
                rewrite_alter_table(statement, strict, &mut entries, &mut warnings, &mut errors);
            }
            "drop" => {
                entries.push(create_plan_entry(
                    statement,
                    D1MigrationAction::Converted,
                    statement_sql,
                ));
            }
            // SQLite (and therefore D1) has no TRUNCATE statement; passing it through
            // verbatim would fail when the migration is applied.
            "truncate" => match rewrite_truncate(statement_sql) {
                Some((target_sql, dropped_options)) => {
                    if dropped_options {
                        warnings.push(add_warning(
                            statement,
                            "ddl.truncate",
                            "TRUNCATE options such as CASCADE or RESTART IDENTITY have no D1 equivalent and were dropped.",
                        ));
                    }
                    entries.push(create_plan_entry(
                        statement,
                        D1MigrationAction::Converted,
                        &target_sql,
                    ));
                }
                None => {
                    push_unsupported(
                        &mut errors,
                        statement,
                        strict,
                        &mut warnings,
                        "unsupported.truncate".to_string(),
                        "TRUNCATE statement could not be rewritten for D1 migrations".to_string(),
                    );
                    entries.push(create_plan_entry(statement, D1MigrationAction::Skipped, ""));
                }
            },
            "copy" => {
                push_unsupported(
                    &mut errors,
                    statement,
                    strict,
                    &mut warnings,
                    "unsupported.copy".to_string(),
                    "COPY is not supported by D1 migrations".to_string(),
                );
                entries.push(create_plan_entry(statement, D1MigrationAction::Skipped, ""));
            }
            "listen" | "notify" => {
                push_unsupported(
                    &mut errors,
                    statement,
                    strict,
                    &mut warnings,
                    "unsupported.notification".to_string(),
                    "LISTEN/NOTIFY is not supported by D1 migrations".to_string(),
                );
                entries.push(create_plan_entry(statement, D1MigrationAction::Skipped, ""));
            }
            _ => {
                if strict {
                    errors.push(D1MigrationDiagnostic {
                        code: "unsupported.statement".to_string(),
                        message: "Unsupported statement type for D1 migration conversion"
                            .to_string(),
                        statement_index: statement.index,
                        source_range: statement.source_range.clone(),
                    });
                    entries.push(create_plan_entry(statement, D1MigrationAction::Skipped, ""));
                } else {
                    warnings.push(add_warning(
                        statement,
                        "unsupported.statement",
                        "Statement was skipped in non-strict mode.",
                    ));
                    entries.push(create_plan_entry(statement, D1MigrationAction::Warning, ""));
                }
            }
        }
    }
    RewritePlan {
        entries,
        warnings,
        errors,
    }
}

/// Rewrites `TRUNCATE [TABLE] [ONLY] name [, ...] [options]` as one `DELETE FROM` per
/// table, because SQLite has no TRUNCATE statement.
///
/// Returns the rewritten SQL together with a flag that is `true` when trailing TRUNCATE
/// options (`CASCADE`, `RESTRICT`, `RESTART IDENTITY`, `CONTINUE IDENTITY`) were dropped,
/// or `None` when the statement is not a TRUNCATE or names no table.
fn rewrite_truncate(statement_sql: &str) -> Option<(String, bool)> {
    let body = statement_sql.trim().trim_end_matches(';');
    let mut tokens = body.split_whitespace();
    if !tokens.next()?.eq_ignore_ascii_case("truncate") {
        return None;
    }
    let mut tables: Vec<&str> = Vec::new();
    let mut dropped_options = false;
    for token in tokens {
        let lower = token.to_ascii_lowercase();
        if matches!(lower.as_str(), "restart" | "continue" | "cascade" | "restrict") {
            dropped_options = true;
            break;
        }
        if tables.is_empty() && lower == "table" {
            continue;
        }
        for name in token.split(',') {
            // `ONLY` and a trailing `*` only control inheritance, which D1 does not have.
            let name = name.trim().trim_end_matches('*');
            if !name.is_empty() && !name.eq_ignore_ascii_case("only") {
                tables.push(name);
            }
        }
    }
    if tables.is_empty() {
        return None;
    }
    let rewritten = tables
        .iter()
        .map(|table| format!("DELETE FROM {table};"))
        .collect::<Vec<_>>()
        .join(" ");
    Some((rewritten, dropped_options))
}
/// Converts a PostgreSQL `CREATE TABLE` statement into a D1 `CREATE TABLE` statement.
///
/// Column definitions go through `rewrite_column_definition`; table-level constraints
/// (`CONSTRAINT ...`, `PRIMARY KEY`, `UNIQUE`, `CHECK`, `FOREIGN KEY`, with or without a
/// space before `(`) are copied verbatim. `IF NOT EXISTS` is preserved and any schema
/// qualifier is dropped from the table name. Informational diagnostics are appended to
/// `warnings`.
///
/// # Errors
/// Returns diagnostics when the statement is malformed (missing parentheses, no table
/// name, empty body, no usable columns) and, in strict mode only, for partitioning,
/// schema-qualified names, or column definitions that cannot be converted.
fn rewrite_create_table(
    statement: &RawStatement,
    strict: bool,
    warnings: &mut Vec<D1MigrationDiagnostic>,
) -> Result<D1MigrationPlanEntry, Vec<D1MigrationDiagnostic>> {
    let statement_sql = statement.source_sql.trim();
    let lower = statement_sql.to_ascii_lowercase();
    if lower.contains("partition by") || lower.contains("partitioned by") {
        if strict {
            return Err(vec![make_error(
                statement,
                "unsupported.partitioning",
                "Table partitioning syntax is not supported by D1",
            )]);
        }
        warnings.push(add_warning(
            statement,
            "unsupported.partitioning",
            "Partitioning syntax was skipped in non-strict mode",
        ));
        return Ok(create_plan_entry(statement, D1MigrationAction::Warning, ""));
    }
    let invalid = |message: &str| vec![make_error(statement, "invalid.create_table", message)];
    let open_index = statement_sql
        .find('(')
        .ok_or_else(|| invalid("Missing opening parenthesis"))?;
    // The last `)` is taken as the end of the column list.
    let close_index = statement_sql[open_index..]
        .rfind(')')
        .map(|offset| open_index + offset)
        .ok_or_else(|| invalid("Missing closing parenthesis"))?;
    let header = statement_sql[..open_index].trim();
    let body = statement_sql[open_index + 1..close_index].trim();
    let header_tokens = header.split_whitespace().collect::<Vec<_>>();
    let raw_table_name = parse_create_table_name(&header_tokens)
        .ok_or_else(|| invalid("Unable to parse CREATE TABLE target"))?;
    if table_name_position_has_schema_name(&raw_table_name) {
        if strict {
            return Err(vec![make_error(
                statement,
                "ddl.schema_name",
                "Schema-qualified table names are not safely represented in D1",
            )]);
        }
        warnings.push(add_warning(
            statement,
            "ddl.schema_name",
            "Schema-qualified names were ignored in non-strict mode",
        ));
    }
    if body.is_empty() {
        return Err(invalid("Empty CREATE TABLE body"));
    }
    // Re-running a migration relies on IF NOT EXISTS, so it must survive the rewrite.
    let create_keyword = if header_has_if_not_exists(&header_tokens) {
        "CREATE TABLE IF NOT EXISTS"
    } else {
        "CREATE TABLE"
    };
    let normalized_table = normalize_identifier(&raw_table_name);
    let mut converted_columns = Vec::new();
    let mut has_primary_key = false;
    let mut has_timestamp = false;
    let mut has_foreign_key = false;
    let columns = split_top_level_csv(body);
    for column in columns {
        let column = column.trim();
        if column.is_empty() {
            continue;
        }
        // Whitespace-collapsed, lower-cased copy used only for keyword detection.
        let normalized = column
            .split_whitespace()
            .collect::<Vec<_>>()
            .join(" ")
            .to_ascii_lowercase();
        if let Some(clause) = table_constraint_clause(&normalized) {
            has_primary_key |= clause.starts_with("primary key");
            has_foreign_key |= clause.starts_with("foreign key");
            converted_columns.push(column.to_string());
            continue;
        }
        match rewrite_column_definition(column, strict, statement, &mut *warnings) {
            Ok(column_sql) => {
                // Inspect the source text: once rewritten, temporal types read as `text`.
                has_timestamp |= normalized.contains("timestamp");
                has_primary_key |= column_sql.to_ascii_lowercase().contains("primary key");
                has_foreign_key |= normalized.contains(" references ");
                converted_columns.push(column_sql);
            }
            Err(issue) => {
                if strict {
                    return Err(issue);
                }
                // The column is omitted entirely: the statement is emitted on one line, so a
                // `-- skipped` marker would comment out the remaining columns and the `)`.
                warnings.push(add_warning(
                    statement,
                    "unsupported.create_table",
                    "A column definition was downgraded in non-strict mode.",
                ));
            }
        }
    }
    if has_foreign_key {
        warnings.push(add_warning(
            statement,
            "ddl.foreign_key",
            "D1 supports foreign key syntax but does not enforce all foreign key constraints.",
        ));
    }
    if has_timestamp {
        warnings.push(add_warning(
            statement,
            "ddl.timestamp",
            "Temporal semantics are mapped to text to preserve timezone-aware values in D1.",
        ));
    }
    if !has_primary_key {
        warnings.push(add_warning(
            statement,
            "ddl.primary_key",
            "No explicit PRIMARY KEY found; D1 table will be created without primary-key constraint.",
        ));
    }
    if converted_columns.is_empty() {
        return Err(invalid("No processable table columns found"));
    }
    let column_list = converted_columns.join(", ");
    Ok(create_plan_entry(
        statement,
        D1MigrationAction::Converted,
        &format!("{create_keyword} {normalized_table} ({column_list});"),
    ))
}

/// Returns true when the `CREATE TABLE` header tokens contain `IF NOT EXISTS`.
fn header_has_if_not_exists(tokens: &[&str]) -> bool {
    tokens.windows(3).any(|window| {
        window[0].eq_ignore_ascii_case("if")
            && window[1].eq_ignore_ascii_case("not")
            && window[2].eq_ignore_ascii_case("exists")
    })
}

/// Returns the constraint clause (without any leading `CONSTRAINT <name>`) when a
/// whitespace-collapsed, lower-cased table element is a table-level constraint rather
/// than a column definition.
fn table_constraint_clause(normalized: &str) -> Option<&str> {
    if starts_with_keyword(normalized, "constraint") {
        return Some(normalized.splitn(3, ' ').nth(2).unwrap_or(""));
    }
    let is_constraint = ["primary key", "unique", "check", "foreign key"]
        .iter()
        .any(|keyword| starts_with_keyword(normalized, keyword));
    if is_constraint {
        Some(normalized)
    } else {
        None
    }
}

/// Returns true when `text` starts with `keyword` followed by a space or `(`, so that
/// both `PRIMARY KEY (id)` and `PRIMARY KEY(id)` match but `check_count` does not.
fn starts_with_keyword(text: &str, keyword: &str) -> bool {
    matches!(
        text.strip_prefix(keyword),
        Some(rest) if rest.starts_with(' ') || rest.starts_with('(')
    )
}
/// Converts a PostgreSQL `CREATE [UNIQUE] INDEX` statement for D1.
///
/// SQLite supports neither `CONCURRENTLY` nor an index access-method clause, so
/// `CONCURRENTLY` (directly after `INDEX`) and `USING btree` (SQLite's only, implicit,
/// index kind) are removed case-insensitively. Every whitespace run, including newlines,
/// becomes a single space. Other access methods (`gin`, `gist`, ...) are left untouched.
fn rewrite_create_index(statement: &RawStatement) -> D1MigrationPlanEntry {
    let tokens = statement.source_sql.split_whitespace().collect::<Vec<_>>();
    let mut kept: Vec<&str> = Vec::with_capacity(tokens.len());
    let mut index = 0;
    while index < tokens.len() {
        let token = tokens[index];
        if token.eq_ignore_ascii_case("concurrently")
            && index > 0
            && tokens[index - 1].eq_ignore_ascii_case("index")
        {
            index += 1;
            continue;
        }
        if token.eq_ignore_ascii_case("using") {
            // `btree` may be glued to the column list, as in `USING btree(col)`.
            let column_list = tokens.get(index + 1).copied().and_then(|method: &str| {
                let head = method.get(..5)?;
                let rest = method.get(5..)?;
                let is_btree = head.eq_ignore_ascii_case("btree")
                    && (rest.is_empty() || rest.starts_with('('));
                if is_btree {
                    Some(rest)
                } else {
                    None
                }
            });
            if let Some(rest) = column_list {
                if !rest.is_empty() {
                    kept.push(rest);
                }
                index += 2;
                continue;
            }
        }
        kept.push(token);
        index += 1;
    }
    create_plan_entry(statement, D1MigrationAction::Converted, &kept.join(" "))
}
/// Extracts the table name from the whitespace-split header of a `CREATE TABLE`
/// statement (the text before the column list's opening parenthesis).
///
/// Any modifiers between `CREATE` and `TABLE` (`GLOBAL`, `LOCAL`, `TEMP[ORARY]`,
/// `UNLOGGED`) are ignored, and an `IF NOT EXISTS` directly after `TABLE` is skipped.
/// Returns `None` when the tokens do not start with `CREATE`, contain no `TABLE`
/// keyword, or name no table after it.
fn parse_create_table_name(tokens: &[&str]) -> Option<String> {
    let (first, rest) = tokens.split_first()?;
    if !first.eq_ignore_ascii_case("create") {
        return None;
    }
    let table_position = rest
        .iter()
        .position(|token| token.eq_ignore_ascii_case("table"))?;
    let mut after_table = &rest[table_position + 1..];
    if after_table.len() >= 3
        && after_table[0].eq_ignore_ascii_case("if")
        && after_table[1].eq_ignore_ascii_case("not")
        && after_table[2].eq_ignore_ascii_case("exists")
    {
        after_table = &after_table[3..];
    }
    after_table.first().map(|name| name.to_string())
}
/// Reports whether a raw table name is schema-qualified, i.e. contains a `.` separator.
fn table_name_position_has_schema_name(table_name: &str) -> bool {
    table_name.chars().any(|character| character == '.')
}
/// Converts a PostgreSQL `ALTER TABLE` statement and pushes exactly one plan entry.
///
/// Only a single `ADD COLUMN` action is rewritten. Column mutations (`DROP COLUMN`,
/// `ALTER COLUMN`) and every other variant are reported as errors in strict mode and as
/// warnings otherwise; in non-strict mode unrecognised variants keep their original SQL.
fn rewrite_alter_table(
    statement: &RawStatement,
    strict: bool,
    entries: &mut Vec<D1MigrationPlanEntry>,
    warnings: &mut Vec<D1MigrationDiagnostic>,
    errors: &mut Vec<D1MigrationDiagnostic>,
) {
    let statement_sql = statement.source_sql.trim();
    let lower = statement_sql.to_ascii_lowercase();
    let downgraded_action = || {
        if strict {
            D1MigrationAction::Skipped
        } else {
            D1MigrationAction::Warning
        }
    };
    if lower.trim_start_matches("alter table").trim().is_empty() {
        push_unsupported(
            errors,
            statement,
            strict,
            warnings,
            "unsupported.alter_table".to_string(),
            "Empty ALTER TABLE statement".to_string(),
        );
        entries.push(create_plan_entry(statement, D1MigrationAction::Skipped, ""));
        return;
    }
    // Checked before ADD COLUMN so that multi-action statements such as
    // `ADD COLUMN ..., DROP COLUMN ...` are reported instead of being folded into a
    // malformed column definition.
    if lower.contains("drop column") || lower.contains("alter column") {
        push_unsupported(
            errors,
            statement,
            strict,
            warnings,
            "unsupported.alter_table".to_string(),
            "ALTER TABLE column mutation has limited support in D1".to_string(),
        );
        entries.push(create_plan_entry(statement, downgraded_action(), ""));
        return;
    }
    if let Some(add_index) = lower.find("add column") {
        // `lower` is an ASCII lower-casing of `statement_sql`, so byte offsets line up only
        // with the trimmed SQL, not with the untrimmed source.
        let add_sql = &statement_sql[add_index + "add column".len()..];
        let table_name = alter_table_target(statement_sql).unwrap_or("unknown");
        match rewrite_column_definition(add_sql, strict, statement, warnings) {
            Ok(column_sql) => entries.push(create_plan_entry(
                statement,
                D1MigrationAction::Converted,
                &format!(
                    "ALTER TABLE {} ADD COLUMN {}",
                    sanitize_identifier(table_name),
                    column_sql
                ),
            )),
            Err(_) => {
                if strict {
                    errors.push(make_error(
                        statement,
                        "unsupported.alter_column",
                        "Could not parse ALTER TABLE ADD COLUMN",
                    ));
                } else {
                    warnings.push(add_warning(
                        statement,
                        "unsupported.alter_column",
                        "Could not parse ALTER TABLE ADD COLUMN; skipped in non-strict mode.",
                    ));
                }
                entries.push(create_plan_entry(statement, downgraded_action(), ""));
            }
        }
        return;
    }
    if strict {
        errors.push(make_error(
            statement,
            "unsupported.alter_table",
            "ALTER TABLE variant is not supported by D1 migrations",
        ));
        entries.push(create_plan_entry(statement, D1MigrationAction::Skipped, ""));
    } else {
        warnings.push(add_warning(
            statement,
            "unsupported.alter_table",
            "ALTER TABLE variant is unsupported and skipped in non-strict mode.",
        ));
        entries.push(create_plan_entry(
            statement,
            D1MigrationAction::Warning,
            statement_sql,
        ));
    }
}

/// Returns the target table of an `ALTER TABLE` statement in its original casing,
/// skipping the optional `IF EXISTS` and `ONLY` modifiers.
fn alter_table_target(statement_sql: &str) -> Option<&str> {
    let tokens = statement_sql.split_whitespace().collect::<Vec<_>>();
    if tokens.len() < 3
        || !tokens[0].eq_ignore_ascii_case("alter")
        || !tokens[1].eq_ignore_ascii_case("table")
    {
        return None;
    }
    let mut rest = &tokens[2..];
    if rest.len() >= 2 && rest[0].eq_ignore_ascii_case("if") && rest[1].eq_ignore_ascii_case("exists")
    {
        rest = &rest[2..];
    }
    if matches!(rest.first(), Some(token) if token.eq_ignore_ascii_case("only")) {
        rest = &rest[1..];
    }
    rest.first().copied()
}
/// Rewrites one PostgreSQL column definition (`name type [constraints...]`) for D1.
///
/// The type is mapped with `map_pg_type_to_d1`. SERIAL types and identity columns become
/// `integer`: the `GENERATED ... AS IDENTITY` clause (which SQLite cannot parse) is
/// removed, and `AUTOINCREMENT` is placed directly after an inline `PRIMARY KEY`, the only
/// position SQLite accepts it. `now()` in any letter case becomes `CURRENT_TIMESTAMP`.
/// Diagnostics for lossy mappings are appended to `warnings`.
///
/// # Errors
/// Returns a diagnostic when the definition has fewer than two tokens, or when
/// `map_pg_type_to_d1` rejects the type (strict mode only).
fn rewrite_column_definition(
    column_definition: &str,
    strict: bool,
    statement: &RawStatement,
    warnings: &mut Vec<D1MigrationDiagnostic>,
) -> Result<String, Vec<D1MigrationDiagnostic>> {
    let tokens = column_definition.split_whitespace().collect::<Vec<_>>();
    if tokens.len() < 2 {
        return Err(vec![make_error(
            statement,
            "invalid.column",
            "Invalid column definition",
        )]);
    }
    let name = sanitize_identifier(tokens[0]);
    let type_tokens = &tokens[1..];
    let (pg_type, remainder_start) = extract_type_and_constraints(type_tokens);
    let is_serial = is_serial_type(&pg_type);
    let source_constraints = type_tokens.get(remainder_start..).unwrap_or(&[]);
    let mut mapped_type = map_pg_type_to_d1(statement, &pg_type, strict, warnings)?;
    let is_identity = is_identity_or_serial_syntax(&source_constraints.join(" "));
    let mut constraint_tokens = strip_identity_clause(source_constraints);
    if is_identity || is_serial {
        let already_autoincrement = constraint_tokens
            .join(" ")
            .to_ascii_lowercase()
            .contains("autoincrement");
        if !already_autoincrement {
            let message = if insert_autoincrement(&mut constraint_tokens) {
                "IDENTITY/SERIAL-style columns are mapped to INTEGER AUTOINCREMENT in D1."
            } else {
                "IDENTITY/SERIAL-style column has no inline PRIMARY KEY; mapped to INTEGER without AUTOINCREMENT, which SQLite only allows on INTEGER PRIMARY KEY."
            };
            warnings.push(add_warning(statement, "ddl.identity", message));
        }
        mapped_type = "integer".to_string();
    }
    let mut constraints = constraint_tokens.join(" ");
    if constraints.to_ascii_lowercase().contains("now()") {
        warnings.push(add_warning(
            statement,
            "ddl.timestamp_default",
            "NOW() default was mapped to CURRENT_TIMESTAMP",
        ));
        constraints = replace_ignore_ascii_case(&constraints, "now()", "CURRENT_TIMESTAMP");
    }
    // Join only the non-empty parts so an empty constraint list leaves no stray spaces.
    let combined = [name.as_str(), mapped_type.as_str(), constraints.as_str()]
        .iter()
        .filter(|part| !part.is_empty())
        .copied()
        .collect::<Vec<_>>()
        .join(" ");
    if combined.is_empty() {
        return Err(vec![make_error(
            statement,
            "invalid.column",
            "Failed to rebuild column definition",
        )]);
    }
    Ok(combined)
}

/// Removes `GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence options ) ]` from
/// a column's constraint tokens; SQLite has no identity-column syntax.
fn strip_identity_clause<'a>(tokens: &[&'a str]) -> Vec<&'a str> {
    let mut kept = Vec::with_capacity(tokens.len());
    let mut index = 0;
    while index < tokens.len() {
        if let Some(end) = identity_clause_end(tokens, index) {
            index = end;
            continue;
        }
        kept.push(tokens[index]);
        index += 1;
    }
    kept
}

/// When an identity clause starts at `start`, returns the index just past it (including
/// any parenthesised sequence options); otherwise `None`.
fn identity_clause_end(tokens: &[&str], start: usize) -> Option<usize> {
    let word_is = |index: usize, expected: &str| {
        matches!(tokens.get(index), Some(token) if token.eq_ignore_ascii_case(expected))
    };
    if !word_is(start, "generated") {
        return None;
    }
    let as_index = if word_is(start + 1, "always") {
        start + 2
    } else if word_is(start + 1, "by") && word_is(start + 2, "default") {
        start + 3
    } else {
        return None;
    };
    if !word_is(as_index, "as") {
        return None;
    }
    let identity = tokens.get(as_index + 1)?.to_ascii_lowercase();
    if identity != "identity" && !identity.starts_with("identity(") {
        return None;
    }
    let balance = |token: &str| {
        token.matches('(').count() as isize - token.matches(')').count() as isize
    };
    let mut end = as_index + 2;
    let mut depth = balance(identity.as_str());
    if identity == "identity" && matches!(tokens.get(end), Some(next) if next.starts_with('(')) {
        depth += balance(tokens[end]);
        end += 1;
    }
    while depth > 0 && end < tokens.len() {
        depth += balance(tokens[end]);
        end += 1;
    }
    Some(end)
}

/// Inserts `AUTOINCREMENT` directly after `PRIMARY KEY [ASC | DESC]`, the only place
/// SQLite accepts it. Returns `false`, leaving `tokens` unchanged, when the column has
/// no inline primary key.
fn insert_autoincrement(tokens: &mut Vec<&str>) -> bool {
    let key_position = (1..tokens.len()).find(|&position| {
        tokens[position - 1].eq_ignore_ascii_case("primary")
            && tokens[position].eq_ignore_ascii_case("key")
    });
    match key_position {
        Some(position) => {
            let mut insert_at = position + 1;
            if matches!(
                tokens.get(insert_at),
                Some(token) if token.eq_ignore_ascii_case("asc") || token.eq_ignore_ascii_case("desc")
            ) {
                insert_at += 1;
            }
            tokens.insert(insert_at, "AUTOINCREMENT");
            true
        }
        None => false,
    }
}

/// Replaces every ASCII-case-insensitive occurrence of `needle` (which must be ASCII
/// lower-case) in `haystack`. ASCII lower-casing keeps byte offsets, so match positions
/// found in the lowered copy are valid slice bounds in the original.
fn replace_ignore_ascii_case(haystack: &str, needle: &str, replacement: &str) -> String {
    let lowered = haystack.to_ascii_lowercase();
    let mut result = String::with_capacity(haystack.len());
    let mut cursor = 0;
    for (start, _) in lowered.match_indices(needle) {
        result.push_str(&haystack[cursor..start]);
        result.push_str(replacement);
        cursor = start + needle.len();
    }
    result.push_str(&haystack[cursor..]);
    result
}
/// Splits the tokens that follow a column name into the PostgreSQL type name and the
/// index of the first constraint token.
///
/// Multi-word types (`double precision`, `character varying`, `timestamp|time
/// with|without time zone`) and parenthesised modifiers that whitespace splitting broke
/// apart (`numeric(10, 2)`, `varchar (255)`) are kept together. Trailing `,`/`;` is
/// stripped from the type. The returned index never exceeds `tokens.len()`, and an empty
/// slice yields `("text", 0)`.
fn extract_type_and_constraints(tokens: &[&str]) -> (String, usize) {
    if tokens.is_empty() {
        return ("text".to_string(), 0);
    }
    if is_double_precision(tokens) {
        return ("double precision".to_string(), 2);
    }
    let word_is = |index: usize, expected: &str| {
        matches!(
            tokens.get(index),
            Some(token) if first_token_without_trailing_separator(token).eq_ignore_ascii_case(expected)
        )
    };
    let first = first_token_without_trailing_separator(tokens[0]);
    let lower_first = first.to_ascii_lowercase();
    // Type name without any `(precision)` modifier, e.g. `timestamp(3)` -> `timestamp`.
    let base = lower_first.split('(').next().unwrap_or("");
    if (base == "timestamp" || base == "time")
        && (word_is(1, "with") || word_is(1, "without"))
        && word_is(2, "time")
        && word_is(3, "zone")
    {
        // Consume `zone` as well; leaving it behind would emit it as a bogus constraint.
        // `time ...` keeps its bare name so it still maps as a plain time type.
        let type_name = if base == "timestamp" {
            tokens[..4].join(" ")
        } else {
            first.to_string()
        };
        return (first_token_without_trailing_separator(&type_name).to_string(), 4);
    }
    let mut end = if base.starts_with("character")
        && tokens.len() >= 2
        && tokens[1].to_ascii_lowercase().starts_with("varying")
    {
        2
    } else {
        1
    };
    let balance = |token: &str| {
        token.matches('(').count() as isize - token.matches(')').count() as isize
    };
    let mut depth: isize = tokens[..end].iter().map(|token| balance(*token)).sum();
    // A modifier written as its own token, e.g. `varchar (255)`.
    if depth == 0
        && !lower_first.contains('(')
        && matches!(tokens.get(end), Some(next) if next.starts_with('('))
    {
        depth += balance(tokens[end]);
        end += 1;
    }
    // Keep consuming until the modifier's parentheses balance, e.g. `numeric(10,` `2)`;
    // stop at the end of input instead of indexing past it.
    while depth > 0 && end < tokens.len() {
        depth += balance(tokens[end]);
        end += 1;
    }
    let type_name = tokens[..end].join(" ");
    (first_token_without_trailing_separator(&type_name).to_string(), end)
}
/// Strips any trailing `,` and `;` characters that whitespace tokenisation left attached
/// to a token.
fn first_token_without_trailing_separator(token: &str) -> &str {
    const SEPARATORS: &[char] = &[',', ';'];
    token.trim_end_matches(SEPARATORS)
}
/// Reports whether the token list starts with the two-word type `double precision`
/// (case-insensitive).
fn is_double_precision(tokens: &[&str]) -> bool {
    match tokens {
        [first, second, ..] => {
            first.eq_ignore_ascii_case("double") && second.eq_ignore_ascii_case("precision")
        }
        _ => false,
    }
}
/// Reports whether a column's constraint text contains a PostgreSQL identity clause:
/// `GENERATED ALWAYS AS IDENTITY` or `GENERATED BY DEFAULT AS IDENTITY`.
///
/// The whole clause is matched (case- and whitespace-insensitively). A bare
/// `contains("identity")` check false-positives on identifiers such as
/// `REFERENCES identity_providers(id)`.
fn is_identity_or_serial_syntax(constraints: &str) -> bool {
    let normalized = constraints
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
        .to_ascii_lowercase();
    normalized.contains("generated always as identity")
        || normalized.contains("generated by default as identity")
}
/// Reports whether a PostgreSQL type name is one of the auto-incrementing serial
/// pseudo-types, including the `serial2` / `serial4` / `serial8` aliases for
/// `smallserial` / `serial` / `bigserial`. Matching ignores ASCII case and surrounding
/// whitespace.
fn is_serial_type(type_name: &str) -> bool {
    matches!(
        type_name.trim().to_ascii_lowercase().as_str(),
        "serial" | "smallserial" | "bigserial" | "serial2" | "serial4" | "serial8"
    )
}
/// Maps a PostgreSQL column type name to the D1 (SQLite) type used in the output.
///
/// Array types are checked first so that, e.g., `numeric[]` becomes `text` rather than
/// the scalar `real`. Mappings that change representation push a warning. Unknown types
/// are an error in strict mode and fall back to `text`, with a warning, otherwise.
///
/// # Errors
/// Returns a `type.unsupported` diagnostic for unrecognised types when `strict` is set.
fn map_pg_type_to_d1(
    statement: &RawStatement,
    input: &str,
    strict: bool,
    warnings: &mut Vec<D1MigrationDiagnostic>,
) -> Result<String, Vec<D1MigrationDiagnostic>> {
    let lower = input.to_ascii_lowercase();
    // Type name without any `(precision)` modifier, e.g. `time(3)` -> `time`.
    let base = lower.split('(').next().unwrap_or("").trim_end();
    if lower.ends_with("[]") || lower.starts_with('_') || lower.starts_with("array") {
        warnings.push(add_warning(
            statement,
            "type.array",
            "Array type mapped to TEXT",
        ));
        return Ok("text".to_string());
    }
    if is_serial_type(&lower) {
        warnings.push(add_warning(
            statement,
            "type.serial",
            "SERIAL-style types are stored as INTEGER",
        ));
        return Ok("integer".to_string());
    }
    if matches!(
        lower.as_str(),
        "integer" | "int" | "int2" | "int4" | "int8" | "bigint" | "smallint"
    ) {
        return Ok("integer".to_string());
    }
    if matches!(
        base,
        "real" | "float" | "float4" | "float8" | "double precision"
    ) {
        return Ok("real".to_string());
    }
    if lower == "uuid" {
        warnings.push(add_warning(statement, "type.uuid", "UUID mapped to TEXT"));
        return Ok("text".to_string());
    }
    if lower == "jsonb" || lower == "json" {
        warnings.push(add_warning(
            statement,
            "type.json",
            "JSON mapped to TEXT for D1 compatibility",
        ));
        return Ok("text".to_string());
    }
    if lower == "bytea" || lower == "blob" {
        return Ok("blob".to_string());
    }
    if lower.starts_with("numeric") || lower.starts_with("decimal") {
        warnings.push(add_warning(
            statement,
            "type.numeric",
            "NUMERIC mapped to REAL",
        ));
        return Ok("real".to_string());
    }
    if lower == "boolean" || lower == "bool" {
        warnings.push(add_warning(
            statement,
            "type.boolean",
            "BOOLEAN mapped to INTEGER",
        ));
        return Ok("integer".to_string());
    }
    if lower == "text"
        || lower.starts_with("character varying")
        || lower.starts_with("varchar")
        || lower.starts_with("character")
        || matches!(base, "char" | "bpchar")
    {
        return Ok("text".to_string());
    }
    if lower == "date"
        || lower.starts_with("timestamp")
        || lower.starts_with("time with")
        || matches!(base, "time" | "timetz" | "interval")
    {
        warnings.push(add_warning(
            statement,
            "type.time",
            "Temporal PG type mapped to TEXT",
        ));
        return Ok("text".to_string());
    }
    if matches!(lower.as_str(), "enum" | "inet" | "citext" | "daterange") {
        warnings.push(add_warning(
            statement,
            "type.custom",
            "Domain/enum-like type mapped to TEXT",
        ));
        return Ok("text".to_string());
    }
    if strict {
        Err(vec![make_error(
            statement,
            "type.unsupported",
            &format!("Unsupported PostgreSQL type '{input}'"),
        )])
    } else {
        warnings.push(add_warning(
            statement,
            "type.unsupported",
            &format!("Unsupported PostgreSQL type '{input}' mapped to TEXT"),
        ));
        Ok("text".to_string())
    }
}
/// Returns an owned identifier with surrounding whitespace removed, then every leading
/// and trailing double quote removed.
fn sanitize_identifier(value: &str) -> String {
    let unquoted = value.trim().trim_matches('"');
    String::from(unquoted)
}
fn normalize_identifier(value: &str) -> String {
let value = sanitize_identifier(value);
value
.split('.')
.next_back()
.map(|name| sanitize_identifier(name))
.unwrap_or_else(|| value.clone())
}
/// Builds a plan entry for `statement` with the given action and target SQL.
///
/// The entry records the statement's index, its source range and its trimmed source
/// SQL; `apply_order` is always initialised to `0`.
fn create_plan_entry(
    statement: &RawStatement,
    action: D1MigrationAction,
    target_sql: &str,
) -> D1MigrationPlanEntry {
    let source_sql = statement.source_sql.trim().to_owned();
    let source_range = statement.source_range.clone();
    D1MigrationPlanEntry {
        statement_index: statement.index,
        source_range,
        source_sql,
        target_sql: target_sql.to_owned(),
        apply_order: 0,
        action,
    }
}
/// Records a diagnostic for an unsupported construct. It goes to `errors` in strict mode
/// and to `warnings` otherwise.
fn push_unsupported(
    errors: &mut Vec<D1MigrationDiagnostic>,
    statement: &RawStatement,
    strict: bool,
    warnings: &mut Vec<D1MigrationDiagnostic>,
    code: String,
    message: String,
) {
    let diagnostic = D1MigrationDiagnostic {
        code,
        message,
        statement_index: statement.index,
        source_range: statement.source_range.clone(),
    };
    let destination = if strict { errors } else { warnings };
    destination.push(diagnostic);
}
/// Builds a diagnostic with the given code and message, attributed to `statement`'s
/// index and source range.
fn make_error(statement: &RawStatement, code: &str, message: &str) -> D1MigrationDiagnostic {
    let source_range = statement.source_range.clone();
    D1MigrationDiagnostic {
        statement_index: statement.index,
        code: String::from(code),
        message: String::from(message),
        source_range,
    }
}