use crate::colors::*;
use anyhow::{Result, anyhow};
use qail_core::ast::{Action, Constraint, Expr, Qail};
use qail_pg::driver::PgDriver;
use crate::util::parse_pg_url;
/// Tracks one shadow-database migration rehearsal: where the primary lives,
/// what the shadow copy is called, and how much data has been mirrored so far.
#[derive(Debug, Clone)]
pub struct ShadowState {
// Connection URL of the primary (source-of-truth) database.
pub primary_url: String,
// Shadow database name, derived as `<primary_db>_shadow`.
pub shadow_name: String,
// Connection URL pointing at the shadow database (same host/credentials).
pub shadow_url: String,
// Set once migrations have been applied and state saved; false until then.
pub is_ready: bool,
// Number of tables copied during the last data sync.
pub tables_synced: u64,
// Total rows copied during the last data sync.
pub rows_synced: u64,
}
impl ShadowState {
    /// Derives the shadow-database identity from the primary's connection URL.
    ///
    /// The shadow is named `<database>_shadow` and reuses the primary's host,
    /// port, and credentials. Sync counters start at zero and `is_ready` stays
    /// false until the rehearsal pipeline completes.
    pub fn new(primary_url: &str) -> Result<Self> {
        let (host, port, user, password, database) = parse_pg_url(primary_url)?;
        let shadow_name = format!("{}_shadow", database);
        // Rebuild a URL for the shadow, with or without a password segment.
        let shadow_url = match &password {
            Some(pwd) => format!(
                "postgres://{}:{}@{}:{}/{}",
                user, pwd, host, port, shadow_name
            ),
            None => format!("postgres://{}@{}:{}/{}", user, host, port, shadow_name),
        };
        Ok(Self {
            primary_url: primary_url.to_string(),
            shadow_name,
            shadow_url,
            is_ready: false,
            tables_synced: 0,
            rows_synced: 0,
        })
    }
}
async fn ensure_shadow_state_table(driver: &mut PgDriver) -> Result<()> {
let exists_cmd = Qail::get("information_schema.tables")
.column("1")
.where_eq("table_schema", "public")
.where_eq("table_name", "_qail_shadow_state")
.limit(1);
let exists = driver
.fetch_all(&exists_cmd)
.await
.map_err(|e| anyhow!("Failed to check shadow state table: {}", e))?;
if exists.is_empty() {
let create_cmd = Qail {
action: Action::Make,
table: "_qail_shadow_state".to_string(),
columns: vec![
Expr::Def {
name: "id".to_string(),
data_type: "serial".to_string(),
constraints: vec![Constraint::PrimaryKey],
},
Expr::Def {
name: "shadow_name".to_string(),
data_type: "text".to_string(),
constraints: vec![],
},
Expr::Def {
name: "primary_url".to_string(),
data_type: "text".to_string(),
constraints: vec![],
},
Expr::Def {
name: "diff_cmds".to_string(),
data_type: "text".to_string(),
constraints: vec![],
},
Expr::Def {
name: "diff_checksum".to_string(),
data_type: "text".to_string(),
constraints: vec![Constraint::Nullable],
},
Expr::Def {
name: "old_schema_path".to_string(),
data_type: "text".to_string(),
constraints: vec![Constraint::Nullable],
},
Expr::Def {
name: "new_schema_path".to_string(),
data_type: "text".to_string(),
constraints: vec![Constraint::Nullable],
},
Expr::Def {
name: "created_at".to_string(),
data_type: "timestamptz".to_string(),
constraints: vec![
Constraint::Nullable,
Constraint::Default("now()".to_string()),
],
},
Expr::Def {
name: "status".to_string(),
data_type: "text".to_string(),
constraints: vec![
Constraint::Nullable,
Constraint::Default("'pending'".to_string()),
],
},
],
..Default::default()
};
driver
.execute(&create_cmd)
.await
.map_err(|e| anyhow!("Failed to create shadow state table: {}", e))?;
}
Ok(())
}
/// Stable checksum over a set of migration commands, used to match a stored
/// shadow "receipt" against the diff being promoted. Delegates to the shared
/// migrations helper so all call sites hash identically.
pub fn diff_cmds_checksum(diff_cmds: &[Qail]) -> String {
crate::migrations::stable_cmds_checksum(diff_cmds)
}
/// Persists a verified shadow-migration receipt in `_qail_shadow_state`,
/// replacing any pending/verified rows left over from earlier runs.
async fn save_shadow_state(
    driver: &mut PgDriver,
    state: &ShadowState,
    diff_cmds: &[Qail],
    old_path: &str,
    new_path: &str,
) -> Result<()> {
    ensure_shadow_state_table(driver).await?;
    // Purge stale receipts first; failure here is deliberately ignored
    // (best-effort cleanup before inserting the fresh row).
    let purge = Qail::del("_qail_shadow_state").in_vals("status", ["pending", "verified"]);
    let _ = driver.execute(&purge).await;
    let encoded = qail_core::wire::encode_cmds_text(diff_cmds);
    let checksum = diff_cmds_checksum(diff_cmds);
    let receipt = Qail::add("_qail_shadow_state")
        .set_value("shadow_name", state.shadow_name.as_str())
        .set_value("primary_url", state.primary_url.as_str())
        .set_value("diff_cmds", encoded)
        .set_value("diff_checksum", checksum)
        .set_value("old_schema_path", old_path)
        .set_value("new_schema_path", new_path)
        .set_value("status", "verified");
    driver
        .execute(&receipt)
        .await
        .map_err(|e| anyhow!("Failed to save shadow state: {}", e))?;
    Ok(())
}
async fn load_shadow_state(driver: &mut PgDriver) -> Result<Option<(ShadowState, Vec<Qail>)>> {
ensure_shadow_state_table(driver).await?;
let cmd_verified = Qail::get("_qail_shadow_state")
.columns(["shadow_name", "primary_url", "diff_cmds"])
.filter("status", qail_core::ast::Operator::Eq, "verified")
.limit(1);
let mut rows = driver
.fetch_all(&cmd_verified)
.await
.map_err(|e| anyhow!("Failed to load shadow state: {}", e))?;
if rows.is_empty() {
let cmd_pending = Qail::get("_qail_shadow_state")
.columns(["shadow_name", "primary_url", "diff_cmds"])
.filter("status", qail_core::ast::Operator::Eq, "pending")
.limit(1);
rows = driver
.fetch_all(&cmd_pending)
.await
.map_err(|e| anyhow!("Failed to load shadow state: {}", e))?;
}
if rows.is_empty() {
return Ok(None);
}
let row = &rows[0];
let shadow_name = row
.get_string(0)
.ok_or_else(|| anyhow!("Missing shadow_name"))?;
let primary_url = row
.get_string(1)
.ok_or_else(|| anyhow!("Missing primary_url"))?;
let diff_json = row
.get_string(2)
.ok_or_else(|| anyhow!("Missing diff_cmds"))?;
let diff_cmds = qail_core::wire::decode_cmds_text(&diff_json)
.map_err(|e| anyhow!("Failed to decode diff commands: {}", e))?;
let state = ShadowState {
primary_url,
shadow_name,
shadow_url: String::new(), is_ready: true,
tables_synced: 0,
rows_synced: 0,
};
Ok(Some((state, diff_cmds)))
}
/// Transitions any active (pending/verified) receipt rows to `new_status`.
async fn update_shadow_state_status(driver: &mut PgDriver, new_status: &str) -> Result<()> {
    let transition = Qail::set("_qail_shadow_state")
        .set_value("status", new_status)
        .in_vals("status", ["pending", "verified"]);
    driver
        .execute(&transition)
        .await
        .map_err(|e| anyhow!("Failed to update shadow state: {}", e))?;
    Ok(())
}
/// Returns true when a stored shadow receipt matches `expected_checksum`.
///
/// Examines `verified` rows before `pending` ones. The stored `diff_checksum`
/// column is checked first; rows without a usable checksum fall back to
/// re-hashing the serialized `diff_cmds` payload.
pub async fn has_verified_shadow_receipt_with_driver(
    driver: &mut PgDriver,
    expected_checksum: &str,
) -> Result<bool> {
    ensure_shadow_state_table(driver).await?;
    for status in ["verified", "pending"] {
        let cmd = Qail::get("_qail_shadow_state")
            .columns(["diff_cmds", "diff_checksum"])
            .filter("status", qail_core::ast::Operator::Eq, status)
            .limit(5);
        let rows = driver
            .fetch_all(&cmd)
            .await
            .map_err(|e| anyhow!("Failed to query shadow receipts: {}", e))?;
        for row in rows {
            // Fast path: checksum was stored at save time.
            if row.get_string(1).as_deref() == Some(expected_checksum) {
                return Ok(true);
            }
            // Slow path: re-derive the checksum from the stored commands.
            let payload_matches = row
                .get_string(0)
                .and_then(|json| qail_core::wire::decode_cmds_text(&json).ok())
                .map(|cmds| diff_cmds_checksum(&cmds) == expected_checksum)
                .unwrap_or(false);
            if payload_matches {
                return Ok(true);
            }
        }
    }
    Ok(false)
}
use qail_core::migrate::{Column, ColumnType, Index, IndexMethod, Schema, Table};
/// Introspects the live database's `public` schema into an in-memory
/// `Schema`: tables and columns, secondary indexes, single-column foreign
/// keys, and row-level-security flags.
///
/// Tables whose names start with `_qail` (internal bookkeeping) are skipped.
pub async fn introspect_schema(driver: &mut PgDriver) -> Result<Schema> {
use qail_core::ast::Operator;
let mut schema = Schema::default();
// -- Tables: every BASE TABLE in the public schema. --
let tables_cmd = Qail::get("information_schema.tables")
.column("table_name")
.filter("table_schema", Operator::Eq, "public")
.filter("table_type", Operator::Eq, "BASE TABLE");
let table_rows = driver
.fetch_all(&tables_cmd)
.await
.map_err(|e| anyhow!("Failed to query tables: {}", e))?;
let table_names: Vec<String> = table_rows
.iter()
.filter_map(|r| r.get_string(0))
.filter(|t| !t.starts_with("_qail")) .collect();
for table_name in &table_names {
// Column metadata for this table.
let cols_cmd = Qail::get("information_schema.columns")
.columns([
"column_name",
"data_type",
"is_nullable",
"column_default",
"is_identity",
])
.filter("table_schema", Operator::Eq, "public")
.filter("table_name", Operator::Eq, table_name.clone());
let col_rows = driver
.fetch_all(&cols_cmd)
.await
.map_err(|e| anyhow!("Failed to query columns for {}: {}", table_name, e))?;
let mut columns = Vec::new();
// Only the first column that tests positive is flagged as PK; the
// remaining columns skip the lookup entirely (assumes single-column PKs).
let mut pk_already_set = false;
for row in &col_rows {
let col_name = row.get_string(0).unwrap_or_default();
let data_type_str = row.get_string(1).unwrap_or_default();
let is_nullable = row.get_string(2).map(|s| s == "YES").unwrap_or(true);
let raw_default = row.get_string(3);
let is_identity = row.get_string(4).map(|s| s == "YES").unwrap_or(false);
let data_type = parse_column_type(&data_type_str);
// Serial sequences (nextval defaults) and identity columns get no
// user-visible default; anything else is kept verbatim.
let default = match &raw_default {
Some(d) if d.starts_with("nextval(") => None,
_ if is_identity => None, other => other.clone(),
};
let is_pk = if !pk_already_set {
let pk_check = is_primary_key(driver, table_name, &col_name).await?;
if pk_check {
pk_already_set = true;
}
pk_check
} else {
false
};
let is_unique = is_unique_column(driver, table_name, &col_name).await?;
columns.push(Column {
name: col_name,
data_type,
nullable: is_nullable,
primary_key: is_pk,
unique: is_unique,
default,
foreign_key: None, check: None,
generated: None,
});
}
schema.tables.insert(
table_name.clone(),
Table {
name: table_name.clone(),
columns,
multi_column_fks: vec![],
enable_rls: false,
force_rls: false,
},
);
}
// -- Indexes: everything in pg_indexes except constraint-backing ones. --
let idx_cmd = Qail::get("pg_indexes")
.columns(["indexname", "tablename", "indexdef"])
.filter("schemaname", Operator::Eq, "public");
let idx_rows = driver
.fetch_all(&idx_cmd)
.await
.map_err(|e| anyhow!("Failed to query indexes: {}", e))?;
for row in &idx_rows {
let idx_name = row.get_string(0).unwrap_or_default();
let table_name = row.get_string(1).unwrap_or_default();
let indexdef = row.get_string(2).unwrap_or_default();
// `*_pkey` / `*_key` indexes back PK and UNIQUE constraints, which are
// already captured above as column flags.
if idx_name.ends_with("_pkey") {
continue;
}
if idx_name.ends_with("_key") {
continue;
}
let cols = extract_index_columns(&indexdef);
let is_unique = indexdef.contains("UNIQUE");
// NOTE(review): method is hardcoded to BTree even when indexdef names
// another access method (gin/gist/hash) — confirm this is intentional.
schema.indexes.push(Index {
name: idx_name,
table: table_name,
columns: cols,
unique: is_unique,
method: IndexMethod::BTree,
where_clause: None,
include: vec![],
concurrently: false,
expressions: vec![],
});
}
// -- Foreign keys: map constraint name -> (referenced constraint, rules). --
let fk_ref_cmd = Qail::get("information_schema.referential_constraints")
.columns([
"constraint_name",
"unique_constraint_name",
"delete_rule",
"update_rule",
])
.filter("constraint_schema", Operator::Eq, "public");
let fk_ref_rows = driver
.fetch_all(&fk_ref_cmd)
.await
.map_err(|e| anyhow!("Failed to query FK refs: {}", e))?;
let mut fk_map: std::collections::HashMap<
String,
(
String,
qail_core::migrate::schema::FkAction,
qail_core::migrate::schema::FkAction,
),
> = std::collections::HashMap::new();
for row in fk_ref_rows {
let fk_name = row.text(0);
let ref_name = row.text(1);
let on_delete = match row.text(2).as_str() {
"CASCADE" => qail_core::migrate::schema::FkAction::Cascade,
"SET NULL" => qail_core::migrate::schema::FkAction::SetNull,
"SET DEFAULT" => qail_core::migrate::schema::FkAction::SetDefault,
"RESTRICT" => qail_core::migrate::schema::FkAction::Restrict,
_ => qail_core::migrate::schema::FkAction::NoAction,
};
let on_update = match row.text(3).as_str() {
"CASCADE" => qail_core::migrate::schema::FkAction::Cascade,
"SET NULL" => qail_core::migrate::schema::FkAction::SetNull,
"SET DEFAULT" => qail_core::migrate::schema::FkAction::SetDefault,
"RESTRICT" => qail_core::migrate::schema::FkAction::Restrict,
_ => qail_core::migrate::schema::FkAction::NoAction,
};
fk_map.insert(fk_name, (ref_name, on_delete, on_update));
}
// Resolve constraint names to their (table, column) members so both ends
// of each FK can be looked up by name.
let kcu_cmd = Qail::get("information_schema.key_column_usage")
.columns(["table_name", "column_name", "constraint_name"])
.filter("table_schema", Operator::Eq, "public");
let kcu_rows = driver
.fetch_all(&kcu_cmd)
.await
.map_err(|e| anyhow!("Failed to query key columns: {}", e))?;
let mut constraint_cols: std::collections::HashMap<String, Vec<(String, String)>> =
std::collections::HashMap::new();
for row in &kcu_rows {
let table = row.text(0);
let column = row.text(1);
let constraint = row.text(2);
constraint_cols
.entry(constraint)
.or_default()
.push((table, column));
}
// Attach only single-column FKs; composite FKs are ignored here.
for (fk_name, (ref_name, on_delete, on_update)) in &fk_map {
let fk_cols = constraint_cols.get(fk_name.as_str());
let ref_cols = constraint_cols.get(ref_name.as_str());
if let (Some(fk_list), Some(ref_list)) = (fk_cols, ref_cols)
&& fk_list.len() == 1
&& ref_list.len() == 1
{
let (fk_table, fk_col) = &fk_list[0];
let (ref_table, ref_col) = &ref_list[0];
if let Some(table) = schema.tables.get_mut(fk_table.as_str()) {
for col in table.columns.iter_mut() {
if col.name == *fk_col {
col.foreign_key = Some(qail_core::migrate::ForeignKey {
table: ref_table.clone(),
column: ref_col.clone(),
on_delete: on_delete.clone(),
on_update: on_update.clone(),
deferrable: qail_core::migrate::schema::Deferrable::NotDeferrable,
});
}
}
}
}
}
// -- Row-level security flags from pg_class ('r' = ordinary table). --
let rls_cmd = Qail::get("pg_catalog.pg_class")
.columns(["relname", "relrowsecurity", "relforcerowsecurity"])
.filter("relkind", Operator::Eq, "r");
let rls_rows = driver
.fetch_all(&rls_cmd)
.await
.map_err(|e| anyhow!("Failed to query RLS: {}", e))?;
for row in rls_rows {
let tbl_name = row.text(0);
let enable = row.text(1) == "t";
let force = row.text(2) == "t";
if (enable || force)
&& let Some(table) = schema.tables.get_mut(&tbl_name)
{
table.enable_rls = enable;
table.force_rls = force;
}
}
Ok(schema)
}
/// Maps a Postgres `information_schema.columns.data_type` string to the
/// internal `ColumnType`. Unrecognized types fall back to `Text`.
fn parse_column_type(s: &str) -> ColumnType {
    match s.to_lowercase().as_str() {
        "integer" | "int" | "int4" => ColumnType::Int,
        "bigint" | "int8" => ColumnType::BigInt,
        // Widened to Int: the schema model has no dedicated smallint type.
        "smallint" | "int2" => ColumnType::Int,
        "text" => ColumnType::Text,
        "character varying" | "varchar" => ColumnType::Varchar(None),
        "boolean" | "bool" => ColumnType::Bool,
        "timestamp without time zone" | "timestamp" => ColumnType::Timestamp,
        "timestamp with time zone" | "timestamptz" => ColumnType::Timestamptz,
        "date" => ColumnType::Date,
        // information_schema reports the spelled-out form; the bare "time"
        // alias alone would never match live introspection output, which
        // silently degraded time columns to Text.
        "time without time zone" | "time" => ColumnType::Time,
        "uuid" => ColumnType::Uuid,
        "jsonb" | "json" => ColumnType::Jsonb,
        "real" | "float4" | "double precision" | "float8" => ColumnType::Float,
        "numeric" | "decimal" => ColumnType::Decimal(None),
        "bytea" => ColumnType::Bytea,
        // Arrays, enums, and user-defined types degrade to text.
        _ => ColumnType::Text,
    }
}
/// Reports whether `column` participates in `table`'s PRIMARY KEY constraint.
///
/// Two-step lookup against information_schema: resolve the PK constraint
/// name, then check whether the column appears among its key columns.
async fn is_primary_key(driver: &mut PgDriver, table: &str, column: &str) -> Result<bool> {
    use qail_core::ast::Operator;
    let pk_constraint_cmd = Qail::get("information_schema.table_constraints")
        .columns(["constraint_name"])
        .filter("table_schema", Operator::Eq, "public")
        .filter("table_name", Operator::Eq, table)
        .filter("constraint_type", Operator::Eq, "PRIMARY KEY")
        .limit(1);
    let constraints = driver
        .fetch_all(&pk_constraint_cmd)
        .await
        .map_err(|e| anyhow!("Failed to query PK constraints: {}", e))?;
    // No PK constraint at all -> the column cannot be a primary key.
    let Some(first) = constraints.first() else {
        return Ok(false);
    };
    let constraint_name = first.get_string(0).unwrap_or_default();
    let membership_cmd = Qail::get("information_schema.key_column_usage")
        .column("column_name")
        .filter("table_schema", Operator::Eq, "public")
        .filter("table_name", Operator::Eq, table)
        .filter("constraint_name", Operator::Eq, constraint_name)
        .filter("column_name", Operator::Eq, column);
    let members = driver
        .fetch_all(&membership_cmd)
        .await
        .map_err(|e| anyhow!("Failed to query PK columns: {}", e))?;
    Ok(!members.is_empty())
}
/// Reports whether `column` is listed by any UNIQUE constraint on `table`.
async fn is_unique_column(driver: &mut PgDriver, table: &str, column: &str) -> Result<bool> {
    use qail_core::ast::Operator;
    let unique_cmd = Qail::get("information_schema.table_constraints")
        .columns(["constraint_name"])
        .filter("table_schema", Operator::Eq, "public")
        .filter("table_name", Operator::Eq, table)
        .filter("constraint_type", Operator::Eq, "UNIQUE");
    let constraints = driver
        .fetch_all(&unique_cmd)
        .await
        .map_err(|e| anyhow!("Failed to query UNIQUE constraints: {}", e))?;
    // Probe each UNIQUE constraint until one lists this column.
    for constraint_row in &constraints {
        let constraint_name = constraint_row.get_string(0).unwrap_or_default();
        let membership_cmd = Qail::get("information_schema.key_column_usage")
            .column("column_name")
            .filter("table_schema", Operator::Eq, "public")
            .filter("table_name", Operator::Eq, table)
            .filter("constraint_name", Operator::Eq, constraint_name)
            .filter("column_name", Operator::Eq, column);
        let members = driver
            .fetch_all(&membership_cmd)
            .await
            .map_err(|e| anyhow!("Failed to query UNIQUE columns: {}", e))?;
        if !members.is_empty() {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Extracts the column list from a `pg_indexes.indexdef` statement, e.g.
/// `CREATE INDEX i ON t USING btree (a, b)` -> `["a", "b"]`.
///
/// Matches the close paren that balances the first `(` instead of pairing
/// the first `(` with the *last* `)`: the old approach captured the WHERE
/// clause of partial indexes (`... (a) WHERE (x > 0)`), and panicked on a
/// slice with `end < start` when a stray `)` preceded the first `(`.
fn extract_index_columns(indexdef: &str) -> Vec<String> {
    let Some(start) = indexdef.find('(') else {
        return vec![];
    };
    // Walk forward from the first '(' tracking nesting depth; depth returns
    // to zero exactly at the matching close paren.
    let mut depth = 0usize;
    for (offset, ch) in indexdef[start..].char_indices() {
        match ch {
            '(' => depth += 1,
            ')' => {
                depth -= 1;
                if depth == 0 {
                    let inner = &indexdef[start + 1..start + offset];
                    return inner.split(',').map(|s| s.trim().to_string()).collect();
                }
            }
            _ => {}
        }
    }
    // Unbalanced parens: no parseable column list.
    vec![]
}
/// Step [1/4]: connects to the `postgres` maintenance database and creates
/// the shadow database, warning (not failing) when it already exists.
pub async fn create_shadow_database(primary_url: &str) -> Result<ShadowState> {
    println!();
    println!("{}", "🔄 Shadow Migration Mode".cyan().bold());
    println!("{}", "━".repeat(40).dimmed());
    let state = ShadowState::new(primary_url)?;
    println!(
        " {} Creating shadow database: {}",
        "[1/4]".cyan(),
        state.shadow_name.yellow()
    );
    let (host, port, user, password, _database) = parse_pg_url(primary_url)?;
    // CREATE DATABASE has to run from the maintenance DB, not the target.
    let mut admin = match password.clone() {
        Some(pwd) => PgDriver::connect_with_password(&host, port, &user, "postgres", &pwd)
            .await
            .map_err(|e| anyhow!("Failed to connect to postgres: {}", e))?,
        None => PgDriver::connect(&host, port, &user, "postgres")
            .await
            .map_err(|e| anyhow!("Failed to connect to postgres: {}", e))?,
    };
    let check_cmd = Qail::get("pg_database")
        .column("datname")
        .where_eq("datname", state.shadow_name.clone());
    let existing = admin
        .fetch_all(&check_cmd)
        .await
        .map_err(|e| anyhow!("Failed to check existing database: {}", e))?;
    if existing.is_empty() {
        let create_db = Qail::create_database(state.shadow_name.clone());
        admin
            .execute(&create_db)
            .await
            .map_err(|e| anyhow!("Failed to create shadow database: {}", e))?;
        println!(" {} Created", "✓".green());
    } else {
        println!(" {} Shadow database already exists", "⚠".yellow());
    }
    Ok(state)
}
/// Step [2/4]: executes the diff commands against the shadow database one at
/// a time, so a failure reports which migration step broke.
pub async fn apply_migrations_to_shadow(state: &mut ShadowState, cmds: &[Qail]) -> Result<()> {
    println!(" {} Applying migration to shadow...", "[2/4]".cyan());
    let (host, port, user, password, _) = parse_pg_url(&state.primary_url)?;
    let mut shadow = match password {
        Some(pwd) => PgDriver::connect_with_password(&host, port, &user, &state.shadow_name, &pwd)
            .await
            .map_err(|e| anyhow!("Failed to connect to shadow: {}", e))?,
        None => PgDriver::connect(&host, port, &user, &state.shadow_name)
            .await
            .map_err(|e| anyhow!("Failed to connect to shadow: {}", e))?,
    };
    for (step, cmd) in cmds.iter().enumerate() {
        shadow
            .execute(cmd)
            .await
            .map_err(|e| anyhow!("Migration {} failed on shadow: {}", step + 1, e))?;
    }
    println!(" {} {} migrations applied", "✓".green(), cmds.len());
    Ok(())
}
/// Step [3/4]: copies row data from the primary into the shadow via COPY
/// export/import, table by table, restricted to columns both sides share.
/// Updates `state.tables_synced` / `state.rows_synced` as it goes.
pub async fn sync_data_to_shadow(state: &mut ShadowState) -> Result<()> {
println!(
" {} Syncing data from primary to shadow...",
"[3/4]".cyan()
);
let (host, port, user, password, database) = parse_pg_url(&state.primary_url)?;
let mut primary_driver = if let Some(pwd) = password.clone() {
PgDriver::connect_with_password(&host, port, &user, &database, &pwd)
.await
.map_err(|e| anyhow!("Failed to connect to primary: {}", e))?
} else {
PgDriver::connect(&host, port, &user, &database)
.await
.map_err(|e| anyhow!("Failed to connect to primary: {}", e))?
};
let mut shadow_driver = if let Some(pwd) = password {
PgDriver::connect_with_password(&host, port, &user, &state.shadow_name, &pwd)
.await
.map_err(|e| anyhow!("Failed to connect to shadow: {}", e))?
} else {
PgDriver::connect(&host, port, &user, &state.shadow_name)
.await
.map_err(|e| anyhow!("Failed to connect to shadow: {}", e))?
};
use qail_core::ast::Operator;
// Tables are enumerated from the SHADOW (post-migration) schema so newly
// created tables are visited too; `_qail*` bookkeeping tables are skipped.
let tables_cmd = Qail::get("information_schema.tables")
.column("table_name")
.filter("table_schema", Operator::Eq, "public")
.filter("table_type", Operator::Eq, "BASE TABLE");
let table_rows = shadow_driver
.fetch_all(&tables_cmd)
.await
.map_err(|e| anyhow!("Failed to list shadow tables: {}", e))?;
let tables: Vec<String> = table_rows
.iter()
.filter_map(|r| r.get_string(0))
.filter(|t| !t.starts_with("_qail")) .collect();
state.tables_synced = tables.len() as u64;
for table in &tables {
// Shadow-side column list for this table.
let cols_cmd = Qail::get("information_schema.columns")
.column("column_name")
.filter("table_schema", Operator::Eq, "public")
.filter("table_name", Operator::Eq, table.clone());
let col_rows = shadow_driver
.fetch_all(&cols_cmd)
.await
.map_err(|e| anyhow!("Failed to get columns for {}: {}", table, e))?;
let shadow_columns: Vec<String> = col_rows.iter().filter_map(|r| r.get_string(0)).collect();
if shadow_columns.is_empty() {
continue;
}
// A table that only exists in the shadow has no primary data to copy.
let check_cmd = Qail::get("information_schema.tables")
.column("table_name")
.filter("table_schema", Operator::Eq, "public")
.filter("table_name", Operator::Eq, table.clone());
let exists = primary_driver
.fetch_all(&check_cmd)
.await
.map_err(|e| anyhow!("Failed to check table {} in primary: {}", table, e))?;
if exists.is_empty() {
println!(" {} {} (new table, no data)", "⊕".blue(), table.cyan());
continue;
}
// Intersect with the primary's columns: only columns present on both
// sides are copied (dropped/added columns are excluded).
let primary_cols_cmd = Qail::get("information_schema.columns")
.column("column_name")
.filter("table_schema", Operator::Eq, "public")
.filter("table_name", Operator::Eq, table.clone());
let primary_col_rows = primary_driver
.fetch_all(&primary_cols_cmd)
.await
.map_err(|e| anyhow!("Failed to get primary columns for {}: {}", table, e))?;
let primary_columns: std::collections::HashSet<String> = primary_col_rows
.iter()
.filter_map(|r| r.get_string(0))
.collect();
let columns: Vec<String> = shadow_columns
.into_iter()
.filter(|c| primary_columns.contains(c))
.collect();
if columns.is_empty() {
println!(" {} {} (no common columns)", "⊕".blue(), table.cyan());
continue;
}
// Export from the primary, then bulk-import the raw COPY payload.
let copy_data = primary_driver
.copy_export_table(table, &columns)
.await
.map_err(|e| anyhow!("Failed to export {}: {}", table, e))?;
// Row count is estimated by counting newline bytes in the COPY payload.
let row_count = copy_data.iter().filter(|&&b| b == b'\n').count();
if !copy_data.is_empty() {
let mut add_cmd = Qail::add(table);
for col in &columns {
add_cmd = add_cmd.column(col);
}
shadow_driver
.copy_bulk_bytes(&add_cmd, &copy_data)
.await
.map_err(|e| anyhow!("Failed to import {}: {}", table, e))?;
}
state.rows_synced += row_count as u64;
println!(" {} {} ({} rows)", "✓".green(), table.cyan(), row_count);
}
println!(
" {} Synced {} tables, {} rows",
"✓".green().bold(),
state.tables_synced,
state.rows_synced
);
Ok(())
}
/// Step [4/4]: prints the shadow summary (URL, sync counters) and the
/// follow-up commands available to the user. Output-only; no side effects
/// beyond stdout.
pub fn display_shadow_status(state: &ShadowState) {
println!(" {} Shadow ready for validation", "[4/4]".cyan());
println!();
println!("{}", "━".repeat(40).dimmed());
println!(" Shadow URL: {}", state.shadow_url.yellow());
println!(
" Tables: {}, Rows: {}",
state.tables_synced.to_string().cyan(),
state.rows_synced.to_string().cyan()
);
println!();
println!(" {}", "Available Commands:".bold());
println!(
" {} → Run tests against shadow",
"qail shadow test".green()
);
println!(
" {} → Switch traffic to shadow",
"qail shadow promote".green().bold()
);
println!(
" {} → Drop shadow, keep primary",
"qail shadow abort".red()
);
println!();
}
/// Promotes a rehearsed shadow migration: re-applies the stored diff commands
/// to the PRIMARY inside a transaction, drops the shadow database, and marks
/// the receipt `promoted`. On any command failure the transaction is rolled
/// back and the primary is left untouched.
pub async fn promote_shadow(primary_url: &str) -> Result<()> {
let state = ShadowState::new(primary_url)?;
println!();
println!("{}", "🚀 Promoting Shadow to Primary".green().bold());
println!("{}", "━".repeat(40).dimmed());
let (host, port, user, password, database) = parse_pg_url(primary_url)?;
let mut primary_driver = if let Some(pwd) = password.clone() {
PgDriver::connect_with_password(&host, port, &user, &database, &pwd)
.await
.map_err(|e| anyhow!("Failed to connect to primary: {}", e))?
} else {
PgDriver::connect(&host, port, &user, &database)
.await
.map_err(|e| anyhow!("Failed to connect to primary: {}", e))?
};
println!(" [1/4] Loading migration state...");
// Receipt must have been written by a prior `qail migrate shadow` run.
let state_option = load_shadow_state(&mut primary_driver).await?;
let (_, diff_cmds) = state_option.ok_or_else(|| {
anyhow!("No pending shadow migration found. Run 'qail migrate shadow' first.")
})?;
println!(
" {} {} migration commands loaded",
"✓".green(),
diff_cmds.len()
);
println!();
println!(
" {} Changes on primary since shadow sync may cause failure.",
"⚠️".yellow()
);
println!();
println!(" [2/4] Applying migration to primary...");
// All diff commands run inside a single transaction so a partial apply
// can be undone.
primary_driver
.begin()
.await
.map_err(|e| anyhow!("Failed to begin transaction: {}", e))?;
let mut migration_failed = false;
let mut failure_reason = String::new();
for (i, cmd) in diff_cmds.iter().enumerate() {
if let Err(e) = primary_driver.execute(cmd).await {
migration_failed = true;
failure_reason = format!("Migration {} failed: {} (cmd: {:?})", i + 1, e, cmd.action);
break;
}
}
if migration_failed {
primary_driver
.rollback()
.await
.map_err(|e| anyhow!("Failed to rollback: {}", e))?;
println!(
" {} Transaction rolled back - primary unchanged!",
"↩️".yellow()
);
return Err(anyhow!(failure_reason));
}
primary_driver
.commit()
.await
.map_err(|e| anyhow!("Failed to commit: {}", e))?;
println!(
" {} {} migrations applied to primary",
"✓".green(),
diff_cmds.len()
);
println!(" [3/4] Dropping shadow database...");
// DROP DATABASE requires a separate connection to the maintenance DB.
let mut admin_driver = if let Some(pwd) = password {
PgDriver::connect_with_password(&host, port, &user, "postgres", &pwd)
.await
.map_err(|e| anyhow!("Failed to connect to postgres: {}", e))?
} else {
PgDriver::connect(&host, port, &user, "postgres")
.await
.map_err(|e| anyhow!("Failed to connect to postgres: {}", e))?
};
let drop_db = Qail::drop_database(state.shadow_name.clone());
admin_driver
.execute(&drop_db)
.await
.map_err(|e| anyhow!("Failed to drop shadow: {}", e))?;
println!(" {} Shadow database dropped", "✓".green());
println!(" [4/4] Updating migration status...");
update_shadow_state_status(&mut primary_driver, "promoted").await?;
println!(" {} Status: promoted", "✓".green());
println!();
println!("{}", "✓ Shadow promoted successfully!".green().bold());
println!(" Migration applied to: {}", database.cyan());
println!(" Shadow {} dropped", state.shadow_name.dimmed());
Ok(())
}
/// Aborts a shadow rehearsal: drops the shadow database and (best-effort)
/// marks any stored receipt `aborted`. The primary is never modified.
pub async fn abort_shadow(primary_url: &str) -> Result<()> {
    let state = ShadowState::new(primary_url)?;
    println!();
    println!("{}", "🛑 Aborting Shadow Migration".red().bold());
    println!("{}", "━".repeat(40).dimmed());
    let (host, port, user, password, database) = parse_pg_url(primary_url)?;
    // Dropping a database requires the maintenance connection.
    let mut admin = match password.clone() {
        Some(pwd) => PgDriver::connect_with_password(&host, port, &user, "postgres", &pwd)
            .await
            .map_err(|e| anyhow!("Failed to connect to postgres: {}", e))?,
        None => PgDriver::connect(&host, port, &user, "postgres")
            .await
            .map_err(|e| anyhow!("Failed to connect to postgres: {}", e))?,
    };
    println!(" Dropping shadow database: {}", state.shadow_name.yellow());
    let drop_db = Qail::drop_database(state.shadow_name.clone());
    admin
        .execute(&drop_db)
        .await
        .map_err(|e| anyhow!("Failed to drop shadow: {}", e))?;
    let mut primary = match password {
        Some(pwd) => PgDriver::connect_with_password(&host, port, &user, &database, &pwd)
            .await
            .map_err(|e| anyhow!("Failed to connect to primary: {}", e))?,
        None => PgDriver::connect(&host, port, &user, &database)
            .await
            .map_err(|e| anyhow!("Failed to connect to primary: {}", e))?,
    };
    // Status update errors are deliberately ignored (best-effort bookkeeping).
    let _ = update_shadow_state_status(&mut primary, "aborted").await;
    println!(
        "{}",
        "✓ Shadow database dropped. Primary unchanged.".green()
    );
    Ok(())
}
/// Full shadow rehearsal from pre-computed commands: create the shadow,
/// replay the base schema, apply the diff, copy data across, then record a
/// verified receipt on the primary and print the status summary.
pub async fn run_shadow_migration(
    primary_url: &str,
    old_cmds: &[Qail],
    diff_cmds: &[Qail],
    old_path: &str,
    new_path: &str,
) -> Result<ShadowState> {
    let mut state = create_shadow_database(primary_url).await?;
    apply_base_schema_to_shadow(&mut state, old_cmds).await?;
    apply_migrations_to_shadow(&mut state, diff_cmds).await?;
    sync_data_to_shadow(&mut state).await?;
    // The receipt lives on the primary so promote/abort can find it later.
    let (host, port, user, password, database) = parse_pg_url(primary_url)?;
    let mut primary = match password {
        Some(pwd) => PgDriver::connect_with_password(&host, port, &user, &database, &pwd)
            .await
            .map_err(|e| anyhow!("Failed to connect to primary: {}", e))?,
        None => PgDriver::connect(&host, port, &user, &database)
            .await
            .map_err(|e| anyhow!("Failed to connect to primary: {}", e))?,
    };
    save_shadow_state(&mut primary, &state, diff_cmds, old_path, new_path).await?;
    state.is_ready = true;
    display_shadow_status(&state);
    Ok(state)
}
/// Shadow rehearsal driven by live introspection: reads the current schema
/// from the primary, diffs it against the target schema file, then runs the
/// same create/apply/sync/save pipeline as `run_shadow_migration`.
pub async fn run_shadow_migration_live(
primary_url: &str,
new_schema_path: &str,
) -> Result<ShadowState> {
use qail_core::migrate::{diff_schemas_checked, parse_qail_file, schema_to_commands};
println!();
println!(
"{}",
"🔄 Shadow Migration Mode (Live Introspection)"
.cyan()
.bold()
);
println!("{}", "━".repeat(40).dimmed());
println!(" {} Introspecting live database schema...", "[0/4]".cyan());
let (host, port, user, password, database) = parse_pg_url(primary_url)?;
let mut primary_driver = if let Some(pwd) = password.clone() {
PgDriver::connect_with_password(&host, port, &user, &database, &pwd)
.await
.map_err(|e| anyhow!("Failed to connect to primary: {}", e))?
} else {
PgDriver::connect(&host, port, &user, &database)
.await
.map_err(|e| anyhow!("Failed to connect to primary: {}", e))?
};
// Current state of the primary, reconstructed from information_schema.
let live_schema = introspect_schema(&mut primary_driver).await?;
println!(
" {} {} tables, {} indexes introspected",
"✓".green(),
live_schema.tables.len(),
live_schema.indexes.len()
);
let new_schema = parse_qail_file(new_schema_path)
.map_err(|e| anyhow!("Failed to parse new schema: {}", e))?;
// Base schema commands rebuild the live state on the fresh shadow;
// diff commands take it from there to the target schema.
let old_cmds = schema_to_commands(&live_schema);
let diff_cmds = diff_schemas_checked(&live_schema, &new_schema).map_err(|e| {
anyhow!(
"State-based diff unsupported for live shadow migration '{}': {}",
new_schema_path,
e
)
})?;
println!(
" {} {} migration commands generated",
"✓".green(),
diff_cmds.len()
);
let mut state = create_shadow_database(primary_url).await?;
apply_base_schema_to_shadow(&mut state, &old_cmds).await?;
apply_migrations_to_shadow(&mut state, &diff_cmds).await?;
sync_data_to_shadow(&mut state).await?;
// Fresh connection to the primary for writing the receipt.
let mut primary_reconnect = if let Some(pwd) = password {
PgDriver::connect_with_password(&host, port, &user, &database, &pwd)
.await
.map_err(|e| anyhow!("Failed to connect to primary: {}", e))?
} else {
PgDriver::connect(&host, port, &user, &database)
.await
.map_err(|e| anyhow!("Failed to connect to primary: {}", e))?
};
// No on-disk "old" schema exists in this mode, hence the placeholder path.
save_shadow_state(
&mut primary_reconnect,
&state,
&diff_cmds,
"[introspected]",
new_schema_path,
)
.await?;
state.is_ready = true;
display_shadow_status(&state);
Ok(state)
}
/// Step [1.5/4]: replays the primary's existing schema commands on the fresh
/// shadow so the subsequent diff starts from the same baseline.
async fn apply_base_schema_to_shadow(state: &mut ShadowState, cmds: &[Qail]) -> Result<()> {
    println!(" {} Applying base schema to shadow...", "[1.5/4]".cyan());
    let (host, port, user, password, _) = parse_pg_url(&state.primary_url)?;
    let mut shadow = match password {
        Some(pwd) => PgDriver::connect_with_password(&host, port, &user, &state.shadow_name, &pwd)
            .await
            .map_err(|e| anyhow!("Failed to connect to shadow: {}", e))?,
        None => PgDriver::connect(&host, port, &user, &state.shadow_name)
            .await
            .map_err(|e| anyhow!("Failed to connect to shadow: {}", e))?,
    };
    for (step, cmd) in cmds.iter().enumerate() {
        shadow
            .execute(cmd)
            .await
            .map_err(|e| anyhow!("Base schema {} failed on shadow: {}", step + 1, e))?;
    }
    println!(" {} {} tables/indexes created", "✓".green(), cmds.len());
    Ok(())
}