use std::path::{Path, PathBuf};
use clap::Subcommand;
use crate::cli::fmt;
use crate::cli::GlobalOpts;
use crate::connection::DatabaseClient;
use crate::error::{Result, SurqlError};
use crate::migration::{
create_blank_migration, create_migration_plan, discover_migrations, execute_migration_plan,
get_migration_history, get_migration_status, migrate_down as lib_migrate_down,
squash_migrations, validate_migrations, MigrationDirection, MigrationPlan, SquashOptions,
};
#[derive(Debug, Subcommand)]
pub enum MigrateCommand {
Up {
#[arg(long, value_name = "VERSION")]
target: Option<String>,
#[arg(long)]
dry_run: bool,
},
Down {
#[arg(long, value_name = "VERSION")]
target: Option<String>,
#[arg(long)]
dry_run: bool,
},
Status,
History,
Create {
description: String,
#[arg(long, value_name = "PATH")]
schema_dir: Option<PathBuf>,
},
Validate {
version: Option<String>,
},
Generate {
#[arg(long, value_name = "VERSION")]
from: Option<String>,
#[arg(long, value_name = "VERSION")]
to: Option<String>,
},
Squash {
from: String,
to: String,
#[arg(long, short = 'o', value_name = "PATH")]
output: Option<PathBuf>,
#[arg(long)]
dry_run: bool,
},
}
pub async fn run(cmd: MigrateCommand, global: &GlobalOpts) -> Result<()> {
let settings = global.settings()?;
let migrations_dir = settings.migration_path.clone();
match cmd {
MigrateCommand::Up { target, dry_run } => {
up(&settings, &migrations_dir, target.as_deref(), dry_run).await
}
MigrateCommand::Down { target, dry_run } => {
down(&settings, &migrations_dir, target.as_deref(), dry_run).await
}
MigrateCommand::Status => status(&settings, &migrations_dir).await,
MigrateCommand::History => history(&settings).await,
MigrateCommand::Create {
description,
schema_dir,
} => {
let dir = schema_dir.unwrap_or(migrations_dir);
create(&description, &dir)
}
MigrateCommand::Validate { version } => validate(&migrations_dir, version.as_deref()).await,
MigrateCommand::Generate { from, to } => {
generate(&migrations_dir, from.as_deref(), to.as_deref())
}
MigrateCommand::Squash {
from,
to,
output,
dry_run,
} => squash(&migrations_dir, &from, &to, output.as_deref(), dry_run),
}
}
async fn connected_client(settings: &crate::settings::Settings) -> Result<DatabaseClient> {
let client = DatabaseClient::new(settings.database().clone())?;
client.connect().await?;
Ok(client)
}
async fn up(
settings: &crate::settings::Settings,
migrations_dir: &Path,
target: Option<&str>,
dry_run: bool,
) -> Result<()> {
let client = connected_client(settings).await?;
let plan = create_migration_plan(&client, migrations_dir).await?;
let selected = select_up_range(plan, target)?;
if selected.migrations.is_empty() {
fmt::info("no pending migrations");
return Ok(());
}
if dry_run {
fmt::info(format!(
"dry-run: {} migration(s) would be applied",
selected.migrations.len()
));
for m in &selected.migrations {
fmt::info(format!(" - {} {}", m.version, m.description));
}
return Ok(());
}
let statuses = execute_migration_plan(&client, selected).await?;
render_statuses(&statuses);
Ok(())
}
async fn down(
settings: &crate::settings::Settings,
migrations_dir: &Path,
target: Option<&str>,
dry_run: bool,
) -> Result<()> {
let client = connected_client(settings).await?;
let steps = resolve_down_steps(&client, migrations_dir, target).await?;
if steps == 0 {
fmt::info("nothing to roll back");
return Ok(());
}
if dry_run {
fmt::info(format!(
"dry-run: {steps} migration(s) would be rolled back"
));
return Ok(());
}
let statuses = lib_migrate_down(&client, migrations_dir, steps).await?;
render_statuses(&statuses);
Ok(())
}
async fn status(settings: &crate::settings::Settings, migrations_dir: &Path) -> Result<()> {
let client = connected_client(settings).await?;
let report = get_migration_status(&client, migrations_dir).await?;
fmt::info(format!(
"total: {} applied: {} pending: {}",
report.total,
report.applied_count(),
report.pending_count()
));
let mut table = fmt::make_table();
table.set_header(vec!["version", "state", "description"]);
for s in &report.applied {
table.add_row(vec![
s.migration.version.clone(),
"applied".to_string(),
s.migration.description.clone(),
]);
}
for s in &report.pending {
table.add_row(vec![
s.migration.version.clone(),
"pending".to_string(),
s.migration.description.clone(),
]);
}
println!("{table}");
Ok(())
}
async fn history(settings: &crate::settings::Settings) -> Result<()> {
let client = connected_client(settings).await?;
let rows = get_migration_history(&client).await?;
if rows.is_empty() {
fmt::info("no migration history");
return Ok(());
}
let mut table = fmt::make_table();
table.set_header(vec!["version", "applied_at", "description", "checksum"]);
for row in &rows {
table.add_row(vec![
row.version.clone(),
row.applied_at.to_rfc3339(),
row.description.clone(),
row.checksum.clone(),
]);
}
println!("{table}");
Ok(())
}
fn create(description: &str, directory: &Path) -> Result<()> {
std::fs::create_dir_all(directory)?;
let migration = create_blank_migration(description, description, directory)?;
fmt::success(format!(
"created {} at {}",
migration.version,
migration.path.display()
));
Ok(())
}
async fn validate(migrations_dir: &Path, version: Option<&str>) -> Result<()> {
let errors = validate_migrations(migrations_dir).await?;
let filtered: Vec<&String> = match version {
Some(v) => errors.iter().filter(|e| e.contains(v)).collect(),
None => errors.iter().collect(),
};
if filtered.is_empty() {
fmt::success("migrations are consistent");
return Ok(());
}
for err in &filtered {
fmt::error(err);
}
Err(SurqlError::MigrationDiscovery {
reason: format!("{} validation error(s)", filtered.len()),
})
}
fn generate(migrations_dir: &Path, from: Option<&str>, to: Option<&str>) -> Result<()> {
let all = discover_migrations(migrations_dir)?;
let filtered: Vec<_> = crate::migration::filter_migrations_by_version(&all, from, to);
if filtered.is_empty() {
fmt::warn("no migrations in requested range");
return Ok(());
}
let mut table = fmt::make_table();
table.set_header(vec!["version", "description", "up_stmts", "down_stmts"]);
for m in &filtered {
table.add_row(vec![
m.version.clone(),
m.description.clone(),
format!("{}", m.up.len()),
format!("{}", m.down.len()),
]);
}
println!("{table}");
fmt::info(format!(
"{} migration(s) would be combined (use `surql migrate squash` to persist)",
filtered.len()
));
Ok(())
}
fn squash(
migrations_dir: &Path,
from: &str,
to: &str,
output: Option<&Path>,
dry_run: bool,
) -> Result<()> {
let mut opts = SquashOptions::new().from_version(from).to_version(to);
if dry_run {
opts = opts.dry_run(true);
}
if let Some(path) = output {
opts = opts.output_path(path);
}
let result = squash_migrations(migrations_dir, &opts)?;
fmt::success(format!(
"squashed {} migration(s) into {} (statements: {}, optimisations: {})",
result.original_count,
result.squashed_path.display(),
result.statement_count,
result.optimizations_applied
));
Ok(())
}
fn select_up_range(plan: MigrationPlan, target: Option<&str>) -> Result<MigrationPlan> {
let Some(target) = target else {
return Ok(plan);
};
let mut selected = Vec::new();
let mut found = false;
for m in plan.migrations {
selected.push(m.clone());
if m.version == target {
found = true;
break;
}
}
if !found {
return Err(SurqlError::Validation {
reason: format!("target version {target:?} not found among pending migrations"),
});
}
Ok(MigrationPlan {
migrations: selected,
direction: MigrationDirection::Up,
})
}
async fn resolve_down_steps(
client: &DatabaseClient,
migrations_dir: &Path,
target: Option<&str>,
) -> Result<u32> {
let applied = get_migration_history(client).await?;
if applied.is_empty() {
return Ok(0);
}
let _on_disk = discover_migrations(migrations_dir)?;
let Some(target) = target else {
return Ok(1);
};
let mut steps: u32 = 0;
let mut seen_target = false;
for row in applied.iter().rev() {
steps = steps.saturating_add(1);
if row.version == target {
seen_target = true;
break;
}
}
if !seen_target {
return Err(SurqlError::Validation {
reason: format!("target version {target:?} not found in history"),
});
}
Ok(steps)
}
fn render_statuses(statuses: &[crate::migration::MigrationStatus]) {
let mut table = fmt::make_table();
table.set_header(vec!["version", "state", "error"]);
for s in statuses {
table.add_row(vec![
s.migration.version.clone(),
format!("{:?}", s.state),
s.error.clone().unwrap_or_default(),
]);
}
println!("{table}");
}