use sha2::{Digest, Sha256};
use crate::api::gateway::contracts::{
D1MigrationAppliedStatementResult, D1MigrationConversion, D1MigrationDiagnostic,
D1MigrationPlanEntry, D1MigrationRequest, D1MigrationSourceMeta, D1MigrationSourceRange,
};
use crate::api::query::d1_migration::executor::{
D1MigrationBatch, chunk_statements, plan_batch_results,
};
mod executor;
mod parser;
mod rewrite;
mod warnings;
pub use parser::ParsedSqlStatement;
#[derive(Debug, Clone)]
pub struct RawStatement {
pub index: usize,
pub source_sql: String,
pub source_range: D1MigrationSourceRange,
}
impl From<ParsedSqlStatement> for RawStatement {
fn from(value: ParsedSqlStatement) -> Self {
Self {
index: value.index,
source_sql: value.source_sql,
source_range: value.source_range,
}
}
}
pub fn prepare_source_sql(body: &D1MigrationRequest) -> String {
let schema_sql = body.schema_sql.trim();
if !schema_sql.is_empty() {
return schema_sql.to_string();
}
let mut pieces = Vec::new();
if let Some(files) = &body.files {
pieces.extend(
files
.iter()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty()),
);
}
if let Some(statements) = &body.statements {
pieces.extend(
statements
.iter()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty()),
);
}
pieces.join(";\n")
}
pub fn convert_pg_schema_to_d1(body: &D1MigrationRequest) -> D1MigrationConversion {
let source_sql = prepare_source_sql(body);
if source_sql.trim().is_empty() {
let empty_range = D1MigrationSourceRange {
statement_index: 0,
start_line: 1,
end_line: 1,
start_column: 1,
end_column: 1,
};
let source_error = D1MigrationDiagnostic {
code: "input.empty".to_string(),
message: "No schema SQL provided. Use schemaSql, files, or statements.".to_string(),
statement_index: 0,
source_range: empty_range.clone(),
};
let source_meta = D1MigrationSourceMeta {
statement_count: 0,
fingerprint: String::new(),
estimated_apply_order: Vec::new(),
};
if body.strict {
return D1MigrationConversion {
converted_sql: String::new(),
original_sql: source_sql,
statement_count: 0,
estimated_apply_order: Vec::new(),
statements: Vec::new(),
warnings: Vec::new(),
errors: vec![source_error],
source_meta,
};
}
return D1MigrationConversion {
converted_sql: String::new(),
original_sql: source_sql,
statement_count: 0,
estimated_apply_order: Vec::new(),
statements: Vec::new(),
warnings: vec![D1MigrationDiagnostic {
code: "input.empty".to_string(),
message: "No schema SQL provided; nothing to convert.".to_string(),
statement_index: 0,
source_range: empty_range.clone(),
}],
errors: Vec::new(),
source_meta,
};
}
let parsed = parser::split_sql_statements(&source_sql);
let raw: Vec<RawStatement> = parsed.into_iter().map(RawStatement::from).collect();
let mut plan = rewrite::rewrite_statements(&raw, body.strict);
let mut statements = Vec::new();
let mut apply_order = 1usize;
for entry in plan.entries.drain(..) {
let mut entry = entry;
if entry.target_sql.trim().is_empty() {
if body.strict {
} else if entry.action == crate::api::gateway::contracts::D1MigrationAction::Skipped {
entry.action = crate::api::gateway::contracts::D1MigrationAction::Warning;
}
}
if entry.action == crate::api::gateway::contracts::D1MigrationAction::Converted {
entry.apply_order = apply_order;
apply_order += 1;
}
statements.push(entry);
}
if !body.strict {
let mut downgraded_errors = Vec::new();
for mut error in plan.errors.drain(..) {
error.code = format!("best_effort.{code}", code = error.code);
error.message = format!("best-effort: {}", error.message);
downgraded_errors.push(error);
}
plan.warnings.extend(downgraded_errors);
}
let converted_sql = statements
.iter()
.filter(|entry| {
entry.action == crate::api::gateway::contracts::D1MigrationAction::Converted
})
.map(|entry| entry.target_sql.trim().trim_end_matches(';').to_string())
.filter(|entry| !entry.is_empty())
.collect::<Vec<_>>()
.join(";\n");
let mut digest = Sha256::new();
digest.update(converted_sql.as_bytes());
let fingerprint = format!("{:x}", digest.finalize());
let estimated_apply_order = statements
.iter()
.filter(|entry| {
entry.action == crate::api::gateway::contracts::D1MigrationAction::Converted
})
.map(|entry| entry.statement_index)
.collect::<Vec<_>>();
let source_meta = D1MigrationSourceMeta {
statement_count: raw.len(),
fingerprint,
estimated_apply_order: estimated_apply_order.clone(),
};
D1MigrationConversion {
converted_sql,
original_sql: source_sql.clone(),
statement_count: raw.len(),
estimated_apply_order: estimated_apply_order.clone(),
statements,
warnings: plan.warnings,
errors: if body.strict { plan.errors } else { Vec::new() },
source_meta,
}
}
pub fn build_batches_for_execution(
statements: &[D1MigrationPlanEntry],
batch_size: Option<usize>,
) -> Vec<D1MigrationBatch> {
let size = batch_size.unwrap_or(1).max(1);
chunk_statements(statements, size)
}
pub fn map_execution_results(
statements: &[D1MigrationPlanEntry],
batches: &[D1MigrationBatch],
duration_ms: Option<u64>,
rows_affected: Option<u64>,
) -> Vec<D1MigrationAppliedStatementResult> {
plan_batch_results(statements, batches, "applied", duration_ms, rows_affected)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::api::gateway::contracts::{D1MigrationDialect, D1MigrationRequest};
fn migration_request(schema_sql: &str) -> D1MigrationRequest {
D1MigrationRequest {
driver: "cloudflare-d1".to_string(),
db_name: "test".to_string(),
schema_sql: schema_sql.to_string(),
dialect: D1MigrationDialect::PostgreSQL,
dry_run: true,
strict: true,
batch_size: None,
files: None,
statements: None,
}
}
#[test]
fn converts_create_table_with_serial_and_uuid() {
let request = migration_request(
"CREATE TABLE users (
id SERIAL PRIMARY KEY,
external_id UUID,
created_at timestamp with time zone
);",
);
let report = convert_pg_schema_to_d1(&request);
assert!(!report.converted_sql.is_empty());
assert!(!report.statements.is_empty());
assert!(report.errors.is_empty());
}
#[test]
fn strict_mode_blocks_extensions() {
let request = migration_request("CREATE EXTENSION IF NOT EXISTS pgcrypto;");
let report = convert_pg_schema_to_d1(&request);
assert_eq!(report.errors.len(), 1);
assert!(report.warnings.is_empty());
}
#[test]
fn non_strict_mode_converts_with_warning() {
let mut request = migration_request("CREATE DOMAIN users AS integer;");
request.strict = false;
let report = convert_pg_schema_to_d1(&request);
assert!(report.errors.is_empty());
assert!(!report.warnings.is_empty());
}
#[test]
fn empty_input_is_strict_error_and_non_strict_warning() {
let strict_empty = migration_request("");
let strict_report = convert_pg_schema_to_d1(&strict_empty);
assert_eq!(strict_report.errors.len(), 1);
assert!(strict_report.warnings.is_empty());
assert_eq!(strict_report.statements.len(), 0);
let mut non_strict_empty = migration_request("");
non_strict_empty.strict = false;
let non_strict_report = convert_pg_schema_to_d1(&non_strict_empty);
assert_eq!(non_strict_report.errors.len(), 0);
assert_eq!(non_strict_report.warnings.len(), 1);
assert_eq!(non_strict_report.statements.len(), 0);
}
#[test]
fn deterministic_order_and_fingerprint_for_repeat_runs() {
let source = "CREATE TABLE users (
id SERIAL PRIMARY KEY,
email TEXT
);
CREATE INDEX users_email_idx ON users (email);
DROP TABLE IF EXISTS old_users;
";
let first = convert_pg_schema_to_d1(&migration_request(source));
let second = convert_pg_schema_to_d1(&migration_request(source));
assert_eq!(first.converted_sql, second.converted_sql);
assert_eq!(first.statement_count, second.statement_count);
assert_eq!(first.estimated_apply_order, second.estimated_apply_order);
assert_eq!(
first.source_meta.fingerprint,
second.source_meta.fingerprint
);
assert_eq!(first.source_meta.estimated_apply_order, vec![0, 1, 2]);
}
#[test]
fn maps_common_pg_types_with_expected_warnings() {
let request = migration_request(
"CREATE TABLE samples (
id BIGSERIAL PRIMARY KEY,
external_id uuid,
meta jsonb,
rate numeric(10,2),
tags text[]
);",
);
let report = convert_pg_schema_to_d1(&request);
assert!(report.errors.is_empty());
assert!(report.converted_sql.contains("integer"));
assert!(report.converted_sql.contains("text"));
assert!(report.converted_sql.contains("real"));
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.uuid")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.json")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.numeric")
);
}
#[test]
fn covers_supported_d1_conversion_matrix_and_strict_failures() {
let request = migration_request(
"CREATE TABLE users (
id SERIAL PRIMARY KEY,
identity_id BIGINT GENERATED ALWAYS AS IDENTITY,
external_id UUID,
payload JSONB,
rate NUMERIC(12,4),
tags TEXT[],
mode CITEXT,
alias INET,
value DATERANGE,
is_active BOOLEAN,
blob_data BYTEA,
created_at TIMESTAMP WITH TIME ZONE,
birthday DATE,
team_id INT,
CONSTRAINT users_team_id_fkey FOREIGN KEY (team_id) REFERENCES teams(id)
);
CREATE UNIQUE INDEX CONCURRENTLY users_external_id_idx ON users(external_id);",
);
let report = convert_pg_schema_to_d1(&request);
assert!(report.errors.is_empty());
assert!(
report
.converted_sql
.contains("id integer PRIMARY KEY AUTOINCREMENT")
);
assert!(
report
.converted_sql
.contains("identity_id integer GENERATED ALWAYS AS IDENTITY AUTOINCREMENT")
);
assert!(report.converted_sql.contains("external_id text"));
assert!(report.converted_sql.contains("payload text"));
assert!(report.converted_sql.contains("rate real"));
assert!(report.converted_sql.contains("tags text"));
assert!(report.converted_sql.contains("mode text"));
assert!(report.converted_sql.contains("alias text"));
assert!(report.converted_sql.contains("value text"));
assert!(report.converted_sql.contains("is_active integer"));
assert!(report.converted_sql.contains("blob_data blob"));
assert!(report.converted_sql.contains("created_at text"));
assert!(report.converted_sql.contains("birthday text"));
assert!(
report.converted_sql.contains(
"CONSTRAINT users_team_id_fkey FOREIGN KEY (team_id) REFERENCES teams(id)"
)
);
assert!(
report
.converted_sql
.contains("CREATE UNIQUE INDEX users_external_id_idx ON users(external_id)")
);
assert!(!report.converted_sql.to_uppercase().contains("CONCURRENTLY"));
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.serial")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "ddl.identity")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.uuid")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.json")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.numeric")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.array")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.custom")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.boolean")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.time")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "ddl.foreign_key")
);
}
#[test]
fn maps_index_syntax_and_identity_behavior() {
let request = migration_request(
"CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
account_id UUID,
created_at TIMESTAMP WITH TIME ZONE
);
CREATE UNIQUE INDEX CONCURRENTLY users_account_id_idx ON users(account_id);",
);
let report = convert_pg_schema_to_d1(&request);
assert!(report.errors.is_empty());
assert!(report.converted_sql.contains("PRIMARY KEY AUTOINCREMENT"));
assert!(
report
.converted_sql
.contains("CREATE UNIQUE INDEX users_account_id_idx ON users(account_id)")
);
assert!(!report.converted_sql.to_uppercase().contains("CONCURRENTLY"));
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.serial")
);
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "ddl.identity")
);
}
#[test]
fn unsupported_type_converts_with_warning_when_not_strict() {
let mut request = migration_request(
"CREATE TABLE docs (
id INT,
payload custom_scalar
);",
);
request.strict = false;
let report = convert_pg_schema_to_d1(&request);
assert!(report.errors.is_empty());
assert!(
report
.warnings
.iter()
.any(|warning| warning.code == "type.unsupported")
);
}
#[test]
fn strict_mode_reports_unsupported_d1_constructs() {
let request = migration_request(
"CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE PUBLICATION d1_pub;
CREATE ROLE d1_role;
CREATE EVENT TRIGGER d1_trigger;
COPY users TO STDOUT;
LISTEN d1_events;
NOTIFY d1_events;
CREATE TABLE users (
id INT
) PARTITION BY HASH (id);",
);
let report = convert_pg_schema_to_d1(&request);
assert!(!report.errors.is_empty());
let error_codes: Vec<&str> = report
.errors
.iter()
.map(|error| error.code.as_str())
.collect();
assert!(
error_codes
.iter()
.any(|code| *code == "unsupported.extension")
);
assert!(
error_codes
.iter()
.any(|code| *code == "unsupported.publication")
);
assert!(error_codes.iter().any(|code| *code == "unsupported.role"));
assert!(error_codes.iter().any(|code| *code == "unsupported.event"));
assert!(error_codes.iter().any(|code| *code == "unsupported.copy"));
assert!(
error_codes
.iter()
.any(|code| *code == "unsupported.notification")
);
assert!(
error_codes
.iter()
.any(|code| *code == "unsupported.partitioning")
);
}
#[test]
fn prepare_source_sql_prefers_schema_sql_over_files_and_statements() {
let request = D1MigrationRequest {
driver: "cloudflare-d1".to_string(),
db_name: "test".to_string(),
schema_sql: "SELECT 1;".to_string(),
dialect: D1MigrationDialect::PostgreSQL,
dry_run: true,
strict: true,
batch_size: None,
files: Some(vec!["CREATE TABLE from_file ();".to_string()]),
statements: Some(vec!["CREATE TABLE from_statement ();".to_string()]),
};
assert_eq!(prepare_source_sql(&request), "SELECT 1;");
}
#[test]
fn prepare_source_sql_falls_back_to_files_and_statements() {
let request = D1MigrationRequest {
driver: "cloudflare-d1".to_string(),
db_name: "test".to_string(),
schema_sql: " ".to_string(),
dialect: D1MigrationDialect::PostgreSQL,
dry_run: true,
strict: true,
batch_size: None,
files: Some(vec!["CREATE TABLE users (id INT)".to_string()]),
statements: Some(vec!["CREATE TABLE orders (id INT)".to_string()]),
};
let merged = prepare_source_sql(&request);
assert_eq!(
merged,
"CREATE TABLE users (id INT);\nCREATE TABLE orders (id INT)"
);
}
#[test]
fn strict_mode_rejects_partitioned_tables() {
let request = migration_request(
"CREATE TABLE users (
id int,
region int
) PARTITION BY HASH (region);",
);
let report = convert_pg_schema_to_d1(&request);
assert_eq!(report.errors.len(), 1);
assert_eq!(report.errors[0].code, "unsupported.partitioning");
assert!(report.statements.iter().any(
|entry| entry.action == crate::api::gateway::contracts::D1MigrationAction::Skipped
));
}
#[test]
fn non_strict_mode_converts_unknown_statements_as_warnings() {
let mut request = migration_request("ALTER TABLE users RENAME TO archived_users;");
request.strict = false;
let report = convert_pg_schema_to_d1(&request);
assert!(report.errors.is_empty());
assert!(!report.warnings.is_empty());
assert!(
report.statements.iter().all(|entry| entry.action
!= crate::api::gateway::contracts::D1MigrationAction::Converted)
);
}
#[test]
fn map_execution_results_tracks_batches_and_statement_indexes() {
use crate::api::gateway::contracts::{
D1MigrationAction, D1MigrationPlanEntry, D1MigrationSourceRange,
};
let source_range = D1MigrationSourceRange {
statement_index: 0,
start_line: 1,
end_line: 1,
start_column: 1,
end_column: 1,
};
let plan = vec![
D1MigrationPlanEntry {
action: D1MigrationAction::Converted,
statement_index: 0,
source_range: source_range.clone(),
source_sql: "CREATE TABLE users (id int);".to_string(),
target_sql: "CREATE TABLE users (id integer);".to_string(),
apply_order: 1,
},
D1MigrationPlanEntry {
action: D1MigrationAction::Warning,
statement_index: 1,
source_range: source_range,
source_sql: "CREATE EXTENSION hstore;".to_string(),
target_sql: String::new(),
apply_order: 0,
},
];
let batches = build_batches_for_execution(&plan, Some(1));
let results = map_execution_results(&plan, &batches, Some(33), Some(1));
assert_eq!(batches.len(), 1);
assert_eq!(results.len(), 1);
assert_eq!(results[0].statement_index, 0);
assert_eq!(results[0].batch_index, 0);
assert_eq!(results[0].rows_affected, Some(1));
assert_eq!(results[0].duration_ms, Some(33));
assert_eq!(results[0].status, "applied");
}
#[test]
fn execute_batching_honors_batch_size_and_skips_non_converted_statements() {
use crate::api::gateway::contracts::{
D1MigrationAction, D1MigrationPlanEntry, D1MigrationSourceRange,
};
let source_range = D1MigrationSourceRange {
statement_index: 0,
start_line: 1,
end_line: 1,
start_column: 1,
end_column: 1,
};
let statements = [
D1MigrationPlanEntry {
action: D1MigrationAction::Converted,
statement_index: 0,
source_range: source_range.clone(),
source_sql: "CREATE TABLE users (id int);".to_string(),
target_sql: "CREATE TABLE users (id integer);".to_string(),
apply_order: 1,
},
D1MigrationPlanEntry {
action: D1MigrationAction::Converted,
statement_index: 1,
source_range: source_range.clone(),
source_sql: "CREATE INDEX ON users (id);".to_string(),
target_sql: "CREATE INDEX ON users (id);".to_string(),
apply_order: 2,
},
D1MigrationPlanEntry {
action: D1MigrationAction::Skipped,
statement_index: 2,
source_range,
source_sql: "CREATE EXTENSION hstore;".to_string(),
target_sql: String::new(),
apply_order: 0,
},
];
let batches = build_batches_for_execution(&statements, Some(1));
assert_eq!(batches.len(), 2);
assert_eq!(batches[0].statement_indexes, vec![0]);
assert_eq!(batches[1].statement_indexes, vec![1]);
assert!(batches[0].sql.ends_with(';'));
assert!(batches[1].sql.ends_with(';'));
assert_eq!(batches[0].sql, "CREATE TABLE users (id integer);");
assert_eq!(batches[1].sql, "CREATE INDEX ON users (id);");
}
}