use crate::colors::*;
use anyhow::{Result, anyhow};
use qail_core::ast::{Action, AggregateFunc, Constraint, Expr, Operator, Qail};
use qail_pg::driver::PgDriver;
use std::path::PathBuf;
use crate::migrations::types::is_safe_cast;
fn parse_count_text(raw: Option<String>, label: &str) -> Result<u64> {
let raw = raw.ok_or_else(|| anyhow!("Missing {label} count"))?;
raw.trim()
.parse::<u64>()
.map_err(|e| anyhow!("Invalid {label} count {:?}: {}", raw, e))
}
fn required_backup_row_string(row: &qail_pg::PgRow, idx: usize, label: &str) -> Result<String> {
row.get_string(idx)
.filter(|value| !value.trim().is_empty())
.ok_or_else(|| anyhow!("Missing backup snapshot {}", label))
}
fn snapshot_json_string(value: &str) -> Result<String> {
serde_json::to_string(value)
.map_err(|e| anyhow!("Failed to serialize backup snapshot value: {}", e))
}
fn snapshot_row_json(row: &qail_pg::PgRow) -> Result<String> {
let mut object = serde_json::Map::new();
for idx in 0..row.columns.len() {
if let Some(text) = row.get_string(idx) {
object.insert(format!("col_{}", idx), serde_json::Value::String(text));
}
}
serde_json::to_string(&serde_json::Value::Object(object))
.map_err(|e| anyhow!("Failed to serialize backup snapshot row: {}", e))
}
fn escape_tsv_field(value: &str) -> String {
value
.replace('\\', "\\\\")
.replace('\t', "\\t")
.replace('\n', "\\n")
.replace('\r', "\\r")
}
fn backup_row_tsv(row: &qail_pg::PgRow, width: usize) -> String {
(0..width)
.map(|idx| {
row.get_string(idx)
.map(|value| escape_tsv_field(&value))
.unwrap_or_default()
})
.collect::<Vec<_>>()
.join("\t")
}
fn snapshot_column_label(row: &qail_pg::PgRow, idx: usize) -> String {
row.get_string(idx)
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| "*".to_string())
}
fn snapshot_filename_component(raw: &str) -> String {
let mut safe = String::new();
let mut last_was_separator = false;
for ch in raw.chars() {
let is_safe = ch.is_ascii_alphanumeric() || matches!(ch, '_' | '-' | '.');
if is_safe {
safe.push(ch);
last_was_separator = false;
} else if !last_was_separator {
safe.push('_');
last_was_separator = true;
}
}
let safe = safe.trim_matches(|ch| matches!(ch, '_' | '-' | '.'));
if safe.is_empty() {
"unnamed".to_string()
} else {
safe.to_string()
}
}
#[derive(Debug, Default)]
pub struct MigrationImpact {
pub table: String,
pub operation: String,
pub rows_affected: u64,
pub dropped_columns: Vec<String>,
pub affected_columns: Vec<String>,
pub is_destructive: bool,
}
pub async fn analyze_impact(driver: &mut PgDriver, cmd: &Qail) -> Result<MigrationImpact> {
let mut impact = MigrationImpact {
table: cmd.table.clone(),
operation: format!("{:?}", cmd.action),
..Default::default()
};
match cmd.action {
Action::Drop => {
impact.operation = "DROP TABLE".to_string();
impact.is_destructive = true;
impact.rows_affected = count_table_rows(driver, &cmd.table).await?;
}
Action::AlterDrop => {
impact.operation = "DROP COLUMN".to_string();
impact.is_destructive = true;
for col in &cmd.columns {
if let Expr::Named(name) = col {
impact.dropped_columns.push(name.clone());
impact.affected_columns.push(name.clone());
impact.rows_affected += count_column_values(driver, &cmd.table, name).await?;
}
}
}
Action::AlterType => {
impact.operation = "ALTER TYPE".to_string();
if let Some((column, target_type)) = alter_type_target(cmd)
&& let Some(source_type) = column_data_type(driver, &cmd.table, &column).await?
&& is_narrowing_type_change(&source_type, &target_type)
{
impact.operation =
format!("ALTER TYPE (narrowing {} -> {})", source_type, target_type);
impact.is_destructive = true;
impact.affected_columns.push(column);
impact.rows_affected = count_table_rows(driver, &cmd.table).await?;
}
}
Action::AlterSetNotNull => {
impact.operation = "ALTER SET NOT NULL".to_string();
let table_rows = count_table_rows(driver, &cmd.table).await?;
if table_rows > 0 {
impact.is_destructive = true;
impact.rows_affected = table_rows;
impact.affected_columns = extract_named_columns(&cmd.columns);
}
}
Action::Alter => {
impact.operation = "ALTER TABLE".to_string();
impact.is_destructive = false;
}
Action::Make => {
impact.operation = "CREATE TABLE".to_string();
impact.is_destructive = false;
}
_ => {}
}
Ok(impact)
}
async fn count_table_rows(driver: &mut PgDriver, table: &str) -> Result<u64> {
let cmd = count_table_rows_cmd(table);
let rows = driver
.fetch_all(&cmd)
.await
.map_err(|e| anyhow!("Failed to count rows: {}", e))?;
if let Some(row) = rows.first() {
return parse_count_text(row.get_string(0), "table row");
}
Err(anyhow!("Missing table row count result"))
}
fn count_table_rows_cmd(table: &str) -> Qail {
Qail::get(table).column_expr(Expr::Aggregate {
col: "*".to_string(),
func: AggregateFunc::Count,
distinct: false,
filter: None,
alias: None,
})
}
async fn count_column_values(driver: &mut PgDriver, table: &str, column: &str) -> Result<u64> {
let cmd = count_column_values_cmd(table, column);
let rows = driver
.fetch_all(&cmd)
.await
.map_err(|e| anyhow!("Failed to count column values: {}", e))?;
if let Some(row) = rows.first() {
return parse_count_text(row.get_string(0), "column value");
}
Err(anyhow!("Missing column value count result"))
}
fn count_column_values_cmd(table: &str, column: &str) -> Qail {
Qail::get(table).column_expr(Expr::Aggregate {
col: column.to_string(),
func: AggregateFunc::Count,
distinct: false,
filter: None,
alias: None,
})
}
fn alter_type_target(cmd: &Qail) -> Option<(String, String)> {
match cmd.columns.first() {
Some(Expr::Def {
name,
data_type,
constraints: _,
}) => Some((name.clone(), normalize_type_for_cast(data_type))),
_ => None,
}
}
fn normalize_type_for_cast(raw: &str) -> String {
match raw.trim().to_ascii_lowercase().as_str() {
"character varying" => "VARCHAR".to_string(),
"character" => "CHAR".to_string(),
"timestamp with time zone" => "TIMESTAMPTZ".to_string(),
"timestamp without time zone" => "TIMESTAMP".to_string(),
"double precision" => "DOUBLE PRECISION".to_string(),
"boolean" => "BOOLEAN".to_string(),
"integer" => "INT".to_string(),
"bigint" => "BIGINT".to_string(),
"smallint" => "SMALLINT".to_string(),
"numeric" => "NUMERIC".to_string(),
"uuid" => "UUID".to_string(),
"text" => "TEXT".to_string(),
"date" => "DATE".to_string(),
"time without time zone" => "TIME".to_string(),
"time with time zone" => "TIMETZ".to_string(),
other => other.to_ascii_uppercase(),
}
}
fn is_narrowing_type_change(source: &str, target: &str) -> bool {
!is_safe_cast(source, target)
}
async fn column_data_type(
driver: &mut PgDriver,
table: &str,
column: &str,
) -> Result<Option<String>> {
let cmd = Qail::get("information_schema.columns")
.column("data_type")
.filter("table_schema", Operator::Eq, "public")
.filter("table_name", Operator::Eq, table.to_string())
.filter("column_name", Operator::Eq, column.to_string())
.limit(1);
let rows = driver
.fetch_all(&cmd)
.await
.map_err(|e| anyhow!("Failed to inspect type for {}.{}: {}", table, column, e))?;
Ok(rows
.first()
.and_then(|row| row.get_string(0))
.map(|raw| normalize_type_for_cast(&raw)))
}
pub fn display_impact(impacts: &[MigrationImpact]) {
let destructive: Vec<_> = impacts.iter().filter(|i| i.is_destructive).collect();
if destructive.is_empty() {
println!("{}", "✓ No destructive operations detected".green());
return;
}
println!();
println!("{}", "🚨 Migration Impact Analysis".red().bold());
println!("{}", "━".repeat(40).dimmed());
let mut total_rows = 0u64;
for impact in &destructive {
let op_colored = if impact.operation == "DROP TABLE" {
impact.operation.red().bold()
} else if impact.operation == "DROP COLUMN"
|| impact.operation == "ALTER SET NOT NULL"
|| impact.operation.starts_with("ALTER TYPE (narrowing")
{
impact.operation.yellow().bold()
} else {
Painted {
text: impact.operation.clone(),
prefix: String::new(),
}
};
let display_columns = columns_requiring_snapshot(impact);
if !display_columns.is_empty() {
for col in display_columns {
println!(
" {} {}.{} → {} values at risk",
op_colored,
impact.table.cyan(),
col.yellow(),
impact.rows_affected.to_string().red().bold()
);
}
} else {
println!(
" {} {} → {} rows affected",
op_colored,
impact.table.cyan(),
impact.rows_affected.to_string().red().bold()
);
}
total_rows += impact.rows_affected;
}
println!("{}", "━".repeat(40).dimmed());
println!(
" Total: {} records at risk",
total_rows.to_string().red().bold()
);
println!();
}
fn extract_named_columns(columns: &[Expr]) -> Vec<String> {
columns
.iter()
.filter_map(|expr| match expr {
Expr::Named(name) => Some(name.trim().to_string()),
Expr::Def { name, .. } => Some(name.trim().to_string()),
_ => None,
})
.filter(|name| !name.is_empty())
.collect()
}
fn columns_requiring_snapshot(impact: &MigrationImpact) -> &[String] {
if !impact.dropped_columns.is_empty() {
&impact.dropped_columns
} else {
&impact.affected_columns
}
}
fn snapshot_label(impact: &MigrationImpact, columns: &[String]) -> String {
if columns.is_empty() {
impact.table.clone()
} else {
format!("{}.{}", impact.table, columns.join(","))
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum MigrationChoice {
Proceed,
BackupToFile,
BackupToDatabase,
Cancel,
}
pub fn prompt_migration_choice() -> MigrationChoice {
println!("Choose an option:");
println!(" {} Proceed (I have my own backup)", "[1]".cyan());
println!(" {} Backup to files (_qail_snapshots/)", "[2]".green());
println!(
" {} Backup to database (with rollback support)",
"[3]".green().bold()
);
println!(" {} Cancel migration", "[4]".red());
print!("> ");
use std::io::Write;
std::io::stdout().flush().ok();
let mut input = String::new();
if std::io::stdin().read_line(&mut input).is_ok() {
match input.trim() {
"1" => return MigrationChoice::Proceed,
"2" => return MigrationChoice::BackupToFile,
"3" => return MigrationChoice::BackupToDatabase,
"4" | "" => return MigrationChoice::Cancel,
_ => {}
}
}
MigrationChoice::Cancel
}
fn ensure_snapshot_dir() -> Result<PathBuf> {
let dir = PathBuf::from("_qail_snapshots");
if !dir.exists() {
std::fs::create_dir_all(&dir)?;
}
Ok(dir)
}
pub async fn backup_table(driver: &mut PgDriver, table: &str) -> Result<PathBuf> {
let snapshot_dir = ensure_snapshot_dir()?;
let timestamp = crate::time::timestamp_filename();
let table_name = snapshot_filename_component(table);
let filename = format!("{}_{}.csv", timestamp, table_name);
let path = snapshot_dir.join(&filename);
let cmd = Qail::get(table);
let rows = driver
.fetch_all(&cmd)
.await
.map_err(|e| anyhow!("Failed to export table {}: {}", table, e))?;
let mut content = String::new();
for row in rows {
if !row.columns.is_empty() {
content.push_str(&backup_row_tsv(&row, row.columns.len()));
content.push('\n');
}
}
std::fs::write(&path, content)?;
Ok(path)
}
pub async fn backup_columns(
driver: &mut PgDriver,
table: &str,
columns: &[String],
) -> Result<PathBuf> {
let snapshot_dir = ensure_snapshot_dir()?;
let timestamp = crate::time::timestamp_filename();
let table_name = snapshot_filename_component(table);
let col_names = snapshot_filename_component(&columns.join("_"));
let filename = format!("{}_{}_{}.csv", timestamp, table_name, col_names);
let path = snapshot_dir.join(&filename);
let mut cols: Vec<&str> = vec!["id"];
cols.extend(columns.iter().map(|s| s.as_str()));
let cols_len = cols.len();
let cmd = Qail::get(table).columns(cols);
let rows = driver
.fetch_all(&cmd)
.await
.map_err(|e| anyhow!("Failed to export columns from {}: {}", table, e))?;
let mut content = String::new();
for row in rows {
if cols_len > 0 {
content.push_str(&backup_row_tsv(&row, cols_len));
content.push('\n');
}
}
std::fs::write(&path, content)?;
Ok(path)
}
pub async fn create_snapshots(
driver: &mut PgDriver,
impacts: &[MigrationImpact],
) -> Result<Vec<PathBuf>> {
let mut paths = Vec::new();
println!();
println!("{}", "📦 Creating snapshots...".cyan().bold());
for impact in impacts {
if !impact.is_destructive {
continue;
}
let snapshot_columns = columns_requiring_snapshot(impact);
let path = if impact.operation == "DROP TABLE" {
backup_table(driver, &impact.table).await?
} else if !snapshot_columns.is_empty() {
backup_columns(driver, &impact.table, snapshot_columns).await?
} else {
return Err(anyhow!(
"No snapshot strategy for destructive operation '{}' on '{}'",
impact.operation,
impact.table
));
};
println!(
" {} {} → {}",
"✓".green(),
snapshot_label(impact, snapshot_columns).cyan(),
path.display().to_string().dimmed()
);
paths.push(path);
}
println!(" {}", "Done".green().bold());
println!();
Ok(paths)
}
pub const DATA_SNAPSHOTS_SCHEMA: &str = r#"
table _qail_data_snapshots (
id serial primary_key,
migration_version varchar(255) not null,
table_name varchar(255) not null,
column_name varchar(255),
row_id text not null,
value_json jsonb not null,
snapshot_type varchar(50) not null,
created_at timestamptz default NOW()
)
"#;
pub fn data_snapshots_ddl() -> String {
use qail_core::parser::schema::Schema;
let Ok(schema) = Schema::parse(DATA_SNAPSHOTS_SCHEMA) else {
return String::new();
};
schema
.tables
.first()
.map(|table| table.to_ddl())
.unwrap_or_default()
}
pub async fn ensure_snapshots_table(driver: &mut PgDriver) -> Result<()> {
let exists_cmd = Qail::get("information_schema.tables")
.column_expr(crate::util::qail_exists_projection())
.where_eq("table_schema", "public")
.where_eq("table_name", "_qail_data_snapshots")
.limit(1);
let exists = driver
.fetch_all(&exists_cmd)
.await
.map_err(|e| anyhow!("Failed to check data snapshots table: {}", e))?;
if exists.is_empty() {
let cmd = Qail {
action: Action::Make,
table: "_qail_data_snapshots".to_string(),
columns: vec![
Expr::Def {
name: "id".to_string(),
data_type: "serial".to_string(),
constraints: vec![Constraint::PrimaryKey],
},
Expr::Def {
name: "migration_version".to_string(),
data_type: "varchar".to_string(),
constraints: vec![],
},
Expr::Def {
name: "table_name".to_string(),
data_type: "varchar".to_string(),
constraints: vec![],
},
Expr::Def {
name: "column_name".to_string(),
data_type: "varchar".to_string(),
constraints: vec![Constraint::Nullable],
},
Expr::Def {
name: "row_id".to_string(),
data_type: "text".to_string(),
constraints: vec![],
},
Expr::Def {
name: "value_json".to_string(),
data_type: "jsonb".to_string(),
constraints: vec![],
},
Expr::Def {
name: "snapshot_type".to_string(),
data_type: "varchar".to_string(),
constraints: vec![],
},
Expr::Def {
name: "created_at".to_string(),
data_type: "timestamptz".to_string(),
constraints: vec![
Constraint::Nullable,
Constraint::Default("now()".to_string()),
],
},
],
..Default::default()
};
driver
.execute(&cmd)
.await
.map_err(|e| anyhow!("Failed to create data snapshots table: {}", e))?;
}
Ok(())
}
#[derive(Debug, Clone, Copy)]
pub enum SnapshotType {
DropTable,
DropColumn,
AlterColumn,
}
impl std::fmt::Display for SnapshotType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SnapshotType::DropTable => write!(f, "DROP_TABLE"),
SnapshotType::DropColumn => write!(f, "DROP_COLUMN"),
SnapshotType::AlterColumn => write!(f, "ALTER_COLUMN"),
}
}
}
pub async fn snapshot_column_to_db(
driver: &mut PgDriver,
migration_version: &str,
table: &str,
column: &str,
) -> Result<u64> {
snapshot_column_to_db_as(
driver,
migration_version,
table,
column,
SnapshotType::DropColumn,
)
.await
}
async fn snapshot_column_to_db_as(
driver: &mut PgDriver,
migration_version: &str,
table: &str,
column: &str,
snapshot_type: SnapshotType,
) -> Result<u64> {
ensure_snapshots_table(driver).await?;
let cmd = Qail::get(table).columns(["id", column]);
let rows = driver
.fetch_all(&cmd)
.await
.map_err(|e| anyhow!("Failed to fetch column data: {}", e))?;
let mut saved = 0u64;
for row in rows {
let row_id = required_backup_row_string(&row, 0, "row_id")?;
let value = row.get_string(1);
if let Some(val) = value {
let snapshot_cmd = Qail::add("_qail_data_snapshots")
.columns([
"migration_version",
"table_name",
"column_name",
"row_id",
"value_json",
"snapshot_type",
])
.values([
migration_version.to_string(),
table.to_string(),
column.to_string(),
row_id,
snapshot_json_string(&val)?,
snapshot_type.to_string(),
]);
driver
.execute(&snapshot_cmd)
.await
.map_err(|e| anyhow!("Failed to save snapshot: {}", e))?;
saved += 1;
}
}
Ok(saved)
}
pub async fn snapshot_table_to_db(
driver: &mut PgDriver,
migration_version: &str,
table: &str,
) -> Result<u64> {
ensure_snapshots_table(driver).await?;
let cmd = Qail::get(table);
let rows = driver
.fetch_all(&cmd)
.await
.map_err(|e| anyhow!("Failed to fetch table data: {}", e))?;
let mut saved = 0u64;
for (idx, row) in rows.iter().enumerate() {
let row_id = row.get_string(0).unwrap_or_else(|| idx.to_string());
let value_json = snapshot_row_json(row)?;
let snapshot_cmd = Qail::add("_qail_data_snapshots")
.columns([
"migration_version",
"table_name",
"row_id",
"value_json",
"snapshot_type",
])
.values([
migration_version.to_string(),
table.to_string(),
row_id,
value_json,
SnapshotType::DropTable.to_string(),
]);
driver
.execute(&snapshot_cmd)
.await
.map_err(|e| anyhow!("Failed to save table snapshot: {}", e))?;
saved += 1;
}
Ok(saved)
}
pub async fn create_db_snapshots(
driver: &mut PgDriver,
migration_version: &str,
impacts: &[MigrationImpact],
) -> Result<u64> {
let mut total_saved = 0u64;
println!();
println!(
"{}",
"💾 Creating database snapshots (Phase 2)...".cyan().bold()
);
for impact in impacts {
if !impact.is_destructive {
continue;
}
let saved = if impact.operation == "DROP TABLE" {
let count = snapshot_table_to_db(driver, migration_version, &impact.table).await?;
println!(
" {} {} → {} rows saved to _qail_data_snapshots",
"✓".green(),
impact.table.cyan(),
count.to_string().green()
);
count
} else if !columns_requiring_snapshot(impact).is_empty() {
let mut col_saved = 0u64;
let snapshot_type = if impact.dropped_columns.is_empty() {
SnapshotType::AlterColumn
} else {
SnapshotType::DropColumn
};
for col in columns_requiring_snapshot(impact) {
let count = snapshot_column_to_db_as(
driver,
migration_version,
&impact.table,
col,
snapshot_type,
)
.await?;
println!(
" {} {}.{} → {} values saved",
"✓".green(),
impact.table.cyan(),
col.yellow(),
count.to_string().green()
);
col_saved += count;
}
col_saved
} else {
return Err(anyhow!(
"No database snapshot strategy for destructive operation '{}' on '{}'",
impact.operation,
impact.table
));
};
total_saved += saved;
}
println!(
" {} Total: {} records backed up to database",
"✓".green().bold(),
total_saved.to_string().cyan()
);
println!();
Ok(total_saved)
}
pub async fn restore_column_from_db(
driver: &mut PgDriver,
migration_version: &str,
table: &str,
column: &str,
) -> Result<u64> {
use qail_core::ast::Operator;
let query_cmd = Qail::get("_qail_data_snapshots")
.columns(["row_id", "value_json"])
.filter("migration_version", Operator::Eq, migration_version)
.filter("table_name", Operator::Eq, table)
.filter("column_name", Operator::Eq, column);
let rows = driver
.fetch_all(&query_cmd)
.await
.map_err(|e| anyhow!("Failed to query snapshots: {}", e))?;
let mut restored = 0u64;
for row in rows {
let row_id = required_backup_row_string(&row, 0, "row_id")?;
let value_json = required_backup_row_string(&row, 1, "value_json")?;
let value = serde_json::from_str::<String>(&value_json)
.map_err(|e| anyhow!("Invalid backup snapshot value_json: {}", e))?;
let update_cmd = Qail::set(table)
.set_value(column, value)
.where_eq("id", row_id);
if driver.execute(&update_cmd).await.is_ok() {
restored += 1;
}
}
Ok(restored)
}
pub async fn list_snapshots(
driver: &mut PgDriver,
migration_version: Option<&str>,
) -> Result<Vec<(String, String, String, u64)>> {
let cmd = list_snapshots_cmd(migration_version);
let rows = driver
.fetch_all(&cmd)
.await
.map_err(|e| anyhow!("Failed to list snapshots: {}", e))?;
let mut results = Vec::new();
for row in rows {
let version = required_backup_row_string(&row, 0, "migration_version")?;
let table = required_backup_row_string(&row, 1, "table_name")?;
let column = snapshot_column_label(&row, 2);
let count = parse_count_text(row.get_string(3), "snapshot")?;
results.push((version, table, column, count));
}
Ok(results)
}
fn list_snapshots_cmd(migration_version: Option<&str>) -> Qail {
let mut cmd = Qail::get("_qail_data_snapshots").columns_expr([
Expr::Named("migration_version".to_string()),
Expr::Named("table_name".to_string()),
Expr::Named("column_name".to_string()),
Expr::Aggregate {
col: "*".to_string(),
func: AggregateFunc::Count,
distinct: false,
filter: None,
alias: None,
},
]);
if let Some(version) = migration_version {
cmd = cmd.filter("migration_version", Operator::Eq, version);
}
cmd.group_by(["migration_version", "table_name", "column_name"])
}
#[cfg(test)]
mod tests {
use super::{
MigrationImpact, backup_row_tsv, columns_requiring_snapshot, count_column_values_cmd,
count_table_rows_cmd, escape_tsv_field, is_narrowing_type_change, list_snapshots_cmd,
normalize_type_for_cast, parse_count_text, required_backup_row_string,
snapshot_column_label, snapshot_filename_component, snapshot_json_string, snapshot_label,
snapshot_row_json,
};
use qail_pg::protocol::AstEncoder;
#[test]
fn normalize_type_for_cast_maps_information_schema_names() {
assert_eq!(normalize_type_for_cast("character varying"), "VARCHAR");
assert_eq!(
normalize_type_for_cast("timestamp with time zone"),
"TIMESTAMPTZ"
);
assert_eq!(normalize_type_for_cast("integer"), "INT");
assert_eq!(normalize_type_for_cast("text"), "TEXT");
}
#[test]
fn narrowing_type_change_detection_uses_cast_safety() {
assert!(is_narrowing_type_change("TEXT", "INT"));
assert!(!is_narrowing_type_change("INT", "BIGINT"));
assert!(!is_narrowing_type_change("INT", "TEXT"));
}
#[test]
fn destructive_alter_column_impacts_require_column_snapshots() {
let impact = MigrationImpact {
table: "users".to_string(),
operation: "ALTER SET NOT NULL".to_string(),
rows_affected: 7,
affected_columns: vec!["email".to_string()],
is_destructive: true,
..Default::default()
};
assert_eq!(columns_requiring_snapshot(&impact), &["email".to_string()]);
assert_eq!(
snapshot_label(&impact, columns_requiring_snapshot(&impact)),
"users.email"
);
}
#[test]
fn count_parsing_fails_closed_on_missing_or_malformed_values() {
assert_eq!(
parse_count_text(Some("42".to_string()), "table row").unwrap(),
42
);
assert!(parse_count_text(None, "table row").is_err());
assert!(parse_count_text(Some("not-a-count".to_string()), "table row").is_err());
assert!(parse_count_text(Some("-1".to_string()), "table row").is_err());
}
#[test]
fn count_queries_use_structured_aggregate_ast() {
let (sql, params) = AstEncoder::encode_cmd_sql(&count_table_rows_cmd("users")).unwrap();
assert_eq!(sql, "SELECT COUNT(*) FROM users");
assert!(params.is_empty());
let (sql, params) =
AstEncoder::encode_cmd_sql(&count_column_values_cmd("users", "email")).unwrap();
assert_eq!(sql, "SELECT COUNT(email) FROM users");
assert!(params.is_empty());
let err =
AstEncoder::encode_cmd_sql(&count_column_values_cmd("users", "email) FROM secrets --"))
.expect_err("unsafe aggregate column must fail closed");
assert!(err.to_string().contains("unsafe identifier"));
}
#[test]
fn list_snapshots_query_uses_structured_count_projection() {
let (sql, params) =
AstEncoder::encode_cmd_sql(&list_snapshots_cmd(Some("20260525120000"))).unwrap();
assert!(sql.contains("SELECT migration_version, table_name, column_name, COUNT(*)"));
assert!(sql.contains("FROM _qail_data_snapshots"));
assert!(sql.contains("GROUP BY migration_version, table_name, column_name"));
assert_eq!(params, vec![Some(b"20260525120000".to_vec())]);
}
#[test]
fn backup_snapshot_metadata_fails_closed() {
let missing = qail_pg::PgRow {
columns: vec![None],
column_info: None,
};
assert!(required_backup_row_string(&missing, 0, "row_id").is_err());
let empty = qail_pg::PgRow {
columns: vec![Some(b"".to_vec())],
column_info: None,
};
assert!(required_backup_row_string(&empty, 0, "row_id").is_err());
}
#[test]
fn file_snapshot_names_sanitize_path_components() {
assert_eq!(snapshot_filename_component("users"), "users");
assert_eq!(snapshot_filename_component("public.users"), "public.users");
assert_eq!(
snapshot_filename_component("../../tenant/users\nemail"),
"tenant_users_email"
);
assert_eq!(snapshot_filename_component("..."), "unnamed");
}
#[test]
fn table_snapshot_column_label_defaults_to_star() {
let row = qail_pg::PgRow {
columns: vec![None],
column_info: None,
};
assert_eq!(snapshot_column_label(&row, 0), "*");
}
#[test]
fn backup_snapshot_json_escapes_full_string_content() {
let raw = "quote \" slash \\ newline \n tab \t";
let encoded = snapshot_json_string(raw).unwrap();
let decoded: String = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, raw);
}
#[test]
fn file_backup_tsv_preserves_width_and_escapes_control_chars() {
let row = qail_pg::PgRow {
columns: vec![
Some(b"id-1".to_vec()),
None,
Some(b"tab\tnewline\nslash\\".to_vec()),
Some(b"tail".to_vec()),
],
column_info: None,
};
assert_eq!(
backup_row_tsv(&row, 4),
"id-1\t\ttab\\tnewline\\nslash\\\\\ttail"
);
assert_eq!(escape_tsv_field("a\rb"), "a\\rb");
}
#[test]
fn backup_table_snapshot_serializes_all_columns_as_json() {
let columns = (0..25)
.map(|idx| Some(format!("v{}\\", idx).into_bytes()))
.collect();
let row = qail_pg::PgRow {
columns,
column_info: None,
};
let encoded = snapshot_row_json(&row).unwrap();
let decoded: serde_json::Value = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded["col_0"], "v0\\");
assert_eq!(decoded["col_24"], "v24\\");
}
}