use std::path::{Path, PathBuf};
use std::process::ExitCode;
use djogi::apps::AppRegistry;
use djogi::migrate::{
AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan, RepairConfirmation,
RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError, VerifyReport, VerifySeverity,
acquire_workspace_lock, apply_plan, attune, baseline_plan, compose, fake_apply_plan,
load_snapshot, project_from_inventory, repair_checksum_drift, repair_partial_apply,
repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
};
use djogi::migrate::LedgerStatus;
use crate::{PartialApplyResolutionCli, RepairSubcommand};
/// Replay plan as stored on disk in a bucket's `{version}.plan.json` file.
///
/// `load_replay_plan_from_disk` deserializes this shape, validates it, and
/// converts it into a `djogi::migrate::MigrationPlan`.
#[derive(Debug, Clone, serde::Deserialize)]
struct CliReplayPlan {
// Must equal `CLI_REPLAY_PLAN_FORMAT_VERSION`; otherwise loading is rejected.
format_version: String,
// Compared against the pending plan's `checksum_up` when loading.
checksum_up: String,
// Compared against the pending plan's `checksum_down` when loading.
checksum_down: Option<String>,
// Converted into `djogi::migrate::Classification`.
classification: CliClassification,
// Converted one-to-one into `djogi::migrate::Segment` values, in order.
segments: Vec<CliReplaySegment>,
}
/// Serialized form of a plan classification.
///
/// Internally tagged by a `kind` field whose values are the snake_case variant
/// names. Each variant maps onto the `djogi::migrate::Classification` variant
/// of the same name through the `From` impl in this file.
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
enum CliClassification {
NoOp,
Additive,
Reversible,
Destructive,
Lossy,
Unsupported {
// Carried through unchanged to `Classification::Unsupported`.
reason: String,
},
PkTypeFlip {
// Both flags are carried through unchanged to `Classification::PkTypeFlip`.
co_destructive: bool,
co_lossy: bool,
},
}
/// One segment of a stored replay plan: a segment kind plus its ordered
/// statements.
#[derive(Debug, Clone, serde::Deserialize)]
struct CliReplaySegment {
// Converted into `djogi::migrate::SegmentKind`.
kind: CliSegmentKind,
// Each entry becomes one `djogi::migrate::OperationSql`.
statements: Vec<CliReplayStatement>,
}
/// Serialized form of a segment kind, written as a snake_case string.
///
/// Each variant maps onto the `djogi::migrate::SegmentKind` variant of the
/// same name.
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
enum CliSegmentKind {
Transactional,
NonTransactional,
MetadataOnly,
}
/// One statement of a stored replay plan.
///
/// Only the label and forward SQL are stored; the loader fills in an empty
/// `down` and `lossy: None` when building the `OperationSql`.
#[derive(Debug, Clone, serde::Deserialize)]
struct CliReplayStatement {
label: String,
up: String,
}
/// The only `format_version` value accepted when loading a `.plan.json`
/// replay plan; any other value yields `ApplyReplayPlanError::FormatVersion`.
const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
fn load_replay_plan_from_disk(
workspace: &Path,
bucket: &djogi::migrate::BucketKey,
version: &str,
pending_checksum_up: &str,
pending_checksum_down: Option<&str>,
) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
if let Ok(bytes) = std::fs::read(&replay_plan_path) {
let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
Ok(s) => s,
Err(e) => {
return Err(ApplyReplayPlanError::Parse {
path: replay_plan_path.clone(),
source: e.to_string(),
});
}
};
if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
return Err(ApplyReplayPlanError::FormatVersion {
found: stored.format_version,
path: replay_plan_path.clone(),
});
}
if stored.checksum_up != pending_checksum_up
|| stored.checksum_down.as_deref() != pending_checksum_down
{
return Err(ApplyReplayPlanError::ChecksumMismatch);
}
let plan = djogi::migrate::MigrationPlan {
bucket: bucket.clone(),
classification: stored.classification.into(),
segments: stored
.segments
.into_iter()
.map(|seg| djogi::migrate::Segment {
kind: seg.kind.into(),
statements: seg
.statements
.into_iter()
.map(|stmt| djogi::migrate::OperationSql {
label: stmt.label,
up: stmt.up,
down: String::new(),
lossy: None,
})
.collect(),
})
.collect(),
};
return Ok((plan, stored.checksum_up, stored.checksum_down));
}
let up_filename = djogi::migrate::up_filename(version);
let down_filename = djogi::migrate::down_filename(version);
let up_path = bucket_dir.join(&up_filename);
let down_path = bucket_dir.join(&down_filename);
let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
path: up_path.clone(),
source: e.to_string(),
})?;
let down_sql = match std::fs::read_to_string(&down_path) {
Ok(sql) => sql,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
Err(e) => {
return Err(ApplyReplayPlanError::SqlRead {
path: down_path.clone(),
source: e.to_string(),
});
}
};
let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
let plan = djogi::migrate::MigrationPlan {
bucket: bucket.clone(),
classification: djogi::migrate::Classification::Additive,
segments: vec![djogi::migrate::Segment {
kind: djogi::migrate::SegmentKind::Transactional,
statements: vec![djogi::migrate::OperationSql {
label: format!("replay {version}"),
up: up_sql,
down: down_sql,
lossy: None,
}],
}],
};
Ok((plan, computed_checksum_up, None))
}
/// Failures raised by `load_replay_plan_from_disk`.
///
/// Underlying errors are stored as their rendered string (`source`), not as
/// the original error value.
#[derive(Debug)]
enum ApplyReplayPlanError {
// The `.plan.json` file was read but could not be deserialized.
Parse { path: PathBuf, source: String },
// The `.plan.json` file declares a format version other than the expected one.
FormatVersion { found: String, path: PathBuf },
// The checksums in the `.plan.json` file differ from the pending plan's.
ChecksumMismatch,
// An up/down SQL file could not be read.
SqlRead { path: PathBuf, source: String },
}
impl std::fmt::Display for ApplyReplayPlanError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Parse { path, source } => {
write!(f, "parse replay plan {}: {source}", path.display())
}
Self::FormatVersion { found, path } => write!(
f,
"replay plan format version mismatch in {}: expected {}, found {}",
path.display(),
CLI_REPLAY_PLAN_FORMAT_VERSION,
found
),
Self::ChecksumMismatch => {
write!(f, "checksum mismatch between pending JSON and replay plan")
}
Self::SqlRead { path, source } => {
write!(f, "read SQL file {}: {source}", path.display())
}
}
}
}
// Marker impl: all trait defaults are used, so no underlying `source()` is
// exposed (the variants keep their causes as plain strings).
impl std::error::Error for ApplyReplayPlanError {}
/// Maps the on-disk segment kind onto the runner's segment kind, variant for
/// variant.
impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
    fn from(stored: CliSegmentKind) -> Self {
        use djogi::migrate::SegmentKind as Target;
        match stored {
            CliSegmentKind::MetadataOnly => Target::MetadataOnly,
            CliSegmentKind::NonTransactional => Target::NonTransactional,
            CliSegmentKind::Transactional => Target::Transactional,
        }
    }
}
impl From<CliClassification> for djogi::migrate::Classification {
fn from(classification: CliClassification) -> Self {
match classification {
CliClassification::NoOp => Self::NoOp,
CliClassification::Additive => Self::Additive,
CliClassification::Reversible => Self::Reversible,
CliClassification::Destructive => Self::Destructive,
CliClassification::Lossy => Self::Lossy,
CliClassification::Unsupported { reason } => Self::Unsupported { reason },
CliClassification::PkTypeFlip {
co_destructive,
co_lossy,
} => Self::PkTypeFlip {
co_destructive,
co_lossy,
},
}
}
}
/// Picks the workspace root: the explicit `workspace` when given, otherwise
/// the process's current directory, otherwise the relative path `"."` if the
/// current directory cannot be determined.
fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
    if let Some(explicit) = workspace {
        return explicit;
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd,
        Err(_) => PathBuf::from("."),
    }
}
/// Scans `<migrations_root>/<database>/<app_dir>/` and returns one bucket for
/// every app directory that contains a `schema_snapshot.json`.
///
/// The scan is best-effort: an unreadable root yields an empty list, and any
/// entry that cannot be inspected, is not a directory, or has a non-UTF-8
/// name is skipped silently. The app label is derived from the directory name
/// through `djogi::migrate::app_label_from_dirname`.
fn discover_snapshot_buckets_on_disk(
    workspace: &Path,
) -> Vec<djogi::migrate::projection::BucketKey> {
    let root = djogi::migrate::migrations_root(workspace);
    let mut found = Vec::new();

    let database_dirs = match std::fs::read_dir(&root) {
        Ok(entries) => entries,
        Err(_) => return found,
    };

    for database_dir in database_dirs.flatten() {
        if !matches!(database_dir.file_type(), Ok(kind) if kind.is_dir()) {
            continue;
        }
        let database = match database_dir.file_name().to_str() {
            Some(name) => name.to_string(),
            None => continue,
        };
        let app_dirs = match std::fs::read_dir(database_dir.path()) {
            Ok(entries) => entries,
            Err(_) => continue,
        };

        for app_dir in app_dirs.flatten() {
            if !matches!(app_dir.file_type(), Ok(kind) if kind.is_dir()) {
                continue;
            }
            let dirname = match app_dir.file_name().to_str() {
                Some(name) => name.to_string(),
                None => continue,
            };
            // Only directories that already hold a snapshot count as buckets.
            let has_snapshot = app_dir.path().join("schema_snapshot.json").exists();
            if has_snapshot {
                found.push(djogi::migrate::projection::BucketKey {
                    database: database.clone(),
                    app: djogi::migrate::app_label_from_dirname(&dirname).to_string(),
                });
            }
        }
    }

    found
}
/// Entry point for `djogi migrations compose`.
///
/// Gathers the live inputs — the projected models, the registered app
/// lifecycles, the workspace configuration and the current UTC time — and
/// hands them to `compose_with_inputs`.
///
/// Returns exit code 1 when the model projection or the configuration load
/// fails; otherwise returns whatever `compose_with_inputs` returns.
pub fn compose_cmd(
    name: &str,
    allow_destructive: bool,
    force_overwrite: bool,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);

    let models = match project_from_inventory() {
        Ok(projected) => projected,
        Err(err) => {
            eprintln!("djogi migrations compose: projection error: {err}");
            return ExitCode::from(1);
        }
    };

    let mut apps: Vec<AppLifecycle> = Vec::new();
    for descriptor in AppRegistry::all().iter() {
        apps.push(AppLifecycle {
            label: descriptor.label.to_string(),
            database: descriptor.database.to_string(),
            renamed_from: descriptor.renamed_from.map(str::to_string),
            tombstone: descriptor.tombstone,
        });
    }

    let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
        Ok(loaded) => loaded,
        Err(err) => {
            eprintln!("djogi migrations compose: config load: {err}");
            return ExitCode::from(1);
        }
    };
    let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
        djogi_config.migrate.pk_flip_join_table_option,
    );

    let now = time::OffsetDateTime::now_utc();
    compose_with_inputs(
        &workspace,
        name,
        allow_destructive,
        force_overwrite,
        &models,
        &apps,
        now,
        Some(pk_flip_option),
    )
}
/// Runs a compose pass with every input supplied by the caller.
///
/// Steps:
/// 1. Acquire the workspace lock at `<workspace>/LOCK_FILE_NAME`.
/// 2. Build the bucket set from the model buckets plus any bucket that already
///    has a snapshot on disk.
/// 3. Load each bucket's snapshot; a missing snapshot file is skipped, any
///    other load failure aborts.
/// 4. Call `compose` and print one line per auto-emitted Phase 0 bootstrap and
///    per composed bucket.
///
/// Returns exit code 0 on success and when there is nothing to compose, and 1
/// when the lock cannot be acquired, a snapshot fails to load, or `compose`
/// reports any other error.
#[allow(clippy::too_many_arguments)]
fn compose_with_inputs(
    workspace: &Path,
    name: &str,
    allow_destructive: bool,
    force_overwrite: bool,
    models: &std::collections::BTreeMap<
        djogi::migrate::projection::BucketKey,
        djogi::migrate::AppliedSchema,
    >,
    apps: &[AppLifecycle],
    now: time::OffsetDateTime,
    pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
) -> ExitCode {
    use std::collections::{BTreeMap, BTreeSet};

    let guard = match acquire_workspace_lock(&workspace.join(LOCK_FILE_NAME), GUARD_DEFAULT_TIMEOUT)
    {
        Ok(held) => held,
        Err(err) => {
            eprintln!("djogi migrations compose: failed to acquire workspace lock: {err}");
            return ExitCode::from(1);
        }
    };

    let mut buckets: BTreeSet<djogi::migrate::projection::BucketKey> = BTreeSet::new();
    buckets.extend(models.keys().cloned());
    buckets.extend(discover_snapshot_buckets_on_disk(workspace));

    let mut snapshots: BTreeMap<_, _> = BTreeMap::new();
    for bucket in &buckets {
        let path = djogi::migrate::snapshot_path(workspace, bucket);
        let loaded = match djogi::migrate::load_snapshot(&path) {
            Ok(snapshot) => snapshot,
            // A bucket without a snapshot file simply has no prior state.
            Err(djogi::migrate::SnapshotError::Io { source, .. })
                if source.kind() == std::io::ErrorKind::NotFound =>
            {
                continue;
            }
            Err(err) => {
                eprintln!(
                    "djogi migrations compose: snapshot load failed at {}: {err}",
                    path.display()
                );
                return ExitCode::from(1);
            }
        };
        snapshots.insert(bucket.clone(), loaded);
    }

    let request = ComposeRequest {
        workspace_root: workspace,
        models,
        snapshots: &snapshots,
        apps,
        name,
        allow_destructive,
        force_overwrite,
        now,
        _guard: &guard,
        pk_flip_join_table_option,
        skip_phase_zero_auto_emit: false,
    };

    let report = match compose(request) {
        Ok(report) => report,
        Err(ComposeError::NothingToCompose) => {
            println!("nothing to compose — model state matches snapshot for every bucket");
            return ExitCode::from(0);
        }
        Err(err) => {
            eprintln!("djogi migrations compose: {err}");
            return ExitCode::from(1);
        }
    };

    for emit in &report.emitted_phase_zero {
        let ext_summary = if emit.extensions.is_empty() {
            "no extensions".to_string()
        } else {
            let names = emit.extensions.iter().cloned().collect::<Vec<_>>();
            format!("extensions: {}", names.join(", "))
        };
        println!(
            "auto-emitted Phase 0 bootstrap: {database}/_global_ ({ext_summary})",
            database = emit.database,
        );
    }

    for composed in &report.composed_buckets {
        // The empty app label denotes the database-wide bucket.
        let app = if composed.bucket.app.is_empty() {
            "_global_"
        } else {
            composed.bucket.app.as_str()
        };
        println!(
            "composed {database}/{app}: {version} ({classification:?})",
            database = composed.bucket.database,
            app = app,
            version = composed.version,
            classification = composed.classification,
        );
    }

    ExitCode::from(0)
}
/// Entry point for `djogi migrations status`.
///
/// Builds a current-thread tokio runtime, runs `run_status` on it, and turns
/// the resulting integer into a process exit code.
///
/// Returns exit code 1 if the runtime cannot be built. The status code is
/// converted with a checked conversion: a value outside `0..=255` is reported
/// as the generic failure code 1 rather than being truncated, because a
/// truncating cast would turn a code such as 256 into 0 (success).
pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(r) => r,
        Err(e) => {
            eprintln!("djogi migrations status: tokio runtime: {e}");
            return ExitCode::from(1);
        }
    };
    let exit = runtime.block_on(async { run_status(&workspace).await });
    // Part of the status code comes from the rendered report, so it is not
    // guaranteed to fit in a byte.
    ExitCode::from(u8::try_from(exit).unwrap_or(1))
}
/// Prints the migration status report for the workspace's primary database.
///
/// Loads the configuration, connects to `config.database.url`, reads every
/// ledger row and renders it against the labels of all registered apps.
///
/// Return value (used as the process exit status):
/// * `report.exit_code` when the report was rendered;
/// * `0` when the ledger read fails with an error whose text mentions
///   `djogi_schema_migrations` — this is reported as "No migrations recorded."
///   (presumably the ledger table does not exist yet; the check is a plain
///   substring match on the error message);
/// * `2` when the PostgreSQL version is unsupported;
/// * `1` for configuration, connection, or other ledger-read failures.
async fn run_status(workspace: &Path) -> i32 {
    use djogi::config::DjogiConfig;

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Ok(c) => c,
        Err(e) => {
            eprintln!("djogi migrations status: config load: {e}");
            return 1;
        }
    };

    let mut ctx = match connect_and_check(&config.database.url).await {
        ContextOutcome::Ready(ctx) => ctx,
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations status", &e);
            return 2;
        }
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations status: pool: {msg}");
            return 1;
        }
    };

    let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
        Ok(rows) => rows,
        Err(e) => {
            // An error naming the ledger table is treated as "no ledger yet".
            if e.to_string().contains("djogi_schema_migrations") {
                println!("No migrations recorded.");
                return 0;
            }
            eprintln!("djogi migrations status: ledger read: {e}");
            return 1;
        }
    };

    let registered: Vec<String> = AppRegistry::all()
        .iter()
        .map(|d| d.label.to_string())
        .collect();

    let report = djogi::migrate::render_status(&rows, &registered);
    for line in &report.lines {
        println!("{line}");
    }
    report.exit_code
}
/// Result of `connect_and_check`: either a usable context or the reason one
/// could not be produced.
///
/// The two failure kinds are kept apart because callers map them to different
/// exit codes (2 for an unsupported server version, 1 for runtime errors).
#[allow(clippy::large_enum_variant)]
enum ContextOutcome {
// Connection succeeded and the version preflight passed.
Ready(djogi::context::DjogiContext),
// The preflight failed with `DjogiError::UnsupportedPostgresVersion`.
UnsupportedVersion(djogi::error::DjogiError),
// Any other connection or preflight failure, rendered as text.
RuntimeError(String),
}
async fn connect_and_check(url: &str) -> ContextOutcome {
let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
Ok(p) => p,
Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
};
match djogi::pg::preflight::check_postgres_version(&pool).await {
Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
ContextOutcome::UnsupportedVersion(e)
}
Err(other) => ContextOutcome::RuntimeError(other.to_string()),
}
}
fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
return Some(db_config.url.clone());
}
if database == "crud_log"
&& let Some(u) = db_config.crud_log_url.as_deref()
&& !u.is_empty()
{
return Some(u.to_string());
}
if database == "event_log"
&& let Some(u) = db_config.event_log_url.as_deref()
&& !u.is_empty()
{
return Some(u.to_string());
}
djogi::migrate::derive_per_database_url(&db_config.url, database)
}
/// Entry point for `djogi migrations apply` (optionally with `--fake`).
///
/// With `fake` set, a `reason` that is non-empty after trimming is mandatory;
/// a missing or blank reason prints a usage error and returns exit code 2
/// before anything else runs. Without `fake`, `reason` is ignored.
///
/// Otherwise a current-thread tokio runtime is built (exit code 1 if that
/// fails) and `run_apply` is driven to completion; its result, cast to `u8`,
/// becomes the exit code.
pub fn apply_cmd(workspace: Option<PathBuf>, fake: bool, reason: Option<String>) -> ExitCode {
    let workspace = resolve_workspace(workspace);

    let mode = match (fake, reason) {
        (false, _) => FakeMode::Real,
        (true, Some(text)) if !text.trim().is_empty() => FakeMode::Fake { reason: text },
        (true, Some(_)) => {
            eprintln!(
                "djogi migrations apply --fake: --reason must not be empty; \
                 supply a non-empty reason why these migrations are being \
                 faked (e.g. 'schema pre-exists from prior tooling')"
            );
            return ExitCode::from(2);
        }
        (true, None) => {
            eprintln!(
                "djogi migrations apply --fake: --reason is required; \
                 supply a reason why these migrations are being faked \
                 (e.g. 'schema pre-exists from prior tooling'). \
                 This is recorded in the ledger audit trail."
            );
            return ExitCode::from(2);
        }
    };

    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Ok(rt) => rt,
        Err(err) => {
            eprintln!("djogi migrations apply: tokio runtime: {err}");
            return ExitCode::from(1);
        }
    };

    let status = runtime.block_on(async { run_apply(&workspace, &mode).await });
    ExitCode::from(status as u8)
}
/// How `run_apply` should treat each pending migration.
#[derive(Debug, Clone)]
enum FakeMode {
// Execute the plan through `apply_plan`.
Real,
// Record the plan through `fake_apply_plan`, passing along `reason`.
Fake { reason: String },
}
/// Applies (or fake-applies) every pending migration found in the workspace.
///
/// Flow: load the configuration, connect to `config.database.url`, run the
/// PostgreSQL version preflight, discover the pending plan files, take the
/// workspace lock, then process the pending plans in the order returned by
/// `discover_pending_plans`. Processing stops at the first refused plan or
/// runner error.
///
/// The audit pool is optional: if its URL cannot be resolved or the pool
/// cannot be built, the run continues without one.
///
/// The final summary line counts only the migrations that were actually
/// applied/faked in this run; plans skipped as already applied are not
/// included in the count.
///
/// Return value (used as the process exit status):
/// * `0` — all pending plans were processed, or there were none;
/// * `1` — connection failure, lock failure, or a runner error;
/// * `2` — configuration load failure, preflight failure, or a refused plan.
async fn run_apply(workspace: &Path, mode: &FakeMode) -> i32 {
    use djogi::config::DjogiConfig;

    let action_verb = match mode {
        FakeMode::Real => "apply",
        FakeMode::Fake { .. } => "fake-apply",
    };
    let progress_verb = match mode {
        FakeMode::Real => "applying",
        FakeMode::Fake { .. } => "faking",
    };

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Ok(c) => c,
        Err(e) => {
            eprintln!("djogi migrations {action_verb}: config load: {e}");
            return 2;
        }
    };
    let pool = match djogi::pg::pool::DjogiPool::connect(&config.database.url).await {
        Ok(p) => p,
        Err(e) => {
            eprintln!("djogi migrations {action_verb}: pool connect: {e}");
            return 1;
        }
    };
    if let Err(e) = djogi::pg::preflight::check_postgres_version(&pool).await {
        crate::print_support_boundary_error("migrations apply", &e);
        return 2;
    }

    let pending_files = discover_pending_plans(workspace);
    if pending_files.is_empty() {
        println!("No pending migrations to {action_verb}.");
        return 0;
    }

    let lock_path = workspace.join(LOCK_FILE_NAME);
    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
        Ok(g) => g,
        Err(e) => {
            eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
            return 1;
        }
    };

    // Best-effort: the run proceeds without an audit pool if one cannot be set up.
    let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
        Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
        Err(_) => None,
    };

    let mut ctx = djogi::context::DjogiContext::from_pool(pool);
    // Number of plans that really went through the runner in this invocation.
    let mut completed: usize = 0;

    for (pending_path, bucket_database, app_label) in &pending_files {
        println!(" {progress_verb} {bucket_database}/{app_label}...");
        let result = apply_one_pending(
            &mut ctx,
            workspace,
            pending_path,
            bucket_database.clone(),
            app_label.clone(),
            &config,
            &guard,
            audit_pool.as_ref(),
            mode,
        )
        .await;

        match result {
            ApplyResult::Ok => {
                completed += 1;
                match mode {
                    FakeMode::Real => {
                        println!("Applied: {bucket_database}/{app_label}");
                    }
                    FakeMode::Fake { .. } => {
                        println!(
                            " faked {bucket_database}/{app_label}: \
                             recorded in ledger with status = 'faked' (no SQL executed)"
                        );
                    }
                }
            }
            ApplyResult::Skipped(reason) => {
                println!("Skipped {bucket_database}/{app_label}: {reason}");
            }
            ApplyResult::Refused(reason) => {
                eprintln!(
                    "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
                );
                return 2;
            }
            ApplyResult::RunnerError(e) => {
                eprintln!(
                    "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
                );
                return runner_error_exit_code(&e);
            }
        }
    }

    let summary_verb = match mode {
        FakeMode::Real => "applied",
        FakeMode::Fake { .. } => "faked",
    };
    println!("{summary_verb} {completed} migration(s).");
    0
}
/// Outcome of processing a single pending plan in `apply_one_pending`.
#[derive(Debug)]
enum ApplyResult {
// The runner completed the plan (real or faked).
Ok,
// Nothing was done; the string says why (e.g. already applied).
Skipped(String),
// The plan was rejected before reaching the runner; `run_apply` exits with 2.
Refused(String),
// The runner itself failed; `run_apply` maps it via `runner_error_exit_code`.
RunnerError(RunnerError),
}
/// Lists the pending plan files under `djogi::migrate::pending_root`.
///
/// The expected layout is `<pending_root>/<database>/<app_label>.json`. Each
/// result is `(file path, database directory name, app label)`, where the app
/// label is the file name without its `.json` suffix. Results are sorted by
/// file path.
///
/// Skipped silently: an unreadable root or database directory, database
/// entries whose name starts with `.` or is not valid UTF-8, database entries
/// that are not directories, and children that are not regular files or do
/// not end in `.json`.
fn discover_pending_plans(workspace: &Path) -> Vec<(PathBuf, String, String)> {
    let root = djogi::migrate::pending_root(workspace);
    let mut plans = Vec::new();

    let database_entries = match std::fs::read_dir(&root) {
        Ok(entries) => entries,
        Err(_) => return plans,
    };

    for database_entry in database_entries.flatten() {
        let Some(database) = database_entry.file_name().to_str().map(str::to_string) else {
            continue;
        };
        if database.starts_with('.') {
            continue;
        }

        let database_dir = database_entry.path();
        if !database_dir.is_dir() {
            continue;
        }
        let Ok(children) = std::fs::read_dir(&database_dir) else {
            continue;
        };

        for child in children.flatten() {
            let file = child.path();
            if !file.is_file() {
                continue;
            }
            // Keep only UTF-8 names ending in `.json`; the stem is the app label.
            let label = file
                .file_name()
                .and_then(|name| name.to_str())
                .and_then(|name| name.strip_suffix(".json"))
                .map(str::to_string);
            if let Some(app_label) = label {
                plans.push((file, database.clone(), app_label));
            }
        }
    }

    plans.sort_by(|left, right| left.0.cmp(&right.0));
    plans
}
#[allow(clippy::too_many_arguments)]
#[djogi::deliberately_bypass_convention_with_raw_sql]
async fn apply_one_pending(
ctx: &mut djogi::context::DjogiContext,
workspace: &Path,
pending_path: &Path,
bucket_database: String,
app_label: String,
config: &djogi::config::DjogiConfig,
guard: &djogi::migrate::WorkspaceGuard,
audit_pool: Option<&deadpool_postgres::Pool>,
mode: &FakeMode,
) -> ApplyResult {
let pending_bytes = match std::fs::read(pending_path) {
Ok(b) => b,
Err(e) => {
return ApplyResult::Refused(format!("read pending JSON: {e}"));
}
};
let pending: PendingPlan = match serde_json::from_slice(&pending_bytes) {
Ok(p) => p,
Err(e) => {
return ApplyResult::Refused(format!("parse pending JSON: {e}"));
}
};
let resolved_app = if app_label == "_global_" {
String::new()
} else {
app_label.clone()
};
let bucket = djogi::migrate::BucketKey {
database: bucket_database,
app: resolved_app,
};
match check_ledger_state(ctx, &pending.version).await {
LedgerState::NotPresent => {}
LedgerState::AlreadyApplied => {
return ApplyResult::Skipped("already applied".to_string());
}
LedgerState::PendingOrPartial(existing_status) => {
if existing_status == LedgerStatus::Failed
|| existing_status == LedgerStatus::RolledBack
{
if let Err(e) = delete_failed_ledger_row(ctx, &pending.version).await {
return ApplyResult::Refused(format!(
"clean {} ledger row: {e}",
existing_status.as_db_str()
));
}
} else {
return ApplyResult::Refused(format!(
"version already in {} state — resolve before re-applying",
existing_status.as_db_str()
));
}
}
}
let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
workspace,
&bucket,
&pending.version,
&pending.checksum_up,
pending.checksum_down.as_deref(),
) {
Ok(result) => result,
Err(e) => {
return ApplyResult::Refused(format!("load replay plan: {e}"));
}
};
let runner_ctx = RunnerCtx {
bucket: bucket.clone(),
version: pending.version.clone(),
description: pending.slug.clone(),
checksum_up,
checksum_down,
snapshot: Some(pending.model_snapshot.clone()),
snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
config: djogi::config::MigrateConfig {
concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
},
out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
audit_pool: audit_pool.cloned(),
};
let runner_result = match mode {
FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
};
match runner_result {
Ok(_) => ApplyResult::Ok,
Err(e) => ApplyResult::RunnerError(e),
}
}
/// What `check_ledger_state` found in the ledger for a given version.
#[derive(Debug)]
enum LedgerState {
// No row for the version (also returned when the ledger cannot be read).
NotPresent,
// A row exists with status Applied, Baseline or Faked.
AlreadyApplied,
// A row exists with status Pending, Failed or RolledBack (carried along).
PendingOrPartial(LedgerStatus),
}
/// Classifies the ledger entry for `version`.
///
/// Reads all ledger rows and inspects the first one whose version matches.
/// A failed ledger read is treated the same as "no row" and yields
/// `LedgerState::NotPresent`.
async fn check_ledger_state(ctx: &mut djogi::context::DjogiContext, version: &str) -> LedgerState {
    let rows = match djogi::migrate::select_all_ledger_rows(ctx).await {
        Ok(rows) => rows,
        Err(_) => return LedgerState::NotPresent,
    };

    let Some(row) = rows.iter().find(|r| r.version == version) else {
        return LedgerState::NotPresent;
    };

    match row.status {
        LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
            LedgerState::PendingOrPartial(row.status)
        }
        LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
            LedgerState::AlreadyApplied
        }
    }
}
/// Maps a runner failure to a process exit status.
///
/// Every runner error currently maps to the same status, `1`; the error value
/// itself is not inspected.
fn runner_error_exit_code(_error: &RunnerError) -> i32 {
    const RUNNER_FAILURE_STATUS: i32 = 1;
    RUNNER_FAILURE_STATUS
}
/// Deletes the `djogi_schema_migrations` row(s) whose `version` column equals
/// `version`, using a parameterized raw statement.
///
/// The statement filters on the version only; the caller decides when removal
/// is appropriate.
///
/// # Errors
///
/// Returns the error produced by `raw_execute`, boxed.
#[djogi::deliberately_bypass_convention_with_raw_sql]
async fn delete_failed_ledger_row(
    ctx: &mut djogi::context::DjogiContext,
    version: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let statement = "DELETE FROM djogi_schema_migrations WHERE version = $1";
    let executed = ctx.raw_execute(statement, &[&version]).await;
    executed?;
    Ok(())
}
/// Builds the snapshot file path for `bucket`:
/// `<migrations_root>/<database>/<app_dirname>/schema_snapshot.json`, where the
/// app directory name comes from `djogi::migrate::app_dirname`.
fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
    let root = djogi::migrate::migrations_root(workspace);
    let database_dir = root.join(&bucket.database);
    let app_dir = database_dir.join(djogi::migrate::app_dirname(&bucket.app));
    app_dir.join("schema_snapshot.json")
}
/// Entry point for `djogi migrations attune`.
///
/// Selects the attune mode from the flags:
/// * neither `record_ledger` nor `squash` → `AttuneMode::DiffOnly`;
/// * `record_ledger` only → `AttuneMode::Record` with `record_reason`;
/// * `squash` only → `AttuneMode::Squash`, which requires a non-empty `from`;
///   an empty `app` is treated as absent;
/// * both → usage error.
///
/// Usage errors return exit code 2, a runtime construction failure returns 1,
/// and otherwise the result of `run_attune`, cast to `u8`, is the exit code.
#[allow(clippy::too_many_arguments)]
pub fn attune_cmd(
    target: Option<&str>,
    apply: bool,
    record: bool,
    record_ledger: bool,
    record_reason: &str,
    squash: bool,
    from: Option<&str>,
    publish: bool,
    app: Option<&str>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);

    let mode = if record_ledger && squash {
        eprintln!("djogi migrations attune: --record-ledger and --squash are mutually exclusive");
        return ExitCode::from(2);
    } else if record_ledger {
        AttuneMode::Record {
            reason: record_reason.to_string(),
        }
    } else if squash {
        match from {
            Some(start) if !start.is_empty() => AttuneMode::Squash {
                from: start.to_string(),
                publish,
                app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
            },
            _ => {
                eprintln!(
                    "djogi migrations attune --squash requires --from <version> (e.g. \
                     `--from V20260101000000__init`)"
                );
                return ExitCode::from(2);
            }
        }
    } else {
        AttuneMode::DiffOnly
    };

    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Ok(rt) => rt,
        Err(err) => {
            eprintln!("djogi migrations attune: tokio runtime: {err}");
            return ExitCode::from(1);
        }
    };

    let target_owned = target.map(str::to_string);
    let status =
        runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
    ExitCode::from(status as u8)
}
/// Runs one attune pass and prints its report.
///
/// Loads the configuration, connects to `config.database.url`, takes the
/// workspace lock and calls `attune`. On success it prints either
/// "attune: no drift" or one line per entry, followed by any diagnostics and
/// the optional resolved-target / squash / publish / submodule notes.
///
/// Return value (used as the process exit status):
/// * `0` — `attune` succeeded;
/// * `2` — unsupported PostgreSQL version;
/// * `1` — configuration, connection or lock failure;
/// * `attune_error_exit_code(..)` — `attune` returned an error.
async fn run_attune(
    workspace: &Path,
    mode: AttuneMode,
    target: Option<String>,
    apply: bool,
    record: bool,
) -> i32 {
    use djogi::config::DjogiConfig;

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Ok(loaded) => loaded,
        Err(err) => {
            eprintln!("djogi migrations attune: config load: {err}");
            return 1;
        }
    };

    let mut ctx = match connect_and_check(&config.database.url).await {
        ContextOutcome::Ready(ready) => ready,
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations attune: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(err) => {
            crate::print_support_boundary_error("migrations attune", &err);
            return 2;
        }
    };

    let guard = match acquire_workspace_lock(&workspace.join(LOCK_FILE_NAME), GUARD_DEFAULT_TIMEOUT)
    {
        Ok(held) => held,
        Err(err) => {
            eprintln!("djogi migrations attune: failed to acquire workspace lock: {err}");
            return 1;
        }
    };

    let request = AttuneRequest {
        workspace_root: workspace,
        database_url: &config.database.url,
        profile: &config.profile,
        dev_mode: config.database.dev_mode,
        target: target.as_deref(),
        apply,
        record,
        mode,
        _guard: &guard,
    };

    let report = match attune(&mut ctx, request).await {
        Ok(report) => report,
        Err(err) => {
            eprintln!("djogi migrations attune: {err}");
            return attune_error_exit_code(&err);
        }
    };

    if report.entries.is_empty() {
        println!("attune: no drift");
    }
    for entry in &report.entries {
        // The empty app label denotes the database-wide bucket.
        let app_display = match entry.bucket.app.as_str() {
            "" => "_global_",
            label => label,
        };
        println!(
            " {kind:<10} {database}/{app} {version}",
            kind = entry.kind.as_str(),
            database = entry.bucket.database,
            app = app_display,
            version = entry.version,
        );
    }

    for diag in &report.diagnostics {
        println!(" diagnostic: {diag}");
    }
    if let Some(sha) = &report.resolved_target {
        println!("resolved target: {sha}");
    }
    if let Some(squashed) = &report.squashed_to {
        println!("squashed to: {squashed}");
    }
    if report.published {
        println!("published to remote");
    }
    if report.parent_pointer_updated {
        println!("parent submodule pointer updated");
    }
    0
}
/// Maps an attune failure to a process exit status: `2` for a refusal, `1`
/// for every filesystem, ledger, SQL-file or git failure.
///
/// The match is kept exhaustive so that adding an `AttuneError` variant forces
/// a decision here.
fn attune_error_exit_code(err: &AttuneError) -> i32 {
    const REFUSED: i32 = 2;
    const FAILED: i32 = 1;
    match err {
        AttuneError::FilesystemScanFailed { .. } => FAILED,
        AttuneError::LedgerQueryFailed { .. } => FAILED,
        AttuneError::SqlReadFailed { .. } => FAILED,
        AttuneError::SqlWriteFailed { .. } => FAILED,
        AttuneError::SqlDeleteFailed { .. } => FAILED,
        AttuneError::GitPublishFailed { .. } => FAILED,
        AttuneError::GitTargetResolveFailed { .. } => FAILED,
        AttuneError::GitFetchFailed { .. } => FAILED,
        AttuneError::GitUpdateSubmodulePointerFailed { .. } => FAILED,
        AttuneError::Refused(_) => REFUSED,
    }
}
/// Entry point for `djogi migrations verify`.
///
/// Builds a current-thread tokio runtime (exit code 1 if that fails), drives
/// `run_verify` to completion and uses its result, cast to `u8`, as the exit
/// code.
pub fn verify_cmd(workspace: Option<PathBuf>, strict: bool) -> ExitCode {
    let workspace = resolve_workspace(workspace);

    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Ok(rt) => rt,
        Err(err) => {
            eprintln!("djogi migrations verify: tokio runtime: {err}");
            return ExitCode::from(1);
        }
    };

    let status = runtime.block_on(async { run_verify(&workspace, strict).await });
    ExitCode::from(status as u8)
}
/// Verifies every known bucket against its database and prints one report per
/// bucket.
///
/// The bucket set is the union of the projected model buckets and the buckets
/// that have a snapshot on disk. For each bucket, in order:
/// 1. resolve the database URL (`resolve_bucket_url`);
/// 2. connect once per database, reusing the context for later buckets;
/// 3. load the bucket's snapshot — a missing snapshot is an error only when
///    the bucket has registered models;
/// 4. run `djogi::migrate::verify_bucket` and print the rendered report.
///
/// Ledger output is requested at most once per database, and only for
/// databases in which at least one bucket has models.
///
/// Per-bucket failures are reported and the loop moves on, so one broken
/// bucket does not hide the others.
///
/// Return value (used as the process exit status):
/// * `0` — nothing failed (including "no buckets at all");
/// * `1` — configuration/projection failure, or any bucket reported a problem;
/// * `2` — a database runs an unsupported PostgreSQL version (stops at once).
async fn run_verify(workspace: &Path, strict: bool) -> i32 {
    use djogi::config::DjogiConfig;
    use std::collections::{BTreeMap, BTreeSet, HashSet};

    // Display name of a bucket's app; the empty label is the database-wide bucket.
    fn app_display(bucket: &djogi::migrate::BucketKey) -> &str {
        if bucket.app.is_empty() {
            "_global_"
        } else {
            &bucket.app
        }
    }

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Ok(loaded) => loaded,
        Err(err) => {
            eprintln!("djogi migrations verify: config load: {err}");
            return 1;
        }
    };
    let models = match project_from_inventory() {
        Ok(projected) => projected,
        Err(err) => {
            eprintln!("djogi migrations verify: projection error: {err}");
            return 1;
        }
    };

    let mut bucket_set: BTreeSet<djogi::migrate::BucketKey> = BTreeSet::new();
    bucket_set.extend(models.keys().cloned());
    bucket_set.extend(discover_snapshot_buckets_on_disk(workspace));
    if bucket_set.is_empty() {
        println!("No registered apps found for verification.");
        return 0;
    }

    let policy = djogi::config::PolicyConfig {
        strict_out_of_order: strict,
    };

    // Databases in which at least one bucket carries registered models.
    let mut database_has_models: HashSet<String> = HashSet::new();
    for candidate in &bucket_set {
        let populated = models
            .get(candidate)
            .map(|schema| !schema.models.is_empty())
            .unwrap_or(false);
        if populated {
            database_has_models.insert(candidate.database.clone());
        }
    }

    let mut contexts: BTreeMap<String, djogi::context::DjogiContext> = BTreeMap::new();
    let mut seen_ledger_databases = HashSet::<String>::new();
    let mut exit_code: i32 = 0;

    for bucket in &bucket_set {
        let database = &bucket.database;

        let Some(url) = resolve_bucket_url(&config.database, database) else {
            eprintln!(
                "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
                 check that config.database.url has a valid path component",
                database,
                database,
                app_display(bucket)
            );
            exit_code = 1;
            continue;
        };

        // One connection per database, shared by all of its buckets.
        if !contexts.contains_key(database) {
            match connect_and_check(&url).await {
                ContextOutcome::Ready(ctx) => {
                    contexts.insert(database.clone(), ctx);
                }
                ContextOutcome::RuntimeError(msg) => {
                    eprintln!(
                        "djogi migrations verify: pool for '{}': {msg}",
                        database
                    );
                    exit_code = 1;
                    continue;
                }
                ContextOutcome::UnsupportedVersion(err) => {
                    crate::print_support_boundary_error("migrations verify", &err);
                    return 2;
                }
            }
        }

        let snap_path = snapshot_path(workspace, bucket);
        let snapshot = match load_snapshot(&snap_path) {
            Ok(loaded) => loaded,
            Err(SnapshotError::Io { source, .. })
                if source.kind() == std::io::ErrorKind::NotFound =>
            {
                let has_models = models
                    .get(bucket)
                    .map(|schema| !schema.models.is_empty())
                    .unwrap_or(false);
                if has_models {
                    eprintln!(
                        "djogi migrations verify: {}/{} has registered models but no \
                         snapshot; run `djogi migrations compose` then \
                         `djogi migrations apply` to record a baseline",
                        database,
                        app_display(bucket)
                    );
                    exit_code = 1;
                } else {
                    println!(
                        "No snapshot found for bucket {}/{}",
                        database,
                        app_display(bucket)
                    );
                }
                continue;
            }
            Err(err) => {
                eprintln!(
                    "djogi migrations verify: load snapshot for {}/{}: {err}",
                    database,
                    app_display(bucket)
                );
                exit_code = 1;
                continue;
            }
        };

        let db_has_models = database_has_models.contains(database);
        // `insert` returns true only the first time a database is seen, and is
        // only evaluated for databases that have models.
        let emit_ledger = db_has_models && seen_ledger_databases.insert(database.clone());

        let ctx = contexts
            .get_mut(database)
            .expect("context inserted above");
        let verified = djogi::migrate::verify_bucket(
            ctx,
            bucket,
            &snapshot,
            &policy,
            emit_ledger,
            db_has_models,
        )
        .await;
        let report = match verified {
            Ok(report) => report,
            Err(err) => {
                eprintln!(
                    "djogi migrations verify: error for {}/{}: {err}",
                    database,
                    app_display(bucket)
                );
                exit_code = 1;
                continue;
            }
        };

        for line in render_verify_report(&report, bucket) {
            println!("{line}");
        }
        if report.has_errors() {
            exit_code = 1;
        }
    }

    exit_code
}
fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
let mut lines: Vec<String> = Vec::new();
let app_display = if bucket.app.is_empty() {
"_global_"
} else {
&bucket.app
};
lines.push(format!(
"djogi migrations verify — {}/{}",
bucket.database, app_display
));
lines.push("──────────────────────────────────────────".to_string());
match (
&report.latest_applied_version,
report.applied_count,
report.unfinished_count,
) {
(Some(version), applied, 0) => {
lines.push(format!("Ledger: {applied} applied, latest {version}"));
}
(Some(version), applied, unfinished) => {
lines.push(format!(
"Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
));
}
(None, 0, 0) => {
lines.push("Ledger: empty (no migrations applied yet)".to_string());
}
_ => {}
}
lines.push(String::new());
if report.diagnostics.is_empty() {
lines.push("No drift detected. Schema is consistent.".to_string());
} else {
for d in &report.diagnostics {
let severity = match d.severity {
VerifySeverity::Info => "INFO",
VerifySeverity::Warning => "WARN",
VerifySeverity::Error => "ERROR",
};
let location = d.location.as_deref().unwrap_or("-");
lines.push(format!(
"[{severity}] {code} ({loc}): {msg}",
severity = severity,
code = d.code,
loc = location,
msg = d.message
));
}
}
let errors = report
.diagnostics
.iter()
.filter(|d| d.severity == VerifySeverity::Error)
.count();
let warnings = report
.diagnostics
.iter()
.filter(|d| d.severity == VerifySeverity::Warning)
.count();
let infos = report
.diagnostics
.iter()
.filter(|d| d.severity == VerifySeverity::Info)
.count();
if errors > 0 {
lines.push(String::new());
lines.push(format!(
"Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
));
} else if warnings > 0 {
lines.push(String::new());
lines.push(format!(
"Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
));
} else {
lines.push(String::new());
lines.push(format!("Result: PASSED ({infos} info(s))"));
}
lines
}
impl From<PartialApplyResolutionCli> for PartialApplyResolution {
fn from(cli: PartialApplyResolutionCli) -> Self {
match cli {
PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
PartialApplyResolutionCli::Faked => Self::MarkFaked,
PartialApplyResolutionCli::Applied => Self::MarkApplied,
}
}
}
/// Dispatches a `repair` subcommand to its dedicated handler and returns that
/// handler's exit code.
///
/// Owned CLI arguments are borrowed (`as_deref` for optional strings and paths)
/// before being handed to the handlers; `workspace` is passed through by value.
pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
    match command {
        RepairSubcommand::ChecksumDrift {
            version,
            app,
            database,
            checksum_up,
            checksum_down,
            workspace,
        } => repair_checksum_drift_cmd(
            &version,
            app.as_deref(),
            database.as_deref(),
            checksum_up.as_deref(),
            checksum_down.as_deref(),
            workspace,
        ),
        RepairSubcommand::PartialApply {
            version,
            resolution,
            note,
            app,
            database,
            workspace,
        } => repair_partial_apply_cmd(
            &version,
            // CLI enum -> library enum via the `From` impl in this file.
            resolution.into(),
            // Restored from a mis-encoded `¬e` (HTML-entity damage to `&note`).
            &note,
            app.as_deref(),
            database.as_deref(),
            workspace,
        ),
        RepairSubcommand::ResumePartial {
            version,
            app,
            database,
            workspace,
        } => repair_resume_partial_apply_cmd(
            &version,
            app.as_deref(),
            database.as_deref(),
            workspace,
        ),
        RepairSubcommand::SnapshotRebuild {
            app,
            database,
            snapshot_path,
            workspace,
        } => repair_snapshot_rebuild_cmd(
            app.as_deref(),
            database.as_deref(),
            snapshot_path.as_deref(),
            workspace,
        ),
    }
}
/// Prints a repair report to stdout: every action taken, followed by the
/// "Ledger changes:" and "Snapshot changes:" sections. A section is omitted
/// entirely when it has no entries.
fn render_repair_report(report: &RepairReport) {
    report
        .actions_taken
        .iter()
        .for_each(|action| println!(" {action}"));

    let ledger = &report.ledger_changes;
    if !ledger.is_empty() {
        println!("Ledger changes:");
        for change in ledger {
            println!(
                " {} | {} | {} -> {}",
                change.version, change.column, change.before, change.after,
            );
        }
    }

    let snapshots = &report.snapshot_changes;
    if !snapshots.is_empty() {
        println!("Snapshot changes:");
        for change in snapshots {
            println!(" {} | {}", change.path.display(), change.description);
        }
    }
}
/// Maps a repair error to the process exit code reported by the CLI.
///
/// Validation, mismatch, and "nothing to do" style variants map to `2`; I/O,
/// lock acquisition, session checkout, and resume-step failures map to `1`.
/// The match deliberately has no wildcard arm, so every variant must be placed
/// in one of the two groups explicitly.
fn repair_error_exit_code(err: &RepairError) -> i32 {
    match err {
        RepairError::VersionNotFound { .. }
        | RepairError::InsufficientConfirmation
        | RepairError::InvalidChecksum { .. }
        | RepairError::InvalidResolution { .. }
        | RepairError::BucketAppMismatch { .. }
        | RepairError::PlanVersionMismatch { .. }
        | RepairError::PlanChecksumMismatch { .. }
        | RepairError::LeafIdentityMismatch { .. }
        | RepairError::NothingToResume { .. }
        | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
        | RepairError::SuppliedSnapshotDiverges { .. }
        | RepairError::AdvisoryUnlockReturnedFalse { .. }
        | RepairError::ResumePlanShapeMismatch { .. }
        | RepairError::ReplayPlanShapeMismatch { .. } => 2,

        RepairError::LedgerIo { .. }
        | RepairError::SnapshotIo { .. }
        | RepairError::AdvisoryLockFailed { .. }
        | RepairError::AdvisoryLockQueryFailed { .. }
        | RepairError::PinnedSessionCheckoutFailed { .. }
        | RepairError::ResumeStepFailed { .. }
        | RepairError::ResumeProgressAckFailed { .. } => 1,
    }
}
/// Returns the database name to operate on: the explicitly supplied one, or
/// `"main"` when none was given. The configuration argument is currently unused.
fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
    match database {
        Some(name) => name.to_owned(),
        None => "main".to_owned(),
    }
}
/// Reads the "up" SQL file for `version` from the bucket directory and returns
/// its committed-SQL checksum.
///
/// # Errors
/// Propagates any I/O error from reading the file, including "not found".
fn compute_checksum_up_from_disk(
    workspace: &Path,
    bucket: &djogi::migrate::BucketKey,
    version: &str,
) -> std::io::Result<String> {
    let up_file = djogi::migrate::up_filename(version);
    let sql_path = djogi::migrate::bucket_dir(workspace, bucket).join(up_file);
    std::fs::read_to_string(&sql_path).map(|sql| {
        djogi::migrate::compute_committed_sql_checksum(&sql, djogi::migrate::ResetSqlSide::Up)
    })
}
/// Reads the "down" SQL file for `version` from the bucket directory and returns
/// whatever `compute_committed_down_sql_checksum` yields for its contents.
///
/// A missing file is not an error: it produces `Ok(None)`.
///
/// # Errors
/// Propagates every I/O error other than "not found".
fn compute_checksum_down_from_disk(
    workspace: &Path,
    bucket: &djogi::migrate::BucketKey,
    version: &str,
) -> std::io::Result<Option<String>> {
    let down_file = djogi::migrate::down_filename(version);
    let sql_path = djogi::migrate::bucket_dir(workspace, bucket).join(down_file);
    match std::fs::read_to_string(&sql_path) {
        Ok(sql) => Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql)),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
/// Synchronous entry point for `repair checksum-drift`.
///
/// Resolves the workspace, builds a current-thread tokio runtime, and drives
/// `run_repair_checksum_drift` to completion on it. Returns exit code `1` if the
/// runtime cannot be built; otherwise the code produced by the async runner.
pub fn repair_checksum_drift_cmd(
    version: &str,
    app: Option<&str>,
    database: Option<&str>,
    checksum_up: Option<&str>,
    checksum_down: Option<&str>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Err(e) => {
            eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(rt) => rt,
    };
    let code = runtime.block_on(run_repair_checksum_drift(
        &workspace,
        version,
        app,
        database,
        checksum_up,
        checksum_down,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `repair checksum-drift`.
///
/// Steps, in order: load the workspace configuration, resolve the database name
/// and URL, connect, take the workspace lock, determine the new up/down
/// checksums (explicit arguments win; otherwise they are computed from the SQL
/// files on disk), then call `repair_checksum_drift` and print its report.
///
/// Returns `0` on success. Setup failures return `1`, except an underivable
/// database URL or an unsupported server version, which return `2`. Errors from
/// the repair itself are mapped through `repair_error_exit_code`.
async fn run_repair_checksum_drift(
    workspace: &Path,
    version: &str,
    app: Option<&str>,
    database: Option<&str>,
    checksum_up: Option<&str>,
    checksum_down: Option<&str>,
) -> i32 {
    use djogi::config::DjogiConfig;

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations repair checksum-drift: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };
    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!(
            "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
        );
        return 2;
    };
    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations repair checksum-drift", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    // The workspace lock is taken only after the connection is established.
    let guard = match acquire_workspace_lock(&workspace.join(LOCK_FILE_NAME), GUARD_DEFAULT_TIMEOUT)
    {
        Err(e) => {
            eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `app` selects the bucket with an empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };

    let new_checksum_up = if let Some(supplied) = checksum_up {
        supplied.to_string()
    } else {
        match compute_checksum_up_from_disk(workspace, &bucket, version) {
            Err(e) => {
                eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
                return 1;
            }
            Ok(computed) => computed,
        }
    };
    let resolved_checksum_down = if let Some(supplied) = checksum_down {
        Some(supplied.to_string())
    } else {
        match compute_checksum_down_from_disk(workspace, &bucket, version) {
            Err(e) => {
                eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
                return 1;
            }
            Ok(maybe_checksum) => maybe_checksum,
        }
    };

    let outcome = repair_checksum_drift(
        &mut ctx,
        &guard,
        &bucket,
        version,
        &new_checksum_up,
        resolved_checksum_down.as_deref(),
        RepairConfirmation::OperatorAcknowledged,
    )
    .await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations repair checksum-drift: {e}");
            repair_error_exit_code(&e)
        }
        Ok(report) => {
            render_repair_report(&report);
            0
        }
    }
}
/// Synchronous entry point for `repair partial-apply`.
///
/// Resolves the workspace, builds a current-thread tokio runtime, and drives
/// `run_repair_partial_apply` to completion on it. Returns exit code `1` if the
/// runtime cannot be built; otherwise the code produced by the async runner.
pub fn repair_partial_apply_cmd(
    version: &str,
    resolution: PartialApplyResolution,
    note: &str,
    app: Option<&str>,
    database: Option<&str>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Err(e) => {
            eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(rt) => rt,
    };
    let code = runtime.block_on(run_repair_partial_apply(
        &workspace, version, resolution, note, app, database,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `repair partial-apply`.
///
/// Steps, in order: load the workspace configuration, resolve the database name
/// and URL, connect, take the workspace lock, then call `repair_partial_apply`
/// with the requested resolution and note, and print its report.
///
/// Returns `0` on success. Setup failures return `1`, except an underivable
/// database URL or an unsupported server version, which return `2`. Errors from
/// the repair itself are mapped through `repair_error_exit_code`.
async fn run_repair_partial_apply(
    workspace: &Path,
    version: &str,
    resolution: PartialApplyResolution,
    note: &str,
    app: Option<&str>,
    database: Option<&str>,
) -> i32 {
    use djogi::config::DjogiConfig;

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations repair partial-apply: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };
    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!(
            "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
        );
        return 2;
    };
    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations repair partial-apply: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations repair partial-apply", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    // The workspace lock is taken only after the connection is established.
    let guard = match acquire_workspace_lock(&workspace.join(LOCK_FILE_NAME), GUARD_DEFAULT_TIMEOUT)
    {
        Err(e) => {
            eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `app` selects the bucket with an empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };

    let outcome = repair_partial_apply(
        &mut ctx,
        &guard,
        &bucket,
        version,
        resolution,
        note,
        RepairConfirmation::OperatorAcknowledged,
    )
    .await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations repair partial-apply: {e}");
            repair_error_exit_code(&e)
        }
        Ok(report) => {
            render_repair_report(&report);
            0
        }
    }
}
/// Synchronous entry point for `repair resume-partial`.
///
/// Resolves the workspace, builds a current-thread tokio runtime, and drives
/// `run_repair_resume_partial` to completion on it. Returns exit code `1` if the
/// runtime cannot be built; otherwise the code produced by the async runner.
pub fn repair_resume_partial_apply_cmd(
    version: &str,
    app: Option<&str>,
    database: Option<&str>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(rt) => rt,
    };
    let code = runtime.block_on(run_repair_resume_partial(&workspace, version, app, database));
    ExitCode::from(code as u8)
}
/// Async body of `repair resume-partial`.
///
/// Steps, in order: load the workspace configuration, resolve the database name
/// and URL, connect, take the workspace lock, load the committed
/// `<version>.plan.json` for the bucket, then call `repair_resume_partial_apply`
/// with that plan and print its report.
///
/// Returns `0` on success. Config, connection, and lock failures return `1`. An
/// underivable database URL, an unsupported server version, or a plan that
/// cannot be loaded return `2`. Errors from the repair itself are mapped through
/// `repair_error_exit_code`.
async fn run_repair_resume_partial(
    workspace: &Path,
    version: &str,
    app: Option<&str>,
    database: Option<&str>,
) -> i32 {
    use djogi::config::DjogiConfig;

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };
    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!(
            "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
        );
        return 2;
    };
    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations repair resume-partial: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations repair resume-partial", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    // The workspace lock is taken only after the connection is established.
    let guard = match acquire_workspace_lock(&workspace.join(LOCK_FILE_NAME), GUARD_DEFAULT_TIMEOUT)
    {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `app` selects the bucket with an empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };

    let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: load plan: {e}");
            return 2;
        }
        Ok(loaded) => loaded,
    };

    let outcome = repair_resume_partial_apply(
        &mut ctx,
        &guard,
        version,
        &plan,
        RepairConfirmation::OperatorAcknowledged,
    )
    .await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: {e}");
            repair_error_exit_code(&e)
        }
        Ok(report) => {
            render_repair_report(&report);
            0
        }
    }
}
/// Loads `<version>.plan.json` from the bucket directory and converts it into a
/// `MigrationPlan` for the resume-partial repair.
///
/// Only the classification and the segments' statement labels and "up" SQL are
/// carried over; every statement's `down` is left empty and `lossy` is `None`.
/// The checksums stored in the file are not consulted here.
///
/// # Errors
/// Returns a message prefixed with the plan path when the file cannot be read,
/// cannot be parsed, or declares a format version other than
/// `CLI_REPLAY_PLAN_FORMAT_VERSION`.
fn load_committed_plan_for_resume(
    workspace: &Path,
    bucket: &djogi::migrate::BucketKey,
    version: &str,
) -> Result<djogi::migrate::MigrationPlan, String> {
    let plan_path =
        djogi::migrate::bucket_dir(workspace, bucket).join(format!("{version}.plan.json"));

    let raw = match std::fs::read(&plan_path) {
        Ok(raw) => raw,
        Err(e) => return Err(format!("{}: {e}", plan_path.display())),
    };
    let stored: CliReplayPlan = match serde_json::from_slice(&raw) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("{}: parse: {e}", plan_path.display())),
    };
    if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
        return Err(format!(
            "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
            plan_path.display(),
            stored.format_version,
        ));
    }

    let mut segments = Vec::new();
    for seg in stored.segments {
        let mut statements = Vec::new();
        for stmt in seg.statements {
            statements.push(djogi::migrate::OperationSql {
                label: stmt.label,
                up: stmt.up,
                down: String::new(),
                lossy: None,
            });
        }
        segments.push(djogi::migrate::Segment {
            kind: seg.kind.into(),
            statements,
        });
    }

    Ok(djogi::migrate::MigrationPlan {
        bucket: bucket.clone(),
        classification: stored.classification.into(),
        segments,
    })
}
/// Synchronous entry point for `repair snapshot-rebuild`.
///
/// Resolves the workspace, builds a current-thread tokio runtime, and drives
/// `run_repair_snapshot_rebuild` to completion on it. Returns exit code `1` if
/// the runtime cannot be built; otherwise the code produced by the async runner.
pub fn repair_snapshot_rebuild_cmd(
    app: Option<&str>,
    database: Option<&str>,
    snapshot_path: Option<&Path>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Err(e) => {
            eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(rt) => rt,
    };
    let code = runtime.block_on(run_repair_snapshot_rebuild(
        &workspace,
        app,
        database,
        snapshot_path,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `repair snapshot-rebuild`.
///
/// Steps, in order: load the workspace configuration, resolve the database name
/// and URL, connect, take the workspace lock, pick the snapshot path (the
/// explicit argument, or the one `reconstruct_snapshot_path` gives for the
/// bucket), then call `repair_snapshot_rebuild` and print its report.
///
/// Returns `0` on success. Setup failures return `1`, except an underivable
/// database URL or an unsupported server version, which return `2`. Errors from
/// the repair itself are mapped through `repair_error_exit_code`.
async fn run_repair_snapshot_rebuild(
    workspace: &Path,
    app: Option<&str>,
    database: Option<&str>,
    snapshot_path: Option<&Path>,
) -> i32 {
    use djogi::config::DjogiConfig;

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };
    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!(
            "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
        );
        return 2;
    };
    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    // The workspace lock is taken only after the connection is established.
    let guard = match acquire_workspace_lock(&workspace.join(LOCK_FILE_NAME), GUARD_DEFAULT_TIMEOUT)
    {
        Err(e) => {
            eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `app` selects the bucket with an empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };
    let snap_path = snapshot_path.map_or_else(
        || reconstruct_snapshot_path(workspace, &bucket),
        Path::to_path_buf,
    );

    let outcome = repair_snapshot_rebuild(
        &mut ctx,
        &guard,
        &bucket,
        &snap_path,
        RepairConfirmation::OperatorAcknowledged,
    )
    .await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations repair snapshot-rebuild: {e}");
            repair_error_exit_code(&e)
        }
        Ok(report) => {
            render_repair_report(&report);
            0
        }
    }
}
/// Synchronous entry point for `migrations baseline`.
///
/// Rejects an empty or whitespace-only `reason` with exit code `2` before doing
/// anything else. Otherwise resolves the workspace, builds a current-thread
/// tokio runtime (exit code `1` if that fails), and drives `run_baseline` to
/// completion, returning its exit code.
pub fn baseline_cmd(
    version: &str,
    description: &str,
    reason: &str,
    app: Option<&str>,
    database: Option<&str>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let reason_is_blank = reason.trim().is_empty();
    if reason_is_blank {
        eprintln!(
            "djogi migrations baseline: --reason must not be empty; \
             supply a non-empty reason why this baseline is being established \
             (e.g. 'schema pre-exists from prior tooling'). \
             This is recorded in the ledger audit trail."
        );
        return ExitCode::from(2);
    }

    let workspace = resolve_workspace(workspace);
    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Err(e) => {
            eprintln!("djogi migrations baseline: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(rt) => rt,
    };
    let code = runtime.block_on(run_baseline(
        &workspace,
        version,
        description,
        reason,
        app,
        database,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `migrations baseline`.
///
/// Steps, in order: load the workspace configuration, resolve the database name
/// and URL, connect, take the workspace lock, assemble a `RunnerCtx` for the
/// bucket, then call `baseline_plan` and print a one-line success summary.
///
/// The runner context is built with an empty `checksum_up`, no `checksum_down`,
/// and no in-memory snapshot. The audit pool is best-effort: if the audit URL
/// cannot be resolved or the pool cannot be built, `audit_pool` is `None`.
///
/// Returns `0` on success. Setup failures return `1`, except an underivable
/// database URL or an unsupported server version, which return `2`. Errors from
/// `baseline_plan` are mapped through `baseline_error_exit_code`.
async fn run_baseline(
    workspace: &Path,
    version: &str,
    description: &str,
    reason: &str,
    app: Option<&str>,
    database: Option<&str>,
) -> i32 {
    use djogi::config::DjogiConfig;

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations baseline: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };
    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
        return 2;
    };
    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations baseline: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations baseline", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    // The workspace lock is taken only after the connection is established.
    let guard = match acquire_workspace_lock(&workspace.join(LOCK_FILE_NAME), GUARD_DEFAULT_TIMEOUT)
    {
        Err(e) => {
            eprintln!("djogi migrations baseline: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `app` selects the bucket with an empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };

    let snapshot_target = reconstruct_snapshot_path(workspace, &bucket);
    let migrate_config = djogi::config::MigrateConfig {
        concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
        strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
        pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
        pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
    };
    let out_of_order_policy = djogi::migrate::OutOfOrderPolicy::default_for_config(&config);
    // Best-effort: any failure resolving the URL or building the pool yields `None`.
    let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
        Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
        Err(_) => None,
    };

    let runner_ctx = RunnerCtx {
        bucket: bucket.clone(),
        version: version.to_string(),
        description: description.to_string(),
        checksum_up: String::new(),
        checksum_down: None,
        snapshot: None,
        snapshot_path: Some(snapshot_target),
        config: migrate_config,
        out_of_order_policy,
        audit_pool,
    };

    let outcome = baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations baseline: {e}");
            baseline_error_exit_code(&e)
        }
        Ok(report) => {
            println!(
                "djogi migrations baseline: established baseline `{}` \
                 (ledger_id={}) in {:.1}s",
                version,
                report.ledger_id,
                report.execution_time_ms as f64 / 1000.0
            );
            0
        }
    }
}
/// Maps a runner error raised during `baseline` to the CLI exit code.
///
/// The variants listed below map to `2`; every other variant maps to `1`.
fn baseline_error_exit_code(err: &RunnerError) -> i32 {
    let maps_to_two = matches!(
        err,
        RunnerError::VersionAlreadyApplied { .. }
            | RunnerError::VersionCollisionNonTerminal { .. }
            | RunnerError::BaselineSnapshotShouldNotBeProvided
            | RunnerError::AdvisoryUnlockReturnedFalse { .. }
            | RunnerError::SnapshotPersistFailed { .. }
            | RunnerError::OutOfOrderRejected { .. }
    );
    if maps_to_two { 2 } else { 1 }
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Creates and returns a fresh directory under the system temp dir for a test.
///
/// The directory name combines `tag`, the current time in nanoseconds since the
/// Unix epoch, and a process-wide counter, so repeated calls with the same tag
/// yield distinct paths. Panics if the clock is before the epoch or the
/// directory cannot be created.
fn temp_workspace(tag: &str) -> std::path::PathBuf {
    static COUNTER: AtomicUsize = AtomicUsize::new(0);
    let n = COUNTER.fetch_add(1, Ordering::SeqCst);
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap();
    let nanos = since_epoch.as_nanos();
    let mut root = std::env::temp_dir();
    root.push(format!("djogi-cli-{tag}-{nanos}-{n}"));
    fs::create_dir_all(&root).unwrap();
    root
}
#[test]
/// Bucket discovery must report directories that contain a
/// `schema_snapshot.json` (including `_global_`, surfaced with an empty app
/// label) and must skip directories that have no snapshot file.
fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
    let work = temp_workspace("b1_discover");

    // (bucket directory, whether a snapshot file is written into it)
    let layout = [
        ("migrations/main/billing", true),
        ("migrations/main/_global_", true),
        ("migrations/main/empty_app", false),
    ];
    for (relative, with_snapshot) in layout {
        let dir = work.join(relative);
        fs::create_dir_all(&dir).unwrap();
        if with_snapshot {
            fs::write(dir.join("schema_snapshot.json"), "{}").unwrap();
        }
    }

    let buckets = discover_snapshot_buckets_on_disk(&work);
    let labels: std::collections::BTreeSet<&str> =
        buckets.iter().map(|bucket| bucket.app.as_str()).collect();

    assert!(
        labels.contains("billing"),
        "must include the renamed-from bucket: {labels:?}"
    );
    assert!(
        labels.contains(""),
        "must include the global bucket: {labels:?}"
    );
    assert!(
        !labels.contains("empty_app"),
        "must not include directories without a snapshot: {labels:?}"
    );

    let _ = fs::remove_dir_all(&work);
}
#[test]
/// With `DATABASE_URL` unset, `load_from_workspace` must pick up the
/// `Djogi.toml` located in the given workspace directory.
fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
    let work = temp_workspace("a1_workspace_config");
    let toml = concat!(
        "[database]\n",
        "url = \"postgres://discovered-by-workspace-flag/test\"\n",
        "max_connections = 1\n",
        "dev_mode = false\n",
        "[server]\n",
        "host = \"127.0.0.1\"\n",
        "port = 1234\n",
    );
    fs::write(work.join("Djogi.toml"), toml).unwrap();

    let prior = std::env::var("DATABASE_URL").ok();
    // NOTE(review): this mutates process-wide state; it presumably races with any
    // other test that touches DATABASE_URL when tests run in parallel — confirm.
    unsafe { std::env::remove_var("DATABASE_URL") };

    let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
    assert_eq!(
        config.database.url,
        "postgres://discovered-by-workspace-flag/test"
    );
    assert_eq!(config.server.port, 1234);

    // Restore the variable only if it was set before the test removed it.
    match prior {
        Some(v) => unsafe { std::env::set_var("DATABASE_URL", v) },
        None => {}
    }
    let _ = fs::remove_dir_all(&work);
}
#[test]
/// When `DATABASE_URL` is set in the environment, it must take precedence over
/// the `url` written in the workspace's `Djogi.toml`.
fn a1_round2_env_override_beats_workspace_toml() {
    let work = temp_workspace("a1r2_env_override");
    let toml = concat!(
        "[database]\n",
        "url = \"postgres://from-toml/test\"\n",
        "max_connections = 1\n",
        "dev_mode = false\n",
        "[server]\n",
        "host = \"127.0.0.1\"\n",
        "port = 1234\n",
    );
    fs::write(work.join("Djogi.toml"), toml).unwrap();

    let prior = std::env::var("DATABASE_URL").ok();
    // NOTE(review): this mutates process-wide state; it presumably races with any
    // other test that touches DATABASE_URL when tests run in parallel — confirm.
    unsafe { std::env::set_var("DATABASE_URL", "postgres://from-env/test") };

    let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
    assert_eq!(
        config.database.url, "postgres://from-env/test",
        "env DATABASE_URL must win over workspace Djogi.toml"
    );

    // Put the environment back exactly as it was found.
    if let Some(v) = prior {
        unsafe { std::env::set_var("DATABASE_URL", v) };
    } else {
        unsafe { std::env::remove_var("DATABASE_URL") };
    }
    let _ = fs::remove_dir_all(&work);
}
#[test]
/// A snapshot that exists only on disk (here: a `billing` bucket with one
/// `widgets` table) must be picked up by compose even though the in-memory
/// model map is empty; the evidence is a `DROP TABLE "widgets"` statement in
/// the generated up-migration file.
fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
    use djogi::migrate::projection::BucketKey;
    use djogi::migrate::schema::{
        ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
    };
    use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
    use std::collections::BTreeMap;

    let work = temp_workspace("b1r2_compose_uses_discovery");
    let billing_bucket = BucketKey {
        database: "main".into(),
        app: "billing".into(),
    };

    // The single primary-key column of the `widgets` table.
    let id_column = ColumnSchema {
        check: None,
        comment: None,
        default_sql: Some("heerid_next_desc()".to_string()),
        foreign_key: None,
        generated: None,
        identity: None,
        index_type: None,
        indexed: false,
        max_length: None,
        name: "id".to_string(),
        nullable: false,
        on_delete: None,
        outbox_exclude: false,
        rationale: None,
        relation_kind: None,
        renamed_from: None,
        sequence_within: None,
        sql_type: "BIGINT".to_string(),
        unique: false,
        type_change_using: None,
    };
    let widgets = TableSchema {
        app: Some("billing".to_string()),
        columns: vec![id_column],
        exclusion_constraints: Vec::new(),
        fts: None,
        is_through: false,
        moved_from_app: None,
        partition: None,
        primary_key: PrimaryKeySchema {
            columns: vec!["id".to_string()],
            kind: PkKindSchema::HeerIdRecencyBiased,
        },
        rationale: None,
        renamed_from: None,
        rls_enabled: false,
        table: "widgets".to_string(),
        table_comment: None,
        storage_params: None,
        tablespace: None,
        tenant_key: None,
    };
    let billing_snap = AppliedSchema {
        djogi_version: env!("CARGO_PKG_VERSION").to_string(),
        enums: BTreeMap::new(),
        format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
        generated_at: "2026-04-25T00:00:00Z".to_string(),
        indexes: Vec::new(),
        models: BTreeMap::from([("widgets".to_string(), widgets)]),
        registered_apps: vec!["billing".to_string()],
    };

    let snap_path = snapshot_path(&work, &billing_bucket);
    save_snapshot(&billing_snap, &snap_path).expect("write snapshot");

    // Compose is given no models at all; only the on-disk snapshot exists.
    let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
    let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
    let exit = compose_with_inputs(
        &work,
        "drop billing remnant",
        true,
        false,
        &empty_models,
        &[],
        now,
        None,
    );
    assert_eq!(exit, ExitCode::from(0), "compose must succeed");

    // Locate the generated up file: `V*.sdjql` that is not a `.down.` file.
    let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
    let up_path: PathBuf = fs::read_dir(&billing_dir)
        .unwrap()
        .flatten()
        .find(|entry| {
            let n = entry.file_name().to_string_lossy().to_string();
            n.starts_with('V') && n.ends_with(".sdjql") && !n.contains(".down.")
        })
        .map(|entry| entry.path())
        .expect("compose must have written an up SQL file");

    let up_sql = fs::read_to_string(&up_path).unwrap();
    assert!(
        up_sql.contains("DROP TABLE \"widgets\""),
        "compose must have seen the disk snapshot and emitted DROP TABLE — \
         this proves discover_snapshot_buckets_on_disk reached the differ. \
         SQL: {up_sql}"
    );

    let _ = fs::remove_dir_all(&work);
}
#[test]
/// `status_cmd` must load configuration from the workspace it is given: a
/// malformed `Djogi.toml` placed there has to produce exit code 1.
fn a1_round2_status_cmd_threads_workspace_to_config() {
    let work = temp_workspace("a1r2_status_workspace");
    let config_file = work.join("Djogi.toml");
    fs::write(config_file, "this is = not = valid toml ===").unwrap();

    let exit = status_cmd(Some(work.clone()));

    assert_eq!(
        exit,
        ExitCode::from(1),
        "malformed workspace Djogi.toml must surface as config load error"
    );
    let _ = fs::remove_dir_all(&work);
}
#[test]
/// Every `AttuneError::Refused(..)` variant listed here must map to exit code 2.
fn u3_attune_refusal_variants_map_to_exit_code_two() {
    use djogi::migrate::AttuneRefusal;

    let refusals = [
        AttuneRefusal::SquashNotLocalhost {
            database_url: "postgres://prod.example.com/main".to_string(),
        },
        AttuneRefusal::SquashNotDevProfile {
            profile: "production".to_string(),
        },
        AttuneRefusal::SquashDevModeOff,
        AttuneRefusal::SquashEnvIsProduction {
            env_value: "production".to_string(),
        },
        AttuneRefusal::SquashFromVersionNotFound {
            version: "V20260101000000__missing".to_string(),
        },
        AttuneRefusal::SquashFromVersionAmbiguous {
            version: "V20260101000000__shared".to_string(),
            buckets: vec!["main/users".to_string(), "main/billing".to_string()],
        },
    ];

    // Wrap each refusal exactly as the command would surface it.
    let cases = refusals.map(AttuneError::Refused);
    for err in cases.iter() {
        assert_eq!(
            attune_error_exit_code(err),
            2,
            "refusal variant must map to exit 2: {err}"
        );
    }
}
#[test]
/// Filesystem, SQL file, and git-publish failures from attune must map to
/// exit code 1.
fn u3_attune_runtime_variants_map_to_exit_code_one() {
    let sql_file = || PathBuf::from("/tmp/x.sdjql");
    let io_failure = |message: &str| std::io::Error::other(message.to_string());

    let cases = [
        AttuneError::FilesystemScanFailed {
            source: io_failure("disk full"),
        },
        AttuneError::SqlReadFailed {
            path: sql_file(),
            source: io_failure("permission denied"),
        },
        AttuneError::SqlWriteFailed {
            path: sql_file(),
            source: io_failure("read-only fs"),
        },
        AttuneError::SqlDeleteFailed {
            path: sql_file(),
            source: io_failure("not found"),
        },
        AttuneError::GitPublishFailed {
            stderr: "fatal: refusing to push".to_string(),
            status_code: Some(128),
        },
    ];

    for err in cases.iter() {
        assert_eq!(
            attune_error_exit_code(err),
            1,
            "runtime variant must map to exit 1: {err}"
        );
    }
}
#[test]
/// An empty `--reason` must be rejected with exit code 2.
fn baseline_empty_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let empty_reason = "";

    let result = baseline_cmd(
        "V00000000000000__baseline",
        "description",
        empty_reason,
        None,
        None,
        Some(workspace),
    );

    assert_eq!(
        result,
        ExitCode::from(2),
        "empty --reason must exit 2 before any DB work"
    );
}
#[test]
/// A `--reason` consisting only of whitespace must be rejected with exit code 2.
fn baseline_whitespace_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let blank_reason = " ";

    let result = baseline_cmd(
        "V00000000000000__baseline",
        "description",
        blank_reason,
        None,
        None,
        Some(workspace),
    );

    assert_eq!(
        result,
        ExitCode::from(2),
        "whitespace-only --reason must exit 2 before any DB work"
    );
}
#[test]
/// The runner errors listed here must map to exit code 2 for `baseline`.
fn baseline_refusal_variants_map_to_exit_code_two() {
    let baseline_version = || "V00000000000000__baseline".to_string();

    let cases = [
        RunnerError::VersionAlreadyApplied {
            version: baseline_version(),
            applied_at: None,
        },
        RunnerError::VersionCollisionNonTerminal {
            version: baseline_version(),
            status: LedgerStatus::Pending,
            run_id: 1,
        },
        RunnerError::BaselineSnapshotShouldNotBeProvided,
        RunnerError::AdvisoryUnlockReturnedFalse {
            bucket: BucketKey {
                database: "main".to_string(),
                app: String::new(),
            },
            key: 0x0102_0304_0506_0708,
        },
        RunnerError::OutOfOrderRejected {
            version: baseline_version(),
            conflicting_version: "V20260101000000__later".to_string(),
            conflicting_applied_at: None,
        },
    ];

    for err in cases.iter() {
        assert_eq!(
            baseline_error_exit_code(err),
            2,
            "baseline refusal variant must map to exit 2: {err}"
        );
    }
}
#[test]
/// Ledger, session-checkout, and advisory-lock failures must map to exit
/// code 1 for `baseline`.
fn baseline_transient_variants_map_to_exit_code_one() {
    use djogi::error::{DbError, DjogiError};

    let db_failure = |message: &str| DjogiError::Db(DbError::other(message));

    let cases = [
        RunnerError::LedgerBootstrapFailed {
            source: db_failure("create table failed"),
        },
        RunnerError::LedgerWriteFailed {
            version: "V00000000000000__baseline".to_string(),
            source: db_failure("insert failed"),
        },
        RunnerError::PinnedSessionCheckoutFailed {
            source: db_failure("pool exhausted"),
        },
        RunnerError::AdvisoryLockFailed {
            bucket: BucketKey {
                database: "main".to_string(),
                app: String::new(),
            },
            key: 0x0102_0304_0506_0708,
            attempts: 3,
        },
    ];

    for err in cases.iter() {
        assert_eq!(
            baseline_error_exit_code(err),
            1,
            "baseline transient variant must map to exit 1: {err}"
        );
    }
}
#[test]
/// `apply --fake` with no reason at all must exit with code 2.
fn fake_without_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let no_reason: Option<String> = None;

    let result = apply_cmd(Some(workspace), true, no_reason);

    assert_eq!(
        result,
        ExitCode::from(2),
        "--fake without --reason must exit 2"
    );
}
#[test]
/// `apply --fake` with an empty reason string must exit with code 2.
fn fake_with_empty_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let empty_reason = Some(String::new());

    let result = apply_cmd(Some(workspace), true, empty_reason);

    assert_eq!(
        result,
        ExitCode::from(2),
        "--fake with empty reason must exit 2"
    );
}
#[test]
/// `apply --fake` with a whitespace-only reason must exit with code 2.
fn fake_with_whitespace_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let blank_reason = Some(" ".to_string());

    let result = apply_cmd(Some(workspace), true, blank_reason);

    assert_eq!(
        result,
        ExitCode::from(2),
        "--fake with whitespace reason must exit 2"
    );
}
#[test]
/// Supplying a reason without `--fake` must not yield exit code 2; any other
/// exit code is acceptable here.
fn reason_without_fake_is_accepted() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let reason = Some("test reason".to_string());

    let result = apply_cmd(Some(workspace), false, reason);

    assert_ne!(
        result,
        ExitCode::from(2),
        "--reason without --fake should not refuse"
    );
}
/// Test helper: builds a `BucketKey` from borrowed database and app labels.
fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
    let (database, app) = (database.to_owned(), app.to_owned());
    djogi::migrate::BucketKey { database, app }
}
/// Test helper: builds a `VerifyDiagnostic` from borrowed parts, copying each
/// string into an owned field.
fn diag(
    code: &str,
    severity: djogi::migrate::VerifySeverity,
    message: &str,
    location: Option<&str>,
) -> djogi::migrate::VerifyDiagnostic {
    let location = location.map(|loc| loc.to_string());
    djogi::migrate::VerifyDiagnostic {
        code: String::from(code),
        severity,
        message: String::from(message),
        location,
    }
}
#[test]
/// A report with no diagnostics must show the ledger line, the "no drift"
/// message, and a PASSED verdict, and must never mention FAILED.
fn render_verify_report_clean_output() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: vec![],
        latest_applied_version: Some("001_initial".to_string()),
        applied_count: 3,
        unfinished_count: 0,
    };
    let lines = render_verify_report(&report, &render_bucket("main", ""));

    let has_line = |expected: &str| lines.iter().any(|l| l == expected);
    let mentions = |needle: &str| lines.iter().any(|l| l.contains(needle));

    assert!(
        has_line("Ledger: 3 applied, latest 001_initial"),
        "missing ledger line; got {lines:?}"
    );
    assert!(
        has_line("No drift detected. Schema is consistent."),
        "missing clean line; got {lines:?}"
    );
    assert!(
        mentions("Result: PASSED"),
        "missing PASSED result; got {lines:?}"
    );
    assert!(
        !mentions("FAILED"),
        "clean report must not say FAILED; got {lines:?}"
    );
}
#[test]
/// A report holding one error and one warning must render both diagnostic
/// lines in the `[SEVERITY] code (location): message` form and end in FAILED.
fn render_verify_report_with_errors() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let missing_table = diag(
        "D601",
        VerifySeverity::Error,
        "Snapshot table missing from live DB",
        Some("users"),
    );
    let extra_index = diag(
        "D611",
        VerifySeverity::Warning,
        "Live index not present in snapshot",
        Some("idx_posts_created"),
    );
    let report = VerifyReport {
        diagnostics: vec![missing_table, extra_index],
        latest_applied_version: Some("V20260501000000__add_users".to_string()),
        applied_count: 2,
        unfinished_count: 0,
    };
    assert!(report.has_errors());

    let lines = render_verify_report(&report, &render_bucket("main", "myapp"));
    let has_line = |expected: &str| lines.iter().any(|l| l == expected);

    assert!(
        has_line("[ERROR] D601 (users): Snapshot table missing from live DB"),
        "missing D601 line; got {lines:?}"
    );
    assert!(
        has_line("[WARN] D611 (idx_posts_created): Live index not present in snapshot"),
        "missing D611 line; got {lines:?}"
    );
    assert!(
        lines.iter().any(|l| l.contains("Result: FAILED")),
        "error report must say FAILED; got {lines:?}"
    );
}
#[test]
/// The first rendered line is the header: an empty app label appears as
/// `_global_`, a non-empty one appears verbatim after the database name.
fn render_verify_report_header_shows_global_and_named_app() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: vec![],
        latest_applied_version: None,
        applied_count: 0,
        unfinished_count: 0,
    };

    let global = render_verify_report(&report, &render_bucket("main", ""));
    let global_header = global.first().map(String::as_str);
    assert_eq!(
        global_header,
        Some("djogi migrations verify — main/_global_"),
        "global bucket header; got {global:?}"
    );

    let named = render_verify_report(&report, &render_bucket("crud_log", "billing"));
    let named_header = named.first().map(String::as_str);
    assert_eq!(
        named_header,
        Some("djogi migrations verify — crud_log/billing"),
        "named bucket header; got {named:?}"
    );
}
#[test]
/// A report whose only diagnostic is a warning must end in
/// "PASSED with warnings" and must not mention FAILED.
fn render_verify_report_warning_only_passes_with_warnings() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let advisory = diag(
        "D606",
        VerifySeverity::Warning,
        "type differs (advisory)",
        Some("users.age"),
    );
    let report = VerifyReport {
        diagnostics: vec![advisory],
        latest_applied_version: Some("001_initial".to_string()),
        applied_count: 1,
        unfinished_count: 0,
    };

    let lines = render_verify_report(&report, &render_bucket("main", ""));
    let mentions = |needle: &str| lines.iter().any(|l| l.contains(needle));

    assert!(
        mentions("Result: PASSED with warnings"),
        "warning-only must PASS with warnings; got {lines:?}"
    );
    assert!(
        !mentions("FAILED"),
        "warning-only must not say FAILED; got {lines:?}"
    );
}
/// With zero applied migrations and no latest version, the rendered output
/// must contain the dedicated "empty ledger" line.
#[test]
fn render_verify_report_empty_ledger_line() {
    use djogi::migrate::VerifyReport;

    let untouched = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: None,
        applied_count: 0,
        unfinished_count: 0,
    };
    let target = render_bucket("main", "");
    let lines = render_verify_report(&untouched, &target);

    let has_empty_ledger = lines
        .iter()
        .any(|l| l.as_str() == "Ledger: empty (no migrations applied yet)");
    assert!(has_empty_ledger, "empty ledger line; got {lines:?}");
}
/// When the report carries a non-zero unfinished count, the ledger line must
/// list applied count, unfinished count, and the latest applied version.
#[test]
fn render_verify_report_unfinished_ledger_line() {
    use djogi::migrate::VerifyReport;

    let interrupted = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: Some(String::from("V20260501000000__add_users")),
        applied_count: 2,
        unfinished_count: 1,
    };
    let target = render_bucket("main", "");
    let lines = render_verify_report(&interrupted, &target);

    let wanted = "Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users";
    let has_ledger_line = lines.iter().any(|l| l.as_str() == wanted);
    assert!(has_ledger_line, "unfinished ledger line; got {lines:?}");
}
/// An info diagnostic with no location must render `(-)` in place of the
/// location, and an info-only report must end with `Result: PASSED (1 info(s))`.
#[test]
fn render_verify_report_info_with_no_location_uses_dash() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let unlocated = diag(
        "D692",
        VerifySeverity::Info,
        "enum type(s) declared; not yet checked",
        None,
    );
    let info_only = VerifyReport {
        diagnostics: vec![unlocated],
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 1,
        unfinished_count: 0,
    };
    let target = render_bucket("main", "");
    let lines = render_verify_report(&info_only, &target);

    let shows_dash = lines.iter().any(|l| l.contains("(-)"));
    assert!(
        shows_dash,
        "location: None must render as (-); got {lines:?}"
    );

    let has_summary = lines
        .iter()
        .any(|l| l.as_str() == "Result: PASSED (1 info(s))");
    assert!(has_summary, "all-info summary; got {lines:?}");
}
/// Test helper: builds a `DatabaseConfig` from the primary URL and the
/// optional explicit crud-log / event-log URLs, copying each into an owned
/// `String`. `max_connections` is left as `None` and `dev_mode` is `false`.
fn db_config(
    url: &str,
    crud_log_url: Option<&str>,
    event_log_url: Option<&str>,
) -> djogi::config::DatabaseConfig {
    let primary = url.to_owned();
    let crud_log = crud_log_url.map(String::from);
    let event_log = event_log_url.map(String::from);
    djogi::config::DatabaseConfig {
        url: primary,
        crud_log_url: crud_log,
        event_log_url: event_log,
        max_connections: None,
        dev_mode: false,
    }
}
/// Resolving the `main` database must hand back the configured app URL
/// unchanged.
#[test]
fn resolve_bucket_url_main_uses_app_url_verbatim() {
    let app_url = "postgres://user:pass@localhost:5432/myapp_prod";
    let config = db_config(app_url, None, None);

    let resolved = resolve_bucket_url(&config, "main");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://user:pass@localhost:5432/myapp_prod"),
        "main must return the app URL unchanged"
    );
}
/// When an explicit `crud_log_url` is configured, resolving `crud_log` must
/// return it rather than a URL derived from the main URL.
#[test]
fn resolve_bucket_url_crud_log_prefers_explicit_url() {
    let explicit = Some("postgres://localhost/explicit_crud");
    let config = db_config("postgres://localhost/main", explicit, None);

    let resolved = resolve_bucket_url(&config, "crud_log");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost/explicit_crud"),
        "crud_log must prefer the explicit crud_log_url"
    );
}
/// When an explicit `event_log_url` is configured, resolving `event_log` must
/// return it rather than a URL derived from the main URL.
#[test]
fn resolve_bucket_url_event_log_prefers_explicit_url() {
    let explicit = Some("postgres://localhost/explicit_event");
    let config = db_config("postgres://localhost/main", None, explicit);

    let resolved = resolve_bucket_url(&config, "event_log");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost/explicit_event"),
        "event_log must prefer the explicit event_log_url"
    );
}
/// Pins two edge cases of explicit log URLs: an empty string is treated as
/// "not set" and the URL is derived from the main URL, whereas a
/// whitespace-only string counts as non-empty and is returned as-is.
#[test]
fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
    let config = db_config("postgres://localhost/main", Some(""), Some(" "));

    // Empty explicit URL: expect the derived form.
    let crud = resolve_bucket_url(&config, "crud_log");
    assert_eq!(
        crud.as_deref(),
        Some("postgres://localhost/crud_log"),
        "empty crud_log_url must fall back to derived"
    );

    // Whitespace-only explicit URL: expect it back untouched.
    let event = resolve_bucket_url(&config, "event_log");
    assert_eq!(
        event.as_deref(),
        Some(" "),
        "non-empty (whitespace) event_log_url is used verbatim"
    );
}
/// A database name other than `main`, with no explicit URL configured, must
/// resolve to the app URL with its database path replaced by that name.
#[test]
fn resolve_bucket_url_other_database_derives_from_app_url() {
    let app_url = "postgres://user:pass@localhost:5432/main";
    let config = db_config(app_url, None, None);

    let resolved = resolve_bucket_url(&config, "analytics");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://user:pass@localhost:5432/analytics"),
        "an arbitrary database name derives by path splice"
    );
}
/// If the app URL has no database path, a derived database URL cannot be
/// built and the resolver must return `None`.
#[test]
fn resolve_bucket_url_pathless_url_returns_none() {
    let config = db_config("postgres://localhost", None, None);

    let resolved = resolve_bucket_url(&config, "crud_log");
    assert_eq!(
        resolved, None,
        "pathless URL must yield None for a derived database"
    );
}
/// Even when the app URL has no database path, resolving `main` must still
/// return that URL unchanged.
#[test]
fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
    let config = db_config("postgres://localhost", None, None);

    let resolved = resolve_bucket_url(&config, "main");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost"),
        "main returns the app URL verbatim regardless of path"
    );
}
}