Skip to main content

djogi_cli/
migrations.rs

1//! `djogi migrations` subcommand glue
2//! Two leaves: `compose` and `status`. Both flow through the public
3//! `djogi::migrate` API. Compose acquires the workspace file lock for
4//! the duration of the call; status is read-only and does not.
5//! The CLI surface here is intentionally thin — all the real logic
6//! lives in the library so integration tests can exercise it without
7//! spawning subprocesses.
8
9use std::path::{Path, PathBuf};
10use std::process::ExitCode;
11
12use djogi::apps::AppRegistry;
13use djogi::migrate::{
14    AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
15    DescriptorProvider, GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan,
16    RepairConfirmation, RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError,
17    VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
18    compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
19    repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
20};
21
22// Re-export for the apply command's ledger state machine.
23use djogi::migrate::LedgerStatus;
24
25// CLI-side enums declared at the crate root (`main.rs` is the binary's
26// root module — there is no `mod main`), reached here as `crate::*`.
27use crate::{PartialApplyResolutionCli, RepairSubcommand};
28
29// ── Replay plan deserialization ──────────────────────────────────────────
30
31/// Local mirror of `StoredReplayPlan` (pub(crate) in the library).
32/// The committed replay plan JSON written by `compose` at
33/// `migrations/<database>/<app>/<version>.plan.json`. This struct
34/// allows the CLI to parse it and construct a proper [`MigrationPlan`]
35/// with correct segment structure and checksums.
36#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
37struct CliReplayPlan {
38    format_version: String,
39    checksum_up: String,
40    checksum_down: Option<String>,
41    classification: CliClassification,
42    segments: Vec<CliReplaySegment>,
43}
44
45#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
46#[serde(tag = "kind", rename_all = "snake_case")]
47enum CliClassification {
48    NoOp,
49    Additive,
50    Reversible,
51    Destructive,
52    Lossy,
53    Unsupported {
54        reason: String,
55    },
56    PkTypeFlip {
57        co_destructive: bool,
58        co_lossy: bool,
59    },
60}
61
62#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
63struct CliReplaySegment {
64    kind: CliSegmentKind,
65    statements: Vec<CliReplayStatement>,
66}
67
68#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
69#[serde(rename_all = "snake_case")]
70enum CliSegmentKind {
71    Transactional,
72    NonTransactional,
73    MetadataOnly,
74}
75
76#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
77struct CliReplayStatement {
78    label: String,
79    up: String,
80}
81
82/// Expected format version for the committed replay plan JSON.
83const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
84
85/// Load the committed replay plan from disk and convert to a
86/// [`djogi::migrate::MigrationPlan`]. Returns `(plan, checksum_up, checksum_down)`.
87/// Falls back to reading the up/down SQL files and constructing a
88/// single-segment transactional plan when the replay plan JSON is
89/// absent or invalid. This mirrors the reset.rs fallback path.
90fn load_replay_plan_from_disk(
91    workspace: &Path,
92    bucket: &djogi::migrate::BucketKey,
93    version: &str,
94    pending_checksum_up: &str,
95    pending_checksum_down: Option<&str>,
96) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
97    // Try to load the committed replay plan JSON first.
98    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
99    let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
100
101    if let Ok(bytes) = std::fs::read(&replay_plan_path) {
102        let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
103            Ok(s) => s,
104            Err(e) => {
105                return Err(ApplyReplayPlanError::Parse {
106                    path: replay_plan_path.clone(),
107                    source: e.to_string(),
108                });
109            }
110        };
111
112        if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
113            return Err(ApplyReplayPlanError::FormatVersion {
114                found: stored.format_version,
115                path: replay_plan_path.clone(),
116            });
117        }
118
119        // Verify checksums match the pending plan.
120        if stored.checksum_up != pending_checksum_up
121            || stored.checksum_down.as_deref() != pending_checksum_down
122        {
123            return Err(ApplyReplayPlanError::ChecksumMismatch);
124        }
125
126        let plan = djogi::migrate::MigrationPlan {
127            bucket: bucket.clone(),
128            classification: stored.classification.into(),
129            segments: stored
130                .segments
131                .into_iter()
132                .map(|seg| djogi::migrate::Segment {
133                    kind: seg.kind.into(),
134                    statements: seg
135                        .statements
136                        .into_iter()
137                        .map(|stmt| djogi::migrate::OperationSql {
138                            label: stmt.label,
139                            up: stmt.up,
140                            down: String::new(),
141                            lossy: None,
142                        })
143                        .collect(),
144                })
145                .collect(),
146        };
147
148        return Ok((plan, stored.checksum_up, stored.checksum_down));
149    }
150
151    // Fallback: read SQL files and construct single-segment plan.
152    let up_filename = djogi::migrate::up_filename(version);
153    let down_filename = djogi::migrate::down_filename(version);
154    let up_path = bucket_dir.join(&up_filename);
155    let down_path = bucket_dir.join(&down_filename);
156
157    let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
158        path: up_path.clone(),
159        source: e.to_string(),
160    })?;
161
162    let down_sql = match std::fs::read_to_string(&down_path) {
163        Ok(sql) => sql,
164        Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
165        Err(e) => {
166            return Err(ApplyReplayPlanError::SqlRead {
167                path: down_path.clone(),
168                source: e.to_string(),
169            });
170        }
171    };
172
173    // Compute checksum for the single-segment fallback. The runner
174    // recomputes from the plan's SQL fragments and verifies against
175    // what we provide in RunnerCtx, so they must match.
176    let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
177
178    // Build a single-transactional-segment plan. This is correct for
179    // most migrations — only CONCURRENTLY indexes require non-tx
180    // segments, and those always have a replay plan JSON.
181    let plan = djogi::migrate::MigrationPlan {
182        bucket: bucket.clone(),
183        classification: djogi::migrate::Classification::Additive,
184        segments: vec![djogi::migrate::Segment {
185            kind: djogi::migrate::SegmentKind::Transactional,
186            statements: vec![djogi::migrate::OperationSql {
187                label: format!("replay {version}"),
188                up: up_sql,
189                down: down_sql,
190                lossy: None,
191            }],
192        }],
193    };
194
195    Ok((plan, computed_checksum_up, None))
196}
197
198/// Errors from [`load_replay_plan_from_disk`].
199#[derive(Debug)]
200enum ApplyReplayPlanError {
201    Parse { path: PathBuf, source: String },
202    FormatVersion { found: String, path: PathBuf },
203    ChecksumMismatch,
204    SqlRead { path: PathBuf, source: String },
205}
206
207impl std::fmt::Display for ApplyReplayPlanError {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        match self {
210            Self::Parse { path, source } => {
211                write!(f, "parse replay plan {}: {source}", path.display())
212            }
213            Self::FormatVersion { found, path } => write!(
214                f,
215                "replay plan format version mismatch in {}: expected {}, found {}",
216                path.display(),
217                CLI_REPLAY_PLAN_FORMAT_VERSION,
218                found
219            ),
220            Self::ChecksumMismatch => {
221                write!(f, "checksum mismatch between pending JSON and replay plan")
222            }
223            Self::SqlRead { path, source } => {
224                write!(f, "read SQL file {}: {source}", path.display())
225            }
226        }
227    }
228}
229
230impl std::error::Error for ApplyReplayPlanError {}
231
232/// Classify a Phase 0 artifact for the CLI cleanup path (#386).
233/// Loads the committed replay plan JSON or falls back to the SQL file,
234/// classifies the up SQL using [`djogi::migrate::classify_phase_zero_artifact`],
235/// and returns `Some(reason)` unless the artifact is identity-free
236/// replay-current.
237/// Returns `None` when the artifact is safe for migration replay.
238fn classify_phase_zero_for_cleanup(
239    workspace: &Path,
240    bucket: &djogi::migrate::BucketKey,
241    version: &str,
242    pending_checksum_up: &str,
243    pending_checksum_down: Option<&str>,
244) -> Option<String> {
245    // Try to load the committed replay plan JSON first.
246    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
247    let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
248
249    if let Ok(bytes) = std::fs::read(&replay_plan_path) {
250        let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
251            Ok(s) => s,
252            Err(e) => {
253                return Some(format!("parse replay plan: {e}"));
254            }
255        };
256
257        if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
258            return Some(format!(
259                "replay plan format version mismatch: expected {}, found {}",
260                CLI_REPLAY_PLAN_FORMAT_VERSION, stored.format_version
261            ));
262        }
263
264        // Verify checksums match the pending plan.
265        if stored.checksum_up != pending_checksum_up
266            || stored.checksum_down.as_deref() != pending_checksum_down
267        {
268            return Some("checksum mismatch between pending JSON and replay plan".to_string());
269        }
270
271        // Reconstruct the up SQL from the replay plan segments for classification.
272        let up_sql: String = stored
273            .segments
274            .iter()
275            .flat_map(|seg| seg.statements.iter())
276            .map(|stmt| stmt.up.as_str())
277            .collect::<Vec<&str>>()
278            .join("\n");
279
280        return classify_phase_zero_bytes(up_sql.as_bytes());
281    }
282
283    // Fallback: read the up SQL file directly.
284    let up_filename = djogi::migrate::up_filename(version);
285    let up_path = bucket_dir.join(&up_filename);
286    match std::fs::read_to_string(&up_path) {
287        Ok(up_sql) => classify_phase_zero_bytes(up_sql.as_bytes()),
288        Err(e) => Some(format!("read up SQL file {}: {e}", up_path.display())),
289    }
290}
291
292/// Classify raw bytes as Phase 0 artifact and return refusal reason unless it
293/// is identity-free replay-current.
294fn classify_phase_zero_bytes(bytes: &[u8]) -> Option<String> {
295    match djogi::migrate::classify_phase_zero_artifact(bytes) {
296        djogi::migrate::PhaseZeroArtifactState::IdentityFreeCurrent => None,
297        djogi::migrate::PhaseZeroArtifactState::SeedCapableRuntimeCurrent => {
298            Some("seed-capable runtime-only artifact detected".to_string())
299        }
300        djogi::migrate::PhaseZeroArtifactState::SeedDmlNotRuntimeCurrent => {
301            Some("seed-dml non-runtime-current artifact detected".to_string())
302        }
303        djogi::migrate::PhaseZeroArtifactState::GeneratedStale => {
304            Some("generated-stale artifact detected".to_string())
305        }
306        djogi::migrate::PhaseZeroArtifactState::Ambiguous => {
307            Some("ambiguous or hand-edited artifact detected".to_string())
308        }
309        djogi::migrate::PhaseZeroArtifactState::Incomplete => {
310            Some("incomplete artifact (truncated generation)".to_string())
311        }
312        djogi::migrate::PhaseZeroArtifactState::Missing => Some("missing artifact".to_string()),
313    }
314}
315
316// ── Type conversions from CLI-local types to library types ────────────────
317
318impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
319    fn from(kind: CliSegmentKind) -> Self {
320        match kind {
321            CliSegmentKind::Transactional => Self::Transactional,
322            CliSegmentKind::NonTransactional => Self::NonTransactional,
323            CliSegmentKind::MetadataOnly => Self::MetadataOnly,
324        }
325    }
326}
327
328impl From<CliClassification> for djogi::migrate::Classification {
329    fn from(classification: CliClassification) -> Self {
330        match classification {
331            CliClassification::NoOp => Self::NoOp,
332            CliClassification::Additive => Self::Additive,
333            CliClassification::Reversible => Self::Reversible,
334            CliClassification::Destructive => Self::Destructive,
335            CliClassification::Lossy => Self::Lossy,
336            CliClassification::Unsupported { reason } => Self::Unsupported { reason },
337            CliClassification::PkTypeFlip {
338                co_destructive,
339                co_lossy,
340            } => Self::PkTypeFlip {
341                co_destructive,
342                co_lossy,
343            },
344        }
345    }
346}
347
348/// Resolve the workspace root from the `--workspace` flag. When the
349/// flag is absent we use the current working directory — the typical
350/// invocation pattern is `cd <project>` then `djogi migrations …`.
351fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
352    workspace.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")))
353}
354
355/// Walk the on-disk `migrations/<database>/<app>/` tree and return the
356/// set of buckets that already have a `schema_snapshot.json` file.
357/// Compose's `snapshots` map must include the OLD bucket of any
358/// renamed app — and that bucket is guaranteed to be absent from the
359/// current `models` inventory because the `#[app(renamed_from =
360/// "old")]` annotation lives on the NEW app. Walking disk directly
361/// recovers those orphaned snapshots so the differ sees both sides of
362/// a rename.
363/// Each entry maps to a [`djogi::migrate::projection::BucketKey`]
364/// using the inverse of [`djogi::migrate::app_dirname`] (synthetic
365/// `_global_` directory → empty-string label).
366fn discover_snapshot_buckets_on_disk(
367    workspace: &Path,
368) -> Vec<djogi::migrate::projection::BucketKey> {
369    let mut out = Vec::new();
370    let migrations_root = djogi::migrate::migrations_root(workspace);
371    let Ok(db_entries) = std::fs::read_dir(&migrations_root) else {
372        return out;
373    };
374    for db_entry in db_entries.flatten() {
375        let Ok(ft) = db_entry.file_type() else {
376            continue;
377        };
378        if !ft.is_dir() {
379            continue;
380        }
381        let Some(database) = db_entry.file_name().to_str().map(str::to_string) else {
382            continue;
383        };
384        let Ok(app_entries) = std::fs::read_dir(db_entry.path()) else {
385            continue;
386        };
387        for app_entry in app_entries.flatten() {
388            let Ok(ft) = app_entry.file_type() else {
389                continue;
390            };
391            if !ft.is_dir() {
392                continue;
393            }
394            let Some(dirname) = app_entry.file_name().to_str().map(str::to_string) else {
395                continue;
396            };
397            let snap_path = app_entry.path().join("schema_snapshot.json");
398            if !snap_path.exists() {
399                continue;
400            }
401            let label = djogi::migrate::app_label_from_dirname(&dirname).to_string();
402            out.push(djogi::migrate::projection::BucketKey {
403                database: database.clone(),
404                app: label,
405            });
406        }
407    }
408    out
409}
410
411/// `djogi migrations compose` entry point.
412pub fn compose_cmd(
413    provider: &dyn DescriptorProvider,
414    name: &str,
415    allow_destructive: bool,
416    force_overwrite: bool,
417    workspace: Option<PathBuf>,
418) -> ExitCode {
419    let workspace = resolve_workspace(workspace);
420    let models = match project_from_provider(provider) {
421        Ok(m) => m,
422        Err(e) => {
423            eprintln!("djogi migrations compose: projection error: {e}");
424            return ExitCode::from(1);
425        }
426    };
427    let apps: Vec<AppLifecycle> = provider
428        .apps()
429        .iter()
430        .map(|d| AppLifecycle {
431            label: d.label.to_string(),
432            database: d.database.to_string(),
433            renamed_from: d.renamed_from.map(str::to_string),
434            tombstone: d.tombstone,
435        })
436        .collect();
437    // The resolved workspace flows into config loading. Compose consumes
438    // the [`MigrateConfig::pk_flip_join_table_option`] knob so we no
439    // longer drop the parsed config — the join-table layout
440    // selected in `Djogi.toml` reaches the differ via this path.
441    let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
442        Ok(c) => c,
443        Err(e) => {
444            eprintln!("djogi migrations compose: config load: {e}");
445            return ExitCode::from(1);
446        }
447    };
448    let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
449        djogi_config.migrate.pk_flip_join_table_option,
450    );
451    compose_with_inputs(
452        &workspace,
453        name,
454        allow_destructive,
455        force_overwrite,
456        &models,
457        &apps,
458        time::OffsetDateTime::now_utc(),
459        Some(pk_flip_option),
460    )
461}
462
463/// Shared compose body — separated from [`compose_cmd`] so tests can
464/// drive it with explicit `models` and `apps` (the production entry
465/// point sources both from `inventory::iter` and `AppRegistry::all`,
466/// which are global state and thus not directly addressable from a
467/// unit test).
468/// Acquires the workspace lock, walks the on-disk migration tree to
469/// recover orphaned snapshots (renamed-from buckets), and
470/// invokes [`djogi::migrate::compose`].
471// Compose has 8 inputs because it sits at the bridge between
472// CLI flag parsing (workspace / name / flags / clock) and the
473// engine (`models` / `apps` / `pk_flip_join_table_option`).
474// Folding these into a struct would push the same fields onto
475// the caller; the CLI tests already pass them positionally and
476// a struct-based refactor would be churn for no clarity gain.
477#[allow(clippy::too_many_arguments)]
478fn compose_with_inputs(
479    workspace: &Path,
480    name: &str,
481    allow_destructive: bool,
482    force_overwrite: bool,
483    models: &std::collections::BTreeMap<
484        djogi::migrate::projection::BucketKey,
485        djogi::migrate::AppliedSchema,
486    >,
487    apps: &[AppLifecycle],
488    now: time::OffsetDateTime,
489    pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
490) -> ExitCode {
491    let lock_path = workspace.join(LOCK_FILE_NAME);
492    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
493        Ok(g) => g,
494        Err(e) => {
495            eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
496            return ExitCode::from(1);
497        }
498    };
499
500    // Read snapshots from disk. The bucket set we load is the UNION of
501    // (a) every bucket the current projection knows about and (b) every
502    // on-disk bucket that has a snapshot file.
503    // Without (b) a renamed-from app's old snapshot is missed
504    // entirely (the new app's `BucketKey` differs and the differ
505    // never sees the old schema, breaking compose's rename + drop +
506    // move emission).
507    let mut bucket_set: std::collections::BTreeSet<djogi::migrate::projection::BucketKey> =
508        models.keys().cloned().collect();
509    for bucket in discover_snapshot_buckets_on_disk(workspace) {
510        bucket_set.insert(bucket);
511    }
512
513    let mut snapshots: std::collections::BTreeMap<_, _> = std::collections::BTreeMap::new();
514    for bucket in &bucket_set {
515        let path = djogi::migrate::snapshot_path(workspace, bucket);
516        match djogi::migrate::load_snapshot(&path) {
517            Ok(s) => {
518                snapshots.insert(bucket.clone(), s);
519            }
520            Err(djogi::migrate::SnapshotError::Io { source, .. })
521                if source.kind() == std::io::ErrorKind::NotFound =>
522            {
523                // Fresh app — no prior snapshot.
524            }
525            Err(e) => {
526                eprintln!(
527                    "djogi migrations compose: snapshot load failed at {}: {e}",
528                    path.display()
529                );
530                return ExitCode::from(1);
531            }
532        }
533    }
534
535    let req = ComposeRequest {
536        workspace_root: workspace,
537        models,
538        snapshots: &snapshots,
539        apps,
540        name,
541        allow_destructive,
542        force_overwrite,
543        now,
544        _guard: &guard,
545        pk_flip_join_table_option,
546        // Production: always run auto-emit. The flag is a
547        // test-only escape hatch for unit tests that exercise
548        // compose's lower-level write/rollback machinery in
549        // isolation; the CLI / production path always goes through
550        // the full bootstrap flow.
551        skip_phase_zero_auto_emit: false,
552    };
553    match compose(req) {
554        Ok(report) => {
555            // Surface auto-emitted bootstraps before
556            // the regular composed buckets so the operator sees the
557            // bootstrap context before the per-bucket changes.
558            for emit in &report.emitted_phase_zero {
559                let ext_summary = if emit.extensions.is_empty() {
560                    "no extensions".to_string()
561                } else {
562                    format!(
563                        "extensions: {}",
564                        emit.extensions
565                            .iter()
566                            .cloned()
567                            .collect::<Vec<_>>()
568                            .join(", ")
569                    )
570                };
571                println!(
572                    "auto-emitted bootstrap migration: {database}/_global_ ({ext_summary})",
573                    database = emit.database,
574                );
575            }
576            for cb in &report.composed_buckets {
577                println!(
578                    "composed {database}/{app}: {version} ({classification:?})",
579                    database = cb.bucket.database,
580                    app = if cb.bucket.app.is_empty() {
581                        "_global_"
582                    } else {
583                        cb.bucket.app.as_str()
584                    },
585                    version = cb.version,
586                    classification = cb.classification,
587                );
588            }
589            ExitCode::from(0)
590        }
591        Err(ComposeError::NothingToCompose) => {
592            println!("nothing to compose — model state matches snapshot for every bucket");
593            // Per the inline-decisions: nothing-to-compose is
594            // not an error. The status command is the one that
595            // signals out-of-sync state via exit code.
596            ExitCode::from(0)
597        }
598        Err(ComposeError::LinkageDropWithoutModels { ref text, .. }) => {
599            eprintln!("djogi migrations compose: {text}");
600            // Exit 2 — refusal: models must be compiled in before dropping app linkage.
601            ExitCode::from(2)
602        }
603        Err(e) => {
604            eprintln!("djogi migrations compose: {e}");
605            ExitCode::from(1)
606        }
607    }
608}
609
610/// `djogi migrations status` entry point.
611/// Read-only — does not acquire the workspace lock. Reads the
612/// migration ledger from the active database via
613/// [`djogi::context::DjogiContext`].
614pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
615    let workspace = resolve_workspace(workspace);
616
617    // Build a tokio runtime so we can drive the async ledger query.
618    let runtime = match tokio::runtime::Builder::new_current_thread()
619        .enable_all()
620        .build()
621    {
622        Ok(r) => r,
623        Err(e) => {
624            eprintln!("djogi migrations status: tokio runtime: {e}");
625            return ExitCode::from(1);
626        }
627    };
628
629    let exit = runtime.block_on(async { run_status(&workspace).await });
630    ExitCode::from(exit as u8)
631}
632
633/// Async body of [`status_cmd`]. Returns the desired exit code.
634/// The resolved `workspace` path feeds
635/// [`djogi::config::DjogiConfig::load_from_workspace`] so a
636/// `--workspace /custom/path` actually reads `/<custom>/Djogi.toml`
637/// instead of always picking up the cwd's config. Production callers
638/// running from inside the project root (the typical case) get the
639/// previous behaviour for free — `resolve_workspace(None)` returns
640/// `cwd`.
641async fn run_status(workspace: &Path) -> i32 {
642    use djogi::config::DjogiConfig;
643
644    let config = match DjogiConfig::load_from_workspace(workspace) {
645        Ok(c) => c,
646        Err(e) => {
647            eprintln!("djogi migrations status: config load: {e}");
648            return 1;
649        }
650    };
651
652    let mut ctx = match connect_and_check(&config.database.url).await {
653        ContextOutcome::Ready(ctx) => ctx,
654        ContextOutcome::UnsupportedVersion(e) => {
655            crate::print_support_boundary_error("migrations status", &e);
656            return 2;
657        }
658        ContextOutcome::RuntimeError(msg) => {
659            eprintln!("djogi migrations status: pool: {msg}");
660            return 1;
661        }
662    };
663
664    let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
665        Ok(rows) => rows,
666        Err(e) => {
667            // A missing ledger table is treated as "no migrations
668            // applied" — print the empty state and exit 0.
669            if e.to_string().contains("djogi_schema_migrations") {
670                println!("No migrations recorded.");
671                return 0;
672            }
673            eprintln!("djogi migrations status: ledger read: {e}");
674            return 1;
675        }
676    };
677
678    let registered: Vec<String> = AppRegistry::all()
679        .iter()
680        .map(|d| d.label.to_string())
681        .collect();
682    let report = djogi::migrate::render_status(&rows, &registered);
683    for line in &report.lines {
684        println!("{line}");
685    }
686    report.exit_code
687}
688
689/// Outcome of [`connect_and_check`] — connecting a pool and running the
690/// Postgres-version preflight, with the support-boundary refusal kept
691/// distinct from ordinary runtime failures.
692/// The three arms drive different exit codes at the call site:
693/// - [`ContextOutcome::Ready`] — pool connected and PG ≥ 18; proceed.
694/// - [`ContextOutcome::UnsupportedVersion`] — PG < 18. The caller renders
695///   the support-boundary message via
696///   [`crate::print_support_boundary_error`] and exits `2` (refusal: the
697///   operator must upgrade Postgres; retrying changes nothing).
698/// - [`ContextOutcome::RuntimeError`] — pool connect failed, the preflight
699///   query errored, or any other non-version `DjogiError`. The caller
700///   prints the message and exits `1` (transient: CI may retry).
701// The `Ready` variant holds a `DjogiContext` (large — it wraps a
702// `DjogiPool`), while the other two variants are small (`DjogiError` /
703// `String`). Boxing `Ready` would add a heap allocation on the success
704// path; this value is constructed and immediately matched at each call
705// site (never stored in a collection), so the wider stack value is a
706// transient one-off, not a per-element penalty. Same trade-off and
707// rationale as `ContextInner` in `djogi::context` (see its
708// `large_enum_variant` allow).
709#[allow(clippy::large_enum_variant)]
710enum ContextOutcome {
711    /// Pool connected and the PG-version preflight passed.
712    Ready(djogi::context::DjogiContext),
713    /// The PG-version preflight refused — server is below the minimum
714    /// supported major version.
715    UnsupportedVersion(djogi::error::DjogiError),
716    /// A runtime failure (connect / preflight / other) — already rendered
717    /// to a string so the call site need not re-match.
718    RuntimeError(String),
719}
720
721/// Connect a pool from `url` and run the Postgres-version preflight,
722/// returning a typed [`ContextOutcome`].
723/// Splits the support-boundary refusal (PG < 18, exit `2`) from runtime
724/// failures (connect / query errors, exit `1`) so each call site can map
725/// the outcome onto the documented exit-code matrix. Connects via the
726/// public `DjogiPool::connect` entry point, then hands the pool to the
727/// public `DjogiContext::from_pool` API once the version check passes.
728async fn connect_and_check(url: &str) -> ContextOutcome {
729    let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
730        Ok(p) => p,
731        Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
732    };
733    match djogi::pg::preflight::check_postgres_version(&pool).await {
734        Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
735        // `DjogiError` is `#[non_exhaustive]`, so the `@`-bound
736        // `UnsupportedPostgresVersion` arm needs the trailing `_` catch-all.
737        Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
738            ContextOutcome::UnsupportedVersion(e)
739        }
740        Err(other) => ContextOutcome::RuntimeError(other.to_string()),
741    }
742}
743
744/// Resolve the connection URL for a single migration-bucket database.
745/// Verify routes each bucket to the pool for its `database` component.
746/// The mapping mirrors Djogi's three-database architecture:
747/// - `"main"` ([`djogi::apps::AppDescriptor::GLOBAL_DATABASE`]) always uses
748///   the app URL verbatim. We do NOT derive it by splicing `"main"` into
749///   the path, because the operator's app URL may carry a path component
750///   that is not literally named `main` (e.g. `…/myapp_prod`); deriving
751///   would target a database that does not exist.
752/// - `"crud_log"` / `"event_log"` prefer the explicit
753///   [`djogi::config::DatabaseConfig::crud_log_url`] /
754///   [`event_log_url`](djogi::config::DatabaseConfig::event_log_url) when
755///   set to a non-empty value, matching how the audit / event pools are
756///   resolved elsewhere.
757/// - Any other database name (and the log databases when their explicit
758///   URL is absent) is derived by splicing the name into the app URL's
759///   path component via [`djogi::migrate::derive_per_database_url`].
760///   Returns `None` when derivation fails (the app URL has no recognisable
761///   path component); the caller surfaces that as a runtime error for the
762///   affected bucket.
763fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
764    // "main" always uses the app URL verbatim — do NOT derive, as the app
765    // URL may not have a path component named "main".
766    if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
767        return Some(db_config.url.clone());
768    }
769    if database == "crud_log"
770        && let Some(u) = db_config.crud_log_url.as_deref()
771        && !u.is_empty()
772    {
773        return Some(u.to_string());
774    }
775    if database == "event_log"
776        && let Some(u) = db_config.event_log_url.as_deref()
777        && !u.is_empty()
778    {
779        return Some(u.to_string());
780    }
781    djogi::migrate::derive_per_database_url(&db_config.url, database)
782}
783
784/// `djogi migrations apply` entry point.
785/// Discovers pending JSON files under `target/djogi_pending/`, loads the
786/// committed replay plan for each, and drives [`djogi::migrate::apply_plan`]
787/// through the library runner after CLI-side ledger-state classification.
788/// `Pending` rows require operator resolution. Caller-gated `Failed`/`RolledBack`
789/// rows are reapply-blocking cleanup candidates before runner invocation. Phase
790/// 0 cleanup is identity-free replay-current-only: seed-capable runtime,
791/// seed-DML non-runtime-current, missing, incomplete, generated-stale, or
792/// ambiguous artifacts refuse before delete.
793pub fn apply_cmd(
794    workspace: Option<PathBuf>,
795    fake: bool,
796    reason: Option<String>,
797    node_id: Option<u32>,
798    single_node_dev: bool,
799) -> ExitCode {
800    let workspace = resolve_workspace(workspace);
801
802    // Validate --fake / --reason pairing before doing any expensive work.
803    let mode = if fake {
804        match reason {
805            Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
806            Some(_) => {
807                eprintln!(
808                    "djogi migrations apply --fake: --reason must not be empty; \
809                     supply a non-empty reason why these migrations are being \
810                     faked (e.g. 'schema pre-exists from prior tooling')"
811                );
812                return ExitCode::from(2);
813            }
814            None => {
815                eprintln!(
816                    "djogi migrations apply --fake: --reason is required; \
817                     supply a reason why these migrations are being faked \
818                     (e.g. 'schema pre-exists from prior tooling'). \
819                     This is recorded in the ledger audit trail."
820                );
821                return ExitCode::from(2);
822            }
823        }
824    } else {
825        FakeMode::Real
826    };
827
828    let runtime = match tokio::runtime::Builder::new_current_thread()
829        .enable_all()
830        .build()
831    {
832        Ok(r) => r,
833        Err(e) => {
834            eprintln!("djogi migrations apply: tokio runtime: {e}");
835            return ExitCode::from(1);
836        }
837    };
838
839    let exit =
840        runtime.block_on(async { run_apply(&workspace, &mode, node_id, single_node_dev).await });
841    ExitCode::from(exit as u8)
842}
843
844/// Controls whether `apply_one_pending` executes SQL or records a
845/// fake-apply row in the ledger.
846#[derive(Debug, Clone)]
847enum FakeMode {
848    /// Execute DDL via `apply_plan`. Normal migration apply.
849    Real,
850    /// Skip DDL; record `status = 'faked'` via `fake_apply_plan`.
851    Fake { reason: String },
852}
853
854/// Async body of [`apply_cmd`]. Returns the desired exit code.
855async fn run_apply(
856    workspace: &Path,
857    mode: &FakeMode,
858    node_id: Option<u32>,
859    single_node_dev: bool,
860) -> i32 {
861    use djogi::config::DjogiConfig;
862
863    let action_verb = match mode {
864        FakeMode::Real => "apply",
865        FakeMode::Fake { .. } => "fake-apply",
866    };
867    let progress_verb = match mode {
868        FakeMode::Real => "applying",
869        FakeMode::Fake { .. } => "faking",
870    };
871
872    // 1. Load config.
873    let config = match DjogiConfig::load_from_workspace(workspace) {
874        Ok(c) => c,
875        Err(e) => {
876            eprintln!("djogi migrations {action_verb}: config load: {e}");
877            return 2;
878        }
879    };
880
881    // 2. Discover pending JSONs before resolving identity or connecting to DB.
882    // No-pending apply (zero pending files) is an identity-free inverse —
883    // skip the resolver and pool connection entirely when no pending plans exist.
884    let pending_files = match discover_pending_plans(workspace) {
885        Ok(pending_files) => pending_files,
886        Err(e) => {
887            eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
888            return 2;
889        }
890    };
891    if pending_files.is_empty() {
892        println!("No pending migrations to {action_verb}.");
893        return 0;
894    }
895
896    // 3. Resolve node identity for identity-bearing operations (only when work exists).
897    // Both real apply and fake-apply are identity-bearing (run-id generation + ledger).
898    let runner_identity = match crate::identity::resolve_identity(
899        node_id,
900        single_node_dev,
901        &config.profile,
902        action_verb,
903    ) {
904        Ok(resolved) => Some(resolved.into_runner_identity()),
905        Err(e) => {
906            let _ = crate::identity::print_identity_error(action_verb, &e);
907            return 2;
908        }
909    };
910
911    // 4. Resolve one URL per pending database target, then connect and
912    // preflight a dedicated context for each database before taking the
913    // workspace lock. The runner routes queries through the supplied
914    // context pool, so apply must bind one context per bucket.database.
915    let target_urls = match resolve_apply_target_urls(&pending_files, &config.database) {
916        Ok(urls) => urls,
917        Err(e) => {
918            eprintln!("djogi migrations {action_verb}: target routing: {e}");
919            return 2;
920        }
921    };
922    let mut contexts = std::collections::BTreeMap::<String, djogi::context::DjogiContext>::new();
923    for (database, url) in &target_urls {
924        match connect_and_check(url).await {
925            ContextOutcome::Ready(ctx) => {
926                contexts.insert(database.clone(), ctx);
927            }
928            ContextOutcome::UnsupportedVersion(e) => {
929                crate::print_support_boundary_error("migrations apply", &e);
930                return 2;
931            }
932            ContextOutcome::RuntimeError(msg) => {
933                eprintln!("djogi migrations {action_verb}: pool for '{database}': {msg}");
934                return 1;
935            }
936        }
937    }
938
939    // 5. Acquire workspace lock.
940    let lock_path = workspace.join(LOCK_FILE_NAME);
941    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
942        Ok(g) => g,
943        Err(e) => {
944            eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
945            return 1;
946        }
947    };
948
949    // 6. Reconcile the pending set under the lock before any cleanup/apply work.
950    let pending_files = match reconcile_pending_plans_after_lock(workspace, &pending_files) {
951        Ok(pending_files) => pending_files,
952        Err(e) => {
953            eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
954            return 2;
955        }
956    };
957
958    // 7. Build audit pool (optional — silently skipped if unavailable).
959    let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
960        Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
961        Err(_) => None,
962    };
963
964    // 8. Apply each pending migration through the context for its
965    // bucket database. The pending discovery sweep already deduped and
966    // preflighted the target database set above.
967    for pending_file in &pending_files {
968        let bucket_database = &pending_file.bucket.database;
969        let app_label = &pending_file.bucket.app;
970        let Some(ctx) = contexts.get_mut(bucket_database) else {
971            eprintln!(
972                "djogi migrations {action_verb}: internal error: missing context for database '{bucket_database}'"
973            );
974            return 1;
975        };
976        println!("  {progress_verb} {bucket_database}/{app_label}...");
977        let result = apply_one_pending(
978            ctx,
979            workspace,
980            pending_file,
981            &config,
982            &guard,
983            audit_pool.as_ref(),
984            mode,
985            runner_identity,
986        )
987        .await;
988
989        match result {
990            ApplyResult::Ok => match mode {
991                FakeMode::Real => {
992                    println!("Applied: {bucket_database}/{app_label}");
993                }
994                FakeMode::Fake { .. } => {
995                    println!(
996                        "  faked {bucket_database}/{app_label}: \
997                             recorded in ledger with status = 'faked' (no SQL executed)"
998                    );
999                }
1000            },
1001            ApplyResult::Skipped(reason) => {
1002                println!("Skipped {bucket_database}/{app_label}: {reason}");
1003            }
1004            ApplyResult::Refused(reason) => {
1005                eprintln!(
1006                    "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
1007                );
1008                return 2;
1009            }
1010            ApplyResult::RunnerError(e) => {
1011                eprintln!(
1012                    "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
1013                );
1014                return runner_error_exit_code(&e);
1015            }
1016        }
1017    }
1018
1019    let summary_verb = match mode {
1020        FakeMode::Real => "applied",
1021        FakeMode::Fake { .. } => "faked",
1022    };
1023    println!("{summary_verb} {} migration(s).", pending_files.len());
1024    0
1025}
1026
1027/// Outcome of applying a single pending migration.
1028#[derive(Debug)]
1029enum ApplyResult {
1030    /// Migration applied successfully.
1031    Ok,
1032    /// Migration skipped (already applied or no-op).
1033    Skipped(String),
1034    /// User-facing refusal — exit code 2.
1035    Refused(String),
1036    /// Runner error — exit code 1.
1037    RunnerError(RunnerError),
1038}
1039
1040#[derive(Debug, Clone, PartialEq, Eq)]
1041struct DiscoveredPendingPlan {
1042    path: PathBuf,
1043    bucket: BucketKey,
1044    plan: PendingPlan,
1045    is_phase_zero: bool,
1046}
1047
1048fn is_acceptable_pending_path_component(bytes: &[u8]) -> bool {
1049    if bytes.is_empty() || bytes.len() > 63 {
1050        return false;
1051    }
1052    if bytes[0] == b'.' {
1053        return false;
1054    }
1055    let first = bytes[0];
1056    if first != b'_' && !first.is_ascii_alphabetic() {
1057        return false;
1058    }
1059    for &b in &bytes[1..] {
1060        if b != b'_' && !b.is_ascii_alphanumeric() {
1061            return false;
1062        }
1063    }
1064    true
1065}
1066
1067fn canonical_pending_filename(app_label: &str) -> String {
1068    format!("{}.json", djogi::migrate::app_dirname(app_label))
1069}
1070
1071fn validate_hidden_phase_zero_pending(
1072    path: PathBuf,
1073    database: &str,
1074) -> Result<DiscoveredPendingPlan, String> {
1075    let filename = path
1076        .file_name()
1077        .and_then(|f| f.to_str())
1078        .ok_or_else(|| format!("non-utf8 Phase 0 pending path {}", path.display()))?;
1079    let expected_filename = format!("{}.json", djogi::migrate::PHASE_ZERO_VERSION);
1080    if filename != expected_filename {
1081        return Err(format!(
1082            "hidden Phase 0 pending path {} must use canonical filename {}",
1083            path.display(),
1084            expected_filename
1085        ));
1086    }
1087    let plan = djogi::migrate::load_pending(&path)
1088        .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1089    if plan.bucket_database != database {
1090        return Err(format!(
1091            "pending JSON {} has bucket database {}, expected {} from path",
1092            path.display(),
1093            plan.bucket_database,
1094            database
1095        ));
1096    }
1097    if !plan.bucket_app.is_empty() {
1098        return Err(format!(
1099            "pending JSON {} must target the global bucket in hidden Phase 0 namespace",
1100            path.display()
1101        ));
1102    }
1103    if plan.version != djogi::migrate::PHASE_ZERO_VERSION {
1104        return Err(format!(
1105            "pending JSON {} must use Phase 0 version {}, found {}",
1106            path.display(),
1107            djogi::migrate::PHASE_ZERO_VERSION,
1108            plan.version
1109        ));
1110    }
1111    Ok(DiscoveredPendingPlan {
1112        path,
1113        bucket: BucketKey {
1114            database: database.to_string(),
1115            app: String::new(),
1116        },
1117        plan,
1118        is_phase_zero: true,
1119    })
1120}
1121
1122fn validate_normal_pending(
1123    path: PathBuf,
1124    database: &str,
1125    filename: &str,
1126) -> Result<DiscoveredPendingPlan, String> {
1127    let Some(stem) = filename.strip_suffix(".json") else {
1128        return Err(format!(
1129            "pending path {} must end with .json",
1130            path.display()
1131        ));
1132    };
1133    let app = if stem == "_global_" {
1134        String::new()
1135    } else {
1136        if !is_acceptable_pending_path_component(stem.as_bytes()) {
1137            return Err(format!(
1138                "pending path {} uses non-canonical app filename {}",
1139                path.display(),
1140                filename
1141            ));
1142        }
1143        stem.to_string()
1144    };
1145    let expected_filename = canonical_pending_filename(&app);
1146    if filename != expected_filename {
1147        return Err(format!(
1148            "pending path {} must use canonical filename {}",
1149            path.display(),
1150            expected_filename
1151        ));
1152    }
1153    let plan = djogi::migrate::load_pending(&path)
1154        .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1155    if plan.bucket_database != database {
1156        return Err(format!(
1157            "pending JSON {} has bucket database {}, expected {} from path",
1158            path.display(),
1159            plan.bucket_database,
1160            database
1161        ));
1162    }
1163    if plan.bucket_app != app {
1164        let expected_app = if app.is_empty() {
1165            "_global_"
1166        } else {
1167            app.as_str()
1168        };
1169        let found_app = if plan.bucket_app.is_empty() {
1170            "_global_"
1171        } else {
1172            plan.bucket_app.as_str()
1173        };
1174        return Err(format!(
1175            "pending JSON {} has bucket app {}, expected {} from path",
1176            path.display(),
1177            found_app,
1178            expected_app
1179        ));
1180    }
1181    if plan.version == djogi::migrate::PHASE_ZERO_VERSION {
1182        return Err(format!(
1183            "pending JSON {} must use the hidden .phase_zero namespace for Phase 0",
1184            path.display()
1185        ));
1186    }
1187    Ok(DiscoveredPendingPlan {
1188        path,
1189        bucket: BucketKey {
1190            database: database.to_string(),
1191            app,
1192        },
1193        is_phase_zero: false,
1194        plan,
1195    })
1196}
1197
1198/// Scan `target/djogi_pending/` for pending JSON files.
1199/// Returns parsed pending plans sorted by version so Phase 0 runs
1200/// before later normal-global work. Malformed or duplicate pending
1201/// identities refuse rather than being guessed from filenames.
1202fn discover_pending_plans(workspace: &Path) -> Result<Vec<DiscoveredPendingPlan>, String> {
1203    let pending_root = djogi::migrate::pending_root(workspace);
1204    let mut out = Vec::new();
1205    let mut seen_identities = std::collections::BTreeSet::new();
1206
1207    let Ok(db_entries) = std::fs::read_dir(&pending_root) else {
1208        return Ok(out);
1209    };
1210
1211    for db_entry in db_entries.flatten() {
1212        let db_name = match db_entry.file_name().to_str().map(str::to_string) {
1213            Some(n) => n,
1214            None => continue,
1215        };
1216        if !is_acceptable_pending_path_component(db_name.as_bytes()) {
1217            continue;
1218        }
1219
1220        let db_dir = db_entry.path();
1221        if !db_dir.is_dir() {
1222            continue;
1223        }
1224
1225        let Ok(app_entries) = std::fs::read_dir(&db_dir) else {
1226            continue;
1227        };
1228
1229        for app_entry in app_entries.flatten() {
1230            let path = app_entry.path();
1231            let file_type = match app_entry.file_type() {
1232                Ok(file_type) => file_type,
1233                Err(_) => continue,
1234            };
1235            if file_type.is_dir() {
1236                if app_entry.file_name().to_str() == Some(".phase_zero") {
1237                    let Ok(phase_zero_entries) = std::fs::read_dir(&path) else {
1238                        continue;
1239                    };
1240                    for phase_zero_entry in phase_zero_entries.flatten() {
1241                        let phase_zero_path = phase_zero_entry.path();
1242                        if !phase_zero_path.is_file() {
1243                            continue;
1244                        }
1245                        let discovered =
1246                            validate_hidden_phase_zero_pending(phase_zero_path, &db_name)?;
1247                        let identity = (
1248                            discovered.bucket.database.clone(),
1249                            discovered.bucket.app.clone(),
1250                            discovered.plan.version.clone(),
1251                        );
1252                        if !seen_identities.insert(identity.clone()) {
1253                            return Err(format!(
1254                                "duplicate pending identity discovered for {}/{}/{}",
1255                                identity.0,
1256                                if identity.1.is_empty() {
1257                                    "_global_"
1258                                } else {
1259                                    identity.1.as_str()
1260                                },
1261                                identity.2
1262                            ));
1263                        }
1264                        out.push(discovered);
1265                    }
1266                }
1267                continue;
1268            }
1269            if !file_type.is_file() {
1270                continue;
1271            }
1272            let filename = match path.file_name().and_then(|f| f.to_str()) {
1273                Some(f) => f.to_string(),
1274                None => continue,
1275            };
1276            if !filename.ends_with(".json") {
1277                continue;
1278            }
1279            let discovered = validate_normal_pending(path, &db_name, &filename)?;
1280            let identity = (
1281                discovered.bucket.database.clone(),
1282                discovered.bucket.app.clone(),
1283                discovered.plan.version.clone(),
1284            );
1285            if !seen_identities.insert(identity.clone()) {
1286                return Err(format!(
1287                    "duplicate pending identity discovered for {}/{}/{}",
1288                    identity.0,
1289                    if identity.1.is_empty() {
1290                        "_global_"
1291                    } else {
1292                        identity.1.as_str()
1293                    },
1294                    identity.2
1295                ));
1296            }
1297            out.push(discovered);
1298        }
1299    }
1300
1301    out.sort_by(|a, b| {
1302        a.plan
1303            .version
1304            .cmp(&b.plan.version)
1305            .then_with(|| b.is_phase_zero.cmp(&a.is_phase_zero))
1306            .then_with(|| a.path.cmp(&b.path))
1307    });
1308    Ok(out)
1309}
1310
1311fn load_verified_pending_for_apply(
1312    pending_file: &DiscoveredPendingPlan,
1313) -> Result<PendingPlan, String> {
1314    let pending_bytes =
1315        std::fs::read(&pending_file.path).map_err(|e| format!("read pending JSON: {e}"))?;
1316    let pending: PendingPlan =
1317        serde_json::from_slice(&pending_bytes).map_err(|e| format!("parse pending JSON: {e}"))?;
1318    if pending != pending_file.plan {
1319        return Err(format!(
1320            "pending JSON changed after discovery at {}; rerun the command",
1321            pending_file.path.display()
1322        ));
1323    }
1324    Ok(pending)
1325}
1326
1327fn resolve_apply_target_urls(
1328    pending_files: &[DiscoveredPendingPlan],
1329    db_config: &djogi::config::DatabaseConfig,
1330) -> Result<std::collections::BTreeMap<String, String>, String> {
1331    let mut urls = std::collections::BTreeMap::new();
1332    for pending_file in pending_files {
1333        let database = &pending_file.bucket.database;
1334        if urls.contains_key(database) {
1335            continue;
1336        }
1337        let Some(url) = resolve_bucket_url(db_config, database) else {
1338            return Err(format!("cannot derive a database URL for `{database}`"));
1339        };
1340        urls.insert(database.clone(), url);
1341    }
1342    Ok(urls)
1343}
1344
1345fn reconcile_pending_plans_after_lock(
1346    workspace: &Path,
1347    pre_lock_pending_files: &[DiscoveredPendingPlan],
1348) -> Result<Vec<DiscoveredPendingPlan>, String> {
1349    let locked_pending_files = discover_pending_plans(workspace)?;
1350    if locked_pending_files != pre_lock_pending_files {
1351        return Err(
1352            "pending migration set changed while waiting for the workspace lock; rerun the command"
1353                .to_string(),
1354        );
1355    }
1356    Ok(locked_pending_files)
1357}
1358
1359/// Apply a single pending migration.
1360/// Re-loads the pending JSON after discovery and refuses if the bytes no
1361/// longer match the path-verified artifact, then checks the ledger-state
1362/// classification, loads the committed replay plan (or falls back to a
1363/// single-segment plan from the SQL file), and drives
1364/// [`djogi::migrate::apply_plan`]. `Pending` rows require operator resolution;
1365/// caller-gated `Failed`/`RolledBack` rows are reapply-blocking cleanup
1366/// candidates before runner invocation. Phase 0 cleanup refuses anything other
1367/// than identity-free replay-current before delete.
1368/// Uses the bypass attribute because deleting reapply-blocking
1369/// Failed/RolledBack ledger rows requires raw SQL that is not exposed through
1370/// the public typed API.
1371// apply_one_pending carries 9 arguments because it sits at the bridge
1372// between the CLI dispatch (workspace, path, bucket info) and the
1373// library runner (config, guard, audit pool, mode). Folding these into a
1374// struct would push the same fields onto the caller and add churn for
1375// no clarity gain — the pattern matches compose_with_inputs and attune.
1376#[allow(clippy::too_many_arguments)]
1377#[djogi::deliberately_bypass_convention_with_raw_sql]
1378// JUSTIFICATION (PIN): apply_one_pending owns the shared cleanup path for
1379// caller-gated Failed/RolledBack rows via
1380// `DELETE FROM djogi_schema_migrations WHERE version = $1`. The public API has
1381// no delete operation — `select_all_ledger_rows` is read-only and
1382// `insert_pending` is write-only. This is the minimal raw SQL surface for
1383// reapply-blocking ledger-row cleanup.
1384async fn apply_one_pending(
1385    ctx: &mut djogi::context::DjogiContext,
1386    workspace: &Path,
1387    pending_file: &DiscoveredPendingPlan,
1388    config: &djogi::config::DjogiConfig,
1389    guard: &djogi::migrate::WorkspaceGuard,
1390    audit_pool: Option<&deadpool_postgres::Pool>,
1391    mode: &FakeMode,
1392    runner_identity: Option<djogi::migrate::RunnerIdentity>,
1393) -> ApplyResult {
1394    // 1. Parse pending JSON to get bucket + version + checksums.
1395    let pending = match load_verified_pending_for_apply(pending_file) {
1396        Ok(pending) => pending,
1397        Err(e) => return ApplyResult::Refused(e),
1398    };
1399
1400    let bucket = pending_file.bucket.clone();
1401
1402    // 2. Check ledger state machine for this version.
1403    match check_ledger_state(ctx, &pending.version).await {
1404        LedgerState::NotPresent => {} /* normal path */
1405        LedgerState::AlreadyApplied => {
1406            return ApplyResult::Skipped("already applied".to_string());
1407        }
1408        LedgerState::PendingOrPartial(existing_status) => {
1409            // Pending rows require explicit operator resolution.
1410            // Caller-gated Failed and RolledBack rows are reapply-blocking
1411            // cleanup candidates before runner invocation.
1412            if existing_status == LedgerStatus::Failed
1413                || existing_status == LedgerStatus::RolledBack
1414            {
1415                // #386: Phase 0 cleanup must classify before deleting.
1416                // Load the committed replay plan or fallback SQL first,
1417                // and refuse any non-identity-free Phase 0 artifact before
1418                // removing the failed/rolled_back row. This applies to both
1419                // real apply and fake apply paths.
1420                if pending.version == djogi::migrate::PHASE_ZERO_VERSION {
1421                    let cleanup_refusal = classify_phase_zero_for_cleanup(
1422                        workspace,
1423                        &bucket,
1424                        &pending.version,
1425                        &pending.checksum_up,
1426                        pending.checksum_down.as_deref(),
1427                    );
1428                    if let Some(reason) = cleanup_refusal {
1429                        return ApplyResult::Refused(format!(
1430                            "Phase 0 cleanup refused: {reason}; \
1431                             refusing before deleting {} row to prevent stale replay",
1432                            existing_status.as_db_str()
1433                        ));
1434                    }
1435                }
1436
1437                // Failed and RolledBack rows both block re-apply, but callers
1438                // gate which statuses may be cleaned before reaching this
1439                // status-agnostic DELETE helper.
1440                if let Err(e) = delete_reapply_blocking_ledger_row(ctx, &pending.version).await {
1441                    return ApplyResult::Refused(format!(
1442                        "clean {} ledger row: {e}",
1443                        existing_status.as_db_str()
1444                    ));
1445                }
1446            } else {
1447                return ApplyResult::Refused(format!(
1448                    "version already in {} state — resolve before re-applying",
1449                    existing_status.as_db_str()
1450                ));
1451            }
1452        }
1453    }
1454
1455    // 3. Load committed replay plan (or fall back to single-segment).
1456    let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
1457        workspace,
1458        &bucket,
1459        &pending.version,
1460        &pending.checksum_up,
1461        pending.checksum_down.as_deref(),
1462    ) {
1463        Ok(result) => result,
1464        Err(e) => {
1465            return ApplyResult::Refused(format!("load replay plan: {e}"));
1466        }
1467    };
1468
1469    // 4. Construct RunnerCtx.
1470    let runner_ctx = RunnerCtx {
1471        bucket: bucket.clone(),
1472        version: pending.version.clone(),
1473        description: pending.slug.clone(),
1474        checksum_up,
1475        checksum_down,
1476        snapshot: Some(pending.model_snapshot.clone()),
1477        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
1478        // MigrateConfig does not derive Clone; construct from fields.
1479        config: djogi::config::MigrateConfig {
1480            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
1481            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
1482            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
1483            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
1484        },
1485        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
1486        audit_pool: audit_pool.cloned(),
1487        runner_identity,
1488    };
1489
1490    // 5. Apply (or fake-apply) the plan through the library runner.
1491    let runner_result = match mode {
1492        FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
1493        FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
1494    };
1495    match runner_result {
1496        Ok(_) => ApplyResult::Ok,
1497        Err(e) => ApplyResult::RunnerError(e),
1498    }
1499}
1500
1501/// Ledger state for a given migration version.
1502#[derive(Debug)]
1503enum LedgerState {
1504    /// No row exists — first apply.
1505    NotPresent,
1506    /// Row exists and is in terminal applied state.
1507    AlreadyApplied,
1508    /// Row exists in a non-terminal state with the specific status.
1509    PendingOrPartial(LedgerStatus),
1510}
1511
1512/// Check the ledger for an existing row matching `version`.
1513async fn check_ledger_state(ctx: &mut djogi::context::DjogiContext, version: &str) -> LedgerState {
1514    let Ok(rows) = djogi::migrate::select_all_ledger_rows(ctx).await else {
1515        // Ledger table might not exist yet — treat as NotPresent so
1516        // the runner can bootstrap it.
1517        return LedgerState::NotPresent;
1518    };
1519
1520    let existing = rows.iter().find(|r| r.version == version);
1521    match existing {
1522        None => LedgerState::NotPresent,
1523        Some(row) => match row.status {
1524            LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
1525                LedgerState::AlreadyApplied
1526            }
1527            LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
1528                LedgerState::PendingOrPartial(row.status)
1529            }
1530        },
1531    }
1532}
1533
1534/// Map a [`RunnerError`] to an exit code.
1535/// All runner errors map to exit code 1 (apply failure). Exit code 2
1536/// is reserved for user-facing refusals that happen before the runner
1537/// is invoked.
1538fn runner_error_exit_code(_error: &RunnerError) -> i32 {
1539    1
1540}
1541
1542#[djogi::deliberately_bypass_convention_with_raw_sql]
1543// JUSTIFICATION (PIN): delete_reapply_blocking_ledger_row removes a caller-
1544// gated Failed or RolledBack row so the migration can be retried. The public
1545// API has no delete operation for ledger rows — only select_all_ledger_rows
1546// and insert_pending are exposed. This DELETE is the minimal raw SQL for
1547// reapply-blocking ledger-row cleanup.
1548async fn delete_reapply_blocking_ledger_row(
1549    ctx: &mut djogi::context::DjogiContext,
1550    version: &str,
1551) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1552    ctx.raw_execute(
1553        "DELETE FROM djogi_schema_migrations WHERE version = $1",
1554        &[&version],
1555    )
1556    .await?;
1557    Ok(())
1558}
1559
1560/// Reconstruct the snapshot path for a bucket: `migrations/<database>/<app>/schema_snapshot.json`.
1561fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
1562    let migrations_root = djogi::migrate::migrations_root(workspace);
1563    migrations_root
1564        .join(&bucket.database)
1565        .join(djogi::migrate::app_dirname(&bucket.app))
1566        .join("schema_snapshot.json")
1567}
1568
1569/// `djogi migrations attune` entry point.
1570/// Mode selection (per CLI flags):
1571/// | `--record-ledger` | `--squash` | resolved mode |
1572/// |-----------|-----------|---------------|
1573/// | false | false | [`AttuneMode::DiffOnly`] (read-only diff) |
1574/// | true | false | [`AttuneMode::Record`] |
1575/// | false | true | [`AttuneMode::Squash { from, publish, app }`] |
1576/// | true | true | rejected by clap (`conflicts_with`) |
1577/// Argument semantics:
1578/// - `target` is an optional positional Git target (commit / tag /
1579///   branch). When supplied, attune resolves it (local first, fetch
1580///   on miss) before any DB / disk mutation.
1581/// - `apply` gates DB / disk mutation. Without it, every mode is a
1582///   dry-run.
1583/// - `record` controls the parent repo's recorded submodule pointer
1584///   (separate from `record_ledger`, which controls the
1585///   `djogi_schema_migrations` ledger inserts).
1586///   `--squash` requires `--from <ver>`; an absent `from` while
1587///   `--squash` is set surfaces as a CLI error before any work happens.
1588// The CLI dispatch carries 11 inputs because the attune surface is
1589// the broadest in the migrations CLI — target
1590// resolution + dry-run + record-ledger + record-pointer + squash +
1591// publish all live on the same command. Folding them into a struct
1592// would push the same fields onto the caller; the dispatch above
1593// already passes them positionally and a struct refactor would be
1594// churn for no clarity gain.
1595#[allow(clippy::too_many_arguments)]
1596pub fn attune_cmd(
1597    target: Option<&str>,
1598    apply: bool,
1599    record: bool,
1600    record_ledger: bool,
1601    record_reason: &str,
1602    squash: bool,
1603    from: Option<&str>,
1604    publish: bool,
1605    app: Option<&str>,
1606    workspace: Option<PathBuf>,
1607) -> ExitCode {
1608    let workspace = resolve_workspace(workspace);
1609    let mode = match (record_ledger, squash) {
1610        (false, false) => AttuneMode::DiffOnly,
1611        (true, false) => AttuneMode::Record {
1612            reason: record_reason.to_string(),
1613        },
1614        (false, true) => match from {
1615            Some(v) if !v.is_empty() => AttuneMode::Squash {
1616                from: v.to_string(),
1617                publish,
1618                app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
1619            },
1620            _ => {
1621                eprintln!(
1622                    "djogi migrations attune --squash requires --from <version> (e.g. \
1623                     `--from V20260101000000__init`)"
1624                );
1625                return ExitCode::from(2);
1626            }
1627        },
1628        (true, true) => {
1629            // Already rejected by clap's `conflicts_with`; this branch
1630            // is defensive in case the flag is added programmatically.
1631            eprintln!(
1632                "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
1633            );
1634            return ExitCode::from(2);
1635        }
1636    };
1637
1638    let runtime = match tokio::runtime::Builder::new_current_thread()
1639        .enable_all()
1640        .build()
1641    {
1642        Ok(r) => r,
1643        Err(e) => {
1644            eprintln!("djogi migrations attune: tokio runtime: {e}");
1645            return ExitCode::from(1);
1646        }
1647    };
1648
1649    let target_owned = target.map(str::to_string);
1650    let exit =
1651        runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
1652    ExitCode::from(exit as u8)
1653}
1654
1655/// Async body of [`attune_cmd`]. Loads config, builds the context,
1656/// acquires the workspace lock, invokes the library entry point.
1657async fn run_attune(
1658    workspace: &Path,
1659    mode: AttuneMode,
1660    target: Option<String>,
1661    apply: bool,
1662    record: bool,
1663) -> i32 {
1664    use djogi::config::DjogiConfig;
1665
1666    let config = match DjogiConfig::load_from_workspace(workspace) {
1667        Ok(c) => c,
1668        Err(e) => {
1669            eprintln!("djogi migrations attune: config load: {e}");
1670            return 1;
1671        }
1672    };
1673
1674    let mut ctx = match connect_and_check(&config.database.url).await {
1675        ContextOutcome::Ready(ctx) => ctx,
1676        ContextOutcome::UnsupportedVersion(e) => {
1677            crate::print_support_boundary_error("migrations attune", &e);
1678            return 2;
1679        }
1680        ContextOutcome::RuntimeError(msg) => {
1681            eprintln!("djogi migrations attune: pool: {msg}");
1682            return 1;
1683        }
1684    };
1685
1686    // All three modes acquire the workspace lock per the v3 file-lock
1687    // contract — even DiffOnly takes the lock so a concurrent compose
1688    // / apply cannot mutate the tree mid-scan.
1689    let lock_path = workspace.join(LOCK_FILE_NAME);
1690    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
1691        Ok(g) => g,
1692        Err(e) => {
1693            eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
1694            return 1;
1695        }
1696    };
1697
1698    let req = AttuneRequest {
1699        workspace_root: workspace,
1700        database_url: &config.database.url,
1701        profile: &config.profile,
1702        // Thread `[database].dev_mode` to the squash gate. Read-only modes
1703        // (`DiffOnly`, `Record`) ignore it; `Squash` mode refuses unless
1704        // this is `true`.
1705        dev_mode: config.database.dev_mode,
1706        // The operator-supplied target + the `--apply` / `--record` gates
1707        // flow through to the library
1708        // entry point. The library owns the resolution + parent-pointer
1709        // update; the CLI is just plumbing.
1710        target: target.as_deref(),
1711        apply,
1712        record,
1713        mode,
1714        _guard: &guard,
1715    };
1716    match attune(&mut ctx, req).await {
1717        Ok(report) => {
1718            if report.entries.is_empty() {
1719                println!("attune: no drift");
1720            } else {
1721                for entry in &report.entries {
1722                    let app_display = if entry.bucket.app.is_empty() {
1723                        "_global_"
1724                    } else {
1725                        entry.bucket.app.as_str()
1726                    };
1727                    println!(
1728                        "  {kind:<10}  {database}/{app}  {version}",
1729                        kind = entry.kind.as_str(),
1730                        database = entry.bucket.database,
1731                        app = app_display,
1732                        version = entry.version,
1733                    );
1734                }
1735            }
1736            // Surface structured diagnostics — today this carries the
1737            // LedgerTableMissing notice when DiffOnly runs on a
1738            // fresh database.
1739            for diag in &report.diagnostics {
1740                println!("  diagnostic: {diag}");
1741            }
1742            if let Some(sha) = &report.resolved_target {
1743                println!("resolved target: {sha}");
1744            }
1745            if let Some(squashed) = &report.squashed_to {
1746                println!("squashed to: {squashed}");
1747            }
1748            if report.published {
1749                println!("published to remote");
1750            }
1751            if report.parent_pointer_updated {
1752                println!("parent submodule pointer updated");
1753            }
1754            0
1755        }
1756        Err(e) => {
1757            eprintln!("djogi migrations attune: {e}");
1758            attune_error_exit_code(&e)
1759        }
1760    }
1761}
1762
1763/// Map an [`AttuneError`] variant onto the documented exit-code
1764/// matrix (`docs/spec/configuration.md` §14):
1765/// - Refusal variants → exit code `2` ("operator must intervene;
1766///   nothing happened"). Today every refusal flows through
1767///   [`AttuneError::Refused`]; the localhost gate, the dev-profile
1768///   gate, the missing-version refusal, and the ambiguous-version
1769///   refusal are all reachable through that variant.
1770/// - Runtime variants → exit code `1` ("we tried; something broke"
1771///   filesystem scan, ledger query, SQL read/write/delete, git
1772///   publish). CI may safely retry these.
1773///   Pulled out as a free function so unit tests can pin every variant
1774///   without spinning a Tokio runtime. Operators rely on the 1-vs-2
1775///   distinction to tell "refused before any side effect" from "ran and
1776///   failed mid-flight".
1777fn attune_error_exit_code(err: &AttuneError) -> i32 {
1778    match err {
1779        AttuneError::Refused(_) => 2,
1780        AttuneError::FilesystemScanFailed { .. }
1781        | AttuneError::LedgerQueryFailed { .. }
1782        | AttuneError::SqlReadFailed { .. }
1783        | AttuneError::SqlWriteFailed { .. }
1784        | AttuneError::SqlDeleteFailed { .. }
1785        | AttuneError::GitPublishFailed { .. }
1786        | AttuneError::GitTargetResolveFailed { .. }
1787        | AttuneError::GitFetchFailed { .. }
1788        | AttuneError::GitUpdateSubmodulePointerFailed { .. } => 1,
1789    }
1790}
1791
1792/// `djogi migrations verify` entry point.
1793/// Read-only — does not acquire the workspace lock. Reads the live
1794/// Postgres catalog via [`djogi::context::DjogiContext`] and compares
1795/// against the projected schema from the descriptor inventory.
1796/// Exit codes: 0 on success (no error-level diagnostics), 1 on runtime
1797/// error (config / network / SQL / projection), 2 on refusal
1798/// (below PG 18).
1799pub fn verify_cmd(
1800    provider: &dyn DescriptorProvider,
1801    workspace: Option<PathBuf>,
1802    strict: bool,
1803) -> ExitCode {
1804    let workspace = resolve_workspace(workspace);
1805
1806    let runtime = match tokio::runtime::Builder::new_current_thread()
1807        .enable_all()
1808        .build()
1809    {
1810        Ok(r) => r,
1811        Err(e) => {
1812            eprintln!("djogi migrations verify: tokio runtime: {e}");
1813            return ExitCode::from(1);
1814        }
1815    };
1816
1817    let exit = runtime.block_on(async { run_verify(provider, &workspace, strict).await });
1818    ExitCode::from(exit as u8)
1819}
1820
1821/// Async body of [`verify_cmd`]. Returns the desired exit code.
1822/// Verify is multi-database aware: each `(database, app)` bucket is routed
1823/// to the pool for its `database` component via [`resolve_bucket_url`], and
1824/// the per-database context is connected lazily and cached so a database
1825/// with several app buckets connects once. The bucket set is the UNION of
1826/// the inventory projection and the on-disk snapshot tree, so an orphaned
1827/// snapshot (a removed app's snapshot still on disk) is verified and
1828/// surfaces drift rather than being silently skipped .
1829/// Exit codes:
1830/// - `0` — every bucket verified with no error-severity diagnostic.
1831/// - `1` — at least one runtime failure (pool / snapshot / verify error)
1832///   or at least one bucket reported an error-severity diagnostic.
1833/// - `2` — the server is below the minimum supported Postgres version
1834///   (a server-global refusal: verify returns immediately).
1835async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
1836    use djogi::config::DjogiConfig;
1837
1838    // 0. Zero-descriptor refusal (§5.6 / REQ-370-8). `verify` refuses with
1839    // the dual-cause diagnostic + exit 2 ONLY when there are NEITHER
1840    // descriptors NOR on-disk snapshots — the genuinely unusable state
1841    // (a standalone binary with nothing to verify against). When
1842    // snapshots exist, verify DEGRADES to snapshot-only (the union below
1843    // enumerates the disk buckets), so we must not refuse here.
1844    // Guard on `provider.models().is_empty()` rather than the projected
1845    // `bucket_set`: projection always seeds the synthetic global bucket
1846    // (`(main, "")`), so the bucket set is never empty and is the wrong
1847    // signal for "no descriptors". This is the same guard the
1848    // compose/schema/docs gates in `lib.rs` use.
1849    if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
1850        crate::print_zero_descriptor_diagnostic("migrations verify");
1851        return 2;
1852    }
1853
1854    // 1. Load config from workspace.
1855    let config = match DjogiConfig::load_from_workspace(workspace) {
1856        Ok(c) => c,
1857        Err(e) => {
1858            eprintln!("djogi migrations verify: config load: {e}");
1859            return 1;
1860        }
1861    };
1862
1863    // 2. Project schema from descriptor provider.
1864    let models = match project_from_provider(provider) {
1865        Ok(m) => m,
1866        Err(e) => {
1867            eprintln!("djogi migrations verify: projection error: {e}");
1868            return 1;
1869        }
1870    };
1871
1872    // 3. Build the bucket set as the UNION of the inventory projection and
1873    // the on-disk snapshot tree . An orphaned snapshot
1874    // a removed app whose snapshot still sits on disk — is absent from
1875    // `models` but present on disk; without the union it would never be
1876    // verified and out-of-band drift would go unreported.
1877    let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
1878        models.keys().cloned().collect();
1879    for bucket in discover_snapshot_buckets_on_disk(workspace) {
1880        bucket_set.insert(bucket);
1881    }
1882    // The zero-descriptor refusal (step 0) already returned for the only
1883    // state that yields an empty bucket set (no descriptors + no snapshots).
1884    // Projection always seeds the synthetic global bucket, so reaching here
1885    // with an empty set is impossible; if a future projection change ever
1886    // breaks that invariant, fail closed with the dual-cause refusal rather
1887    // than silently reporting success on a binary that verified nothing.
1888    if bucket_set.is_empty() {
1889        crate::print_zero_descriptor_diagnostic("migrations verify");
1890        return 2;
1891    }
1892
1893    // 4. Policy configuration for the --strict flag.
1894    let policy = djogi::config::PolicyConfig {
1895        strict_out_of_order: strict,
1896    };
1897
1898    // 5. Pre-compute the set of databases that have at least one INVENTORY
1899    // bucket with non-empty models. Orphan-only databases (snapshots on
1900    // disk but no registered models) are excluded — `unwrap_or(false)`
1901    // treats a disk-only bucket as model-less. This gates D699 inside
1902    // `verify_bucket`: an orphan-only database has no live tables to
1903    // miss, so D601 is the actionable signal instead.
1904    let database_has_models: std::collections::HashSet<String> = bucket_set
1905        .iter()
1906        .filter(|b| {
1907            models
1908                .get(*b)
1909                .map(|s| !s.models.is_empty())
1910                .unwrap_or(false)
1911        })
1912        .map(|b| b.database.clone())
1913        .collect();
1914
1915    // 6. Per-database context cache + dedup sets. Contexts are connected
1916    // lazily (only for databases that have a bucket needing a live read)
1917    // and reused across that database's app buckets. `seen_ledger_databases`
1918    // ensures the ledger-lifecycle diagnostics (D621/D622/D699) are
1919    // emitted once per database, not once per app bucket.
1920    let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
1921        std::collections::BTreeMap::new();
1922    let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
1923    let mut exit_code: i32 = 0;
1924
1925    // 7. Verify each bucket.
1926    for bucket in &bucket_set {
1927        // a. Resolve the per-database URL.
1928        let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
1929            let bd = if bucket.app.is_empty() {
1930                "_global_"
1931            } else {
1932                &bucket.app
1933            };
1934            eprintln!(
1935                "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
1936                 check that config.database.url has a valid path component",
1937                bucket.database, bucket.database, bd
1938            );
1939            exit_code = 1;
1940            continue;
1941        };
1942
1943        // b. Connect (lazily, once per distinct database). PG < 18 is a
1944        // server-global refusal — there is no point continuing to other
1945        // buckets, so we return 2 immediately.
1946        if !contexts.contains_key(&bucket.database) {
1947            match connect_and_check(&url).await {
1948                ContextOutcome::Ready(ctx) => {
1949                    contexts.insert(bucket.database.clone(), ctx);
1950                }
1951                ContextOutcome::UnsupportedVersion(e) => {
1952                    crate::print_support_boundary_error("migrations verify", &e);
1953                    return 2;
1954                }
1955                ContextOutcome::RuntimeError(msg) => {
1956                    eprintln!(
1957                        "djogi migrations verify: pool for '{}': {msg}",
1958                        bucket.database
1959                    );
1960                    exit_code = 1;
1961                    continue;
1962                }
1963            }
1964        }
1965
1966        // c. Load the snapshot. A missing snapshot for a bucket that HAS
1967        // registered models is a hard error (exit 1) — the operator must
1968        // record a baseline; a missing snapshot for a model-less bucket
1969        // is informational.
1970        let snap_path = snapshot_path(workspace, bucket);
1971        let snapshot = match load_snapshot(&snap_path) {
1972            Ok(s) => s,
1973            Err(SnapshotError::Io { source, .. })
1974                if source.kind() == std::io::ErrorKind::NotFound =>
1975            {
1976                let bd = if bucket.app.is_empty() {
1977                    "_global_"
1978                } else {
1979                    &bucket.app
1980                };
1981                let has_models = models
1982                    .get(bucket)
1983                    .map(|s| !s.models.is_empty())
1984                    .unwrap_or(false);
1985                if has_models {
1986                    eprintln!(
1987                        "djogi migrations verify: {}/{} has registered models but no \
1988                         snapshot; run `djogi migrations compose` then \
1989                         `djogi migrations apply` to record a baseline",
1990                        bucket.database, bd
1991                    );
1992                    exit_code = 1;
1993                } else {
1994                    println!("No snapshot found for bucket {}/{}", bucket.database, bd);
1995                }
1996                continue;
1997            }
1998            Err(e) => {
1999                let bd = if bucket.app.is_empty() {
2000                    "_global_"
2001                } else {
2002                    &bucket.app
2003                };
2004                eprintln!(
2005                    "djogi migrations verify: load snapshot for {}/{}: {e}",
2006                    bucket.database, bd
2007                );
2008                exit_code = 1;
2009                continue;
2010            }
2011        };
2012
2013        // d. Compute ledger-emission flags. The ledger is shared per
2014        // database; emit its lifecycle diagnostics once per database
2015        // (the first bucket of each database that reaches this point),
2016        // and only for databases that actually have registered models.
2017        let db_has_models = database_has_models.contains(&bucket.database);
2018        let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
2019
2020        // e. Run the bucket-scoped verify against the routed context.
2021        let ctx = contexts
2022            .get_mut(&bucket.database)
2023            .expect("context inserted above");
2024        let report = match djogi::migrate::verify_bucket(
2025            ctx,
2026            bucket,
2027            &snapshot,
2028            &policy,
2029            emit_ledger,
2030            db_has_models,
2031        )
2032        .await
2033        {
2034            Ok(r) => r,
2035            Err(e) => {
2036                let bd = if bucket.app.is_empty() {
2037                    "_global_"
2038                } else {
2039                    &bucket.app
2040                };
2041                eprintln!(
2042                    "djogi migrations verify: error for {}/{}: {e}",
2043                    bucket.database, bd
2044                );
2045                exit_code = 1;
2046                continue;
2047            }
2048        };
2049
2050        // f. Render and fold the bucket's error state into the exit code.
2051        for line in render_verify_report(&report, bucket) {
2052            println!("{line}");
2053        }
2054        if report.has_errors() {
2055            exit_code = 1;
2056        }
2057    }
2058
2059    exit_code
2060}
2061
2062/// Render a [`VerifyReport`] to a vector of output lines.
2063/// Format: one line per diagnostic with severity prefix, code, location,
2064/// and message. Summary line at the end. Output is deterministic because
2065/// `report.diagnostics` is already sorted by `(code, location)`.
2066/// Returns the lines instead of printing directly so the rendering is unit-
2067/// testable ; the caller iterates the returned vector and prints each
2068/// line. Blank separator lines are returned as empty strings.
2069fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
2070    let mut lines: Vec<String> = Vec::new();
2071
2072    let app_display = if bucket.app.is_empty() {
2073        "_global_"
2074    } else {
2075        &bucket.app
2076    };
2077    lines.push(format!(
2078        "djogi migrations verify — {}/{}",
2079        bucket.database, app_display
2080    ));
2081    lines.push("──────────────────────────────────────────".to_string());
2082
2083    match (
2084        &report.latest_applied_version,
2085        report.applied_count,
2086        report.unfinished_count,
2087    ) {
2088        (Some(version), applied, 0) => {
2089            lines.push(format!("Ledger: {applied} applied, latest {version}"));
2090        }
2091        (Some(version), applied, unfinished) => {
2092            lines.push(format!(
2093                "Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
2094            ));
2095        }
2096        (None, 0, 0) => {
2097            lines.push("Ledger: empty (no migrations applied yet)".to_string());
2098        }
2099        _ => {}
2100    }
2101    lines.push(String::new());
2102
2103    if report.diagnostics.is_empty() {
2104        lines.push("No drift detected. Schema is consistent.".to_string());
2105    } else {
2106        for d in &report.diagnostics {
2107            let severity = match d.severity {
2108                VerifySeverity::Info => "INFO",
2109                VerifySeverity::Warning => "WARN",
2110                VerifySeverity::Error => "ERROR",
2111            };
2112            let location = d.location.as_deref().unwrap_or("-");
2113            lines.push(format!(
2114                "[{severity}] {code} ({loc}): {msg}",
2115                severity = severity,
2116                code = d.code,
2117                loc = location,
2118                msg = d.message
2119            ));
2120        }
2121    }
2122
2123    let errors = report
2124        .diagnostics
2125        .iter()
2126        .filter(|d| d.severity == VerifySeverity::Error)
2127        .count();
2128    let warnings = report
2129        .diagnostics
2130        .iter()
2131        .filter(|d| d.severity == VerifySeverity::Warning)
2132        .count();
2133    let infos = report
2134        .diagnostics
2135        .iter()
2136        .filter(|d| d.severity == VerifySeverity::Info)
2137        .count();
2138
2139    if errors > 0 {
2140        lines.push(String::new());
2141        lines.push(format!(
2142            "Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
2143        ));
2144    } else if warnings > 0 {
2145        lines.push(String::new());
2146        lines.push(format!(
2147            "Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
2148        ));
2149    } else {
2150        lines.push(String::new());
2151        lines.push(format!("Result: PASSED ({infos} info(s))"));
2152    }
2153
2154    lines
2155}
2156
2157// ── repair subcommand dispatch ────────────────────────────────────────────
2158
2159impl From<PartialApplyResolutionCli> for PartialApplyResolution {
2160    fn from(cli: PartialApplyResolutionCli) -> Self {
2161        match cli {
2162            PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
2163            PartialApplyResolutionCli::Faked => Self::MarkFaked,
2164            PartialApplyResolutionCli::Applied => Self::MarkApplied,
2165        }
2166    }
2167}
2168
2169/// `djogi migrations repair <subcommand>` entry point.
2170/// Routes each subcommand to its glue function. The glue functions own
2171/// the runtime / config / pool / lock / report-render lifecycle; this
2172/// router only destructures the parsed clap variant.
2173pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
2174    match command {
2175        RepairSubcommand::ChecksumDrift {
2176            version,
2177            app,
2178            database,
2179            checksum_up,
2180            checksum_down,
2181            workspace,
2182        } => repair_checksum_drift_cmd(
2183            &version,
2184            app.as_deref(),
2185            database.as_deref(),
2186            checksum_up.as_deref(),
2187            checksum_down.as_deref(),
2188            workspace,
2189        ),
2190        RepairSubcommand::PartialApply {
2191            version,
2192            resolution,
2193            note,
2194            app,
2195            database,
2196            workspace,
2197        } => repair_partial_apply_cmd(
2198            &version,
2199            resolution.into(),
2200            &note,
2201            app.as_deref(),
2202            database.as_deref(),
2203            workspace,
2204        ),
2205        RepairSubcommand::ResumePartial {
2206            version,
2207            app,
2208            database,
2209            workspace,
2210            node_id,
2211            single_node_dev,
2212        } => repair_resume_partial_apply_cmd(
2213            &version,
2214            app.as_deref(),
2215            database.as_deref(),
2216            workspace,
2217            node_id,
2218            single_node_dev,
2219        ),
2220        RepairSubcommand::SnapshotRebuild {
2221            app,
2222            database,
2223            snapshot_path,
2224            workspace,
2225        } => repair_snapshot_rebuild_cmd(
2226            app.as_deref(),
2227            database.as_deref(),
2228            snapshot_path.as_deref(),
2229            workspace,
2230        ),
2231    }
2232}
2233
2234/// Render a [`RepairReport`] to stdout. Shared across all four repair
2235/// glue functions so the operator sees a consistent action / ledger /
2236/// snapshot summary regardless of which repair ran.
2237fn render_repair_report(report: &RepairReport) {
2238    for action in &report.actions_taken {
2239        println!("  {action}");
2240    }
2241    if !report.ledger_changes.is_empty() {
2242        println!("Ledger changes:");
2243        for lc in &report.ledger_changes {
2244            println!(
2245                "  {} | {} | {} -> {}",
2246                lc.version, lc.column, lc.before, lc.after,
2247            );
2248        }
2249    }
2250    if !report.snapshot_changes.is_empty() {
2251        println!("Snapshot changes:");
2252        for sc in &report.snapshot_changes {
2253            println!("  {} | {}", sc.path.display(), sc.description);
2254        }
2255    }
2256}
2257
2258/// Map a [`RepairError`] onto the CLI exit-code contract.
2259/// `RepairError` is NOT `#[non_exhaustive]`, so this match is
2260/// **exhaustive with NO `_ =>` wildcard** by deliberate design: a future
2261/// variant breaks compilation here, forcing a conscious exit-code
2262/// classification rather than silently bucketing an unclassified error.
2263/// Classification rule — when a new variant is added, classify it the
2264/// same way:
2265/// - **Exit 1 (retryable):** variants wrapping a transient I/O /
2266///   connection / pool / SQL failure (a `source: DjogiError`, snapshot
2267///   filesystem I/O, or advisory-lock contention). A retry may succeed.
2268/// - **Exit 2 (refusal):** structural refusals and ledger-logic guards
2269///   that require operator intervention. A blind retry hits the same
2270///   refusal.
2271fn repair_error_exit_code(err: &RepairError) -> i32 {
2272    match err {
2273        // ── Exit 1: transient I/O / connection / pool / SQL failures.
2274        // These wrap a DjogiError (network, connection, query) or a
2275        // filesystem error and may succeed on retry.
2276        RepairError::LedgerIo { .. }                  // ledger DB I/O
2277        | RepairError::SnapshotIo { .. }              // snapshot filesystem I/O
2278        | RepairError::AdvisoryLockFailed { .. }      // lock held by a concurrent runner; retry after it releases
2279        | RepairError::AdvisoryLockQueryFailed { .. } // pg_try_advisory_lock query itself errored
2280        | RepairError::PinnedSessionCheckoutFailed { .. } // could not check out a pinned session from the pool
2281        | RepairError::ResumeStepFailed { .. }        // a replayed statement failed; partial state recorded, retryable
2282        | RepairError::ResumeProgressAckFailed { .. } // step committed but the progress ack write failed; retryable
2283        => 1,
2284
2285        // ── Exit 2: refusals and structural / ledger-logic guards.
2286        // The operator must investigate and intervene; a blind retry
2287        // would hit the same refusal.
2288        RepairError::VersionNotFound { .. }
2289        | RepairError::InsufficientConfirmation
2290        | RepairError::InvalidChecksum { .. }
2291        | RepairError::InvalidResolution { .. }
2292        | RepairError::BucketAppMismatch { .. }
2293        | RepairError::PlanVersionMismatch { .. }
2294        | RepairError::PlanChecksumMismatch { .. }
2295        | RepairError::LeafIdentityMismatch { .. }
2296        | RepairError::NothingToResume { .. }
2297        | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
2298        | RepairError::SuppliedSnapshotDiverges { .. }
2299        | RepairError::AdvisoryUnlockReturnedFalse { .. } // session-pinning correctness failure — not a blind retry
2300        | RepairError::ResumePlanShapeMismatch { .. }
2301        | RepairError::ReplayPlanShapeMismatch { .. }
2302        | RepairError::PhaseZeroArtifactRefused { .. }  // #386: refusal — operator must replace the stale file
2303        | RepairError::MissingResumeIdentity { .. }     // #386: refusal — operator must supply identity for resume
2304        => 2,
2305    }
2306}
2307
2308/// Resolve the database name for bucket construction. Uses the explicit
2309/// `--database` flag if provided, otherwise defaults to `"main"` (the
2310/// global database name — see [`djogi::apps::AppDescriptor::GLOBAL_DATABASE`]).
2311/// `_config` is threaded so this single helper can grow a config-driven
2312/// default database (should `DjogiConfig` gain one) without changing
2313/// every call site.
2314fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
2315    database.unwrap_or("main").to_string()
2316}
2317
2318/// Compute the `V1:`-prefixed checksum of a committed up SQL file on disk,
2319/// using the canonical fragment-level domain (strips the composed-file
2320/// header and label comments, matching what compose stores in the ledger).
2321/// The naive whole-file checksum is WRONG here: compose stores checksums
2322/// computed over the [`djogi::migrate::OperationSql`] fragments only,
2323/// without the rendered file's `-- Djogi composed migration — up` header
2324/// or the per-statement label comment lines. Recomputing over the full
2325/// file content would never match the ledger value, so the drift repair
2326/// would write a checksum that immediately re-drifts. Delegating to
2327/// [`djogi::migrate::compute_committed_sql_checksum`] keeps the CLI's
2328/// recompute path in the same domain as compose.
2329/// Returns the underlying [`std::io::Error`] unchanged so the caller can
2330/// surface a missing/unreadable up file as a retryable I/O error.
2331fn compute_checksum_up_from_disk(
2332    workspace: &Path,
2333    bucket: &djogi::migrate::BucketKey,
2334    version: &str,
2335) -> std::io::Result<String> {
2336    let path =
2337        djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::up_filename(version));
2338    let sql = std::fs::read_to_string(&path)?;
2339    Ok(djogi::migrate::compute_committed_sql_checksum(
2340        &sql,
2341        djogi::migrate::ResetSqlSide::Up,
2342    ))
2343}
2344
2345/// Compute the canonical checksum of a committed down SQL file on disk,
2346/// using the same fragment-level domain as compose (see
2347/// [`compute_checksum_up_from_disk`] for why the whole-file checksum is
2348/// wrong).
2349/// Returns `Ok(None)` when the file is absent
2350/// ([`std::io::ErrorKind::NotFound`]) or when the file contains only SQL
2351/// comments — both map onto compose's `NULL` `checksum_down` sentinel for
2352/// comment-only down files. Returns `Err` for any other I/O failure so a
2353/// retry after the file is restored can succeed.
2354fn compute_checksum_down_from_disk(
2355    workspace: &Path,
2356    bucket: &djogi::migrate::BucketKey,
2357    version: &str,
2358) -> std::io::Result<Option<String>> {
2359    let path =
2360        djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::down_filename(version));
2361    let sql = match std::fs::read_to_string(&path) {
2362        Ok(s) => s,
2363        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
2364        Err(e) => return Err(e),
2365    };
2366    Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql))
2367}
2368
2369/// `djogi migrations repair checksum-drift` entry point.
2370/// Updates the `checksum_up` / `checksum_down` columns of an
2371/// already-applied ledger row after its committed SQL was edited. When
2372/// `--checksum-up` / `--checksum-down` are omitted, the checksums are
2373/// recomputed from the committed files on disk (a missing down file is a
2374/// no-op; any other read error aborts with exit 1).
2375pub fn repair_checksum_drift_cmd(
2376    version: &str,
2377    app: Option<&str>,
2378    database: Option<&str>,
2379    checksum_up: Option<&str>,
2380    checksum_down: Option<&str>,
2381    workspace: Option<PathBuf>,
2382) -> ExitCode {
2383    let workspace = resolve_workspace(workspace);
2384    let runtime = match tokio::runtime::Builder::new_current_thread()
2385        .enable_all()
2386        .build()
2387    {
2388        Ok(r) => r,
2389        Err(e) => {
2390            eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
2391            return ExitCode::from(1);
2392        }
2393    };
2394    let exit = runtime.block_on(async {
2395        run_repair_checksum_drift(
2396            &workspace,
2397            version,
2398            app,
2399            database,
2400            checksum_up,
2401            checksum_down,
2402        )
2403        .await
2404    });
2405    ExitCode::from(exit as u8)
2406}
2407
2408/// Async body of [`repair_checksum_drift_cmd`]. Returns the desired exit code.
2409async fn run_repair_checksum_drift(
2410    workspace: &Path,
2411    version: &str,
2412    app: Option<&str>,
2413    database: Option<&str>,
2414    checksum_up: Option<&str>,
2415    checksum_down: Option<&str>,
2416) -> i32 {
2417    use djogi::config::DjogiConfig;
2418
2419    let config = match DjogiConfig::load_from_workspace(workspace) {
2420        Ok(c) => c,
2421        Err(e) => {
2422            eprintln!("djogi migrations repair checksum-drift: config load: {e}");
2423            return 1;
2424        }
2425    };
2426
2427    // Resolve the per-database URL BEFORE connecting: `--database
2428    // crud_log` / `event_log` operate on a different bucket's ledger than
2429    // the app DB, so connecting to `config.database.url` first would
2430    // silently mutate the wrong database.
2431    let db_name = resolve_database(database, &config);
2432    let url = match resolve_bucket_url(&config.database, &db_name) {
2433        Some(u) => u,
2434        None => {
2435            eprintln!(
2436                "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
2437            );
2438            return 2;
2439        }
2440    };
2441
2442    let mut ctx = match connect_and_check(&url).await {
2443        ContextOutcome::Ready(ctx) => ctx,
2444        ContextOutcome::UnsupportedVersion(e) => {
2445            crate::print_support_boundary_error("migrations repair checksum-drift", &e);
2446            return 2;
2447        }
2448        ContextOutcome::RuntimeError(msg) => {
2449            eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
2450            return 1;
2451        }
2452    };
2453
2454    let lock_path = workspace.join(LOCK_FILE_NAME);
2455    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2456        Ok(g) => g,
2457        Err(e) => {
2458            eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
2459            return 1;
2460        }
2461    };
2462
2463    let app_label = app.unwrap_or("");
2464    let bucket = BucketKey {
2465        database: db_name,
2466        app: app_label.to_string(),
2467    };
2468
2469    let new_checksum_up = match checksum_up {
2470        Some(c) => c.to_string(),
2471        None => {
2472            // Auto-compute from the committed up SQL file on disk. A
2473            // missing or unreadable up file is an environment I/O error,
2474            // not an operator-facing refusal — exit 1 (same class as the
2475            // down file's non-NotFound branch below), so a retry after
2476            // the file is restored can succeed.
2477            match compute_checksum_up_from_disk(workspace, &bucket, version) {
2478                Ok(cs) => cs,
2479                Err(e) => {
2480                    eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
2481                    return 1;
2482                }
2483            }
2484        }
2485    };
2486
2487    let resolved_checksum_down = match checksum_down {
2488        Some(c) => Some(c.to_string()),
2489        None => {
2490            // Auto-compute from the down file; a missing down file (or a
2491            // comment-only down file) is a no-op (no down checksum), other
2492            // read errors surface. NotFound is folded into `Ok(None)` by
2493            // `compute_checksum_down_from_disk`.
2494            match compute_checksum_down_from_disk(workspace, &bucket, version) {
2495                Ok(cs_opt) => cs_opt,
2496                Err(e) => {
2497                    eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
2498                    return 1;
2499                }
2500            }
2501        }
2502    };
2503
2504    match repair_checksum_drift(
2505        &mut ctx,
2506        &guard,
2507        &bucket,
2508        version,
2509        workspace,
2510        &new_checksum_up,
2511        resolved_checksum_down.as_deref(),
2512        RepairConfirmation::OperatorAcknowledged,
2513    )
2514    .await
2515    {
2516        Ok(report) => {
2517            render_repair_report(&report);
2518            0
2519        }
2520        Err(e) => {
2521            eprintln!("djogi migrations repair checksum-drift: {e}");
2522            repair_error_exit_code(&e)
2523        }
2524    }
2525}
2526
2527/// `djogi migrations repair partial-apply` entry point.
2528/// Resolves a partial-apply ledger row by rewriting its status to
2529/// `rolled_back`, `faked`, or `applied`. No SQL executes — only the
2530/// ledger row is mutated.
2531pub fn repair_partial_apply_cmd(
2532    version: &str,
2533    resolution: PartialApplyResolution,
2534    note: &str,
2535    app: Option<&str>,
2536    database: Option<&str>,
2537    workspace: Option<PathBuf>,
2538) -> ExitCode {
2539    let workspace = resolve_workspace(workspace);
2540    let runtime = match tokio::runtime::Builder::new_current_thread()
2541        .enable_all()
2542        .build()
2543    {
2544        Ok(r) => r,
2545        Err(e) => {
2546            eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
2547            return ExitCode::from(1);
2548        }
2549    };
2550    let exit = runtime.block_on(async {
2551        run_repair_partial_apply(&workspace, version, resolution, note, app, database).await
2552    });
2553    ExitCode::from(exit as u8)
2554}
2555
2556/// Async body of [`repair_partial_apply_cmd`]. Returns the desired exit code.
2557async fn run_repair_partial_apply(
2558    workspace: &Path,
2559    version: &str,
2560    resolution: PartialApplyResolution,
2561    note: &str,
2562    app: Option<&str>,
2563    database: Option<&str>,
2564) -> i32 {
2565    use djogi::config::DjogiConfig;
2566
2567    let config = match DjogiConfig::load_from_workspace(workspace) {
2568        Ok(c) => c,
2569        Err(e) => {
2570            eprintln!("djogi migrations repair partial-apply: config load: {e}");
2571            return 1;
2572        }
2573    };
2574
2575    // Resolve the per-database URL BEFORE connecting: `--database
2576    // crud_log` / `event_log` operate on a different bucket's ledger than
2577    // the app DB, so connecting to `config.database.url` first would
2578    // silently mutate the wrong database.
2579    let db_name = resolve_database(database, &config);
2580    let url = match resolve_bucket_url(&config.database, &db_name) {
2581        Some(u) => u,
2582        None => {
2583            eprintln!(
2584                "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
2585            );
2586            return 2;
2587        }
2588    };
2589
2590    let mut ctx = match connect_and_check(&url).await {
2591        ContextOutcome::Ready(ctx) => ctx,
2592        ContextOutcome::UnsupportedVersion(e) => {
2593            crate::print_support_boundary_error("migrations repair partial-apply", &e);
2594            return 2;
2595        }
2596        ContextOutcome::RuntimeError(msg) => {
2597            eprintln!("djogi migrations repair partial-apply: pool: {msg}");
2598            return 1;
2599        }
2600    };
2601
2602    let lock_path = workspace.join(LOCK_FILE_NAME);
2603    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2604        Ok(g) => g,
2605        Err(e) => {
2606            eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
2607            return 1;
2608        }
2609    };
2610
2611    let app_label = app.unwrap_or("");
2612    let bucket = BucketKey {
2613        database: db_name,
2614        app: app_label.to_string(),
2615    };
2616
2617    match repair_partial_apply(
2618        &mut ctx,
2619        &guard,
2620        &bucket,
2621        version,
2622        workspace,
2623        resolution,
2624        note,
2625        RepairConfirmation::OperatorAcknowledged,
2626    )
2627    .await
2628    {
2629        Ok(report) => {
2630            render_repair_report(&report);
2631            0
2632        }
2633        Err(e) => {
2634            eprintln!("djogi migrations repair partial-apply: {e}");
2635            repair_error_exit_code(&e)
2636        }
2637    }
2638}
2639
2640/// `djogi migrations repair resume-partial` entry point.
2641/// Resumes an interrupted non-transactional apply by loading the
2642/// committed `<version>.plan.json` and replaying its remaining steps.
2643/// Loads the committed plan directly (no CLI-level checksum pre-gate);
2644/// the library validates the plan against the ledger row internally.
2645pub fn repair_resume_partial_apply_cmd(
2646    version: &str,
2647    app: Option<&str>,
2648    database: Option<&str>,
2649    workspace: Option<PathBuf>,
2650    node_id: Option<u32>,
2651    single_node_dev: bool,
2652) -> ExitCode {
2653    let workspace = resolve_workspace(workspace);
2654    let runtime = match tokio::runtime::Builder::new_current_thread()
2655        .enable_all()
2656        .build()
2657    {
2658        Ok(r) => r,
2659        Err(e) => {
2660            eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
2661            return ExitCode::from(1);
2662        }
2663    };
2664    let exit = runtime.block_on(async {
2665        run_repair_resume_partial(&workspace, version, app, database, node_id, single_node_dev)
2666            .await
2667    });
2668    ExitCode::from(exit as u8)
2669}
2670
2671/// Async body of [`repair_resume_partial_apply_cmd`]. Returns the desired exit code.
2672async fn run_repair_resume_partial(
2673    workspace: &Path,
2674    version: &str,
2675    app: Option<&str>,
2676    database: Option<&str>,
2677    node_id: Option<u32>,
2678    single_node_dev: bool,
2679) -> i32 {
2680    use djogi::config::DjogiConfig;
2681
2682    let config = match DjogiConfig::load_from_workspace(workspace) {
2683        Ok(c) => c,
2684        Err(e) => {
2685            eprintln!("djogi migrations repair resume-partial: config load: {e}");
2686            return 1;
2687        }
2688    };
2689
2690    // Resolve node identity before any DB work.
2691    let runner_identity = match crate::identity::resolve_identity(
2692        node_id,
2693        single_node_dev,
2694        &config.profile,
2695        "repair resume-partial",
2696    ) {
2697        Ok(resolved) => Some(resolved.into_runner_identity()),
2698        Err(e) => {
2699            let _ = crate::identity::print_identity_error("repair resume-partial", &e);
2700            return 2;
2701        }
2702    };
2703
2704    // Resolve the per-database URL BEFORE connecting: `--database
2705    // crud_log` / `event_log` operate on a different bucket's ledger than
2706    // the app DB, so connecting to `config.database.url` first would
2707    // silently mutate the wrong database.
2708    let db_name = resolve_database(database, &config);
2709    let url = match resolve_bucket_url(&config.database, &db_name) {
2710        Some(u) => u,
2711        None => {
2712            eprintln!(
2713                "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
2714            );
2715            return 2;
2716        }
2717    };
2718
2719    let mut ctx = match connect_and_check(&url).await {
2720        ContextOutcome::Ready(ctx) => ctx,
2721        ContextOutcome::UnsupportedVersion(e) => {
2722            crate::print_support_boundary_error("migrations repair resume-partial", &e);
2723            return 2;
2724        }
2725        ContextOutcome::RuntimeError(msg) => {
2726            eprintln!("djogi migrations repair resume-partial: pool: {msg}");
2727            return 1;
2728        }
2729    };
2730
2731    let lock_path = workspace.join(LOCK_FILE_NAME);
2732    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2733        Ok(g) => g,
2734        Err(e) => {
2735            eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
2736            return 1;
2737        }
2738    };
2739
2740    let app_label = app.unwrap_or("");
2741    let bucket = BucketKey {
2742        database: db_name,
2743        app: app_label.to_string(),
2744    };
2745
2746    // Load the committed replay plan directly from disk — no CLI-level
2747    // checksum pre-gate, because repair_resume_partial_apply validates
2748    // plan↔ledger checksums itself. Synthesizing a whole-file checksum
2749    // here would not match the per-statement-fragment checksums stored
2750    // in the plan JSON.
2751    let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
2752        Ok(p) => p,
2753        Err(e) => {
2754            eprintln!("djogi migrations repair resume-partial: load plan: {e}");
2755            return 2;
2756        }
2757    };
2758
2759    match repair_resume_partial_apply(
2760        &mut ctx,
2761        &guard,
2762        workspace,
2763        version,
2764        &plan,
2765        runner_identity,
2766        RepairConfirmation::OperatorAcknowledged,
2767    )
2768    .await
2769    {
2770        Ok(report) => {
2771            render_repair_report(&report);
2772            0
2773        }
2774        Err(e) => {
2775            eprintln!("djogi migrations repair resume-partial: {e}");
2776            repair_error_exit_code(&e)
2777        }
2778    }
2779}
2780
2781/// Load the committed `<version>.plan.json` for `resume-partial` without
2782/// the CLI-level checksum pre-gate.
2783/// [`repair_resume_partial_apply`] validates the plan against the ledger
2784/// row internally (`PlanVersionMismatch` / `PlanChecksumMismatch`), so
2785/// re-gating here with a hand-rolled whole-file checksum would be both
2786/// wrong (the plan stores per-statement-fragment checksums) and
2787/// redundant. This helper therefore deliberately does NOT reuse
2788/// [`load_replay_plan_from_disk`] (a pending-apply helper that DOES
2789/// checksum-gate) — it reuses only that function's `CliReplay*`
2790/// deserialization + segment-conversion shape.
2791/// Returns a human-readable error string on a missing/unparseable plan
2792/// file or a format-version mismatch. A missing plan file maps to exit 2
2793/// at the call site (the committed plan is a precondition of resume).
2794fn load_committed_plan_for_resume(
2795    workspace: &Path,
2796    bucket: &djogi::migrate::BucketKey,
2797    version: &str,
2798) -> Result<djogi::migrate::MigrationPlan, String> {
2799    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
2800    let plan_path = bucket_dir.join(format!("{version}.plan.json"));
2801    let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
2802    let stored: CliReplayPlan = serde_json::from_slice(&bytes)
2803        .map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
2804    if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
2805        return Err(format!(
2806            "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
2807            plan_path.display(),
2808            stored.format_version,
2809        ));
2810    }
2811    Ok(djogi::migrate::MigrationPlan {
2812        bucket: bucket.clone(),
2813        classification: stored.classification.into(),
2814        segments: stored
2815            .segments
2816            .into_iter()
2817            .map(|seg| djogi::migrate::Segment {
2818                kind: seg.kind.into(),
2819                statements: seg
2820                    .statements
2821                    .into_iter()
2822                    .map(|stmt| djogi::migrate::OperationSql {
2823                        label: stmt.label,
2824                        up: stmt.up,
2825                        down: String::new(),
2826                        lossy: None,
2827                    })
2828                    .collect(),
2829            })
2830            .collect(),
2831    })
2832}
2833
2834/// `djogi migrations repair snapshot-rebuild` entry point.
2835/// Rebuilds a bucket's schema snapshot by walking the ledger and
2836/// re-projecting from live database state. When `--snapshot-path` is
2837/// omitted, the path is derived from
2838/// `migrations/<database>/<app>/schema_snapshot.json`.
2839pub fn repair_snapshot_rebuild_cmd(
2840    app: Option<&str>,
2841    database: Option<&str>,
2842    snapshot_path: Option<&Path>,
2843    workspace: Option<PathBuf>,
2844) -> ExitCode {
2845    let workspace = resolve_workspace(workspace);
2846    let runtime = match tokio::runtime::Builder::new_current_thread()
2847        .enable_all()
2848        .build()
2849    {
2850        Ok(r) => r,
2851        Err(e) => {
2852            eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
2853            return ExitCode::from(1);
2854        }
2855    };
2856    let exit = runtime.block_on(async {
2857        run_repair_snapshot_rebuild(&workspace, app, database, snapshot_path).await
2858    });
2859    ExitCode::from(exit as u8)
2860}
2861
2862/// Async body of [`repair_snapshot_rebuild_cmd`]. Returns the desired exit code.
2863async fn run_repair_snapshot_rebuild(
2864    workspace: &Path,
2865    app: Option<&str>,
2866    database: Option<&str>,
2867    snapshot_path: Option<&Path>,
2868) -> i32 {
2869    use djogi::config::DjogiConfig;
2870
2871    let config = match DjogiConfig::load_from_workspace(workspace) {
2872        Ok(c) => c,
2873        Err(e) => {
2874            eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
2875            return 1;
2876        }
2877    };
2878
2879    // Resolve the per-database URL BEFORE connecting: `--database
2880    // crud_log` / `event_log` operate on a different bucket's ledger than
2881    // the app DB, so connecting to `config.database.url` first would
2882    // silently rebuild the snapshot from the wrong database.
2883    let db_name = resolve_database(database, &config);
2884    let url = match resolve_bucket_url(&config.database, &db_name) {
2885        Some(u) => u,
2886        None => {
2887            eprintln!(
2888                "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
2889            );
2890            return 2;
2891        }
2892    };
2893
2894    let mut ctx = match connect_and_check(&url).await {
2895        ContextOutcome::Ready(ctx) => ctx,
2896        ContextOutcome::UnsupportedVersion(e) => {
2897            crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
2898            return 2;
2899        }
2900        ContextOutcome::RuntimeError(msg) => {
2901            eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
2902            return 1;
2903        }
2904    };
2905
2906    let lock_path = workspace.join(LOCK_FILE_NAME);
2907    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2908        Ok(g) => g,
2909        Err(e) => {
2910            eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
2911            return 1;
2912        }
2913    };
2914
2915    let app_label = app.unwrap_or("");
2916    let bucket = BucketKey {
2917        database: db_name,
2918        app: app_label.to_string(),
2919    };
2920
2921    let snap_path = match snapshot_path {
2922        Some(p) => p.to_path_buf(),
2923        None => reconstruct_snapshot_path(workspace, &bucket),
2924    };
2925
2926    match repair_snapshot_rebuild(
2927        &mut ctx,
2928        &guard,
2929        &bucket,
2930        &snap_path,
2931        RepairConfirmation::OperatorAcknowledged,
2932    )
2933    .await
2934    {
2935        Ok(report) => {
2936            render_repair_report(&report);
2937            0
2938        }
2939        Err(e) => {
2940            eprintln!("djogi migrations repair snapshot-rebuild: {e}");
2941            repair_error_exit_code(&e)
2942        }
2943    }
2944}
2945
2946// ── baseline command ──────────────────────────────────────────────────────
2947
2948/// `djogi migrations baseline` entry point.
2949/// Establishes a baseline ledger row + snapshot for an existing
2950/// database adopted under Djogi's migration ledger. The schema already
2951/// exists, so `compose` + `apply` cannot run against the populated
2952/// database without a starting point; baseline projects the live
2953/// catalog into a single `baseline` ledger row (no SQL runs against
2954/// user tables) and persists the projected snapshot as the canonical
2955/// baseline so future migrations diff against the real DB state.
2956/// `--reason` is required and must be non-empty — it is recorded in the
2957/// ledger row's `partial_apply_note` for the audit trail. An empty
2958/// reason is a refusal (exit 2) caught before any DB work.
2959/// Exit codes: `0` success, `1` runtime error (config / pool / projection
2960/// failure), `2` refusal (empty `--reason`, unresolvable database URL,
2961/// duplicate version collision, snapshot-persist failure after ledger
2962/// insert, session-pinning correctness failure, or below PG 18).
2963#[expect(
2964    clippy::too_many_arguments,
2965    reason = "CLI command entry point mirrors clap arguments explicitly"
2966)]
2967pub fn baseline_cmd(
2968    version: &str,
2969    description: &str,
2970    reason: &str,
2971    app: Option<&str>,
2972    database: Option<&str>,
2973    workspace: Option<PathBuf>,
2974    node_id: Option<u32>,
2975    single_node_dev: bool,
2976) -> ExitCode {
2977    // Validate --reason before any expensive work, mirroring the
2978    // `apply --fake --reason` empty-reason gate. The library's
2979    // baseline_plan does not itself reject an empty reason (it records
2980    // whatever string it is handed), so the CLI owns this guard.
2981    if reason.trim().is_empty() {
2982        eprintln!(
2983            "djogi migrations baseline: --reason must not be empty; \
2984             supply a non-empty reason why this baseline is being established \
2985             (e.g. 'schema pre-exists from prior tooling'). \
2986             This is recorded in the ledger audit trail."
2987        );
2988        return ExitCode::from(2);
2989    }
2990
2991    let workspace = resolve_workspace(workspace);
2992    let runtime = match tokio::runtime::Builder::new_current_thread()
2993        .enable_all()
2994        .build()
2995    {
2996        Ok(r) => r,
2997        Err(e) => {
2998            eprintln!("djogi migrations baseline: tokio runtime: {e}");
2999            return ExitCode::from(1);
3000        }
3001    };
3002    let exit = runtime.block_on(async {
3003        run_baseline(
3004            &workspace,
3005            version,
3006            description,
3007            reason,
3008            app,
3009            database,
3010            node_id,
3011            single_node_dev,
3012        )
3013        .await
3014    });
3015    ExitCode::from(exit as u8)
3016}
3017
3018/// Async body of [`baseline_cmd`]. Returns the desired exit code.
3019/// Resolves the per-database URL BEFORE connecting (a `--database
3020/// crud_log` / `event_log` baseline targets a different bucket's ledger
3021/// than the app DB), connects + runs the PG-version preflight via
3022/// [`connect_and_check`], acquires the workspace file lock, then drives
3023/// [`baseline_plan`]. The runner projects the live schema itself and
3024/// computes the baseline checksum from that projection, so the
3025/// `RunnerCtx` is constructed with `snapshot: None` (requires the
3026/// caller NOT supply a snapshot) and an empty `checksum_up` (the
3027/// baseline path never reads it).
3028#[expect(
3029    clippy::too_many_arguments,
3030    reason = "baseline async body keeps CLI arguments explicit through validation and connection setup"
3031)]
3032async fn run_baseline(
3033    workspace: &Path,
3034    version: &str,
3035    description: &str,
3036    reason: &str,
3037    app: Option<&str>,
3038    database: Option<&str>,
3039    node_id: Option<u32>,
3040    single_node_dev: bool,
3041) -> i32 {
3042    use djogi::config::DjogiConfig;
3043
3044    let config = match DjogiConfig::load_from_workspace(workspace) {
3045        Ok(c) => c,
3046        Err(e) => {
3047            eprintln!("djogi migrations baseline: config load: {e}");
3048            return 1;
3049        }
3050    };
3051
3052    // Resolve node identity before any DB work.
3053    let runner_identity = match crate::identity::resolve_identity(
3054        node_id,
3055        single_node_dev,
3056        &config.profile,
3057        "baseline",
3058    ) {
3059        Ok(resolved) => Some(resolved.into_runner_identity()),
3060        Err(e) => {
3061            let _ = crate::identity::print_identity_error("baseline", &e);
3062            return 2;
3063        }
3064    };
3065
3066    // Resolve the per-database URL BEFORE connecting: `--database
3067    // crud_log` / `event_log` operate on a different bucket's ledger
3068    // than the app DB, so connecting to `config.database.url` first
3069    // would silently baseline the wrong database.
3070    let db_name = resolve_database(database, &config);
3071    let url = match resolve_bucket_url(&config.database, &db_name) {
3072        Some(u) => u,
3073        None => {
3074            eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
3075            return 2;
3076        }
3077    };
3078
3079    let mut ctx = match connect_and_check(&url).await {
3080        ContextOutcome::Ready(ctx) => ctx,
3081        ContextOutcome::UnsupportedVersion(e) => {
3082            crate::print_support_boundary_error("migrations baseline", &e);
3083            return 2;
3084        }
3085        ContextOutcome::RuntimeError(msg) => {
3086            eprintln!("djogi migrations baseline: pool: {msg}");
3087            return 1;
3088        }
3089    };
3090
3091    let lock_path = workspace.join(LOCK_FILE_NAME);
3092    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3093        Ok(g) => g,
3094        Err(e) => {
3095            eprintln!("djogi migrations baseline: workspace lock: {e}");
3096            return 1;
3097        }
3098    };
3099
3100    let app_label = app.unwrap_or("");
3101    let bucket = BucketKey {
3102        database: db_name,
3103        app: app_label.to_string(),
3104    };
3105
3106    let runner_ctx = RunnerCtx {
3107        bucket: bucket.clone(),
3108        version: version.to_string(),
3109        description: description.to_string(),
3110        // baseline_plan computes checksum_up from the live projection;
3111        // this field is not read on the baseline code path.
3112        checksum_up: String::new(),
3113        checksum_down: None,
3114        // baseline_plan refuses a caller-supplied snapshot — it
3115        // projects the live DB itself. Leave this None; the projection
3116        // is persisted to `snapshot_path` below.
3117        snapshot: None,
3118        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
3119        // MigrateConfig does not derive Clone; construct from fields
3120        // (same pattern as apply_one_pending).
3121        config: djogi::config::MigrateConfig {
3122            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
3123            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
3124            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
3125            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
3126        },
3127        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
3128        audit_pool: match djogi::migrate::resolve_audit_url(&config) {
3129            Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
3130            Err(_) => None,
3131        },
3132        runner_identity,
3133    };
3134
3135    match baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await {
3136        Ok(report) => {
3137            println!(
3138                "djogi migrations baseline: established baseline `{}` \
3139                 (ledger_id={}) in {:.1}s",
3140                version,
3141                report.ledger_id,
3142                report.execution_time_ms as f64 / 1000.0
3143            );
3144            0
3145        }
3146        Err(e) => {
3147            eprintln!("djogi migrations baseline: {e}");
3148            baseline_error_exit_code(&e)
3149        }
3150    }
3151}
3152
3153/// Map a [`RunnerError`] produced by [`baseline_plan`] onto the CLI
3154/// exit-code contract.
3155/// The flat [`runner_error_exit_code`] (always `1`) is wrong for
3156/// baseline: a duplicate-version collision is a refusal the operator
3157/// must resolve by choosing a new version, and a blind retry hits the
3158/// same collision — that must surface as exit `2`, matching the
3159/// `migrations apply` doc-contract ("re-running reports
3160/// `VersionAlreadyApplied` (exit 2)") and the `repair` family's
3161/// [`repair_error_exit_code`] convention.
3162/// `RunnerError` is `#[non_exhaustive]`, so the wildcard arm is
3163/// load-bearing: any variant NOT named below defaults to exit `1`
3164/// (transient — a retry may succeed). That is the safe default for the
3165/// I/O- and connection-shaped variants the baseline path can hit
3166/// (projection failure, ledger bootstrap / write / query failure,
3167/// snapshot persist failure, pinned-session checkout failure,
3168/// advisory-lock contention). Only the genuine refusals are pulled out
3169/// into the exit-`2` arm.
3170fn baseline_error_exit_code(err: &RunnerError) -> i32 {
3171    match err {
3172        // ── Exit 2: refusals — the operator must intervene; a blind
3173        // retry hits the same condition.
3174        // - A duplicate version (terminal or non-terminal) means the
3175        // chosen baseline version is already taken; pick another.
3176        // - A caller-supplied snapshot is a programming error in the
3177        // wiring (the CLI always passes `snapshot: None`), surfaced
3178        // as a structural refusal rather than a retryable fault.
3179        // - An out-of-order rejection is a policy refusal identical to
3180        // the apply path's.
3181        // - AdvisoryUnlockReturnedFalse is a session-pinning correctness
3182        // failure (PG returned false for pg_advisory_unlock); it is not
3183        // transient — matches the repair family's exit-2 treatment.
3184        // - SnapshotPersistFailed in the baseline path is a post-ledger
3185        // failure: baseline_inner inserts the terminal ledger row BEFORE
3186        // writing the snapshot. A retry with the same version therefore
3187        // hits VersionAlreadyApplied (exit 2) before it can write the
3188        // snapshot. Exit 1 (retryable) would give false hope; exit 2
3189        // signals that operator intervention is needed (run
3190        // `repair snapshot-rebuild` or choose a new version).
3191        RunnerError::VersionAlreadyApplied { .. }
3192        | RunnerError::VersionCollisionNonTerminal { .. }
3193        | RunnerError::BaselineSnapshotShouldNotBeProvided
3194        | RunnerError::AdvisoryUnlockReturnedFalse { .. }
3195        | RunnerError::SnapshotPersistFailed { .. }
3196        | RunnerError::OutOfOrderRejected { .. } => 2,
3197        // ── Exit 1: everything else (transient I/O / connection / SQL /
3198        // projection failures). `#[non_exhaustive]` makes this wildcard
3199        // mandatory; new transient-shaped variants inherit the retryable
3200        // default.
3201        _ => 1,
3202    }
3203}
3204
3205#[cfg(test)]
3206mod tests {
3207    use super::*;
3208    use std::fs;
3209    use std::sync::atomic::{AtomicUsize, Ordering};
3210
3211    struct DatabaseUrlEnvGuard {
3212        _lock: std::sync::MutexGuard<'static, ()>,
3213        prior: Option<String>,
3214    }
3215
3216    impl DatabaseUrlEnvGuard {
3217        fn new() -> Self {
3218            Self {
3219                _lock: crate::test_env_lock(),
3220                prior: std::env::var("DATABASE_URL").ok(),
3221            }
3222        }
3223
3224        fn set(&self, value: &str) {
3225            unsafe { std::env::set_var("DATABASE_URL", value) };
3226        }
3227
3228        fn remove(&self) {
3229            unsafe { std::env::remove_var("DATABASE_URL") };
3230        }
3231    }
3232
3233    impl Drop for DatabaseUrlEnvGuard {
3234        fn drop(&mut self) {
3235            match &self.prior {
3236                Some(value) => unsafe { std::env::set_var("DATABASE_URL", value) },
3237                None => unsafe { std::env::remove_var("DATABASE_URL") },
3238            }
3239        }
3240    }
3241
3242    fn temp_workspace(tag: &str) -> std::path::PathBuf {
3243        static COUNTER: AtomicUsize = AtomicUsize::new(0);
3244        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3245        let nanos = std::time::SystemTime::now()
3246            .duration_since(std::time::UNIX_EPOCH)
3247            .unwrap()
3248            .as_nanos();
3249        let p = std::env::temp_dir().join(format!("djogi-cli-{tag}-{nanos}-{n}"));
3250        fs::create_dir_all(&p).unwrap();
3251        p
3252    }
3253
3254    fn write_unreachable_config(work: &std::path::Path) {
3255        let toml = "[database]\nurl = \"postgres://localhost:1/djogi_unreachable\"\n\
3256                    max_connections = 1\ndev_mode = false\n\
3257                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3258        fs::write(work.join("Djogi.toml"), toml).unwrap();
3259    }
3260
3261    fn without_database_url<T>(f: impl FnOnce() -> T) -> T {
3262        let env_guard = DatabaseUrlEnvGuard::new();
3263        env_guard.remove();
3264        f()
3265    }
3266
3267    #[test]
3268    fn database_url_env_guard_restores_prior_value() {
3269        let env_guard = DatabaseUrlEnvGuard::new();
3270        let expected = env_guard.prior.clone();
3271        let next = if expected.as_deref() == Some("postgres://from-env/test") {
3272            "postgres://temporary/test"
3273        } else {
3274            "postgres://from-env/test"
3275        };
3276        env_guard.set(next);
3277        drop(env_guard);
3278        assert_eq!(std::env::var("DATABASE_URL").ok(), expected);
3279    }
3280
3281    fn current_production_phase_zero_sql(tag: &str) -> String {
3282        let work = temp_workspace(tag);
3283        let lock_path = work.join(LOCK_FILE_NAME);
3284        let guard = acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT).expect("lock");
3285        let models: std::collections::BTreeMap<
3286            djogi::migrate::BucketKey,
3287            djogi::migrate::AppliedSchema,
3288        > = std::collections::BTreeMap::new();
3289        let apps = vec![AppLifecycle {
3290            label: "billing".to_string(),
3291            database: "main".to_string(),
3292            renamed_from: None,
3293            tombstone: false,
3294        }];
3295        let emitted = djogi::migrate::ensure_phase_zero_emitted(
3296            &work,
3297            &models,
3298            &apps,
3299            time::OffsetDateTime::now_utc(),
3300            &guard,
3301        )
3302        .expect("auto-emit Phase 0");
3303        let sql = fs::read_to_string(&emitted[0].up_sql_path).expect("read emitted Phase 0");
3304        drop(guard);
3305        let _ = fs::remove_dir_all(&work);
3306        sql
3307    }
3308
3309    fn markerless_seed_phase_zero_sql(tag: &str) -> String {
3310        let mut sql = current_production_phase_zero_sql(tag);
3311        sql.push_str("\nINSERT INTO heer.heer_nodes (id) VALUES (1);\n");
3312        sql
3313    }
3314
3315    fn phase_zero_with_seed_statement(tag: &str, statement: &str) -> String {
3316        let mut sql = current_production_phase_zero_sql(tag);
3317        sql.push('\n');
3318        sql.push_str(statement);
3319        sql.push('\n');
3320        sql
3321    }
3322
3323    fn extended_seed_statement_cases() -> [(&'static str, &'static str); 4] {
3324        [
3325            (
3326                "cte_insert",
3327                "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
3328            ),
3329            (
3330                "cte_delete",
3331                "WITH moved AS (DELETE FROM heer.heer_node_state RETURNING *) SELECT 1;",
3332            ),
3333            (
3334                "merge",
3335                "MERGE INTO heer.heer_nodes AS target USING incoming ON false WHEN NOT MATCHED THEN INSERT (id) VALUES (1);",
3336            ),
3337            (
3338                "copy_from",
3339                "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
3340            ),
3341        ]
3342    }
3343
3344    fn generated_stale_phase_zero_sql(tag: &str) -> String {
3345        let mut sql = current_production_phase_zero_sql(tag);
3346        sql.push_str(
3347            "\nALTER DATABASE \"mydb\" SET heer.node_id = '1';\n\
3348             ALTER DATABASE \"mydb\" SET heer.ranj_node_id = '1';\n\
3349             SET heer.node_id = '1';\n\
3350             SET heer.ranj_node_id = '1';\n",
3351        );
3352        sql
3353    }
3354
3355    fn seed_capable_phase_zero_sql() -> String {
3356        djogi::testing::phase_zero_sql_for_testing("main", true)
3357            .expect("compose seed-capable Phase 0")
3358    }
3359
3360    fn write_pending_json(path: &Path, database: &str, app: &str, version: &str) {
3361        let pending = PendingPlan {
3362            format_version: "1".to_string(),
3363            bucket_database: database.to_string(),
3364            bucket_app: app.to_string(),
3365            version: version.to_string(),
3366            slug: "test".to_string(),
3367            model_snapshot: djogi::migrate::AppliedSchema {
3368                djogi_version: "0.1.0".to_string(),
3369                enums: std::collections::BTreeMap::new(),
3370                format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3371                generated_at: "2026-06-06T00:00:00Z".to_string(),
3372                indexes: Vec::new(),
3373                models: std::collections::BTreeMap::new(),
3374                registered_apps: vec![app.to_string()],
3375            },
3376            checksum_up: "V1:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
3377                .to_string(),
3378            checksum_down: None,
3379            composed_at: "2026-06-06T00:00:00Z".to_string(),
3380        };
3381        if let Some(parent) = path.parent() {
3382            fs::create_dir_all(parent).unwrap();
3383        }
3384        fs::write(path, serde_json::to_vec_pretty(&pending).unwrap()).unwrap();
3385    }
3386
3387    /// The CLI's bucket-discovery walk must include directories that exist
3388    /// on disk but are absent from the current model inventory (the
3389    /// renamed-from case).
3390    #[test]
3391    fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
3392        let work = temp_workspace("b1_discover");
3393        // Lay down a `migrations/main/billing/schema_snapshot.json`
3394        // the OLD app's snapshot. The current model inventory
3395        // would NOT have this bucket because the app moved to
3396        // `invoicing` via `#[app(renamed_from = "billing")]`.
3397        let billing_dir = work.join("migrations/main/billing");
3398        fs::create_dir_all(&billing_dir).unwrap();
3399        fs::write(billing_dir.join("schema_snapshot.json"), "{}").unwrap();
3400        // A second bucket — the global one for the same database
3401        // exists too. Exercise the multi-bucket walk.
3402        let global_dir = work.join("migrations/main/_global_");
3403        fs::create_dir_all(&global_dir).unwrap();
3404        fs::write(global_dir.join("schema_snapshot.json"), "{}").unwrap();
3405        // A third on-disk directory WITHOUT a snapshot file — must
3406        // not be reported (we only union buckets that actually
3407        // shipped a snapshot).
3408        let no_snap_dir = work.join("migrations/main/empty_app");
3409        fs::create_dir_all(&no_snap_dir).unwrap();
3410
3411        let buckets = discover_snapshot_buckets_on_disk(&work);
3412        let labels: std::collections::BTreeSet<&str> =
3413            buckets.iter().map(|b| b.app.as_str()).collect();
3414        assert!(
3415            labels.contains("billing"),
3416            "must include the renamed-from bucket: {labels:?}"
3417        );
3418        assert!(
3419            labels.contains(""),
3420            "must include the global bucket: {labels:?}"
3421        );
3422        assert!(
3423            !labels.contains("empty_app"),
3424            "must not include directories without a snapshot: {labels:?}"
3425        );
3426        let _ = fs::remove_dir_all(&work);
3427    }
3428
3429    /// The resolved workspace flows into config loading.
3430    /// `load_from_workspace` must read `<workspace>/Djogi.toml` not
3431    /// the cwd's. We assert that by writing a custom config with a
3432    /// distinctive `database.url` and confirming the loader sees it.
3433    #[test]
3434    fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
3435        let work = temp_workspace("a1_workspace_config");
3436        let toml = "[database]\nurl = \"postgres://discovered-by-workspace-flag/test\"\n\
3437                    max_connections = 1\ndev_mode = false\n\
3438                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3439        fs::write(work.join("Djogi.toml"), toml).unwrap();
3440        let env_guard = DatabaseUrlEnvGuard::new();
3441        env_guard.remove();
3442        let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
3443        assert_eq!(
3444            config.database.url,
3445            "postgres://discovered-by-workspace-flag/test"
3446        );
3447        assert_eq!(config.server.port, 1234);
3448        let _ = fs::remove_dir_all(&work);
3449    }
3450
3451    /// Env override precedence: A `DATABASE_URL` in the environment
3452    /// must beat any value in
3453    /// `<workspace>/Djogi.toml`, matching the security contract that
3454    /// secrets only live in env vars.
3455    #[test]
3456    fn a1_round2_env_override_beats_workspace_toml() {
3457        let work = temp_workspace("a1r2_env_override");
3458        let toml = "[database]\nurl = \"postgres://from-toml/test\"\n\
3459                    max_connections = 1\ndev_mode = false\n\
3460                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3461        fs::write(work.join("Djogi.toml"), toml).unwrap();
3462        let env_guard = DatabaseUrlEnvGuard::new();
3463        env_guard.set("postgres://from-env/test");
3464        let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
3465        assert_eq!(
3466            config.database.url, "postgres://from-env/test",
3467            "env DATABASE_URL must win over workspace Djogi.toml"
3468        );
3469        let _ = fs::remove_dir_all(&work);
3470    }
3471
3472    #[test]
3473    fn apply_no_pending_is_identity_free_and_skips_pool_connect() {
3474        let work = temp_workspace("apply_no_pending");
3475        write_unreachable_config(&work);
3476
3477        let exit = without_database_url(|| {
3478            let runtime = tokio::runtime::Builder::new_current_thread()
3479                .enable_all()
3480                .build()
3481                .expect("runtime");
3482            runtime.block_on(run_apply(&work, &FakeMode::Real, None, false))
3483        });
3484
3485        assert_eq!(
3486            exit, 0,
3487            "no-pending apply must return before identity resolution or pool checkout"
3488        );
3489        let _ = fs::remove_dir_all(&work);
3490    }
3491
3492    #[test]
3493    fn discover_pending_plans_orders_phase_zero_before_normal_global() {
3494        let work = temp_workspace("discover_pending_phase_zero_first");
3495        write_pending_json(
3496            &djogi::migrate::pending_json_path(
3497                &work,
3498                &BucketKey {
3499                    database: "main".to_string(),
3500                    app: String::new(),
3501                },
3502            ),
3503            "main",
3504            "",
3505            "V20260606010101__later_global",
3506        );
3507        write_pending_json(
3508            &djogi::migrate::phase_zero_pending_json_path(
3509                &work,
3510                "main",
3511                djogi::migrate::PHASE_ZERO_VERSION,
3512            ),
3513            "main",
3514            "",
3515            djogi::migrate::PHASE_ZERO_VERSION,
3516        );
3517
3518        let discovered = discover_pending_plans(&work).expect("discover");
3519        assert_eq!(discovered.len(), 2);
3520        assert_eq!(
3521            discovered[0].plan.version,
3522            djogi::migrate::PHASE_ZERO_VERSION
3523        );
3524        assert!(discovered[0].is_phase_zero);
3525        assert_eq!(discovered[1].plan.version, "V20260606010101__later_global");
3526        let _ = fs::remove_dir_all(&work);
3527    }
3528
3529    #[test]
3530    fn discover_pending_plans_refuses_malformed_pending_json() {
3531        let work = temp_workspace("discover_pending_malformed");
3532        let path = djogi::migrate::pending_json_path(
3533            &work,
3534            &BucketKey {
3535                database: "main".to_string(),
3536                app: String::new(),
3537            },
3538        );
3539        fs::create_dir_all(path.parent().unwrap()).unwrap();
3540        fs::write(&path, b"{ not json").unwrap();
3541
3542        let err = discover_pending_plans(&work).expect_err("malformed pending must refuse");
3543        assert!(err.contains("parse pending JSON"));
3544        let _ = fs::remove_dir_all(&work);
3545    }
3546
3547    #[test]
3548    fn discover_pending_plans_refuses_hidden_phase_zero_database_mismatch() {
3549        let work = temp_workspace("discover_pending_phase_zero_db_mismatch");
3550        write_pending_json(
3551            &djogi::migrate::phase_zero_pending_json_path(
3552                &work,
3553                "main",
3554                djogi::migrate::PHASE_ZERO_VERSION,
3555            ),
3556            "other_db",
3557            "",
3558            djogi::migrate::PHASE_ZERO_VERSION,
3559        );
3560
3561        let err = discover_pending_plans(&work).expect_err("hidden Phase 0 mismatch must refuse");
3562        assert!(
3563            err.contains("expected main from path"),
3564            "unexpected error: {err}"
3565        );
3566        let _ = fs::remove_dir_all(&work);
3567    }
3568
3569    #[test]
3570    fn discover_pending_plans_refuses_normal_global_phase_zero_pending() {
3571        let work = temp_workspace("discover_pending_normal_global_phase_zero");
3572        let path = djogi::migrate::pending_json_path(
3573            &work,
3574            &BucketKey {
3575                database: "main".to_string(),
3576                app: String::new(),
3577            },
3578        );
3579        write_pending_json(&path, "main", "", djogi::migrate::PHASE_ZERO_VERSION);
3580
3581        let err = discover_pending_plans(&work).expect_err("normal-global Phase 0 must refuse");
3582        assert!(
3583            err.contains("Phase 0") && err.contains(".phase_zero"),
3584            "unexpected error: {err}"
3585        );
3586        let _ = fs::remove_dir_all(&work);
3587    }
3588
3589    #[test]
3590    fn discover_pending_plans_refuses_normal_pending_app_mismatch() {
3591        let work = temp_workspace("discover_pending_normal_app_mismatch");
3592        let path = djogi::migrate::pending_json_path(
3593            &work,
3594            &BucketKey {
3595                database: "main".to_string(),
3596                app: "billing".to_string(),
3597            },
3598        );
3599        write_pending_json(&path, "main", "audit", "V20260606010101__mismatch");
3600
3601        let err = discover_pending_plans(&work).expect_err("normal app mismatch must refuse");
3602        assert!(
3603            err.contains("expected billing from path"),
3604            "unexpected error: {err}"
3605        );
3606        let _ = fs::remove_dir_all(&work);
3607    }
3608
3609    #[test]
3610    fn discover_pending_plans_refuses_noncanonical_normal_pending_filename() {
3611        let work = temp_workspace("discover_pending_noncanonical_filename");
3612        let path = work.join("target/djogi_pending/main/bad-name.json");
3613        write_pending_json(&path, "main", "bad-name", "V20260606010101__bad_name");
3614
3615        let err = discover_pending_plans(&work).expect_err("non-canonical filename must refuse");
3616        assert!(
3617            err.contains("non-canonical app filename"),
3618            "unexpected error: {err}"
3619        );
3620        let _ = fs::remove_dir_all(&work);
3621    }
3622
3623    #[test]
3624    fn load_verified_pending_for_apply_refuses_changed_artifact() {
3625        let work = temp_workspace("apply_pending_changed_after_discovery");
3626        let path = djogi::migrate::pending_json_path(
3627            &work,
3628            &BucketKey {
3629                database: "main".to_string(),
3630                app: String::new(),
3631            },
3632        );
3633        write_pending_json(&path, "main", "", "V20260606010101__stable");
3634        let discovered = discover_pending_plans(&work).expect("discover");
3635        fs::write(
3636            &path,
3637            serde_json::to_vec_pretty(&PendingPlan {
3638                version: "V20260606010102__changed".to_string(),
3639                ..discovered[0].plan.clone()
3640            })
3641            .unwrap(),
3642        )
3643        .unwrap();
3644
3645        let err = load_verified_pending_for_apply(&discovered[0])
3646            .expect_err("apply must refuse a changed pending artifact");
3647        assert!(
3648            err.contains("changed after discovery"),
3649            "unexpected error: {err}"
3650        );
3651        let _ = fs::remove_dir_all(&work);
3652    }
3653
3654    #[test]
3655    fn reconcile_pending_plans_after_lock_refuses_added_artifact() {
3656        let work = temp_workspace("apply_pending_added_before_lock");
3657        let path = djogi::migrate::pending_json_path(
3658            &work,
3659            &BucketKey {
3660                database: "main".to_string(),
3661                app: String::new(),
3662            },
3663        );
3664        write_pending_json(&path, "main", "", "V20260606010101__stable");
3665        let discovered = discover_pending_plans(&work).expect("discover");
3666        write_pending_json(
3667            &djogi::migrate::phase_zero_pending_json_path(
3668                &work,
3669                "main",
3670                djogi::migrate::PHASE_ZERO_VERSION,
3671            ),
3672            "main",
3673            "",
3674            djogi::migrate::PHASE_ZERO_VERSION,
3675        );
3676
3677        let err = reconcile_pending_plans_after_lock(&work, &discovered)
3678            .expect_err("locked reconciliation must refuse a changed pending set");
3679        assert!(
3680            err.contains("changed while waiting for the workspace lock"),
3681            "unexpected error: {err}"
3682        );
3683        let _ = fs::remove_dir_all(&work);
3684    }
3685
3686    #[test]
3687    fn reconcile_pending_plans_after_lock_accepts_unchanged_set() {
3688        let work = temp_workspace("apply_pending_stable_under_lock");
3689        let path = djogi::migrate::pending_json_path(
3690            &work,
3691            &BucketKey {
3692                database: "main".to_string(),
3693                app: String::new(),
3694            },
3695        );
3696        write_pending_json(&path, "main", "", "V20260606010101__stable");
3697        let discovered = discover_pending_plans(&work).expect("discover");
3698
3699        let locked = reconcile_pending_plans_after_lock(&work, &discovered)
3700            .expect("unchanged set must reconcile");
3701        assert_eq!(locked, discovered);
3702        let _ = fs::remove_dir_all(&work);
3703    }
3704
3705    #[test]
3706    fn repair_checksum_drift_is_identity_free() {
3707        let work = temp_workspace("repair_checksum_identity_free");
3708        write_unreachable_config(&work);
3709
3710        let exit = without_database_url(|| {
3711            repair_checksum_drift_cmd(
3712                "V20260601000000__repair_checksum",
3713                None,
3714                None,
3715                Some("V1:0000000000000000000000000000000000000000000000000000000000000000"),
3716                None,
3717                Some(work.clone()),
3718            )
3719        });
3720
3721        assert_eq!(
3722            exit,
3723            ExitCode::from(1),
3724            "checksum-drift should reach pool connection without shared identity validation"
3725        );
3726        let _ = fs::remove_dir_all(&work);
3727    }
3728
3729    #[test]
3730    fn repair_partial_apply_is_identity_free() {
3731        let work = temp_workspace("repair_partial_identity_free");
3732        write_unreachable_config(&work);
3733
3734        let exit = without_database_url(|| {
3735            repair_partial_apply_cmd(
3736                "V20260601000000__repair_partial",
3737                PartialApplyResolution::MarkRolledBack,
3738                "operator confirmed rollback",
3739                None,
3740                None,
3741                Some(work.clone()),
3742            )
3743        });
3744
3745        assert_eq!(
3746            exit,
3747            ExitCode::from(1),
3748            "partial-apply should reach pool connection without shared identity validation"
3749        );
3750        let _ = fs::remove_dir_all(&work);
3751    }
3752
3753    #[test]
3754    fn repair_snapshot_rebuild_is_identity_free() {
3755        let work = temp_workspace("repair_snapshot_identity_free");
3756        write_unreachable_config(&work);
3757
3758        let exit = without_database_url(|| {
3759            repair_snapshot_rebuild_cmd(None, None, None, Some(work.clone()))
3760        });
3761
3762        assert_eq!(
3763            exit,
3764            ExitCode::from(1),
3765            "snapshot-rebuild should reach pool connection without shared identity validation"
3766        );
3767        let _ = fs::remove_dir_all(&work);
3768    }
3769
3770    /// `compose_with_inputs` must consume the disk-discovered buckets, not
3771    /// just the inventory's. We set up a
3772    /// `migrations/main/billing/schema_snapshot.json` with a `widgets`
3773    /// table, pass an EMPTY models map (simulating "billing app was
3774    /// removed from the workspace"), set `allow_destructive = true`,
3775    /// and assert the resulting up SQL contains `DROP TABLE
3776    /// "widgets"`. If the disk-walk regressed and `compose_with_inputs`
3777    /// only loaded snapshots for inventory-known buckets, the differ
3778    /// would never see billing's snapshot and the compose would exit
3779    /// `NothingToCompose` (no DROP, no SQL written).
3780    /// End-to-end regression guard.
3781    #[test]
3782    fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
3783        use djogi::migrate::projection::BucketKey;
3784        use djogi::migrate::schema::{
3785            ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
3786        };
3787        use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
3788        use std::collections::BTreeMap;
3789
3790        let work = temp_workspace("b1r2_compose_uses_discovery");
3791
3792        // Build a billing-bucket snapshot with one `widgets` table
3793        // and write it to disk under `migrations/main/billing/`.
3794        let billing_bucket = BucketKey {
3795            database: "main".into(),
3796            app: "billing".into(),
3797        };
3798        let mut billing_snap = AppliedSchema {
3799            djogi_version: env!("CARGO_PKG_VERSION").to_string(),
3800            enums: BTreeMap::new(),
3801            format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
3802            generated_at: "2026-04-25T00:00:00Z".to_string(),
3803            indexes: Vec::new(),
3804            models: BTreeMap::new(),
3805            registered_apps: vec!["billing".to_string()],
3806        };
3807        billing_snap.models.insert(
3808            "widgets".to_string(),
3809            TableSchema {
3810                app: Some("billing".to_string()),
3811                columns: vec![ColumnSchema {
3812                    check: None,
3813                    comment: None,
3814                    default_sql: Some("heerid_next_desc()".to_string()),
3815                    foreign_key: None,
3816                    generated: None,
3817                    identity: None,
3818                    index_type: None,
3819                    indexed: false,
3820                    max_length: None,
3821                    name: "id".to_string(),
3822                    nullable: false,
3823                    on_delete: None,
3824                    outbox_exclude: false,
3825                    rationale: None,
3826                    relation_kind: None,
3827                    renamed_from: None,
3828                    sequence_within: None,
3829                    sql_type: "BIGINT".to_string(),
3830                    unique: false,
3831                    type_change_using: None,
3832                }],
3833                exclusion_constraints: Vec::new(),
3834                fts: None,
3835                is_through: false,
3836                moved_from_app: None,
3837                partition: None,
3838                primary_key: PrimaryKeySchema {
3839                    columns: vec!["id".to_string()],
3840                    kind: PkKindSchema::HeerIdRecencyBiased,
3841                },
3842                rationale: None,
3843                renamed_from: None,
3844                rls_enabled: false,
3845                table: "widgets".to_string(),
3846                table_comment: None,
3847                storage_params: None,
3848                tablespace: None,
3849                tenant_key: None,
3850            },
3851        );
3852        let snap_path = snapshot_path(&work, &billing_bucket);
3853        save_snapshot(&billing_snap, &snap_path).expect("write snapshot");
3854
3855        // EMPTY models — simulates the billing crate having been
3856        // removed from the workspace. Without the disk-walk this
3857        // bucket would never reach the differ.
3858        let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
3859        let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
3860
3861        let exit = compose_with_inputs(
3862            &work,
3863            "drop billing remnant",
3864            true,  // allow_destructive — billing's snapshot will produce DROP ops
3865            false, // force_overwrite
3866            &empty_models,
3867            &[AppLifecycle {
3868                label: "billing".to_string(),
3869                database: "main".to_string(),
3870                renamed_from: None,
3871                tombstone: true, // intentional removal channel
3872            }],
3873            now,
3874            None, // pk_flip_join_table_option — no flip in this test
3875        );
3876        assert_eq!(exit, ExitCode::from(0), "compose must succeed");
3877
3878        // The composed up SQL must carry DROP TABLE for billing's
3879        // widgets — that is the whole point. Find the file and check.
3880        let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
3881        let mut up_path: Option<PathBuf> = None;
3882        for entry in fs::read_dir(&billing_dir).unwrap().flatten() {
3883            let n = entry.file_name().to_string_lossy().to_string();
3884            // Up file pattern: starts with "V", ends with ".sdjql", does
3885            // NOT contain ".down.".
3886            if n.starts_with('V') && n.ends_with(".sdjql") && !n.contains(".down.") {
3887                up_path = Some(entry.path());
3888                break;
3889            }
3890        }
3891        let up_path = up_path.expect("compose must have written an up SQL file");
3892        let up_sql = fs::read_to_string(&up_path).unwrap();
3893        assert!(
3894            up_sql.contains("DROP TABLE \"widgets\""),
3895            "compose must have seen the disk snapshot and emitted DROP TABLE — \
3896             this proves discover_snapshot_buckets_on_disk reached the differ. \
3897             SQL: {up_sql}"
3898        );
3899        let _ = fs::remove_dir_all(&work);
3900    }
3901
3902    /// `status_cmd` invokes its tokio runtime and
3903    /// fails fast on a malformed `Djogi.toml`. We don't need a live
3904    /// Postgres for this assertion — the test is that the workspace
3905    /// path is threaded through the loader and TOML errors surface
3906    /// promptly. (The earlier `a1_load_from_workspace_reads_path_specific_djogi_toml`
3907    /// covers the well-formed case; this is the malformed-input
3908    /// path-threading proof.)
3909    #[test]
3910    fn a1_round2_status_cmd_threads_workspace_to_config() {
3911        let work = temp_workspace("a1r2_status_workspace");
3912        // Write a deliberately malformed TOML so config load fails.
3913        // If the workspace path wasn't threaded, status_cmd would
3914        // try the cwd's Djogi.toml (typically absent) and silently
3915        // fall through to defaults, giving a different error code.
3916        fs::write(work.join("Djogi.toml"), "this is = not = valid toml ===").unwrap();
3917        let exit = status_cmd(Some(work.clone()));
3918        assert_eq!(
3919            exit,
3920            ExitCode::from(1),
3921            "malformed workspace Djogi.toml must surface as config load error"
3922        );
3923        let _ = fs::remove_dir_all(&work);
3924    }
3925
3926    // ── AttuneError → exit code matrix ──────────────────────────────
3927
3928    /// Every `AttuneError::Refused(_)` variant must map to exit code `2`
3929    /// per `docs/spec/configuration.md` §14. The pre-fix implementation
3930    /// flattened every error to `1`, so an operator running attune in CI
3931    /// could not distinguish "policy gate refused before any side effect"
3932    /// from "ran half a step and failed mid-flight".
3933    #[test]
3934    fn u3_attune_refusal_variants_map_to_exit_code_two() {
3935        use djogi::migrate::AttuneRefusal;
3936        let cases = [
3937            AttuneError::Refused(AttuneRefusal::SquashNotLocalhost {
3938                database_url: "postgres://prod.example.com/main".to_string(),
3939            }),
3940            AttuneError::Refused(AttuneRefusal::SquashNotDevProfile {
3941                profile: "production".to_string(),
3942            }),
3943            // Dev_mode and DJOGI_ENV gates both produce `AttuneError::Refused(_)`
3944            // so they share the exit-code-2 mapping.
3945            AttuneError::Refused(AttuneRefusal::SquashDevModeOff),
3946            AttuneError::Refused(AttuneRefusal::SquashEnvIsProduction {
3947                env_value: "production".to_string(),
3948            }),
3949            AttuneError::Refused(AttuneRefusal::SquashFromVersionNotFound {
3950                version: "V20260101000000__missing".to_string(),
3951            }),
3952            AttuneError::Refused(AttuneRefusal::SquashFromVersionAmbiguous {
3953                version: "V20260101000000__shared".to_string(),
3954                buckets: vec!["main/users".to_string(), "main/billing".to_string()],
3955            }),
3956        ];
3957        for err in &cases {
3958            assert_eq!(
3959                attune_error_exit_code(err),
3960                2,
3961                "refusal variant must map to exit 2: {err}"
3962            );
3963        }
3964    }
3965
3966    /// Every runtime `AttuneError` variant must map to exit code `1`
3967    /// per `docs/spec/configuration.md` §14. CI may safely retry runtime
3968    /// failures; a refusal (exit `2`) signals "operator must intervene"
3969    /// and retrying without operator action would just refuse again.
3970    #[test]
3971    fn u3_attune_runtime_variants_map_to_exit_code_one() {
3972        let cases = [
3973            AttuneError::FilesystemScanFailed {
3974                source: std::io::Error::other("disk full"),
3975            },
3976            AttuneError::SqlReadFailed {
3977                path: PathBuf::from("/tmp/x.sdjql"),
3978                source: std::io::Error::other("permission denied"),
3979            },
3980            AttuneError::SqlWriteFailed {
3981                path: PathBuf::from("/tmp/x.sdjql"),
3982                source: std::io::Error::other("read-only fs"),
3983            },
3984            AttuneError::SqlDeleteFailed {
3985                path: PathBuf::from("/tmp/x.sdjql"),
3986                source: std::io::Error::other("not found"),
3987            },
3988            AttuneError::GitPublishFailed {
3989                stderr: "fatal: refusing to push".to_string(),
3990                status_code: Some(128),
3991            },
3992        ];
3993        for err in &cases {
3994            assert_eq!(
3995                attune_error_exit_code(err),
3996                1,
3997                "runtime variant must map to exit 1: {err}"
3998            );
3999        }
4000    }
4001
4002    // ── issue #354: baseline exit-code mapping ──────────────────────────
4003
4004    /// The refusal-class `RunnerError` variants the baseline path can
4005    /// `baseline_cmd` validates the `--reason` guard before any DB
4006    /// work. An empty or whitespace-only reason must return exit 2
4007    /// without touching the filesystem or network — the guard fires
4008    /// on the CLI-owned string before the tokio runtime is even built.
4009    #[test]
4010    fn baseline_empty_reason_exits_code_2() {
4011        let result = baseline_cmd(
4012            "V00000000000000__baseline",
4013            "description",
4014            "",
4015            None,
4016            None,
4017            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
4018            None,  // node_id
4019            false, // single_node_dev
4020        );
4021        assert_eq!(
4022            result,
4023            ExitCode::from(2),
4024            "empty --reason must exit 2 before any DB work"
4025        );
4026    }
4027
4028    #[test]
4029    fn baseline_whitespace_reason_exits_code_2() {
4030        let result = baseline_cmd(
4031            "V00000000000000__baseline",
4032            "description",
4033            "   ",
4034            None,
4035            None,
4036            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
4037            None,  // node_id
4038            false, // single_node_dev
4039        );
4040        assert_eq!(
4041            result,
4042            ExitCode::from(2),
4043            "whitespace-only --reason must exit 2 before any DB work"
4044        );
4045    }
4046
4047    /// surface must map to exit `2` — a blind retry would hit the same
4048    /// condition, so CI must treat them as "operator must intervene"
4049    /// rather than retryable. A duplicate baseline version (terminal or
4050    /// non-terminal), a wiring bug that supplies a snapshot, and an
4051    /// out-of-order rejection are all refusals.
4052    #[test]
4053    fn baseline_refusal_variants_map_to_exit_code_two() {
4054        let cases = [
4055            RunnerError::VersionAlreadyApplied {
4056                version: "V00000000000000__baseline".to_string(),
4057                applied_at: None,
4058            },
4059            RunnerError::VersionCollisionNonTerminal {
4060                version: "V00000000000000__baseline".to_string(),
4061                status: LedgerStatus::Pending,
4062                run_id: 1,
4063            },
4064            RunnerError::BaselineSnapshotShouldNotBeProvided,
4065            RunnerError::AdvisoryUnlockReturnedFalse {
4066                bucket: BucketKey {
4067                    database: "main".to_string(),
4068                    app: String::new(),
4069                },
4070                key: 0x0102_0304_0506_0708,
4071            },
4072            RunnerError::OutOfOrderRejected {
4073                version: "V00000000000000__baseline".to_string(),
4074                conflicting_version: "V20260101000000__later".to_string(),
4075                conflicting_applied_at: None,
4076            },
4077        ];
4078        for err in &cases {
4079            assert_eq!(
4080                baseline_error_exit_code(err),
4081                2,
4082                "baseline refusal variant must map to exit 2: {err}"
4083            );
4084        }
4085    }
4086
4087    /// Transient `RunnerError` variants reachable from the baseline path
4088    /// must map to exit `1` (retryable). The `#[non_exhaustive]`
4089    /// wildcard arm guarantees any unnamed variant also lands on `1`;
4090    /// these representative cases pin the projection / ledger / snapshot
4091    /// failure shapes the baseline runner can actually emit.
4092    #[test]
4093    fn baseline_transient_variants_map_to_exit_code_one() {
4094        use djogi::error::{DbError, DjogiError};
4095        let cases = [
4096            RunnerError::LedgerBootstrapFailed {
4097                source: DjogiError::Db(DbError::other("create table failed")),
4098            },
4099            RunnerError::LedgerWriteFailed {
4100                version: "V00000000000000__baseline".to_string(),
4101                source: DjogiError::Db(DbError::other("insert failed")),
4102            },
4103            RunnerError::PinnedSessionCheckoutFailed {
4104                source: DjogiError::Db(DbError::other("pool exhausted")),
4105            },
4106            RunnerError::AdvisoryLockFailed {
4107                bucket: BucketKey {
4108                    database: "main".to_string(),
4109                    app: String::new(),
4110                },
4111                key: 0x0102_0304_0506_0708,
4112                attempts: 3,
4113            },
4114        ];
4115        for err in &cases {
4116            assert_eq!(
4117                baseline_error_exit_code(err),
4118                1,
4119                "baseline transient variant must map to exit 1: {err}"
4120            );
4121        }
4122    }
4123
4124    // ── REQ-326: --fake / --reason validation tests ─────────────────────
4125
4126    /// REQ-326-5: --fake without --reason must exit with code 2.
4127    #[test]
4128    fn fake_without_reason_exits_code_2() {
4129        let result = apply_cmd(
4130            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
4131            true,
4132            None,
4133            None,  // node_id
4134            false, // single_node_dev
4135        );
4136        assert_eq!(
4137            result,
4138            ExitCode::from(2),
4139            "--fake without --reason must exit 2"
4140        );
4141    }
4142
4143    /// REQ-326-5: --fake with blank reason must exit with code 2.
4144    #[test]
4145    fn fake_with_empty_reason_exits_code_2() {
4146        let result = apply_cmd(
4147            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
4148            true,
4149            Some(String::new()),
4150            None,  // node_id
4151            false, // single_node_dev
4152        );
4153        assert_eq!(
4154            result,
4155            ExitCode::from(2),
4156            "--fake with empty reason must exit 2"
4157        );
4158    }
4159
4160    /// REQ-326-5: --fake with whitespace-only reason must exit with code 2.
4161    #[test]
4162    fn fake_with_whitespace_reason_exits_code_2() {
4163        let result = apply_cmd(
4164            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
4165            true,
4166            Some("   ".to_string()),
4167            None,  // node_id
4168            false, // single_node_dev
4169        );
4170        assert_eq!(
4171            result,
4172            ExitCode::from(2),
4173            "--fake with whitespace reason must exit 2"
4174        );
4175    }
4176
4177    /// --reason without --fake is accepted (silently ignored).
4178    #[test]
4179    fn reason_without_fake_is_accepted() {
4180        // This should NOT exit 2; it will proceed to config load which
4181        // may fail on nonexistent workspace, but the --reason flag itself
4182        // is accepted. We verify the function does not early-exit with code 2.
4183        let result = apply_cmd(
4184            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
4185            false, // NOT fake
4186            Some("test reason".to_string()),
4187            None, // node_id — identity resolution is tested separately;
4188            true, // single_node_dev — provide explicit dev mode to bypass resolver
4189        );
4190        // Should be 1 (config error) not 2 (refusal)
4191        assert_ne!(
4192            result,
4193            ExitCode::from(2),
4194            "--reason without --fake should not refuse"
4195        );
4196    }
4197
4198    // ── render_verify_report ─────────────────────────────────────
4199    // `render_verify_report` returns `Vec<String>` so the rendering is
4200    // assertable without capturing stdout. Each test pins the exact lines
4201    // the operator sees for one report shape.
4202
4203    /// Build a bucket for render tests.
4204    fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
4205        djogi::migrate::BucketKey {
4206            database: database.to_string(),
4207            app: app.to_string(),
4208        }
4209    }
4210
4211    /// Construct a [`VerifyDiagnostic`] tersely for render tests.
4212    fn diag(
4213        code: &str,
4214        severity: djogi::migrate::VerifySeverity,
4215        message: &str,
4216        location: Option<&str>,
4217    ) -> djogi::migrate::VerifyDiagnostic {
4218        djogi::migrate::VerifyDiagnostic {
4219            code: code.to_string(),
4220            severity,
4221            message: message.to_string(),
4222            location: location.map(str::to_string),
4223        }
4224    }
4225
4226    #[test]
4227    fn render_verify_report_clean_output() {
4228        use djogi::migrate::VerifyReport;
4229
4230        let report = VerifyReport {
4231            diagnostics: vec![],
4232            latest_applied_version: Some("001_initial".to_string()),
4233            applied_count: 3,
4234            unfinished_count: 0,
4235        };
4236        let bucket = render_bucket("main", "");
4237
4238        let lines = render_verify_report(&report, &bucket);
4239
4240        assert!(
4241            lines.contains(&"Ledger: 3 applied, latest 001_initial".to_string()),
4242            "missing ledger line; got {lines:?}"
4243        );
4244        assert!(
4245            lines.contains(&"No drift detected. Schema is consistent.".to_string()),
4246            "missing clean line; got {lines:?}"
4247        );
4248        assert!(
4249            lines.iter().any(|l| l.contains("Result: PASSED")),
4250            "missing PASSED result; got {lines:?}"
4251        );
4252        assert!(
4253            !lines.iter().any(|l| l.contains("FAILED")),
4254            "clean report must not say FAILED; got {lines:?}"
4255        );
4256    }
4257
4258    #[test]
4259    fn render_verify_report_with_errors() {
4260        use djogi::migrate::{VerifyReport, VerifySeverity};
4261
4262        // Diagnostics are pre-sorted by `(code, location)` exactly as the
4263        // library returns them — render does not re-sort.
4264        let report = VerifyReport {
4265            diagnostics: vec![
4266                diag(
4267                    "D601",
4268                    VerifySeverity::Error,
4269                    "Snapshot table missing from live DB",
4270                    Some("users"),
4271                ),
4272                diag(
4273                    "D611",
4274                    VerifySeverity::Warning,
4275                    "Live index not present in snapshot",
4276                    Some("idx_posts_created"),
4277                ),
4278            ],
4279            latest_applied_version: Some("V20260501000000__add_users".to_string()),
4280            applied_count: 2,
4281            unfinished_count: 0,
4282        };
4283        let bucket = render_bucket("main", "myapp");
4284
4285        assert!(report.has_errors());
4286        let lines = render_verify_report(&report, &bucket);
4287
4288        assert!(
4289            lines
4290                .contains(&"[ERROR] D601 (users): Snapshot table missing from live DB".to_string()),
4291            "missing D601 line; got {lines:?}"
4292        );
4293        assert!(
4294            lines.contains(
4295                &"[WARN] D611 (idx_posts_created): Live index not present in snapshot".to_string()
4296            ),
4297            "missing D611 line; got {lines:?}"
4298        );
4299        assert!(
4300            lines.iter().any(|l| l.contains("Result: FAILED")),
4301            "error report must say FAILED; got {lines:?}"
4302        );
4303    }
4304
4305    #[test]
4306    fn render_verify_report_header_shows_global_and_named_app() {
4307        use djogi::migrate::VerifyReport;
4308
4309        let report = VerifyReport {
4310            diagnostics: vec![],
4311            latest_applied_version: None,
4312            applied_count: 0,
4313            unfinished_count: 0,
4314        };
4315
4316        // Empty app label → `_global_` in the header.
4317        let global = render_verify_report(&report, &render_bucket("main", ""));
4318        assert_eq!(
4319            global.first().map(String::as_str),
4320            Some("djogi migrations verify — main/_global_"),
4321            "global bucket header; got {global:?}"
4322        );
4323
4324        // Named app → the label verbatim in the header.
4325        let named = render_verify_report(&report, &render_bucket("crud_log", "billing"));
4326        assert_eq!(
4327            named.first().map(String::as_str),
4328            Some("djogi migrations verify — crud_log/billing"),
4329            "named bucket header; got {named:?}"
4330        );
4331    }
4332
4333    #[test]
4334    fn render_verify_report_warning_only_passes_with_warnings() {
4335        use djogi::migrate::{VerifyReport, VerifySeverity};
4336
4337        let report = VerifyReport {
4338            diagnostics: vec![diag(
4339                "D606",
4340                VerifySeverity::Warning,
4341                "type differs (advisory)",
4342                Some("users.age"),
4343            )],
4344            latest_applied_version: Some("001_initial".to_string()),
4345            applied_count: 1,
4346            unfinished_count: 0,
4347        };
4348        let lines = render_verify_report(&report, &render_bucket("main", ""));
4349
4350        assert!(
4351            lines
4352                .iter()
4353                .any(|l| l.contains("Result: PASSED with warnings")),
4354            "warning-only must PASS with warnings; got {lines:?}"
4355        );
4356        assert!(
4357            !lines.iter().any(|l| l.contains("FAILED")),
4358            "warning-only must not say FAILED; got {lines:?}"
4359        );
4360    }
4361
4362    #[test]
4363    fn render_verify_report_empty_ledger_line() {
4364        use djogi::migrate::VerifyReport;
4365
4366        let report = VerifyReport {
4367            diagnostics: vec![],
4368            latest_applied_version: None,
4369            applied_count: 0,
4370            unfinished_count: 0,
4371        };
4372        let lines = render_verify_report(&report, &render_bucket("main", ""));
4373
4374        assert!(
4375            lines.contains(&"Ledger: empty (no migrations applied yet)".to_string()),
4376            "empty ledger line; got {lines:?}"
4377        );
4378    }
4379
4380    #[test]
4381    fn render_verify_report_unfinished_ledger_line() {
4382        use djogi::migrate::VerifyReport;
4383
4384        let report = VerifyReport {
4385            diagnostics: vec![],
4386            latest_applied_version: Some("V20260501000000__add_users".to_string()),
4387            applied_count: 2,
4388            unfinished_count: 1,
4389        };
4390        let lines = render_verify_report(&report, &render_bucket("main", ""));
4391
4392        assert!(
4393            lines.contains(
4394                &"Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users".to_string()
4395            ),
4396            "unfinished ledger line; got {lines:?}"
4397        );
4398    }
4399
4400    #[test]
4401    fn render_verify_report_info_with_no_location_uses_dash() {
4402        use djogi::migrate::{VerifyReport, VerifySeverity};
4403
4404        // An Info diagnostic with `location: None` exercises the
4405        // `unwrap_or("-")` path, and the all-info summary line.
4406        let report = VerifyReport {
4407            diagnostics: vec![diag(
4408                "D692",
4409                VerifySeverity::Info,
4410                "enum type(s) declared; not yet checked",
4411                None,
4412            )],
4413            latest_applied_version: Some("001_initial".to_string()),
4414            applied_count: 1,
4415            unfinished_count: 0,
4416        };
4417        let lines = render_verify_report(&report, &render_bucket("main", ""));
4418
4419        assert!(
4420            lines.iter().any(|l| l.contains("(-)")),
4421            "location: None must render as (-); got {lines:?}"
4422        );
4423        assert!(
4424            lines.contains(&"Result: PASSED (1 info(s))".to_string()),
4425            "all-info summary; got {lines:?}"
4426        );
4427    }
4428
4429    // ── resolve_bucket_url (Class A) ─────────────────────────────────────
4430
4431    fn db_config(
4432        url: &str,
4433        crud_log_url: Option<&str>,
4434        event_log_url: Option<&str>,
4435    ) -> djogi::config::DatabaseConfig {
4436        djogi::config::DatabaseConfig {
4437            url: url.to_string(),
4438            crud_log_url: crud_log_url.map(str::to_string),
4439            event_log_url: event_log_url.map(str::to_string),
4440            max_connections: None,
4441            dev_mode: false,
4442        }
4443    }
4444
4445    #[test]
4446    fn resolve_bucket_url_main_uses_app_url_verbatim() {
4447        // "main" must use the app URL verbatim even when the path
4448        // component is not literally "main" — deriving would target a
4449        // database that does not exist.
4450        let cfg = db_config("postgres://user:pass@localhost:5432/myapp_prod", None, None);
4451        assert_eq!(
4452            resolve_bucket_url(&cfg, "main").as_deref(),
4453            Some("postgres://user:pass@localhost:5432/myapp_prod"),
4454            "main must return the app URL unchanged"
4455        );
4456    }
4457
4458    #[test]
4459    fn resolve_bucket_url_crud_log_prefers_explicit_url() {
4460        let cfg = db_config(
4461            "postgres://localhost/main",
4462            Some("postgres://localhost/explicit_crud"),
4463            None,
4464        );
4465        assert_eq!(
4466            resolve_bucket_url(&cfg, "crud_log").as_deref(),
4467            Some("postgres://localhost/explicit_crud"),
4468            "crud_log must prefer the explicit crud_log_url"
4469        );
4470    }
4471
4472    #[test]
4473    fn resolve_bucket_url_event_log_prefers_explicit_url() {
4474        let cfg = db_config(
4475            "postgres://localhost/main",
4476            None,
4477            Some("postgres://localhost/explicit_event"),
4478        );
4479        assert_eq!(
4480            resolve_bucket_url(&cfg, "event_log").as_deref(),
4481            Some("postgres://localhost/explicit_event"),
4482            "event_log must prefer the explicit event_log_url"
4483        );
4484    }
4485
4486    #[test]
4487    fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
4488        // An empty explicit URL is treated as absent — derive from the app
4489        // URL's path component instead.
4490        let cfg = db_config("postgres://localhost/main", Some(""), Some("   "));
4491        // crud_log: empty string → derive.
4492        assert_eq!(
4493            resolve_bucket_url(&cfg, "crud_log").as_deref(),
4494            Some("postgres://localhost/crud_log"),
4495            "empty crud_log_url must fall back to derived"
4496        );
4497        // event_log: whitespace is NOT empty, so it is used verbatim — the
4498        // emptiness check is a strict `is_empty`, matching the spec.
4499        assert_eq!(
4500            resolve_bucket_url(&cfg, "event_log").as_deref(),
4501            Some("   "),
4502            "non-empty (whitespace) event_log_url is used verbatim"
4503        );
4504    }
4505
4506    #[test]
4507    fn resolve_bucket_url_other_database_derives_from_app_url() {
4508        let cfg = db_config("postgres://user:pass@localhost:5432/main", None, None);
4509        assert_eq!(
4510            resolve_bucket_url(&cfg, "analytics").as_deref(),
4511            Some("postgres://user:pass@localhost:5432/analytics"),
4512            "an arbitrary database name derives by path splice"
4513        );
4514    }
4515
4516    #[test]
4517    fn resolve_bucket_url_pathless_url_returns_none() {
4518        // A URL with no recognisable path component cannot be derived.
4519        let cfg = db_config("postgres://localhost", None, None);
4520        assert_eq!(
4521            resolve_bucket_url(&cfg, "crud_log"),
4522            None,
4523            "pathless URL must yield None for a derived database"
4524        );
4525    }
4526
4527    #[test]
4528    fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
4529        // "main" short-circuits before derivation, so even a pathless URL
4530        // returns it verbatim — the app pool is the operator's to define.
4531        let cfg = db_config("postgres://localhost", None, None);
4532        assert_eq!(
4533            resolve_bucket_url(&cfg, "main").as_deref(),
4534            Some("postgres://localhost"),
4535            "main returns the app URL verbatim regardless of path"
4536        );
4537    }
4538
4539    #[test]
4540    fn resolve_apply_target_urls_uses_pending_bucket_databases() {
4541        let work = temp_workspace("apply_target_urls");
4542        write_pending_json(
4543            &djogi::migrate::pending_json_path(
4544                &work,
4545                &BucketKey {
4546                    database: "main".to_string(),
4547                    app: String::new(),
4548                },
4549            ),
4550            "main",
4551            "",
4552            "V20260607010101__main_global",
4553        );
4554        write_pending_json(
4555            &djogi::migrate::pending_json_path(
4556                &work,
4557                &BucketKey {
4558                    database: "crud_log".to_string(),
4559                    app: "audit".to_string(),
4560                },
4561            ),
4562            "crud_log",
4563            "audit",
4564            "V20260607010102__crud_log_audit",
4565        );
4566
4567        let discovered = discover_pending_plans(&work).expect("discover");
4568        let cfg = db_config(
4569            "postgres://user:pass@localhost:5432/myapp_prod",
4570            Some("postgres://user:pass@localhost:5432/myapp_crud"),
4571            None,
4572        );
4573
4574        let urls = resolve_apply_target_urls(&discovered, &cfg).expect("resolve");
4575        assert_eq!(
4576            urls.len(),
4577            2,
4578            "apply must preserve distinct target databases"
4579        );
4580        assert_eq!(
4581            urls.get("main").map(String::as_str),
4582            Some("postgres://user:pass@localhost:5432/myapp_prod"),
4583            "main pending plans must keep the app database URL"
4584        );
4585        assert_eq!(
4586            urls.get("crud_log").map(String::as_str),
4587            Some("postgres://user:pass@localhost:5432/myapp_crud"),
4588            "crud_log pending plans must route through the crud_log database URL"
4589        );
4590        let _ = fs::remove_dir_all(&work);
4591    }
4592
4593    #[test]
4594    fn resolve_apply_target_urls_refuses_unresolvable_pending_database() {
4595        let work = temp_workspace("apply_target_urls_unresolvable");
4596        write_pending_json(
4597            &djogi::migrate::pending_json_path(
4598                &work,
4599                &BucketKey {
4600                    database: "analytics".to_string(),
4601                    app: String::new(),
4602                },
4603            ),
4604            "analytics",
4605            "",
4606            "V20260607010103__analytics_global",
4607        );
4608
4609        let discovered = discover_pending_plans(&work).expect("discover");
4610        let cfg = db_config("postgres://localhost", None, None);
4611        let err = resolve_apply_target_urls(&discovered, &cfg)
4612            .expect_err("pathless app URL must refuse a derived pending database");
4613        assert!(err.contains("analytics"), "unexpected error: {err}");
4614        let _ = fs::remove_dir_all(&work);
4615    }
4616
4617    // ── Stage 4D: CLI cleanup identity-free Phase 0 guard ─────────────
4618
4619    #[test]
4620    fn classify_phase_zero_bytes_identity_free_production_is_ok() {
4621        let sql = current_production_phase_zero_sql("current_bytes");
4622        assert!(
4623            classify_phase_zero_bytes(sql.as_bytes()).is_none(),
4624            "production Phase 0 should be identity-free replay-current (no refusal)"
4625        );
4626    }
4627
4628    #[test]
4629    fn classify_phase_zero_bytes_seed_capable_is_refused() {
4630        let sql = seed_capable_phase_zero_sql();
4631        let refusal = classify_phase_zero_bytes(sql.as_bytes());
4632        assert!(
4633            refusal.is_some(),
4634            "seed-capable Phase 0 should be refused by cleanup guard"
4635        );
4636        assert!(refusal.unwrap().contains("seed-capable"));
4637    }
4638
4639    #[test]
4640    fn classify_phase_zero_bytes_generated_stale_is_refused() {
4641        let sql = generated_stale_phase_zero_sql("stale_bytes");
4642        let refusal = classify_phase_zero_bytes(sql.as_bytes());
4643        assert!(
4644            refusal.is_some(),
4645            "generated-stale Phase 0 should be refused"
4646        );
4647        assert!(refusal.unwrap().contains("generated-stale"));
4648    }
4649
4650    #[test]
4651    fn classify_phase_zero_bytes_markerless_seed_is_refused() {
4652        let sql = markerless_seed_phase_zero_sql("markerless_seed_bytes");
4653        let refusal = classify_phase_zero_bytes(sql.as_bytes());
4654        assert!(
4655            refusal.is_some(),
4656            "markerless seed Phase 0 should be refused by cleanup guard"
4657        );
4658        assert!(refusal.unwrap().contains("seed-dml"));
4659    }
4660
4661    #[test]
4662    fn classify_phase_zero_bytes_extended_seed_dml_forms_are_refused() {
4663        for (name, statement) in extended_seed_statement_cases() {
4664            let sql =
4665                phase_zero_with_seed_statement(&format!("extended_seed_bytes_{name}"), statement);
4666            let refusal = classify_phase_zero_bytes(sql.as_bytes());
4667            let msg = refusal.expect("extended seed Phase 0 should be refused");
4668            assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
4669        }
4670    }
4671
4672    #[test]
4673    fn classify_phase_zero_bytes_ambiguous_is_refused() {
4674        // Hand-edited or ambiguous Phase 0.
4675        let sql = "CREATE SCHEMA IF NOT EXISTS heer;\n\
4676                   ALTER DATABASE \"mydb\" SET heer.node_id = '1';\n";
4677        let refusal = classify_phase_zero_bytes(sql.as_bytes());
4678        assert!(refusal.is_some(), "ambiguous Phase 0 should be refused");
4679        assert!(refusal.unwrap().contains("ambiguous"));
4680    }
4681
4682    #[test]
4683    fn classify_phase_zero_bytes_missing_is_refused() {
4684        let refusal = classify_phase_zero_bytes(b"  \n\t  ");
4685        assert!(refusal.is_some(), "missing Phase 0 should be refused");
4686        assert!(refusal.unwrap().contains("missing"));
4687    }
4688
4689    #[test]
4690    fn classify_phase_zero_for_cleanup_refuses_stale_replay_plan() {
4691        let work = temp_workspace("stale_cleanup");
4692        let bucket_dir = work.join("migrations/main/_global_");
4693        fs::create_dir_all(&bucket_dir).unwrap();
4694
4695        // Write a stale replay plan JSON.
4696        let replay = CliReplayPlan {
4697            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
4698            classification: CliClassification::Additive,
4699            checksum_up: "V1:aabbccdd".to_string(),
4700            checksum_down: None,
4701            segments: vec![CliReplaySegment {
4702                kind: CliSegmentKind::Transactional,
4703                statements: vec![CliReplayStatement {
4704                    label: "phase_zero_bootstrap".to_string(),
4705                    up: generated_stale_phase_zero_sql("stale_replay"),
4706                }],
4707            }],
4708        };
4709        fs::write(
4710            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
4711            serde_json::to_string(&replay).unwrap(),
4712        )
4713        .unwrap();
4714
4715        let bucket = djogi::migrate::BucketKey {
4716            database: "main".to_string(),
4717            app: String::new(),
4718        };
4719        let refusal = classify_phase_zero_for_cleanup(
4720            &work,
4721            &bucket,
4722            djogi::migrate::PHASE_ZERO_VERSION,
4723            "V1:aabbccdd",
4724            None,
4725        );
4726        assert!(
4727            refusal.is_some(),
4728            "stale Phase 0 replay plan should be refused by cleanup guard"
4729        );
4730        let msg = refusal.unwrap();
4731        assert!(msg.contains("generated-stale"), "refusal reason: {msg}");
4732
4733        let _ = fs::remove_dir_all(&work);
4734    }
4735
4736    #[test]
4737    fn classify_phase_zero_for_cleanup_allows_current_replay_plan() {
4738        let work = temp_workspace("current_cleanup");
4739        let bucket_dir = work.join("migrations/main/_global_");
4740        fs::create_dir_all(&bucket_dir).unwrap();
4741
4742        // Write a current (production) replay plan JSON.
4743        let replay = CliReplayPlan {
4744            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
4745            classification: CliClassification::Additive,
4746            checksum_up: "V1:eeff0011".to_string(),
4747            checksum_down: None,
4748            segments: vec![CliReplaySegment {
4749                kind: CliSegmentKind::Transactional,
4750                statements: vec![CliReplayStatement {
4751                    label: "phase_zero_bootstrap".to_string(),
4752                    up: current_production_phase_zero_sql("current_replay"),
4753                }],
4754            }],
4755        };
4756        fs::write(
4757            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
4758            serde_json::to_string(&replay).unwrap(),
4759        )
4760        .unwrap();
4761
4762        let bucket = djogi::migrate::BucketKey {
4763            database: "main".to_string(),
4764            app: String::new(),
4765        };
4766        let refusal = classify_phase_zero_for_cleanup(
4767            &work,
4768            &bucket,
4769            djogi::migrate::PHASE_ZERO_VERSION,
4770            "V1:eeff0011",
4771            None,
4772        );
4773        assert!(
4774            refusal.is_none(),
4775            "identity-free Phase 0 should be allowed by cleanup guard; got: {refusal:?}"
4776        );
4777
4778        let _ = fs::remove_dir_all(&work);
4779    }
4780
4781    #[test]
4782    fn classify_phase_zero_for_cleanup_refuses_seed_capable_replay_plan() {
4783        let work = temp_workspace("seed_cleanup_replay_plan");
4784        let bucket_dir = work.join("migrations/main/_global_");
4785        fs::create_dir_all(&bucket_dir).unwrap();
4786
4787        let replay = CliReplayPlan {
4788            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
4789            classification: CliClassification::Additive,
4790            checksum_up: "V1:11223344".to_string(),
4791            checksum_down: None,
4792            segments: vec![CliReplaySegment {
4793                kind: CliSegmentKind::Transactional,
4794                statements: vec![CliReplayStatement {
4795                    label: "phase_zero_bootstrap".to_string(),
4796                    up: seed_capable_phase_zero_sql(),
4797                }],
4798            }],
4799        };
4800        fs::write(
4801            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
4802            serde_json::to_string(&replay).unwrap(),
4803        )
4804        .unwrap();
4805
4806        let bucket = djogi::migrate::BucketKey {
4807            database: "main".to_string(),
4808            app: String::new(),
4809        };
4810        let refusal = classify_phase_zero_for_cleanup(
4811            &work,
4812            &bucket,
4813            djogi::migrate::PHASE_ZERO_VERSION,
4814            "V1:11223344",
4815            None,
4816        );
4817        let msg = refusal.expect("seed-capable replay plan must refuse");
4818        assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
4819
4820        let _ = fs::remove_dir_all(&work);
4821    }
4822
4823    #[test]
4824    fn classify_phase_zero_for_cleanup_refuses_markerless_seed_replay_plan() {
4825        let work = temp_workspace("markerless_seed_cleanup_replay_plan");
4826        let bucket_dir = work.join("migrations/main/_global_");
4827        fs::create_dir_all(&bucket_dir).unwrap();
4828
4829        let replay = CliReplayPlan {
4830            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
4831            classification: CliClassification::Additive,
4832            checksum_up: "V1:55667788".to_string(),
4833            checksum_down: None,
4834            segments: vec![CliReplaySegment {
4835                kind: CliSegmentKind::Transactional,
4836                statements: vec![CliReplayStatement {
4837                    label: "phase_zero_bootstrap".to_string(),
4838                    up: markerless_seed_phase_zero_sql("markerless_seed_replay"),
4839                }],
4840            }],
4841        };
4842        fs::write(
4843            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
4844            serde_json::to_string(&replay).unwrap(),
4845        )
4846        .unwrap();
4847
4848        let bucket = djogi::migrate::BucketKey {
4849            database: "main".to_string(),
4850            app: String::new(),
4851        };
4852        let refusal = classify_phase_zero_for_cleanup(
4853            &work,
4854            &bucket,
4855            djogi::migrate::PHASE_ZERO_VERSION,
4856            "V1:55667788",
4857            None,
4858        );
4859        let msg = refusal.expect("markerless seed replay plan must refuse");
4860        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
4861
4862        let _ = fs::remove_dir_all(&work);
4863    }
4864
4865    #[test]
4866    fn classify_phase_zero_for_cleanup_refuses_cte_seed_dml_replay_plan() {
4867        let work = temp_workspace("cte_seed_cleanup_replay_plan");
4868        let bucket_dir = work.join("migrations/main/_global_");
4869        fs::create_dir_all(&bucket_dir).unwrap();
4870
4871        let replay = CliReplayPlan {
4872            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
4873            classification: CliClassification::Additive,
4874            checksum_up: "V1:66778899".to_string(),
4875            checksum_down: None,
4876            segments: vec![CliReplaySegment {
4877                kind: CliSegmentKind::Transactional,
4878                statements: vec![CliReplayStatement {
4879                    label: "phase_zero_bootstrap".to_string(),
4880                    up: phase_zero_with_seed_statement(
4881                        "cte_seed_cleanup_replay",
4882                        "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
4883                    ),
4884                }],
4885            }],
4886        };
4887        fs::write(
4888            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
4889            serde_json::to_string(&replay).unwrap(),
4890        )
4891        .unwrap();
4892
4893        let bucket = djogi::migrate::BucketKey {
4894            database: "main".to_string(),
4895            app: String::new(),
4896        };
4897        let refusal = classify_phase_zero_for_cleanup(
4898            &work,
4899            &bucket,
4900            djogi::migrate::PHASE_ZERO_VERSION,
4901            "V1:66778899",
4902            None,
4903        );
4904        let msg = refusal.expect("CTE seed replay plan must refuse");
4905        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
4906
4907        let _ = fs::remove_dir_all(&work);
4908    }
4909
4910    #[test]
4911    fn classify_phase_zero_for_cleanup_fallback_sql_file() {
4912        let work = temp_workspace("fallback_cleanup");
4913        let bucket_dir = work.join("migrations/main/_global_");
4914        fs::create_dir_all(&bucket_dir).unwrap();
4915
4916        let up_sql = current_production_phase_zero_sql("fallback_sql");
4917        let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
4918        fs::write(bucket_dir.join(&up_filename), up_sql).unwrap();
4919
4920        let bucket = djogi::migrate::BucketKey {
4921            database: "main".to_string(),
4922            app: String::new(),
4923        };
4924        let refusal = classify_phase_zero_for_cleanup(
4925            &work,
4926            &bucket,
4927            djogi::migrate::PHASE_ZERO_VERSION,
4928            "V1:anychecksum",
4929            None,
4930        );
4931        assert!(
4932            refusal.is_none(),
4933            "identity-free Phase 0 fallback SQL should be allowed; got: {refusal:?}"
4934        );
4935
4936        let _ = fs::remove_dir_all(&work);
4937    }
4938
4939    #[test]
4940    fn classify_phase_zero_for_cleanup_refuses_seed_capable_fallback_sql_file() {
4941        let work = temp_workspace("seed_cleanup_fallback");
4942        let bucket_dir = work.join("migrations/main/_global_");
4943        fs::create_dir_all(&bucket_dir).unwrap();
4944
4945        let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
4946        fs::write(bucket_dir.join(&up_filename), seed_capable_phase_zero_sql()).unwrap();
4947
4948        let bucket = djogi::migrate::BucketKey {
4949            database: "main".to_string(),
4950            app: String::new(),
4951        };
4952        let refusal = classify_phase_zero_for_cleanup(
4953            &work,
4954            &bucket,
4955            djogi::migrate::PHASE_ZERO_VERSION,
4956            "V1:anychecksum",
4957            None,
4958        );
4959        let msg = refusal.expect("seed-capable fallback SQL must refuse");
4960        assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
4961
4962        let _ = fs::remove_dir_all(&work);
4963    }
4964
4965    #[test]
4966    fn classify_phase_zero_for_cleanup_refuses_markerless_seed_fallback_sql_file() {
4967        let work = temp_workspace("markerless_seed_cleanup_fallback");
4968        let bucket_dir = work.join("migrations/main/_global_");
4969        fs::create_dir_all(&bucket_dir).unwrap();
4970
4971        let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
4972        fs::write(
4973            bucket_dir.join(&up_filename),
4974            markerless_seed_phase_zero_sql("markerless_seed_fallback"),
4975        )
4976        .unwrap();
4977
4978        let bucket = djogi::migrate::BucketKey {
4979            database: "main".to_string(),
4980            app: String::new(),
4981        };
4982        let refusal = classify_phase_zero_for_cleanup(
4983            &work,
4984            &bucket,
4985            djogi::migrate::PHASE_ZERO_VERSION,
4986            "V1:anychecksum",
4987            None,
4988        );
4989        let msg = refusal.expect("markerless seed fallback SQL must refuse");
4990        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
4991
4992        let _ = fs::remove_dir_all(&work);
4993    }
4994
4995    #[test]
4996    fn classify_phase_zero_for_cleanup_refuses_copy_from_seed_fallback_sql_file() {
4997        let work = temp_workspace("copy_seed_cleanup_fallback");
4998        let bucket_dir = work.join("migrations/main/_global_");
4999        fs::create_dir_all(&bucket_dir).unwrap();
5000
5001        let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
5002        fs::write(
5003            bucket_dir.join(&up_filename),
5004            phase_zero_with_seed_statement(
5005                "copy_seed_cleanup_fallback",
5006                "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
5007            ),
5008        )
5009        .unwrap();
5010
5011        let bucket = djogi::migrate::BucketKey {
5012            database: "main".to_string(),
5013            app: String::new(),
5014        };
5015        let refusal = classify_phase_zero_for_cleanup(
5016            &work,
5017            &bucket,
5018            djogi::migrate::PHASE_ZERO_VERSION,
5019            "V1:anychecksum",
5020            None,
5021        );
5022        let msg = refusal.expect("COPY FROM seed fallback SQL must refuse");
5023        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
5024
5025        let _ = fs::remove_dir_all(&work);
5026    }
5027}