use std::path::{Path, PathBuf};
use std::process::ExitCode;
use djogi::apps::AppRegistry;
use djogi::migrate::{
AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
DescriptorProvider, GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan,
RepairConfirmation, RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError,
VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
};
use djogi::migrate::LedgerStatus;
use crate::{PartialApplyResolutionCli, RepairSubcommand};
/// Replay plan as stored in a bucket's `{version}.plan.json` file.
///
/// The serde derives make the field names below part of that JSON format.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
struct CliReplayPlan {
/// Format marker; the loaders in this file reject any value other than
/// `CLI_REPLAY_PLAN_FORMAT_VERSION`.
format_version: String,
/// Up checksum; must equal the pending plan's `checksum_up` for the file to be accepted.
checksum_up: String,
/// Optional down checksum; must equal the pending plan's `checksum_down`.
checksum_down: Option<String>,
/// Plan classification, converted into `djogi::migrate::Classification` on load.
classification: CliClassification,
/// Ordered segments, each converted into a `djogi::migrate::Segment` on load.
segments: Vec<CliReplaySegment>,
}
/// Serializable counterpart of `djogi::migrate::Classification`; the `From`
/// impl in this file maps every variant one-to-one.
///
/// Encoded as an internally tagged enum: the variant name is written to a
/// `kind` field in snake_case.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
enum CliClassification {
NoOp,
Additive,
Reversible,
Destructive,
Lossy,
Unsupported {
/// Forwarded unchanged to `Classification::Unsupported`.
reason: String,
},
PkTypeFlip {
/// Forwarded unchanged to `Classification::PkTypeFlip`.
co_destructive: bool,
/// Forwarded unchanged to `Classification::PkTypeFlip`.
co_lossy: bool,
},
}
/// One segment of a stored replay plan; becomes a `djogi::migrate::Segment` on load.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
struct CliReplaySegment {
/// Segment kind, converted into `djogi::migrate::SegmentKind`.
kind: CliSegmentKind,
/// Statements of the segment, in stored order.
statements: Vec<CliReplayStatement>,
}
/// Serializable counterpart of `djogi::migrate::SegmentKind`; variant names are
/// written in snake_case.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
enum CliSegmentKind {
Transactional,
NonTransactional,
MetadataOnly,
}
/// One statement of a stored replay plan.
///
/// Only the up side is stored; when loaded into `djogi::migrate::OperationSql`
/// the `down` text is left empty and `lossy` is `None`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
struct CliReplayStatement {
/// Copied into `OperationSql::label`.
label: String,
/// Copied into `OperationSql::up`.
up: String,
}
/// The only `format_version` value the replay-plan loaders in this file accept.
const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
fn load_replay_plan_from_disk(
workspace: &Path,
bucket: &djogi::migrate::BucketKey,
version: &str,
pending_checksum_up: &str,
pending_checksum_down: Option<&str>,
) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
if let Ok(bytes) = std::fs::read(&replay_plan_path) {
let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
Ok(s) => s,
Err(e) => {
return Err(ApplyReplayPlanError::Parse {
path: replay_plan_path.clone(),
source: e.to_string(),
});
}
};
if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
return Err(ApplyReplayPlanError::FormatVersion {
found: stored.format_version,
path: replay_plan_path.clone(),
});
}
if stored.checksum_up != pending_checksum_up
|| stored.checksum_down.as_deref() != pending_checksum_down
{
return Err(ApplyReplayPlanError::ChecksumMismatch);
}
let plan = djogi::migrate::MigrationPlan {
bucket: bucket.clone(),
classification: stored.classification.into(),
segments: stored
.segments
.into_iter()
.map(|seg| djogi::migrate::Segment {
kind: seg.kind.into(),
statements: seg
.statements
.into_iter()
.map(|stmt| djogi::migrate::OperationSql {
label: stmt.label,
up: stmt.up,
down: String::new(),
lossy: None,
})
.collect(),
})
.collect(),
};
return Ok((plan, stored.checksum_up, stored.checksum_down));
}
let up_filename = djogi::migrate::up_filename(version);
let down_filename = djogi::migrate::down_filename(version);
let up_path = bucket_dir.join(&up_filename);
let down_path = bucket_dir.join(&down_filename);
let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
path: up_path.clone(),
source: e.to_string(),
})?;
let down_sql = match std::fs::read_to_string(&down_path) {
Ok(sql) => sql,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
Err(e) => {
return Err(ApplyReplayPlanError::SqlRead {
path: down_path.clone(),
source: e.to_string(),
});
}
};
let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
let plan = djogi::migrate::MigrationPlan {
bucket: bucket.clone(),
classification: djogi::migrate::Classification::Additive,
segments: vec![djogi::migrate::Segment {
kind: djogi::migrate::SegmentKind::Transactional,
statements: vec![djogi::migrate::OperationSql {
label: format!("replay {version}"),
up: up_sql,
down: down_sql,
lossy: None,
}],
}],
};
Ok((plan, computed_checksum_up, None))
}
/// Failures reported by `load_replay_plan_from_disk`.
///
/// Underlying causes are carried as pre-rendered strings in `source`.
#[derive(Debug)]
enum ApplyReplayPlanError {
/// The `.plan.json` file was read but did not deserialize as a `CliReplayPlan`.
Parse { path: PathBuf, source: String },
/// The stored `format_version` differs from `CLI_REPLAY_PLAN_FORMAT_VERSION`.
FormatVersion { found: String, path: PathBuf },
/// The stored up/down checksums disagree with the pending JSON.
ChecksumMismatch,
/// Reading the up SQL file failed, or reading the down SQL file failed with
/// something other than "not found".
SqlRead { path: PathBuf, source: String },
}
/// Human-readable rendering of each replay-plan load failure.
impl std::fmt::Display for ApplyReplayPlanError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Render the message first, then emit it in one write.
        let rendered = match self {
            Self::Parse { path, source } => {
                format!("parse replay plan {}: {source}", path.display())
            }
            Self::FormatVersion { found, path } => format!(
                "replay plan format version mismatch in {}: expected {}, found {}",
                path.display(),
                CLI_REPLAY_PLAN_FORMAT_VERSION,
                found
            ),
            Self::ChecksumMismatch => {
                "checksum mismatch between pending JSON and replay plan".to_string()
            }
            Self::SqlRead { path, source } => {
                format!("read SQL file {}: {source}", path.display())
            }
        };
        f.write_str(&rendered)
    }
}
// Empty impl: the trait's default methods are used as-is, so no `source()`
// chain is exposed (the variants keep their causes as strings).
impl std::error::Error for ApplyReplayPlanError {}
/// Decides whether the on-disk artifact for `version` of `bucket` blocks
/// cleanup, returning the refusal reason if it does.
///
/// `None` means the artifact classified as `IdentityFreeCurrent`. `Some(reason)`
/// is returned for every other classification and for any failure to load the
/// artifact.
///
/// The `{version}.plan.json` file is preferred: it must parse, carry the
/// expected format version and match both pending checksums; the `up` text of
/// all its statements is joined with newlines and classified. If that file
/// cannot be read at all, the up SQL file is classified instead.
fn classify_phase_zero_for_cleanup(
    workspace: &Path,
    bucket: &djogi::migrate::BucketKey,
    version: &str,
    pending_checksum_up: &str,
    pending_checksum_down: Option<&str>,
) -> Option<String> {
    let dir = djogi::migrate::bucket_dir(workspace, bucket);
    let plan_json_path = dir.join(format!("{version}.plan.json"));

    let Ok(raw) = std::fs::read(&plan_json_path) else {
        // No readable plan JSON: classify the raw up SQL file.
        let up_filename = djogi::migrate::up_filename(version);
        let up_path = dir.join(&up_filename);
        return match std::fs::read_to_string(&up_path) {
            Err(e) => Some(format!("read up SQL file {}: {e}", up_path.display())),
            Ok(sql) => classify_phase_zero_bytes(sql.as_bytes()),
        };
    };

    let stored: CliReplayPlan = match serde_json::from_slice(&raw) {
        Err(e) => return Some(format!("parse replay plan: {e}")),
        Ok(parsed) => parsed,
    };

    if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
        return Some(format!(
            "replay plan format version mismatch: expected {}, found {}",
            CLI_REPLAY_PLAN_FORMAT_VERSION, stored.format_version
        ));
    }

    let up_matches = stored.checksum_up == pending_checksum_up;
    let down_matches = stored.checksum_down.as_deref() == pending_checksum_down;
    if !(up_matches && down_matches) {
        return Some("checksum mismatch between pending JSON and replay plan".to_string());
    }

    // Concatenate every statement's up SQL, separated by single newlines.
    let mut combined_up = String::new();
    let mut is_first = true;
    for statement in stored.segments.iter().flat_map(|seg| seg.statements.iter()) {
        if !is_first {
            combined_up.push('\n');
        }
        is_first = false;
        combined_up.push_str(statement.up.as_str());
    }
    classify_phase_zero_bytes(combined_up.as_bytes())
}
/// Classifies `bytes` with `djogi::migrate::classify_phase_zero_artifact` and
/// turns the outcome into an optional refusal message.
///
/// Only `IdentityFreeCurrent` yields `None`; every other state yields a fixed
/// description of what was detected.
fn classify_phase_zero_bytes(bytes: &[u8]) -> Option<String> {
    use djogi::migrate::PhaseZeroArtifactState as State;

    let refusal: &str = match djogi::migrate::classify_phase_zero_artifact(bytes) {
        State::IdentityFreeCurrent => return None,
        State::SeedCapableRuntimeCurrent => "seed-capable runtime-only artifact detected",
        State::SeedDmlNotRuntimeCurrent => "seed-dml non-runtime-current artifact detected",
        State::GeneratedStale => "generated-stale artifact detected",
        State::Ambiguous => "ambiguous or hand-edited artifact detected",
        State::Incomplete => "incomplete artifact (truncated generation)",
        State::Missing => "missing artifact",
    };
    Some(refusal.to_string())
}
impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
fn from(kind: CliSegmentKind) -> Self {
match kind {
CliSegmentKind::Transactional => Self::Transactional,
CliSegmentKind::NonTransactional => Self::NonTransactional,
CliSegmentKind::MetadataOnly => Self::MetadataOnly,
}
}
}
impl From<CliClassification> for djogi::migrate::Classification {
fn from(classification: CliClassification) -> Self {
match classification {
CliClassification::NoOp => Self::NoOp,
CliClassification::Additive => Self::Additive,
CliClassification::Reversible => Self::Reversible,
CliClassification::Destructive => Self::Destructive,
CliClassification::Lossy => Self::Lossy,
CliClassification::Unsupported { reason } => Self::Unsupported { reason },
CliClassification::PkTypeFlip {
co_destructive,
co_lossy,
} => Self::PkTypeFlip {
co_destructive,
co_lossy,
},
}
}
}
/// Picks the workspace directory for a command.
///
/// An explicitly supplied path is returned as-is. Otherwise the process's
/// current directory is used, falling back to `"."` if it cannot be determined.
fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
    if let Some(explicit) = workspace {
        return explicit;
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd,
        Err(_) => PathBuf::from("."),
    }
}
/// Scans `<migrations root>/<database>/<app dir>/` for directories that contain
/// a `schema_snapshot.json` and returns one bucket key per hit.
///
/// The scan is best-effort: unreadable directories, unreadable entries,
/// non-directories and non-UTF-8 names are skipped silently. The app label is
/// derived from the directory name via `djogi::migrate::app_label_from_dirname`.
fn discover_snapshot_buckets_on_disk(
    workspace: &Path,
) -> Vec<djogi::migrate::projection::BucketKey> {
    /// Name of `entry` if it is a directory with a UTF-8 name.
    fn utf8_dir_name(entry: &std::fs::DirEntry) -> Option<String> {
        let kind = entry.file_type().ok()?;
        if !kind.is_dir() {
            return None;
        }
        entry.file_name().to_str().map(str::to_string)
    }

    let mut buckets = Vec::new();
    let root = djogi::migrate::migrations_root(workspace);
    let Ok(database_entries) = std::fs::read_dir(&root) else {
        return buckets;
    };

    for database_entry in database_entries.flatten() {
        let Some(database) = utf8_dir_name(&database_entry) else {
            continue;
        };
        let Ok(app_entries) = std::fs::read_dir(database_entry.path()) else {
            continue;
        };

        for app_entry in app_entries.flatten() {
            let Some(dirname) = utf8_dir_name(&app_entry) else {
                continue;
            };
            let has_snapshot = app_entry.path().join("schema_snapshot.json").exists();
            if has_snapshot {
                let app = djogi::migrate::app_label_from_dirname(&dirname).to_string();
                buckets.push(djogi::migrate::projection::BucketKey {
                    database: database.clone(),
                    app,
                });
            }
        }
    }
    buckets
}
/// Entry point for `djogi migrations compose`.
///
/// Projects the model state from `provider`, collects the app lifecycle
/// descriptors, loads the workspace configuration and then delegates to
/// `compose_with_inputs` with the current UTC time.
///
/// Returns exit code 1 when projection or configuration loading fails;
/// otherwise returns whatever `compose_with_inputs` returns.
pub fn compose_cmd(
    provider: &dyn DescriptorProvider,
    name: &str,
    allow_destructive: bool,
    force_overwrite: bool,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);

    let models = match project_from_provider(provider) {
        Err(e) => {
            eprintln!("djogi migrations compose: projection error: {e}");
            return ExitCode::from(1);
        }
        Ok(projected) => projected,
    };

    let mut apps: Vec<AppLifecycle> = Vec::new();
    for descriptor in provider.apps().iter() {
        apps.push(AppLifecycle {
            label: descriptor.label.to_string(),
            database: descriptor.database.to_string(),
            renamed_from: descriptor.renamed_from.map(str::to_string),
            tombstone: descriptor.tombstone,
        });
    }

    let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
        Err(e) => {
            eprintln!("djogi migrations compose: config load: {e}");
            return ExitCode::from(1);
        }
        Ok(loaded) => loaded,
    };

    let configured_option = djogi_config.migrate.pk_flip_join_table_option;
    let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(configured_option);

    compose_with_inputs(
        &workspace,
        name,
        allow_destructive,
        force_overwrite,
        &models,
        &apps,
        time::OffsetDateTime::now_utc(),
        Some(pk_flip_option),
    )
}
/// Runs a compose against explicit inputs while holding the workspace lock.
///
/// Steps:
/// 1. acquire the workspace lock (exit 1 on failure);
/// 2. gather every bucket known from `models` or from snapshot files on disk;
/// 3. load each bucket's snapshot, ignoring "not found" and aborting with
///    exit 1 on any other load error;
/// 4. call `compose` and print one line per auto-emitted bootstrap migration,
///    composed bucket and converged snapshot.
///
/// Exit codes: 0 on success or when there is nothing to compose, 2 for
/// `LinkageDropWithoutModels` and `CrossBucketForeignKeyCycle`, 1 for any other
/// compose error.
#[allow(clippy::too_many_arguments)]
fn compose_with_inputs(
    workspace: &Path,
    name: &str,
    allow_destructive: bool,
    force_overwrite: bool,
    models: &std::collections::BTreeMap<
        djogi::migrate::projection::BucketKey,
        djogi::migrate::AppliedSchema,
    >,
    apps: &[AppLifecycle],
    now: time::OffsetDateTime,
    pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
) -> ExitCode {
    use std::collections::{BTreeMap, BTreeSet};

    /// Printable app name; the empty label is shown as `_global_`.
    fn app_display(app: &str) -> &str {
        if app.is_empty() { "_global_" } else { app }
    }

    let lock_file = workspace.join(LOCK_FILE_NAME);
    let guard = match acquire_workspace_lock(&lock_file, GUARD_DEFAULT_TIMEOUT) {
        Err(e) => {
            eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
            return ExitCode::from(1);
        }
        Ok(held) => held,
    };

    // Union of buckets with models and buckets that only exist as snapshots.
    let mut bucket_set: BTreeSet<djogi::migrate::projection::BucketKey> =
        models.keys().cloned().collect();
    bucket_set.extend(discover_snapshot_buckets_on_disk(workspace));

    let mut snapshots: BTreeMap<_, _> = BTreeMap::new();
    for bucket in &bucket_set {
        let path = djogi::migrate::snapshot_path(workspace, bucket);
        match djogi::migrate::load_snapshot(&path) {
            Ok(snapshot) => {
                snapshots.insert(bucket.clone(), snapshot);
            }
            // A bucket without a snapshot file simply has no prior state.
            Err(djogi::migrate::SnapshotError::Io { source, .. })
                if source.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                eprintln!(
                    "djogi migrations compose: snapshot load failed at {}: {e}",
                    path.display()
                );
                return ExitCode::from(1);
            }
        }
    }

    let req = ComposeRequest {
        workspace_root: workspace,
        models,
        snapshots: &snapshots,
        apps,
        name,
        allow_destructive,
        force_overwrite,
        now,
        _guard: &guard,
        pk_flip_join_table_option,
        skip_phase_zero_auto_emit: false,
    };

    match compose(req) {
        Ok(report) => {
            for emit in &report.emitted_phase_zero {
                let ext_summary = if emit.extensions.is_empty() {
                    "no extensions".to_string()
                } else {
                    let listed = emit
                        .extensions
                        .iter()
                        .cloned()
                        .collect::<Vec<_>>()
                        .join(", ");
                    format!("extensions: {}", listed)
                };
                println!(
                    "auto-emitted bootstrap migration: {}/_global_ ({ext_summary})",
                    emit.database,
                );
            }
            for composed in &report.composed_buckets {
                println!(
                    "composed {}/{}: {} ({:?})",
                    composed.bucket.database,
                    app_display(composed.bucket.app.as_str()),
                    composed.version,
                    composed.classification,
                );
            }
            for bucket in &report.converged_snapshot_buckets {
                println!(
                    "snapshot converged: {}/{} — snapshot updated to scoped enum set, no migration needed",
                    bucket.database,
                    app_display(bucket.app.as_str()),
                );
            }
            ExitCode::from(0)
        }
        Err(ComposeError::NothingToCompose) => {
            println!("nothing to compose — model state matches snapshot for every bucket");
            ExitCode::from(0)
        }
        Err(ComposeError::LinkageDropWithoutModels { ref text, .. }) => {
            eprintln!("djogi migrations compose: {text}");
            ExitCode::from(2)
        }
        Err(e @ ComposeError::CrossBucketForeignKeyCycle { .. }) => {
            eprintln!("djogi migrations compose: {e}");
            ExitCode::from(2)
        }
        Err(e) => {
            eprintln!("djogi migrations compose: {e}");
            ExitCode::from(1)
        }
    }
}
/// Entry point for `djogi migrations status`.
///
/// Builds a current-thread Tokio runtime, runs `run_status` on it and converts
/// the returned code into an [`ExitCode`].
///
/// Returns exit code 1 if the runtime cannot be built. A code from
/// `run_status` that does not fit in `0..=255` is reported as 1 rather than
/// being wrapped, so an out-of-range value (e.g. 256) can never turn into a
/// success exit status.
pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(r) => r,
        Err(e) => {
            eprintln!("djogi migrations status: tokio runtime: {e}");
            return ExitCode::from(1);
        }
    };
    let exit = runtime.block_on(async { run_status(&workspace).await });
    // Checked conversion: `exit as u8` would silently wrap modulo 256.
    ExitCode::from(u8::try_from(exit).unwrap_or(1))
}
/// Prints the migration ledger status for the workspace's configured database
/// and returns the process exit code.
///
/// Exit codes:
/// - `1` — configuration load, connection/pool or ledger read failure;
/// - `2` — the server's Postgres version is unsupported;
/// - `0` — the ledger read failed with an error mentioning
///   `djogi_schema_migrations` (reported as "No migrations recorded.");
/// - otherwise the `exit_code` of the rendered status report.
async fn run_status(workspace: &Path) -> i32 {
    use djogi::config::DjogiConfig;

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Ok(c) => c,
        Err(e) => {
            eprintln!("djogi migrations status: config load: {e}");
            return 1;
        }
    };
    let mut ctx = match connect_and_check(&config.database.url).await {
        ContextOutcome::Ready(ctx) => ctx,
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations status", &e);
            return 2;
        }
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations status: pool: {msg}");
            return 1;
        }
    };
    let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
        Ok(rows) => rows,
        Err(e) => {
            // NOTE(review): presumably this detects a missing ledger table by
            // its name appearing in the error text — confirm against the
            // errors `select_all_ledger_rows` can produce.
            if e.to_string().contains("djogi_schema_migrations") {
                println!("No migrations recorded.");
                return 0;
            }
            eprintln!("djogi migrations status: ledger read: {e}");
            return 1;
        }
    };
    let registered: Vec<String> = AppRegistry::all()
        .iter()
        .map(|d| d.label.to_string())
        .collect();
    // Pass the registered labels by reference (the `&` had been mangled into
    // a `®` character).
    let report = djogi::migrate::render_status(&rows, &registered);
    for line in &report.lines {
        println!("{line}");
    }
    report.exit_code
}
/// Result of `connect_and_check`: either a usable context or the reason none
/// could be produced.
#[allow(clippy::large_enum_variant)]
enum ContextOutcome {
/// Pool connected and the Postgres version check passed.
Ready(djogi::context::DjogiContext),
/// The version check failed with `DjogiError::UnsupportedPostgresVersion`.
UnsupportedVersion(djogi::error::DjogiError),
/// Connecting failed, or the version check failed with any other error;
/// holds the error rendered as text.
RuntimeError(String),
}
async fn connect_and_check(url: &str) -> ContextOutcome {
let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
Ok(p) => p,
Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
};
match djogi::pg::preflight::check_postgres_version(&pool).await {
Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
ContextOutcome::UnsupportedVersion(e)
}
Err(other) => ContextOutcome::RuntimeError(other.to_string()),
}
}
fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
return Some(db_config.url.clone());
}
if database == "crud_log"
&& let Some(u) = db_config.crud_log_url.as_deref()
&& !u.is_empty()
{
return Some(u.to_string());
}
if database == "event_log"
&& let Some(u) = db_config.event_log_url.as_deref()
&& !u.is_empty()
{
return Some(u.to_string());
}
djogi::migrate::derive_per_database_url(&db_config.url, database)
}
/// Entry point for `djogi migrations apply` (optionally `--fake`).
///
/// With `fake` set, a non-blank `reason` is mandatory; a missing or blank
/// reason is a usage error (exit code 2). Without `fake`, `reason` is ignored.
/// The actual work runs in `run_apply` on a current-thread Tokio runtime; exit
/// code 1 is returned if the runtime cannot be built.
///
/// A code from `run_apply` that does not fit in `0..=255` is reported as 1
/// rather than being wrapped, so an out-of-range value (e.g. 256) can never
/// turn into a success exit status.
pub fn apply_cmd(
    workspace: Option<PathBuf>,
    fake: bool,
    reason: Option<String>,
    node_id: Option<u32>,
    single_node_dev: bool,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let mode = if fake {
        match reason {
            Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
            Some(_) => {
                eprintln!(
                    "djogi migrations apply --fake: --reason must not be empty; \
                     supply a non-empty reason why these migrations are being \
                     faked (e.g. 'schema pre-exists from prior tooling')"
                );
                return ExitCode::from(2);
            }
            None => {
                eprintln!(
                    "djogi migrations apply --fake: --reason is required; \
                     supply a reason why these migrations are being faked \
                     (e.g. 'schema pre-exists from prior tooling'). \
                     This is recorded in the ledger audit trail."
                );
                return ExitCode::from(2);
            }
        }
    } else {
        FakeMode::Real
    };
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(r) => r,
        Err(e) => {
            eprintln!("djogi migrations apply: tokio runtime: {e}");
            return ExitCode::from(1);
        }
    };
    let exit =
        runtime.block_on(async { run_apply(&workspace, &mode, node_id, single_node_dev).await });
    // Checked conversion: `exit as u8` would silently wrap modulo 256.
    ExitCode::from(u8::try_from(exit).unwrap_or(1))
}
/// Whether an apply run executes the plan or only records it as faked.
#[derive(Debug, Clone)]
enum FakeMode {
/// Run the plan through `apply_plan`.
Real,
/// Run the plan through `fake_apply_plan`, passing along the operator's reason.
Fake { reason: String },
}
/// Applies (or fake-applies, depending on `mode`) every pending migration found
/// under `workspace` and returns the process exit code.
///
/// Exit codes as produced below:
/// - `0` — nothing pending, or every pending plan was applied/faked/skipped;
/// - `1` — connection/pool failure, workspace-lock failure, or a missing
///   context for a bucket's database;
/// - `2` — config load, pending discovery, runner identity, target routing,
///   unsupported Postgres version, or a refused plan;
/// - runner errors use `runner_error_exit_code`.
///
/// Processing stops at the first refused plan or runner error.
async fn run_apply(
workspace: &Path,
mode: &FakeMode,
node_id: Option<u32>,
single_node_dev: bool,
) -> i32 {
use djogi::config::DjogiConfig;
// Wording used in diagnostics and progress lines for the selected mode.
let action_verb = match mode {
FakeMode::Real => "apply",
FakeMode::Fake { .. } => "fake-apply",
};
let progress_verb = match mode {
FakeMode::Real => "applying",
FakeMode::Fake { .. } => "faking",
};
let config = match DjogiConfig::load_from_workspace(workspace) {
Ok(c) => c,
Err(e) => {
eprintln!("djogi migrations {action_verb}: config load: {e}");
return 2;
}
};
// First discovery pass, done before taking the workspace lock.
let pending_files = match discover_pending_plans(workspace) {
Ok(pending_files) => pending_files,
Err(e) => {
eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
return 2;
}
};
if pending_files.is_empty() {
println!("No pending migrations to {action_verb}.");
return 0;
}
let runner_identity = match crate::identity::resolve_identity(
node_id,
single_node_dev,
&config.profile,
action_verb,
) {
Ok(resolved) => Some(resolved.into_runner_identity()),
Err(e) => {
let _ = crate::identity::print_identity_error(action_verb, &e);
return 2;
}
};
// One URL per distinct target database among the pending plans.
let target_urls = match resolve_apply_target_urls(&pending_files, &config.database) {
Ok(urls) => urls,
Err(e) => {
eprintln!("djogi migrations {action_verb}: target routing: {e}");
return 2;
}
};
// Connect to (and version-check) every target database before applying anything.
let mut contexts = std::collections::BTreeMap::<String, djogi::context::DjogiContext>::new();
for (database, url) in &target_urls {
match connect_and_check(url).await {
ContextOutcome::Ready(ctx) => {
contexts.insert(database.clone(), ctx);
}
ContextOutcome::UnsupportedVersion(e) => {
crate::print_support_boundary_error("migrations apply", &e);
return 2;
}
ContextOutcome::RuntimeError(msg) => {
eprintln!("djogi migrations {action_verb}: pool for '{database}': {msg}");
return 1;
}
}
}
let lock_path = workspace.join(LOCK_FILE_NAME);
let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
Ok(g) => g,
Err(e) => {
eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
return 1;
}
};
// Re-discover under the lock and refuse to continue if the pending set
// differs from what was found before the lock was taken.
let pending_files = match reconcile_pending_plans_after_lock(workspace, &pending_files) {
Ok(pending_files) => pending_files,
Err(e) => {
eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
return 2;
}
};
// The audit pool is optional: failing to resolve its URL or to build the
// pool leaves it as `None` without reporting an error.
let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
Err(_) => None,
};
for pending_file in &pending_files {
let bucket_database = &pending_file.bucket.database;
let app_label = &pending_file.bucket.app;
let Some(ctx) = contexts.get_mut(bucket_database) else {
eprintln!(
"djogi migrations {action_verb}: internal error: missing context for database '{bucket_database}'"
);
return 1;
};
println!(" {progress_verb} {bucket_database}/{app_label}...");
let result = apply_one_pending(
ctx,
workspace,
pending_file,
&config,
&guard,
audit_pool.as_ref(),
mode,
runner_identity,
)
.await;
match result {
ApplyResult::Ok => match mode {
FakeMode::Real => {
println!("Applied: {bucket_database}/{app_label}");
}
FakeMode::Fake { .. } => {
println!(
" faked {bucket_database}/{app_label}: \
recorded in ledger with status = 'faked' (no SQL executed)"
);
}
},
ApplyResult::Skipped(reason) => {
println!("Skipped {bucket_database}/{app_label}: {reason}");
}
// NOTE(review): the two diagnostics below always say "apply", even in
// fake mode, unlike the `{action_verb}` messages above — confirm intended.
ApplyResult::Refused(reason) => {
eprintln!(
"djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
);
return 2;
}
ApplyResult::RunnerError(e) => {
eprintln!(
"djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
);
return runner_error_exit_code(&e);
}
}
}
let summary_verb = match mode {
FakeMode::Real => "applied",
FakeMode::Fake { .. } => "faked",
};
// NOTE(review): the count is the number of pending files, so plans reported
// as "Skipped" above are included in it — confirm intended.
println!("{summary_verb} {} migration(s).", pending_files.len());
0
}
/// Outcome of `apply_one_pending` for a single pending plan.
#[derive(Debug)]
enum ApplyResult {
/// The runner completed the plan (really applied or faked).
Ok,
/// Nothing was run; holds the reason (e.g. the version is already applied).
Skipped(String),
/// The plan was rejected before the runner was invoked; holds the reason.
Refused(String),
/// The runner was invoked and failed.
RunnerError(RunnerError),
}
/// A pending-plan JSON file found on disk, validated against its location.
///
/// Equality covers all fields; it is used to detect changes between the
/// pre-lock and post-lock discovery passes.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DiscoveredPendingPlan {
/// Path of the pending JSON file.
path: PathBuf,
/// Bucket derived from the file's location (database directory and app
/// filename); an empty `app` denotes the global bucket.
bucket: BucketKey,
/// Parsed contents of the file at discovery time.
plan: PendingPlan,
/// `true` when the file was found in the hidden `.phase_zero` directory.
is_phase_zero: bool,
}
/// Returns `true` when `bytes` is an acceptable name for a pending-path
/// component (database directory, app file stem, or dependency label).
///
/// Accepted names are 1 to 63 bytes long, start with `_` or an ASCII letter,
/// and continue with `_`, ASCII letters or ASCII digits only. Names starting
/// with `.` are therefore rejected, as is any non-ASCII byte.
fn is_acceptable_pending_path_component(bytes: &[u8]) -> bool {
    let Some((&head, tail)) = bytes.split_first() else {
        // Empty names are never acceptable.
        return false;
    };

    let within_length = bytes.len() <= 63;
    let head_ok = head == b'_' || head.is_ascii_alphabetic();

    within_length
        && head_ok
        && tail
            .iter()
            .all(|&byte| byte == b'_' || byte.is_ascii_alphanumeric())
}
/// Filename a pending plan for `app_label` is expected to have: the app's
/// directory name (per `djogi::migrate::app_dirname`) followed by `.json`.
fn canonical_pending_filename(app_label: &str) -> String {
    let mut filename = djogi::migrate::app_dirname(app_label).to_string();
    filename.push_str(".json");
    filename
}
/// Validates a pending file found inside a database's hidden `.phase_zero`
/// directory and wraps it as a discovered plan for the global bucket.
///
/// Checks, in order: the filename is UTF-8 and equals
/// `<PHASE_ZERO_VERSION>.json`; the file parses as a pending plan; the plan's
/// database equals `database`; the plan targets the global (empty) app; the
/// plan's version is `PHASE_ZERO_VERSION`.
///
/// # Errors
///
/// Returns a descriptive message for the first check that fails.
fn validate_hidden_phase_zero_pending(
    path: PathBuf,
    database: &str,
) -> Result<DiscoveredPendingPlan, String> {
    let Some(filename) = path.file_name().and_then(|name| name.to_str()) else {
        return Err(format!("non-utf8 Phase 0 pending path {}", path.display()));
    };

    let expected_filename = format!("{}.json", djogi::migrate::PHASE_ZERO_VERSION);
    if filename != expected_filename {
        return Err(format!(
            "hidden Phase 0 pending path {} must use canonical filename {}",
            path.display(),
            expected_filename
        ));
    }

    let plan = match djogi::migrate::load_pending(&path) {
        Ok(loaded) => loaded,
        Err(e) => return Err(format!("parse pending JSON {}: {e}", path.display())),
    };

    if plan.bucket_database != database {
        return Err(format!(
            "pending JSON {} has bucket database {}, expected {} from path",
            path.display(),
            plan.bucket_database,
            database
        ));
    }

    let targets_global_bucket = plan.bucket_app.is_empty();
    if !targets_global_bucket {
        return Err(format!(
            "pending JSON {} must target the global bucket in hidden Phase 0 namespace",
            path.display()
        ));
    }

    if plan.version != djogi::migrate::PHASE_ZERO_VERSION {
        return Err(format!(
            "pending JSON {} must use Phase 0 version {}, found {}",
            path.display(),
            djogi::migrate::PHASE_ZERO_VERSION,
            plan.version
        ));
    }

    let bucket = BucketKey {
        database: database.to_string(),
        app: String::new(),
    };
    Ok(DiscoveredPendingPlan {
        path,
        bucket,
        plan,
        is_phase_zero: true,
    })
}
/// Validates a regular `<database>/<app>.json` pending file and wraps it as a
/// discovered plan.
///
/// The app label comes from the filename stem: `_global_` means the global
/// (empty) app; any other stem must pass
/// `is_acceptable_pending_path_component`. The filename must equal the
/// canonical filename for that label. The file must then parse, and the plan's
/// database and app must match the location. Plans carrying
/// `PHASE_ZERO_VERSION` are rejected here because they belong in the hidden
/// `.phase_zero` directory.
///
/// # Errors
///
/// Returns a descriptive message for the first check that fails.
fn validate_normal_pending(
    path: PathBuf,
    database: &str,
    filename: &str,
) -> Result<DiscoveredPendingPlan, String> {
    /// Printable app name; the empty label is shown as `_global_`.
    fn label_or_global(app: &str) -> &str {
        if app.is_empty() { "_global_" } else { app }
    }

    let stem = match filename.strip_suffix(".json") {
        Some(stem) => stem,
        None => {
            return Err(format!(
                "pending path {} must end with .json",
                path.display()
            ));
        }
    };

    let app = match stem {
        "_global_" => String::new(),
        other if is_acceptable_pending_path_component(other.as_bytes()) => other.to_string(),
        _ => {
            return Err(format!(
                "pending path {} uses non-canonical app filename {}",
                path.display(),
                filename
            ));
        }
    };

    let expected_filename = canonical_pending_filename(&app);
    if filename != expected_filename {
        return Err(format!(
            "pending path {} must use canonical filename {}",
            path.display(),
            expected_filename
        ));
    }

    let plan = match djogi::migrate::load_pending(&path) {
        Ok(loaded) => loaded,
        Err(e) => return Err(format!("parse pending JSON {}: {e}", path.display())),
    };

    if plan.bucket_database != database {
        return Err(format!(
            "pending JSON {} has bucket database {}, expected {} from path",
            path.display(),
            plan.bucket_database,
            database
        ));
    }

    if plan.bucket_app != app {
        return Err(format!(
            "pending JSON {} has bucket app {}, expected {} from path",
            path.display(),
            label_or_global(plan.bucket_app.as_str()),
            label_or_global(app.as_str())
        ));
    }

    if plan.version == djogi::migrate::PHASE_ZERO_VERSION {
        return Err(format!(
            "pending JSON {} must use the hidden .phase_zero namespace for Phase 0",
            path.display()
        ));
    }

    let bucket = BucketKey {
        database: database.to_string(),
        app,
    };
    Ok(DiscoveredPendingPlan {
        path,
        bucket,
        is_phase_zero: false,
        plan,
    })
}
/// Finds, validates and orders every pending plan under the workspace's
/// pending root.
///
/// Layout walked: `<pending root>/<database>/<app>.json` for regular plans and
/// `<pending root>/<database>/.phase_zero/<file>` for Phase 0 plans. Database
/// directories whose names fail `is_acceptable_pending_path_component`,
/// unreadable directories/entries, other sub-directories and non-`.json` files
/// are skipped silently. A missing or unreadable pending root yields an empty
/// list.
///
/// The result is sorted by version, Phase 0 plans first within a version, then
/// by path, and finally reordered by `order_pending_groups_by_dependencies`.
///
/// # Errors
///
/// Fails on the first file that does not validate, on two files sharing the
/// same (database, app, version) identity, or when dependency ordering fails.
fn discover_pending_plans(workspace: &Path) -> Result<Vec<DiscoveredPendingPlan>, String> {
    let pending_root = djogi::migrate::pending_root(workspace);
    let mut out = Vec::new();
    let mut seen_identities = std::collections::BTreeSet::new();

    let Ok(database_entries) = std::fs::read_dir(&pending_root) else {
        return Ok(out);
    };

    // Accepts a validated plan unless its identity was already claimed.
    let mut admit = |discovered: DiscoveredPendingPlan| -> Result<(), String> {
        let identity = (
            discovered.bucket.database.clone(),
            discovered.bucket.app.clone(),
            discovered.plan.version.clone(),
        );
        if seen_identities.contains(&identity) {
            let app = if identity.1.is_empty() {
                "_global_"
            } else {
                identity.1.as_str()
            };
            return Err(format!(
                "duplicate pending identity discovered for {}/{}/{}",
                identity.0, app, identity.2
            ));
        }
        seen_identities.insert(identity);
        out.push(discovered);
        Ok(())
    };

    for database_entry in database_entries.flatten() {
        let Some(db_name) = database_entry.file_name().to_str().map(str::to_string) else {
            continue;
        };
        if !is_acceptable_pending_path_component(db_name.as_bytes()) {
            continue;
        }
        let db_dir = database_entry.path();
        if !db_dir.is_dir() {
            continue;
        }
        let Ok(entries) = std::fs::read_dir(&db_dir) else {
            continue;
        };

        for entry in entries.flatten() {
            let path = entry.path();
            let Ok(file_type) = entry.file_type() else {
                continue;
            };

            if file_type.is_dir() {
                // The only directory of interest is the hidden Phase 0 namespace.
                if entry.file_name().to_str() != Some(".phase_zero") {
                    continue;
                }
                let Ok(phase_zero_entries) = std::fs::read_dir(&path) else {
                    continue;
                };
                for phase_zero_entry in phase_zero_entries.flatten() {
                    let phase_zero_path = phase_zero_entry.path();
                    if phase_zero_path.is_file() {
                        admit(validate_hidden_phase_zero_pending(
                            phase_zero_path,
                            &db_name,
                        )?)?;
                    }
                }
                continue;
            }

            if !file_type.is_file() {
                continue;
            }
            let Some(filename) = path
                .file_name()
                .and_then(|name| name.to_str())
                .map(str::to_string)
            else {
                continue;
            };
            if !filename.ends_with(".json") {
                continue;
            }
            admit(validate_normal_pending(path, &db_name, &filename)?)?;
        }
    }

    out.sort_by(|left, right| {
        left.plan
            .version
            .cmp(&right.plan.version)
            // Reversed operands: Phase 0 plans sort before regular ones.
            .then_with(|| right.is_phase_zero.cmp(&left.is_phase_zero))
            .then_with(|| left.path.cmp(&right.path))
    });
    order_pending_groups_by_dependencies(out)
}
/// Reorders already-sorted pending plans so that, within each group, a plan
/// comes after the plans of the apps it depends on.
///
/// A group is a maximal run of consecutive entries sharing database, version
/// and Phase 0 flag. Every `depends_on` label in a group must pass
/// `is_acceptable_pending_path_component`. Dependencies on apps that are not
/// part of the same group, and self-dependencies, are ignored. Within a group
/// the order is a topological sort that always picks the lowest original
/// position among the entries that are ready, so input order is kept wherever
/// dependencies allow.
///
/// # Errors
///
/// Fails on an invalid `depends_on` label or when a group's dependencies form
/// a cycle (the message lists the apps that could not be ordered).
fn order_pending_groups_by_dependencies(
    out: Vec<DiscoveredPendingPlan>,
) -> Result<Vec<DiscoveredPendingPlan>, String> {
    let mut result = Vec::with_capacity(out.len());
    let mut start = 0;

    while start < out.len() {
        // Extend the group while database, version and Phase 0 flag match.
        let leader = &out[start];
        let followers = out[start + 1..]
            .iter()
            .take_while(|candidate| {
                candidate.bucket.database == leader.bucket.database
                    && candidate.plan.version == leader.plan.version
                    && candidate.is_phase_zero == leader.is_phase_zero
            })
            .count();
        let end = start + 1 + followers;
        let group = &out[start..end];

        for entry in group {
            for dep_app in &entry.plan.depends_on {
                if !is_acceptable_pending_path_component(dep_app.as_bytes()) {
                    return Err(format!(
                        "pending plan for {}/{} has invalid depends_on label {:?}",
                        entry.bucket.database, entry.bucket.app, dep_app,
                    ));
                }
            }
        }

        let group_len = group.len();
        if group_len <= 1 {
            result.extend_from_slice(group);
            start = end;
            continue;
        }

        let index_of_app: std::collections::HashMap<&str, usize> = group
            .iter()
            .enumerate()
            .map(|(position, entry)| (entry.bucket.app.as_str(), position))
            .collect();

        // `unmet[k]`: dependencies of k not yet emitted;
        // `dependents[d]`: entries waiting on d.
        let mut unmet = vec![0usize; group_len];
        let mut dependents: Vec<Vec<usize>> = vec![Vec::new(); group_len];
        for (position, entry) in group.iter().enumerate() {
            for dep_app in &entry.plan.depends_on {
                match index_of_app.get(dep_app.as_str()) {
                    Some(&dep_position) if dep_position != position => {
                        unmet[position] += 1;
                        dependents[dep_position].push(position);
                    }
                    // Outside the group, or a self-reference: no ordering constraint.
                    _ => {}
                }
            }
        }

        let mut ready: std::collections::BTreeSet<usize> = unmet
            .iter()
            .enumerate()
            .filter(|&(_, &count)| count == 0)
            .map(|(position, _)| position)
            .collect();
        let mut emitted = Vec::with_capacity(group_len);
        while let Some(position) = ready.pop_first() {
            emitted.push(position);
            for &waiting in &dependents[position] {
                unmet[waiting] -= 1;
                if unmet[waiting] == 0 {
                    ready.insert(waiting);
                }
            }
        }

        if emitted.len() != group_len {
            let database = &leader.bucket.database;
            let version = &leader.plan.version;
            let mut chain: Vec<String> = group
                .iter()
                .zip(unmet.iter())
                .filter(|&(_, &count)| count > 0)
                .map(|(entry, _)| entry.bucket.app.clone())
                .collect();
            chain.sort();
            return Err(format!(
                "pending migrations for database `{database}` version `{version}` \
                 declare a dependency cycle between apps: {chain:?}; \
                 recompose or inspect hand-edited pending files"
            ));
        }

        result.extend(emitted.into_iter().map(|position| group[position].clone()));
        start = end;
    }

    Ok(result)
}
/// Re-reads a discovered pending file and confirms it still parses to exactly
/// the plan captured at discovery time.
///
/// # Errors
///
/// Returns a message if the file cannot be read, cannot be parsed, or its
/// contents differ from the discovered plan.
fn load_verified_pending_for_apply(
    pending_file: &DiscoveredPendingPlan,
) -> Result<PendingPlan, String> {
    let raw = match std::fs::read(&pending_file.path) {
        Ok(bytes) => bytes,
        Err(e) => return Err(format!("read pending JSON: {e}")),
    };
    let reparsed: PendingPlan = match serde_json::from_slice(&raw) {
        Ok(plan) => plan,
        Err(e) => return Err(format!("parse pending JSON: {e}")),
    };

    if reparsed != pending_file.plan {
        Err(format!(
            "pending JSON changed after discovery at {}; rerun the command",
            pending_file.path.display()
        ))
    } else {
        Ok(reparsed)
    }
}
fn resolve_apply_target_urls(
pending_files: &[DiscoveredPendingPlan],
db_config: &djogi::config::DatabaseConfig,
) -> Result<std::collections::BTreeMap<String, String>, String> {
let mut urls = std::collections::BTreeMap::new();
for pending_file in pending_files {
let database = &pending_file.bucket.database;
if urls.contains_key(database) {
continue;
}
let Some(url) = resolve_bucket_url(db_config, database) else {
return Err(format!("cannot derive a database URL for `{database}`"));
};
urls.insert(database.clone(), url);
}
Ok(urls)
}
/// Repeats pending-plan discovery (intended to run once the workspace lock is
/// held) and insists the result is identical to the pre-lock discovery.
///
/// # Errors
///
/// Propagates discovery errors, and fails if the rediscovered set differs from
/// `pre_lock_pending_files` in any way (contents or order).
fn reconcile_pending_plans_after_lock(
    workspace: &Path,
    pre_lock_pending_files: &[DiscoveredPendingPlan],
) -> Result<Vec<DiscoveredPendingPlan>, String> {
    let rediscovered = discover_pending_plans(workspace)?;

    if rediscovered.as_slice() == pre_lock_pending_files {
        Ok(rediscovered)
    } else {
        Err(
            "pending migration set changed while waiting for the workspace lock; rerun the command"
                .to_string(),
        )
    }
}
/// Applies (or fake-applies) a single discovered pending plan through the
/// runner and reports the outcome.
///
/// Sequence:
/// 1. re-read the pending JSON and refuse if it changed since discovery;
/// 2. inspect the ledger state for this version/app — proceed if absent, skip
///    if already applied, clean up `Failed`/`RolledBack` rows, refuse on any
///    other existing status;
/// 3. load the replay plan from disk (refuse on failure);
/// 4. build the `RunnerCtx` and call `apply_plan` or `fake_apply_plan`
///    according to `mode`.
#[allow(clippy::too_many_arguments)]
#[djogi::deliberately_bypass_convention_with_raw_sql]
async fn apply_one_pending(
ctx: &mut djogi::context::DjogiContext,
workspace: &Path,
pending_file: &DiscoveredPendingPlan,
config: &djogi::config::DjogiConfig,
guard: &djogi::migrate::WorkspaceGuard,
audit_pool: Option<&deadpool_postgres::Pool>,
mode: &FakeMode,
runner_identity: Option<djogi::migrate::RunnerIdentity>,
) -> ApplyResult {
let pending = match load_verified_pending_for_apply(pending_file) {
Ok(pending) => pending,
Err(e) => return ApplyResult::Refused(e),
};
let bucket = pending_file.bucket.clone();
match check_ledger_state(ctx, &pending.version, &bucket.app).await {
LedgerState::NotPresent => {}
LedgerState::AlreadyApplied => {
return ApplyResult::Skipped("already applied".to_string());
}
LedgerState::PendingOrPartial(existing_status) => {
// Only `Failed` and `RolledBack` rows are removed so the version can be
// re-applied; any other existing status is refused below.
if existing_status == LedgerStatus::Failed
|| existing_status == LedgerStatus::RolledBack
{
// For Phase 0, classify the on-disk artifact first and refuse
// *before* the ledger row is deleted if it is not acceptable.
if pending.version == djogi::migrate::PHASE_ZERO_VERSION {
let cleanup_refusal = classify_phase_zero_for_cleanup(
workspace,
&bucket,
&pending.version,
&pending.checksum_up,
pending.checksum_down.as_deref(),
);
if let Some(reason) = cleanup_refusal {
return ApplyResult::Refused(format!(
"Phase 0 cleanup refused: {reason}; \
refusing before deleting {} row to prevent stale replay",
existing_status.as_db_str()
));
}
}
if let Err(e) =
delete_reapply_blocking_ledger_row(ctx, &pending.version, &bucket.app).await
{
return ApplyResult::Refused(format!(
"clean {} ledger row: {e}",
existing_status.as_db_str()
));
}
} else {
return ApplyResult::Refused(format!(
"version already in {} state — resolve before re-applying",
existing_status.as_db_str()
));
}
}
}
// Build the plan from disk; the returned checksums (not the pending JSON's
// own fields) are what gets handed to the runner.
let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
workspace,
&bucket,
&pending.version,
&pending.checksum_up,
pending.checksum_down.as_deref(),
) {
Ok(result) => result,
Err(e) => {
return ApplyResult::Refused(format!("load replay plan: {e}"));
}
};
let runner_ctx = RunnerCtx {
bucket: bucket.clone(),
version: pending.version.clone(),
description: pending.slug.clone(),
checksum_up,
checksum_down,
snapshot: Some(pending.model_snapshot.clone()),
snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
// Field-by-field copy of the migrate section of the loaded config.
config: djogi::config::MigrateConfig {
concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
},
out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
audit_pool: audit_pool.cloned(),
runner_identity,
};
let runner_result = match mode {
FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
};
match runner_result {
Ok(_) => ApplyResult::Ok,
Err(e) => ApplyResult::RunnerError(e),
}
}
/// Coarse classification of a migration version's ledger row, as produced by
/// `check_ledger_state`.
#[derive(Debug)]
enum LedgerState {
/// No row matched the version/app pair. Also returned when the ledger rows
/// could not be read.
NotPresent,
/// A matching row exists with status `Applied`, `Baseline` or `Faked`.
AlreadyApplied,
/// A matching row exists with status `Pending`, `Failed` or `RolledBack`;
/// the concrete status is carried along.
PendingOrPartial(LedgerStatus),
}
/// Looks up the ledger row for `version` / `app_label` and classifies it.
///
/// All ledger rows are fetched and the first row whose version and app label
/// both match decides the result. If the rows cannot be read, or no row
/// matches, `LedgerState::NotPresent` is returned.
async fn check_ledger_state(
    ctx: &mut djogi::context::DjogiContext,
    version: &str,
    app_label: &str,
) -> LedgerState {
    let rows = match djogi::migrate::select_all_ledger_rows(ctx).await {
        Ok(rows) => rows,
        // A failed ledger read is reported the same way as an absent row.
        Err(_) => return LedgerState::NotPresent,
    };
    for row in rows.iter() {
        let is_match = row.version == version && row.app_label == app_label;
        if is_match {
            let status = row.status;
            return match status {
                LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
                    LedgerState::PendingOrPartial(status)
                }
                LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
                    LedgerState::AlreadyApplied
                }
            };
        }
    }
    LedgerState::NotPresent
}
/// Maps a runner error to a process exit code.
///
/// Every runner error currently maps to the same code; the argument is
/// accepted but not inspected.
fn runner_error_exit_code(_error: &RunnerError) -> i32 {
    const RUNNER_FAILURE: i32 = 1;
    RUNNER_FAILURE
}
/// Deletes the `djogi_schema_migrations` row identified by `version` and
/// `app_label`.
///
/// The result of the statement itself is discarded; only success or failure
/// is reported to the caller.
#[djogi::deliberately_bypass_convention_with_raw_sql]
async fn delete_reapply_blocking_ledger_row(
    ctx: &mut djogi::context::DjogiContext,
    version: &str,
    app_label: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let deletion = ctx
        .raw_execute(
            "DELETE FROM djogi_schema_migrations \
             WHERE version = $1 AND app_label = $2",
            &[&version, &app_label],
        )
        .await;
    match deletion {
        Ok(_) => Ok(()),
        Err(e) => Err(e.into()),
    }
}
/// Computes where the schema snapshot for `bucket` lives on disk:
/// `<migrations root>/<database>/<app dirname>/schema_snapshot.json`.
fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
    let database_dir = djogi::migrate::migrations_root(workspace).join(&bucket.database);
    let app_dir = database_dir.join(djogi::migrate::app_dirname(&bucket.app));
    app_dir.join("schema_snapshot.json")
}
/// Entry point for `djogi migrations attune`.
///
/// Derives the attune mode from the `record_ledger` / `squash` flags, builds
/// a current-thread tokio runtime and runs `run_attune` on it.
///
/// Exit code 2 is returned for invalid flag combinations (both flags set, or
/// `--squash` without a non-empty `--from`); exit code 1 when the runtime
/// cannot be built; otherwise the code produced by `run_attune`.
#[allow(clippy::too_many_arguments)]
pub fn attune_cmd(
    target: Option<&str>,
    apply: bool,
    record: bool,
    record_ledger: bool,
    record_reason: &str,
    squash: bool,
    from: Option<&str>,
    publish: bool,
    app: Option<&str>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);

    let mode = if record_ledger && squash {
        eprintln!(
            "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
        );
        return ExitCode::from(2);
    } else if record_ledger {
        AttuneMode::Record {
            reason: record_reason.to_string(),
        }
    } else if squash {
        // An empty `--from` value is treated the same as a missing one.
        match from.filter(|v| !v.is_empty()) {
            Some(v) => AttuneMode::Squash {
                from: v.to_string(),
                publish,
                app: app.filter(|s| !s.is_empty()).map(str::to_string),
            },
            None => {
                eprintln!(
                    "djogi migrations attune --squash requires --from <version> (e.g. \
                     `--from V20260101000000__init`)"
                );
                return ExitCode::from(2);
            }
        }
    } else {
        AttuneMode::DiffOnly
    };

    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Err(e) => {
            eprintln!("djogi migrations attune: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(built) => built,
    };

    let code = runtime.block_on(run_attune(
        &workspace,
        mode,
        target.map(str::to_string),
        apply,
        record,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `djogi migrations attune`.
///
/// Loads the workspace config, connects to `config.database.url`, takes the
/// workspace lock, runs `attune` and prints the resulting report to stdout.
///
/// Returns 0 on success. Config, connection and lock failures return 1; an
/// unsupported server version returns 2; attune errors are mapped through
/// `attune_error_exit_code`.
async fn run_attune(
    workspace: &Path,
    mode: AttuneMode,
    target: Option<String>,
    apply: bool,
    record: bool,
) -> i32 {
    let config = match djogi::config::DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations attune: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };

    let mut ctx = match connect_and_check(&config.database.url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations attune: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations attune", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    let lock_path = workspace.join(LOCK_FILE_NAME);
    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
        Err(e) => {
            eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    let req = AttuneRequest {
        workspace_root: workspace,
        database_url: &config.database.url,
        profile: &config.profile,
        dev_mode: config.database.dev_mode,
        target: target.as_deref(),
        apply,
        record,
        mode,
        _guard: &guard,
    };

    let report = match attune(&mut ctx, req).await {
        Ok(report) => report,
        Err(e) => {
            eprintln!("djogi migrations attune: {e}");
            return attune_error_exit_code(&e);
        }
    };

    if report.entries.is_empty() {
        println!("attune: no drift");
    }
    for entry in &report.entries {
        // An empty app label is displayed as the `_global_` bucket.
        let app_display = match entry.bucket.app.as_str() {
            "" => "_global_",
            label => label,
        };
        println!(
            " {kind:<10} {database}/{app} {version}",
            kind = entry.kind.as_str(),
            database = entry.bucket.database,
            app = app_display,
            version = entry.version,
        );
    }
    for diag in &report.diagnostics {
        println!(" diagnostic: {diag}");
    }
    if let Some(sha) = &report.resolved_target {
        println!("resolved target: {sha}");
    }
    if let Some(squashed) = &report.squashed_to {
        println!("squashed to: {squashed}");
    }
    if report.published {
        println!("published to remote");
    }
    if report.parent_pointer_updated {
        println!("parent submodule pointer updated");
    }
    0
}
/// Maps an `AttuneError` to a process exit code: 2 for a refusal, 1 for every
/// filesystem, SQL, ledger or git failure.
fn attune_error_exit_code(err: &AttuneError) -> i32 {
    const FAILURE: i32 = 1;
    const REFUSAL: i32 = 2;
    match err {
        AttuneError::FilesystemScanFailed { .. }
        | AttuneError::LedgerQueryFailed { .. }
        | AttuneError::SqlReadFailed { .. }
        | AttuneError::SqlWriteFailed { .. }
        | AttuneError::SqlDeleteFailed { .. }
        | AttuneError::GitPublishFailed { .. }
        | AttuneError::GitTargetResolveFailed { .. }
        | AttuneError::GitFetchFailed { .. }
        | AttuneError::GitUpdateSubmodulePointerFailed { .. } => FAILURE,
        AttuneError::Refused(_) => REFUSAL,
    }
}
/// Entry point for `djogi migrations verify`.
///
/// Builds a current-thread tokio runtime and runs `run_verify` on it. Returns
/// exit code 1 if the runtime cannot be built, otherwise the code produced by
/// `run_verify`.
pub fn verify_cmd(
    provider: &dyn DescriptorProvider,
    workspace: Option<PathBuf>,
    strict: bool,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    match built {
        Ok(runtime) => {
            let code = runtime.block_on(run_verify(provider, &workspace, strict));
            ExitCode::from(code as u8)
        }
        Err(e) => {
            eprintln!("djogi migrations verify: tokio runtime: {e}");
            ExitCode::from(1)
        }
    }
}
/// Async body of `djogi migrations verify`.
///
/// Verifies every bucket that either has registered models (from `provider`)
/// or has a snapshot on disk, printing one report per bucket.
///
/// Return value:
/// * `2` when there is nothing to verify (no models and no on-disk snapshot
///   buckets) or when a database reports an unsupported version;
/// * `1` when config loading or projection fails, or when any bucket hit an
///   error (URL resolution, connection, snapshot load, verification error, or
///   a report containing errors);
/// * `0` otherwise.
///
/// Per-bucket failures do not stop the loop; they set the exit code and move
/// on to the next bucket.
async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
use djogi::config::DjogiConfig;
// Nothing registered and nothing on disk: emit the zero-descriptor diagnostic.
if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
crate::print_zero_descriptor_diagnostic("migrations verify");
return 2;
}
let config = match DjogiConfig::load_from_workspace(workspace) {
Ok(c) => c,
Err(e) => {
eprintln!("djogi migrations verify: config load: {e}");
return 1;
}
};
let models = match project_from_provider(provider) {
Ok(m) => m,
Err(e) => {
eprintln!("djogi migrations verify: projection error: {e}");
return 1;
}
};
// Buckets to verify = buckets with projected models ∪ buckets with a
// snapshot on disk.
let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
models.keys().cloned().collect();
for bucket in discover_snapshot_buckets_on_disk(workspace) {
bucket_set.insert(bucket);
}
if bucket_set.is_empty() {
crate::print_zero_descriptor_diagnostic("migrations verify");
return 2;
}
let policy = djogi::config::PolicyConfig {
strict_out_of_order: strict,
};
// Names of databases that have at least one bucket with a non-empty model set.
let database_has_models: std::collections::HashSet<String> = bucket_set
.iter()
.filter(|b| {
models
.get(*b)
.map(|s| !s.models.is_empty())
.unwrap_or(false)
})
.map(|b| b.database.clone())
.collect();
// One context per database name, created on first use and reused afterwards.
let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
std::collections::BTreeMap::new();
// Databases for which `emit_ledger` has already been passed as true.
let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
let mut exit_code: i32 = 0;
for bucket in &bucket_set {
let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
// `bd` is the display form of the app label; empty means `_global_`.
let bd = if bucket.app.is_empty() {
"_global_"
} else {
&bucket.app
};
eprintln!(
"djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
check that config.database.url has a valid path component",
bucket.database, bucket.database, bd
);
exit_code = 1;
continue;
};
if !contexts.contains_key(&bucket.database) {
match connect_and_check(&url).await {
ContextOutcome::Ready(ctx) => {
contexts.insert(bucket.database.clone(), ctx);
}
// An unsupported server version aborts the whole command immediately.
ContextOutcome::UnsupportedVersion(e) => {
crate::print_support_boundary_error("migrations verify", &e);
return 2;
}
// A connection failure skips this bucket; nothing is cached, so a later
// bucket on the same database attempts the connection again.
ContextOutcome::RuntimeError(msg) => {
eprintln!(
"djogi migrations verify: pool for '{}': {msg}",
bucket.database
);
exit_code = 1;
continue;
}
}
}
let snap_path = snapshot_path(workspace, bucket);
let snapshot = match load_snapshot(&snap_path) {
Ok(s) => s,
// Missing snapshot file: an error only when the bucket has registered
// models, otherwise just an informational line on stdout.
Err(SnapshotError::Io { source, .. })
if source.kind() == std::io::ErrorKind::NotFound =>
{
let bd = if bucket.app.is_empty() {
"_global_"
} else {
&bucket.app
};
let has_models = models
.get(bucket)
.map(|s| !s.models.is_empty())
.unwrap_or(false);
if has_models {
eprintln!(
"djogi migrations verify: {}/{} has registered models but no \
snapshot; run `djogi migrations compose` then \
`djogi migrations apply` to record a baseline",
bucket.database, bd
);
exit_code = 1;
} else {
println!("No snapshot found for bucket {}/{}", bucket.database, bd);
}
continue;
}
Err(e) => {
let bd = if bucket.app.is_empty() {
"_global_"
} else {
&bucket.app
};
eprintln!(
"djogi migrations verify: load snapshot for {}/{}: {e}",
bucket.database, bd
);
exit_code = 1;
continue;
}
};
let db_has_models = database_has_models.contains(&bucket.database);
// `insert` returns true only the first time a database is recorded, and the
// `&&` short-circuit means databases without models are never recorded.
let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
let ctx = contexts
.get_mut(&bucket.database)
.expect("context inserted above");
let report = match djogi::migrate::verify_bucket(
ctx,
bucket,
&snapshot,
&policy,
emit_ledger,
db_has_models,
)
.await
{
Ok(r) => r,
Err(e) => {
let bd = if bucket.app.is_empty() {
"_global_"
} else {
&bucket.app
};
eprintln!(
"djogi migrations verify: error for {}/{}: {e}",
bucket.database, bd
);
exit_code = 1;
continue;
}
};
for line in render_verify_report(&report, bucket) {
println!("{line}");
}
if report.has_errors() {
exit_code = 1;
}
}
exit_code
}
fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
let mut lines: Vec<String> = Vec::new();
let app_display = if bucket.app.is_empty() {
"_global_"
} else {
&bucket.app
};
lines.push(format!(
"djogi migrations verify — {}/{}",
bucket.database, app_display
));
lines.push("──────────────────────────────────────────".to_string());
match (
&report.latest_applied_version,
report.applied_count,
report.unfinished_count,
) {
(Some(version), applied, 0) => {
lines.push(format!("Ledger: {applied} applied, latest {version}"));
}
(Some(version), applied, unfinished) => {
lines.push(format!(
"Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
));
}
(None, 0, 0) => {
lines.push("Ledger: empty (no migrations applied yet)".to_string());
}
_ => {}
}
lines.push(String::new());
if report.diagnostics.is_empty() {
lines.push("No drift detected. Schema is consistent.".to_string());
} else {
for d in &report.diagnostics {
let severity = match d.severity {
VerifySeverity::Info => "INFO",
VerifySeverity::Warning => "WARN",
VerifySeverity::Error => "ERROR",
};
let location = d.location.as_deref().unwrap_or("-");
lines.push(format!(
"[{severity}] {code} ({loc}): {msg}",
severity = severity,
code = d.code,
loc = location,
msg = d.message
));
}
}
let errors = report
.diagnostics
.iter()
.filter(|d| d.severity == VerifySeverity::Error)
.count();
let warnings = report
.diagnostics
.iter()
.filter(|d| d.severity == VerifySeverity::Warning)
.count();
let infos = report
.diagnostics
.iter()
.filter(|d| d.severity == VerifySeverity::Info)
.count();
if errors > 0 {
lines.push(String::new());
lines.push(format!(
"Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
));
} else if warnings > 0 {
lines.push(String::new());
lines.push(format!(
"Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
));
} else {
lines.push(String::new());
lines.push(format!("Result: PASSED ({infos} info(s))"));
}
lines
}
impl From<PartialApplyResolutionCli> for PartialApplyResolution {
fn from(cli: PartialApplyResolutionCli) -> Self {
match cli {
PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
PartialApplyResolutionCli::Faked => Self::MarkFaked,
PartialApplyResolutionCli::Applied => Self::MarkApplied,
}
}
}
/// Dispatches a parsed `repair` subcommand to its dedicated command function.
///
/// Each arm destructures the subcommand's fields and forwards them, borrowing
/// owned strings/paths as `&str` / `&Path` where the target function expects
/// borrowed arguments. The returned `ExitCode` is whatever the selected
/// command function produces.
pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
    match command {
        RepairSubcommand::ChecksumDrift {
            version,
            app,
            database,
            checksum_up,
            checksum_down,
            workspace,
        } => repair_checksum_drift_cmd(
            &version,
            app.as_deref(),
            database.as_deref(),
            checksum_up.as_deref(),
            checksum_down.as_deref(),
            workspace,
        ),
        RepairSubcommand::PartialApply {
            version,
            resolution,
            note,
            app,
            database,
            workspace,
        } => repair_partial_apply_cmd(
            &version,
            // CLI enum -> library enum via the `From` impl in this file.
            resolution.into(),
            // Borrow the operator note; this argument had been corrupted into
            // a non-compiling token by an HTML-entity decode of `&not`.
            &note,
            app.as_deref(),
            database.as_deref(),
            workspace,
        ),
        RepairSubcommand::ResumePartial {
            version,
            app,
            database,
            workspace,
            node_id,
            single_node_dev,
        } => repair_resume_partial_apply_cmd(
            &version,
            app.as_deref(),
            database.as_deref(),
            workspace,
            node_id,
            single_node_dev,
        ),
        RepairSubcommand::SnapshotRebuild {
            app,
            database,
            snapshot_path,
            workspace,
        } => repair_snapshot_rebuild_cmd(
            app.as_deref(),
            database.as_deref(),
            snapshot_path.as_deref(),
            workspace,
        ),
    }
}
fn render_repair_report(report: &RepairReport) {
for action in &report.actions_taken {
println!(" {action}");
}
if !report.ledger_changes.is_empty() {
println!("Ledger changes:");
for lc in &report.ledger_changes {
println!(
" {} | {} | {} -> {}",
lc.version, lc.column, lc.before, lc.after,
);
}
}
if !report.snapshot_changes.is_empty() {
println!("Snapshot changes:");
for sc in &report.snapshot_changes {
println!(" {} | {}", sc.path.display(), sc.description);
}
}
}
/// Maps a `RepairError` to a process exit code.
///
/// Exit code 1 covers ledger/snapshot I/O, advisory-lock acquisition,
/// pinned-session checkout and resume step / progress-ack failures.
/// Every other variant maps to exit code 2.
fn repair_error_exit_code(err: &RepairError) -> i32 {
    const FAILURE: i32 = 1;
    const REFUSAL: i32 = 2;
    match err {
        RepairError::VersionNotFound { .. }
        | RepairError::InsufficientConfirmation
        | RepairError::InvalidChecksum { .. }
        | RepairError::InvalidResolution { .. }
        | RepairError::BucketAppMismatch { .. }
        | RepairError::PlanVersionMismatch { .. }
        | RepairError::PlanChecksumMismatch { .. }
        | RepairError::LeafIdentityMismatch { .. }
        | RepairError::NothingToResume { .. }
        | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
        | RepairError::SuppliedSnapshotDiverges { .. }
        | RepairError::AdvisoryUnlockReturnedFalse { .. }
        | RepairError::ResumePlanShapeMismatch { .. }
        | RepairError::ReplayPlanShapeMismatch { .. }
        | RepairError::PhaseZeroArtifactRefused { .. }
        | RepairError::MissingResumeIdentity { .. } => REFUSAL,
        RepairError::LedgerIo { .. }
        | RepairError::SnapshotIo { .. }
        | RepairError::AdvisoryLockFailed { .. }
        | RepairError::AdvisoryLockQueryFailed { .. }
        | RepairError::PinnedSessionCheckoutFailed { .. }
        | RepairError::ResumeStepFailed { .. }
        | RepairError::ResumeProgressAckFailed { .. } => FAILURE,
    }
}
/// Returns the database name to operate on: the supplied name when present,
/// otherwise `"main"`. The config argument is accepted but not consulted.
fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
    match database {
        Some(name) => name.to_string(),
        None => "main".to_string(),
    }
}
/// Reads the up-migration SQL file for `version` from the bucket directory and
/// returns its committed-SQL checksum.
///
/// Any I/O error from reading the file (including a missing file) is returned
/// to the caller.
fn compute_checksum_up_from_disk(
    workspace: &Path,
    bucket: &djogi::migrate::BucketKey,
    version: &str,
) -> std::io::Result<String> {
    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
    let up_sql_path = bucket_dir.join(djogi::migrate::up_filename(version));
    match std::fs::read_to_string(&up_sql_path) {
        Ok(sql) => Ok(djogi::migrate::compute_committed_sql_checksum(
            &sql,
            djogi::migrate::ResetSqlSide::Up,
        )),
        Err(e) => Err(e),
    }
}
/// Reads the down-migration SQL file for `version` from the bucket directory
/// and returns whatever `compute_committed_down_sql_checksum` yields for it.
///
/// A missing file is not an error and yields `Ok(None)`; every other I/O
/// error is returned to the caller.
fn compute_checksum_down_from_disk(
    workspace: &Path,
    bucket: &djogi::migrate::BucketKey,
    version: &str,
) -> std::io::Result<Option<String>> {
    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
    let down_sql_path = bucket_dir.join(djogi::migrate::down_filename(version));
    match std::fs::read_to_string(&down_sql_path) {
        Ok(sql) => Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql)),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
/// Entry point for `djogi migrations repair checksum-drift`.
///
/// Builds a current-thread tokio runtime and runs `run_repair_checksum_drift`
/// on it. Returns exit code 1 if the runtime cannot be built, otherwise the
/// code produced by the async body.
pub fn repair_checksum_drift_cmd(
    version: &str,
    app: Option<&str>,
    database: Option<&str>,
    checksum_up: Option<&str>,
    checksum_down: Option<&str>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Err(e) => {
            eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(built) => built,
    };
    let code = runtime.block_on(run_repair_checksum_drift(
        &workspace,
        version,
        app,
        database,
        checksum_up,
        checksum_down,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `djogi migrations repair checksum-drift`.
///
/// Loads the config, resolves and connects to the target database, takes the
/// workspace lock, determines the up/down checksums (explicit arguments win;
/// otherwise they are computed from the SQL files on disk) and calls
/// `repair_checksum_drift`.
///
/// Returns 0 on success, 1 for config / connection / lock / file-read
/// failures, 2 when no URL can be derived or the server version is
/// unsupported, and `repair_error_exit_code` for repair errors.
async fn run_repair_checksum_drift(
    workspace: &Path,
    version: &str,
    app: Option<&str>,
    database: Option<&str>,
    checksum_up: Option<&str>,
    checksum_down: Option<&str>,
) -> i32 {
    let config = match djogi::config::DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations repair checksum-drift: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };

    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!(
            "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
        );
        return 2;
    };

    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations repair checksum-drift", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    let lock_path = workspace.join(LOCK_FILE_NAME);
    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
        Err(e) => {
            eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `--app` addresses the bucket with the empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };

    let new_checksum_up = if let Some(supplied) = checksum_up {
        supplied.to_string()
    } else {
        match compute_checksum_up_from_disk(workspace, &bucket, version) {
            Ok(computed) => computed,
            Err(e) => {
                eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
                return 1;
            }
        }
    };

    let resolved_checksum_down = if let Some(supplied) = checksum_down {
        Some(supplied.to_string())
    } else {
        match compute_checksum_down_from_disk(workspace, &bucket, version) {
            Ok(computed) => computed,
            Err(e) => {
                eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
                return 1;
            }
        }
    };

    let outcome = repair_checksum_drift(
        &mut ctx,
        &guard,
        &bucket,
        version,
        workspace,
        &new_checksum_up,
        resolved_checksum_down.as_deref(),
        RepairConfirmation::OperatorAcknowledged,
    )
    .await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations repair checksum-drift: {e}");
            repair_error_exit_code(&e)
        }
        Ok(report) => {
            render_repair_report(&report);
            0
        }
    }
}
/// Entry point for `djogi migrations repair partial-apply`.
///
/// Builds a current-thread tokio runtime and runs `run_repair_partial_apply`
/// on it. Returns exit code 1 if the runtime cannot be built, otherwise the
/// code produced by the async body.
pub fn repair_partial_apply_cmd(
    version: &str,
    resolution: PartialApplyResolution,
    note: &str,
    app: Option<&str>,
    database: Option<&str>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Err(e) => {
            eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(built) => built,
    };
    let code = runtime.block_on(run_repair_partial_apply(
        &workspace, version, resolution, note, app, database,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `djogi migrations repair partial-apply`.
///
/// Loads the config, resolves and connects to the target database, takes the
/// workspace lock and calls `repair_partial_apply` with the requested
/// resolution and note.
///
/// Returns 0 on success, 1 for config / connection / lock failures, 2 when no
/// URL can be derived or the server version is unsupported, and
/// `repair_error_exit_code` for repair errors.
async fn run_repair_partial_apply(
    workspace: &Path,
    version: &str,
    resolution: PartialApplyResolution,
    note: &str,
    app: Option<&str>,
    database: Option<&str>,
) -> i32 {
    let config = match djogi::config::DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations repair partial-apply: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };

    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!(
            "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
        );
        return 2;
    };

    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations repair partial-apply: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations repair partial-apply", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    let lock_path = workspace.join(LOCK_FILE_NAME);
    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
        Err(e) => {
            eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `--app` addresses the bucket with the empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };

    let outcome = repair_partial_apply(
        &mut ctx,
        &guard,
        &bucket,
        version,
        workspace,
        resolution,
        note,
        RepairConfirmation::OperatorAcknowledged,
    )
    .await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations repair partial-apply: {e}");
            repair_error_exit_code(&e)
        }
        Ok(report) => {
            render_repair_report(&report);
            0
        }
    }
}
/// Entry point for `djogi migrations repair resume-partial`.
///
/// Builds a current-thread tokio runtime and runs `run_repair_resume_partial`
/// on it. Returns exit code 1 if the runtime cannot be built, otherwise the
/// code produced by the async body.
pub fn repair_resume_partial_apply_cmd(
    version: &str,
    app: Option<&str>,
    database: Option<&str>,
    workspace: Option<PathBuf>,
    node_id: Option<u32>,
    single_node_dev: bool,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(built) => built,
    };
    let code = runtime.block_on(run_repair_resume_partial(
        &workspace,
        version,
        app,
        database,
        node_id,
        single_node_dev,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `djogi migrations repair resume-partial`.
///
/// Loads the config, resolves the runner identity, resolves and connects to
/// the target database, takes the workspace lock, loads the committed plan
/// for `version` from disk and calls `repair_resume_partial_apply`.
///
/// Returns 0 on success, 1 for config / connection / lock failures, 2 for
/// identity errors, an underivable URL, an unsupported server version or a
/// plan that cannot be loaded, and `repair_error_exit_code` for repair errors.
async fn run_repair_resume_partial(
    workspace: &Path,
    version: &str,
    app: Option<&str>,
    database: Option<&str>,
    node_id: Option<u32>,
    single_node_dev: bool,
) -> i32 {
    let config = match djogi::config::DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };

    let identity_lookup = crate::identity::resolve_identity(
        node_id,
        single_node_dev,
        &config.profile,
        "repair resume-partial",
    );
    let runner_identity = match identity_lookup {
        Err(e) => {
            let _ = crate::identity::print_identity_error("repair resume-partial", &e);
            return 2;
        }
        Ok(resolved) => Some(resolved.into_runner_identity()),
    };

    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!(
            "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
        );
        return 2;
    };

    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations repair resume-partial: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations repair resume-partial", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    let lock_path = workspace.join(LOCK_FILE_NAME);
    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `--app` addresses the bucket with the empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };

    let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: load plan: {e}");
            return 2;
        }
        Ok(loaded) => loaded,
    };

    let outcome = repair_resume_partial_apply(
        &mut ctx,
        &guard,
        workspace,
        version,
        &plan,
        runner_identity,
        RepairConfirmation::OperatorAcknowledged,
    )
    .await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations repair resume-partial: {e}");
            repair_error_exit_code(&e)
        }
        Ok(report) => {
            render_repair_report(&report);
            0
        }
    }
}
fn load_committed_plan_for_resume(
workspace: &Path,
bucket: &djogi::migrate::BucketKey,
version: &str,
) -> Result<djogi::migrate::MigrationPlan, String> {
let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
let plan_path = bucket_dir.join(format!("{version}.plan.json"));
let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
let stored: CliReplayPlan = serde_json::from_slice(&bytes)
.map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
return Err(format!(
"{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
plan_path.display(),
stored.format_version,
));
}
Ok(djogi::migrate::MigrationPlan {
bucket: bucket.clone(),
classification: stored.classification.into(),
segments: stored
.segments
.into_iter()
.map(|seg| djogi::migrate::Segment {
kind: seg.kind.into(),
statements: seg
.statements
.into_iter()
.map(|stmt| djogi::migrate::OperationSql {
label: stmt.label,
up: stmt.up,
down: String::new(),
lossy: None,
})
.collect(),
})
.collect(),
})
}
/// Entry point for `djogi migrations repair snapshot-rebuild`.
///
/// Builds a current-thread tokio runtime and runs
/// `run_repair_snapshot_rebuild` on it. Returns exit code 1 if the runtime
/// cannot be built, otherwise the code produced by the async body.
pub fn repair_snapshot_rebuild_cmd(
    app: Option<&str>,
    database: Option<&str>,
    snapshot_path: Option<&Path>,
    workspace: Option<PathBuf>,
) -> ExitCode {
    let workspace = resolve_workspace(workspace);
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Err(e) => {
            eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(built) => built,
    };
    let code = runtime.block_on(run_repair_snapshot_rebuild(
        &workspace,
        app,
        database,
        snapshot_path,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `djogi migrations repair snapshot-rebuild`.
///
/// Loads the config, resolves and connects to the target database, takes the
/// workspace lock and calls `repair_snapshot_rebuild`. The snapshot path is
/// the explicit `snapshot_path` when given, otherwise the conventional path
/// from `reconstruct_snapshot_path`.
///
/// Returns 0 on success, 1 for config / connection / lock failures, 2 when no
/// URL can be derived or the server version is unsupported, and
/// `repair_error_exit_code` for repair errors.
async fn run_repair_snapshot_rebuild(
    workspace: &Path,
    app: Option<&str>,
    database: Option<&str>,
    snapshot_path: Option<&Path>,
) -> i32 {
    let config = match djogi::config::DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };

    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!(
            "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
        );
        return 2;
    };

    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    let lock_path = workspace.join(LOCK_FILE_NAME);
    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
        Err(e) => {
            eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `--app` addresses the bucket with the empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };

    let snap_path = snapshot_path.map_or_else(
        || reconstruct_snapshot_path(workspace, &bucket),
        Path::to_path_buf,
    );

    let outcome = repair_snapshot_rebuild(
        &mut ctx,
        &guard,
        &bucket,
        &snap_path,
        RepairConfirmation::OperatorAcknowledged,
    )
    .await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations repair snapshot-rebuild: {e}");
            repair_error_exit_code(&e)
        }
        Ok(report) => {
            render_repair_report(&report);
            0
        }
    }
}
/// Entry point for `djogi migrations baseline`.
///
/// Rejects a blank (empty or whitespace-only) `reason` with exit code 2
/// before doing anything else, then builds a current-thread tokio runtime and
/// runs `run_baseline` on it. Returns exit code 1 if the runtime cannot be
/// built, otherwise the code produced by the async body.
#[expect(
    clippy::too_many_arguments,
    reason = "CLI command entry point mirrors clap arguments explicitly"
)]
pub fn baseline_cmd(
    version: &str,
    description: &str,
    reason: &str,
    app: Option<&str>,
    database: Option<&str>,
    workspace: Option<PathBuf>,
    node_id: Option<u32>,
    single_node_dev: bool,
) -> ExitCode {
    let reason_is_blank = reason.trim().is_empty();
    if reason_is_blank {
        eprintln!(
            "djogi migrations baseline: --reason must not be empty; \
             supply a non-empty reason why this baseline is being established \
             (e.g. 'schema pre-exists from prior tooling'). \
             This is recorded in the ledger audit trail."
        );
        return ExitCode::from(2);
    }

    let workspace = resolve_workspace(workspace);
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Err(e) => {
            eprintln!("djogi migrations baseline: tokio runtime: {e}");
            return ExitCode::from(1);
        }
        Ok(built) => built,
    };
    let code = runtime.block_on(run_baseline(
        &workspace,
        version,
        description,
        reason,
        app,
        database,
        node_id,
        single_node_dev,
    ));
    ExitCode::from(code as u8)
}
/// Async body of `djogi migrations baseline`.
///
/// Loads the config, resolves the runner identity, resolves and connects to
/// the target database, takes the workspace lock, assembles a `RunnerCtx`
/// (empty up-checksum, no down-checksum, no snapshot) and calls
/// `baseline_plan`.
///
/// The audit pool is best-effort: if the audit URL cannot be resolved or the
/// pool cannot be built, the baseline proceeds with `audit_pool: None`.
///
/// Returns 0 on success, 1 for config / connection / lock failures, 2 for
/// identity errors, an underivable URL or an unsupported server version, and
/// `baseline_error_exit_code` for runner errors.
#[expect(
    clippy::too_many_arguments,
    reason = "baseline async body keeps CLI arguments explicit through validation and connection setup"
)]
async fn run_baseline(
    workspace: &Path,
    version: &str,
    description: &str,
    reason: &str,
    app: Option<&str>,
    database: Option<&str>,
    node_id: Option<u32>,
    single_node_dev: bool,
) -> i32 {
    let config = match djogi::config::DjogiConfig::load_from_workspace(workspace) {
        Err(e) => {
            eprintln!("djogi migrations baseline: config load: {e}");
            return 1;
        }
        Ok(loaded) => loaded,
    };

    let identity_lookup =
        crate::identity::resolve_identity(node_id, single_node_dev, &config.profile, "baseline");
    let runner_identity = match identity_lookup {
        Err(e) => {
            let _ = crate::identity::print_identity_error("baseline", &e);
            return 2;
        }
        Ok(resolved) => Some(resolved.into_runner_identity()),
    };

    let db_name = resolve_database(database, &config);
    let Some(url) = resolve_bucket_url(&config.database, &db_name) else {
        eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
        return 2;
    };

    let mut ctx = match connect_and_check(&url).await {
        ContextOutcome::RuntimeError(msg) => {
            eprintln!("djogi migrations baseline: pool: {msg}");
            return 1;
        }
        ContextOutcome::UnsupportedVersion(e) => {
            crate::print_support_boundary_error("migrations baseline", &e);
            return 2;
        }
        ContextOutcome::Ready(ready) => ready,
    };

    let lock_path = workspace.join(LOCK_FILE_NAME);
    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
        Err(e) => {
            eprintln!("djogi migrations baseline: workspace lock: {e}");
            return 1;
        }
        Ok(held) => held,
    };

    // A missing `--app` addresses the bucket with the empty app label.
    let bucket = BucketKey {
        database: db_name,
        app: app.unwrap_or("").to_string(),
    };

    // Best-effort audit pool: any failure simply leaves it unset.
    let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
        Ok(audit_url) => djogi::migrate::build_audit_pool(&audit_url).await.ok(),
        Err(_) => None,
    };

    let runner_ctx = RunnerCtx {
        bucket: bucket.clone(),
        version: version.to_string(),
        description: description.to_string(),
        checksum_up: String::new(),
        checksum_down: None,
        snapshot: None,
        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
        // Field-by-field copy of the migrate settings from the loaded config.
        config: djogi::config::MigrateConfig {
            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
        },
        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
        audit_pool,
        runner_identity,
    };

    let outcome = baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await;
    match outcome {
        Err(e) => {
            eprintln!("djogi migrations baseline: {e}");
            baseline_error_exit_code(&e)
        }
        Ok(report) => {
            println!(
                "djogi migrations baseline: established baseline `{}` \
                 (ledger_id={}) in {:.1}s",
                version,
                report.ledger_id,
                report.execution_time_ms as f64 / 1000.0
            );
            0
        }
    }
}
/// Maps a baseline [`RunnerError`] to the exit code the CLI reports.
///
/// The variants listed below yield `2`; every other error yields `1`.
fn baseline_error_exit_code(err: &RunnerError) -> i32 {
    let exits_with_two = matches!(
        err,
        RunnerError::VersionAlreadyApplied { .. }
            | RunnerError::VersionCollisionNonTerminal { .. }
            | RunnerError::BaselineSnapshotShouldNotBeProvided
            | RunnerError::AdvisoryUnlockReturnedFalse { .. }
            | RunnerError::SnapshotPersistFailed { .. }
            | RunnerError::OutOfOrderRejected { .. }
    );
    if exits_with_two { 2 } else { 1 }
}
#[cfg(test)]
mod tests {
use super::*;
use djogi::__bypass::RawAccessExt as _;
use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Test guard around the `DATABASE_URL` environment variable.
///
/// Holds the guard returned by `crate::test_env_lock()` for its whole lifetime and
/// remembers the value `DATABASE_URL` had at construction so that `Drop` can put
/// it back.
struct DatabaseUrlEnvGuard {
// Never read; kept so the lock is released only when this guard is dropped.
_lock: std::sync::MutexGuard<'static, ()>,
// `DATABASE_URL` as captured by `new()`; `None` when `std::env::var` returned an error.
prior: Option<String>,
}
impl DatabaseUrlEnvGuard {
    /// Takes the guard from `crate::test_env_lock()` and then records the current
    /// `DATABASE_URL` so it can be restored on drop.
    fn new() -> Self {
        // Lock first, read second: same evaluation order as the field initialisers.
        let lock = crate::test_env_lock();
        let prior = std::env::var("DATABASE_URL").ok();
        Self { _lock: lock, prior }
    }

    /// Overwrites `DATABASE_URL` with `value`.
    fn set(&self, value: &str) {
        // NOTE(review): presumably sound because `_lock` serialises tests that
        // touch the process environment — confirm against `crate::test_env_lock`.
        unsafe {
            std::env::set_var("DATABASE_URL", value);
        }
    }

    /// Removes `DATABASE_URL` from the process environment.
    fn remove(&self) {
        // NOTE(review): same assumption as in `set` about `_lock`.
        unsafe {
            std::env::remove_var("DATABASE_URL");
        }
    }
}
impl Drop for DatabaseUrlEnvGuard {
    /// Puts `DATABASE_URL` back to the state captured when the guard was built:
    /// re-sets the recorded value, or removes the variable if none was recorded.
    fn drop(&mut self) {
        if let Some(original) = &self.prior {
            unsafe { std::env::set_var("DATABASE_URL", original) }
        } else {
            unsafe { std::env::remove_var("DATABASE_URL") }
        }
    }
}
/// Creates and returns a fresh directory under the system temp dir.
///
/// The directory name is `djogi-cli-{tag}-{nanos}-{n}`, where `nanos` is the
/// current time since the Unix epoch in nanoseconds and `n` is a process-wide
/// counter, so repeated calls with the same `tag` yield distinct paths.
///
/// Panics if the clock is before the Unix epoch or the directory cannot be created.
fn temp_workspace(tag: &str) -> std::path::PathBuf {
    static COUNTER: AtomicUsize = AtomicUsize::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::SeqCst);
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap();
    let dir_name = format!("djogi-cli-{tag}-{}-{seq}", since_epoch.as_nanos());
    let mut dir = std::env::temp_dir();
    dir.push(dir_name);
    fs::create_dir_all(&dir).unwrap();
    dir
}
/// Writes a `Djogi.toml` into `work` whose database URL is
/// `postgres://localhost:1/djogi_unreachable`.
///
/// Panics if the file cannot be written.
fn write_unreachable_config(work: &std::path::Path) {
    const UNREACHABLE_TOML: &str = concat!(
        "[database]\n",
        "url = \"postgres://localhost:1/djogi_unreachable\"\n",
        "max_connections = 1\n",
        "dev_mode = false\n",
        "[server]\n",
        "host = \"127.0.0.1\"\n",
        "port = 1234\n",
    );
    let target = work.join("Djogi.toml");
    fs::write(target, UNREACHABLE_TOML).unwrap();
}
/// Runs `f` with `DATABASE_URL` removed from the environment and returns its result.
///
/// A `DatabaseUrlEnvGuard` is alive for the duration of the call, so the prior
/// value is restored when this function returns or unwinds.
fn without_database_url<T>(f: impl FnOnce() -> T) -> T {
    let guard = DatabaseUrlEnvGuard::new();
    guard.remove();
    let outcome = f();
    drop(guard);
    outcome
}
/// Dropping the guard must put `DATABASE_URL` back to what it was at construction.
#[test]
fn database_url_env_guard_restores_prior_value() {
    let guard = DatabaseUrlEnvGuard::new();
    let expected = guard.prior.clone();
    // Choose a value that differs from whatever was already set.
    let replacement = match expected.as_deref() {
        Some("postgres://from-env/test") => "postgres://temporary/test",
        _ => "postgres://from-env/test",
    };
    guard.set(replacement);
    drop(guard);
    assert_eq!(std::env::var("DATABASE_URL").ok(), expected);
}
/// Returns the up-SQL text of the first artifact produced by
/// `djogi::migrate::ensure_phase_zero_emitted` in a throwaway workspace.
///
/// The workspace has no models and a single `billing` app on database `main`;
/// it is removed (best effort) before returning.
fn current_production_phase_zero_sql(tag: &str) -> String {
    type ModelMap =
        std::collections::BTreeMap<djogi::migrate::BucketKey, djogi::migrate::AppliedSchema>;

    let workspace = temp_workspace(tag);
    let lock_file = workspace.join(LOCK_FILE_NAME);
    let lock = acquire_workspace_lock(&lock_file, GUARD_DEFAULT_TIMEOUT).expect("lock");

    let no_models: ModelMap = ModelMap::new();
    let billing = AppLifecycle {
        label: "billing".to_string(),
        database: "main".to_string(),
        renamed_from: None,
        tombstone: false,
    };
    let apps = vec![billing];

    let emitted = djogi::migrate::ensure_phase_zero_emitted(
        &workspace,
        &no_models,
        &apps,
        time::OffsetDateTime::now_utc(),
        &lock,
    )
    .expect("auto-emit Phase 0");

    let first = &emitted[0];
    let sql = fs::read_to_string(&first.up_sql_path).expect("read emitted Phase 0");

    // Release the workspace lock before deleting the directory that holds it.
    drop(lock);
    let _ = fs::remove_dir_all(&workspace);
    sql
}
/// Phase 0 SQL from `current_production_phase_zero_sql` with a single
/// `INSERT INTO heer.heer_nodes` statement appended.
fn markerless_seed_phase_zero_sql(tag: &str) -> String {
    let base = current_production_phase_zero_sql(tag);
    [
        base.as_str(),
        "\nINSERT INTO heer.heer_nodes (id) VALUES (1);\n",
    ]
    .concat()
}
/// Phase 0 SQL from `current_production_phase_zero_sql` followed by `statement`
/// on its own line (newline before and after).
fn phase_zero_with_seed_statement(tag: &str, statement: &str) -> String {
    let base = current_production_phase_zero_sql(tag);
    format!("{base}\n{statement}\n")
}
/// Returns four `(case_name, sql_statement)` pairs: a CTE-wrapped `INSERT`,
/// a CTE-wrapped `DELETE`, a `MERGE`, and a `COPY ... FROM STDIN`.
fn extended_seed_statement_cases() -> [(&'static str, &'static str); 4] {
    const CTE_INSERT: (&str, &str) = (
        "cte_insert",
        "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
    );
    const CTE_DELETE: (&str, &str) = (
        "cte_delete",
        "WITH moved AS (DELETE FROM heer.heer_node_state RETURNING *) SELECT 1;",
    );
    const MERGE: (&str, &str) = (
        "merge",
        "MERGE INTO heer.heer_nodes AS target USING incoming ON false WHEN NOT MATCHED THEN INSERT (id) VALUES (1);",
    );
    const COPY_FROM: (&str, &str) = (
        "copy_from",
        "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
    );
    [CTE_INSERT, CTE_DELETE, MERGE, COPY_FROM]
}
/// Phase 0 SQL from `current_production_phase_zero_sql` with four extra
/// statements appended: two `ALTER DATABASE "mydb" SET ...` and two session
/// `SET ...` lines for `heer.node_id` / `heer.ranj_node_id`.
fn generated_stale_phase_zero_sql(tag: &str) -> String {
    let mut sql = current_production_phase_zero_sql(tag);
    let appended = [
        "ALTER DATABASE \"mydb\" SET heer.node_id = '1';",
        "ALTER DATABASE \"mydb\" SET heer.ranj_node_id = '1';",
        "SET heer.node_id = '1';",
        "SET heer.ranj_node_id = '1';",
    ];
    // Each statement is preceded by a newline; one trailing newline closes the text.
    for statement in appended.iter() {
        sql.push('\n');
        sql.push_str(statement);
    }
    sql.push('\n');
    sql
}
/// Returns the result of `djogi::testing::phase_zero_sql_for_testing("main", true)`,
/// panicking if that call fails.
fn seed_capable_phase_zero_sql() -> String {
    let composed = djogi::testing::phase_zero_sql_for_testing("main", true);
    composed.expect("compose seed-capable Phase 0")
}
/// Writes a minimal `PendingPlan` as pretty-printed JSON to `path`.
///
/// The plan targets bucket (`database`, `app`) at `version`, carries an empty
/// model snapshot that registers only `app`, and lists `depends_on` verbatim.
/// Parent directories of `path` are created first. Panics on any I/O or
/// serialisation failure.
fn write_pending_json(
    path: &Path,
    database: &str,
    app: &str,
    version: &str,
    depends_on: &[&str],
) {
    let empty_snapshot = djogi::migrate::AppliedSchema {
        djogi_version: "0.1.0".to_string(),
        enums: std::collections::BTreeMap::new(),
        format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
        generated_at: "2026-06-06T00:00:00Z".to_string(),
        indexes: Vec::new(),
        models: std::collections::BTreeMap::new(),
        registered_apps: vec![app.to_string()],
    };
    let dependencies: Vec<String> = depends_on.iter().map(|dep| dep.to_string()).collect();
    let pending = PendingPlan {
        format_version: djogi::migrate::PENDING_FORMAT_VERSION.to_string(),
        bucket_database: database.to_string(),
        bucket_app: app.to_string(),
        version: version.to_string(),
        slug: "test".to_string(),
        model_snapshot: empty_snapshot,
        checksum_up: "V1:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
            .to_string(),
        checksum_down: None,
        composed_at: "2026-06-06T00:00:00Z".to_string(),
        depends_on: dependencies,
    };

    if let Some(dir) = path.parent() {
        fs::create_dir_all(dir).unwrap();
    }
    let encoded = serde_json::to_vec_pretty(&pending).unwrap();
    fs::write(path, encoded).unwrap();
}
/// `discover_snapshot_buckets_on_disk` must report every bucket directory that
/// contains a `schema_snapshot.json`, and none that lack one.
#[test]
fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
    let work = temp_workspace("b1_discover");

    // Bucket directories that carry a snapshot file.
    for relative in ["migrations/main/billing", "migrations/main/_global_"] {
        let dir = work.join(relative);
        fs::create_dir_all(&dir).unwrap();
        fs::write(dir.join("schema_snapshot.json"), "{}").unwrap();
    }
    // A bucket directory with no snapshot file at all.
    fs::create_dir_all(work.join("migrations/main/empty_app")).unwrap();

    let found = discover_snapshot_buckets_on_disk(&work);
    let labels: std::collections::BTreeSet<&str> =
        found.iter().map(|bucket| bucket.app.as_str()).collect();

    assert!(
        labels.contains("billing"),
        "must include the renamed-from bucket: {labels:?}"
    );
    assert!(
        labels.contains(""),
        "must include the global bucket: {labels:?}"
    );
    assert!(
        !labels.contains("empty_app"),
        "must not include directories without a snapshot: {labels:?}"
    );
    let _ = fs::remove_dir_all(&work);
}
/// With `DATABASE_URL` unset, `load_from_workspace` must take its values from
/// the `Djogi.toml` inside the given workspace.
#[test]
fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
    let work = temp_workspace("a1_workspace_config");
    let manifest = concat!(
        "[database]\n",
        "url = \"postgres://discovered-by-workspace-flag/test\"\n",
        "max_connections = 1\n",
        "dev_mode = false\n",
        "[server]\n",
        "host = \"127.0.0.1\"\n",
        "port = 1234\n",
    );
    fs::write(work.join("Djogi.toml"), manifest).unwrap();

    let guard = DatabaseUrlEnvGuard::new();
    guard.remove();

    let loaded = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
    assert_eq!(
        loaded.database.url,
        "postgres://discovered-by-workspace-flag/test"
    );
    assert_eq!(loaded.server.port, 1234);
    let _ = fs::remove_dir_all(&work);
}
/// When `DATABASE_URL` is set, its value must override the URL written in the
/// workspace `Djogi.toml`.
#[test]
fn a1_round2_env_override_beats_workspace_toml() {
    let work = temp_workspace("a1r2_env_override");
    let manifest = concat!(
        "[database]\n",
        "url = \"postgres://from-toml/test\"\n",
        "max_connections = 1\n",
        "dev_mode = false\n",
        "[server]\n",
        "host = \"127.0.0.1\"\n",
        "port = 1234\n",
    );
    fs::write(work.join("Djogi.toml"), manifest).unwrap();

    let guard = DatabaseUrlEnvGuard::new();
    guard.set("postgres://from-env/test");

    let loaded = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
    assert_eq!(
        loaded.database.url, "postgres://from-env/test",
        "env DATABASE_URL must win over workspace Djogi.toml"
    );
    let _ = fs::remove_dir_all(&work);
}
/// With no pending plans, `run_apply` must exit `0` even though the configured
/// database URL is unreachable and `DATABASE_URL` is unset.
#[test]
fn apply_no_pending_is_identity_free_and_skips_pool_connect() {
    let work = temp_workspace("apply_no_pending");
    write_unreachable_config(&work);

    let exit = without_database_url(|| {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("runtime")
            .block_on(run_apply(&work, &FakeMode::Real, None, false))
    });

    assert_eq!(
        exit, 0,
        "no-pending apply must return before identity resolution or pool checkout"
    );
    let _ = fs::remove_dir_all(&work);
}
/// A Phase 0 pending plan must be returned ahead of a normal global-bucket plan
/// for the same database, regardless of the order the files were written in.
#[test]
fn discover_pending_plans_orders_phase_zero_before_normal_global() {
    let work = temp_workspace("discover_pending_phase_zero_first");

    // Normal global-bucket plan (empty app label), written first.
    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let global_path = djogi::migrate::pending_json_path(&work, &global_bucket);
    write_pending_json(
        &global_path,
        "main",
        "",
        "V20260606010101__later_global",
        &[],
    );

    // Phase 0 plan, written second.
    let phase_zero_path = djogi::migrate::phase_zero_pending_json_path(
        &work,
        "main",
        djogi::migrate::PHASE_ZERO_VERSION,
    );
    write_pending_json(
        &phase_zero_path,
        "main",
        "",
        djogi::migrate::PHASE_ZERO_VERSION,
        &[],
    );

    let discovered = discover_pending_plans(&work).expect("discover");
    assert_eq!(discovered.len(), 2);
    assert_eq!(
        discovered[0].plan.version,
        djogi::migrate::PHASE_ZERO_VERSION
    );
    assert!(discovered[0].is_phase_zero);
    assert_eq!(discovered[1].plan.version, "V20260606010101__later_global");
    let _ = fs::remove_dir_all(&work);
}
/// Two buckets share a version; `system` declares a dependency on `users`, so
/// discovery must return `users` first.
#[test]
fn discover_orders_same_version_buckets_by_depends_on() {
    let work = temp_workspace("discover_pending_depends_on");

    // Writes one pending plan for `app` on database `main` at the shared version.
    let stage = |app: &str, depends_on: &[&str]| {
        let bucket = BucketKey {
            database: "main".to_string(),
            app: app.to_string(),
        };
        let path = djogi::migrate::pending_json_path(&work, &bucket);
        write_pending_json(&path, "main", app, "V20260609000000__initial", depends_on);
    };
    stage("system", &["users"]);
    stage("users", &[]);

    let plans = discover_pending_plans(&work).expect("discovers");
    let apps: Vec<&str> = plans.iter().map(|plan| plan.bucket.app.as_str()).collect();
    assert_eq!(apps, ["users", "system"]);
    let _ = fs::remove_dir_all(&work);
}
/// Buckets that share a version and declare no dependencies must come back
/// sorted by app label, not in the order they were written.
#[test]
fn discover_orders_no_dependency_buckets_alphabetically() {
    let work = temp_workspace("discover_pending_alpha_tiebreak");

    // Deliberately written in reverse alphabetical order.
    for app in ["charlie", "bravo", "alpha"] {
        let bucket = BucketKey {
            database: "main".to_string(),
            app: app.to_string(),
        };
        let path = djogi::migrate::pending_json_path(&work, &bucket);
        write_pending_json(&path, "main", app, "V20260609000000__initial", &[]);
    }

    let plans = discover_pending_plans(&work).expect("discovers");
    let apps: Vec<&str> = plans.iter().map(|plan| plan.bucket.app.as_str()).collect();
    assert_eq!(apps, ["alpha", "bravo", "charlie"]);
    let _ = fs::remove_dir_all(&work);
}
/// A `depends_on` entry naming a bucket with no pending plan (`billing`) must
/// not prevent discovery of the plan that declares it.
#[test]
fn discover_depends_on_missing_bucket_is_ignored() {
    let work = temp_workspace("discover_pending_deps_missing");

    let system_bucket = BucketKey {
        database: "main".to_string(),
        app: "system".to_string(),
    };
    let system_path = djogi::migrate::pending_json_path(&work, &system_bucket);
    write_pending_json(
        &system_path,
        "main",
        "system",
        "V20260609000000__initial",
        &["billing"],
    );

    let plans = discover_pending_plans(&work).expect("discovers");
    assert_eq!(plans.len(), 1);
    assert_eq!(plans[0].bucket.app, "system");
    let _ = fs::remove_dir_all(&work);
}
/// `alpha` depends on `beta` and `beta` on `alpha`; discovery must fail with an
/// error that names both apps and the word "cycle".
#[test]
fn discover_depends_on_cycle_is_refused() {
    let work = temp_workspace("discover_pending_deps_cycle");

    for (app, dependency) in [("alpha", "beta"), ("beta", "alpha")] {
        let bucket = BucketKey {
            database: "main".to_string(),
            app: app.to_string(),
        };
        let path = djogi::migrate::pending_json_path(&work, &bucket);
        write_pending_json(
            &path,
            "main",
            app,
            "V20260609000000__initial",
            &[dependency],
        );
    }

    let err = discover_pending_plans(&work).expect_err("cycle must be refused");
    let describes_cycle = err.contains("alpha") && err.contains("beta") && err.contains("cycle");
    assert!(
        describes_cycle,
        "error should name both apps and mention cycle, got: {err}"
    );
    let _ = fs::remove_dir_all(&work);
}
/// Even a lone bucket must be rejected by `order_pending_groups_by_dependencies`
/// when its `depends_on` contains an invalid label.
#[test]
fn single_bucket_with_invalid_depends_on_is_refused() {
    /// Builds a one-element discovery result for `main`/`system` whose only
    /// dependency is `dep`.
    fn make_singleton(dep: &str) -> Vec<DiscoveredPendingPlan> {
        let model_snapshot = djogi::migrate::AppliedSchema {
            djogi_version: "0.1.0".to_string(),
            enums: std::collections::BTreeMap::new(),
            format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
            generated_at: "2026-06-09T00:00:00Z".to_string(),
            indexes: Vec::new(),
            models: std::collections::BTreeMap::new(),
            registered_apps: vec!["system".to_string()],
        };
        let plan = PendingPlan {
            format_version: djogi::migrate::PENDING_FORMAT_VERSION.to_string(),
            bucket_database: "main".to_string(),
            bucket_app: "system".to_string(),
            version: "V20260609000000__initial".to_string(),
            slug: "test".to_string(),
            model_snapshot,
            checksum_up: format!("V1:{}", "a".repeat(64)),
            checksum_down: None,
            composed_at: "2026-06-09T00:00:00Z".to_string(),
            depends_on: vec![dep.to_string()],
        };
        let bucket = BucketKey {
            database: "main".to_string(),
            app: "system".to_string(),
        };
        vec![DiscoveredPendingPlan {
            path: PathBuf::from("target/djogi_pending/main/system.json"),
            bucket,
            plan,
            is_phase_zero: false,
        }]
    }

    for bad_label in ["../traversal", "has space"] {
        let err = order_pending_groups_by_dependencies(make_singleton(bad_label))
            .expect_err("invalid singleton depends_on label must be refused");
        let is_descriptive = err.contains("invalid depends_on label")
            && err.contains("main")
            && err.contains("system");
        assert!(
            is_descriptive,
            "[{bad_label}] error must name database, app, and the invalid label: {err}"
        );
    }
}
/// End-to-end check: two buckets (`users`, `system`) are composed at the same
/// version, `system`'s table has a foreign key into `users`' table, and a real
/// `run_apply` must create both tables, the FK, and two `applied` ledger rows
/// with `users` recorded before `system`.
///
/// NOTE(review): `DATABASE_URL` is read below before `DatabaseUrlEnvGuard::new()`
/// is taken; whether that can race with other env-mutating tests depends on
/// `crate::test_env_lock` usage elsewhere — confirm.
#[djogi::djogi_test]
async fn cross_bucket_fk_applies_in_dependency_order(mut ctx: djogi::context::DjogiContext) {
// Per-invocation suffix so table names do not collide between runs in one process.
static E2E_COUNTER: AtomicUsize = AtomicUsize::new(0);
let n = E2E_COUNTER.fetch_add(1, Ordering::SeqCst);
let users_table = format!("e2e_users_{n}");
let event_log_table = format!("e2e_event_log_{n}");
let work = temp_workspace("cross-bucket-fk-e2e");
// Workspace lock is required by `compose` (passed as `_guard`); 5 s timeout.
let guard = djogi::migrate::acquire_workspace_lock(
&work.join(LOCK_FILE_NAME),
std::time::Duration::from_secs(5),
)
.expect("lock workspace");
// Desired (target) schema per bucket.
let mut models: std::collections::BTreeMap<
djogi::migrate::BucketKey,
djogi::migrate::AppliedSchema,
> = std::collections::BTreeMap::new();
let users_bucket = BucketKey {
database: "main".into(),
app: "users".into(),
};
let system_bucket = BucketKey {
database: "main".into(),
app: "system".into(),
};
// `users` bucket: one table with a single BIGINT primary-key column `id`.
{
let mut users_schema = djogi::migrate::AppliedSchema {
djogi_version: env!("CARGO_PKG_VERSION").to_string(),
enums: std::collections::BTreeMap::new(),
format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
generated_at: "2026-06-10T00:00:00Z".to_string(),
indexes: Vec::new(),
models: std::collections::BTreeMap::new(),
registered_apps: vec!["users".to_string()],
};
users_schema.models.insert(
users_table.clone(),
djogi::migrate::TableSchema {
app: Some("users".to_string()),
columns: vec![djogi::migrate::ColumnSchema {
name: "id".to_string(),
sql_type: "BIGINT".to_string(),
nullable: false,
default_sql: Some("heerid_next_desc()".to_string()),
..default_col()
}],
primary_key: djogi::migrate::PrimaryKeySchema {
columns: vec!["id".to_string()],
kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
},
table: users_table.clone(),
..default_table()
},
);
models.insert(users_bucket.clone(), users_schema);
}
// `system` bucket: one table whose `user_id` column references the `users` table's `id`.
{
let mut system_schema = djogi::migrate::AppliedSchema {
djogi_version: env!("CARGO_PKG_VERSION").to_string(),
enums: std::collections::BTreeMap::new(),
format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
generated_at: "2026-06-10T00:00:00Z".to_string(),
indexes: Vec::new(),
models: std::collections::BTreeMap::new(),
registered_apps: vec!["system".to_string()],
};
system_schema.models.insert(
event_log_table.clone(),
djogi::migrate::TableSchema {
app: Some("system".to_string()),
columns: vec![
djogi::migrate::ColumnSchema {
name: "id".to_string(),
sql_type: "BIGINT".to_string(),
nullable: false,
default_sql: Some("heerid_next_desc()".to_string()),
..default_col()
},
djogi::migrate::ColumnSchema {
name: "user_id".to_string(),
sql_type: "BIGINT".to_string(),
nullable: false,
foreign_key: Some(djogi::migrate::ForeignKeySchema {
deferrable: false,
initially_deferred: false,
on_delete: djogi::migrate::OnDeleteSchema::Restrict,
ref_column: "id".to_string(),
ref_table: users_table.clone(),
}),
..default_col()
},
],
primary_key: djogi::migrate::PrimaryKeySchema {
columns: vec!["id".to_string()],
kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
},
table: event_log_table.clone(),
..default_table()
},
);
models.insert(system_bucket.clone(), system_schema);
}
// Prior snapshots: both buckets start with no models, so compose sees pure additions.
let mut snapshots = std::collections::BTreeMap::new();
for bucket in [&users_bucket, &system_bucket] {
snapshots.insert(
bucket.clone(),
djogi::migrate::AppliedSchema {
djogi_version: env!("CARGO_PKG_VERSION").to_string(),
enums: std::collections::BTreeMap::new(),
format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
generated_at: "2026-06-10T00:00:00Z".to_string(),
indexes: Vec::new(),
models: std::collections::BTreeMap::new(),
registered_apps: vec![bucket.app.clone()],
},
);
}
let apps = vec![
djogi::migrate::AppLifecycle {
label: "users".into(),
database: "main".into(),
renamed_from: None,
tombstone: false,
},
djogi::migrate::AppLifecycle {
label: "system".into(),
database: "main".into(),
renamed_from: None,
tombstone: false,
},
];
let compose_req = djogi::migrate::ComposeRequest {
workspace_root: &work,
models: &models,
snapshots: &snapshots,
apps: &apps,
name: "cross-bucket-fk",
allow_destructive: false,
force_overwrite: false,
// Fixed `now` (Unix epoch + 19726 days) instead of the wall clock.
now: time::OffsetDateTime::UNIX_EPOCH
+ time::Duration::days(19726)
+ time::Duration::seconds(0),
_guard: &guard,
pk_flip_join_table_option: None,
skip_phase_zero_auto_emit: false,
};
let compose_report = djogi::migrate::compose(compose_req).expect("compose");
assert!(
!compose_report.composed_buckets.is_empty(),
"compose should produce delta buckets"
);
// Release the workspace lock before `run_apply` is invoked below.
drop(guard);
let composed_version = &compose_report.composed_buckets[0].version;
// Point the workspace config at the database `ctx` is connected to.
let test_db = ctx
.raw_scalar::<String>("SELECT current_database()", &[])
.await
.expect("current_database");
let admin_url = std::env::var("DATABASE_URL").expect(
"DATABASE_URL must be set for djogi_test \
(e.g. postgres://djogi:djogi@localhost:5432/djogi_test)",
);
let test_db_url = replace_db_in_url(&admin_url, &test_db)
.expect("construct per-test database URL from DATABASE_URL");
fs::write(
work.join("Djogi.toml"),
format!(
"[database]\nurl = \"{test_db_url}\"\n\
max_connections = 1\ndev_mode = false\n\
[server]\nhost = \"127.0.0.1\"\nport = 8080\n"
),
)
.unwrap();
// Also override `DATABASE_URL` itself for the duration of the apply.
let db_url_guard = DatabaseUrlEnvGuard::new();
db_url_guard.set(&test_db_url);
// `run_apply` is driven on its own current-thread runtime inside a blocking task.
let exit = {
let work = work.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("runtime")
.block_on(run_apply(
&work,
&FakeMode::Real,
None,
true, ))
})
.await
.expect("spawn_blocking join")
};
// Restore `DATABASE_URL` before running the assertions.
drop(db_url_guard);
assert_eq!(
exit, 0,
"apply should succeed (tables created in FK dependency order)"
);
// The FK from the event-log table to the users table must exist in the catalog.
let fk_rows = ctx
.raw_rows(
"SELECT c.conname \
FROM pg_constraint c \
JOIN pg_class r ON r.oid = c.conrelid \
JOIN pg_class f ON f.oid = c.confrelid \
WHERE r.relname = $1 AND c.contype = 'f' AND f.relname = $2",
&[&event_log_table.as_str(), &users_table.as_str()],
)
.await
.expect("query pg_constraint");
assert!(
!fk_rows.is_empty(),
"FK constraint should exist from {event_log_table} → {users_table}"
);
// Exactly one `applied` ledger row per bucket for the composed version.
let ledger_rows = ctx
.raw_rows(
"SELECT app_label FROM djogi_schema_migrations \
WHERE version = $1 AND status = 'applied'",
&[&composed_version.as_str()],
)
.await
.expect("query ledger");
assert_eq!(
ledger_rows.len(),
2,
"ledger should have exactly 2 rows for composed version {composed_version} \
(users + system), got {} rows",
ledger_rows.len()
);
let app_labels: Vec<String> = ledger_rows
.iter()
.map(|row| row.try_get(0).expect("decode app_label"))
.collect();
assert!(
app_labels.contains(&"users".to_string()),
"ledger should have 'users' bucket: {app_labels:?}"
);
assert!(
app_labels.contains(&"system".to_string()),
"ledger should have 'system' bucket: {app_labels:?}"
);
// Ordered by ledger `id`, `users` must precede `system`.
let ordered_rows = ctx
.raw_rows(
"SELECT app_label, id FROM djogi_schema_migrations \
WHERE version = $1 AND status = 'applied' ORDER BY id",
&[&composed_version.as_str()],
)
.await
.expect("query ledger ordered");
assert_eq!(ordered_rows[0].try_get::<_, String>(0).unwrap(), "users");
assert_eq!(ordered_rows[1].try_get::<_, String>(0).unwrap(), "system");
let _ = fs::remove_dir_all(&work);
}
/// End-to-end check: buckets `alpha` and `beta` both declare the same `mood`
/// enum, and `beta`'s table has a foreign key into `alpha`'s table. After a real
/// `run_apply`, exactly one `mood` type, both tables, and two `applied` ledger
/// rows must exist.
///
/// NOTE(review): `DATABASE_URL` is read below before `DatabaseUrlEnvGuard::new()`
/// is taken; whether that can race with other env-mutating tests depends on
/// `crate::test_env_lock` usage elsewhere — confirm.
#[djogi::djogi_test]
async fn shared_enum_multi_slice_applies(mut ctx: djogi::context::DjogiContext) {
// Per-invocation suffix so table names do not collide between runs in one process.
static E2E_COUNTER: AtomicUsize = AtomicUsize::new(0);
let n = E2E_COUNTER.fetch_add(1, Ordering::SeqCst);
let posts_table = format!("e2e_posts_{n}");
let comments_table = format!("e2e_comments_{n}");
let work = temp_workspace("shared-enum-e2e");
// Workspace lock is required by `compose` (passed as `_guard`); 5 s timeout.
let guard = djogi::migrate::acquire_workspace_lock(
&work.join(LOCK_FILE_NAME),
std::time::Duration::from_secs(5),
)
.expect("lock workspace");
// Desired (target) schema per bucket.
let mut models: std::collections::BTreeMap<
djogi::migrate::BucketKey,
djogi::migrate::AppliedSchema,
> = std::collections::BTreeMap::new();
let alpha_bucket = BucketKey {
database: "main".into(),
app: "alpha".into(),
};
let beta_bucket = BucketKey {
database: "main".into(),
app: "beta".into(),
};
// The enum definition inserted, identically, into both buckets.
let mood_enum = djogi::migrate::schema::EnumSchema {
name: "mood".to_string(),
variants: vec!["happy".to_string(), "sad".to_string()],
};
// `alpha` bucket: posts table with a BIGINT `id` and a nullable `mood` column.
{
let mut alpha_schema = djogi::migrate::AppliedSchema {
djogi_version: env!("CARGO_PKG_VERSION").to_string(),
enums: std::collections::BTreeMap::new(),
format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
generated_at: "2026-06-10T00:00:00Z".to_string(),
indexes: Vec::new(),
models: std::collections::BTreeMap::new(),
registered_apps: vec!["alpha".to_string()],
};
alpha_schema
.enums
.insert("mood".to_string(), mood_enum.clone());
alpha_schema.models.insert(
posts_table.clone(),
djogi::migrate::TableSchema {
app: Some("alpha".to_string()),
columns: vec![
djogi::migrate::ColumnSchema {
name: "id".to_string(),
sql_type: "BIGINT".to_string(),
nullable: false,
default_sql: Some("heerid_next_desc()".to_string()),
..default_col()
},
djogi::migrate::ColumnSchema {
name: "mood".to_string(),
sql_type: "mood".to_string(),
nullable: true,
..default_col()
},
],
primary_key: djogi::migrate::PrimaryKeySchema {
columns: vec!["id".to_string()],
kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
},
table: posts_table.clone(),
..default_table()
},
);
models.insert(alpha_bucket.clone(), alpha_schema);
}
// `beta` bucket: comments table with an FK to the posts table and its own `mood`-typed column.
{
let mut beta_schema = djogi::migrate::AppliedSchema {
djogi_version: env!("CARGO_PKG_VERSION").to_string(),
enums: std::collections::BTreeMap::new(),
format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
generated_at: "2026-06-10T00:00:00Z".to_string(),
indexes: Vec::new(),
models: std::collections::BTreeMap::new(),
registered_apps: vec!["beta".to_string()],
};
beta_schema.enums.insert("mood".to_string(), mood_enum);
beta_schema.models.insert(
comments_table.clone(),
djogi::migrate::TableSchema {
app: Some("beta".to_string()),
columns: vec![
djogi::migrate::ColumnSchema {
name: "id".to_string(),
sql_type: "BIGINT".to_string(),
nullable: false,
default_sql: Some("heerid_next_desc()".to_string()),
..default_col()
},
djogi::migrate::ColumnSchema {
name: "post_id".to_string(),
sql_type: "BIGINT".to_string(),
nullable: false,
foreign_key: Some(djogi::migrate::ForeignKeySchema {
deferrable: false,
initially_deferred: false,
on_delete: djogi::migrate::OnDeleteSchema::Restrict,
ref_column: "id".to_string(),
ref_table: posts_table.clone(),
}),
..default_col()
},
djogi::migrate::ColumnSchema {
name: "author_mood".to_string(),
sql_type: "mood".to_string(),
nullable: true,
..default_col()
},
],
primary_key: djogi::migrate::PrimaryKeySchema {
columns: vec!["id".to_string()],
kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
},
table: comments_table.clone(),
..default_table()
},
);
models.insert(beta_bucket.clone(), beta_schema);
}
// Prior snapshots: both buckets start with no enums and no models.
let mut snapshots = std::collections::BTreeMap::new();
for bucket in [&alpha_bucket, &beta_bucket] {
snapshots.insert(
bucket.clone(),
djogi::migrate::AppliedSchema {
djogi_version: env!("CARGO_PKG_VERSION").to_string(),
enums: std::collections::BTreeMap::new(),
format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
generated_at: "2026-06-10T00:00:00Z".to_string(),
indexes: Vec::new(),
models: std::collections::BTreeMap::new(),
registered_apps: vec![bucket.app.clone()],
},
);
}
let apps = vec![
djogi::migrate::AppLifecycle {
label: "alpha".into(),
database: "main".into(),
renamed_from: None,
tombstone: false,
},
djogi::migrate::AppLifecycle {
label: "beta".into(),
database: "main".into(),
renamed_from: None,
tombstone: false,
},
];
let compose_req = djogi::migrate::ComposeRequest {
workspace_root: &work,
models: &models,
snapshots: &snapshots,
apps: &apps,
name: "shared-enum-multi-slice",
allow_destructive: false,
force_overwrite: false,
// Fixed `now` (Unix epoch + 19726 days) instead of the wall clock.
now: time::OffsetDateTime::UNIX_EPOCH
+ time::Duration::days(19726)
+ time::Duration::seconds(0),
_guard: &guard,
pk_flip_join_table_option: None,
skip_phase_zero_auto_emit: false,
};
let compose_report = djogi::migrate::compose(compose_req).expect("compose");
assert!(
!compose_report.composed_buckets.is_empty(),
"compose should produce delta buckets"
);
// Release the workspace lock before `run_apply` is invoked below.
drop(guard);
let composed_version = &compose_report.composed_buckets[0].version;
// Point the workspace config at the database `ctx` is connected to.
let test_db = ctx
.raw_scalar::<String>("SELECT current_database()", &[])
.await
.expect("current_database");
let admin_url = std::env::var("DATABASE_URL").expect(
"DATABASE_URL must be set for djogi_test \
(e.g. postgres://djogi:djogi@localhost:5432/djogi_test)",
);
let test_db_url = replace_db_in_url(&admin_url, &test_db)
.expect("construct per-test database URL from DATABASE_URL");
fs::write(
work.join("Djogi.toml"),
format!(
"[database]\nurl = \"{test_db_url}\"\n\
max_connections = 1\ndev_mode = false\n\
[server]\nhost = \"127.0.0.1\"\nport = 8080\n"
),
)
.unwrap();
// Also override `DATABASE_URL` itself for the duration of the apply.
let db_url_guard = DatabaseUrlEnvGuard::new();
db_url_guard.set(&test_db_url);
// `run_apply` is driven on its own current-thread runtime inside a blocking task.
let exit = {
let work = work.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("runtime")
.block_on(run_apply(
&work,
&FakeMode::Real,
None,
true, ))
})
.await
.expect("spawn_blocking join")
};
// Restore `DATABASE_URL` before running the assertions.
drop(db_url_guard);
assert_eq!(
exit, 0,
"apply should succeed (enum created once, tables in FK order)"
);
// The shared enum must exist exactly once in `pg_type`.
let mood_count = ctx
.raw_scalar::<i64>(
"SELECT count(*) FROM pg_type WHERE typname = $1",
&[&"mood"],
)
.await
.expect("query pg_type for mood");
assert_eq!(
mood_count, 1,
"exactly one mood enum type should exist in pg_type, got {mood_count}"
);
// Both tables must be present in `pg_class`.
let table_count = ctx
.raw_scalar::<i64>(
"SELECT count(*) FROM pg_class WHERE relname = $1 OR relname = $2",
&[&posts_table.as_str(), &comments_table.as_str()],
)
.await
.expect("query pg_class for tables");
assert_eq!(
table_count, 2,
"both tables should exist ({posts_table}, {comments_table}), got {table_count}"
);
// Exactly one `applied` ledger row per bucket for the composed version.
let ledger_rows = ctx
.raw_rows(
"SELECT app_label FROM djogi_schema_migrations \
WHERE version = $1 AND status = 'applied'",
&[&composed_version.as_str()],
)
.await
.expect("query ledger");
assert_eq!(
ledger_rows.len(),
2,
"ledger should have exactly 2 rows for composed version {composed_version} \
(alpha + beta), got {} rows",
ledger_rows.len()
);
let _ = fs::remove_dir_all(&work);
}
/// Replaces the path (database) portion of a `postgres://` or `postgresql://`
/// URL with `new_db`.
///
/// The authority (everything between the scheme and the first `/`) and any
/// query string (from the first `?` after that `/`) are preserved verbatim.
/// Returns `None` when the scheme is not one of the two accepted prefixes or
/// when there is no `/` after the authority.
fn replace_db_in_url(url: &str, new_db: &str) -> Option<String> {
    let (scheme, rest) = if let Some(rest) = url.strip_prefix("postgres://") {
        ("postgres://", rest)
    } else {
        ("postgresql://", url.strip_prefix("postgresql://")?)
    };
    // Everything before the first '/' is the authority; a URL without one is rejected.
    let (authority, path_and_query) = rest.split_once('/')?;
    // Keep the query string (including its leading '?') if there is one.
    let query = match path_and_query.find('?') {
        Some(at) => &path_and_query[at..],
        None => "",
    };
    Some(format!("{scheme}{authority}/{new_db}{query}"))
}
/// Baseline `ColumnSchema` for tests: empty `name` and `sql_type`, every
/// optional field `None`, every flag `false`. Intended as the base of a
/// struct-update expression (`..default_col()`).
fn default_col() -> djogi::migrate::ColumnSchema {
    djogi::migrate::ColumnSchema {
        // Identity of the column.
        name: String::new(),
        sql_type: String::new(),
        // Boolean flags.
        nullable: false,
        indexed: false,
        unique: false,
        outbox_exclude: false,
        // Optional attributes, all absent.
        check: None,
        codec: None,
        comment: None,
        default_sql: None,
        foreign_key: None,
        generated: None,
        identity: None,
        index_type: None,
        max_length: None,
        on_delete: None,
        rationale: None,
        relation_kind: None,
        renamed_from: None,
        sequence_within: None,
        type_change_using: None,
    }
}
/// Baseline `TableSchema` for tests: empty table name, no columns, a
/// `Composite` primary key with no columns, every optional field `None`, every
/// flag `false`. Intended as the base of a struct-update expression
/// (`..default_table()`).
fn default_table() -> djogi::migrate::TableSchema {
    let empty_primary_key = djogi::migrate::PrimaryKeySchema {
        columns: vec![],
        kind: djogi::migrate::PkKindSchema::Composite,
    };
    djogi::migrate::TableSchema {
        // Identity and shape.
        table: String::new(),
        columns: vec![],
        primary_key: empty_primary_key,
        exclusion_constraints: vec![],
        // Boolean flags.
        is_through: false,
        rls_enabled: false,
        // Optional attributes, all absent.
        app: None,
        fts: None,
        moved_from_app: None,
        partition: None,
        rationale: None,
        renamed_from: None,
        table_comment: None,
        storage_params: None,
        tablespace: None,
        tenant_key: None,
    }
}
/// A pending file that is not valid JSON must make discovery fail with a
/// "parse pending JSON" error.
#[test]
fn discover_pending_plans_refuses_malformed_pending_json() {
    let work = temp_workspace("discover_pending_malformed");
    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let path = djogi::migrate::pending_json_path(&work, &global_bucket);
    let parent = path.parent().unwrap();
    fs::create_dir_all(parent).unwrap();
    fs::write(&path, b"{ not json").unwrap();

    let err = discover_pending_plans(&work).expect_err("malformed pending must refuse");
    assert!(err.contains("parse pending JSON"));
    let _ = fs::remove_dir_all(&work);
}
/// A Phase 0 pending file stored under database `main` but declaring
/// `other_db` inside must be refused.
#[test]
fn discover_pending_plans_refuses_hidden_phase_zero_database_mismatch() {
    let work = temp_workspace("discover_pending_phase_zero_db_mismatch");
    let phase_zero_path = djogi::migrate::phase_zero_pending_json_path(
        &work,
        "main",
        djogi::migrate::PHASE_ZERO_VERSION,
    );
    // Path says `main`; payload says `other_db`.
    write_pending_json(
        &phase_zero_path,
        "other_db",
        "",
        djogi::migrate::PHASE_ZERO_VERSION,
        &[],
    );

    let err = discover_pending_plans(&work).expect_err("hidden Phase 0 mismatch must refuse");
    let mentions_expected_db = err.contains("expected main from path");
    assert!(mentions_expected_db, "unexpected error: {err}");
    let _ = fs::remove_dir_all(&work);
}
/// A plan carrying the Phase 0 version but stored at the normal global-bucket
/// pending path must be refused.
#[test]
fn discover_pending_plans_refuses_normal_global_phase_zero_pending() {
    let work = temp_workspace("discover_pending_normal_global_phase_zero");
    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let path = djogi::migrate::pending_json_path(&work, &global_bucket);
    write_pending_json(&path, "main", "", djogi::migrate::PHASE_ZERO_VERSION, &[]);

    let err = discover_pending_plans(&work).expect_err("normal-global Phase 0 must refuse");
    let explains_location = err.contains("Phase 0") && err.contains(".phase_zero");
    assert!(explains_location, "unexpected error: {err}");
    let _ = fs::remove_dir_all(&work);
}
/// A pending file stored at the `billing` path but declaring app `audit` inside
/// must be refused.
#[test]
fn discover_pending_plans_refuses_normal_pending_app_mismatch() {
    let work = temp_workspace("discover_pending_normal_app_mismatch");
    let billing_bucket = BucketKey {
        database: "main".to_string(),
        app: "billing".to_string(),
    };
    let path = djogi::migrate::pending_json_path(&work, &billing_bucket);
    // Path says `billing`; payload says `audit`.
    write_pending_json(&path, "main", "audit", "V20260606010101__mismatch", &[]);

    let err = discover_pending_plans(&work).expect_err("normal app mismatch must refuse");
    let mentions_expected_app = err.contains("expected billing from path");
    assert!(mentions_expected_app, "unexpected error: {err}");
    let _ = fs::remove_dir_all(&work);
}
/// A pending file named `bad-name.json` must be refused as a non-canonical app
/// filename.
#[test]
fn discover_pending_plans_refuses_noncanonical_normal_pending_filename() {
    let work = temp_workspace("discover_pending_noncanonical_filename");
    let bad_path = work.join("target/djogi_pending/main/bad-name.json");
    write_pending_json(
        &bad_path,
        "main",
        "bad-name",
        "V20260606010101__bad_name",
        &[],
    );

    let err = discover_pending_plans(&work).expect_err("non-canonical filename must refuse");
    let flags_filename = err.contains("non-canonical app filename");
    assert!(flags_filename, "unexpected error: {err}");
    let _ = fs::remove_dir_all(&work);
}
/// If the pending file is rewritten with a different version after discovery,
/// `load_verified_pending_for_apply` must refuse it.
#[test]
fn load_verified_pending_for_apply_refuses_changed_artifact() {
    let work = temp_workspace("apply_pending_changed_after_discovery");
    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let path = djogi::migrate::pending_json_path(&work, &global_bucket);
    write_pending_json(&path, "main", "", "V20260606010101__stable", &[]);
    let discovered = discover_pending_plans(&work).expect("discover");

    // Overwrite the artifact on disk with the same plan under a new version.
    let mut tampered: PendingPlan = discovered[0].plan.clone();
    tampered.version = "V20260606010102__changed".to_string();
    let tampered_bytes = serde_json::to_vec_pretty(&tampered).unwrap();
    fs::write(&path, tampered_bytes).unwrap();

    let err = load_verified_pending_for_apply(&discovered[0])
        .expect_err("apply must refuse a changed pending artifact");
    let reports_change = err.contains("changed after discovery");
    assert!(reports_change, "unexpected error: {err}");
    let _ = fs::remove_dir_all(&work);
}
/// A second pending artifact (at the Phase 0 pending path) is written after
/// discovery; reconciliation against the earlier discovery result must fail
/// with a "changed while waiting for the workspace lock" error.
#[test]
fn reconcile_pending_plans_after_lock_refuses_added_artifact() {
    let workspace = temp_workspace("apply_pending_added_before_lock");
    let global_bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let pending_path = djogi::migrate::pending_json_path(&workspace, &global_bucket);
    write_pending_json(&pending_path, "main", "", "V20260606010101__stable", &[]);
    let discovered = discover_pending_plans(&workspace).expect("discover");

    // Add a new artifact that the earlier discovery pass never saw.
    let phase_zero_path = djogi::migrate::phase_zero_pending_json_path(
        &workspace,
        "main",
        djogi::migrate::PHASE_ZERO_VERSION,
    );
    write_pending_json(
        &phase_zero_path,
        "main",
        "",
        djogi::migrate::PHASE_ZERO_VERSION,
        &[],
    );

    let outcome = reconcile_pending_plans_after_lock(&workspace, &discovered);
    let err = outcome.expect_err("locked reconciliation must refuse a changed pending set");
    assert!(
        err.contains("changed while waiting for the workspace lock"),
        "unexpected error: {err}"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// Reconciling against an untouched pending directory must succeed and
/// return a value equal to what discovery produced.
#[test]
fn reconcile_pending_plans_after_lock_accepts_unchanged_set() {
    let workspace = temp_workspace("apply_pending_stable_under_lock");
    let global_bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let pending_path = djogi::migrate::pending_json_path(&workspace, &global_bucket);
    write_pending_json(&pending_path, "main", "", "V20260606010101__stable", &[]);
    let discovered = discover_pending_plans(&workspace).expect("discover");
    let outcome = reconcile_pending_plans_after_lock(&workspace, &discovered);
    let locked = outcome.expect("unchanged set must reconcile");
    assert_eq!(locked, discovered);
    let _ = fs::remove_dir_all(&workspace);
}
/// Runs `repair_checksum_drift_cmd` inside `without_database_url` against a
/// workspace prepared by `write_unreachable_config` and expects exit code 1.
/// Per the assertion message, the command should reach pool connection
/// without shared identity validation.
#[test]
fn repair_checksum_drift_is_identity_free() {
    let workspace = temp_workspace("repair_checksum_identity_free");
    write_unreachable_config(&workspace);
    let version = "V20260601000000__repair_checksum";
    let expected_checksum =
        "V1:0000000000000000000000000000000000000000000000000000000000000000";
    let run_repair = || {
        repair_checksum_drift_cmd(
            version,
            None,
            None,
            Some(expected_checksum),
            None,
            Some(workspace.clone()),
        )
    };
    let exit = without_database_url(run_repair);
    assert_eq!(
        exit,
        ExitCode::from(1),
        "checksum-drift should reach pool connection without shared identity validation"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// Runs `repair_partial_apply_cmd` with the `MarkRolledBack` resolution
/// inside `without_database_url` against a workspace prepared by
/// `write_unreachable_config` and expects exit code 1. Per the assertion
/// message, the command should reach pool connection without shared identity
/// validation.
#[test]
fn repair_partial_apply_is_identity_free() {
    let workspace = temp_workspace("repair_partial_identity_free");
    write_unreachable_config(&workspace);
    let run_repair = || {
        repair_partial_apply_cmd(
            "V20260601000000__repair_partial",
            PartialApplyResolution::MarkRolledBack,
            "operator confirmed rollback",
            None,
            None,
            Some(workspace.clone()),
        )
    };
    let exit = without_database_url(run_repair);
    assert_eq!(
        exit,
        ExitCode::from(1),
        "partial-apply should reach pool connection without shared identity validation"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// Runs `repair_snapshot_rebuild_cmd` inside `without_database_url` against
/// a workspace prepared by `write_unreachable_config` and expects exit
/// code 1. Per the assertion message, the command should reach pool
/// connection without shared identity validation.
#[test]
fn repair_snapshot_rebuild_is_identity_free() {
    let workspace = temp_workspace("repair_snapshot_identity_free");
    write_unreachable_config(&workspace);
    let run_repair = || repair_snapshot_rebuild_cmd(None, None, None, Some(workspace.clone()));
    let exit = without_database_url(run_repair);
    assert_eq!(
        exit,
        ExitCode::from(1),
        "snapshot-rebuild should reach pool connection without shared identity validation"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// Saves a snapshot for the `main`/`billing` bucket containing one `widgets`
/// table, then runs `compose_with_inputs` with an empty model map and a
/// tombstoned `billing` lifecycle entry. Compose must exit 0 and the
/// generated up file in the bucket directory must contain
/// `DROP TABLE "widgets"`.
#[test]
fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
    use djogi::migrate::projection::BucketKey;
    use djogi::migrate::schema::{
        ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
    };
    use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
    use std::collections::BTreeMap;

    let workspace = temp_workspace("b1r2_compose_uses_discovery");
    let billing_bucket = BucketKey {
        database: "main".to_string(),
        app: "billing".to_string(),
    };

    // The single primary-key column of the `widgets` table.
    let id_column = ColumnSchema {
        check: None,
        codec: None,
        comment: None,
        default_sql: Some("heerid_next_desc()".to_string()),
        foreign_key: None,
        generated: None,
        identity: None,
        index_type: None,
        indexed: false,
        max_length: None,
        name: "id".to_string(),
        nullable: false,
        on_delete: None,
        outbox_exclude: false,
        rationale: None,
        relation_kind: None,
        renamed_from: None,
        sequence_within: None,
        sql_type: "BIGINT".to_string(),
        unique: false,
        type_change_using: None,
    };
    let widgets = TableSchema {
        app: Some("billing".to_string()),
        columns: vec![id_column],
        exclusion_constraints: Vec::new(),
        fts: None,
        is_through: false,
        moved_from_app: None,
        partition: None,
        primary_key: PrimaryKeySchema {
            columns: vec!["id".to_string()],
            kind: PkKindSchema::HeerIdRecencyBiased,
        },
        rationale: None,
        renamed_from: None,
        rls_enabled: false,
        table: "widgets".to_string(),
        table_comment: None,
        storage_params: None,
        tablespace: None,
        tenant_key: None,
    };
    let mut snapshot_models = BTreeMap::new();
    snapshot_models.insert("widgets".to_string(), widgets);
    let billing_snap = AppliedSchema {
        djogi_version: env!("CARGO_PKG_VERSION").to_string(),
        enums: BTreeMap::new(),
        format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
        generated_at: "2026-04-25T00:00:00Z".to_string(),
        indexes: Vec::new(),
        models: snapshot_models,
        registered_apps: vec!["billing".to_string()],
    };

    // Persist the snapshot so it exists only on disk, not in the model map.
    let snap_path = snapshot_path(&workspace, &billing_bucket);
    save_snapshot(&billing_snap, &snap_path).expect("write snapshot");

    let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
    let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
    let tombstoned_billing = AppLifecycle {
        label: "billing".to_string(),
        database: "main".to_string(),
        renamed_from: None,
        tombstone: true,
    };
    let exit = compose_with_inputs(
        &workspace,
        "drop billing remnant",
        true,
        false,
        &empty_models,
        &[tombstoned_billing],
        now,
        None,
    );
    assert_eq!(exit, ExitCode::from(0), "compose must succeed");

    // Locate the first `V*.sdjql` file that is not a `.down.` file.
    let billing_dir = djogi::migrate::bucket_dir(&workspace, &billing_bucket);
    let up_path = fs::read_dir(&billing_dir)
        .unwrap()
        .flatten()
        .find(|entry| {
            let file_name = entry.file_name().to_string_lossy().to_string();
            file_name.starts_with('V')
                && file_name.ends_with(".sdjql")
                && !file_name.contains(".down.")
        })
        .map(|entry| entry.path())
        .expect("compose must have written an up SQL file");
    let up_sql = fs::read_to_string(&up_path).unwrap();
    assert!(
        up_sql.contains("DROP TABLE \"widgets\""),
        "compose must have seen the disk snapshot and emitted DROP TABLE — \
         this proves discover_snapshot_buckets_on_disk reached the differ. \
         SQL: {up_sql}"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// Builds two buckets (`main`/`a` and `main`/`b`) whose single tables each
/// hold a foreign key to the other bucket's table, then runs
/// `compose_with_inputs`. The exit code must be 2, not 1.
#[test]
fn compose_cycle_exits_with_code_two() {
    use djogi::migrate::projection::BucketKey;
    use djogi::migrate::schema::{
        AppliedSchema, ColumnSchema, ForeignKeySchema, OnDeleteSchema, PkKindSchema,
        PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
    };
    use std::collections::BTreeMap;

    let workspace = temp_workspace("compose_cycle_exit_two");

    // A BIGINT column referencing `<target_table>.id` with ON DELETE RESTRICT.
    let fk_column = |name: &str, target_table: &str| -> ColumnSchema {
        let reference = ForeignKeySchema {
            deferrable: false,
            initially_deferred: false,
            on_delete: OnDeleteSchema::Restrict,
            ref_column: "id".to_string(),
            ref_table: target_table.to_string(),
        };
        ColumnSchema {
            name: name.to_string(),
            sql_type: "BIGINT".to_string(),
            foreign_key: Some(reference),
            ..default_col()
        }
    };

    // A one-table schema: `id` primary key plus one outgoing foreign key.
    let single_table_schema =
        |app: &str, table: &str, fk_name: &str, fk_target: &str| -> AppliedSchema {
            let primary = ColumnSchema {
                name: "id".to_string(),
                sql_type: "BIGINT".to_string(),
                default_sql: Some("heerid_next_desc()".to_string()),
                ..default_col()
            };
            let table_schema = TableSchema {
                app: Some(app.to_string()),
                columns: vec![primary, fk_column(fk_name, fk_target)],
                primary_key: PrimaryKeySchema {
                    columns: vec!["id".to_string()],
                    kind: PkKindSchema::HeerIdRecencyBiased,
                },
                table: table.to_string(),
                ..default_table()
            };
            let mut tables = BTreeMap::new();
            tables.insert(table.to_string(), table_schema);
            AppliedSchema {
                djogi_version: env!("CARGO_PKG_VERSION").to_string(),
                enums: BTreeMap::new(),
                format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
                generated_at: "2026-06-10T00:00:00Z".to_string(),
                indexes: Vec::new(),
                models: tables,
                registered_apps: vec![app.to_string()],
            }
        };

    // table_a -> table_b and table_b -> table_a, each in its own bucket.
    let mut models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
    for (app, table, fk_name, fk_target) in [
        ("a", "table_a", "b_id", "table_b"),
        ("b", "table_b", "a_id", "table_a"),
    ] {
        let bucket = BucketKey {
            database: "main".to_string(),
            app: app.to_string(),
        };
        models.insert(bucket, single_table_schema(app, table, fk_name, fk_target));
    }

    let live_app = |label: &str| AppLifecycle {
        label: label.to_string(),
        database: "main".to_string(),
        renamed_from: None,
        tombstone: false,
    };
    let now = time::OffsetDateTime::from_unix_timestamp(1_749_513_600).unwrap();
    let exit = compose_with_inputs(
        &workspace,
        "cross-bucket cycle",
        false,
        false,
        &models,
        &[live_app("a"), live_app("b")],
        now,
        None,
    );
    assert_eq!(
        exit,
        ExitCode::from(2),
        "a cross-bucket FK cycle must exit 2 (operator-actionable refusal), not 1"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// Writes a syntactically invalid `Djogi.toml` into the workspace and expects
/// `status_cmd` called with that workspace to return exit code 1.
#[test]
fn a1_round2_status_cmd_threads_workspace_to_config() {
    let workspace = temp_workspace("a1r2_status_workspace");
    let config_path = workspace.join("Djogi.toml");
    fs::write(config_path, "this is = not = valid toml ===").unwrap();
    let exit = status_cmd(Some(workspace.clone()));
    assert_eq!(
        exit,
        ExitCode::from(1),
        "malformed workspace Djogi.toml must surface as config load error"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// Each listed `AttuneRefusal`, wrapped in `AttuneError::Refused`, must be
/// mapped to exit code 2 by `attune_error_exit_code`.
#[test]
fn u3_attune_refusal_variants_map_to_exit_code_two() {
    use djogi::migrate::AttuneRefusal;

    let refusals = vec![
        AttuneRefusal::SquashNotLocalhost {
            database_url: "postgres://prod.example.com/main".to_string(),
        },
        AttuneRefusal::SquashNotDevProfile {
            profile: "production".to_string(),
        },
        AttuneRefusal::SquashDevModeOff,
        AttuneRefusal::SquashEnvIsProduction {
            env_value: "production".to_string(),
        },
        AttuneRefusal::SquashFromVersionNotFound {
            version: "V20260101000000__missing".to_string(),
        },
        AttuneRefusal::SquashFromVersionAmbiguous {
            version: "V20260101000000__shared".to_string(),
            buckets: vec!["main/users".to_string(), "main/billing".to_string()],
        },
    ];
    for err in refusals.into_iter().map(AttuneError::Refused) {
        assert_eq!(
            attune_error_exit_code(&err),
            2,
            "refusal variant must map to exit 2: {err}"
        );
    }
}
/// Filesystem, SQL read/write/delete, and git-publish failure variants of
/// `AttuneError` must be mapped to exit code 1 by `attune_error_exit_code`.
#[test]
fn u3_attune_runtime_variants_map_to_exit_code_one() {
    let sql_path = || PathBuf::from("/tmp/x.sdjql");
    let cases = [
        AttuneError::FilesystemScanFailed {
            source: std::io::Error::other("disk full"),
        },
        AttuneError::SqlReadFailed {
            path: sql_path(),
            source: std::io::Error::other("permission denied"),
        },
        AttuneError::SqlWriteFailed {
            path: sql_path(),
            source: std::io::Error::other("read-only fs"),
        },
        AttuneError::SqlDeleteFailed {
            path: sql_path(),
            source: std::io::Error::other("not found"),
        },
        AttuneError::GitPublishFailed {
            stderr: "fatal: refusing to push".to_string(),
            status_code: Some(128),
        },
    ];
    cases.iter().for_each(|err| {
        assert_eq!(
            attune_error_exit_code(err),
            1,
            "runtime variant must map to exit 1: {err}"
        );
    });
}
/// `baseline_cmd` called with an empty reason string must return exit
/// code 2.
#[test]
fn baseline_empty_reason_exits_code_2() {
    let workspace = std::path::PathBuf::from("/tmp/nonexistent_djogi_ws");
    let empty_reason = "";
    let result = baseline_cmd(
        "V00000000000000__baseline",
        "description",
        empty_reason,
        None,
        None,
        Some(workspace),
        None,
        false,
    );
    assert_eq!(
        result,
        ExitCode::from(2),
        "empty --reason must exit 2 before any DB work"
    );
}
/// `baseline_cmd` called with a whitespace-only reason string must return
/// exit code 2.
#[test]
fn baseline_whitespace_reason_exits_code_2() {
    let workspace = std::path::PathBuf::from("/tmp/nonexistent_djogi_ws");
    let blank_reason = " ";
    let result = baseline_cmd(
        "V00000000000000__baseline",
        "description",
        blank_reason,
        None,
        None,
        Some(workspace),
        None,
        false,
    );
    assert_eq!(
        result,
        ExitCode::from(2),
        "whitespace-only --reason must exit 2 before any DB work"
    );
}
/// The listed `RunnerError` variants must be mapped to exit code 2 by
/// `baseline_error_exit_code`.
#[test]
fn baseline_refusal_variants_map_to_exit_code_two() {
    let baseline_version = || "V00000000000000__baseline".to_string();
    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let cases = [
        RunnerError::VersionAlreadyApplied {
            version: baseline_version(),
            applied_at: None,
        },
        RunnerError::VersionCollisionNonTerminal {
            version: baseline_version(),
            status: LedgerStatus::Pending,
            run_id: 1,
        },
        RunnerError::BaselineSnapshotShouldNotBeProvided,
        RunnerError::AdvisoryUnlockReturnedFalse {
            bucket: global_bucket,
            key: 0x0102_0304_0506_0708,
        },
        RunnerError::OutOfOrderRejected {
            version: baseline_version(),
            conflicting_version: "V20260101000000__later".to_string(),
            conflicting_applied_at: None,
        },
    ];
    cases.iter().for_each(|err| {
        assert_eq!(
            baseline_error_exit_code(err),
            2,
            "baseline refusal variant must map to exit 2: {err}"
        );
    });
}
/// The listed ledger, session-checkout, and advisory-lock failure variants of
/// `RunnerError` must be mapped to exit code 1 by `baseline_error_exit_code`.
#[test]
fn baseline_transient_variants_map_to_exit_code_one() {
    use djogi::error::{DbError, DjogiError};

    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let cases = [
        RunnerError::LedgerBootstrapFailed {
            source: DjogiError::Db(DbError::other("create table failed")),
        },
        RunnerError::LedgerWriteFailed {
            version: "V00000000000000__baseline".to_string(),
            source: DjogiError::Db(DbError::other("insert failed")),
        },
        RunnerError::PinnedSessionCheckoutFailed {
            source: DjogiError::Db(DbError::other("pool exhausted")),
        },
        RunnerError::AdvisoryLockFailed {
            bucket: global_bucket,
            key: 0x0102_0304_0506_0708,
            attempts: 3,
        },
    ];
    cases.iter().for_each(|err| {
        assert_eq!(
            baseline_error_exit_code(err),
            1,
            "baseline transient variant must map to exit 1: {err}"
        );
    });
}
/// `apply_cmd` with its second argument `true` and no reason (`None`) must
/// return exit code 2; the assertion message labels this "--fake without
/// --reason".
#[test]
fn fake_without_reason_exits_code_2() {
    let workspace = std::path::PathBuf::from("/tmp/nonexistent_djogi_ws");
    let result = apply_cmd(Some(workspace), true, None, None, false);
    assert_eq!(
        result,
        ExitCode::from(2),
        "--fake without --reason must exit 2"
    );
}
/// `apply_cmd` with its second argument `true` and an empty reason string
/// must return exit code 2.
#[test]
fn fake_with_empty_reason_exits_code_2() {
    let workspace = std::path::PathBuf::from("/tmp/nonexistent_djogi_ws");
    let empty_reason = String::new();
    let result = apply_cmd(Some(workspace), true, Some(empty_reason), None, false);
    assert_eq!(
        result,
        ExitCode::from(2),
        "--fake with empty reason must exit 2"
    );
}
/// `apply_cmd` with its second argument `true` and a whitespace-only reason
/// string must return exit code 2.
#[test]
fn fake_with_whitespace_reason_exits_code_2() {
    let workspace = std::path::PathBuf::from("/tmp/nonexistent_djogi_ws");
    let blank_reason = " ".to_string();
    let result = apply_cmd(Some(workspace), true, Some(blank_reason), None, false);
    assert_eq!(
        result,
        ExitCode::from(2),
        "--fake with whitespace reason must exit 2"
    );
}
/// `apply_cmd` with its second argument `false` and a reason supplied must
/// not return exit code 2; any other exit code is accepted by this test.
#[test]
fn reason_without_fake_is_accepted() {
    let workspace = std::path::PathBuf::from("/tmp/nonexistent_djogi_ws");
    let reason = "test reason".to_string();
    let result = apply_cmd(Some(workspace), false, Some(reason), None, true);
    assert_ne!(
        result,
        ExitCode::from(2),
        "--reason without --fake should not refuse"
    );
}
/// Builds a `BucketKey` from borrowed database and app names.
fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
    let (database, app) = (database.to_owned(), app.to_owned());
    djogi::migrate::BucketKey { database, app }
}
/// Builds a `VerifyDiagnostic` from borrowed parts, copying each string into
/// an owned field. `location` stays `None` when no location is given.
fn diag(
    code: &str,
    severity: djogi::migrate::VerifySeverity,
    message: &str,
    location: Option<&str>,
) -> djogi::migrate::VerifyDiagnostic {
    let location = location.map(|loc| loc.to_string());
    djogi::migrate::VerifyDiagnostic {
        code: code.to_owned(),
        severity,
        message: message.to_owned(),
        location,
    }
}
/// A report with no diagnostics and three applied migrations must render the
/// ledger line, the "no drift" line, and a PASSED result, and must never
/// mention FAILED.
#[test]
fn render_verify_report_clean_output() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 3,
        unfinished_count: 0,
    };
    let lines = render_verify_report(&report, &render_bucket("main", ""));

    let ledger_line = "Ledger: 3 applied, latest 001_initial".to_string();
    assert!(
        lines.contains(&ledger_line),
        "missing ledger line; got {lines:?}"
    );
    let clean_line = "No drift detected. Schema is consistent.".to_string();
    assert!(
        lines.contains(&clean_line),
        "missing clean line; got {lines:?}"
    );
    let says_passed = lines.iter().any(|line| line.contains("Result: PASSED"));
    assert!(says_passed, "missing PASSED result; got {lines:?}");
    let says_failed = lines.iter().any(|line| line.contains("FAILED"));
    assert!(
        !says_failed,
        "clean report must not say FAILED; got {lines:?}"
    );
}
/// A report holding one error and one warning diagnostic must report
/// `has_errors()`, render both diagnostics in the
/// `[SEVERITY] CODE (location): message` shape, and end in a FAILED result.
#[test]
fn render_verify_report_with_errors() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let missing_table = diag(
        "D601",
        VerifySeverity::Error,
        "Snapshot table missing from live DB",
        Some("users"),
    );
    let extra_index = diag(
        "D611",
        VerifySeverity::Warning,
        "Live index not present in snapshot",
        Some("idx_posts_created"),
    );
    let report = VerifyReport {
        diagnostics: vec![missing_table, extra_index],
        latest_applied_version: Some(String::from("V20260501000000__add_users")),
        applied_count: 2,
        unfinished_count: 0,
    };
    let bucket = render_bucket("main", "myapp");
    assert!(report.has_errors());

    let lines = render_verify_report(&report, &bucket);
    let error_line = "[ERROR] D601 (users): Snapshot table missing from live DB".to_string();
    assert!(
        lines.contains(&error_line),
        "missing D601 line; got {lines:?}"
    );
    let warn_line =
        "[WARN] D611 (idx_posts_created): Live index not present in snapshot".to_string();
    assert!(
        lines.contains(&warn_line),
        "missing D611 line; got {lines:?}"
    );
    let says_failed = lines.iter().any(|line| line.contains("Result: FAILED"));
    assert!(says_failed, "error report must say FAILED; got {lines:?}");
}
/// The first rendered line is the header: an empty app name renders as
/// `_global_`, and a named app renders as `<database>/<app>`.
#[test]
fn render_verify_report_header_shows_global_and_named_app() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: None,
        applied_count: 0,
        unfinished_count: 0,
    };

    let global_bucket = render_bucket("main", "");
    let global = render_verify_report(&report, &global_bucket);
    assert_eq!(
        global.first().map(String::as_str),
        Some("djogi migrations verify — main/_global_"),
        "global bucket header; got {global:?}"
    );

    let named_bucket = render_bucket("crud_log", "billing");
    let named = render_verify_report(&report, &named_bucket);
    assert_eq!(
        named.first().map(String::as_str),
        Some("djogi migrations verify — crud_log/billing"),
        "named bucket header; got {named:?}"
    );
}
/// A report whose only diagnostic is a warning must render
/// "Result: PASSED with warnings" and must never mention FAILED.
#[test]
fn render_verify_report_warning_only_passes_with_warnings() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let advisory = diag(
        "D606",
        VerifySeverity::Warning,
        "type differs (advisory)",
        Some("users.age"),
    );
    let report = VerifyReport {
        diagnostics: vec![advisory],
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 1,
        unfinished_count: 0,
    };
    let lines = render_verify_report(&report, &render_bucket("main", ""));

    let passed_with_warnings = lines
        .iter()
        .any(|line| line.contains("Result: PASSED with warnings"));
    assert!(
        passed_with_warnings,
        "warning-only must PASS with warnings; got {lines:?}"
    );
    let says_failed = lines.iter().any(|line| line.contains("FAILED"));
    assert!(
        !says_failed,
        "warning-only must not say FAILED; got {lines:?}"
    );
}
/// With zero applied migrations and no latest version, the rendered output
/// must contain the "Ledger: empty" line.
#[test]
fn render_verify_report_empty_ledger_line() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: None,
        applied_count: 0,
        unfinished_count: 0,
    };
    let lines = render_verify_report(&report, &render_bucket("main", ""));
    let expected = "Ledger: empty (no migrations applied yet)".to_string();
    assert!(
        lines.contains(&expected),
        "empty ledger line; got {lines:?}"
    );
}
/// With a non-zero `unfinished_count`, the ledger line must include the
/// unfinished count between the applied count and the latest version.
#[test]
fn render_verify_report_unfinished_ledger_line() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: Some(String::from("V20260501000000__add_users")),
        applied_count: 2,
        unfinished_count: 1,
    };
    let lines = render_verify_report(&report, &render_bucket("main", ""));
    let expected =
        "Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users".to_string();
    assert!(
        lines.contains(&expected),
        "unfinished ledger line; got {lines:?}"
    );
}
/// An info diagnostic without a location must render its location as `(-)`,
/// and an all-info report must summarise as "Result: PASSED (1 info(s))".
#[test]
fn render_verify_report_info_with_no_location_uses_dash() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let unlocated_info = diag(
        "D692",
        VerifySeverity::Info,
        "enum type(s) declared; not yet checked",
        None,
    );
    let report = VerifyReport {
        diagnostics: vec![unlocated_info],
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 1,
        unfinished_count: 0,
    };
    let lines = render_verify_report(&report, &render_bucket("main", ""));

    let has_dash = lines.iter().any(|line| line.contains("(-)"));
    assert!(
        has_dash,
        "location: None must render as (-); got {lines:?}"
    );
    let summary = "Result: PASSED (1 info(s))".to_string();
    assert!(lines.contains(&summary), "all-info summary; got {lines:?}");
}
/// Builds a `DatabaseConfig` for tests from the app URL and optional log
/// database URLs. `max_connections` is left unset and `dev_mode` is off.
fn db_config(
    url: &str,
    crud_log_url: Option<&str>,
    event_log_url: Option<&str>,
) -> djogi::config::DatabaseConfig {
    let owned = |value: Option<&str>| value.map(|text| text.to_string());
    djogi::config::DatabaseConfig {
        url: url.to_owned(),
        crud_log_url: owned(crud_log_url),
        event_log_url: owned(event_log_url),
        max_connections: None,
        dev_mode: false,
    }
}
/// Resolving the `main` database must return the configured app URL
/// unchanged.
#[test]
fn resolve_bucket_url_main_uses_app_url_verbatim() {
    let app_url = "postgres://user:pass@localhost:5432/myapp_prod";
    let config = db_config(app_url, None, None);
    let resolved = resolve_bucket_url(&config, "main");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://user:pass@localhost:5432/myapp_prod"),
        "main must return the app URL unchanged"
    );
}
/// When `crud_log_url` is configured, resolving `crud_log` must return that
/// URL.
#[test]
fn resolve_bucket_url_crud_log_prefers_explicit_url() {
    let explicit = "postgres://localhost/explicit_crud";
    let config = db_config("postgres://localhost/main", Some(explicit), None);
    let resolved = resolve_bucket_url(&config, "crud_log");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost/explicit_crud"),
        "crud_log must prefer the explicit crud_log_url"
    );
}
/// When `event_log_url` is configured, resolving `event_log` must return
/// that URL.
#[test]
fn resolve_bucket_url_event_log_prefers_explicit_url() {
    let explicit = "postgres://localhost/explicit_event";
    let config = db_config("postgres://localhost/main", None, Some(explicit));
    let resolved = resolve_bucket_url(&config, "event_log");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost/explicit_event"),
        "event_log must prefer the explicit event_log_url"
    );
}
/// An empty `crud_log_url` must resolve to the URL derived from the app URL,
/// whereas a whitespace-only `event_log_url` is returned as-is.
#[test]
fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
    let config = db_config("postgres://localhost/main", Some(""), Some(" "));

    let crud_log = resolve_bucket_url(&config, "crud_log");
    assert_eq!(
        crud_log.as_deref(),
        Some("postgres://localhost/crud_log"),
        "empty crud_log_url must fall back to derived"
    );

    let event_log = resolve_bucket_url(&config, "event_log");
    assert_eq!(
        event_log.as_deref(),
        Some(" "),
        "non-empty (whitespace) event_log_url is used verbatim"
    );
}
/// A database name with no dedicated config field (`analytics`) must resolve
/// to the app URL with its path replaced by that database name.
#[test]
fn resolve_bucket_url_other_database_derives_from_app_url() {
    let config = db_config("postgres://user:pass@localhost:5432/main", None, None);
    let resolved = resolve_bucket_url(&config, "analytics");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://user:pass@localhost:5432/analytics"),
        "an arbitrary database name derives by path splice"
    );
}
/// With an app URL that has no path component, resolving `crud_log` with no
/// explicit URL configured must return `None`.
#[test]
fn resolve_bucket_url_pathless_url_returns_none() {
    let config = db_config("postgres://localhost", None, None);
    let resolved = resolve_bucket_url(&config, "crud_log");
    assert_eq!(
        resolved,
        None,
        "pathless URL must yield None for a derived database"
    );
}
/// Even when the app URL has no path component, resolving `main` must return
/// the app URL unchanged.
#[test]
fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
    let config = db_config("postgres://localhost", None, None);
    let resolved = resolve_bucket_url(&config, "main");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost"),
        "main returns the app URL verbatim regardless of path"
    );
}
/// Writes pending artifacts for a `main` bucket and a `crud_log`/`audit`
/// bucket, then checks that `resolve_apply_target_urls` returns two entries:
/// `main` mapped to the app URL and `crud_log` mapped to the configured
/// `crud_log_url`.
#[test]
fn resolve_apply_target_urls_uses_pending_bucket_databases() {
    let workspace = temp_workspace("apply_target_urls");

    let main_bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let main_path = djogi::migrate::pending_json_path(&workspace, &main_bucket);
    write_pending_json(&main_path, "main", "", "V20260607010101__main_global", &[]);

    let audit_bucket = BucketKey {
        database: String::from("crud_log"),
        app: String::from("audit"),
    };
    let audit_path = djogi::migrate::pending_json_path(&workspace, &audit_bucket);
    write_pending_json(
        &audit_path,
        "crud_log",
        "audit",
        "V20260607010102__crud_log_audit",
        &[],
    );

    let discovered = discover_pending_plans(&workspace).expect("discover");
    let config = db_config(
        "postgres://user:pass@localhost:5432/myapp_prod",
        Some("postgres://user:pass@localhost:5432/myapp_crud"),
        None,
    );
    let targets = resolve_apply_target_urls(&discovered, &config).expect("resolve");

    assert_eq!(
        targets.len(),
        2,
        "apply must preserve distinct target databases"
    );
    let main_url = targets.get("main").map(String::as_str);
    assert_eq!(
        main_url,
        Some("postgres://user:pass@localhost:5432/myapp_prod"),
        "main pending plans must keep the app database URL"
    );
    let crud_log_url = targets.get("crud_log").map(String::as_str);
    assert_eq!(
        crud_log_url,
        Some("postgres://user:pass@localhost:5432/myapp_crud"),
        "crud_log pending plans must route through the crud_log database URL"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// With a pending artifact for the `analytics` database and an app URL that
/// has no path, `resolve_apply_target_urls` must fail with an error that
/// mentions `analytics`.
#[test]
fn resolve_apply_target_urls_refuses_unresolvable_pending_database() {
    let workspace = temp_workspace("apply_target_urls_unresolvable");
    let analytics_bucket = BucketKey {
        database: String::from("analytics"),
        app: String::new(),
    };
    let pending_path = djogi::migrate::pending_json_path(&workspace, &analytics_bucket);
    write_pending_json(
        &pending_path,
        "analytics",
        "",
        "V20260607010103__analytics_global",
        &[],
    );
    let discovered = discover_pending_plans(&workspace).expect("discover");
    let config = db_config("postgres://localhost", None, None);
    let outcome = resolve_apply_target_urls(&discovered, &config);
    let err = outcome.expect_err("pathless app URL must refuse a derived pending database");
    assert!(err.contains("analytics"), "unexpected error: {err}");
    let _ = fs::remove_dir_all(&workspace);
}
/// The SQL returned by `current_production_phase_zero_sql` must classify
/// with no refusal (`None`).
#[test]
fn classify_phase_zero_bytes_identity_free_production_is_ok() {
    let sql = current_production_phase_zero_sql("current_bytes");
    let verdict = classify_phase_zero_bytes(sql.as_bytes());
    assert!(
        verdict.is_none(),
        "production Phase 0 should be identity-free replay-current (no refusal)"
    );
}
/// The SQL returned by `seed_capable_phase_zero_sql` must be refused, and the
/// refusal text must mention "seed-capable".
///
/// The refusal is taken with `expect` and echoed in the assertion message so
/// a failing run shows the actual reason text.
#[test]
fn classify_phase_zero_bytes_seed_capable_is_refused() {
    let sql = seed_capable_phase_zero_sql();
    let msg = classify_phase_zero_bytes(sql.as_bytes())
        .expect("seed-capable Phase 0 should be refused by cleanup guard");
    assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
}
/// The SQL returned by `generated_stale_phase_zero_sql` must be refused, and
/// the refusal text must mention "generated-stale".
///
/// The refusal is taken with `expect` and echoed in the assertion message so
/// a failing run shows the actual reason text.
#[test]
fn classify_phase_zero_bytes_generated_stale_is_refused() {
    let sql = generated_stale_phase_zero_sql("stale_bytes");
    let msg = classify_phase_zero_bytes(sql.as_bytes())
        .expect("generated-stale Phase 0 should be refused");
    assert!(msg.contains("generated-stale"), "refusal reason: {msg}");
}
/// The SQL returned by `markerless_seed_phase_zero_sql` must be refused, and
/// the refusal text must mention "seed-dml".
///
/// The refusal is taken with `expect` and echoed in the assertion message so
/// a failing run shows the actual reason text.
#[test]
fn classify_phase_zero_bytes_markerless_seed_is_refused() {
    let sql = markerless_seed_phase_zero_sql("markerless_seed_bytes");
    let msg = classify_phase_zero_bytes(sql.as_bytes())
        .expect("markerless seed Phase 0 should be refused by cleanup guard");
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
}
/// Every `(name, statement)` pair from `extended_seed_statement_cases` is
/// embedded into Phase 0 SQL via `phase_zero_with_seed_statement`; each
/// result must be refused with a reason mentioning "seed-dml".
#[test]
fn classify_phase_zero_bytes_extended_seed_dml_forms_are_refused() {
    for (name, statement) in extended_seed_statement_cases() {
        let label = format!("extended_seed_bytes_{name}");
        let sql = phase_zero_with_seed_statement(&label, statement);
        let msg = classify_phase_zero_bytes(sql.as_bytes())
            .expect("extended seed Phase 0 should be refused");
        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
    }
}
/// A hand-written two-statement script (`CREATE SCHEMA` plus
/// `ALTER DATABASE ... SET heer.node_id`) must be refused, and the refusal
/// text must mention "ambiguous".
///
/// The refusal is taken with `expect` and echoed in the assertion message so
/// a failing run shows the actual reason text.
#[test]
fn classify_phase_zero_bytes_ambiguous_is_refused() {
    let sql = "CREATE SCHEMA IF NOT EXISTS heer;\n\
               ALTER DATABASE \"mydb\" SET heer.node_id = '1';\n";
    let msg =
        classify_phase_zero_bytes(sql.as_bytes()).expect("ambiguous Phase 0 should be refused");
    assert!(msg.contains("ambiguous"), "refusal reason: {msg}");
}
/// Whitespace-only input must be refused, and the refusal text must mention
/// "missing".
///
/// The refusal is taken with `expect` and echoed in the assertion message so
/// a failing run shows the actual reason text.
#[test]
fn classify_phase_zero_bytes_missing_is_refused() {
    let msg = classify_phase_zero_bytes(b" \n\t ").expect("missing Phase 0 should be refused");
    assert!(msg.contains("missing"), "refusal reason: {msg}");
}
/// Writes a replay-plan JSON for the Phase 0 version whose single statement
/// is the SQL from `generated_stale_phase_zero_sql`;
/// `classify_phase_zero_for_cleanup` must refuse it with a reason mentioning
/// "generated-stale".
#[test]
fn classify_phase_zero_for_cleanup_refuses_stale_replay_plan() {
    let workspace = temp_workspace("stale_cleanup");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: statement -> segment -> plan.
    let bootstrap = CliReplayStatement {
        label: "phase_zero_bootstrap".to_string(),
        up: generated_stale_phase_zero_sql("stale_replay"),
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![bootstrap],
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
        classification: CliClassification::Additive,
        checksum_up: "V1:aabbccdd".to_string(),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    fs::write(plan_path, serde_json::to_string(&replay).unwrap()).unwrap();

    let bucket = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:aabbccdd",
        None,
    );
    assert!(
        refusal.is_some(),
        "stale Phase 0 replay plan should be refused by cleanup guard"
    );
    let msg = refusal.unwrap();
    assert!(msg.contains("generated-stale"), "refusal reason: {msg}");
    let _ = fs::remove_dir_all(&workspace);
}
/// Writes a replay-plan JSON for the Phase 0 version whose single statement
/// is the SQL from `current_production_phase_zero_sql`;
/// `classify_phase_zero_for_cleanup` must return no refusal.
#[test]
fn classify_phase_zero_for_cleanup_allows_current_replay_plan() {
    let workspace = temp_workspace("current_cleanup");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: statement -> segment -> plan.
    let bootstrap = CliReplayStatement {
        label: "phase_zero_bootstrap".to_string(),
        up: current_production_phase_zero_sql("current_replay"),
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![bootstrap],
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
        classification: CliClassification::Additive,
        checksum_up: "V1:eeff0011".to_string(),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    fs::write(plan_path, serde_json::to_string(&replay).unwrap()).unwrap();

    let bucket = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:eeff0011",
        None,
    );
    assert!(
        refusal.is_none(),
        "identity-free Phase 0 should be allowed by cleanup guard; got: {refusal:?}"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// Writes a replay-plan JSON for the Phase 0 version whose single statement
/// is the SQL from `seed_capable_phase_zero_sql`;
/// `classify_phase_zero_for_cleanup` must refuse it with a reason mentioning
/// "seed-capable".
#[test]
fn classify_phase_zero_for_cleanup_refuses_seed_capable_replay_plan() {
    let workspace = temp_workspace("seed_cleanup_replay_plan");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: statement -> segment -> plan.
    let bootstrap = CliReplayStatement {
        label: "phase_zero_bootstrap".to_string(),
        up: seed_capable_phase_zero_sql(),
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![bootstrap],
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
        classification: CliClassification::Additive,
        checksum_up: "V1:11223344".to_string(),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    fs::write(plan_path, serde_json::to_string(&replay).unwrap()).unwrap();

    let bucket = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:11223344",
        None,
    );
    let msg = refusal.expect("seed-capable replay plan must refuse");
    assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
    let _ = fs::remove_dir_all(&workspace);
}
/// Writes a replay-plan JSON for the Phase 0 version whose single statement
/// is the SQL from `markerless_seed_phase_zero_sql`;
/// `classify_phase_zero_for_cleanup` must refuse it with a reason mentioning
/// "seed-dml".
#[test]
fn classify_phase_zero_for_cleanup_refuses_markerless_seed_replay_plan() {
    let workspace = temp_workspace("markerless_seed_cleanup_replay_plan");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: statement -> segment -> plan.
    let bootstrap = CliReplayStatement {
        label: "phase_zero_bootstrap".to_string(),
        up: markerless_seed_phase_zero_sql("markerless_seed_replay"),
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![bootstrap],
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
        classification: CliClassification::Additive,
        checksum_up: "V1:55667788".to_string(),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    fs::write(plan_path, serde_json::to_string(&replay).unwrap()).unwrap();

    let bucket = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:55667788",
        None,
    );
    let msg = refusal.expect("markerless seed replay plan must refuse");
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
    let _ = fs::remove_dir_all(&workspace);
}
/// Writes a replay-plan JSON for the Phase 0 version whose single statement
/// embeds a `WITH ... INSERT INTO heer.heer_nodes` statement via
/// `phase_zero_with_seed_statement`; `classify_phase_zero_for_cleanup` must
/// refuse it with a reason mentioning "seed-dml".
#[test]
fn classify_phase_zero_for_cleanup_refuses_cte_seed_dml_replay_plan() {
    let workspace = temp_workspace("cte_seed_cleanup_replay_plan");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: statement -> segment -> plan.
    let cte_insert = "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);";
    let bootstrap = CliReplayStatement {
        label: "phase_zero_bootstrap".to_string(),
        up: phase_zero_with_seed_statement("cte_seed_cleanup_replay", cte_insert),
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![bootstrap],
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
        classification: CliClassification::Additive,
        checksum_up: "V1:66778899".to_string(),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    fs::write(plan_path, serde_json::to_string(&replay).unwrap()).unwrap();

    let bucket = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:66778899",
        None,
    );
    let msg = refusal.expect("CTE seed replay plan must refuse");
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
    let _ = fs::remove_dir_all(&workspace);
}
/// With no replay-plan JSON written, only the Phase 0 up SQL file (holding
/// the SQL from `current_production_phase_zero_sql`) exists in the bucket
/// directory; `classify_phase_zero_for_cleanup` must return no refusal.
#[test]
fn classify_phase_zero_for_cleanup_fallback_sql_file() {
    let workspace = temp_workspace("fallback_cleanup");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let up_sql = current_production_phase_zero_sql("fallback_sql");
    let up_file = bucket_dir.join(djogi::migrate::up_filename(
        djogi::migrate::PHASE_ZERO_VERSION,
    ));
    fs::write(up_file, up_sql).unwrap();

    let bucket = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    );
    assert!(
        refusal.is_none(),
        "identity-free Phase 0 fallback SQL should be allowed; got: {refusal:?}"
    );
    let _ = fs::remove_dir_all(&workspace);
}
/// An up-SQL file whose content comes from `seed_capable_phase_zero_sql()`
/// must be refused, and the refusal reason must mention "seed-capable".
#[test]
fn classify_phase_zero_for_cleanup_refuses_seed_capable_fallback_sql_file() {
    let workspace = temp_workspace("seed_cleanup_fallback");
    let global_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&global_dir).unwrap();

    // Only the up-SQL file is written by this test; no replay plan JSON.
    let sql_name = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let sql_path = global_dir.join(&sql_name);
    fs::write(sql_path, seed_capable_phase_zero_sql()).unwrap();

    let global_bucket = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let verdict = classify_phase_zero_for_cleanup(
        &workspace,
        &global_bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    );

    let reason = verdict.expect("seed-capable fallback SQL must refuse");
    assert!(reason.contains("seed-capable"), "refusal reason: {reason}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
/// An up-SQL file whose content comes from `markerless_seed_phase_zero_sql`
/// must be refused, and the refusal reason must mention "seed-dml".
#[test]
fn classify_phase_zero_for_cleanup_refuses_markerless_seed_fallback_sql_file() {
    let workspace = temp_workspace("markerless_seed_cleanup_fallback");
    let global_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&global_dir).unwrap();

    // Only the up-SQL file is written by this test; no replay plan JSON.
    let sql_name = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let sql_path = global_dir.join(&sql_name);
    let seeded_sql = markerless_seed_phase_zero_sql("markerless_seed_fallback");
    fs::write(sql_path, seeded_sql).unwrap();

    let global_bucket = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let verdict = classify_phase_zero_for_cleanup(
        &workspace,
        &global_bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    );

    let reason = verdict.expect("markerless seed fallback SQL must refuse");
    assert!(reason.contains("seed-dml"), "refusal reason: {reason}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
/// An up-SQL file carrying a `COPY ... FROM STDIN` seed statement must be
/// refused, and the refusal reason must mention "seed-dml".
#[test]
fn classify_phase_zero_for_cleanup_refuses_copy_from_seed_fallback_sql_file() {
    let workspace = temp_workspace("copy_seed_cleanup_fallback");
    let global_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&global_dir).unwrap();

    // Only the up-SQL file is written by this test; no replay plan JSON.
    let sql_name = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let sql_path = global_dir.join(&sql_name);
    // Raw string keeps the quoted identifiers readable; the bytes are the same
    // as the escaped form.
    let copy_seed_sql = phase_zero_with_seed_statement(
        "copy_seed_cleanup_fallback",
        r#"COPY "heer"."heer_ranj_node_state" ("node_id") FROM STDIN;"#,
    );
    fs::write(sql_path, copy_seed_sql).unwrap();

    let global_bucket = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let verdict = classify_phase_zero_for_cleanup(
        &workspace,
        &global_bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    );

    let reason = verdict.expect("COPY FROM seed fallback SQL must refuse");
    assert!(reason.contains("seed-dml"), "refusal reason: {reason}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
/// `check_ledger_state` must take the app label into account: a ledger row
/// inserted and marked applied under `users` is `NotPresent` when queried for
/// `system`, and `AlreadyApplied` when queried for `users`.
#[djogi::djogi_test]
async fn check_ledger_state_is_app_scoped(mut ctx: djogi::context::DjogiContext) {
    use djogi::migrate::{ExecutionMode, LedgerRow, LedgerStatus};

    // The same version string is used for the inserted row and both lookups.
    const VERSION: &str = "V20260609000000__t397";

    djogi::migrate::bootstrap_ledger(&mut ctx)
        .await
        .expect("bootstrap");

    // Row registered under the `users` app label, initially pending.
    let users_row = LedgerRow {
        app_label: "users".into(),
        version: VERSION.into(),
        description: "test migration".into(),
        checksum_up: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
        checksum_down: Some(
            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".into(),
        ),
        execution_mode: ExecutionMode::Transactional,
        status: LedgerStatus::Pending,
        execution_time_ms: 0,
        out_of_order_flag: false,
        applied_steps_count: 0,
        total_steps: None,
        partial_apply_note: None,
        run_id: 1,
        snapshot_version: "0".into(),
        leaf_identity: None,
    };

    let row_id = djogi::migrate::insert_pending_ledger_row(&mut ctx, &users_row)
        .await
        .expect("insert pending");
    // Flip the freshly inserted row to applied before querying.
    djogi::migrate::mark_ledger_applied(&mut ctx, row_id, 10, 1)
        .await
        .expect("mark applied");

    // Querying with a different app label must not see the `users` row.
    let system_state = check_ledger_state(&mut ctx, VERSION, "system").await;
    assert!(
        matches!(system_state, LedgerState::NotPresent),
        "different app stream must be NotPresent, got {system_state:?}",
    );

    // Querying with the owning app label must report the row as applied.
    let users_state = check_ledger_state(&mut ctx, VERSION, "users").await;
    assert!(
        matches!(users_state, LedgerState::AlreadyApplied),
        "same app stream must be AlreadyApplied, got {users_state:?}",
    );
}
}