use std::path::PathBuf;
use std::process::ExitCode;
use std::str::FromStr;
use clap::Subcommand;
use djogi::__bypass::RawAccessExt as _;
use djogi::config::DjogiConfig;
use djogi::context::DjogiContext;
use djogi::live_migrate::compose::StepResult;
use djogi::live_migrate::{
DaemonConfig, DaemonError, LivePlanRow, PlanFileError, PlanStatus, active_hooks_at_step,
plan_path, read_plan, run_daemon, verify_checksum,
};
use djogi::pg::pool::DjogiPool;
use djogi::types::HeerId;
#[derive(Debug, Clone, Subcommand)]
pub enum LiveCmd {
Plan {
version: Option<String>,
#[arg(long)]
workspace: Option<PathBuf>,
},
Show {
plan_id: String,
#[arg(long)]
workspace: Option<PathBuf>,
},
Run {
plan_id: String,
#[arg(long, default_value_t = false)]
allow_destructive: bool,
#[arg(long)]
justify: Option<String>,
#[arg(long, default_value_t = false)]
allow_raw_dangerous: bool,
#[arg(long)]
workspace: Option<PathBuf>,
},
Resume {
plan_id: String,
#[arg(long, default_value_t = false)]
allow_destructive: bool,
#[arg(long)]
justify: Option<String>,
#[arg(long)]
workspace: Option<PathBuf>,
},
Finalize {
plan_id: String,
#[arg(long)]
justify: Option<String>,
#[arg(long)]
workspace: Option<PathBuf>,
},
Abandon {
plan_id: String,
#[arg(long, default_value_t = false)]
force: bool,
#[arg(long)]
workspace: Option<PathBuf>,
},
Daemon {
#[arg(long, default_value = "30s", value_parser = parse_humantime_duration)]
poll_interval: std::time::Duration,
#[arg(long, default_value = "10m", value_parser = parse_humantime_duration)]
claim_stale_after: std::time::Duration,
#[arg(long, default_value_t = false)]
allow_non_localhost: bool,
#[arg(long)]
workspace: Option<PathBuf>,
},
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum LiveCmdError {
#[error("{0}")]
Runtime(String),
#[error("classification refused: {0}")]
ClassificationRefused(String),
#[error("plan file checksum drift: {0}")]
ChecksumDrift(String),
#[error("plan state conflict: {0}")]
StateConflict(String),
#[error("argument refused: {0}")]
ArgRefused(String),
#[error("malformed plan_id: {0}")]
MalformedPlanId(String),
#[error("plan {0} not found in djogi_live_plans")]
PlanNotFound(HeerId),
}
impl LiveCmdError {
pub fn exit_code(&self) -> i32 {
match self {
LiveCmdError::Runtime(_)
| LiveCmdError::ArgRefused(_)
| LiveCmdError::MalformedPlanId(_)
| LiveCmdError::PlanNotFound(_) => 1,
LiveCmdError::ClassificationRefused(_) => 2,
LiveCmdError::ChecksumDrift(_) => 4,
LiveCmdError::StateConflict(_) => 5,
}
}
}
impl From<PlanFileError> for LiveCmdError {
fn from(value: PlanFileError) -> Self {
match value {
PlanFileError::ChecksumMismatch { .. } => {
LiveCmdError::ChecksumDrift(value.to_string())
}
other => LiveCmdError::Runtime(other.to_string()),
}
}
}
pub fn dispatch(cmd: LiveCmd) -> ExitCode {
let runtime = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(r) => r,
Err(e) => {
eprintln!("djogi live: tokio runtime: {e}");
return ExitCode::from(1);
}
};
let exit = runtime.block_on(async { run(cmd).await });
let code = match exit {
Ok(c) => c,
Err(e) => {
eprintln!("djogi live: {e}");
e.exit_code()
}
};
ExitCode::from(code as u8)
}
async fn run(cmd: LiveCmd) -> Result<i32, LiveCmdError> {
match cmd {
LiveCmd::Plan { version, workspace } => plan_cmd(version.as_deref(), workspace).await,
LiveCmd::Show { plan_id, workspace } => show_cmd(&plan_id, workspace).await,
LiveCmd::Run {
plan_id,
allow_destructive,
justify,
allow_raw_dangerous,
workspace,
} => {
run_cmd(
&plan_id,
allow_destructive,
justify.as_deref(),
allow_raw_dangerous,
workspace,
)
.await
}
LiveCmd::Resume {
plan_id,
allow_destructive,
justify,
workspace,
} => resume_cmd(&plan_id, allow_destructive, justify.as_deref(), workspace).await,
LiveCmd::Finalize {
plan_id,
justify,
workspace,
} => finalize_cmd(&plan_id, justify.as_deref(), workspace).await,
LiveCmd::Abandon {
plan_id,
force,
workspace,
} => abandon_cmd(&plan_id, force, workspace).await,
LiveCmd::Daemon {
poll_interval,
claim_stale_after,
allow_non_localhost,
workspace,
} => {
daemon_cmd(
poll_interval,
claim_stale_after,
allow_non_localhost,
workspace,
)
.await
}
}
}
fn parse_plan_id(raw: &str) -> Result<HeerId, LiveCmdError> {
HeerId::from_str(raw).map_err(|e| LiveCmdError::MalformedPlanId(format!("`{raw}`: {e}")))
}
fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
workspace.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")))
}
fn require_justify_for_dangerous(
allow_raw_dangerous: bool,
justify: Option<&str>,
) -> Result<(), LiveCmdError> {
if allow_raw_dangerous && justify_is_empty(justify) {
return Err(LiveCmdError::ArgRefused(
"--allow-raw-dangerous requires --justify \"<reason>\"".to_string(),
));
}
Ok(())
}
fn require_justify_for_destructive(
allow_destructive: bool,
justify: Option<&str>,
) -> Result<(), LiveCmdError> {
if allow_destructive && justify_is_empty(justify) {
return Err(LiveCmdError::ArgRefused(
"--allow-destructive requires --justify \"<reason>\"".to_string(),
));
}
Ok(())
}
fn justify_is_empty(justify: Option<&str>) -> bool {
justify.map(|s| s.trim().is_empty()).unwrap_or(true)
}
fn require_destructive_gate_for_plan(
plan: &djogi::live_migrate::LivePlan,
allow_destructive: bool,
justify: Option<&str>,
) -> Result<(), LiveCmdError> {
if !plan.has_destructive_steps() {
return Ok(());
}
if !allow_destructive {
return Err(LiveCmdError::ArgRefused(
"plan contains a destructive step (DROP / TRUNCATE class); \
pass `--allow-destructive --justify \"<reason>\"` to proceed"
.to_string(),
));
}
if justify_is_empty(justify) {
return Err(LiveCmdError::ArgRefused(
"plan contains a destructive step; `--allow-destructive` requires \
`--justify \"<reason>\"`"
.to_string(),
));
}
Ok(())
}
fn force_allowed_in_env() -> bool {
match std::env::var("DJOGI_ENV") {
Ok(v) => !v.eq_ignore_ascii_case("production"),
Err(_) => true,
}
}
fn parse_humantime_duration(s: &str) -> Result<std::time::Duration, String> {
let trimmed = s.trim();
if trimmed.is_empty() {
return Err(format!(
"empty duration string `{s}`; expected e.g. `30s` / `5m` / `2h` / `1d` / `10min`"
));
}
let bytes = trimmed.as_bytes();
let mut i = 0usize;
while i < bytes.len() && bytes[i].is_ascii_digit() {
i += 1;
}
if i == 0 {
return Err(format!(
"duration `{s}` must start with one or more ASCII digits"
));
}
let digits = &trimmed[..i];
let unit = &trimmed[i..];
let value: u64 = digits
.parse()
.map_err(|e| format!("duration `{s}`: numeric prefix `{digits}` overflows u64: {e}"))?;
let secs: u64 = match unit {
"s" => value,
"m" | "min" => value
.checked_mul(60)
.ok_or_else(|| format!("duration `{s}` overflows u64 seconds"))?,
"h" => value
.checked_mul(3_600)
.ok_or_else(|| format!("duration `{s}` overflows u64 seconds"))?,
"d" => value
.checked_mul(86_400)
.ok_or_else(|| format!("duration `{s}` overflows u64 seconds"))?,
other => {
return Err(format!(
"duration `{s}`: unknown unit `{other}`; expected `s` / `m` / `min` / `h` / `d`"
));
}
};
Ok(std::time::Duration::from_secs(secs))
}
fn resolve_plan_file_path(workspace: &std::path::Path, row: &LivePlanRow) -> std::path::PathBuf {
let migrations_root = djogi::migrate::migrations_root(workspace);
plan_path(
&migrations_root,
&row.target_database,
row.plan_id,
&row.slug,
)
}
async fn connect(database_url: &str) -> Result<DjogiContext, LiveCmdError> {
let pool = DjogiPool::connect(database_url)
.await
.map_err(|e| LiveCmdError::Runtime(format!("connect: {e}")))?;
djogi::pg::preflight::check_postgres_version(&pool)
.await
.map_err(|e| LiveCmdError::Runtime(format!("support boundary: {e}")))?;
Ok(DjogiContext::from_pool(pool))
}
fn load_config(workspace: &std::path::Path) -> Result<DjogiConfig, LiveCmdError> {
DjogiConfig::load_from_workspace(workspace)
.map_err(|e| LiveCmdError::Runtime(format!("config load: {e}")))
}
async fn fetch_row(ctx: &mut DjogiContext, plan_id: HeerId) -> Result<LivePlanRow, LiveCmdError> {
use djogi::live_migrate::state;
let bucket_row = ctx
.raw_rows(
"SELECT target_database, app_label FROM djogi_live_plans WHERE plan_id = $1",
&[&plan_id.as_i64()],
)
.await
.map_err(|e| LiveCmdError::Runtime(format!("plan lookup: {e}")))?;
let bucket = match bucket_row.first() {
Some(row) => {
let target_database: String = row
.try_get(0)
.map_err(|e| LiveCmdError::Runtime(format!("plan lookup decode: {e}")))?;
let app_label: String = row
.try_get(1)
.map_err(|e| LiveCmdError::Runtime(format!("plan lookup decode: {e}")))?;
(target_database, app_label)
}
None => return Err(LiveCmdError::PlanNotFound(plan_id)),
};
let row = state::fetch_row_by_id(ctx, plan_id, &bucket.0, &bucket.1)
.await
.map_err(|e| LiveCmdError::Runtime(format!("plan fetch: {e}")))?
.ok_or(LiveCmdError::PlanNotFound(plan_id))?;
Ok(row)
}
async fn plan_cmd(version: Option<&str>, workspace: Option<PathBuf>) -> Result<i32, LiveCmdError> {
let workspace = resolve_workspace(workspace);
let _config = load_config(&workspace)?;
if let Some(v) = version
&& !v.is_empty()
{
return Err(refuse_offline_only(format!(
"live plan: explicit version filter `{v}` requires the live-plan compose engine; \
this CLI build ships the dispatch + parsing surface only"
)));
}
Err(LiveCmdError::Runtime(
"live plan: descriptor → snapshot → classify → dispatch pipeline lands in a follow-up task; \
this CLI build shipped the dispatch + parsing surface only. Use `djogi migrations compose` \
today; the live-plan emitter wraps that in a forthcoming task"
.to_string(),
))
}
pub fn refuse_offline_only(reason: impl Into<String>) -> LiveCmdError {
LiveCmdError::ClassificationRefused(reason.into())
}
async fn show_cmd(plan_id_raw: &str, workspace: Option<PathBuf>) -> Result<i32, LiveCmdError> {
let plan_id = parse_plan_id(plan_id_raw)?;
let workspace = resolve_workspace(workspace);
let config = load_config(&workspace)?;
let mut ctx = connect(&config.database.url).await?;
let row = fetch_row(&mut ctx, plan_id).await?;
let path = resolve_plan_file_path(&workspace, &row);
verify_checksum(&path, &row.plan_file_checksum)?;
let plan = read_plan(&path)?;
let current_index = u32::try_from(row.current_step_index).unwrap_or(0);
let hooks = active_hooks_at_step(&plan, current_index)
.map_err(|e| LiveCmdError::Runtime(format!("hook walker: {e}")))?;
println!("plan_id : {}", row.plan_id);
println!("slug : {}", row.slug);
println!("classification : {}", row.classification.as_db_str());
println!("status : {}", row.status.as_db_str());
println!(
"current_step : {} (index {})",
row.current_step.as_deref().unwrap_or("<none>"),
row.current_step_index,
);
let total = row
.backfill_rows_total
.map(|n| n.to_string())
.unwrap_or_else(|| "<unknown>".to_string());
println!(
"backfill_rows : {} done / {} total",
row.backfill_rows_done, total,
);
println!("originating : {}", row.originating_migration.as_str(),);
if let Some(progress) = row.last_progress_at.as_ref() {
println!("last_progress : {progress}");
}
if let Some(err) = row.last_error.as_deref() {
println!("last_error : {err}");
}
println!("plan_file : {}", path.display());
println!();
println!("steps ({} total):", plan.steps.len(),);
for step in &plan.steps {
let marker = if (step.ordinal as i32) < row.current_step_index {
"[done]"
} else if (step.ordinal as i32) == row.current_step_index {
"[curr]"
} else {
"[ todo]"
};
println!(
" {marker} {ordinal:>3}: {kind:?}",
ordinal = step.ordinal,
kind = step.kind,
);
}
println!();
println!(
"active hooks : dual_read={}, dual_write={}, suppress_events={}",
hooks.dual_read.len(),
hooks.dual_write.len(),
hooks.side_effects_suppressed,
);
Ok(0)
}
async fn run_cmd(
plan_id_raw: &str,
allow_destructive: bool,
justify: Option<&str>,
allow_raw_dangerous: bool,
workspace: Option<PathBuf>,
) -> Result<i32, LiveCmdError> {
require_justify_for_destructive(allow_destructive, justify)?;
require_justify_for_dangerous(allow_raw_dangerous, justify)?;
let plan_id = parse_plan_id(plan_id_raw)?;
let workspace = resolve_workspace(workspace);
let config = load_config(&workspace)?;
let mut ctx = connect(&config.database.url).await?;
let row = fetch_row(&mut ctx, plan_id).await?;
assert_run_status_allows_progress(row.status)?;
let path = resolve_plan_file_path(&workspace, &row);
verify_checksum(&path, &row.plan_file_checksum)?;
let plan = read_plan(&path)?;
require_destructive_gate_for_plan(&plan, allow_destructive, justify)?;
match djogi::live_migrate::executor::run_plan(
&mut ctx,
path,
0,
false,
allow_destructive,
justify,
)
.await
{
Ok(result) => match result {
StepResult::Completed => {
println!("live run: plan {plan_id} completed successfully");
Ok(0)
}
StepResult::Paused => {
println!(
"live run: paused at operator gate; resume with `djogi live run {plan_id}`"
);
Ok(0)
}
StepResult::Partial {
rows_done,
rows_total,
} => {
if rows_total > 0 {
let pct = (rows_done as f64 / rows_total as f64) * 100.0;
println!(
"live run: backfill progress {rows_done}/{rows_total} ({pct:.1}%); resume with `djogi live resume {plan_id}`"
);
} else {
println!(
"live run: backfill interrupted after {rows_done} rows; resume with `djogi live resume {plan_id}`"
);
}
Ok(0)
}
},
Err(e) => Err(LiveCmdError::Runtime(format!("executor error: {e}"))),
}
}
async fn resume_cmd(
plan_id_raw: &str,
allow_destructive: bool,
justify: Option<&str>,
workspace: Option<PathBuf>,
) -> Result<i32, LiveCmdError> {
require_justify_for_destructive(allow_destructive, justify)?;
let plan_id = parse_plan_id(plan_id_raw)?;
let workspace = resolve_workspace(workspace);
let config = load_config(&workspace)?;
let mut ctx = connect(&config.database.url).await?;
let row = fetch_row(&mut ctx, plan_id).await?;
assert_resume_status_allows_progress(row.status)?;
let path = resolve_plan_file_path(&workspace, &row);
verify_checksum(&path, &row.plan_file_checksum)?;
let _plan = read_plan(&path)?;
let start_idx = u32::try_from(row.current_step_index).unwrap_or(0);
match djogi::live_migrate::executor::run_plan(
&mut ctx,
path,
start_idx,
true,
allow_destructive,
justify,
)
.await
{
Ok(result) => match result {
StepResult::Completed => {
println!("live resume: plan {plan_id} completed successfully");
Ok(0)
}
StepResult::Paused => {
println!(
"live resume: paused at operator gate; resume with `djogi live run {plan_id}`"
);
Ok(0)
}
StepResult::Partial {
rows_done,
rows_total,
} => {
if rows_total > 0 {
let pct = (rows_done as f64 / rows_total as f64) * 100.0;
println!(
"live resume: backfill progress {rows_done}/{rows_total} ({pct:.1}%); resume with `djogi live resume {plan_id}`"
);
} else {
println!(
"live resume: backfill interrupted after {rows_done} rows; resume with `djogi live resume {plan_id}`"
);
}
Ok(0)
}
},
Err(e) => Err(LiveCmdError::Runtime(format!("executor error: {e}"))),
}
}
fn assert_run_status_allows_progress(status: PlanStatus) -> Result<(), LiveCmdError> {
match status {
PlanStatus::Pending | PlanStatus::Running => Ok(()),
PlanStatus::Paused => Err(LiveCmdError::StateConflict(
"plan is in `paused`; use `live resume` to re-enter the run loop \
(paused is an explicit operator checkpoint and `live run` does \
not auto-advance through it)"
.to_string(),
)),
PlanStatus::Validating
| PlanStatus::Cutover
| PlanStatus::Finalizing
| PlanStatus::Complete
| PlanStatus::Abandoned
| PlanStatus::Failed => Err(LiveCmdError::StateConflict(format!(
"plan is in `{}`; `live run` advances only Pending / Running plans",
status.as_db_str()
))),
_ => Err(LiveCmdError::StateConflict(format!(
"plan is in `{}`; this CLI build does not recognise the status",
status.as_db_str()
))),
}
}
fn assert_resume_status_allows_progress(status: PlanStatus) -> Result<(), LiveCmdError> {
match status {
PlanStatus::Running | PlanStatus::Paused => Ok(()),
PlanStatus::Pending => Err(LiveCmdError::StateConflict(
"plan is in `pending`; use `live run` to start it (resume is for an interrupted run)"
.to_string(),
)),
PlanStatus::Validating
| PlanStatus::Cutover
| PlanStatus::Finalizing
| PlanStatus::Complete
| PlanStatus::Abandoned
| PlanStatus::Failed => Err(LiveCmdError::StateConflict(format!(
"plan is in `{}`; resume is for interrupted Running / Paused plans \
(use `live run` past gates, `live finalize` to complete, or `live abandon` to walk away)",
status.as_db_str()
))),
_ => Err(LiveCmdError::StateConflict(format!(
"plan is in `{}`; this CLI build does not recognise the status",
status.as_db_str()
))),
}
}
async fn finalize_cmd(
plan_id_raw: &str,
justify: Option<&str>,
workspace: Option<PathBuf>,
) -> Result<i32, LiveCmdError> {
let plan_id = parse_plan_id(plan_id_raw)?;
let workspace = resolve_workspace(workspace);
let config = load_config(&workspace)?;
let mut ctx = connect(&config.database.url).await?;
let row = fetch_row(&mut ctx, plan_id).await?;
assert_finalize_status(row.status)?;
let justify_present = justify.map(|s| !s.trim().is_empty()).unwrap_or(false);
if !justify_present {
return Err(LiveCmdError::ArgRefused(
"live finalize runs destructive cleanup steps; pass \
--justify \"<reason>\""
.to_string(),
));
}
let path = resolve_plan_file_path(&workspace, &row);
verify_checksum(&path, &row.plan_file_checksum)?;
let _plan = read_plan(&path)?;
let start_idx = u32::try_from(row.current_step_index).unwrap_or(0);
match djogi::live_migrate::executor::run_plan(&mut ctx, path, start_idx, true, true, justify)
.await
{
Ok(result) => match result {
StepResult::Completed => {
println!("live finalize: plan {plan_id} completed successfully");
Ok(0)
}
StepResult::Paused => {
println!(
"live finalize: paused at operator gate; resume with `djogi live run {plan_id}`"
);
Ok(0)
}
StepResult::Partial {
rows_done,
rows_total,
} => {
if rows_total > 0 {
let pct = (rows_done as f64 / rows_total as f64) * 100.0;
println!(
"live finalize: backfill progress {rows_done}/{rows_total} ({pct:.1}%); resume with `djogi live finalize {plan_id}`"
);
} else {
println!(
"live finalize: backfill interrupted after {rows_done} rows; resume with `djogi live finalize {plan_id}`"
);
}
Ok(0)
}
},
Err(e) => Err(LiveCmdError::Runtime(format!("executor error: {e}"))),
}
}
fn assert_finalize_status(status: PlanStatus) -> Result<(), LiveCmdError> {
match status {
PlanStatus::Finalizing => Ok(()),
other => Err(LiveCmdError::StateConflict(format!(
"plan is in `{}`; `live finalize` runs only against the `finalizing` state",
other.as_db_str()
))),
}
}
async fn abandon_cmd(
plan_id_raw: &str,
force: bool,
workspace: Option<PathBuf>,
) -> Result<i32, LiveCmdError> {
let plan_id = parse_plan_id(plan_id_raw)?;
let workspace = resolve_workspace(workspace);
let config = load_config(&workspace)?;
if force && !force_allowed_in_env() {
return Err(LiveCmdError::ArgRefused(
"--force refused under DJOGI_ENV=production".to_string(),
));
}
let confirmed = if force {
true
} else {
match interactive_confirm_abandon(plan_id) {
Ok(c) => c,
Err(_) => {
return Err(LiveCmdError::ArgRefused(
"failed to read confirmation; refusing without an explicit `--force`"
.to_string(),
));
}
}
};
if !confirmed {
eprintln!("djogi live abandon: aborted; plan {plan_id} unchanged");
return Ok(0);
}
let mut ctx = connect(&config.database.url).await?;
let row = fetch_row(&mut ctx, plan_id).await?;
assert_abandon_status(row.status)?;
djogi::live_migrate::state::update_status(
&mut ctx,
plan_id,
&row.target_database,
&row.app_label,
PlanStatus::Abandoned,
)
.await
.map_err(|e| LiveCmdError::Runtime(format!("abandon update_status: {e}")))?;
println!(
"live abandon: plan {plan_id} marked abandoned (was `{}`); plan file \
preserved on disk for audit",
row.status.as_db_str(),
);
Ok(0)
}
fn assert_abandon_status(status: PlanStatus) -> Result<(), LiveCmdError> {
match status {
PlanStatus::Complete => Err(LiveCmdError::StateConflict(
"plan is `complete`; nothing to abandon".to_string(),
)),
PlanStatus::Abandoned => Err(LiveCmdError::StateConflict(
"plan is already `abandoned`".to_string(),
)),
PlanStatus::Failed => Err(LiveCmdError::StateConflict(
"plan is `failed`; the failure is recorded for audit and the \
plan is terminal — generate a fresh plan after addressing the \
underlying cause"
.to_string(),
)),
PlanStatus::Pending
| PlanStatus::Running
| PlanStatus::Paused
| PlanStatus::Validating
| PlanStatus::Cutover
| PlanStatus::Finalizing => Ok(()),
_ => Err(LiveCmdError::StateConflict(format!(
"plan is in `{}`; this CLI build does not recognise the status",
status.as_db_str()
))),
}
}
async fn daemon_cmd(
poll_interval: std::time::Duration,
claim_stale_after: std::time::Duration,
allow_non_localhost: bool,
workspace: Option<PathBuf>,
) -> Result<i32, LiveCmdError> {
let workspace = resolve_workspace(workspace);
let config = load_config(&workspace)?;
let cfg = DaemonConfig {
poll_interval,
claim_stale_after,
allow_non_localhost,
database_url: config.database.url.clone(),
host: hostname_for_claim(),
pid: i64::from(std::process::id()),
profile: config.profile.clone(),
workspace_root: workspace.to_path_buf(),
};
let mut ctx = connect(&config.database.url).await?;
match run_daemon(&mut ctx, cfg).await {
Ok(()) => Ok(0),
Err(DaemonError::Shutdown) => Ok(0),
Err(DaemonError::NotLocalhost) => Err(LiveCmdError::ArgRefused(
"live daemon refused: not running on localhost (pass --allow-non-localhost to override)"
.to_string(),
)),
Err(DaemonError::Production) => Err(LiveCmdError::ArgRefused(
"live daemon refused: DJOGI_ENV=production".to_string(),
)),
Err(DaemonError::Backfill(e)) => {
Err(LiveCmdError::Runtime(format!("daemon backfill: {e}")))
}
Err(DaemonError::Database(e)) => Err(LiveCmdError::Runtime(format!("daemon db: {e}"))),
Err(other) => Err(LiveCmdError::Runtime(format!("daemon: {other}"))),
}
}
fn hostname_for_claim() -> String {
std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string())
}
fn interactive_confirm_abandon(plan_id: HeerId) -> std::io::Result<bool> {
use std::io::{BufRead, Write};
let stderr = std::io::stderr();
let mut handle = stderr.lock();
writeln!(
handle,
"WARNING: live abandon will mark plan {plan_id} as `abandoned`. Schema state \
remains at the last completed step; the plan file stays on disk. Resume is \
refused after abandonment — generate a fresh plan instead."
)?;
write!(handle, "Type `yes` to confirm, anything else to abort: ")?;
handle.flush()?;
let stdin = std::io::stdin();
let mut line = String::new();
stdin.lock().read_line(&mut line)?;
Ok(matches!(
line.trim().to_ascii_lowercase().as_str(),
"y" | "yes"
))
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
#[derive(Parser, Debug)]
struct LiveCli {
#[command(subcommand)]
cmd: LiveCmd,
}
fn parse(argv: &[&str]) -> Result<LiveCli, clap::Error> {
let mut full = vec!["live"];
full.extend_from_slice(argv);
LiveCli::try_parse_from(full)
}
#[test]
fn live_plan_parses_without_args() {
let parsed = parse(&["plan"]).expect("plan parses");
match parsed.cmd {
LiveCmd::Plan { version, .. } => assert!(version.is_none()),
other => panic!("expected Plan, got {other:?}"),
}
}
#[test]
fn live_plan_accepts_optional_version() {
let parsed = parse(&["plan", "V20260428000000__demo"]).expect("plan with version parses");
match parsed.cmd {
LiveCmd::Plan { version, .. } => {
assert_eq!(version.as_deref(), Some("V20260428000000__demo"));
}
other => panic!("expected Plan, got {other:?}"),
}
}
#[test]
fn live_show_requires_plan_id() {
let err = parse(&["show"]).expect_err("show without plan_id must fail");
let msg = err.to_string();
assert!(
msg.to_lowercase().contains("plan_id") || msg.to_lowercase().contains("required"),
"expected plan_id requirement in clap message: {msg}",
);
}
#[test]
fn live_show_parses_plan_id() {
let parsed = parse(&["show", "12345"]).expect("show with plan_id parses");
match parsed.cmd {
LiveCmd::Show { plan_id, .. } => assert_eq!(plan_id, "12345"),
other => panic!("expected Show, got {other:?}"),
}
}
#[test]
fn live_run_accepts_allow_destructive_with_justify() {
let parsed = parse(&[
"run",
"12345",
"--allow-destructive",
"--justify",
"rotate keys for incident IR-7",
])
.expect("run with destructive + justify parses");
match parsed.cmd {
LiveCmd::Run {
plan_id,
allow_destructive,
justify,
allow_raw_dangerous,
..
} => {
assert_eq!(plan_id, "12345");
assert!(allow_destructive);
assert_eq!(justify.as_deref(), Some("rotate keys for incident IR-7"));
assert!(!allow_raw_dangerous);
}
other => panic!("expected Run, got {other:?}"),
}
}
#[test]
fn live_run_accepts_allow_raw_dangerous_with_justify() {
let parsed = parse(&[
"run",
"67890",
"--allow-raw-dangerous",
"--justify",
"operator runbook RB-12",
])
.expect("run with allow-raw-dangerous parses");
match parsed.cmd {
LiveCmd::Run {
allow_raw_dangerous,
justify,
..
} => {
assert!(allow_raw_dangerous);
assert_eq!(justify.as_deref(), Some("operator runbook RB-12"));
}
other => panic!("expected Run, got {other:?}"),
}
}
#[test]
fn live_resume_parses() {
let parsed = parse(&["resume", "55"]).expect("resume parses");
assert!(matches!(parsed.cmd, LiveCmd::Resume { .. }));
}
#[test]
fn live_finalize_accepts_justify() {
let parsed = parse(&["finalize", "55", "--justify", "drop legacy"])
.expect("finalize with justify parses");
match parsed.cmd {
LiveCmd::Finalize {
justify, plan_id, ..
} => {
assert_eq!(plan_id, "55");
assert_eq!(justify.as_deref(), Some("drop legacy"));
}
other => panic!("expected Finalize, got {other:?}"),
}
}
#[test]
fn live_abandon_accepts_force() {
let parsed = parse(&["abandon", "12345", "--force"]).expect("abandon with force parses");
match parsed.cmd {
LiveCmd::Abandon { force, plan_id, .. } => {
assert!(force);
assert_eq!(plan_id, "12345");
}
other => panic!("expected Abandon, got {other:?}"),
}
}
#[test]
fn live_daemon_parses_with_default_intervals() {
let parsed = parse(&["daemon"]).expect("daemon parses with no args");
match parsed.cmd {
LiveCmd::Daemon {
poll_interval,
claim_stale_after,
allow_non_localhost,
..
} => {
assert_eq!(
poll_interval,
std::time::Duration::from_secs(30),
"default poll interval is 30s",
);
assert_eq!(
claim_stale_after,
std::time::Duration::from_secs(600),
"default stale threshold is 10 minutes",
);
assert!(
!allow_non_localhost,
"default refuses non-localhost connections",
);
}
other => panic!("expected Daemon, got {other:?}"),
}
}
#[test]
fn live_daemon_accepts_custom_intervals() {
let parsed = parse(&[
"daemon",
"--poll-interval",
"5s",
"--claim-stale-after",
"1m",
"--allow-non-localhost",
])
.expect("daemon with overrides parses");
match parsed.cmd {
LiveCmd::Daemon {
poll_interval,
claim_stale_after,
allow_non_localhost,
..
} => {
assert_eq!(poll_interval, std::time::Duration::from_secs(5));
assert_eq!(claim_stale_after, std::time::Duration::from_secs(60));
assert!(allow_non_localhost);
}
other => panic!("expected Daemon, got {other:?}"),
}
}
#[test]
fn live_daemon_accepts_humantime_minutes_and_hours() {
let parsed = parse(&[
"daemon",
"--poll-interval",
"10min",
"--claim-stale-after",
"2h",
])
.expect("daemon with humantime durations parses");
match parsed.cmd {
LiveCmd::Daemon {
poll_interval,
claim_stale_after,
..
} => {
assert_eq!(poll_interval, std::time::Duration::from_secs(600));
assert_eq!(claim_stale_after, std::time::Duration::from_secs(7200));
}
other => panic!("expected Daemon, got {other:?}"),
}
}
#[test]
fn live_daemon_accepts_workspace_override() {
let parsed = parse(&["daemon", "--workspace", "/tmp/example"])
.expect("daemon with --workspace parses");
match parsed.cmd {
LiveCmd::Daemon { workspace, .. } => {
assert_eq!(
workspace.as_deref(),
Some(std::path::Path::new("/tmp/example")),
);
}
other => panic!("expected Daemon, got {other:?}"),
}
}
#[test]
fn parse_humantime_duration_accepts_seconds() {
assert_eq!(
parse_humantime_duration("30s").unwrap(),
std::time::Duration::from_secs(30),
);
assert_eq!(
parse_humantime_duration("0s").unwrap(),
std::time::Duration::from_secs(0),
);
}
#[test]
fn parse_humantime_duration_accepts_minutes_and_hours_and_days() {
assert_eq!(
parse_humantime_duration("5m").unwrap(),
std::time::Duration::from_secs(300),
);
assert_eq!(
parse_humantime_duration("10min").unwrap(),
std::time::Duration::from_secs(600),
);
assert_eq!(
parse_humantime_duration("2h").unwrap(),
std::time::Duration::from_secs(7_200),
);
assert_eq!(
parse_humantime_duration("1d").unwrap(),
std::time::Duration::from_secs(86_400),
);
}
#[test]
fn parse_humantime_duration_rejects_empty_input() {
let err = parse_humantime_duration("").unwrap_err();
assert!(err.contains("empty"), "{err}");
let err = parse_humantime_duration(" ").unwrap_err();
assert!(err.contains("empty"), "{err}");
}
#[test]
fn parse_humantime_duration_rejects_missing_digits() {
let err = parse_humantime_duration("s").unwrap_err();
assert!(err.contains("ASCII digits"), "{err}");
let err = parse_humantime_duration("min").unwrap_err();
assert!(err.contains("ASCII digits"), "{err}");
}
#[test]
fn parse_humantime_duration_rejects_unknown_unit() {
let err = parse_humantime_duration("30y").unwrap_err();
assert!(err.contains("unknown unit"), "{err}");
let err = parse_humantime_duration("1h30m").unwrap_err();
assert!(err.contains("unknown unit"), "{err}");
}
#[test]
fn parse_humantime_duration_rejects_trailing_junk() {
let err = parse_humantime_duration("30sX").unwrap_err();
assert!(
err.contains("unknown unit") || err.contains("expected"),
"{err}"
);
let err = parse_humantime_duration("30 s").unwrap_err();
assert!(
err.contains("unknown unit") || err.contains("expected"),
"{err}"
);
}
#[test]
fn parse_humantime_duration_handles_outer_whitespace() {
assert_eq!(
parse_humantime_duration(" 30s ").unwrap(),
std::time::Duration::from_secs(30),
);
}
#[test]
fn hostname_for_claim_falls_back_to_unknown() {
let prior = std::env::var("HOSTNAME").ok();
unsafe { std::env::remove_var("HOSTNAME") };
assert_eq!(hostname_for_claim(), "unknown");
unsafe { std::env::set_var("HOSTNAME", "ci-runner-7") };
assert_eq!(hostname_for_claim(), "ci-runner-7");
match prior {
Some(v) => unsafe { std::env::set_var("HOSTNAME", v) },
None => unsafe { std::env::remove_var("HOSTNAME") },
}
}
#[test]
fn justify_is_empty_handles_none_and_blank() {
assert!(justify_is_empty(None));
assert!(justify_is_empty(Some("")));
assert!(justify_is_empty(Some(" ")));
assert!(!justify_is_empty(Some("real reason")));
}
#[test]
fn require_justify_for_destructive_refuses_without_reason() {
let err = require_justify_for_destructive(true, None).unwrap_err();
assert!(matches!(err, LiveCmdError::ArgRefused(_)));
let err = require_justify_for_destructive(true, Some(" ")).unwrap_err();
assert!(matches!(err, LiveCmdError::ArgRefused(_)));
require_justify_for_destructive(false, None).unwrap();
require_justify_for_destructive(true, Some("rotate keys")).unwrap();
}
#[test]
fn require_justify_for_dangerous_refuses_without_reason() {
let err = require_justify_for_dangerous(true, None).unwrap_err();
assert!(matches!(err, LiveCmdError::ArgRefused(_)));
require_justify_for_dangerous(false, None).unwrap();
require_justify_for_dangerous(true, Some("runbook")).unwrap();
}
#[test]
fn require_destructive_gate_passes_for_non_destructive_plan() {
use djogi::live_migrate::{
LivePlan, PlanClassification, PlanHeader, Step, StepKind, StepParameters,
};
let plan = LivePlan {
header: PlanHeader {
plan_id: HeerId::ZERO,
slug: "demo".to_string(),
classification: PlanClassification::ExpandContract,
originating_migration: "V20260428000000__demo".to_string(),
target_database: "main".to_string(),
app_label: "".to_string(),
},
steps: vec![Step {
kind: StepKind::ExpandSchema,
ordinal: 0,
parameters: StepParameters::ExpandSchema {
sql_segments: vec!["ALTER TABLE foo ADD COLUMN bar INT".to_string()],
},
}],
};
require_destructive_gate_for_plan(&plan, false, None).unwrap();
}
#[test]
fn require_destructive_gate_refuses_destructive_plan_without_flag() {
use djogi::live_migrate::{
LivePlan, PlanClassification, PlanHeader, Step, StepKind, StepParameters,
};
let plan = LivePlan {
header: PlanHeader {
plan_id: HeerId::ZERO,
slug: "demo".to_string(),
classification: PlanClassification::ExpandContract,
originating_migration: "V20260428000000__demo".to_string(),
target_database: "main".to_string(),
app_label: "".to_string(),
},
steps: vec![Step {
kind: StepKind::CleanupLegacyState,
ordinal: 0,
parameters: StepParameters::CleanupLegacyState {
sql_segments: vec!["ALTER TABLE foo DROP COLUMN baz".to_string()],
},
}],
};
let err = require_destructive_gate_for_plan(&plan, false, None).unwrap_err();
assert!(matches!(err, LiveCmdError::ArgRefused(_)));
let err = require_destructive_gate_for_plan(&plan, true, None).unwrap_err();
assert!(matches!(err, LiveCmdError::ArgRefused(_)));
let err = require_destructive_gate_for_plan(&plan, true, Some(" ")).unwrap_err();
assert!(matches!(err, LiveCmdError::ArgRefused(_)));
require_destructive_gate_for_plan(&plan, true, Some("ops runbook RB-19")).unwrap();
}
#[test]
fn parse_plan_id_accepts_decimal() {
let id = parse_plan_id("12345").unwrap();
assert_eq!(id.as_i64(), 12345);
}
#[test]
fn parse_plan_id_rejects_garbage() {
let err = parse_plan_id("not-a-number").unwrap_err();
assert!(matches!(err, LiveCmdError::MalformedPlanId(_)));
}
#[test]
fn exit_code_runtime_maps_to_one() {
assert_eq!(LiveCmdError::Runtime("x".to_string()).exit_code(), 1);
assert_eq!(LiveCmdError::ArgRefused("x".to_string()).exit_code(), 1);
assert_eq!(
LiveCmdError::MalformedPlanId("x".to_string()).exit_code(),
1
);
}
#[test]
fn exit_code_classification_refused_maps_to_two() {
assert_eq!(
LiveCmdError::ClassificationRefused("offline only".to_string()).exit_code(),
2,
);
}
#[test]
fn exit_code_checksum_drift_maps_to_four() {
assert_eq!(
LiveCmdError::ChecksumDrift("mismatch".to_string()).exit_code(),
4,
);
}
#[test]
fn exit_code_state_conflict_maps_to_five() {
assert_eq!(
LiveCmdError::StateConflict("complete".to_string()).exit_code(),
5,
);
}
#[test]
fn assert_run_status_accepts_pending_running() {
assert!(assert_run_status_allows_progress(PlanStatus::Pending).is_ok());
assert!(assert_run_status_allows_progress(PlanStatus::Running).is_ok());
}
#[test]
fn assert_run_status_refuses_paused_pointing_to_resume() {
let err = assert_run_status_allows_progress(PlanStatus::Paused)
.expect_err("paused must be a state conflict for `live run`");
match err {
LiveCmdError::StateConflict(msg) => {
assert!(msg.contains("paused"), "{msg}");
assert!(msg.contains("live resume"), "{msg}");
}
other => panic!("expected StateConflict, got {other:?}"),
}
}
#[test]
fn assert_run_status_refuses_terminal_and_gates() {
for status in [
PlanStatus::Validating,
PlanStatus::Cutover,
PlanStatus::Finalizing,
PlanStatus::Complete,
PlanStatus::Abandoned,
PlanStatus::Failed,
] {
let err = assert_run_status_allows_progress(status)
.expect_err("non-progressable status must refuse");
assert!(matches!(err, LiveCmdError::StateConflict(_)));
}
}
#[test]
fn assert_resume_status_distinguishes_pending_from_terminal() {
let err = assert_resume_status_allows_progress(PlanStatus::Pending)
.expect_err("pending must refuse");
match err {
LiveCmdError::StateConflict(msg) => assert!(msg.contains("pending")),
other => panic!("expected StateConflict, got {other:?}"),
}
assert!(assert_resume_status_allows_progress(PlanStatus::Running).is_ok());
assert!(assert_resume_status_allows_progress(PlanStatus::Paused).is_ok());
for status in [
PlanStatus::Validating,
PlanStatus::Cutover,
PlanStatus::Finalizing,
PlanStatus::Complete,
PlanStatus::Abandoned,
PlanStatus::Failed,
] {
let err = assert_resume_status_allows_progress(status)
.expect_err("non-resumable status must refuse");
assert!(matches!(err, LiveCmdError::StateConflict(_)));
}
}
#[test]
fn assert_finalize_status_accepts_only_finalizing() {
assert!(assert_finalize_status(PlanStatus::Finalizing).is_ok());
for status in [
PlanStatus::Pending,
PlanStatus::Running,
PlanStatus::Paused,
PlanStatus::Validating,
PlanStatus::Cutover,
PlanStatus::Complete,
PlanStatus::Abandoned,
PlanStatus::Failed,
] {
let err = assert_finalize_status(status).expect_err("non-finalizing must refuse");
assert!(matches!(err, LiveCmdError::StateConflict(_)));
}
}
#[test]
fn assert_abandon_status_refuses_every_terminal_state() {
for status in [
PlanStatus::Complete,
PlanStatus::Abandoned,
PlanStatus::Failed,
] {
let err = assert_abandon_status(status)
.expect_err("terminal status must be a state conflict for abandon");
assert!(matches!(err, LiveCmdError::StateConflict(_)));
}
for status in [
PlanStatus::Pending,
PlanStatus::Running,
PlanStatus::Paused,
PlanStatus::Validating,
PlanStatus::Cutover,
PlanStatus::Finalizing,
] {
assert!(assert_abandon_status(status).is_ok(), "{status:?} accepts");
}
}
#[test]
fn assert_abandon_status_failed_message_points_to_fresh_plan() {
let err = assert_abandon_status(PlanStatus::Failed).expect_err("failed must refuse");
match err {
LiveCmdError::StateConflict(msg) => {
assert!(msg.contains("failed"), "{msg}");
assert!(msg.contains("fresh plan") || msg.contains("audit"), "{msg}",);
}
other => panic!("expected StateConflict, got {other:?}"),
}
}
#[test]
fn force_allowed_when_djogi_env_unset() {
let prior = std::env::var("DJOGI_ENV").ok();
unsafe { std::env::remove_var("DJOGI_ENV") };
assert!(force_allowed_in_env());
unsafe { std::env::set_var("DJOGI_ENV", "development") };
assert!(force_allowed_in_env());
unsafe { std::env::set_var("DJOGI_ENV", "PRODUCTION") };
assert!(
!force_allowed_in_env(),
"case-insensitive production must refuse"
);
unsafe { std::env::set_var("DJOGI_ENV", "production") };
assert!(!force_allowed_in_env());
match prior {
Some(v) => unsafe { std::env::set_var("DJOGI_ENV", v) },
None => unsafe { std::env::remove_var("DJOGI_ENV") },
}
}
#[test]
fn plan_file_checksum_mismatch_maps_to_drift() {
let pfe = PlanFileError::ChecksumMismatch {
path: PathBuf::from("/tmp/x.json"),
expected: "V1:0".to_string(),
actual: "V1:1".to_string(),
};
let err: LiveCmdError = pfe.into();
assert_eq!(err.exit_code(), 4, "checksum mismatch must exit 4");
}
#[test]
fn plan_file_io_maps_to_runtime() {
let pfe = PlanFileError::NotFound(PathBuf::from("/missing"));
let err: LiveCmdError = pfe.into();
assert_eq!(err.exit_code(), 1);
}
}