Skip to main content

djogi_cli/
migrations.rs

1//! `djogi migrations` subcommand glue — Phase 7 T6.
2//!
3//! Two leaves: `compose` and `status`. Both flow through the public
4//! `djogi::migrate` API. Compose acquires the workspace file lock for
5//! the duration of the call; status is read-only and does not.
6//!
7//! The CLI surface here is intentionally thin — all the real logic
8//! lives in the library so integration tests can exercise it without
9//! spawning subprocesses.
10
11use std::path::{Path, PathBuf};
12use std::process::ExitCode;
13
14use djogi::apps::AppRegistry;
15use djogi::migrate::{
16    AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
17    DescriptorProvider, GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan,
18    RepairConfirmation, RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError,
19    VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
20    compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
21    repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
22};
23
// Imported separately: used by the apply command's ledger state machine.
25use djogi::migrate::LedgerStatus;
26
27// CLI-side enums declared at the crate root (`main.rs` is the binary's
28// root module — there is no `mod main`), reached here as `crate::*`.
29use crate::{PartialApplyResolutionCli, RepairSubcommand};
30
31// ── Replay plan deserialization ──────────────────────────────────────────
32
33/// Local mirror of `StoredReplayPlan` (pub(crate) in the library).
34///
35/// The committed replay plan JSON written by `compose` at
36/// `migrations/<database>/<app>/<version>.plan.json`. This struct
37/// allows the CLI to parse it and construct a proper [`MigrationPlan`]
38/// with correct segment structure and checksums.
39#[derive(Debug, Clone, serde::Deserialize)]
40struct CliReplayPlan {
41    format_version: String,
42    checksum_up: String,
43    checksum_down: Option<String>,
44    classification: CliClassification,
45    segments: Vec<CliReplaySegment>,
46}
47
48#[derive(Debug, Clone, serde::Deserialize)]
49#[serde(tag = "kind", rename_all = "snake_case")]
50enum CliClassification {
51    NoOp,
52    Additive,
53    Reversible,
54    Destructive,
55    Lossy,
56    Unsupported {
57        reason: String,
58    },
59    PkTypeFlip {
60        co_destructive: bool,
61        co_lossy: bool,
62    },
63}
64
65#[derive(Debug, Clone, serde::Deserialize)]
66struct CliReplaySegment {
67    kind: CliSegmentKind,
68    statements: Vec<CliReplayStatement>,
69}
70
71#[derive(Debug, Clone, serde::Deserialize)]
72#[serde(rename_all = "snake_case")]
73enum CliSegmentKind {
74    Transactional,
75    NonTransactional,
76    MetadataOnly,
77}
78
79#[derive(Debug, Clone, serde::Deserialize)]
80struct CliReplayStatement {
81    label: String,
82    up: String,
83}
84
/// The only `format_version` value accepted when reading a committed
/// replay plan JSON; any other value is rejected as a format mismatch.
const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
87
88/// Load the committed replay plan from disk and convert to a
89/// [`djogi::migrate::MigrationPlan`]. Returns `(plan, checksum_up, checksum_down)`.
90///
91/// Falls back to reading the up/down SQL files and constructing a
92/// single-segment transactional plan when the replay plan JSON is
93/// absent or invalid. This mirrors the reset.rs fallback path.
94fn load_replay_plan_from_disk(
95    workspace: &Path,
96    bucket: &djogi::migrate::BucketKey,
97    version: &str,
98    pending_checksum_up: &str,
99    pending_checksum_down: Option<&str>,
100) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
101    // Try to load the committed replay plan JSON first.
102    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
103    let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
104
105    if let Ok(bytes) = std::fs::read(&replay_plan_path) {
106        let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
107            Ok(s) => s,
108            Err(e) => {
109                return Err(ApplyReplayPlanError::Parse {
110                    path: replay_plan_path.clone(),
111                    source: e.to_string(),
112                });
113            }
114        };
115
116        if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
117            return Err(ApplyReplayPlanError::FormatVersion {
118                found: stored.format_version,
119                path: replay_plan_path.clone(),
120            });
121        }
122
123        // Verify checksums match the pending plan.
124        if stored.checksum_up != pending_checksum_up
125            || stored.checksum_down.as_deref() != pending_checksum_down
126        {
127            return Err(ApplyReplayPlanError::ChecksumMismatch);
128        }
129
130        let plan = djogi::migrate::MigrationPlan {
131            bucket: bucket.clone(),
132            classification: stored.classification.into(),
133            segments: stored
134                .segments
135                .into_iter()
136                .map(|seg| djogi::migrate::Segment {
137                    kind: seg.kind.into(),
138                    statements: seg
139                        .statements
140                        .into_iter()
141                        .map(|stmt| djogi::migrate::OperationSql {
142                            label: stmt.label,
143                            up: stmt.up,
144                            down: String::new(),
145                            lossy: None,
146                        })
147                        .collect(),
148                })
149                .collect(),
150        };
151
152        return Ok((plan, stored.checksum_up, stored.checksum_down));
153    }
154
155    // Fallback: read SQL files and construct single-segment plan.
156    let up_filename = djogi::migrate::up_filename(version);
157    let down_filename = djogi::migrate::down_filename(version);
158    let up_path = bucket_dir.join(&up_filename);
159    let down_path = bucket_dir.join(&down_filename);
160
161    let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
162        path: up_path.clone(),
163        source: e.to_string(),
164    })?;
165
166    let down_sql = match std::fs::read_to_string(&down_path) {
167        Ok(sql) => sql,
168        Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
169        Err(e) => {
170            return Err(ApplyReplayPlanError::SqlRead {
171                path: down_path.clone(),
172                source: e.to_string(),
173            });
174        }
175    };
176
177    // Compute checksum for the single-segment fallback. The runner
178    // recomputes from the plan's SQL fragments and verifies against
179    // what we provide in RunnerCtx, so they must match.
180    let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
181
182    // Build a single-transactional-segment plan. This is correct for
183    // most migrations — only CONCURRENTLY indexes require non-tx
184    // segments, and those always have a replay plan JSON.
185    let plan = djogi::migrate::MigrationPlan {
186        bucket: bucket.clone(),
187        classification: djogi::migrate::Classification::Additive,
188        segments: vec![djogi::migrate::Segment {
189            kind: djogi::migrate::SegmentKind::Transactional,
190            statements: vec![djogi::migrate::OperationSql {
191                label: format!("replay {version}"),
192                up: up_sql,
193                down: down_sql,
194                lossy: None,
195            }],
196        }],
197    };
198
199    Ok((plan, computed_checksum_up, None))
200}
201
/// Failure modes of [`load_replay_plan_from_disk`].
///
/// Underlying I/O and JSON errors are kept as already-rendered strings
/// in the `source` fields.
#[derive(Debug)]
enum ApplyReplayPlanError {
    /// The replay plan JSON at `path` was read but failed to deserialize.
    Parse { path: PathBuf, source: String },
    /// The replay plan JSON at `path` declares `found`, which is not the
    /// expected format version.
    FormatVersion { found: String, path: PathBuf },
    /// Checksums in the replay plan JSON differ from the pending plan's.
    ChecksumMismatch,
    /// A migration SQL file at `path` could not be read.
    SqlRead { path: PathBuf, source: String },
}
210
211impl std::fmt::Display for ApplyReplayPlanError {
212    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213        match self {
214            Self::Parse { path, source } => {
215                write!(f, "parse replay plan {}: {source}", path.display())
216            }
217            Self::FormatVersion { found, path } => write!(
218                f,
219                "replay plan format version mismatch in {}: expected {}, found {}",
220                path.display(),
221                CLI_REPLAY_PLAN_FORMAT_VERSION,
222                found
223            ),
224            Self::ChecksumMismatch => {
225                write!(f, "checksum mismatch between pending JSON and replay plan")
226            }
227            Self::SqlRead { path, source } => {
228                write!(f, "read SQL file {}: {source}", path.display())
229            }
230        }
231    }
232}
233
234impl std::error::Error for ApplyReplayPlanError {}
235
236// ── Type conversions from CLI-local types to library types ────────────────
237
238impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
239    fn from(kind: CliSegmentKind) -> Self {
240        match kind {
241            CliSegmentKind::Transactional => Self::Transactional,
242            CliSegmentKind::NonTransactional => Self::NonTransactional,
243            CliSegmentKind::MetadataOnly => Self::MetadataOnly,
244        }
245    }
246}
247
248impl From<CliClassification> for djogi::migrate::Classification {
249    fn from(classification: CliClassification) -> Self {
250        match classification {
251            CliClassification::NoOp => Self::NoOp,
252            CliClassification::Additive => Self::Additive,
253            CliClassification::Reversible => Self::Reversible,
254            CliClassification::Destructive => Self::Destructive,
255            CliClassification::Lossy => Self::Lossy,
256            CliClassification::Unsupported { reason } => Self::Unsupported { reason },
257            CliClassification::PkTypeFlip {
258                co_destructive,
259                co_lossy,
260            } => Self::PkTypeFlip {
261                co_destructive,
262                co_lossy,
263            },
264        }
265    }
266}
267
/// Pick the workspace root for a command.
///
/// An explicit `--workspace` value is returned unchanged. Without one,
/// the current working directory is used (the usual flow is
/// `cd <project>` followed by `djogi migrations …`); if the working
/// directory cannot be determined, the relative path `.` is returned.
fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
    match workspace {
        Some(explicit) => explicit,
        None => match std::env::current_dir() {
            Ok(cwd) => cwd,
            Err(_) => PathBuf::from("."),
        },
    }
}
274
275/// Walk the on-disk `migrations/<database>/<app>/` tree and return the
276/// set of buckets that already have a `schema_snapshot.json` file.
277///
278/// Compose's `snapshots` map must include the OLD bucket of any
279/// renamed app — and that bucket is guaranteed to be absent from the
280/// current `models` inventory because the `#[app(renamed_from =
281/// "old")]` annotation lives on the NEW app. Walking disk directly
282/// recovers those orphaned snapshots so the differ sees both sides of
283/// a rename.
284///
285/// Each entry maps to a [`djogi::migrate::projection::BucketKey`]
286/// using the inverse of [`djogi::migrate::app_dirname`] (synthetic
287/// `_global_` directory → empty-string label).
288fn discover_snapshot_buckets_on_disk(
289    workspace: &Path,
290) -> Vec<djogi::migrate::projection::BucketKey> {
291    let mut out = Vec::new();
292    let migrations_root = djogi::migrate::migrations_root(workspace);
293    let Ok(db_entries) = std::fs::read_dir(&migrations_root) else {
294        return out;
295    };
296    for db_entry in db_entries.flatten() {
297        let Ok(ft) = db_entry.file_type() else {
298            continue;
299        };
300        if !ft.is_dir() {
301            continue;
302        }
303        let Some(database) = db_entry.file_name().to_str().map(str::to_string) else {
304            continue;
305        };
306        let Ok(app_entries) = std::fs::read_dir(db_entry.path()) else {
307            continue;
308        };
309        for app_entry in app_entries.flatten() {
310            let Ok(ft) = app_entry.file_type() else {
311                continue;
312            };
313            if !ft.is_dir() {
314                continue;
315            }
316            let Some(dirname) = app_entry.file_name().to_str().map(str::to_string) else {
317                continue;
318            };
319            let snap_path = app_entry.path().join("schema_snapshot.json");
320            if !snap_path.exists() {
321                continue;
322            }
323            let label = djogi::migrate::app_label_from_dirname(&dirname).to_string();
324            out.push(djogi::migrate::projection::BucketKey {
325                database: database.clone(),
326                app: label,
327            });
328        }
329    }
330    out
331}
332
333/// `djogi migrations compose` entry point.
334pub fn compose_cmd(
335    provider: &dyn DescriptorProvider,
336    name: &str,
337    allow_destructive: bool,
338    force_overwrite: bool,
339    workspace: Option<PathBuf>,
340) -> ExitCode {
341    let workspace = resolve_workspace(workspace);
342    let models = match project_from_provider(provider) {
343        Ok(m) => m,
344        Err(e) => {
345            eprintln!("djogi migrations compose: projection error: {e}");
346            return ExitCode::from(1);
347        }
348    };
349    let apps: Vec<AppLifecycle> = provider
350        .apps()
351        .iter()
352        .map(|d| AppLifecycle {
353            label: d.label.to_string(),
354            database: d.database.to_string(),
355            renamed_from: d.renamed_from.map(str::to_string),
356            tombstone: d.tombstone,
357        })
358        .collect();
359    // Codex round-2 A-1: the resolved workspace flows into config
360    // loading too. Round-2 / B-12 update: compose now consumes the
361    // [`MigrateConfig::pk_flip_join_table_option`] knob so we no
362    // longer drop the parsed config — the join-table layout
363    // selected in `Djogi.toml` reaches the differ via this path.
364    let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
365        Ok(c) => c,
366        Err(e) => {
367            eprintln!("djogi migrations compose: config load: {e}");
368            return ExitCode::from(1);
369        }
370    };
371    let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
372        djogi_config.migrate.pk_flip_join_table_option,
373    );
374    compose_with_inputs(
375        &workspace,
376        name,
377        allow_destructive,
378        force_overwrite,
379        &models,
380        &apps,
381        time::OffsetDateTime::now_utc(),
382        Some(pk_flip_option),
383    )
384}
385
386/// Shared compose body — separated from [`compose_cmd`] so tests can
387/// drive it with explicit `models` and `apps` (the production entry
388/// point sources both from `inventory::iter` and `AppRegistry::all`,
389/// which are global state and thus not directly addressable from a
390/// unit test).
391///
392/// Acquires the workspace lock, walks the on-disk migration tree to
393/// recover orphaned snapshots (Codex B-1 — renamed-from buckets), and
394/// invokes [`djogi::migrate::compose`].
395// Compose has 8 inputs because it sits at the bridge between
396// CLI flag parsing (workspace / name / flags / clock) and the
397// engine (`models` / `apps` / `pk_flip_join_table_option`).
398// Folding these into a struct would push the same fields onto
399// the caller; the CLI tests already pass them positionally and
400// a struct-based refactor would be churn for no clarity gain.
401#[allow(clippy::too_many_arguments)]
402fn compose_with_inputs(
403    workspace: &Path,
404    name: &str,
405    allow_destructive: bool,
406    force_overwrite: bool,
407    models: &std::collections::BTreeMap<
408        djogi::migrate::projection::BucketKey,
409        djogi::migrate::AppliedSchema,
410    >,
411    apps: &[AppLifecycle],
412    now: time::OffsetDateTime,
413    pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
414) -> ExitCode {
415    let lock_path = workspace.join(LOCK_FILE_NAME);
416    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
417        Ok(g) => g,
418        Err(e) => {
419            eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
420            return ExitCode::from(1);
421        }
422    };
423
424    // Read snapshots from disk. Codex B-1: the bucket set we load is
425    // the UNION of (a) every bucket the current projection knows
426    // about and (b) every on-disk bucket that has a snapshot file.
427    // Without (b) a renamed-from app's old snapshot is missed
428    // entirely (the new app's `BucketKey` differs and the differ
429    // never sees the old schema, breaking compose's rename + drop +
430    // move emission).
431    let mut bucket_set: std::collections::BTreeSet<djogi::migrate::projection::BucketKey> =
432        models.keys().cloned().collect();
433    for bucket in discover_snapshot_buckets_on_disk(workspace) {
434        bucket_set.insert(bucket);
435    }
436
437    let mut snapshots: std::collections::BTreeMap<_, _> = std::collections::BTreeMap::new();
438    for bucket in &bucket_set {
439        let path = djogi::migrate::snapshot_path(workspace, bucket);
440        match djogi::migrate::load_snapshot(&path) {
441            Ok(s) => {
442                snapshots.insert(bucket.clone(), s);
443            }
444            Err(djogi::migrate::SnapshotError::Io { source, .. })
445                if source.kind() == std::io::ErrorKind::NotFound =>
446            {
447                // Fresh app — no prior snapshot.
448            }
449            Err(e) => {
450                eprintln!(
451                    "djogi migrations compose: snapshot load failed at {}: {e}",
452                    path.display()
453                );
454                return ExitCode::from(1);
455            }
456        }
457    }
458
459    let req = ComposeRequest {
460        workspace_root: workspace,
461        models,
462        snapshots: &snapshots,
463        apps,
464        name,
465        allow_destructive,
466        force_overwrite,
467        now,
468        _guard: &guard,
469        pk_flip_join_table_option,
470        // Production: always run Phase 0 auto-emit. The flag is a
471        // test-only escape hatch for unit tests that exercise
472        // compose's lower-level write/rollback machinery in
473        // isolation; the CLI / production path always goes through
474        // the full bootstrap flow.
475        skip_phase_zero_auto_emit: false,
476    };
477    match compose(req) {
478        Ok(report) => {
479            // Track 0: surface auto-emitted Phase 0 bootstraps before
480            // the regular composed buckets so the operator sees the
481            // bootstrap context before the per-bucket changes.
482            for emit in &report.emitted_phase_zero {
483                let ext_summary = if emit.extensions.is_empty() {
484                    "no extensions".to_string()
485                } else {
486                    format!(
487                        "extensions: {}",
488                        emit.extensions
489                            .iter()
490                            .cloned()
491                            .collect::<Vec<_>>()
492                            .join(", ")
493                    )
494                };
495                println!(
496                    "auto-emitted Phase 0 bootstrap: {database}/_global_ ({ext_summary})",
497                    database = emit.database,
498                );
499            }
500            for cb in &report.composed_buckets {
501                println!(
502                    "composed {database}/{app}: {version} ({classification:?})",
503                    database = cb.bucket.database,
504                    app = if cb.bucket.app.is_empty() {
505                        "_global_"
506                    } else {
507                        cb.bucket.app.as_str()
508                    },
509                    version = cb.version,
510                    classification = cb.classification,
511                );
512            }
513            ExitCode::from(0)
514        }
515        Err(ComposeError::NothingToCompose) => {
516            println!("nothing to compose — model state matches snapshot for every bucket");
517            // Per the v3 §3 inline-decisions: nothing-to-compose is
518            // not an error. The status command is the one that
519            // signals out-of-sync state via exit code.
520            ExitCode::from(0)
521        }
522        Err(ComposeError::LinkageDropWithoutModels { ref text, .. }) => {
523            eprintln!("djogi migrations compose: {text}");
524            // Exit 2 — refusal: models must be compiled in before dropping app linkage.
525            ExitCode::from(2)
526        }
527        Err(e) => {
528            eprintln!("djogi migrations compose: {e}");
529            ExitCode::from(1)
530        }
531    }
532}
533
534/// `djogi migrations status` entry point.
535///
536/// Read-only — does not acquire the workspace lock. Reads the
537/// migration ledger from the active database via
538/// [`djogi::context::DjogiContext`].
539pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
540    let workspace = resolve_workspace(workspace);
541
542    // Build a tokio runtime so we can drive the async ledger query.
543    let runtime = match tokio::runtime::Builder::new_current_thread()
544        .enable_all()
545        .build()
546    {
547        Ok(r) => r,
548        Err(e) => {
549            eprintln!("djogi migrations status: tokio runtime: {e}");
550            return ExitCode::from(1);
551        }
552    };
553
554    let exit = runtime.block_on(async { run_status(&workspace).await });
555    ExitCode::from(exit as u8)
556}
557
558/// Async body of [`status_cmd`]. Returns the desired exit code.
559///
560/// Codex A-1: the resolved `workspace` path now feeds
561/// [`djogi::config::DjogiConfig::load_from_workspace`] so a
562/// `--workspace /custom/path` actually reads `/<custom>/Djogi.toml`
563/// instead of always picking up the cwd's config. Production callers
564/// running from inside the project root (the typical case) get the
565/// previous behaviour for free — `resolve_workspace(None)` returns
566/// `cwd`.
567async fn run_status(workspace: &Path) -> i32 {
568    use djogi::config::DjogiConfig;
569
570    let config = match DjogiConfig::load_from_workspace(workspace) {
571        Ok(c) => c,
572        Err(e) => {
573            eprintln!("djogi migrations status: config load: {e}");
574            return 1;
575        }
576    };
577
578    let mut ctx = match connect_and_check(&config.database.url).await {
579        ContextOutcome::Ready(ctx) => ctx,
580        ContextOutcome::UnsupportedVersion(e) => {
581            crate::print_support_boundary_error("migrations status", &e);
582            return 2;
583        }
584        ContextOutcome::RuntimeError(msg) => {
585            eprintln!("djogi migrations status: pool: {msg}");
586            return 1;
587        }
588    };
589
590    let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
591        Ok(rows) => rows,
592        Err(e) => {
593            // A missing ledger table is treated as "no migrations
594            // applied" — print the empty state and exit 0.
595            if e.to_string().contains("djogi_schema_migrations") {
596                println!("No migrations recorded.");
597                return 0;
598            }
599            eprintln!("djogi migrations status: ledger read: {e}");
600            return 1;
601        }
602    };
603
604    let registered: Vec<String> = AppRegistry::all()
605        .iter()
606        .map(|d| d.label.to_string())
607        .collect();
608    let report = djogi::migrate::render_status(&rows, &registered);
609    for line in &report.lines {
610        println!("{line}");
611    }
612    report.exit_code
613}
614
615/// Outcome of [`connect_and_check`] — connecting a pool and running the
616/// Postgres-version preflight, with the support-boundary refusal kept
617/// distinct from ordinary runtime failures.
618///
619/// The three arms drive different exit codes at the call site:
620///
621/// - [`ContextOutcome::Ready`] — pool connected and PG ≥ 18; proceed.
622/// - [`ContextOutcome::UnsupportedVersion`] — PG < 18. The caller renders
623///   the support-boundary message via
624///   [`crate::print_support_boundary_error`] and exits `2` (refusal: the
625///   operator must upgrade Postgres; retrying changes nothing).
626/// - [`ContextOutcome::RuntimeError`] — pool connect failed, the preflight
627///   query errored, or any other non-version `DjogiError`. The caller
628///   prints the message and exits `1` (transient: CI may retry).
629// The `Ready` variant holds a `DjogiContext` (large — it wraps a
630// `DjogiPool`), while the other two variants are small (`DjogiError` /
631// `String`). Boxing `Ready` would add a heap allocation on the success
632// path; this value is constructed and immediately matched at each call
633// site (never stored in a collection), so the wider stack value is a
634// transient one-off, not a per-element penalty. Same trade-off and
635// rationale as `ContextInner` in `djogi::context` (see its
636// `large_enum_variant` allow).
637#[allow(clippy::large_enum_variant)]
638enum ContextOutcome {
639    /// Pool connected and the PG-version preflight passed.
640    Ready(djogi::context::DjogiContext),
641    /// The PG-version preflight refused — server is below the minimum
642    /// supported major version.
643    UnsupportedVersion(djogi::error::DjogiError),
644    /// A runtime failure (connect / preflight / other) — already rendered
645    /// to a string so the call site need not re-match.
646    RuntimeError(String),
647}
648
649/// Connect a pool from `url` and run the Postgres-version preflight,
650/// returning a typed [`ContextOutcome`].
651///
652/// Splits the support-boundary refusal (PG < 18, exit `2`) from runtime
653/// failures (connect / query errors, exit `1`) so each call site can map
654/// the outcome onto the documented exit-code matrix. Connects via the
655/// public `DjogiPool::connect` entry point, then hands the pool to the
656/// public `DjogiContext::from_pool` API once the version check passes.
657async fn connect_and_check(url: &str) -> ContextOutcome {
658    let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
659        Ok(p) => p,
660        Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
661    };
662    match djogi::pg::preflight::check_postgres_version(&pool).await {
663        Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
664        // `DjogiError` is `#[non_exhaustive]`, so the `@`-bound
665        // `UnsupportedPostgresVersion` arm needs the trailing `_` catch-all.
666        Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
667            ContextOutcome::UnsupportedVersion(e)
668        }
669        Err(other) => ContextOutcome::RuntimeError(other.to_string()),
670    }
671}
672
673/// Resolve the connection URL for a single migration-bucket database.
674///
675/// Verify routes each bucket to the pool for its `database` component.
676/// The mapping mirrors Djogi's three-database architecture:
677///
678/// - `"main"` ([`djogi::apps::AppDescriptor::GLOBAL_DATABASE`]) always uses
679///   the app URL verbatim. We do NOT derive it by splicing `"main"` into
680///   the path, because the operator's app URL may carry a path component
681///   that is not literally named `main` (e.g. `…/myapp_prod`); deriving
682///   would target a database that does not exist.
683/// - `"crud_log"` / `"event_log"` prefer the explicit
684///   [`djogi::config::DatabaseConfig::crud_log_url`] /
685///   [`event_log_url`](djogi::config::DatabaseConfig::event_log_url) when
686///   set to a non-empty value, matching how the audit / event pools are
687///   resolved elsewhere.
688/// - Any other database name (and the log databases when their explicit
689///   URL is absent) is derived by splicing the name into the app URL's
690///   path component via [`djogi::migrate::derive_per_database_url`].
691///
692/// Returns `None` when derivation fails (the app URL has no recognisable
693/// path component); the caller surfaces that as a runtime error for the
694/// affected bucket.
695fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
696    // "main" always uses the app URL verbatim — do NOT derive, as the app
697    // URL may not have a path component named "main".
698    if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
699        return Some(db_config.url.clone());
700    }
701    if database == "crud_log"
702        && let Some(u) = db_config.crud_log_url.as_deref()
703        && !u.is_empty()
704    {
705        return Some(u.to_string());
706    }
707    if database == "event_log"
708        && let Some(u) = db_config.event_log_url.as_deref()
709        && !u.is_empty()
710    {
711        return Some(u.to_string());
712    }
713    djogi::migrate::derive_per_database_url(&db_config.url, database)
714}
715
716/// `djogi migrations apply` entry point.
717///
718/// Discovers pending JSON files under `target/djogi_pending/`, loads the
719/// committed replay plan for each, and drives [`djogi::migrate::apply_plan`]
720/// through the library runner with full crash recovery via the ledger state
721/// machine.
722pub fn apply_cmd(workspace: Option<PathBuf>, fake: bool, reason: Option<String>) -> ExitCode {
723    let workspace = resolve_workspace(workspace);
724
725    // Validate --fake / --reason pairing before doing any expensive work.
726    let mode = if fake {
727        match reason {
728            Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
729            Some(_) => {
730                eprintln!(
731                    "djogi migrations apply --fake: --reason must not be empty; \
732                     supply a non-empty reason why these migrations are being \
733                     faked (e.g. 'schema pre-exists from prior tooling')"
734                );
735                return ExitCode::from(2);
736            }
737            None => {
738                eprintln!(
739                    "djogi migrations apply --fake: --reason is required; \
740                     supply a reason why these migrations are being faked \
741                     (e.g. 'schema pre-exists from prior tooling'). \
742                     This is recorded in the ledger audit trail."
743                );
744                return ExitCode::from(2);
745            }
746        }
747    } else {
748        FakeMode::Real
749    };
750
751    let runtime = match tokio::runtime::Builder::new_current_thread()
752        .enable_all()
753        .build()
754    {
755        Ok(r) => r,
756        Err(e) => {
757            eprintln!("djogi migrations apply: tokio runtime: {e}");
758            return ExitCode::from(1);
759        }
760    };
761
762    let exit = runtime.block_on(async { run_apply(&workspace, &mode).await });
763    ExitCode::from(exit as u8)
764}
765
/// Selects what `apply_one_pending` does with each pending migration:
/// run its SQL, or only record a fake-apply row in the ledger.
#[derive(Debug, Clone)]
enum FakeMode {
    /// Normal apply: the DDL is executed through `apply_plan`.
    Real,
    /// No DDL is run; `fake_apply_plan` records `status = 'faked'`
    /// together with the operator-supplied `reason`.
    Fake { reason: String },
}
775
776/// Async body of [`apply_cmd`]. Returns the desired exit code.
777async fn run_apply(workspace: &Path, mode: &FakeMode) -> i32 {
778    use djogi::config::DjogiConfig;
779
780    let action_verb = match mode {
781        FakeMode::Real => "apply",
782        FakeMode::Fake { .. } => "fake-apply",
783    };
784    let progress_verb = match mode {
785        FakeMode::Real => "applying",
786        FakeMode::Fake { .. } => "faking",
787    };
788
789    // 1. Load config.
790    let config = match DjogiConfig::load_from_workspace(workspace) {
791        Ok(c) => c,
792        Err(e) => {
793            eprintln!("djogi migrations {action_verb}: config load: {e}");
794            return 2;
795        }
796    };
797
798    // 2. Build pool and check PG version preflight.
799    let pool = match djogi::pg::pool::DjogiPool::connect(&config.database.url).await {
800        Ok(p) => p,
801        Err(e) => {
802            eprintln!("djogi migrations {action_verb}: pool connect: {e}");
803            return 1;
804        }
805    };
806    if let Err(e) = djogi::pg::preflight::check_postgres_version(&pool).await {
807        crate::print_support_boundary_error("migrations apply", &e);
808        return 2;
809    }
810
811    // 3. Discover pending JSONs.
812    let pending_files = discover_pending_plans(workspace);
813    if pending_files.is_empty() {
814        println!("No pending migrations to {action_verb}.");
815        return 0;
816    }
817
818    // 4. Acquire workspace lock.
819    let lock_path = workspace.join(LOCK_FILE_NAME);
820    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
821        Ok(g) => g,
822        Err(e) => {
823            eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
824            return 1;
825        }
826    };
827
828    // 5. Build audit pool (optional — silently skipped if unavailable).
829    let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
830        Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
831        Err(_) => None,
832    };
833
834    // 6. Build context from pool (not pinned yet — apply_plan pins internally).
835    let mut ctx = djogi::context::DjogiContext::from_pool(pool);
836
837    // 7. Apply each pending migration in order.
838    for (pending_path, bucket_database, app_label) in &pending_files {
839        println!("  {progress_verb} {bucket_database}/{app_label}...");
840        let result = apply_one_pending(
841            &mut ctx,
842            workspace,
843            pending_path,
844            bucket_database.clone(),
845            app_label.clone(),
846            &config,
847            &guard,
848            audit_pool.as_ref(),
849            mode,
850        )
851        .await;
852
853        match result {
854            ApplyResult::Ok => match mode {
855                FakeMode::Real => {
856                    println!("Applied: {bucket_database}/{app_label}");
857                }
858                FakeMode::Fake { .. } => {
859                    println!(
860                        "  faked {bucket_database}/{app_label}: \
861                             recorded in ledger with status = 'faked' (no SQL executed)"
862                    );
863                }
864            },
865            ApplyResult::Skipped(reason) => {
866                println!("Skipped {bucket_database}/{app_label}: {reason}");
867            }
868            ApplyResult::Refused(reason) => {
869                eprintln!(
870                    "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
871                );
872                return 2;
873            }
874            ApplyResult::RunnerError(e) => {
875                eprintln!(
876                    "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
877                );
878                return runner_error_exit_code(&e);
879            }
880        }
881    }
882
883    let summary_verb = match mode {
884        FakeMode::Real => "applied",
885        FakeMode::Fake { .. } => "faked",
886    };
887    println!("{summary_verb} {} migration(s).", pending_files.len());
888    0
889}
890
/// Outcome of applying a single pending migration, as returned by
/// `apply_one_pending` and interpreted by `run_apply`.
#[derive(Debug)]
enum ApplyResult {
    /// Migration applied (or fake-applied) successfully.
    Ok,
    /// Migration skipped; the payload is the human-readable reason
    /// (for example "already applied"). The run continues.
    Skipped(String),
    /// User-facing refusal — `run_apply` prints the reason and stops
    /// with exit code 2.
    Refused(String),
    /// Runner error — `run_apply` prints it and stops with the code
    /// chosen by `runner_error_exit_code` (currently 1).
    RunnerError(RunnerError),
}
903
904/// Scan `target/djogi_pending/` for pending JSON files.
905///
906/// Returns a list of `(path, database, app)` tuples sorted by file
907/// name so the apply order is deterministic. Each path points to a
908/// valid JSON file that was discovered on disk.
909fn discover_pending_plans(workspace: &Path) -> Vec<(PathBuf, String, String)> {
910    let pending_root = djogi::migrate::pending_root(workspace);
911    let mut out = Vec::new();
912
913    let Ok(db_entries) = std::fs::read_dir(&pending_root) else {
914        return out;
915    };
916
917    for db_entry in db_entries.flatten() {
918        let db_name = match db_entry.file_name().to_str().map(str::to_string) {
919            Some(n) => n,
920            None => continue,
921        };
922        if db_name.starts_with('.') {
923            continue;
924        }
925
926        let db_dir = db_entry.path();
927        if !db_dir.is_dir() {
928            continue;
929        }
930
931        let Ok(app_entries) = std::fs::read_dir(&db_dir) else {
932            continue;
933        };
934
935        for app_entry in app_entries.flatten() {
936            let path = app_entry.path();
937            if !path.is_file() {
938                continue;
939            }
940            let filename = match path.file_name().and_then(|f| f.to_str()) {
941                Some(f) => f,
942                None => continue,
943            };
944            // Filter: must be a .json file, not the special _global_.json
945            // pattern which is handled correctly by the naming function.
946            if !filename.ends_with(".json") {
947                continue;
948            }
949            // Extract app label from filename by stripping .json extension.
950            // The pending JSON filename is `<app>.json` or `_global_.json`.
951            let app_label = if let Some(stripped) = filename.strip_suffix(".json") {
952                stripped.to_string()
953            } else {
954                continue;
955            };
956
957            out.push((path, db_name.clone(), app_label));
958        }
959    }
960
961    out.sort_by(|a, b| a.0.cmp(&b.0));
962    out
963}
964
965/// Apply a single pending migration.
966///
967/// Loads the pending JSON to recover bucket and version, checks the
968/// ledger state machine for crash recovery, loads the committed replay
969/// plan (or falls back to a single-segment plan from the SQL file), and
970/// drives [`djogi::migrate::apply_plan`].
971///
972/// Uses the bypass attribute because deleting failed ledger rows requires
973/// raw SQL that is not exposed through the public typed API.
974// apply_one_pending carries 9 arguments because it sits at the bridge
975// between the CLI dispatch (workspace, path, bucket info) and the
976// library runner (config, guard, audit pool, mode). Folding these into a
977// struct would push the same fields onto the caller and add churn for
978// no clarity gain — the pattern matches compose_with_inputs and attune.
979#[allow(clippy::too_many_arguments)]
980#[djogi::deliberately_bypass_convention_with_raw_sql]
981// JUSTIFICATION (PIN): apply_one_pending needs to delete stale failed
982// ledger rows via `DELETE FROM djogi_schema_migrations WHERE version = $1`.
983// The public API has no delete operation — `select_all_ledger_rows` is read-only and
984// `insert_pending` is write-only. This is the minimal raw SQL surface
985// required for crash recovery.
986async fn apply_one_pending(
987    ctx: &mut djogi::context::DjogiContext,
988    workspace: &Path,
989    pending_path: &Path,
990    bucket_database: String,
991    app_label: String,
992    config: &djogi::config::DjogiConfig,
993    guard: &djogi::migrate::WorkspaceGuard,
994    audit_pool: Option<&deadpool_postgres::Pool>,
995    mode: &FakeMode,
996) -> ApplyResult {
997    // 1. Parse pending JSON to get bucket + version + checksums.
998    let pending_bytes = match std::fs::read(pending_path) {
999        Ok(b) => b,
1000        Err(e) => {
1001            return ApplyResult::Refused(format!("read pending JSON: {e}"));
1002        }
1003    };
1004    let pending: PendingPlan = match serde_json::from_slice(&pending_bytes) {
1005        Ok(p) => p,
1006        Err(e) => {
1007            return ApplyResult::Refused(format!("parse pending JSON: {e}"));
1008        }
1009    };
1010
1011    // Resolve bucket key from pending plan fields. The `_global_` app
1012    // maps to empty string (synthetic global bucket).
1013    let resolved_app = if app_label == "_global_" {
1014        String::new()
1015    } else {
1016        app_label.clone()
1017    };
1018    let bucket = djogi::migrate::BucketKey {
1019        database: bucket_database,
1020        app: resolved_app,
1021    };
1022
1023    // 2. Check ledger state machine for this version.
1024    match check_ledger_state(ctx, &pending.version).await {
1025        LedgerState::NotPresent => {} /* normal path */
1026        LedgerState::AlreadyApplied => {
1027            return ApplyResult::Skipped("already applied".to_string());
1028        }
1029        LedgerState::PendingOrPartial(existing_status) => {
1030            // Pending or partial state from a previous interrupted run.
1031            // Failed and RolledBack are non-terminal stale rows that block
1032            // re-apply — delete them and proceed. Pending rows require
1033            // explicit operator resolution.
1034            if existing_status == LedgerStatus::Failed
1035                || existing_status == LedgerStatus::RolledBack
1036            {
1037                // Both Failed and RolledBack rows are non-terminal stale rows
1038                // that block re-apply. delete_failed_ledger_row is a status-
1039                // agnostic DELETE by version; the name reflects the original
1040                // crash-recovery use case but the operation applies equally to
1041                // rolled-back rows.
1042                if let Err(e) = delete_failed_ledger_row(ctx, &pending.version).await {
1043                    return ApplyResult::Refused(format!(
1044                        "clean {} ledger row: {e}",
1045                        existing_status.as_db_str()
1046                    ));
1047                }
1048            } else {
1049                return ApplyResult::Refused(format!(
1050                    "version already in {} state — resolve before re-applying",
1051                    existing_status.as_db_str()
1052                ));
1053            }
1054        }
1055    }
1056
1057    // 3. Load committed replay plan (or fall back to single-segment).
1058    let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
1059        workspace,
1060        &bucket,
1061        &pending.version,
1062        &pending.checksum_up,
1063        pending.checksum_down.as_deref(),
1064    ) {
1065        Ok(result) => result,
1066        Err(e) => {
1067            return ApplyResult::Refused(format!("load replay plan: {e}"));
1068        }
1069    };
1070
1071    // 4. Construct RunnerCtx.
1072    let runner_ctx = RunnerCtx {
1073        bucket: bucket.clone(),
1074        version: pending.version.clone(),
1075        description: pending.slug.clone(),
1076        checksum_up,
1077        checksum_down,
1078        snapshot: Some(pending.model_snapshot.clone()),
1079        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
1080        // MigrateConfig does not derive Clone; construct from fields.
1081        config: djogi::config::MigrateConfig {
1082            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
1083            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
1084            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
1085            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
1086        },
1087        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
1088        audit_pool: audit_pool.cloned(),
1089    };
1090
1091    // 5. Apply (or fake-apply) the plan through the library runner.
1092    let runner_result = match mode {
1093        FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
1094        FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
1095    };
1096    match runner_result {
1097        Ok(_) => ApplyResult::Ok,
1098        Err(e) => ApplyResult::RunnerError(e),
1099    }
1100}
1101
/// Ledger state for a given migration version, as classified by
/// `check_ledger_state` and acted on by `apply_one_pending`.
#[derive(Debug)]
enum LedgerState {
    /// No row exists — first apply. Also reported when the ledger could
    /// not be read at all.
    NotPresent,
    /// Row exists and is in a terminal applied state (`Applied`,
    /// `Baseline`, or `Faked`).
    AlreadyApplied,
    /// Row exists in a non-terminal state (`Pending`, `Failed`, or
    /// `RolledBack`); the payload carries the specific status.
    PendingOrPartial(LedgerStatus),
}
1112
1113/// Check the ledger for an existing row matching `version`.
1114async fn check_ledger_state(ctx: &mut djogi::context::DjogiContext, version: &str) -> LedgerState {
1115    let Ok(rows) = djogi::migrate::select_all_ledger_rows(ctx).await else {
1116        // Ledger table might not exist yet — treat as NotPresent so
1117        // the runner can bootstrap it.
1118        return LedgerState::NotPresent;
1119    };
1120
1121    let existing = rows.iter().find(|r| r.version == version);
1122    match existing {
1123        None => LedgerState::NotPresent,
1124        Some(row) => match row.status {
1125            LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
1126                LedgerState::AlreadyApplied
1127            }
1128            LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
1129                LedgerState::PendingOrPartial(row.status)
1130            }
1131        },
1132    }
1133}
1134
1135/// Map a [`RunnerError`] to an exit code.
1136///
1137/// All runner errors map to exit code 1 (apply failure). Exit code 2
1138/// is reserved for user-facing refusals that happen before the runner
1139/// is invoked.
1140fn runner_error_exit_code(_error: &RunnerError) -> i32 {
1141    1
1142}
1143
1144#[djogi::deliberately_bypass_convention_with_raw_sql]
1145// JUSTIFICATION (PIN): delete_failed_ledger_row removes a stale Failed
1146// row so the migration can be retried. The public API has no delete
1147// operation for ledger rows — only select_all_ledger_rows and insert_pending
1148// are exposed. This DELETE is the minimal raw SQL required for crash recovery.
1149async fn delete_failed_ledger_row(
1150    ctx: &mut djogi::context::DjogiContext,
1151    version: &str,
1152) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1153    ctx.raw_execute(
1154        "DELETE FROM djogi_schema_migrations WHERE version = $1",
1155        &[&version],
1156    )
1157    .await?;
1158    Ok(())
1159}
1160
1161/// Reconstruct the snapshot path for a bucket: `migrations/<database>/<app>/schema_snapshot.json`.
1162fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
1163    let migrations_root = djogi::migrate::migrations_root(workspace);
1164    migrations_root
1165        .join(&bucket.database)
1166        .join(djogi::migrate::app_dirname(&bucket.app))
1167        .join("schema_snapshot.json")
1168}
1169
1170/// `djogi migrations attune` entry point.
1171///
1172/// Mode selection (per CLI flags):
1173///
1174/// | `--record-ledger` | `--squash` | resolved mode |
1175/// |-----------|-----------|---------------|
1176/// | false | false | [`AttuneMode::DiffOnly`] (read-only diff) |
1177/// | true  | false | [`AttuneMode::Record`] |
1178/// | false | true  | [`AttuneMode::Squash { from, publish, app }`] |
1179/// | true  | true  | rejected by clap (`conflicts_with`) |
1180///
1181/// Argument semantics:
1182/// - `target` is an optional positional Git target (commit / tag /
1183///   branch). When supplied, attune resolves it (local first, fetch
1184///   on miss) before any DB / disk mutation.
1185/// - `apply` gates DB / disk mutation. Without it, every mode is a
1186///   dry-run.
1187/// - `record` controls the parent repo's recorded submodule pointer
1188///   (separate from `record_ledger`, which controls the
1189///   `djogi_schema_migrations` ledger inserts).
1190///
1191/// `--squash` requires `--from <ver>`; an absent `from` while
1192/// `--squash` is set surfaces as a CLI error before any work happens.
1193// Codex umbrella U-1: the CLI dispatch carries 11 inputs because the
1194// attune surface is the broadest in the migrations CLI — target
1195// resolution + dry-run + record-ledger + record-pointer + squash +
1196// publish all live on the same command. Folding them into a struct
1197// would push the same fields onto the caller; the dispatch above
1198// already passes them positionally and a struct refactor would be
1199// churn for no clarity gain.
1200#[allow(clippy::too_many_arguments)]
1201pub fn attune_cmd(
1202    target: Option<&str>,
1203    apply: bool,
1204    record: bool,
1205    record_ledger: bool,
1206    record_reason: &str,
1207    squash: bool,
1208    from: Option<&str>,
1209    publish: bool,
1210    app: Option<&str>,
1211    workspace: Option<PathBuf>,
1212) -> ExitCode {
1213    let workspace = resolve_workspace(workspace);
1214    let mode = match (record_ledger, squash) {
1215        (false, false) => AttuneMode::DiffOnly,
1216        (true, false) => AttuneMode::Record {
1217            reason: record_reason.to_string(),
1218        },
1219        (false, true) => match from {
1220            Some(v) if !v.is_empty() => AttuneMode::Squash {
1221                from: v.to_string(),
1222                publish,
1223                app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
1224            },
1225            _ => {
1226                eprintln!(
1227                    "djogi migrations attune --squash requires --from <version> (e.g. \
1228                     `--from V20260101000000__init`)"
1229                );
1230                return ExitCode::from(2);
1231            }
1232        },
1233        (true, true) => {
1234            // Already rejected by clap's `conflicts_with`; this branch
1235            // is defensive in case the flag is added programmatically.
1236            eprintln!(
1237                "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
1238            );
1239            return ExitCode::from(2);
1240        }
1241    };
1242
1243    let runtime = match tokio::runtime::Builder::new_current_thread()
1244        .enable_all()
1245        .build()
1246    {
1247        Ok(r) => r,
1248        Err(e) => {
1249            eprintln!("djogi migrations attune: tokio runtime: {e}");
1250            return ExitCode::from(1);
1251        }
1252    };
1253
1254    let target_owned = target.map(str::to_string);
1255    let exit =
1256        runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
1257    ExitCode::from(exit as u8)
1258}
1259
1260/// Async body of [`attune_cmd`]. Loads config, builds the context,
1261/// acquires the workspace lock, invokes the library entry point.
1262async fn run_attune(
1263    workspace: &Path,
1264    mode: AttuneMode,
1265    target: Option<String>,
1266    apply: bool,
1267    record: bool,
1268) -> i32 {
1269    use djogi::config::DjogiConfig;
1270
1271    let config = match DjogiConfig::load_from_workspace(workspace) {
1272        Ok(c) => c,
1273        Err(e) => {
1274            eprintln!("djogi migrations attune: config load: {e}");
1275            return 1;
1276        }
1277    };
1278
1279    let mut ctx = match connect_and_check(&config.database.url).await {
1280        ContextOutcome::Ready(ctx) => ctx,
1281        ContextOutcome::UnsupportedVersion(e) => {
1282            crate::print_support_boundary_error("migrations attune", &e);
1283            return 2;
1284        }
1285        ContextOutcome::RuntimeError(msg) => {
1286            eprintln!("djogi migrations attune: pool: {msg}");
1287            return 1;
1288        }
1289    };
1290
1291    // All three modes acquire the workspace lock per the v3 file-lock
1292    // contract — even DiffOnly takes the lock so a concurrent compose
1293    // / apply cannot mutate the tree mid-scan.
1294    let lock_path = workspace.join(LOCK_FILE_NAME);
1295    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
1296        Ok(g) => g,
1297        Err(e) => {
1298            eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
1299            return 1;
1300        }
1301    };
1302
1303    let req = AttuneRequest {
1304        workspace_root: workspace,
1305        database_url: &config.database.url,
1306        profile: &config.profile,
1307        // Codex umbrella U-2: thread `[database].dev_mode` to the
1308        // squash gate. Read-only modes (`DiffOnly`, `Record`) ignore
1309        // it; `Squash` mode refuses unless this is `true`.
1310        dev_mode: config.database.dev_mode,
1311        // Codex umbrella U-1: the operator-supplied target + the
1312        // `--apply` / `--record` gates flow through to the library
1313        // entry point. The library owns the resolution + parent-pointer
1314        // update; the CLI is just plumbing.
1315        target: target.as_deref(),
1316        apply,
1317        record,
1318        mode,
1319        _guard: &guard,
1320    };
1321    match attune(&mut ctx, req).await {
1322        Ok(report) => {
1323            if report.entries.is_empty() {
1324                println!("attune: no drift");
1325            } else {
1326                for entry in &report.entries {
1327                    let app_display = if entry.bucket.app.is_empty() {
1328                        "_global_"
1329                    } else {
1330                        entry.bucket.app.as_str()
1331                    };
1332                    println!(
1333                        "  {kind:<10}  {database}/{app}  {version}",
1334                        kind = entry.kind.as_str(),
1335                        database = entry.bucket.database,
1336                        app = app_display,
1337                        version = entry.version,
1338                    );
1339                }
1340            }
1341            // Surface structured diagnostics — today this carries the
1342            // B-3 LedgerTableMissing notice when DiffOnly runs on a
1343            // fresh database.
1344            for diag in &report.diagnostics {
1345                println!("  diagnostic: {diag}");
1346            }
1347            if let Some(sha) = &report.resolved_target {
1348                println!("resolved target: {sha}");
1349            }
1350            if let Some(squashed) = &report.squashed_to {
1351                println!("squashed to: {squashed}");
1352            }
1353            if report.published {
1354                println!("published to remote");
1355            }
1356            if report.parent_pointer_updated {
1357                println!("parent submodule pointer updated");
1358            }
1359            0
1360        }
1361        Err(e) => {
1362            eprintln!("djogi migrations attune: {e}");
1363            attune_error_exit_code(&e)
1364        }
1365    }
1366}
1367
1368/// Map an [`AttuneError`] variant onto the documented exit-code
1369/// matrix (`docs/spec/configuration.md` §14):
1370///
1371/// - Refusal variants → exit code `2` ("operator must intervene;
1372///   nothing happened"). Today every refusal flows through
1373///   [`AttuneError::Refused`]; the localhost gate, the dev-profile
1374///   gate, the missing-version refusal, and the ambiguous-version
1375///   refusal are all reachable through that variant.
1376/// - Runtime variants → exit code `1` ("we tried; something broke" —
1377///   filesystem scan, ledger query, SQL read/write/delete, git
1378///   publish). CI may safely retry these.
1379///
1380/// Pulled out as a free function so unit tests can pin every variant
1381/// without spinning a Tokio runtime. Operators rely on the 1-vs-2
1382/// distinction to tell "refused before any side effect" from "ran and
1383/// failed mid-flight".
1384fn attune_error_exit_code(err: &AttuneError) -> i32 {
1385    match err {
1386        AttuneError::Refused(_) => 2,
1387        AttuneError::FilesystemScanFailed { .. }
1388        | AttuneError::LedgerQueryFailed { .. }
1389        | AttuneError::SqlReadFailed { .. }
1390        | AttuneError::SqlWriteFailed { .. }
1391        | AttuneError::SqlDeleteFailed { .. }
1392        | AttuneError::GitPublishFailed { .. }
1393        | AttuneError::GitTargetResolveFailed { .. }
1394        | AttuneError::GitFetchFailed { .. }
1395        | AttuneError::GitUpdateSubmodulePointerFailed { .. } => 1,
1396    }
1397}
1398
1399/// `djogi migrations verify` entry point.
1400///
1401/// Read-only — does not acquire the workspace lock. Reads the live
1402/// Postgres catalog via [`djogi::context::DjogiContext`] and compares
1403/// against the projected schema from the descriptor inventory.
1404///
1405/// Exit codes: 0 on success (no error-level diagnostics), 1 on runtime
1406/// error (config / network / SQL / projection), 2 on refusal
1407/// (below PG 18).
1408pub fn verify_cmd(
1409    provider: &dyn DescriptorProvider,
1410    workspace: Option<PathBuf>,
1411    strict: bool,
1412) -> ExitCode {
1413    let workspace = resolve_workspace(workspace);
1414
1415    let runtime = match tokio::runtime::Builder::new_current_thread()
1416        .enable_all()
1417        .build()
1418    {
1419        Ok(r) => r,
1420        Err(e) => {
1421            eprintln!("djogi migrations verify: tokio runtime: {e}");
1422            return ExitCode::from(1);
1423        }
1424    };
1425
1426    let exit = runtime.block_on(async { run_verify(provider, &workspace, strict).await });
1427    ExitCode::from(exit as u8)
1428}
1429
1430/// Async body of [`verify_cmd`]. Returns the desired exit code.
1431///
1432/// Verify is multi-database aware: each `(database, app)` bucket is routed
1433/// to the pool for its `database` component via [`resolve_bucket_url`], and
1434/// the per-database context is connected lazily and cached so a database
1435/// with several app buckets connects once. The bucket set is the UNION of
1436/// the inventory projection and the on-disk snapshot tree, so an orphaned
1437/// snapshot (a removed app's snapshot still on disk) is verified and
1438/// surfaces drift rather than being silently skipped (FBB-2 / Class D).
1439///
1440/// Exit codes:
1441/// - `0` — every bucket verified with no error-severity diagnostic.
1442/// - `1` — at least one runtime failure (pool / snapshot / verify error)
1443///   or at least one bucket reported an error-severity diagnostic.
1444/// - `2` — the server is below the minimum supported Postgres version
1445///   (a server-global refusal: verify returns immediately).
1446async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
1447    use djogi::config::DjogiConfig;
1448
1449    // 0. Zero-descriptor refusal (§5.6 / REQ-370-8). `verify` refuses with
1450    //    the dual-cause diagnostic + exit 2 ONLY when there are NEITHER
1451    //    descriptors NOR on-disk snapshots — the genuinely unusable state
1452    //    (a standalone binary with nothing to verify against). When
1453    //    snapshots exist, verify DEGRADES to snapshot-only (the union below
1454    //    enumerates the disk buckets), so we must not refuse here.
1455    //
1456    //    Guard on `provider.models().is_empty()` rather than the projected
1457    //    `bucket_set`: projection always seeds the synthetic global bucket
1458    //    (`(main, "")`), so the bucket set is never empty and is the wrong
1459    //    signal for "no descriptors". This is the same guard the
1460    //    compose/schema/docs gates in `lib.rs` use.
1461    if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
1462        crate::print_zero_descriptor_diagnostic("migrations verify");
1463        return 2;
1464    }
1465
1466    // 1. Load config from workspace.
1467    let config = match DjogiConfig::load_from_workspace(workspace) {
1468        Ok(c) => c,
1469        Err(e) => {
1470            eprintln!("djogi migrations verify: config load: {e}");
1471            return 1;
1472        }
1473    };
1474
1475    // 2. Project schema from descriptor provider.
1476    let models = match project_from_provider(provider) {
1477        Ok(m) => m,
1478        Err(e) => {
1479            eprintln!("djogi migrations verify: projection error: {e}");
1480            return 1;
1481        }
1482    };
1483
1484    // 3. Build the bucket set as the UNION of the inventory projection and
1485    //    the on-disk snapshot tree (FBB-2 / Class D). An orphaned snapshot
1486    //    — a removed app whose snapshot still sits on disk — is absent from
1487    //    `models` but present on disk; without the union it would never be
1488    //    verified and out-of-band drift would go unreported.
1489    let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
1490        models.keys().cloned().collect();
1491    for bucket in discover_snapshot_buckets_on_disk(workspace) {
1492        bucket_set.insert(bucket);
1493    }
1494    // The zero-descriptor refusal (step 0) already returned for the only
1495    // state that yields an empty bucket set (no descriptors + no snapshots).
1496    // Projection always seeds the synthetic global bucket, so reaching here
1497    // with an empty set is impossible; if a future projection change ever
1498    // breaks that invariant, fail closed with the dual-cause refusal rather
1499    // than silently reporting success on a binary that verified nothing.
1500    if bucket_set.is_empty() {
1501        crate::print_zero_descriptor_diagnostic("migrations verify");
1502        return 2;
1503    }
1504
1505    // 4. Policy configuration for the --strict flag.
1506    let policy = djogi::config::PolicyConfig {
1507        strict_out_of_order: strict,
1508    };
1509
1510    // 5. Pre-compute the set of databases that have at least one INVENTORY
1511    //    bucket with non-empty models. Orphan-only databases (snapshots on
1512    //    disk but no registered models) are excluded — `unwrap_or(false)`
1513    //    treats a disk-only bucket as model-less. This gates D699 inside
1514    //    `verify_bucket`: an orphan-only database has no live tables to
1515    //    miss, so D601 is the actionable signal instead.
1516    let database_has_models: std::collections::HashSet<String> = bucket_set
1517        .iter()
1518        .filter(|b| {
1519            models
1520                .get(*b)
1521                .map(|s| !s.models.is_empty())
1522                .unwrap_or(false)
1523        })
1524        .map(|b| b.database.clone())
1525        .collect();
1526
1527    // 6. Per-database context cache + dedup sets. Contexts are connected
1528    //    lazily (only for databases that have a bucket needing a live read)
1529    //    and reused across that database's app buckets. `seen_ledger_databases`
1530    //    ensures the ledger-lifecycle diagnostics (D621/D622/D699) are
1531    //    emitted once per database, not once per app bucket.
1532    let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
1533        std::collections::BTreeMap::new();
1534    let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
1535    let mut exit_code: i32 = 0;
1536
1537    // 7. Verify each bucket.
1538    for bucket in &bucket_set {
1539        // a. Resolve the per-database URL.
1540        let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
1541            let bd = if bucket.app.is_empty() {
1542                "_global_"
1543            } else {
1544                &bucket.app
1545            };
1546            eprintln!(
1547                "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
1548                 check that config.database.url has a valid path component",
1549                bucket.database, bucket.database, bd
1550            );
1551            exit_code = 1;
1552            continue;
1553        };
1554
1555        // b. Connect (lazily, once per distinct database). PG < 18 is a
1556        //    server-global refusal — there is no point continuing to other
1557        //    buckets, so we return 2 immediately.
1558        if !contexts.contains_key(&bucket.database) {
1559            match connect_and_check(&url).await {
1560                ContextOutcome::Ready(ctx) => {
1561                    contexts.insert(bucket.database.clone(), ctx);
1562                }
1563                ContextOutcome::UnsupportedVersion(e) => {
1564                    crate::print_support_boundary_error("migrations verify", &e);
1565                    return 2;
1566                }
1567                ContextOutcome::RuntimeError(msg) => {
1568                    eprintln!(
1569                        "djogi migrations verify: pool for '{}': {msg}",
1570                        bucket.database
1571                    );
1572                    exit_code = 1;
1573                    continue;
1574                }
1575            }
1576        }
1577
1578        // c. Load the snapshot. A missing snapshot for a bucket that HAS
1579        //    registered models is a hard error (exit 1) — the operator must
1580        //    record a baseline; a missing snapshot for a model-less bucket
1581        //    is informational (BLOCK-4 / Class C).
1582        let snap_path = snapshot_path(workspace, bucket);
1583        let snapshot = match load_snapshot(&snap_path) {
1584            Ok(s) => s,
1585            Err(SnapshotError::Io { source, .. })
1586                if source.kind() == std::io::ErrorKind::NotFound =>
1587            {
1588                let bd = if bucket.app.is_empty() {
1589                    "_global_"
1590                } else {
1591                    &bucket.app
1592                };
1593                let has_models = models
1594                    .get(bucket)
1595                    .map(|s| !s.models.is_empty())
1596                    .unwrap_or(false);
1597                if has_models {
1598                    eprintln!(
1599                        "djogi migrations verify: {}/{} has registered models but no \
1600                         snapshot; run `djogi migrations compose` then \
1601                         `djogi migrations apply` to record a baseline",
1602                        bucket.database, bd
1603                    );
1604                    exit_code = 1;
1605                } else {
1606                    println!("No snapshot found for bucket {}/{}", bucket.database, bd);
1607                }
1608                continue;
1609            }
1610            Err(e) => {
1611                let bd = if bucket.app.is_empty() {
1612                    "_global_"
1613                } else {
1614                    &bucket.app
1615                };
1616                eprintln!(
1617                    "djogi migrations verify: load snapshot for {}/{}: {e}",
1618                    bucket.database, bd
1619                );
1620                exit_code = 1;
1621                continue;
1622            }
1623        };
1624
1625        // d. Compute ledger-emission flags. The ledger is shared per
1626        //    database; emit its lifecycle diagnostics once per database
1627        //    (the first bucket of each database that reaches this point),
1628        //    and only for databases that actually have registered models.
1629        let db_has_models = database_has_models.contains(&bucket.database);
1630        let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
1631
1632        // e. Run the bucket-scoped verify against the routed context.
1633        let ctx = contexts
1634            .get_mut(&bucket.database)
1635            .expect("context inserted above");
1636        let report = match djogi::migrate::verify_bucket(
1637            ctx,
1638            bucket,
1639            &snapshot,
1640            &policy,
1641            emit_ledger,
1642            db_has_models,
1643        )
1644        .await
1645        {
1646            Ok(r) => r,
1647            Err(e) => {
1648                let bd = if bucket.app.is_empty() {
1649                    "_global_"
1650                } else {
1651                    &bucket.app
1652                };
1653                eprintln!(
1654                    "djogi migrations verify: error for {}/{}: {e}",
1655                    bucket.database, bd
1656                );
1657                exit_code = 1;
1658                continue;
1659            }
1660        };
1661
1662        // f. Render and fold the bucket's error state into the exit code.
1663        for line in render_verify_report(&report, bucket) {
1664            println!("{line}");
1665        }
1666        if report.has_errors() {
1667            exit_code = 1;
1668        }
1669    }
1670
1671    exit_code
1672}
1673
1674/// Render a [`VerifyReport`] to a vector of output lines.
1675///
1676/// Format: one line per diagnostic with severity prefix, code, location,
1677/// and message. Summary line at the end. Output is deterministic because
1678/// `report.diagnostics` is already sorted by `(code, location)`.
1679///
1680/// Returns the lines instead of printing directly so the rendering is unit-
1681/// testable (FBB-1); the caller iterates the returned vector and prints each
1682/// line. Blank separator lines are returned as empty strings.
1683fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
1684    let mut lines: Vec<String> = Vec::new();
1685
1686    let app_display = if bucket.app.is_empty() {
1687        "_global_"
1688    } else {
1689        &bucket.app
1690    };
1691    lines.push(format!(
1692        "djogi migrations verify — {}/{}",
1693        bucket.database, app_display
1694    ));
1695    lines.push("──────────────────────────────────────────".to_string());
1696
1697    match (
1698        &report.latest_applied_version,
1699        report.applied_count,
1700        report.unfinished_count,
1701    ) {
1702        (Some(version), applied, 0) => {
1703            lines.push(format!("Ledger: {applied} applied, latest {version}"));
1704        }
1705        (Some(version), applied, unfinished) => {
1706            lines.push(format!(
1707                "Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
1708            ));
1709        }
1710        (None, 0, 0) => {
1711            lines.push("Ledger: empty (no migrations applied yet)".to_string());
1712        }
1713        _ => {}
1714    }
1715    lines.push(String::new());
1716
1717    if report.diagnostics.is_empty() {
1718        lines.push("No drift detected. Schema is consistent.".to_string());
1719    } else {
1720        for d in &report.diagnostics {
1721            let severity = match d.severity {
1722                VerifySeverity::Info => "INFO",
1723                VerifySeverity::Warning => "WARN",
1724                VerifySeverity::Error => "ERROR",
1725            };
1726            let location = d.location.as_deref().unwrap_or("-");
1727            lines.push(format!(
1728                "[{severity}] {code} ({loc}): {msg}",
1729                severity = severity,
1730                code = d.code,
1731                loc = location,
1732                msg = d.message
1733            ));
1734        }
1735    }
1736
1737    let errors = report
1738        .diagnostics
1739        .iter()
1740        .filter(|d| d.severity == VerifySeverity::Error)
1741        .count();
1742    let warnings = report
1743        .diagnostics
1744        .iter()
1745        .filter(|d| d.severity == VerifySeverity::Warning)
1746        .count();
1747    let infos = report
1748        .diagnostics
1749        .iter()
1750        .filter(|d| d.severity == VerifySeverity::Info)
1751        .count();
1752
1753    if errors > 0 {
1754        lines.push(String::new());
1755        lines.push(format!(
1756            "Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
1757        ));
1758    } else if warnings > 0 {
1759        lines.push(String::new());
1760        lines.push(format!(
1761            "Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
1762        ));
1763    } else {
1764        lines.push(String::new());
1765        lines.push(format!("Result: PASSED ({infos} info(s))"));
1766    }
1767
1768    lines
1769}
1770
1771// ── repair subcommand dispatch ────────────────────────────────────────────
1772
1773impl From<PartialApplyResolutionCli> for PartialApplyResolution {
1774    fn from(cli: PartialApplyResolutionCli) -> Self {
1775        match cli {
1776            PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
1777            PartialApplyResolutionCli::Faked => Self::MarkFaked,
1778            PartialApplyResolutionCli::Applied => Self::MarkApplied,
1779        }
1780    }
1781}
1782
1783/// `djogi migrations repair <subcommand>` entry point.
1784///
1785/// Routes each subcommand to its glue function. The glue functions own
1786/// the runtime / config / pool / lock / report-render lifecycle; this
1787/// router only destructures the parsed clap variant.
1788pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
1789    match command {
1790        RepairSubcommand::ChecksumDrift {
1791            version,
1792            app,
1793            database,
1794            checksum_up,
1795            checksum_down,
1796            workspace,
1797        } => repair_checksum_drift_cmd(
1798            &version,
1799            app.as_deref(),
1800            database.as_deref(),
1801            checksum_up.as_deref(),
1802            checksum_down.as_deref(),
1803            workspace,
1804        ),
1805        RepairSubcommand::PartialApply {
1806            version,
1807            resolution,
1808            note,
1809            app,
1810            database,
1811            workspace,
1812        } => repair_partial_apply_cmd(
1813            &version,
1814            resolution.into(),
1815            &note,
1816            app.as_deref(),
1817            database.as_deref(),
1818            workspace,
1819        ),
1820        RepairSubcommand::ResumePartial {
1821            version,
1822            app,
1823            database,
1824            workspace,
1825        } => repair_resume_partial_apply_cmd(
1826            &version,
1827            app.as_deref(),
1828            database.as_deref(),
1829            workspace,
1830        ),
1831        RepairSubcommand::SnapshotRebuild {
1832            app,
1833            database,
1834            snapshot_path,
1835            workspace,
1836        } => repair_snapshot_rebuild_cmd(
1837            app.as_deref(),
1838            database.as_deref(),
1839            snapshot_path.as_deref(),
1840            workspace,
1841        ),
1842    }
1843}
1844
1845/// Render a [`RepairReport`] to stdout. Shared across all four repair
1846/// glue functions so the operator sees a consistent action / ledger /
1847/// snapshot summary regardless of which repair ran.
1848fn render_repair_report(report: &RepairReport) {
1849    for action in &report.actions_taken {
1850        println!("  {action}");
1851    }
1852    if !report.ledger_changes.is_empty() {
1853        println!("Ledger changes:");
1854        for lc in &report.ledger_changes {
1855            println!(
1856                "  {} | {} | {} -> {}",
1857                lc.version, lc.column, lc.before, lc.after,
1858            );
1859        }
1860    }
1861    if !report.snapshot_changes.is_empty() {
1862        println!("Snapshot changes:");
1863        for sc in &report.snapshot_changes {
1864            println!("  {} | {}", sc.path.display(), sc.description);
1865        }
1866    }
1867}
1868
1869/// Map a [`RepairError`] onto the CLI exit-code contract.
1870///
1871/// `RepairError` is NOT `#[non_exhaustive]`, so this match is
1872/// **exhaustive with NO `_ =>` wildcard** by deliberate design: a future
1873/// variant breaks compilation here, forcing a conscious exit-code
1874/// classification rather than silently bucketing an unclassified error.
1875///
1876/// Classification rule — when a new variant is added, classify it the
1877/// same way:
1878/// - **Exit 1 (retryable):** variants wrapping a transient I/O /
1879///   connection / pool / SQL failure (a `source: DjogiError`, snapshot
1880///   filesystem I/O, or advisory-lock contention). A retry may succeed.
1881/// - **Exit 2 (refusal):** structural refusals and ledger-logic guards
1882///   that require operator intervention. A blind retry hits the same
1883///   refusal.
1884fn repair_error_exit_code(err: &RepairError) -> i32 {
1885    match err {
1886        // ── Exit 1: transient I/O / connection / pool / SQL failures.
1887        // These wrap a DjogiError (network, connection, query) or a
1888        // filesystem error and may succeed on retry.
1889        RepairError::LedgerIo { .. }                  // ledger DB I/O
1890        | RepairError::SnapshotIo { .. }              // snapshot filesystem I/O
1891        | RepairError::AdvisoryLockFailed { .. }      // lock held by a concurrent runner; retry after it releases
1892        | RepairError::AdvisoryLockQueryFailed { .. } // pg_try_advisory_lock query itself errored
1893        | RepairError::PinnedSessionCheckoutFailed { .. } // could not check out a pinned session from the pool
1894        | RepairError::ResumeStepFailed { .. }        // a replayed statement failed; partial state recorded, retryable
1895        | RepairError::ResumeProgressAckFailed { .. } // step committed but the progress ack write failed; retryable
1896        => 1,
1897
1898        // ── Exit 2: refusals and structural / ledger-logic guards.
1899        // The operator must investigate and intervene; a blind retry
1900        // would hit the same refusal.
1901        RepairError::VersionNotFound { .. }
1902        | RepairError::InsufficientConfirmation
1903        | RepairError::InvalidChecksum { .. }
1904        | RepairError::InvalidResolution { .. }
1905        | RepairError::BucketAppMismatch { .. }
1906        | RepairError::PlanVersionMismatch { .. }
1907        | RepairError::PlanChecksumMismatch { .. }
1908        | RepairError::LeafIdentityMismatch { .. }
1909        | RepairError::NothingToResume { .. }
1910        | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
1911        | RepairError::SuppliedSnapshotDiverges { .. }
1912        | RepairError::AdvisoryUnlockReturnedFalse { .. } // session-pinning correctness failure — not a blind retry
1913        | RepairError::ResumePlanShapeMismatch { .. }
1914        | RepairError::ReplayPlanShapeMismatch { .. }
1915        => 2,
1916    }
1917}
1918
1919/// Resolve the database name for bucket construction. Uses the explicit
1920/// `--database` flag if provided, otherwise defaults to `"main"` (the
1921/// global database name — see [`djogi::apps::AppDescriptor::GLOBAL_DATABASE`]).
1922///
1923/// `_config` is threaded so this single helper can grow a config-driven
1924/// default database (should `DjogiConfig` gain one) without changing
1925/// every call site.
1926fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
1927    database.unwrap_or("main").to_string()
1928}
1929
1930/// Compute the `V1:`-prefixed checksum of a committed up SQL file on disk,
1931/// using the canonical fragment-level domain (strips the composed-file
1932/// header and label comments, matching what compose stores in the ledger).
1933///
1934/// The naive whole-file checksum is WRONG here: compose stores checksums
1935/// computed over the [`djogi::migrate::OperationSql`] fragments only,
1936/// without the rendered file's `-- Djogi composed migration — up` header
1937/// or the per-statement label comment lines. Recomputing over the full
1938/// file content would never match the ledger value, so the drift repair
1939/// would write a checksum that immediately re-drifts. Delegating to
1940/// [`djogi::migrate::compute_committed_sql_checksum`] keeps the CLI's
1941/// recompute path in the same domain as compose.
1942///
1943/// Returns the underlying [`std::io::Error`] unchanged so the caller can
1944/// surface a missing/unreadable up file as a retryable I/O error.
1945fn compute_checksum_up_from_disk(
1946    workspace: &Path,
1947    bucket: &djogi::migrate::BucketKey,
1948    version: &str,
1949) -> std::io::Result<String> {
1950    let path =
1951        djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::up_filename(version));
1952    let sql = std::fs::read_to_string(&path)?;
1953    Ok(djogi::migrate::compute_committed_sql_checksum(
1954        &sql,
1955        djogi::migrate::ResetSqlSide::Up,
1956    ))
1957}
1958
1959/// Compute the canonical checksum of a committed down SQL file on disk,
1960/// using the same fragment-level domain as compose (see
1961/// [`compute_checksum_up_from_disk`] for why the whole-file checksum is
1962/// wrong).
1963///
1964/// Returns `Ok(None)` when the file is absent
1965/// ([`std::io::ErrorKind::NotFound`]) or when the file contains only SQL
1966/// comments — both map onto compose's `NULL` `checksum_down` sentinel for
1967/// comment-only down files. Returns `Err` for any other I/O failure so a
1968/// retry after the file is restored can succeed.
1969fn compute_checksum_down_from_disk(
1970    workspace: &Path,
1971    bucket: &djogi::migrate::BucketKey,
1972    version: &str,
1973) -> std::io::Result<Option<String>> {
1974    let path =
1975        djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::down_filename(version));
1976    let sql = match std::fs::read_to_string(&path) {
1977        Ok(s) => s,
1978        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
1979        Err(e) => return Err(e),
1980    };
1981    Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql))
1982}
1983
1984/// `djogi migrations repair checksum-drift` entry point.
1985///
1986/// Updates the `checksum_up` / `checksum_down` columns of an
1987/// already-applied ledger row after its committed SQL was edited. When
1988/// `--checksum-up` / `--checksum-down` are omitted, the checksums are
1989/// recomputed from the committed files on disk (a missing down file is a
1990/// no-op; any other read error aborts with exit 1).
1991pub fn repair_checksum_drift_cmd(
1992    version: &str,
1993    app: Option<&str>,
1994    database: Option<&str>,
1995    checksum_up: Option<&str>,
1996    checksum_down: Option<&str>,
1997    workspace: Option<PathBuf>,
1998) -> ExitCode {
1999    let workspace = resolve_workspace(workspace);
2000    let runtime = match tokio::runtime::Builder::new_current_thread()
2001        .enable_all()
2002        .build()
2003    {
2004        Ok(r) => r,
2005        Err(e) => {
2006            eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
2007            return ExitCode::from(1);
2008        }
2009    };
2010    let exit = runtime.block_on(async {
2011        run_repair_checksum_drift(
2012            &workspace,
2013            version,
2014            app,
2015            database,
2016            checksum_up,
2017            checksum_down,
2018        )
2019        .await
2020    });
2021    ExitCode::from(exit as u8)
2022}
2023
2024/// Async body of [`repair_checksum_drift_cmd`]. Returns the desired exit code.
2025async fn run_repair_checksum_drift(
2026    workspace: &Path,
2027    version: &str,
2028    app: Option<&str>,
2029    database: Option<&str>,
2030    checksum_up: Option<&str>,
2031    checksum_down: Option<&str>,
2032) -> i32 {
2033    use djogi::config::DjogiConfig;
2034
2035    let config = match DjogiConfig::load_from_workspace(workspace) {
2036        Ok(c) => c,
2037        Err(e) => {
2038            eprintln!("djogi migrations repair checksum-drift: config load: {e}");
2039            return 1;
2040        }
2041    };
2042
2043    // Resolve the per-database URL BEFORE connecting: `--database
2044    // crud_log` / `event_log` operate on a different bucket's ledger than
2045    // the app DB, so connecting to `config.database.url` first would
2046    // silently mutate the wrong database.
2047    let db_name = resolve_database(database, &config);
2048    let url = match resolve_bucket_url(&config.database, &db_name) {
2049        Some(u) => u,
2050        None => {
2051            eprintln!(
2052                "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
2053            );
2054            return 2;
2055        }
2056    };
2057
2058    let mut ctx = match connect_and_check(&url).await {
2059        ContextOutcome::Ready(ctx) => ctx,
2060        ContextOutcome::UnsupportedVersion(e) => {
2061            crate::print_support_boundary_error("migrations repair checksum-drift", &e);
2062            return 2;
2063        }
2064        ContextOutcome::RuntimeError(msg) => {
2065            eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
2066            return 1;
2067        }
2068    };
2069
2070    let lock_path = workspace.join(LOCK_FILE_NAME);
2071    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2072        Ok(g) => g,
2073        Err(e) => {
2074            eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
2075            return 1;
2076        }
2077    };
2078
2079    let app_label = app.unwrap_or("");
2080    let bucket = BucketKey {
2081        database: db_name,
2082        app: app_label.to_string(),
2083    };
2084
2085    let new_checksum_up = match checksum_up {
2086        Some(c) => c.to_string(),
2087        None => {
2088            // Auto-compute from the committed up SQL file on disk. A
2089            // missing or unreadable up file is an environment I/O error,
2090            // not an operator-facing refusal — exit 1 (same class as the
2091            // down file's non-NotFound branch below), so a retry after
2092            // the file is restored can succeed.
2093            match compute_checksum_up_from_disk(workspace, &bucket, version) {
2094                Ok(cs) => cs,
2095                Err(e) => {
2096                    eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
2097                    return 1;
2098                }
2099            }
2100        }
2101    };
2102
2103    let resolved_checksum_down = match checksum_down {
2104        Some(c) => Some(c.to_string()),
2105        None => {
2106            // Auto-compute from the down file; a missing down file (or a
2107            // comment-only down file) is a no-op (no down checksum), other
2108            // read errors surface. NotFound is folded into `Ok(None)` by
2109            // `compute_checksum_down_from_disk`.
2110            match compute_checksum_down_from_disk(workspace, &bucket, version) {
2111                Ok(cs_opt) => cs_opt,
2112                Err(e) => {
2113                    eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
2114                    return 1;
2115                }
2116            }
2117        }
2118    };
2119
2120    match repair_checksum_drift(
2121        &mut ctx,
2122        &guard,
2123        &bucket,
2124        version,
2125        &new_checksum_up,
2126        resolved_checksum_down.as_deref(),
2127        RepairConfirmation::OperatorAcknowledged,
2128    )
2129    .await
2130    {
2131        Ok(report) => {
2132            render_repair_report(&report);
2133            0
2134        }
2135        Err(e) => {
2136            eprintln!("djogi migrations repair checksum-drift: {e}");
2137            repair_error_exit_code(&e)
2138        }
2139    }
2140}
2141
2142/// `djogi migrations repair partial-apply` entry point.
2143///
2144/// Resolves a partial-apply ledger row by rewriting its status to
2145/// `rolled_back`, `faked`, or `applied`. No SQL executes — only the
2146/// ledger row is mutated.
2147pub fn repair_partial_apply_cmd(
2148    version: &str,
2149    resolution: PartialApplyResolution,
2150    note: &str,
2151    app: Option<&str>,
2152    database: Option<&str>,
2153    workspace: Option<PathBuf>,
2154) -> ExitCode {
2155    let workspace = resolve_workspace(workspace);
2156    let runtime = match tokio::runtime::Builder::new_current_thread()
2157        .enable_all()
2158        .build()
2159    {
2160        Ok(r) => r,
2161        Err(e) => {
2162            eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
2163            return ExitCode::from(1);
2164        }
2165    };
2166    let exit = runtime.block_on(async {
2167        run_repair_partial_apply(&workspace, version, resolution, note, app, database).await
2168    });
2169    ExitCode::from(exit as u8)
2170}
2171
2172/// Async body of [`repair_partial_apply_cmd`]. Returns the desired exit code.
2173async fn run_repair_partial_apply(
2174    workspace: &Path,
2175    version: &str,
2176    resolution: PartialApplyResolution,
2177    note: &str,
2178    app: Option<&str>,
2179    database: Option<&str>,
2180) -> i32 {
2181    use djogi::config::DjogiConfig;
2182
2183    let config = match DjogiConfig::load_from_workspace(workspace) {
2184        Ok(c) => c,
2185        Err(e) => {
2186            eprintln!("djogi migrations repair partial-apply: config load: {e}");
2187            return 1;
2188        }
2189    };
2190
2191    // Resolve the per-database URL BEFORE connecting: `--database
2192    // crud_log` / `event_log` operate on a different bucket's ledger than
2193    // the app DB, so connecting to `config.database.url` first would
2194    // silently mutate the wrong database.
2195    let db_name = resolve_database(database, &config);
2196    let url = match resolve_bucket_url(&config.database, &db_name) {
2197        Some(u) => u,
2198        None => {
2199            eprintln!(
2200                "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
2201            );
2202            return 2;
2203        }
2204    };
2205
2206    let mut ctx = match connect_and_check(&url).await {
2207        ContextOutcome::Ready(ctx) => ctx,
2208        ContextOutcome::UnsupportedVersion(e) => {
2209            crate::print_support_boundary_error("migrations repair partial-apply", &e);
2210            return 2;
2211        }
2212        ContextOutcome::RuntimeError(msg) => {
2213            eprintln!("djogi migrations repair partial-apply: pool: {msg}");
2214            return 1;
2215        }
2216    };
2217
2218    let lock_path = workspace.join(LOCK_FILE_NAME);
2219    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2220        Ok(g) => g,
2221        Err(e) => {
2222            eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
2223            return 1;
2224        }
2225    };
2226
2227    let app_label = app.unwrap_or("");
2228    let bucket = BucketKey {
2229        database: db_name,
2230        app: app_label.to_string(),
2231    };
2232
2233    match repair_partial_apply(
2234        &mut ctx,
2235        &guard,
2236        &bucket,
2237        version,
2238        resolution,
2239        note,
2240        RepairConfirmation::OperatorAcknowledged,
2241    )
2242    .await
2243    {
2244        Ok(report) => {
2245            render_repair_report(&report);
2246            0
2247        }
2248        Err(e) => {
2249            eprintln!("djogi migrations repair partial-apply: {e}");
2250            repair_error_exit_code(&e)
2251        }
2252    }
2253}
2254
2255/// `djogi migrations repair resume-partial` entry point.
2256///
2257/// Resumes an interrupted non-transactional apply by loading the
2258/// committed `<version>.plan.json` and replaying its remaining steps.
2259/// Loads the committed plan directly (no CLI-level checksum pre-gate);
2260/// the library validates the plan against the ledger row internally.
2261pub fn repair_resume_partial_apply_cmd(
2262    version: &str,
2263    app: Option<&str>,
2264    database: Option<&str>,
2265    workspace: Option<PathBuf>,
2266) -> ExitCode {
2267    let workspace = resolve_workspace(workspace);
2268    let runtime = match tokio::runtime::Builder::new_current_thread()
2269        .enable_all()
2270        .build()
2271    {
2272        Ok(r) => r,
2273        Err(e) => {
2274            eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
2275            return ExitCode::from(1);
2276        }
2277    };
2278    let exit = runtime
2279        .block_on(async { run_repair_resume_partial(&workspace, version, app, database).await });
2280    ExitCode::from(exit as u8)
2281}
2282
2283/// Async body of [`repair_resume_partial_apply_cmd`]. Returns the desired exit code.
2284async fn run_repair_resume_partial(
2285    workspace: &Path,
2286    version: &str,
2287    app: Option<&str>,
2288    database: Option<&str>,
2289) -> i32 {
2290    use djogi::config::DjogiConfig;
2291
2292    let config = match DjogiConfig::load_from_workspace(workspace) {
2293        Ok(c) => c,
2294        Err(e) => {
2295            eprintln!("djogi migrations repair resume-partial: config load: {e}");
2296            return 1;
2297        }
2298    };
2299
2300    // Resolve the per-database URL BEFORE connecting: `--database
2301    // crud_log` / `event_log` operate on a different bucket's ledger than
2302    // the app DB, so connecting to `config.database.url` first would
2303    // silently mutate the wrong database.
2304    let db_name = resolve_database(database, &config);
2305    let url = match resolve_bucket_url(&config.database, &db_name) {
2306        Some(u) => u,
2307        None => {
2308            eprintln!(
2309                "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
2310            );
2311            return 2;
2312        }
2313    };
2314
2315    let mut ctx = match connect_and_check(&url).await {
2316        ContextOutcome::Ready(ctx) => ctx,
2317        ContextOutcome::UnsupportedVersion(e) => {
2318            crate::print_support_boundary_error("migrations repair resume-partial", &e);
2319            return 2;
2320        }
2321        ContextOutcome::RuntimeError(msg) => {
2322            eprintln!("djogi migrations repair resume-partial: pool: {msg}");
2323            return 1;
2324        }
2325    };
2326
2327    let lock_path = workspace.join(LOCK_FILE_NAME);
2328    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2329        Ok(g) => g,
2330        Err(e) => {
2331            eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
2332            return 1;
2333        }
2334    };
2335
2336    let app_label = app.unwrap_or("");
2337    let bucket = BucketKey {
2338        database: db_name,
2339        app: app_label.to_string(),
2340    };
2341
2342    // Load the committed replay plan directly from disk — no CLI-level
2343    // checksum pre-gate, because repair_resume_partial_apply validates
2344    // plan↔ledger checksums itself. Synthesizing a whole-file checksum
2345    // here would not match the per-statement-fragment checksums stored
2346    // in the plan JSON.
2347    let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
2348        Ok(p) => p,
2349        Err(e) => {
2350            eprintln!("djogi migrations repair resume-partial: load plan: {e}");
2351            return 2;
2352        }
2353    };
2354
2355    match repair_resume_partial_apply(
2356        &mut ctx,
2357        &guard,
2358        version,
2359        &plan,
2360        RepairConfirmation::OperatorAcknowledged,
2361    )
2362    .await
2363    {
2364        Ok(report) => {
2365            render_repair_report(&report);
2366            0
2367        }
2368        Err(e) => {
2369            eprintln!("djogi migrations repair resume-partial: {e}");
2370            repair_error_exit_code(&e)
2371        }
2372    }
2373}
2374
2375/// Load the committed `<version>.plan.json` for `resume-partial` without
2376/// the CLI-level checksum pre-gate.
2377///
2378/// [`repair_resume_partial_apply`] validates the plan against the ledger
2379/// row internally (`PlanVersionMismatch` / `PlanChecksumMismatch`), so
2380/// re-gating here with a hand-rolled whole-file checksum would be both
2381/// wrong (the plan stores per-statement-fragment checksums) and
2382/// redundant. This helper therefore deliberately does NOT reuse
2383/// [`load_replay_plan_from_disk`] (a pending-apply helper that DOES
2384/// checksum-gate) — it reuses only that function's `CliReplay*`
2385/// deserialization + segment-conversion shape.
2386///
2387/// Returns a human-readable error string on a missing/unparseable plan
2388/// file or a format-version mismatch. A missing plan file maps to exit 2
2389/// at the call site (the committed plan is a precondition of resume).
2390fn load_committed_plan_for_resume(
2391    workspace: &Path,
2392    bucket: &djogi::migrate::BucketKey,
2393    version: &str,
2394) -> Result<djogi::migrate::MigrationPlan, String> {
2395    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
2396    let plan_path = bucket_dir.join(format!("{version}.plan.json"));
2397    let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
2398    let stored: CliReplayPlan = serde_json::from_slice(&bytes)
2399        .map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
2400    if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
2401        return Err(format!(
2402            "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
2403            plan_path.display(),
2404            stored.format_version,
2405        ));
2406    }
2407    Ok(djogi::migrate::MigrationPlan {
2408        bucket: bucket.clone(),
2409        classification: stored.classification.into(),
2410        segments: stored
2411            .segments
2412            .into_iter()
2413            .map(|seg| djogi::migrate::Segment {
2414                kind: seg.kind.into(),
2415                statements: seg
2416                    .statements
2417                    .into_iter()
2418                    .map(|stmt| djogi::migrate::OperationSql {
2419                        label: stmt.label,
2420                        up: stmt.up,
2421                        down: String::new(),
2422                        lossy: None,
2423                    })
2424                    .collect(),
2425            })
2426            .collect(),
2427    })
2428}
2429
2430/// `djogi migrations repair snapshot-rebuild` entry point.
2431///
2432/// Rebuilds a bucket's schema snapshot by walking the ledger and
2433/// re-projecting from live database state. When `--snapshot-path` is
2434/// omitted, the path is derived from
2435/// `migrations/<database>/<app>/schema_snapshot.json`.
2436pub fn repair_snapshot_rebuild_cmd(
2437    app: Option<&str>,
2438    database: Option<&str>,
2439    snapshot_path: Option<&Path>,
2440    workspace: Option<PathBuf>,
2441) -> ExitCode {
2442    let workspace = resolve_workspace(workspace);
2443    let runtime = match tokio::runtime::Builder::new_current_thread()
2444        .enable_all()
2445        .build()
2446    {
2447        Ok(r) => r,
2448        Err(e) => {
2449            eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
2450            return ExitCode::from(1);
2451        }
2452    };
2453    let exit = runtime.block_on(async {
2454        run_repair_snapshot_rebuild(&workspace, app, database, snapshot_path).await
2455    });
2456    ExitCode::from(exit as u8)
2457}
2458
2459/// Async body of [`repair_snapshot_rebuild_cmd`]. Returns the desired exit code.
2460async fn run_repair_snapshot_rebuild(
2461    workspace: &Path,
2462    app: Option<&str>,
2463    database: Option<&str>,
2464    snapshot_path: Option<&Path>,
2465) -> i32 {
2466    use djogi::config::DjogiConfig;
2467
2468    let config = match DjogiConfig::load_from_workspace(workspace) {
2469        Ok(c) => c,
2470        Err(e) => {
2471            eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
2472            return 1;
2473        }
2474    };
2475
2476    // Resolve the per-database URL BEFORE connecting: `--database
2477    // crud_log` / `event_log` operate on a different bucket's ledger than
2478    // the app DB, so connecting to `config.database.url` first would
2479    // silently rebuild the snapshot from the wrong database.
2480    let db_name = resolve_database(database, &config);
2481    let url = match resolve_bucket_url(&config.database, &db_name) {
2482        Some(u) => u,
2483        None => {
2484            eprintln!(
2485                "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
2486            );
2487            return 2;
2488        }
2489    };
2490
2491    let mut ctx = match connect_and_check(&url).await {
2492        ContextOutcome::Ready(ctx) => ctx,
2493        ContextOutcome::UnsupportedVersion(e) => {
2494            crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
2495            return 2;
2496        }
2497        ContextOutcome::RuntimeError(msg) => {
2498            eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
2499            return 1;
2500        }
2501    };
2502
2503    let lock_path = workspace.join(LOCK_FILE_NAME);
2504    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2505        Ok(g) => g,
2506        Err(e) => {
2507            eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
2508            return 1;
2509        }
2510    };
2511
2512    let app_label = app.unwrap_or("");
2513    let bucket = BucketKey {
2514        database: db_name,
2515        app: app_label.to_string(),
2516    };
2517
2518    let snap_path = match snapshot_path {
2519        Some(p) => p.to_path_buf(),
2520        None => reconstruct_snapshot_path(workspace, &bucket),
2521    };
2522
2523    match repair_snapshot_rebuild(
2524        &mut ctx,
2525        &guard,
2526        &bucket,
2527        &snap_path,
2528        RepairConfirmation::OperatorAcknowledged,
2529    )
2530    .await
2531    {
2532        Ok(report) => {
2533            render_repair_report(&report);
2534            0
2535        }
2536        Err(e) => {
2537            eprintln!("djogi migrations repair snapshot-rebuild: {e}");
2538            repair_error_exit_code(&e)
2539        }
2540    }
2541}
2542
2543// ── baseline command ──────────────────────────────────────────────────────
2544
2545/// `djogi migrations baseline` entry point.
2546///
2547/// Establishes a baseline ledger row + snapshot for an existing
2548/// database adopted under Djogi's migration ledger. The schema already
2549/// exists, so `compose` + `apply` cannot run against the populated
2550/// database without a starting point; baseline projects the live
2551/// catalog into a single `baseline` ledger row (no SQL runs against
2552/// user tables) and persists the projected snapshot as the canonical
2553/// baseline so future migrations diff against the real DB state.
2554///
2555/// `--reason` is required and must be non-empty — it is recorded in the
2556/// ledger row's `partial_apply_note` for the audit trail. An empty
2557/// reason is a refusal (exit 2) caught before any DB work.
2558///
2559/// Exit codes: `0` success, `1` runtime error (config / pool / projection
2560/// failure), `2` refusal (empty `--reason`, unresolvable database URL,
2561/// duplicate version collision, snapshot-persist failure after ledger
2562/// insert, session-pinning correctness failure, or below PG 18).
2563pub fn baseline_cmd(
2564    version: &str,
2565    description: &str,
2566    reason: &str,
2567    app: Option<&str>,
2568    database: Option<&str>,
2569    workspace: Option<PathBuf>,
2570) -> ExitCode {
2571    // Validate --reason before any expensive work, mirroring the
2572    // `apply --fake --reason` empty-reason gate. The library's
2573    // baseline_plan does not itself reject an empty reason (it records
2574    // whatever string it is handed), so the CLI owns this guard.
2575    if reason.trim().is_empty() {
2576        eprintln!(
2577            "djogi migrations baseline: --reason must not be empty; \
2578             supply a non-empty reason why this baseline is being established \
2579             (e.g. 'schema pre-exists from prior tooling'). \
2580             This is recorded in the ledger audit trail."
2581        );
2582        return ExitCode::from(2);
2583    }
2584
2585    let workspace = resolve_workspace(workspace);
2586    let runtime = match tokio::runtime::Builder::new_current_thread()
2587        .enable_all()
2588        .build()
2589    {
2590        Ok(r) => r,
2591        Err(e) => {
2592            eprintln!("djogi migrations baseline: tokio runtime: {e}");
2593            return ExitCode::from(1);
2594        }
2595    };
2596    let exit = runtime.block_on(async {
2597        run_baseline(&workspace, version, description, reason, app, database).await
2598    });
2599    ExitCode::from(exit as u8)
2600}
2601
2602/// Async body of [`baseline_cmd`]. Returns the desired exit code.
2603///
2604/// Resolves the per-database URL BEFORE connecting (a `--database
2605/// crud_log` / `event_log` baseline targets a different bucket's ledger
2606/// than the app DB), connects + runs the PG-version preflight via
2607/// [`connect_and_check`], acquires the workspace file lock, then drives
2608/// [`baseline_plan`]. The runner projects the live schema itself and
2609/// computes the baseline checksum from that projection, so the
2610/// `RunnerCtx` is constructed with `snapshot: None` (B-11 requires the
2611/// caller NOT supply a snapshot) and an empty `checksum_up` (the
2612/// baseline path never reads it).
2613async fn run_baseline(
2614    workspace: &Path,
2615    version: &str,
2616    description: &str,
2617    reason: &str,
2618    app: Option<&str>,
2619    database: Option<&str>,
2620) -> i32 {
2621    use djogi::config::DjogiConfig;
2622
2623    let config = match DjogiConfig::load_from_workspace(workspace) {
2624        Ok(c) => c,
2625        Err(e) => {
2626            eprintln!("djogi migrations baseline: config load: {e}");
2627            return 1;
2628        }
2629    };
2630
2631    // Resolve the per-database URL BEFORE connecting: `--database
2632    // crud_log` / `event_log` operate on a different bucket's ledger
2633    // than the app DB, so connecting to `config.database.url` first
2634    // would silently baseline the wrong database.
2635    let db_name = resolve_database(database, &config);
2636    let url = match resolve_bucket_url(&config.database, &db_name) {
2637        Some(u) => u,
2638        None => {
2639            eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
2640            return 2;
2641        }
2642    };
2643
2644    let mut ctx = match connect_and_check(&url).await {
2645        ContextOutcome::Ready(ctx) => ctx,
2646        ContextOutcome::UnsupportedVersion(e) => {
2647            crate::print_support_boundary_error("migrations baseline", &e);
2648            return 2;
2649        }
2650        ContextOutcome::RuntimeError(msg) => {
2651            eprintln!("djogi migrations baseline: pool: {msg}");
2652            return 1;
2653        }
2654    };
2655
2656    let lock_path = workspace.join(LOCK_FILE_NAME);
2657    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2658        Ok(g) => g,
2659        Err(e) => {
2660            eprintln!("djogi migrations baseline: workspace lock: {e}");
2661            return 1;
2662        }
2663    };
2664
2665    let app_label = app.unwrap_or("");
2666    let bucket = BucketKey {
2667        database: db_name,
2668        app: app_label.to_string(),
2669    };
2670
2671    let runner_ctx = RunnerCtx {
2672        bucket: bucket.clone(),
2673        version: version.to_string(),
2674        description: description.to_string(),
2675        // baseline_plan computes checksum_up from the live projection;
2676        // this field is not read on the baseline code path.
2677        checksum_up: String::new(),
2678        checksum_down: None,
2679        // baseline_plan refuses a caller-supplied snapshot (B-11) — it
2680        // projects the live DB itself. Leave this None; the projection
2681        // is persisted to `snapshot_path` below.
2682        snapshot: None,
2683        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
2684        // MigrateConfig does not derive Clone; construct from fields
2685        // (same pattern as apply_one_pending).
2686        config: djogi::config::MigrateConfig {
2687            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
2688            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
2689            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
2690            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
2691        },
2692        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
2693        audit_pool: match djogi::migrate::resolve_audit_url(&config) {
2694            Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
2695            Err(_) => None,
2696        },
2697    };
2698
2699    match baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await {
2700        Ok(report) => {
2701            println!(
2702                "djogi migrations baseline: established baseline `{}` \
2703                 (ledger_id={}) in {:.1}s",
2704                version,
2705                report.ledger_id,
2706                report.execution_time_ms as f64 / 1000.0
2707            );
2708            0
2709        }
2710        Err(e) => {
2711            eprintln!("djogi migrations baseline: {e}");
2712            baseline_error_exit_code(&e)
2713        }
2714    }
2715}
2716
2717/// Map a [`RunnerError`] produced by [`baseline_plan`] onto the CLI
2718/// exit-code contract.
2719///
2720/// The flat [`runner_error_exit_code`] (always `1`) is wrong for
2721/// baseline: a duplicate-version collision is a refusal the operator
2722/// must resolve by choosing a new version, and a blind retry hits the
2723/// same collision — that must surface as exit `2`, matching the
2724/// `migrations apply` doc-contract ("re-running reports
2725/// `VersionAlreadyApplied` (exit 2)") and the `repair` family's
2726/// [`repair_error_exit_code`] convention.
2727///
2728/// `RunnerError` is `#[non_exhaustive]`, so the wildcard arm is
2729/// load-bearing: any variant NOT named below defaults to exit `1`
2730/// (transient — a retry may succeed). That is the safe default for the
2731/// I/O- and connection-shaped variants the baseline path can hit
2732/// (projection failure, ledger bootstrap / write / query failure,
2733/// snapshot persist failure, pinned-session checkout failure,
2734/// advisory-lock contention). Only the genuine refusals are pulled out
2735/// into the exit-`2` arm.
2736fn baseline_error_exit_code(err: &RunnerError) -> i32 {
2737    match err {
2738        // ── Exit 2: refusals — the operator must intervene; a blind
2739        // retry hits the same condition.
2740        //
2741        // - A duplicate version (terminal or non-terminal) means the
2742        //   chosen baseline version is already taken; pick another.
2743        // - A caller-supplied snapshot is a programming error in the
2744        //   wiring (the CLI always passes `snapshot: None`), surfaced
2745        //   as a structural refusal rather than a retryable fault.
2746        // - An out-of-order rejection is a policy refusal identical to
2747        //   the apply path's.
2748        // - AdvisoryUnlockReturnedFalse is a session-pinning correctness
2749        //   failure (PG returned false for pg_advisory_unlock); it is not
2750        //   transient — matches the repair family's exit-2 treatment.
2751        // - SnapshotPersistFailed in the baseline path is a post-ledger
2752        //   failure: baseline_inner inserts the terminal ledger row BEFORE
2753        //   writing the snapshot. A retry with the same version therefore
2754        //   hits VersionAlreadyApplied (exit 2) before it can write the
2755        //   snapshot. Exit 1 (retryable) would give false hope; exit 2
2756        //   signals that operator intervention is needed (run
2757        //   `repair snapshot-rebuild` or choose a new version).
2758        RunnerError::VersionAlreadyApplied { .. }
2759        | RunnerError::VersionCollisionNonTerminal { .. }
2760        | RunnerError::BaselineSnapshotShouldNotBeProvided
2761        | RunnerError::AdvisoryUnlockReturnedFalse { .. }
2762        | RunnerError::SnapshotPersistFailed { .. }
2763        | RunnerError::OutOfOrderRejected { .. } => 2,
2764        // ── Exit 1: everything else (transient I/O / connection / SQL /
2765        // projection failures). `#[non_exhaustive]` makes this wildcard
2766        // mandatory; new transient-shaped variants inherit the retryable
2767        // default.
2768        _ => 1,
2769    }
2770}
2771
2772#[cfg(test)]
2773mod tests {
2774    use super::*;
2775    use std::fs;
2776    use std::sync::atomic::{AtomicUsize, Ordering};
2777
2778    fn temp_workspace(tag: &str) -> std::path::PathBuf {
2779        static COUNTER: AtomicUsize = AtomicUsize::new(0);
2780        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2781        let nanos = std::time::SystemTime::now()
2782            .duration_since(std::time::UNIX_EPOCH)
2783            .unwrap()
2784            .as_nanos();
2785        let p = std::env::temp_dir().join(format!("djogi-cli-{tag}-{nanos}-{n}"));
2786        fs::create_dir_all(&p).unwrap();
2787        p
2788    }
2789
2790    /// Codex B-1 — the CLI's bucket-discovery walk must include
2791    /// directories that exist on disk but are absent from the current
2792    /// model inventory (the renamed-from case).
2793    #[test]
2794    fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
2795        let work = temp_workspace("b1_discover");
2796        // Lay down a `migrations/main/billing/schema_snapshot.json`
2797        // — the OLD app's snapshot. The current model inventory
2798        // would NOT have this bucket because the app moved to
2799        // `invoicing` via `#[app(renamed_from = "billing")]`.
2800        let billing_dir = work.join("migrations/main/billing");
2801        fs::create_dir_all(&billing_dir).unwrap();
2802        fs::write(billing_dir.join("schema_snapshot.json"), "{}").unwrap();
2803        // A second bucket — the global one for the same database —
2804        // exists too. Exercise the multi-bucket walk.
2805        let global_dir = work.join("migrations/main/_global_");
2806        fs::create_dir_all(&global_dir).unwrap();
2807        fs::write(global_dir.join("schema_snapshot.json"), "{}").unwrap();
2808        // A third on-disk directory WITHOUT a snapshot file — must
2809        // not be reported (we only union buckets that actually
2810        // shipped a snapshot).
2811        let no_snap_dir = work.join("migrations/main/empty_app");
2812        fs::create_dir_all(&no_snap_dir).unwrap();
2813
2814        let buckets = discover_snapshot_buckets_on_disk(&work);
2815        let labels: std::collections::BTreeSet<&str> =
2816            buckets.iter().map(|b| b.app.as_str()).collect();
2817        assert!(
2818            labels.contains("billing"),
2819            "must include the renamed-from bucket: {labels:?}"
2820        );
2821        assert!(
2822            labels.contains(""),
2823            "must include the global bucket: {labels:?}"
2824        );
2825        assert!(
2826            !labels.contains("empty_app"),
2827            "must not include directories without a snapshot: {labels:?}"
2828        );
2829        let _ = fs::remove_dir_all(&work);
2830    }
2831
2832    /// Codex A-1 — the resolved workspace flows into config loading.
2833    /// `load_from_workspace` must read `<workspace>/Djogi.toml` not
2834    /// the cwd's. We assert that by writing a custom config with a
2835    /// distinctive `database.url` and confirming the loader sees it.
2836    #[test]
2837    fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
2838        let work = temp_workspace("a1_workspace_config");
2839        let toml = "[database]\nurl = \"postgres://discovered-by-workspace-flag/test\"\n\
2840                    max_connections = 1\ndev_mode = false\n\
2841                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
2842        fs::write(work.join("Djogi.toml"), toml).unwrap();
2843        // Save and clear DATABASE_URL so the env override doesn't
2844        // mask the file value during this test.
2845        let prior = std::env::var("DATABASE_URL").ok();
2846        // SAFETY: tests run with --test-threads=1 per the project's
2847        // pre-commit policy, so concurrent env mutation is not a
2848        // concern in this configuration.
2849        unsafe { std::env::remove_var("DATABASE_URL") };
2850        let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
2851        assert_eq!(
2852            config.database.url,
2853            "postgres://discovered-by-workspace-flag/test"
2854        );
2855        assert_eq!(config.server.port, 1234);
2856        if let Some(v) = prior {
2857            unsafe { std::env::set_var("DATABASE_URL", v) };
2858        }
2859        let _ = fs::remove_dir_all(&work);
2860    }
2861
2862    /// Codex round-2 A-1 — env override precedence. A `DATABASE_URL`
2863    /// in the environment must beat any value in
2864    /// `<workspace>/Djogi.toml`, matching the security contract that
2865    /// secrets only live in env vars.
2866    #[test]
2867    fn a1_round2_env_override_beats_workspace_toml() {
2868        let work = temp_workspace("a1r2_env_override");
2869        let toml = "[database]\nurl = \"postgres://from-toml/test\"\n\
2870                    max_connections = 1\ndev_mode = false\n\
2871                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
2872        fs::write(work.join("Djogi.toml"), toml).unwrap();
2873        let prior = std::env::var("DATABASE_URL").ok();
2874        // SAFETY: --test-threads=1; no concurrent env mutation.
2875        unsafe { std::env::set_var("DATABASE_URL", "postgres://from-env/test") };
2876        let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
2877        assert_eq!(
2878            config.database.url, "postgres://from-env/test",
2879            "env DATABASE_URL must win over workspace Djogi.toml"
2880        );
2881        match prior {
2882            Some(v) => unsafe { std::env::set_var("DATABASE_URL", v) },
2883            None => unsafe { std::env::remove_var("DATABASE_URL") },
2884        }
2885        let _ = fs::remove_dir_all(&work);
2886    }
2887
2888    /// Codex round-2 B-1 — `compose_with_inputs` must consume the
2889    /// disk-discovered buckets, not just the inventory's. We set up a
2890    /// `migrations/main/billing/schema_snapshot.json` with a `widgets`
2891    /// table, pass an EMPTY models map (simulating "billing app was
2892    /// removed from the workspace"), set `allow_destructive = true`,
2893    /// and assert the resulting up SQL contains `DROP TABLE
2894    /// "widgets"`. If the disk-walk regressed and `compose_with_inputs`
2895    /// only loaded snapshots for inventory-known buckets, the differ
2896    /// would never see billing's snapshot and the compose would exit
2897    /// `NothingToCompose` (no DROP, no SQL written).
2898    ///
2899    /// This is the end-to-end pinning B-1 round-1 missed.
2900    #[test]
2901    fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
2902        use djogi::migrate::projection::BucketKey;
2903        use djogi::migrate::schema::{
2904            ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
2905        };
2906        use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
2907        use std::collections::BTreeMap;
2908
2909        let work = temp_workspace("b1r2_compose_uses_discovery");
2910
2911        // Build a billing-bucket snapshot with one `widgets` table
2912        // and write it to disk under `migrations/main/billing/`.
2913        let billing_bucket = BucketKey {
2914            database: "main".into(),
2915            app: "billing".into(),
2916        };
2917        let mut billing_snap = AppliedSchema {
2918            djogi_version: env!("CARGO_PKG_VERSION").to_string(),
2919            enums: BTreeMap::new(),
2920            format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
2921            generated_at: "2026-04-25T00:00:00Z".to_string(),
2922            indexes: Vec::new(),
2923            models: BTreeMap::new(),
2924            registered_apps: vec!["billing".to_string()],
2925        };
2926        billing_snap.models.insert(
2927            "widgets".to_string(),
2928            TableSchema {
2929                app: Some("billing".to_string()),
2930                columns: vec![ColumnSchema {
2931                    check: None,
2932                    comment: None,
2933                    default_sql: Some("heerid_next_desc()".to_string()),
2934                    foreign_key: None,
2935                    generated: None,
2936                    identity: None,
2937                    index_type: None,
2938                    indexed: false,
2939                    max_length: None,
2940                    name: "id".to_string(),
2941                    nullable: false,
2942                    on_delete: None,
2943                    outbox_exclude: false,
2944                    rationale: None,
2945                    relation_kind: None,
2946                    renamed_from: None,
2947                    sequence_within: None,
2948                    sql_type: "BIGINT".to_string(),
2949                    unique: false,
2950                    type_change_using: None,
2951                }],
2952                exclusion_constraints: Vec::new(),
2953                fts: None,
2954                is_through: false,
2955                moved_from_app: None,
2956                partition: None,
2957                primary_key: PrimaryKeySchema {
2958                    columns: vec!["id".to_string()],
2959                    kind: PkKindSchema::HeerIdRecencyBiased,
2960                },
2961                rationale: None,
2962                renamed_from: None,
2963                rls_enabled: false,
2964                table: "widgets".to_string(),
2965                table_comment: None,
2966                storage_params: None,
2967                tablespace: None,
2968                tenant_key: None,
2969            },
2970        );
2971        let snap_path = snapshot_path(&work, &billing_bucket);
2972        save_snapshot(&billing_snap, &snap_path).expect("write snapshot");
2973
2974        // EMPTY models — simulates the billing crate having been
2975        // removed from the workspace. Without the disk-walk this
2976        // bucket would never reach the differ.
2977        let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
2978        let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
2979
2980        let exit = compose_with_inputs(
2981            &work,
2982            "drop billing remnant",
2983            true,  // allow_destructive — billing's snapshot will produce DROP ops
2984            false, // force_overwrite
2985            &empty_models,
2986            &[AppLifecycle {
2987                label: "billing".to_string(),
2988                database: "main".to_string(),
2989                renamed_from: None,
2990                tombstone: true, // intentional removal channel
2991            }],
2992            now,
2993            None, // pk_flip_join_table_option — no flip in this test
2994        );
2995        assert_eq!(exit, ExitCode::from(0), "compose must succeed");
2996
2997        // The composed up SQL must carry DROP TABLE for billing's
2998        // widgets — that is the whole point. Find the file and check.
2999        let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
3000        let mut up_path: Option<PathBuf> = None;
3001        for entry in fs::read_dir(&billing_dir).unwrap().flatten() {
3002            let n = entry.file_name().to_string_lossy().to_string();
3003            // Up file pattern: starts with "V", ends with ".sdjql", does
3004            // NOT contain ".down.".
3005            if n.starts_with('V') && n.ends_with(".sdjql") && !n.contains(".down.") {
3006                up_path = Some(entry.path());
3007                break;
3008            }
3009        }
3010        let up_path = up_path.expect("compose must have written an up SQL file");
3011        let up_sql = fs::read_to_string(&up_path).unwrap();
3012        assert!(
3013            up_sql.contains("DROP TABLE \"widgets\""),
3014            "compose must have seen the disk snapshot and emitted DROP TABLE — \
3015             this proves discover_snapshot_buckets_on_disk reached the differ. \
3016             SQL: {up_sql}"
3017        );
3018        let _ = fs::remove_dir_all(&work);
3019    }
3020
3021    /// Codex round-2 A-1 — `status_cmd` invokes its tokio runtime and
3022    /// fails fast on a malformed `Djogi.toml`. We don't need a live
3023    /// Postgres for this assertion — the test is that the workspace
3024    /// path is threaded through the loader and TOML errors surface
3025    /// promptly. (The earlier `a1_load_from_workspace_reads_path_specific_djogi_toml`
3026    /// covers the well-formed case; this is the malformed-input
3027    /// path-threading proof.)
3028    #[test]
3029    fn a1_round2_status_cmd_threads_workspace_to_config() {
3030        let work = temp_workspace("a1r2_status_workspace");
3031        // Write a deliberately malformed TOML so config load fails.
3032        // If the workspace path wasn't threaded, status_cmd would
3033        // try the cwd's Djogi.toml (typically absent) and silently
3034        // fall through to defaults, giving a different error code.
3035        fs::write(work.join("Djogi.toml"), "this is = not = valid toml ===").unwrap();
3036        let exit = status_cmd(Some(work.clone()));
3037        assert_eq!(
3038            exit,
3039            ExitCode::from(1),
3040            "malformed workspace Djogi.toml must surface as config load error"
3041        );
3042        let _ = fs::remove_dir_all(&work);
3043    }
3044
3045    // ── Codex umbrella U-3: AttuneError → exit code matrix ───────────────
3046
3047    /// Every `AttuneError::Refused(_)` variant must map to exit code `2`
3048    /// per `docs/spec/configuration.md` §14. The pre-fix implementation
3049    /// flattened every error to `1`, so an operator running attune in CI
3050    /// could not distinguish "policy gate refused before any side effect"
3051    /// from "ran half a step and failed mid-flight". Codex umbrella U-3
3052    /// flagged this as a blocker.
3053    #[test]
3054    fn u3_attune_refusal_variants_map_to_exit_code_two() {
3055        use djogi::migrate::AttuneRefusal;
3056        let cases = [
3057            AttuneError::Refused(AttuneRefusal::SquashNotLocalhost {
3058                database_url: "postgres://prod.example.com/main".to_string(),
3059            }),
3060            AttuneError::Refused(AttuneRefusal::SquashNotDevProfile {
3061                profile: "production".to_string(),
3062            }),
3063            // Codex umbrella U-2 — dev_mode and DJOGI_ENV gates added
3064            // in the same fixup chain. Both are `AttuneError::Refused(_)`
3065            // so they share the exit-code-2 mapping.
3066            AttuneError::Refused(AttuneRefusal::SquashDevModeOff),
3067            AttuneError::Refused(AttuneRefusal::SquashEnvIsProduction {
3068                env_value: "production".to_string(),
3069            }),
3070            AttuneError::Refused(AttuneRefusal::SquashFromVersionNotFound {
3071                version: "V20260101000000__missing".to_string(),
3072            }),
3073            AttuneError::Refused(AttuneRefusal::SquashFromVersionAmbiguous {
3074                version: "V20260101000000__shared".to_string(),
3075                buckets: vec!["main/users".to_string(), "main/billing".to_string()],
3076            }),
3077        ];
3078        for err in &cases {
3079            assert_eq!(
3080                attune_error_exit_code(err),
3081                2,
3082                "refusal variant must map to exit 2: {err}"
3083            );
3084        }
3085    }
3086
3087    /// Every runtime `AttuneError` variant must map to exit code `1`
3088    /// per `docs/spec/configuration.md` §14. CI may safely retry runtime
3089    /// failures; a refusal (exit `2`) signals "operator must intervene"
3090    /// and retrying without operator action would just refuse again.
3091    #[test]
3092    fn u3_attune_runtime_variants_map_to_exit_code_one() {
3093        let cases = [
3094            AttuneError::FilesystemScanFailed {
3095                source: std::io::Error::other("disk full"),
3096            },
3097            AttuneError::SqlReadFailed {
3098                path: PathBuf::from("/tmp/x.sdjql"),
3099                source: std::io::Error::other("permission denied"),
3100            },
3101            AttuneError::SqlWriteFailed {
3102                path: PathBuf::from("/tmp/x.sdjql"),
3103                source: std::io::Error::other("read-only fs"),
3104            },
3105            AttuneError::SqlDeleteFailed {
3106                path: PathBuf::from("/tmp/x.sdjql"),
3107                source: std::io::Error::other("not found"),
3108            },
3109            AttuneError::GitPublishFailed {
3110                stderr: "fatal: refusing to push".to_string(),
3111                status_code: Some(128),
3112            },
3113        ];
3114        for err in &cases {
3115            assert_eq!(
3116                attune_error_exit_code(err),
3117                1,
3118                "runtime variant must map to exit 1: {err}"
3119            );
3120        }
3121    }
3122
3123    // ── issue #354: baseline exit-code mapping ──────────────────────────
3124
3125    /// The refusal-class `RunnerError` variants the baseline path can
3126    /// `baseline_cmd` validates the `--reason` guard before any DB
3127    /// work. An empty or whitespace-only reason must return exit 2
3128    /// without touching the filesystem or network — the guard fires
3129    /// on the CLI-owned string before the tokio runtime is even built.
3130    #[test]
3131    fn baseline_empty_reason_exits_code_2() {
3132        let result = baseline_cmd(
3133            "V00000000000000__baseline",
3134            "description",
3135            "",
3136            None,
3137            None,
3138            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3139        );
3140        assert_eq!(
3141            result,
3142            ExitCode::from(2),
3143            "empty --reason must exit 2 before any DB work"
3144        );
3145    }
3146
3147    #[test]
3148    fn baseline_whitespace_reason_exits_code_2() {
3149        let result = baseline_cmd(
3150            "V00000000000000__baseline",
3151            "description",
3152            "   ",
3153            None,
3154            None,
3155            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3156        );
3157        assert_eq!(
3158            result,
3159            ExitCode::from(2),
3160            "whitespace-only --reason must exit 2 before any DB work"
3161        );
3162    }
3163
3164    /// surface must map to exit `2` — a blind retry would hit the same
3165    /// condition, so CI must treat them as "operator must intervene"
3166    /// rather than retryable. A duplicate baseline version (terminal or
3167    /// non-terminal), a wiring bug that supplies a snapshot, and an
3168    /// out-of-order rejection are all refusals.
3169    #[test]
3170    fn baseline_refusal_variants_map_to_exit_code_two() {
3171        let cases = [
3172            RunnerError::VersionAlreadyApplied {
3173                version: "V00000000000000__baseline".to_string(),
3174                applied_at: None,
3175            },
3176            RunnerError::VersionCollisionNonTerminal {
3177                version: "V00000000000000__baseline".to_string(),
3178                status: LedgerStatus::Pending,
3179                run_id: 1,
3180            },
3181            RunnerError::BaselineSnapshotShouldNotBeProvided,
3182            RunnerError::AdvisoryUnlockReturnedFalse {
3183                bucket: BucketKey {
3184                    database: "main".to_string(),
3185                    app: String::new(),
3186                },
3187                key: 0x0102_0304_0506_0708,
3188            },
3189            RunnerError::OutOfOrderRejected {
3190                version: "V00000000000000__baseline".to_string(),
3191                conflicting_version: "V20260101000000__later".to_string(),
3192                conflicting_applied_at: None,
3193            },
3194        ];
3195        for err in &cases {
3196            assert_eq!(
3197                baseline_error_exit_code(err),
3198                2,
3199                "baseline refusal variant must map to exit 2: {err}"
3200            );
3201        }
3202    }
3203
3204    /// Transient `RunnerError` variants reachable from the baseline path
3205    /// must map to exit `1` (retryable). The `#[non_exhaustive]`
3206    /// wildcard arm guarantees any unnamed variant also lands on `1`;
3207    /// these representative cases pin the projection / ledger / snapshot
3208    /// failure shapes the baseline runner can actually emit.
3209    #[test]
3210    fn baseline_transient_variants_map_to_exit_code_one() {
3211        use djogi::error::{DbError, DjogiError};
3212        let cases = [
3213            RunnerError::LedgerBootstrapFailed {
3214                source: DjogiError::Db(DbError::other("create table failed")),
3215            },
3216            RunnerError::LedgerWriteFailed {
3217                version: "V00000000000000__baseline".to_string(),
3218                source: DjogiError::Db(DbError::other("insert failed")),
3219            },
3220            RunnerError::PinnedSessionCheckoutFailed {
3221                source: DjogiError::Db(DbError::other("pool exhausted")),
3222            },
3223            RunnerError::AdvisoryLockFailed {
3224                bucket: BucketKey {
3225                    database: "main".to_string(),
3226                    app: String::new(),
3227                },
3228                key: 0x0102_0304_0506_0708,
3229                attempts: 3,
3230            },
3231        ];
3232        for err in &cases {
3233            assert_eq!(
3234                baseline_error_exit_code(err),
3235                1,
3236                "baseline transient variant must map to exit 1: {err}"
3237            );
3238        }
3239    }
3240
3241    // ── REQ-326: --fake / --reason validation tests ─────────────────────
3242
3243    /// REQ-326-5: --fake without --reason must exit with code 2.
3244    #[test]
3245    fn fake_without_reason_exits_code_2() {
3246        let result = apply_cmd(
3247            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3248            true,
3249            None,
3250        );
3251        assert_eq!(
3252            result,
3253            ExitCode::from(2),
3254            "--fake without --reason must exit 2"
3255        );
3256    }
3257
3258    /// REQ-326-5: --fake with blank reason must exit with code 2.
3259    #[test]
3260    fn fake_with_empty_reason_exits_code_2() {
3261        let result = apply_cmd(
3262            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3263            true,
3264            Some(String::new()),
3265        );
3266        assert_eq!(
3267            result,
3268            ExitCode::from(2),
3269            "--fake with empty reason must exit 2"
3270        );
3271    }
3272
3273    /// REQ-326-5: --fake with whitespace-only reason must exit with code 2.
3274    #[test]
3275    fn fake_with_whitespace_reason_exits_code_2() {
3276        let result = apply_cmd(
3277            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3278            true,
3279            Some("   ".to_string()),
3280        );
3281        assert_eq!(
3282            result,
3283            ExitCode::from(2),
3284            "--fake with whitespace reason must exit 2"
3285        );
3286    }
3287
3288    /// --reason without --fake is accepted (silently ignored).
3289    #[test]
3290    fn reason_without_fake_is_accepted() {
3291        // This should NOT exit 2; it will proceed to config load which
3292        // may fail on nonexistent workspace, but the --reason flag itself
3293        // is accepted. We verify the function does not early-exit with code 2.
3294        let result = apply_cmd(
3295            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3296            false, // NOT fake
3297            Some("test reason".to_string()),
3298        );
3299        // Should be 1 (config error) not 2 (refusal)
3300        assert_ne!(
3301            result,
3302            ExitCode::from(2),
3303            "--reason without --fake should not refuse"
3304        );
3305    }
3306
3307    // ── render_verify_report (FBB-1) ─────────────────────────────────────
3308    //
3309    // `render_verify_report` returns `Vec<String>` so the rendering is
3310    // assertable without capturing stdout. Each test pins the exact lines
3311    // the operator sees for one report shape.
3312
3313    /// Build a bucket for render tests.
3314    fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
3315        djogi::migrate::BucketKey {
3316            database: database.to_string(),
3317            app: app.to_string(),
3318        }
3319    }
3320
3321    /// Construct a [`VerifyDiagnostic`] tersely for render tests.
3322    fn diag(
3323        code: &str,
3324        severity: djogi::migrate::VerifySeverity,
3325        message: &str,
3326        location: Option<&str>,
3327    ) -> djogi::migrate::VerifyDiagnostic {
3328        djogi::migrate::VerifyDiagnostic {
3329            code: code.to_string(),
3330            severity,
3331            message: message.to_string(),
3332            location: location.map(str::to_string),
3333        }
3334    }
3335
3336    #[test]
3337    fn render_verify_report_clean_output() {
3338        use djogi::migrate::VerifyReport;
3339
3340        let report = VerifyReport {
3341            diagnostics: vec![],
3342            latest_applied_version: Some("001_initial".to_string()),
3343            applied_count: 3,
3344            unfinished_count: 0,
3345        };
3346        let bucket = render_bucket("main", "");
3347
3348        let lines = render_verify_report(&report, &bucket);
3349
3350        assert!(
3351            lines.contains(&"Ledger: 3 applied, latest 001_initial".to_string()),
3352            "missing ledger line; got {lines:?}"
3353        );
3354        assert!(
3355            lines.contains(&"No drift detected. Schema is consistent.".to_string()),
3356            "missing clean line; got {lines:?}"
3357        );
3358        assert!(
3359            lines.iter().any(|l| l.contains("Result: PASSED")),
3360            "missing PASSED result; got {lines:?}"
3361        );
3362        assert!(
3363            !lines.iter().any(|l| l.contains("FAILED")),
3364            "clean report must not say FAILED; got {lines:?}"
3365        );
3366    }
3367
3368    #[test]
3369    fn render_verify_report_with_errors() {
3370        use djogi::migrate::{VerifyReport, VerifySeverity};
3371
3372        // Diagnostics are pre-sorted by `(code, location)` exactly as the
3373        // library returns them — render does not re-sort.
3374        let report = VerifyReport {
3375            diagnostics: vec![
3376                diag(
3377                    "D601",
3378                    VerifySeverity::Error,
3379                    "Snapshot table missing from live DB",
3380                    Some("users"),
3381                ),
3382                diag(
3383                    "D611",
3384                    VerifySeverity::Warning,
3385                    "Live index not present in snapshot",
3386                    Some("idx_posts_created"),
3387                ),
3388            ],
3389            latest_applied_version: Some("V20260501000000__add_users".to_string()),
3390            applied_count: 2,
3391            unfinished_count: 0,
3392        };
3393        let bucket = render_bucket("main", "myapp");
3394
3395        assert!(report.has_errors());
3396        let lines = render_verify_report(&report, &bucket);
3397
3398        assert!(
3399            lines
3400                .contains(&"[ERROR] D601 (users): Snapshot table missing from live DB".to_string()),
3401            "missing D601 line; got {lines:?}"
3402        );
3403        assert!(
3404            lines.contains(
3405                &"[WARN] D611 (idx_posts_created): Live index not present in snapshot".to_string()
3406            ),
3407            "missing D611 line; got {lines:?}"
3408        );
3409        assert!(
3410            lines.iter().any(|l| l.contains("Result: FAILED")),
3411            "error report must say FAILED; got {lines:?}"
3412        );
3413    }
3414
3415    #[test]
3416    fn render_verify_report_header_shows_global_and_named_app() {
3417        use djogi::migrate::VerifyReport;
3418
3419        let report = VerifyReport {
3420            diagnostics: vec![],
3421            latest_applied_version: None,
3422            applied_count: 0,
3423            unfinished_count: 0,
3424        };
3425
3426        // Empty app label → `_global_` in the header.
3427        let global = render_verify_report(&report, &render_bucket("main", ""));
3428        assert_eq!(
3429            global.first().map(String::as_str),
3430            Some("djogi migrations verify — main/_global_"),
3431            "global bucket header; got {global:?}"
3432        );
3433
3434        // Named app → the label verbatim in the header.
3435        let named = render_verify_report(&report, &render_bucket("crud_log", "billing"));
3436        assert_eq!(
3437            named.first().map(String::as_str),
3438            Some("djogi migrations verify — crud_log/billing"),
3439            "named bucket header; got {named:?}"
3440        );
3441    }
3442
3443    #[test]
3444    fn render_verify_report_warning_only_passes_with_warnings() {
3445        use djogi::migrate::{VerifyReport, VerifySeverity};
3446
3447        let report = VerifyReport {
3448            diagnostics: vec![diag(
3449                "D606",
3450                VerifySeverity::Warning,
3451                "type differs (advisory)",
3452                Some("users.age"),
3453            )],
3454            latest_applied_version: Some("001_initial".to_string()),
3455            applied_count: 1,
3456            unfinished_count: 0,
3457        };
3458        let lines = render_verify_report(&report, &render_bucket("main", ""));
3459
3460        assert!(
3461            lines
3462                .iter()
3463                .any(|l| l.contains("Result: PASSED with warnings")),
3464            "warning-only must PASS with warnings; got {lines:?}"
3465        );
3466        assert!(
3467            !lines.iter().any(|l| l.contains("FAILED")),
3468            "warning-only must not say FAILED; got {lines:?}"
3469        );
3470    }
3471
3472    #[test]
3473    fn render_verify_report_empty_ledger_line() {
3474        use djogi::migrate::VerifyReport;
3475
3476        let report = VerifyReport {
3477            diagnostics: vec![],
3478            latest_applied_version: None,
3479            applied_count: 0,
3480            unfinished_count: 0,
3481        };
3482        let lines = render_verify_report(&report, &render_bucket("main", ""));
3483
3484        assert!(
3485            lines.contains(&"Ledger: empty (no migrations applied yet)".to_string()),
3486            "empty ledger line; got {lines:?}"
3487        );
3488    }
3489
3490    #[test]
3491    fn render_verify_report_unfinished_ledger_line() {
3492        use djogi::migrate::VerifyReport;
3493
3494        let report = VerifyReport {
3495            diagnostics: vec![],
3496            latest_applied_version: Some("V20260501000000__add_users".to_string()),
3497            applied_count: 2,
3498            unfinished_count: 1,
3499        };
3500        let lines = render_verify_report(&report, &render_bucket("main", ""));
3501
3502        assert!(
3503            lines.contains(
3504                &"Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users".to_string()
3505            ),
3506            "unfinished ledger line; got {lines:?}"
3507        );
3508    }
3509
3510    #[test]
3511    fn render_verify_report_info_with_no_location_uses_dash() {
3512        use djogi::migrate::{VerifyReport, VerifySeverity};
3513
3514        // An Info diagnostic with `location: None` exercises the
3515        // `unwrap_or("-")` path, and the all-info summary line.
3516        let report = VerifyReport {
3517            diagnostics: vec![diag(
3518                "D692",
3519                VerifySeverity::Info,
3520                "enum type(s) declared; not yet checked",
3521                None,
3522            )],
3523            latest_applied_version: Some("001_initial".to_string()),
3524            applied_count: 1,
3525            unfinished_count: 0,
3526        };
3527        let lines = render_verify_report(&report, &render_bucket("main", ""));
3528
3529        assert!(
3530            lines.iter().any(|l| l.contains("(-)")),
3531            "location: None must render as (-); got {lines:?}"
3532        );
3533        assert!(
3534            lines.contains(&"Result: PASSED (1 info(s))".to_string()),
3535            "all-info summary; got {lines:?}"
3536        );
3537    }
3538
3539    // ── resolve_bucket_url (Class A) ─────────────────────────────────────
3540
3541    fn db_config(
3542        url: &str,
3543        crud_log_url: Option<&str>,
3544        event_log_url: Option<&str>,
3545    ) -> djogi::config::DatabaseConfig {
3546        djogi::config::DatabaseConfig {
3547            url: url.to_string(),
3548            crud_log_url: crud_log_url.map(str::to_string),
3549            event_log_url: event_log_url.map(str::to_string),
3550            max_connections: None,
3551            dev_mode: false,
3552        }
3553    }
3554
3555    #[test]
3556    fn resolve_bucket_url_main_uses_app_url_verbatim() {
3557        // "main" must use the app URL verbatim even when the path
3558        // component is not literally "main" — deriving would target a
3559        // database that does not exist.
3560        let cfg = db_config("postgres://user:pass@localhost:5432/myapp_prod", None, None);
3561        assert_eq!(
3562            resolve_bucket_url(&cfg, "main").as_deref(),
3563            Some("postgres://user:pass@localhost:5432/myapp_prod"),
3564            "main must return the app URL unchanged"
3565        );
3566    }
3567
3568    #[test]
3569    fn resolve_bucket_url_crud_log_prefers_explicit_url() {
3570        let cfg = db_config(
3571            "postgres://localhost/main",
3572            Some("postgres://localhost/explicit_crud"),
3573            None,
3574        );
3575        assert_eq!(
3576            resolve_bucket_url(&cfg, "crud_log").as_deref(),
3577            Some("postgres://localhost/explicit_crud"),
3578            "crud_log must prefer the explicit crud_log_url"
3579        );
3580    }
3581
3582    #[test]
3583    fn resolve_bucket_url_event_log_prefers_explicit_url() {
3584        let cfg = db_config(
3585            "postgres://localhost/main",
3586            None,
3587            Some("postgres://localhost/explicit_event"),
3588        );
3589        assert_eq!(
3590            resolve_bucket_url(&cfg, "event_log").as_deref(),
3591            Some("postgres://localhost/explicit_event"),
3592            "event_log must prefer the explicit event_log_url"
3593        );
3594    }
3595
3596    #[test]
3597    fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
3598        // An empty explicit URL is treated as absent — derive from the app
3599        // URL's path component instead.
3600        let cfg = db_config("postgres://localhost/main", Some(""), Some("   "));
3601        // crud_log: empty string → derive.
3602        assert_eq!(
3603            resolve_bucket_url(&cfg, "crud_log").as_deref(),
3604            Some("postgres://localhost/crud_log"),
3605            "empty crud_log_url must fall back to derived"
3606        );
3607        // event_log: whitespace is NOT empty, so it is used verbatim — the
3608        // emptiness check is a strict `is_empty`, matching the spec.
3609        assert_eq!(
3610            resolve_bucket_url(&cfg, "event_log").as_deref(),
3611            Some("   "),
3612            "non-empty (whitespace) event_log_url is used verbatim"
3613        );
3614    }
3615
3616    #[test]
3617    fn resolve_bucket_url_other_database_derives_from_app_url() {
3618        let cfg = db_config("postgres://user:pass@localhost:5432/main", None, None);
3619        assert_eq!(
3620            resolve_bucket_url(&cfg, "analytics").as_deref(),
3621            Some("postgres://user:pass@localhost:5432/analytics"),
3622            "an arbitrary database name derives by path splice"
3623        );
3624    }
3625
3626    #[test]
3627    fn resolve_bucket_url_pathless_url_returns_none() {
3628        // A URL with no recognisable path component cannot be derived.
3629        let cfg = db_config("postgres://localhost", None, None);
3630        assert_eq!(
3631            resolve_bucket_url(&cfg, "crud_log"),
3632            None,
3633            "pathless URL must yield None for a derived database"
3634        );
3635    }
3636
3637    #[test]
3638    fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
3639        // "main" short-circuits before derivation, so even a pathless URL
3640        // returns it verbatim — the app pool is the operator's to define.
3641        let cfg = db_config("postgres://localhost", None, None);
3642        assert_eq!(
3643            resolve_bucket_url(&cfg, "main").as_deref(),
3644            Some("postgres://localhost"),
3645            "main returns the app URL verbatim regardless of path"
3646        );
3647    }
3648}