Skip to main content

djogi_cli/
migrations.rs

//! `djogi migrations` subcommand glue.
//!
//! The entry points in this file include `compose`, `status`, and `apply`;
//! all of them flow through the public `djogi::migrate` API. `compose` and
//! `apply` acquire the workspace file lock before touching the migration
//! tree; `status` is read-only and does not.
//! The CLI surface here is intentionally thin — all the real logic
//! lives in the library so integration tests can exercise it without
//! spawning subprocesses.
8
9use std::path::{Path, PathBuf};
10use std::process::ExitCode;
11
12use djogi::apps::AppRegistry;
13use djogi::migrate::{
14    AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
15    DescriptorProvider, GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan,
16    RepairConfirmation, RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError,
17    VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
18    compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
19    repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
20};
21
// Imported (not re-exported) for the apply command's ledger state machine.
23use djogi::migrate::LedgerStatus;
24
25// CLI-side enums declared at the crate root (`main.rs` is the binary's
26// root module — there is no `mod main`), reached here as `crate::*`.
27use crate::{PartialApplyResolutionCli, RepairSubcommand};
28
29// ── Replay plan deserialization ──────────────────────────────────────────
30
31/// Local mirror of `StoredReplayPlan` (pub(crate) in the library).
32/// The committed replay plan JSON written by `compose` at
33/// `migrations/<database>/<app>/<version>.plan.json`. This struct
34/// allows the CLI to parse it and construct a proper [`MigrationPlan`]
35/// with correct segment structure and checksums.
36#[derive(Debug, Clone, serde::Deserialize)]
37struct CliReplayPlan {
38    format_version: String,
39    checksum_up: String,
40    checksum_down: Option<String>,
41    classification: CliClassification,
42    segments: Vec<CliReplaySegment>,
43}
44
45#[derive(Debug, Clone, serde::Deserialize)]
46#[serde(tag = "kind", rename_all = "snake_case")]
47enum CliClassification {
48    NoOp,
49    Additive,
50    Reversible,
51    Destructive,
52    Lossy,
53    Unsupported {
54        reason: String,
55    },
56    PkTypeFlip {
57        co_destructive: bool,
58        co_lossy: bool,
59    },
60}
61
62#[derive(Debug, Clone, serde::Deserialize)]
63struct CliReplaySegment {
64    kind: CliSegmentKind,
65    statements: Vec<CliReplayStatement>,
66}
67
68#[derive(Debug, Clone, serde::Deserialize)]
69#[serde(rename_all = "snake_case")]
70enum CliSegmentKind {
71    Transactional,
72    NonTransactional,
73    MetadataOnly,
74}
75
76#[derive(Debug, Clone, serde::Deserialize)]
77struct CliReplayStatement {
78    label: String,
79    up: String,
80}
81
/// The only `format_version` value this CLI accepts in a committed replay
/// plan JSON; a plan carrying any other value is rejected on load.
const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
84
85/// Load the committed replay plan from disk and convert to a
86/// [`djogi::migrate::MigrationPlan`]. Returns `(plan, checksum_up, checksum_down)`.
87/// Falls back to reading the up/down SQL files and constructing a
88/// single-segment transactional plan when the replay plan JSON is
89/// absent or invalid. This mirrors the reset.rs fallback path.
90fn load_replay_plan_from_disk(
91    workspace: &Path,
92    bucket: &djogi::migrate::BucketKey,
93    version: &str,
94    pending_checksum_up: &str,
95    pending_checksum_down: Option<&str>,
96) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
97    // Try to load the committed replay plan JSON first.
98    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
99    let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
100
101    if let Ok(bytes) = std::fs::read(&replay_plan_path) {
102        let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
103            Ok(s) => s,
104            Err(e) => {
105                return Err(ApplyReplayPlanError::Parse {
106                    path: replay_plan_path.clone(),
107                    source: e.to_string(),
108                });
109            }
110        };
111
112        if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
113            return Err(ApplyReplayPlanError::FormatVersion {
114                found: stored.format_version,
115                path: replay_plan_path.clone(),
116            });
117        }
118
119        // Verify checksums match the pending plan.
120        if stored.checksum_up != pending_checksum_up
121            || stored.checksum_down.as_deref() != pending_checksum_down
122        {
123            return Err(ApplyReplayPlanError::ChecksumMismatch);
124        }
125
126        let plan = djogi::migrate::MigrationPlan {
127            bucket: bucket.clone(),
128            classification: stored.classification.into(),
129            segments: stored
130                .segments
131                .into_iter()
132                .map(|seg| djogi::migrate::Segment {
133                    kind: seg.kind.into(),
134                    statements: seg
135                        .statements
136                        .into_iter()
137                        .map(|stmt| djogi::migrate::OperationSql {
138                            label: stmt.label,
139                            up: stmt.up,
140                            down: String::new(),
141                            lossy: None,
142                        })
143                        .collect(),
144                })
145                .collect(),
146        };
147
148        return Ok((plan, stored.checksum_up, stored.checksum_down));
149    }
150
151    // Fallback: read SQL files and construct single-segment plan.
152    let up_filename = djogi::migrate::up_filename(version);
153    let down_filename = djogi::migrate::down_filename(version);
154    let up_path = bucket_dir.join(&up_filename);
155    let down_path = bucket_dir.join(&down_filename);
156
157    let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
158        path: up_path.clone(),
159        source: e.to_string(),
160    })?;
161
162    let down_sql = match std::fs::read_to_string(&down_path) {
163        Ok(sql) => sql,
164        Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
165        Err(e) => {
166            return Err(ApplyReplayPlanError::SqlRead {
167                path: down_path.clone(),
168                source: e.to_string(),
169            });
170        }
171    };
172
173    // Compute checksum for the single-segment fallback. The runner
174    // recomputes from the plan's SQL fragments and verifies against
175    // what we provide in RunnerCtx, so they must match.
176    let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
177
178    // Build a single-transactional-segment plan. This is correct for
179    // most migrations — only CONCURRENTLY indexes require non-tx
180    // segments, and those always have a replay plan JSON.
181    let plan = djogi::migrate::MigrationPlan {
182        bucket: bucket.clone(),
183        classification: djogi::migrate::Classification::Additive,
184        segments: vec![djogi::migrate::Segment {
185            kind: djogi::migrate::SegmentKind::Transactional,
186            statements: vec![djogi::migrate::OperationSql {
187                label: format!("replay {version}"),
188                up: up_sql,
189                down: down_sql,
190                lossy: None,
191            }],
192        }],
193    };
194
195    Ok((plan, computed_checksum_up, None))
196}
197
/// Failure modes of [`load_replay_plan_from_disk`].
///
/// Underlying errors are stored pre-rendered as strings in `source`.
#[derive(Debug)]
enum ApplyReplayPlanError {
    /// The replay plan JSON at `path` could not be turned into a plan.
    Parse { path: PathBuf, source: String },
    /// The replay plan at `path` declares the format version `found`,
    /// which is not the one this CLI expects.
    FormatVersion { found: String, path: PathBuf },
    /// The replay plan's checksums disagree with the pending JSON's.
    ChecksumMismatch,
    /// An up/down SQL file at `path` could not be read.
    SqlRead { path: PathBuf, source: String },
}
206
207impl std::fmt::Display for ApplyReplayPlanError {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        match self {
210            Self::Parse { path, source } => {
211                write!(f, "parse replay plan {}: {source}", path.display())
212            }
213            Self::FormatVersion { found, path } => write!(
214                f,
215                "replay plan format version mismatch in {}: expected {}, found {}",
216                path.display(),
217                CLI_REPLAY_PLAN_FORMAT_VERSION,
218                found
219            ),
220            Self::ChecksumMismatch => {
221                write!(f, "checksum mismatch between pending JSON and replay plan")
222            }
223            Self::SqlRead { path, source } => {
224                write!(f, "read SQL file {}: {source}", path.display())
225            }
226        }
227    }
228}
229
230impl std::error::Error for ApplyReplayPlanError {}
231
232// ── Type conversions from CLI-local types to library types ────────────────
233
234impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
235    fn from(kind: CliSegmentKind) -> Self {
236        match kind {
237            CliSegmentKind::Transactional => Self::Transactional,
238            CliSegmentKind::NonTransactional => Self::NonTransactional,
239            CliSegmentKind::MetadataOnly => Self::MetadataOnly,
240        }
241    }
242}
243
244impl From<CliClassification> for djogi::migrate::Classification {
245    fn from(classification: CliClassification) -> Self {
246        match classification {
247            CliClassification::NoOp => Self::NoOp,
248            CliClassification::Additive => Self::Additive,
249            CliClassification::Reversible => Self::Reversible,
250            CliClassification::Destructive => Self::Destructive,
251            CliClassification::Lossy => Self::Lossy,
252            CliClassification::Unsupported { reason } => Self::Unsupported { reason },
253            CliClassification::PkTypeFlip {
254                co_destructive,
255                co_lossy,
256            } => Self::PkTypeFlip {
257                co_destructive,
258                co_lossy,
259            },
260        }
261    }
262}
263
/// Pick the workspace root for a command.
///
/// An explicit `--workspace` value is returned untouched. Without one the
/// current working directory is used (the usual flow is `cd <project>`
/// followed by `djogi migrations …`); if the working directory cannot be
/// determined, the relative path `.` is returned instead.
fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
    match workspace {
        Some(explicit) => explicit,
        None => match std::env::current_dir() {
            Ok(cwd) => cwd,
            Err(_) => PathBuf::from("."),
        },
    }
}
270
271/// Walk the on-disk `migrations/<database>/<app>/` tree and return the
272/// set of buckets that already have a `schema_snapshot.json` file.
273/// Compose's `snapshots` map must include the OLD bucket of any
274/// renamed app — and that bucket is guaranteed to be absent from the
275/// current `models` inventory because the `#[app(renamed_from =
276/// "old")]` annotation lives on the NEW app. Walking disk directly
277/// recovers those orphaned snapshots so the differ sees both sides of
278/// a rename.
279/// Each entry maps to a [`djogi::migrate::projection::BucketKey`]
280/// using the inverse of [`djogi::migrate::app_dirname`] (synthetic
281/// `_global_` directory → empty-string label).
282fn discover_snapshot_buckets_on_disk(
283    workspace: &Path,
284) -> Vec<djogi::migrate::projection::BucketKey> {
285    let mut out = Vec::new();
286    let migrations_root = djogi::migrate::migrations_root(workspace);
287    let Ok(db_entries) = std::fs::read_dir(&migrations_root) else {
288        return out;
289    };
290    for db_entry in db_entries.flatten() {
291        let Ok(ft) = db_entry.file_type() else {
292            continue;
293        };
294        if !ft.is_dir() {
295            continue;
296        }
297        let Some(database) = db_entry.file_name().to_str().map(str::to_string) else {
298            continue;
299        };
300        let Ok(app_entries) = std::fs::read_dir(db_entry.path()) else {
301            continue;
302        };
303        for app_entry in app_entries.flatten() {
304            let Ok(ft) = app_entry.file_type() else {
305                continue;
306            };
307            if !ft.is_dir() {
308                continue;
309            }
310            let Some(dirname) = app_entry.file_name().to_str().map(str::to_string) else {
311                continue;
312            };
313            let snap_path = app_entry.path().join("schema_snapshot.json");
314            if !snap_path.exists() {
315                continue;
316            }
317            let label = djogi::migrate::app_label_from_dirname(&dirname).to_string();
318            out.push(djogi::migrate::projection::BucketKey {
319                database: database.clone(),
320                app: label,
321            });
322        }
323    }
324    out
325}
326
327/// `djogi migrations compose` entry point.
328pub fn compose_cmd(
329    provider: &dyn DescriptorProvider,
330    name: &str,
331    allow_destructive: bool,
332    force_overwrite: bool,
333    workspace: Option<PathBuf>,
334) -> ExitCode {
335    let workspace = resolve_workspace(workspace);
336    let models = match project_from_provider(provider) {
337        Ok(m) => m,
338        Err(e) => {
339            eprintln!("djogi migrations compose: projection error: {e}");
340            return ExitCode::from(1);
341        }
342    };
343    let apps: Vec<AppLifecycle> = provider
344        .apps()
345        .iter()
346        .map(|d| AppLifecycle {
347            label: d.label.to_string(),
348            database: d.database.to_string(),
349            renamed_from: d.renamed_from.map(str::to_string),
350            tombstone: d.tombstone,
351        })
352        .collect();
353    // The resolved workspace flows into config loading. Compose consumes
354    // the [`MigrateConfig::pk_flip_join_table_option`] knob so we no
355    // longer drop the parsed config — the join-table layout
356    // selected in `Djogi.toml` reaches the differ via this path.
357    let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
358        Ok(c) => c,
359        Err(e) => {
360            eprintln!("djogi migrations compose: config load: {e}");
361            return ExitCode::from(1);
362        }
363    };
364    let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
365        djogi_config.migrate.pk_flip_join_table_option,
366    );
367    compose_with_inputs(
368        &workspace,
369        name,
370        allow_destructive,
371        force_overwrite,
372        &models,
373        &apps,
374        time::OffsetDateTime::now_utc(),
375        Some(pk_flip_option),
376    )
377}
378
379/// Shared compose body — separated from [`compose_cmd`] so tests can
380/// drive it with explicit `models` and `apps` (the production entry
381/// point sources both from `inventory::iter` and `AppRegistry::all`,
382/// which are global state and thus not directly addressable from a
383/// unit test).
384/// Acquires the workspace lock, walks the on-disk migration tree to
385/// recover orphaned snapshots (renamed-from buckets), and
386/// invokes [`djogi::migrate::compose`].
387// Compose has 8 inputs because it sits at the bridge between
388// CLI flag parsing (workspace / name / flags / clock) and the
389// engine (`models` / `apps` / `pk_flip_join_table_option`).
390// Folding these into a struct would push the same fields onto
391// the caller; the CLI tests already pass them positionally and
392// a struct-based refactor would be churn for no clarity gain.
393#[allow(clippy::too_many_arguments)]
394fn compose_with_inputs(
395    workspace: &Path,
396    name: &str,
397    allow_destructive: bool,
398    force_overwrite: bool,
399    models: &std::collections::BTreeMap<
400        djogi::migrate::projection::BucketKey,
401        djogi::migrate::AppliedSchema,
402    >,
403    apps: &[AppLifecycle],
404    now: time::OffsetDateTime,
405    pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
406) -> ExitCode {
407    let lock_path = workspace.join(LOCK_FILE_NAME);
408    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
409        Ok(g) => g,
410        Err(e) => {
411            eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
412            return ExitCode::from(1);
413        }
414    };
415
416    // Read snapshots from disk. The bucket set we load is the UNION of
417    // (a) every bucket the current projection knows about and (b) every
418    // on-disk bucket that has a snapshot file.
419    // Without (b) a renamed-from app's old snapshot is missed
420    // entirely (the new app's `BucketKey` differs and the differ
421    // never sees the old schema, breaking compose's rename + drop +
422    // move emission).
423    let mut bucket_set: std::collections::BTreeSet<djogi::migrate::projection::BucketKey> =
424        models.keys().cloned().collect();
425    for bucket in discover_snapshot_buckets_on_disk(workspace) {
426        bucket_set.insert(bucket);
427    }
428
429    let mut snapshots: std::collections::BTreeMap<_, _> = std::collections::BTreeMap::new();
430    for bucket in &bucket_set {
431        let path = djogi::migrate::snapshot_path(workspace, bucket);
432        match djogi::migrate::load_snapshot(&path) {
433            Ok(s) => {
434                snapshots.insert(bucket.clone(), s);
435            }
436            Err(djogi::migrate::SnapshotError::Io { source, .. })
437                if source.kind() == std::io::ErrorKind::NotFound =>
438            {
439                // Fresh app — no prior snapshot.
440            }
441            Err(e) => {
442                eprintln!(
443                    "djogi migrations compose: snapshot load failed at {}: {e}",
444                    path.display()
445                );
446                return ExitCode::from(1);
447            }
448        }
449    }
450
451    let req = ComposeRequest {
452        workspace_root: workspace,
453        models,
454        snapshots: &snapshots,
455        apps,
456        name,
457        allow_destructive,
458        force_overwrite,
459        now,
460        _guard: &guard,
461        pk_flip_join_table_option,
462        // Production: always run auto-emit. The flag is a
463        // test-only escape hatch for unit tests that exercise
464        // compose's lower-level write/rollback machinery in
465        // isolation; the CLI / production path always goes through
466        // the full bootstrap flow.
467        skip_phase_zero_auto_emit: false,
468    };
469    match compose(req) {
470        Ok(report) => {
471            // Surface auto-emitted bootstraps before
472            // the regular composed buckets so the operator sees the
473            // bootstrap context before the per-bucket changes.
474            for emit in &report.emitted_phase_zero {
475                let ext_summary = if emit.extensions.is_empty() {
476                    "no extensions".to_string()
477                } else {
478                    format!(
479                        "extensions: {}",
480                        emit.extensions
481                            .iter()
482                            .cloned()
483                            .collect::<Vec<_>>()
484                            .join(", ")
485                    )
486                };
487                println!(
488                    "auto-emitted bootstrap migration: {database}/_global_ ({ext_summary})",
489                    database = emit.database,
490                );
491            }
492            for cb in &report.composed_buckets {
493                println!(
494                    "composed {database}/{app}: {version} ({classification:?})",
495                    database = cb.bucket.database,
496                    app = if cb.bucket.app.is_empty() {
497                        "_global_"
498                    } else {
499                        cb.bucket.app.as_str()
500                    },
501                    version = cb.version,
502                    classification = cb.classification,
503                );
504            }
505            ExitCode::from(0)
506        }
507        Err(ComposeError::NothingToCompose) => {
508            println!("nothing to compose — model state matches snapshot for every bucket");
509            // Per the inline-decisions: nothing-to-compose is
510            // not an error. The status command is the one that
511            // signals out-of-sync state via exit code.
512            ExitCode::from(0)
513        }
514        Err(ComposeError::LinkageDropWithoutModels { ref text, .. }) => {
515            eprintln!("djogi migrations compose: {text}");
516            // Exit 2 — refusal: models must be compiled in before dropping app linkage.
517            ExitCode::from(2)
518        }
519        Err(e) => {
520            eprintln!("djogi migrations compose: {e}");
521            ExitCode::from(1)
522        }
523    }
524}
525
526/// `djogi migrations status` entry point.
527/// Read-only — does not acquire the workspace lock. Reads the
528/// migration ledger from the active database via
529/// [`djogi::context::DjogiContext`].
530pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
531    let workspace = resolve_workspace(workspace);
532
533    // Build a tokio runtime so we can drive the async ledger query.
534    let runtime = match tokio::runtime::Builder::new_current_thread()
535        .enable_all()
536        .build()
537    {
538        Ok(r) => r,
539        Err(e) => {
540            eprintln!("djogi migrations status: tokio runtime: {e}");
541            return ExitCode::from(1);
542        }
543    };
544
545    let exit = runtime.block_on(async { run_status(&workspace).await });
546    ExitCode::from(exit as u8)
547}
548
549/// Async body of [`status_cmd`]. Returns the desired exit code.
550/// The resolved `workspace` path feeds
551/// [`djogi::config::DjogiConfig::load_from_workspace`] so a
552/// `--workspace /custom/path` actually reads `/<custom>/Djogi.toml`
553/// instead of always picking up the cwd's config. Production callers
554/// running from inside the project root (the typical case) get the
555/// previous behaviour for free — `resolve_workspace(None)` returns
556/// `cwd`.
557async fn run_status(workspace: &Path) -> i32 {
558    use djogi::config::DjogiConfig;
559
560    let config = match DjogiConfig::load_from_workspace(workspace) {
561        Ok(c) => c,
562        Err(e) => {
563            eprintln!("djogi migrations status: config load: {e}");
564            return 1;
565        }
566    };
567
568    let mut ctx = match connect_and_check(&config.database.url).await {
569        ContextOutcome::Ready(ctx) => ctx,
570        ContextOutcome::UnsupportedVersion(e) => {
571            crate::print_support_boundary_error("migrations status", &e);
572            return 2;
573        }
574        ContextOutcome::RuntimeError(msg) => {
575            eprintln!("djogi migrations status: pool: {msg}");
576            return 1;
577        }
578    };
579
580    let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
581        Ok(rows) => rows,
582        Err(e) => {
583            // A missing ledger table is treated as "no migrations
584            // applied" — print the empty state and exit 0.
585            if e.to_string().contains("djogi_schema_migrations") {
586                println!("No migrations recorded.");
587                return 0;
588            }
589            eprintln!("djogi migrations status: ledger read: {e}");
590            return 1;
591        }
592    };
593
594    let registered: Vec<String> = AppRegistry::all()
595        .iter()
596        .map(|d| d.label.to_string())
597        .collect();
598    let report = djogi::migrate::render_status(&rows, &registered);
599    for line in &report.lines {
600        println!("{line}");
601    }
602    report.exit_code
603}
604
605/// Outcome of [`connect_and_check`] — connecting a pool and running the
606/// Postgres-version preflight, with the support-boundary refusal kept
607/// distinct from ordinary runtime failures.
608/// The three arms drive different exit codes at the call site:
609/// - [`ContextOutcome::Ready`] — pool connected and PG ≥ 18; proceed.
610/// - [`ContextOutcome::UnsupportedVersion`] — PG < 18. The caller renders
611///   the support-boundary message via
612///   [`crate::print_support_boundary_error`] and exits `2` (refusal: the
613///   operator must upgrade Postgres; retrying changes nothing).
614/// - [`ContextOutcome::RuntimeError`] — pool connect failed, the preflight
615///   query errored, or any other non-version `DjogiError`. The caller
616///   prints the message and exits `1` (transient: CI may retry).
617// The `Ready` variant holds a `DjogiContext` (large — it wraps a
618// `DjogiPool`), while the other two variants are small (`DjogiError` /
619// `String`). Boxing `Ready` would add a heap allocation on the success
620// path; this value is constructed and immediately matched at each call
621// site (never stored in a collection), so the wider stack value is a
622// transient one-off, not a per-element penalty. Same trade-off and
623// rationale as `ContextInner` in `djogi::context` (see its
624// `large_enum_variant` allow).
625#[allow(clippy::large_enum_variant)]
626enum ContextOutcome {
627    /// Pool connected and the PG-version preflight passed.
628    Ready(djogi::context::DjogiContext),
629    /// The PG-version preflight refused — server is below the minimum
630    /// supported major version.
631    UnsupportedVersion(djogi::error::DjogiError),
632    /// A runtime failure (connect / preflight / other) — already rendered
633    /// to a string so the call site need not re-match.
634    RuntimeError(String),
635}
636
637/// Connect a pool from `url` and run the Postgres-version preflight,
638/// returning a typed [`ContextOutcome`].
639/// Splits the support-boundary refusal (PG < 18, exit `2`) from runtime
640/// failures (connect / query errors, exit `1`) so each call site can map
641/// the outcome onto the documented exit-code matrix. Connects via the
642/// public `DjogiPool::connect` entry point, then hands the pool to the
643/// public `DjogiContext::from_pool` API once the version check passes.
644async fn connect_and_check(url: &str) -> ContextOutcome {
645    let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
646        Ok(p) => p,
647        Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
648    };
649    match djogi::pg::preflight::check_postgres_version(&pool).await {
650        Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
651        // `DjogiError` is `#[non_exhaustive]`, so the `@`-bound
652        // `UnsupportedPostgresVersion` arm needs the trailing `_` catch-all.
653        Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
654            ContextOutcome::UnsupportedVersion(e)
655        }
656        Err(other) => ContextOutcome::RuntimeError(other.to_string()),
657    }
658}
659
660/// Resolve the connection URL for a single migration-bucket database.
661/// Verify routes each bucket to the pool for its `database` component.
662/// The mapping mirrors Djogi's three-database architecture:
663/// - `"main"` ([`djogi::apps::AppDescriptor::GLOBAL_DATABASE`]) always uses
664///   the app URL verbatim. We do NOT derive it by splicing `"main"` into
665///   the path, because the operator's app URL may carry a path component
666///   that is not literally named `main` (e.g. `…/myapp_prod`); deriving
667///   would target a database that does not exist.
668/// - `"crud_log"` / `"event_log"` prefer the explicit
669///   [`djogi::config::DatabaseConfig::crud_log_url`] /
670///   [`event_log_url`](djogi::config::DatabaseConfig::event_log_url) when
671///   set to a non-empty value, matching how the audit / event pools are
672///   resolved elsewhere.
673/// - Any other database name (and the log databases when their explicit
674///   URL is absent) is derived by splicing the name into the app URL's
675///   path component via [`djogi::migrate::derive_per_database_url`].
676///   Returns `None` when derivation fails (the app URL has no recognisable
677///   path component); the caller surfaces that as a runtime error for the
678///   affected bucket.
679fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
680    // "main" always uses the app URL verbatim — do NOT derive, as the app
681    // URL may not have a path component named "main".
682    if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
683        return Some(db_config.url.clone());
684    }
685    if database == "crud_log"
686        && let Some(u) = db_config.crud_log_url.as_deref()
687        && !u.is_empty()
688    {
689        return Some(u.to_string());
690    }
691    if database == "event_log"
692        && let Some(u) = db_config.event_log_url.as_deref()
693        && !u.is_empty()
694    {
695        return Some(u.to_string());
696    }
697    djogi::migrate::derive_per_database_url(&db_config.url, database)
698}
699
700/// `djogi migrations apply` entry point.
701/// Discovers pending JSON files under `target/djogi_pending/`, loads the
702/// committed replay plan for each, and drives [`djogi::migrate::apply_plan`]
703/// through the library runner with full crash recovery via the ledger state
704/// machine.
705pub fn apply_cmd(workspace: Option<PathBuf>, fake: bool, reason: Option<String>) -> ExitCode {
706    let workspace = resolve_workspace(workspace);
707
708    // Validate --fake / --reason pairing before doing any expensive work.
709    let mode = if fake {
710        match reason {
711            Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
712            Some(_) => {
713                eprintln!(
714                    "djogi migrations apply --fake: --reason must not be empty; \
715                     supply a non-empty reason why these migrations are being \
716                     faked (e.g. 'schema pre-exists from prior tooling')"
717                );
718                return ExitCode::from(2);
719            }
720            None => {
721                eprintln!(
722                    "djogi migrations apply --fake: --reason is required; \
723                     supply a reason why these migrations are being faked \
724                     (e.g. 'schema pre-exists from prior tooling'). \
725                     This is recorded in the ledger audit trail."
726                );
727                return ExitCode::from(2);
728            }
729        }
730    } else {
731        FakeMode::Real
732    };
733
734    let runtime = match tokio::runtime::Builder::new_current_thread()
735        .enable_all()
736        .build()
737    {
738        Ok(r) => r,
739        Err(e) => {
740            eprintln!("djogi migrations apply: tokio runtime: {e}");
741            return ExitCode::from(1);
742        }
743    };
744
745    let exit = runtime.block_on(async { run_apply(&workspace, &mode).await });
746    ExitCode::from(exit as u8)
747}
748
/// Selects what `djogi migrations apply` does with each pending plan:
/// run its SQL, or only record it in the ledger as faked.
#[derive(Debug, Clone)]
enum FakeMode {
    /// Normal apply: the DDL is executed via `apply_plan`.
    Real,
    /// No DDL is run; `fake_apply_plan` records `status = 'faked'`.
    /// `reason` is the operator-supplied `--reason` text.
    Fake { reason: String },
}
758
759/// Async body of [`apply_cmd`]. Returns the desired exit code.
760async fn run_apply(workspace: &Path, mode: &FakeMode) -> i32 {
761    use djogi::config::DjogiConfig;
762
763    let action_verb = match mode {
764        FakeMode::Real => "apply",
765        FakeMode::Fake { .. } => "fake-apply",
766    };
767    let progress_verb = match mode {
768        FakeMode::Real => "applying",
769        FakeMode::Fake { .. } => "faking",
770    };
771
772    // 1. Load config.
773    let config = match DjogiConfig::load_from_workspace(workspace) {
774        Ok(c) => c,
775        Err(e) => {
776            eprintln!("djogi migrations {action_verb}: config load: {e}");
777            return 2;
778        }
779    };
780
781    // 2. Build pool and check PG version preflight.
782    let pool = match djogi::pg::pool::DjogiPool::connect(&config.database.url).await {
783        Ok(p) => p,
784        Err(e) => {
785            eprintln!("djogi migrations {action_verb}: pool connect: {e}");
786            return 1;
787        }
788    };
789    if let Err(e) = djogi::pg::preflight::check_postgres_version(&pool).await {
790        crate::print_support_boundary_error("migrations apply", &e);
791        return 2;
792    }
793
794    // 3. Discover pending JSONs.
795    let pending_files = discover_pending_plans(workspace);
796    if pending_files.is_empty() {
797        println!("No pending migrations to {action_verb}.");
798        return 0;
799    }
800
801    // 4. Acquire workspace lock.
802    let lock_path = workspace.join(LOCK_FILE_NAME);
803    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
804        Ok(g) => g,
805        Err(e) => {
806            eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
807            return 1;
808        }
809    };
810
811    // 5. Build audit pool (optional — silently skipped if unavailable).
812    let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
813        Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
814        Err(_) => None,
815    };
816
817    // 6. Build context from pool (not pinned yet — apply_plan pins internally).
818    let mut ctx = djogi::context::DjogiContext::from_pool(pool);
819
820    // 7. Apply each pending migration in order.
821    for (pending_path, bucket_database, app_label) in &pending_files {
822        println!("  {progress_verb} {bucket_database}/{app_label}...");
823        let result = apply_one_pending(
824            &mut ctx,
825            workspace,
826            pending_path,
827            bucket_database.clone(),
828            app_label.clone(),
829            &config,
830            &guard,
831            audit_pool.as_ref(),
832            mode,
833        )
834        .await;
835
836        match result {
837            ApplyResult::Ok => match mode {
838                FakeMode::Real => {
839                    println!("Applied: {bucket_database}/{app_label}");
840                }
841                FakeMode::Fake { .. } => {
842                    println!(
843                        "  faked {bucket_database}/{app_label}: \
844                             recorded in ledger with status = 'faked' (no SQL executed)"
845                    );
846                }
847            },
848            ApplyResult::Skipped(reason) => {
849                println!("Skipped {bucket_database}/{app_label}: {reason}");
850            }
851            ApplyResult::Refused(reason) => {
852                eprintln!(
853                    "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
854                );
855                return 2;
856            }
857            ApplyResult::RunnerError(e) => {
858                eprintln!(
859                    "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
860                );
861                return runner_error_exit_code(&e);
862            }
863        }
864    }
865
866    let summary_verb = match mode {
867        FakeMode::Real => "applied",
868        FakeMode::Fake { .. } => "faked",
869    };
870    println!("{summary_verb} {} migration(s).", pending_files.len());
871    0
872}
873
/// Outcome of applying a single pending migration, as reported by
/// `apply_one_pending` to the loop in `run_apply`.
#[derive(Debug)]
enum ApplyResult {
    /// Migration applied (or fake-applied) successfully.
    Ok,
    /// Migration skipped; the payload is the reason printed for the
    /// operator (e.g. the version is already applied).
    Skipped(String),
    /// User-facing refusal — `run_apply` stops and exits with code 2.
    Refused(String),
    /// Runner error — `run_apply` stops and exits with the code chosen by
    /// `runner_error_exit_code` (currently always 1).
    RunnerError(RunnerError),
}
886
887/// Scan `target/djogi_pending/` for pending JSON files.
888/// Returns a list of `(path, database, app)` tuples sorted by file
889/// name so the apply order is deterministic. Each path points to a
890/// valid JSON file that was discovered on disk.
891fn discover_pending_plans(workspace: &Path) -> Vec<(PathBuf, String, String)> {
892    let pending_root = djogi::migrate::pending_root(workspace);
893    let mut out = Vec::new();
894
895    let Ok(db_entries) = std::fs::read_dir(&pending_root) else {
896        return out;
897    };
898
899    for db_entry in db_entries.flatten() {
900        let db_name = match db_entry.file_name().to_str().map(str::to_string) {
901            Some(n) => n,
902            None => continue,
903        };
904        if db_name.starts_with('.') {
905            continue;
906        }
907
908        let db_dir = db_entry.path();
909        if !db_dir.is_dir() {
910            continue;
911        }
912
913        let Ok(app_entries) = std::fs::read_dir(&db_dir) else {
914            continue;
915        };
916
917        for app_entry in app_entries.flatten() {
918            let path = app_entry.path();
919            if !path.is_file() {
920                continue;
921            }
922            let filename = match path.file_name().and_then(|f| f.to_str()) {
923                Some(f) => f,
924                None => continue,
925            };
926            // Filter: must be a .json file, not the special _global_.json
927            // pattern which is handled correctly by the naming function.
928            if !filename.ends_with(".json") {
929                continue;
930            }
931            // Extract app label from filename by stripping .json extension.
932            // The pending JSON filename is `<app>.json` or `_global_.json`.
933            let app_label = if let Some(stripped) = filename.strip_suffix(".json") {
934                stripped.to_string()
935            } else {
936                continue;
937            };
938
939            out.push((path, db_name.clone(), app_label));
940        }
941    }
942
943    out.sort_by(|a, b| a.0.cmp(&b.0));
944    out
945}
946
947/// Apply a single pending migration.
948/// Loads the pending JSON to recover bucket and version, checks the
949/// ledger state machine for crash recovery, loads the committed replay
950/// plan (or falls back to a single-segment plan from the SQL file), and
951/// drives [`djogi::migrate::apply_plan`].
952/// Uses the bypass attribute because deleting failed ledger rows requires
953/// raw SQL that is not exposed through the public typed API.
954// apply_one_pending carries 9 arguments because it sits at the bridge
955// between the CLI dispatch (workspace, path, bucket info) and the
956// library runner (config, guard, audit pool, mode). Folding these into a
957// struct would push the same fields onto the caller and add churn for
958// no clarity gain — the pattern matches compose_with_inputs and attune.
959#[allow(clippy::too_many_arguments)]
960#[djogi::deliberately_bypass_convention_with_raw_sql]
961// JUSTIFICATION (PIN): apply_one_pending needs to delete stale failed
962// ledger rows via `DELETE FROM djogi_schema_migrations WHERE version = $1`.
963// The public API has no delete operation — `select_all_ledger_rows` is read-only and
964// `insert_pending` is write-only. This is the minimal raw SQL surface
965// required for crash recovery.
966async fn apply_one_pending(
967    ctx: &mut djogi::context::DjogiContext,
968    workspace: &Path,
969    pending_path: &Path,
970    bucket_database: String,
971    app_label: String,
972    config: &djogi::config::DjogiConfig,
973    guard: &djogi::migrate::WorkspaceGuard,
974    audit_pool: Option<&deadpool_postgres::Pool>,
975    mode: &FakeMode,
976) -> ApplyResult {
977    // 1. Parse pending JSON to get bucket + version + checksums.
978    let pending_bytes = match std::fs::read(pending_path) {
979        Ok(b) => b,
980        Err(e) => {
981            return ApplyResult::Refused(format!("read pending JSON: {e}"));
982        }
983    };
984    let pending: PendingPlan = match serde_json::from_slice(&pending_bytes) {
985        Ok(p) => p,
986        Err(e) => {
987            return ApplyResult::Refused(format!("parse pending JSON: {e}"));
988        }
989    };
990
991    // Resolve bucket key from pending plan fields. The `_global_` app
992    // maps to empty string (synthetic global bucket).
993    let resolved_app = if app_label == "_global_" {
994        String::new()
995    } else {
996        app_label.clone()
997    };
998    let bucket = djogi::migrate::BucketKey {
999        database: bucket_database,
1000        app: resolved_app,
1001    };
1002
1003    // 2. Check ledger state machine for this version.
1004    match check_ledger_state(ctx, &pending.version).await {
1005        LedgerState::NotPresent => {} /* normal path */
1006        LedgerState::AlreadyApplied => {
1007            return ApplyResult::Skipped("already applied".to_string());
1008        }
1009        LedgerState::PendingOrPartial(existing_status) => {
1010            // Pending or partial state from a previous interrupted run.
1011            // Failed and RolledBack are non-terminal stale rows that block
1012            // re-apply — delete them and proceed. Pending rows require
1013            // explicit operator resolution.
1014            if existing_status == LedgerStatus::Failed
1015                || existing_status == LedgerStatus::RolledBack
1016            {
1017                // Both Failed and RolledBack rows are non-terminal stale rows
1018                // that block re-apply. delete_failed_ledger_row is a status-
1019                // agnostic DELETE by version; the name reflects the original
1020                // crash-recovery use case but the operation applies equally to
1021                // rolled-back rows.
1022                if let Err(e) = delete_failed_ledger_row(ctx, &pending.version).await {
1023                    return ApplyResult::Refused(format!(
1024                        "clean {} ledger row: {e}",
1025                        existing_status.as_db_str()
1026                    ));
1027                }
1028            } else {
1029                return ApplyResult::Refused(format!(
1030                    "version already in {} state — resolve before re-applying",
1031                    existing_status.as_db_str()
1032                ));
1033            }
1034        }
1035    }
1036
1037    // 3. Load committed replay plan (or fall back to single-segment).
1038    let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
1039        workspace,
1040        &bucket,
1041        &pending.version,
1042        &pending.checksum_up,
1043        pending.checksum_down.as_deref(),
1044    ) {
1045        Ok(result) => result,
1046        Err(e) => {
1047            return ApplyResult::Refused(format!("load replay plan: {e}"));
1048        }
1049    };
1050
1051    // 4. Construct RunnerCtx.
1052    let runner_ctx = RunnerCtx {
1053        bucket: bucket.clone(),
1054        version: pending.version.clone(),
1055        description: pending.slug.clone(),
1056        checksum_up,
1057        checksum_down,
1058        snapshot: Some(pending.model_snapshot.clone()),
1059        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
1060        // MigrateConfig does not derive Clone; construct from fields.
1061        config: djogi::config::MigrateConfig {
1062            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
1063            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
1064            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
1065            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
1066        },
1067        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
1068        audit_pool: audit_pool.cloned(),
1069    };
1070
1071    // 5. Apply (or fake-apply) the plan through the library runner.
1072    let runner_result = match mode {
1073        FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
1074        FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
1075    };
1076    match runner_result {
1077        Ok(_) => ApplyResult::Ok,
1078        Err(e) => ApplyResult::RunnerError(e),
1079    }
1080}
1081
/// Ledger state for a given migration version, as classified by
/// `check_ledger_state`.
#[derive(Debug)]
enum LedgerState {
    /// No row exists — first apply. Also reported when the ledger rows
    /// cannot be read at all.
    NotPresent,
    /// Row exists and is in a terminal state (`Applied`, `Baseline`, or
    /// `Faked`).
    AlreadyApplied,
    /// Row exists in a non-terminal state (`Pending`, `Failed`, or
    /// `RolledBack`); carries that specific status.
    PendingOrPartial(LedgerStatus),
}
1092
1093/// Check the ledger for an existing row matching `version`.
1094async fn check_ledger_state(ctx: &mut djogi::context::DjogiContext, version: &str) -> LedgerState {
1095    let Ok(rows) = djogi::migrate::select_all_ledger_rows(ctx).await else {
1096        // Ledger table might not exist yet — treat as NotPresent so
1097        // the runner can bootstrap it.
1098        return LedgerState::NotPresent;
1099    };
1100
1101    let existing = rows.iter().find(|r| r.version == version);
1102    match existing {
1103        None => LedgerState::NotPresent,
1104        Some(row) => match row.status {
1105            LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
1106                LedgerState::AlreadyApplied
1107            }
1108            LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
1109                LedgerState::PendingOrPartial(row.status)
1110            }
1111        },
1112    }
1113}
1114
1115/// Map a [`RunnerError`] to an exit code.
1116/// All runner errors map to exit code 1 (apply failure). Exit code 2
1117/// is reserved for user-facing refusals that happen before the runner
1118/// is invoked.
1119fn runner_error_exit_code(_error: &RunnerError) -> i32 {
1120    1
1121}
1122
1123#[djogi::deliberately_bypass_convention_with_raw_sql]
1124// JUSTIFICATION (PIN): delete_failed_ledger_row removes a stale Failed
1125// row so the migration can be retried. The public API has no delete
1126// operation for ledger rows — only select_all_ledger_rows and insert_pending
1127// are exposed. This DELETE is the minimal raw SQL required for crash recovery.
1128async fn delete_failed_ledger_row(
1129    ctx: &mut djogi::context::DjogiContext,
1130    version: &str,
1131) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1132    ctx.raw_execute(
1133        "DELETE FROM djogi_schema_migrations WHERE version = $1",
1134        &[&version],
1135    )
1136    .await?;
1137    Ok(())
1138}
1139
1140/// Reconstruct the snapshot path for a bucket: `migrations/<database>/<app>/schema_snapshot.json`.
1141fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
1142    let migrations_root = djogi::migrate::migrations_root(workspace);
1143    migrations_root
1144        .join(&bucket.database)
1145        .join(djogi::migrate::app_dirname(&bucket.app))
1146        .join("schema_snapshot.json")
1147}
1148
1149/// `djogi migrations attune` entry point.
1150/// Mode selection (per CLI flags):
1151/// | `--record-ledger` | `--squash` | resolved mode |
1152/// |-----------|-----------|---------------|
1153/// | false | false | [`AttuneMode::DiffOnly`] (read-only diff) |
1154/// | true | false | [`AttuneMode::Record`] |
1155/// | false | true | [`AttuneMode::Squash { from, publish, app }`] |
1156/// | true | true | rejected by clap (`conflicts_with`) |
1157/// Argument semantics:
1158/// - `target` is an optional positional Git target (commit / tag /
1159///   branch). When supplied, attune resolves it (local first, fetch
1160///   on miss) before any DB / disk mutation.
1161/// - `apply` gates DB / disk mutation. Without it, every mode is a
1162///   dry-run.
1163/// - `record` controls the parent repo's recorded submodule pointer
1164///   (separate from `record_ledger`, which controls the
1165///   `djogi_schema_migrations` ledger inserts).
1166///   `--squash` requires `--from <ver>`; an absent `from` while
1167///   `--squash` is set surfaces as a CLI error before any work happens.
1168// The CLI dispatch carries 11 inputs because the attune surface is
1169// the broadest in the migrations CLI — target
1170// resolution + dry-run + record-ledger + record-pointer + squash +
1171// publish all live on the same command. Folding them into a struct
1172// would push the same fields onto the caller; the dispatch above
1173// already passes them positionally and a struct refactor would be
1174// churn for no clarity gain.
1175#[allow(clippy::too_many_arguments)]
1176pub fn attune_cmd(
1177    target: Option<&str>,
1178    apply: bool,
1179    record: bool,
1180    record_ledger: bool,
1181    record_reason: &str,
1182    squash: bool,
1183    from: Option<&str>,
1184    publish: bool,
1185    app: Option<&str>,
1186    workspace: Option<PathBuf>,
1187) -> ExitCode {
1188    let workspace = resolve_workspace(workspace);
1189    let mode = match (record_ledger, squash) {
1190        (false, false) => AttuneMode::DiffOnly,
1191        (true, false) => AttuneMode::Record {
1192            reason: record_reason.to_string(),
1193        },
1194        (false, true) => match from {
1195            Some(v) if !v.is_empty() => AttuneMode::Squash {
1196                from: v.to_string(),
1197                publish,
1198                app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
1199            },
1200            _ => {
1201                eprintln!(
1202                    "djogi migrations attune --squash requires --from <version> (e.g. \
1203                     `--from V20260101000000__init`)"
1204                );
1205                return ExitCode::from(2);
1206            }
1207        },
1208        (true, true) => {
1209            // Already rejected by clap's `conflicts_with`; this branch
1210            // is defensive in case the flag is added programmatically.
1211            eprintln!(
1212                "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
1213            );
1214            return ExitCode::from(2);
1215        }
1216    };
1217
1218    let runtime = match tokio::runtime::Builder::new_current_thread()
1219        .enable_all()
1220        .build()
1221    {
1222        Ok(r) => r,
1223        Err(e) => {
1224            eprintln!("djogi migrations attune: tokio runtime: {e}");
1225            return ExitCode::from(1);
1226        }
1227    };
1228
1229    let target_owned = target.map(str::to_string);
1230    let exit =
1231        runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
1232    ExitCode::from(exit as u8)
1233}
1234
1235/// Async body of [`attune_cmd`]. Loads config, builds the context,
1236/// acquires the workspace lock, invokes the library entry point.
1237async fn run_attune(
1238    workspace: &Path,
1239    mode: AttuneMode,
1240    target: Option<String>,
1241    apply: bool,
1242    record: bool,
1243) -> i32 {
1244    use djogi::config::DjogiConfig;
1245
1246    let config = match DjogiConfig::load_from_workspace(workspace) {
1247        Ok(c) => c,
1248        Err(e) => {
1249            eprintln!("djogi migrations attune: config load: {e}");
1250            return 1;
1251        }
1252    };
1253
1254    let mut ctx = match connect_and_check(&config.database.url).await {
1255        ContextOutcome::Ready(ctx) => ctx,
1256        ContextOutcome::UnsupportedVersion(e) => {
1257            crate::print_support_boundary_error("migrations attune", &e);
1258            return 2;
1259        }
1260        ContextOutcome::RuntimeError(msg) => {
1261            eprintln!("djogi migrations attune: pool: {msg}");
1262            return 1;
1263        }
1264    };
1265
1266    // All three modes acquire the workspace lock per the v3 file-lock
1267    // contract — even DiffOnly takes the lock so a concurrent compose
1268    // / apply cannot mutate the tree mid-scan.
1269    let lock_path = workspace.join(LOCK_FILE_NAME);
1270    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
1271        Ok(g) => g,
1272        Err(e) => {
1273            eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
1274            return 1;
1275        }
1276    };
1277
1278    let req = AttuneRequest {
1279        workspace_root: workspace,
1280        database_url: &config.database.url,
1281        profile: &config.profile,
1282        // Thread `[database].dev_mode` to the squash gate. Read-only modes
1283        // (`DiffOnly`, `Record`) ignore it; `Squash` mode refuses unless
1284        // this is `true`.
1285        dev_mode: config.database.dev_mode,
1286        // The operator-supplied target + the `--apply` / `--record` gates
1287        // flow through to the library
1288        // entry point. The library owns the resolution + parent-pointer
1289        // update; the CLI is just plumbing.
1290        target: target.as_deref(),
1291        apply,
1292        record,
1293        mode,
1294        _guard: &guard,
1295    };
1296    match attune(&mut ctx, req).await {
1297        Ok(report) => {
1298            if report.entries.is_empty() {
1299                println!("attune: no drift");
1300            } else {
1301                for entry in &report.entries {
1302                    let app_display = if entry.bucket.app.is_empty() {
1303                        "_global_"
1304                    } else {
1305                        entry.bucket.app.as_str()
1306                    };
1307                    println!(
1308                        "  {kind:<10}  {database}/{app}  {version}",
1309                        kind = entry.kind.as_str(),
1310                        database = entry.bucket.database,
1311                        app = app_display,
1312                        version = entry.version,
1313                    );
1314                }
1315            }
1316            // Surface structured diagnostics — today this carries the
1317            // LedgerTableMissing notice when DiffOnly runs on a
1318            // fresh database.
1319            for diag in &report.diagnostics {
1320                println!("  diagnostic: {diag}");
1321            }
1322            if let Some(sha) = &report.resolved_target {
1323                println!("resolved target: {sha}");
1324            }
1325            if let Some(squashed) = &report.squashed_to {
1326                println!("squashed to: {squashed}");
1327            }
1328            if report.published {
1329                println!("published to remote");
1330            }
1331            if report.parent_pointer_updated {
1332                println!("parent submodule pointer updated");
1333            }
1334            0
1335        }
1336        Err(e) => {
1337            eprintln!("djogi migrations attune: {e}");
1338            attune_error_exit_code(&e)
1339        }
1340    }
1341}
1342
1343/// Map an [`AttuneError`] variant onto the documented exit-code
1344/// matrix (`docs/spec/configuration.md` §14):
1345/// - Refusal variants → exit code `2` ("operator must intervene;
1346///   nothing happened"). Today every refusal flows through
1347///   [`AttuneError::Refused`]; the localhost gate, the dev-profile
1348///   gate, the missing-version refusal, and the ambiguous-version
1349///   refusal are all reachable through that variant.
1350/// - Runtime variants → exit code `1` ("we tried; something broke"
1351///   filesystem scan, ledger query, SQL read/write/delete, git
1352///   publish). CI may safely retry these.
1353///   Pulled out as a free function so unit tests can pin every variant
1354///   without spinning a Tokio runtime. Operators rely on the 1-vs-2
1355///   distinction to tell "refused before any side effect" from "ran and
1356///   failed mid-flight".
1357fn attune_error_exit_code(err: &AttuneError) -> i32 {
1358    match err {
1359        AttuneError::Refused(_) => 2,
1360        AttuneError::FilesystemScanFailed { .. }
1361        | AttuneError::LedgerQueryFailed { .. }
1362        | AttuneError::SqlReadFailed { .. }
1363        | AttuneError::SqlWriteFailed { .. }
1364        | AttuneError::SqlDeleteFailed { .. }
1365        | AttuneError::GitPublishFailed { .. }
1366        | AttuneError::GitTargetResolveFailed { .. }
1367        | AttuneError::GitFetchFailed { .. }
1368        | AttuneError::GitUpdateSubmodulePointerFailed { .. } => 1,
1369    }
1370}
1371
1372/// `djogi migrations verify` entry point.
1373/// Read-only — does not acquire the workspace lock. Reads the live
1374/// Postgres catalog via [`djogi::context::DjogiContext`] and compares
1375/// against the projected schema from the descriptor inventory.
1376/// Exit codes: 0 on success (no error-level diagnostics), 1 on runtime
1377/// error (config / network / SQL / projection), 2 on refusal
1378/// (below PG 18).
1379pub fn verify_cmd(
1380    provider: &dyn DescriptorProvider,
1381    workspace: Option<PathBuf>,
1382    strict: bool,
1383) -> ExitCode {
1384    let workspace = resolve_workspace(workspace);
1385
1386    let runtime = match tokio::runtime::Builder::new_current_thread()
1387        .enable_all()
1388        .build()
1389    {
1390        Ok(r) => r,
1391        Err(e) => {
1392            eprintln!("djogi migrations verify: tokio runtime: {e}");
1393            return ExitCode::from(1);
1394        }
1395    };
1396
1397    let exit = runtime.block_on(async { run_verify(provider, &workspace, strict).await });
1398    ExitCode::from(exit as u8)
1399}
1400
1401/// Async body of [`verify_cmd`]. Returns the desired exit code.
1402/// Verify is multi-database aware: each `(database, app)` bucket is routed
1403/// to the pool for its `database` component via [`resolve_bucket_url`], and
1404/// the per-database context is connected lazily and cached so a database
1405/// with several app buckets connects once. The bucket set is the UNION of
1406/// the inventory projection and the on-disk snapshot tree, so an orphaned
1407/// snapshot (a removed app's snapshot still on disk) is verified and
/// surfaces drift rather than being silently skipped.
1409/// Exit codes:
1410/// - `0` — every bucket verified with no error-severity diagnostic.
1411/// - `1` — at least one runtime failure (pool / snapshot / verify error)
1412///   or at least one bucket reported an error-severity diagnostic.
1413/// - `2` — the server is below the minimum supported Postgres version
1414///   (a server-global refusal: verify returns immediately).
1415async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
1416    use djogi::config::DjogiConfig;
1417
1418    // 0. Zero-descriptor refusal (§5.6 / REQ-370-8). `verify` refuses with
1419    // the dual-cause diagnostic + exit 2 ONLY when there are NEITHER
1420    // descriptors NOR on-disk snapshots — the genuinely unusable state
1421    // (a standalone binary with nothing to verify against). When
1422    // snapshots exist, verify DEGRADES to snapshot-only (the union below
1423    // enumerates the disk buckets), so we must not refuse here.
1424    // Guard on `provider.models().is_empty()` rather than the projected
1425    // `bucket_set`: projection always seeds the synthetic global bucket
1426    // (`(main, "")`), so the bucket set is never empty and is the wrong
1427    // signal for "no descriptors". This is the same guard the
1428    // compose/schema/docs gates in `lib.rs` use.
1429    if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
1430        crate::print_zero_descriptor_diagnostic("migrations verify");
1431        return 2;
1432    }
1433
1434    // 1. Load config from workspace.
1435    let config = match DjogiConfig::load_from_workspace(workspace) {
1436        Ok(c) => c,
1437        Err(e) => {
1438            eprintln!("djogi migrations verify: config load: {e}");
1439            return 1;
1440        }
1441    };
1442
1443    // 2. Project schema from descriptor provider.
1444    let models = match project_from_provider(provider) {
1445        Ok(m) => m,
1446        Err(e) => {
1447            eprintln!("djogi migrations verify: projection error: {e}");
1448            return 1;
1449        }
1450    };
1451
1452    // 3. Build the bucket set as the UNION of the inventory projection and
    // the on-disk snapshot tree. An orphaned snapshot —
    // a removed app whose snapshot still sits on disk — is absent from
1455    // `models` but present on disk; without the union it would never be
1456    // verified and out-of-band drift would go unreported.
1457    let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
1458        models.keys().cloned().collect();
1459    for bucket in discover_snapshot_buckets_on_disk(workspace) {
1460        bucket_set.insert(bucket);
1461    }
1462    // The zero-descriptor refusal (step 0) already returned for the only
1463    // state that yields an empty bucket set (no descriptors + no snapshots).
1464    // Projection always seeds the synthetic global bucket, so reaching here
1465    // with an empty set is impossible; if a future projection change ever
1466    // breaks that invariant, fail closed with the dual-cause refusal rather
1467    // than silently reporting success on a binary that verified nothing.
1468    if bucket_set.is_empty() {
1469        crate::print_zero_descriptor_diagnostic("migrations verify");
1470        return 2;
1471    }
1472
1473    // 4. Policy configuration for the --strict flag.
1474    let policy = djogi::config::PolicyConfig {
1475        strict_out_of_order: strict,
1476    };
1477
1478    // 5. Pre-compute the set of databases that have at least one INVENTORY
1479    // bucket with non-empty models. Orphan-only databases (snapshots on
1480    // disk but no registered models) are excluded — `unwrap_or(false)`
1481    // treats a disk-only bucket as model-less. This gates D699 inside
1482    // `verify_bucket`: an orphan-only database has no live tables to
1483    // miss, so D601 is the actionable signal instead.
1484    let database_has_models: std::collections::HashSet<String> = bucket_set
1485        .iter()
1486        .filter(|b| {
1487            models
1488                .get(*b)
1489                .map(|s| !s.models.is_empty())
1490                .unwrap_or(false)
1491        })
1492        .map(|b| b.database.clone())
1493        .collect();
1494
1495    // 6. Per-database context cache + dedup sets. Contexts are connected
1496    // lazily (only for databases that have a bucket needing a live read)
1497    // and reused across that database's app buckets. `seen_ledger_databases`
1498    // ensures the ledger-lifecycle diagnostics (D621/D622/D699) are
1499    // emitted once per database, not once per app bucket.
1500    let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
1501        std::collections::BTreeMap::new();
1502    let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
1503    let mut exit_code: i32 = 0;
1504
1505    // 7. Verify each bucket.
1506    for bucket in &bucket_set {
1507        // a. Resolve the per-database URL.
1508        let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
1509            let bd = if bucket.app.is_empty() {
1510                "_global_"
1511            } else {
1512                &bucket.app
1513            };
1514            eprintln!(
1515                "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
1516                 check that config.database.url has a valid path component",
1517                bucket.database, bucket.database, bd
1518            );
1519            exit_code = 1;
1520            continue;
1521        };
1522
1523        // b. Connect (lazily, once per distinct database). PG < 18 is a
1524        // server-global refusal — there is no point continuing to other
1525        // buckets, so we return 2 immediately.
1526        if !contexts.contains_key(&bucket.database) {
1527            match connect_and_check(&url).await {
1528                ContextOutcome::Ready(ctx) => {
1529                    contexts.insert(bucket.database.clone(), ctx);
1530                }
1531                ContextOutcome::UnsupportedVersion(e) => {
1532                    crate::print_support_boundary_error("migrations verify", &e);
1533                    return 2;
1534                }
1535                ContextOutcome::RuntimeError(msg) => {
1536                    eprintln!(
1537                        "djogi migrations verify: pool for '{}': {msg}",
1538                        bucket.database
1539                    );
1540                    exit_code = 1;
1541                    continue;
1542                }
1543            }
1544        }
1545
1546        // c. Load the snapshot. A missing snapshot for a bucket that HAS
1547        // registered models is a hard error (exit 1) — the operator must
1548        // record a baseline; a missing snapshot for a model-less bucket
1549        // is informational.
1550        let snap_path = snapshot_path(workspace, bucket);
1551        let snapshot = match load_snapshot(&snap_path) {
1552            Ok(s) => s,
1553            Err(SnapshotError::Io { source, .. })
1554                if source.kind() == std::io::ErrorKind::NotFound =>
1555            {
1556                let bd = if bucket.app.is_empty() {
1557                    "_global_"
1558                } else {
1559                    &bucket.app
1560                };
1561                let has_models = models
1562                    .get(bucket)
1563                    .map(|s| !s.models.is_empty())
1564                    .unwrap_or(false);
1565                if has_models {
1566                    eprintln!(
1567                        "djogi migrations verify: {}/{} has registered models but no \
1568                         snapshot; run `djogi migrations compose` then \
1569                         `djogi migrations apply` to record a baseline",
1570                        bucket.database, bd
1571                    );
1572                    exit_code = 1;
1573                } else {
1574                    println!("No snapshot found for bucket {}/{}", bucket.database, bd);
1575                }
1576                continue;
1577            }
1578            Err(e) => {
1579                let bd = if bucket.app.is_empty() {
1580                    "_global_"
1581                } else {
1582                    &bucket.app
1583                };
1584                eprintln!(
1585                    "djogi migrations verify: load snapshot for {}/{}: {e}",
1586                    bucket.database, bd
1587                );
1588                exit_code = 1;
1589                continue;
1590            }
1591        };
1592
1593        // d. Compute ledger-emission flags. The ledger is shared per
1594        // database; emit its lifecycle diagnostics once per database
1595        // (the first bucket of each database that reaches this point),
1596        // and only for databases that actually have registered models.
1597        let db_has_models = database_has_models.contains(&bucket.database);
1598        let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
1599
1600        // e. Run the bucket-scoped verify against the routed context.
1601        let ctx = contexts
1602            .get_mut(&bucket.database)
1603            .expect("context inserted above");
1604        let report = match djogi::migrate::verify_bucket(
1605            ctx,
1606            bucket,
1607            &snapshot,
1608            &policy,
1609            emit_ledger,
1610            db_has_models,
1611        )
1612        .await
1613        {
1614            Ok(r) => r,
1615            Err(e) => {
1616                let bd = if bucket.app.is_empty() {
1617                    "_global_"
1618                } else {
1619                    &bucket.app
1620                };
1621                eprintln!(
1622                    "djogi migrations verify: error for {}/{}: {e}",
1623                    bucket.database, bd
1624                );
1625                exit_code = 1;
1626                continue;
1627            }
1628        };
1629
1630        // f. Render and fold the bucket's error state into the exit code.
1631        for line in render_verify_report(&report, bucket) {
1632            println!("{line}");
1633        }
1634        if report.has_errors() {
1635            exit_code = 1;
1636        }
1637    }
1638
1639    exit_code
1640}
1641
1642/// Render a [`VerifyReport`] to a vector of output lines.
1643/// Format: one line per diagnostic with severity prefix, code, location,
1644/// and message. Summary line at the end. Output is deterministic because
1645/// `report.diagnostics` is already sorted by `(code, location)`.
1646/// Returns the lines instead of printing directly so the rendering is unit-
1647/// testable ; the caller iterates the returned vector and prints each
1648/// line. Blank separator lines are returned as empty strings.
1649fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
1650    let mut lines: Vec<String> = Vec::new();
1651
1652    let app_display = if bucket.app.is_empty() {
1653        "_global_"
1654    } else {
1655        &bucket.app
1656    };
1657    lines.push(format!(
1658        "djogi migrations verify — {}/{}",
1659        bucket.database, app_display
1660    ));
1661    lines.push("──────────────────────────────────────────".to_string());
1662
1663    match (
1664        &report.latest_applied_version,
1665        report.applied_count,
1666        report.unfinished_count,
1667    ) {
1668        (Some(version), applied, 0) => {
1669            lines.push(format!("Ledger: {applied} applied, latest {version}"));
1670        }
1671        (Some(version), applied, unfinished) => {
1672            lines.push(format!(
1673                "Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
1674            ));
1675        }
1676        (None, 0, 0) => {
1677            lines.push("Ledger: empty (no migrations applied yet)".to_string());
1678        }
1679        _ => {}
1680    }
1681    lines.push(String::new());
1682
1683    if report.diagnostics.is_empty() {
1684        lines.push("No drift detected. Schema is consistent.".to_string());
1685    } else {
1686        for d in &report.diagnostics {
1687            let severity = match d.severity {
1688                VerifySeverity::Info => "INFO",
1689                VerifySeverity::Warning => "WARN",
1690                VerifySeverity::Error => "ERROR",
1691            };
1692            let location = d.location.as_deref().unwrap_or("-");
1693            lines.push(format!(
1694                "[{severity}] {code} ({loc}): {msg}",
1695                severity = severity,
1696                code = d.code,
1697                loc = location,
1698                msg = d.message
1699            ));
1700        }
1701    }
1702
1703    let errors = report
1704        .diagnostics
1705        .iter()
1706        .filter(|d| d.severity == VerifySeverity::Error)
1707        .count();
1708    let warnings = report
1709        .diagnostics
1710        .iter()
1711        .filter(|d| d.severity == VerifySeverity::Warning)
1712        .count();
1713    let infos = report
1714        .diagnostics
1715        .iter()
1716        .filter(|d| d.severity == VerifySeverity::Info)
1717        .count();
1718
1719    if errors > 0 {
1720        lines.push(String::new());
1721        lines.push(format!(
1722            "Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
1723        ));
1724    } else if warnings > 0 {
1725        lines.push(String::new());
1726        lines.push(format!(
1727            "Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
1728        ));
1729    } else {
1730        lines.push(String::new());
1731        lines.push(format!("Result: PASSED ({infos} info(s))"));
1732    }
1733
1734    lines
1735}
1736
1737// ── repair subcommand dispatch ────────────────────────────────────────────
1738
1739impl From<PartialApplyResolutionCli> for PartialApplyResolution {
1740    fn from(cli: PartialApplyResolutionCli) -> Self {
1741        match cli {
1742            PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
1743            PartialApplyResolutionCli::Faked => Self::MarkFaked,
1744            PartialApplyResolutionCli::Applied => Self::MarkApplied,
1745        }
1746    }
1747}
1748
1749/// `djogi migrations repair <subcommand>` entry point.
1750/// Routes each subcommand to its glue function. The glue functions own
1751/// the runtime / config / pool / lock / report-render lifecycle; this
1752/// router only destructures the parsed clap variant.
1753pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
1754    match command {
1755        RepairSubcommand::ChecksumDrift {
1756            version,
1757            app,
1758            database,
1759            checksum_up,
1760            checksum_down,
1761            workspace,
1762        } => repair_checksum_drift_cmd(
1763            &version,
1764            app.as_deref(),
1765            database.as_deref(),
1766            checksum_up.as_deref(),
1767            checksum_down.as_deref(),
1768            workspace,
1769        ),
1770        RepairSubcommand::PartialApply {
1771            version,
1772            resolution,
1773            note,
1774            app,
1775            database,
1776            workspace,
1777        } => repair_partial_apply_cmd(
1778            &version,
1779            resolution.into(),
1780            &note,
1781            app.as_deref(),
1782            database.as_deref(),
1783            workspace,
1784        ),
1785        RepairSubcommand::ResumePartial {
1786            version,
1787            app,
1788            database,
1789            workspace,
1790        } => repair_resume_partial_apply_cmd(
1791            &version,
1792            app.as_deref(),
1793            database.as_deref(),
1794            workspace,
1795        ),
1796        RepairSubcommand::SnapshotRebuild {
1797            app,
1798            database,
1799            snapshot_path,
1800            workspace,
1801        } => repair_snapshot_rebuild_cmd(
1802            app.as_deref(),
1803            database.as_deref(),
1804            snapshot_path.as_deref(),
1805            workspace,
1806        ),
1807    }
1808}
1809
1810/// Render a [`RepairReport`] to stdout. Shared across all four repair
1811/// glue functions so the operator sees a consistent action / ledger /
1812/// snapshot summary regardless of which repair ran.
1813fn render_repair_report(report: &RepairReport) {
1814    for action in &report.actions_taken {
1815        println!("  {action}");
1816    }
1817    if !report.ledger_changes.is_empty() {
1818        println!("Ledger changes:");
1819        for lc in &report.ledger_changes {
1820            println!(
1821                "  {} | {} | {} -> {}",
1822                lc.version, lc.column, lc.before, lc.after,
1823            );
1824        }
1825    }
1826    if !report.snapshot_changes.is_empty() {
1827        println!("Snapshot changes:");
1828        for sc in &report.snapshot_changes {
1829            println!("  {} | {}", sc.path.display(), sc.description);
1830        }
1831    }
1832}
1833
1834/// Map a [`RepairError`] onto the CLI exit-code contract.
1835/// `RepairError` is NOT `#[non_exhaustive]`, so this match is
1836/// **exhaustive with NO `_ =>` wildcard** by deliberate design: a future
1837/// variant breaks compilation here, forcing a conscious exit-code
1838/// classification rather than silently bucketing an unclassified error.
1839/// Classification rule — when a new variant is added, classify it the
1840/// same way:
1841/// - **Exit 1 (retryable):** variants wrapping a transient I/O /
1842///   connection / pool / SQL failure (a `source: DjogiError`, snapshot
1843///   filesystem I/O, or advisory-lock contention). A retry may succeed.
1844/// - **Exit 2 (refusal):** structural refusals and ledger-logic guards
1845///   that require operator intervention. A blind retry hits the same
1846///   refusal.
1847fn repair_error_exit_code(err: &RepairError) -> i32 {
1848    match err {
1849        // ── Exit 1: transient I/O / connection / pool / SQL failures.
1850        // These wrap a DjogiError (network, connection, query) or a
1851        // filesystem error and may succeed on retry.
1852        RepairError::LedgerIo { .. }                  // ledger DB I/O
1853        | RepairError::SnapshotIo { .. }              // snapshot filesystem I/O
1854        | RepairError::AdvisoryLockFailed { .. }      // lock held by a concurrent runner; retry after it releases
1855        | RepairError::AdvisoryLockQueryFailed { .. } // pg_try_advisory_lock query itself errored
1856        | RepairError::PinnedSessionCheckoutFailed { .. } // could not check out a pinned session from the pool
1857        | RepairError::ResumeStepFailed { .. }        // a replayed statement failed; partial state recorded, retryable
1858        | RepairError::ResumeProgressAckFailed { .. } // step committed but the progress ack write failed; retryable
1859        => 1,
1860
1861        // ── Exit 2: refusals and structural / ledger-logic guards.
1862        // The operator must investigate and intervene; a blind retry
1863        // would hit the same refusal.
1864        RepairError::VersionNotFound { .. }
1865        | RepairError::InsufficientConfirmation
1866        | RepairError::InvalidChecksum { .. }
1867        | RepairError::InvalidResolution { .. }
1868        | RepairError::BucketAppMismatch { .. }
1869        | RepairError::PlanVersionMismatch { .. }
1870        | RepairError::PlanChecksumMismatch { .. }
1871        | RepairError::LeafIdentityMismatch { .. }
1872        | RepairError::NothingToResume { .. }
1873        | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
1874        | RepairError::SuppliedSnapshotDiverges { .. }
1875        | RepairError::AdvisoryUnlockReturnedFalse { .. } // session-pinning correctness failure — not a blind retry
1876        | RepairError::ResumePlanShapeMismatch { .. }
1877        | RepairError::ReplayPlanShapeMismatch { .. }
1878        => 2,
1879    }
1880}
1881
1882/// Resolve the database name for bucket construction. Uses the explicit
1883/// `--database` flag if provided, otherwise defaults to `"main"` (the
1884/// global database name — see [`djogi::apps::AppDescriptor::GLOBAL_DATABASE`]).
1885/// `_config` is threaded so this single helper can grow a config-driven
1886/// default database (should `DjogiConfig` gain one) without changing
1887/// every call site.
1888fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
1889    database.unwrap_or("main").to_string()
1890}
1891
1892/// Compute the `V1:`-prefixed checksum of a committed up SQL file on disk,
1893/// using the canonical fragment-level domain (strips the composed-file
1894/// header and label comments, matching what compose stores in the ledger).
1895/// The naive whole-file checksum is WRONG here: compose stores checksums
1896/// computed over the [`djogi::migrate::OperationSql`] fragments only,
1897/// without the rendered file's `-- Djogi composed migration — up` header
1898/// or the per-statement label comment lines. Recomputing over the full
1899/// file content would never match the ledger value, so the drift repair
1900/// would write a checksum that immediately re-drifts. Delegating to
1901/// [`djogi::migrate::compute_committed_sql_checksum`] keeps the CLI's
1902/// recompute path in the same domain as compose.
1903/// Returns the underlying [`std::io::Error`] unchanged so the caller can
1904/// surface a missing/unreadable up file as a retryable I/O error.
1905fn compute_checksum_up_from_disk(
1906    workspace: &Path,
1907    bucket: &djogi::migrate::BucketKey,
1908    version: &str,
1909) -> std::io::Result<String> {
1910    let path =
1911        djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::up_filename(version));
1912    let sql = std::fs::read_to_string(&path)?;
1913    Ok(djogi::migrate::compute_committed_sql_checksum(
1914        &sql,
1915        djogi::migrate::ResetSqlSide::Up,
1916    ))
1917}
1918
1919/// Compute the canonical checksum of a committed down SQL file on disk,
1920/// using the same fragment-level domain as compose (see
1921/// [`compute_checksum_up_from_disk`] for why the whole-file checksum is
1922/// wrong).
1923/// Returns `Ok(None)` when the file is absent
1924/// ([`std::io::ErrorKind::NotFound`]) or when the file contains only SQL
1925/// comments — both map onto compose's `NULL` `checksum_down` sentinel for
1926/// comment-only down files. Returns `Err` for any other I/O failure so a
1927/// retry after the file is restored can succeed.
1928fn compute_checksum_down_from_disk(
1929    workspace: &Path,
1930    bucket: &djogi::migrate::BucketKey,
1931    version: &str,
1932) -> std::io::Result<Option<String>> {
1933    let path =
1934        djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::down_filename(version));
1935    let sql = match std::fs::read_to_string(&path) {
1936        Ok(s) => s,
1937        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
1938        Err(e) => return Err(e),
1939    };
1940    Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql))
1941}
1942
1943/// `djogi migrations repair checksum-drift` entry point.
1944/// Updates the `checksum_up` / `checksum_down` columns of an
1945/// already-applied ledger row after its committed SQL was edited. When
1946/// `--checksum-up` / `--checksum-down` are omitted, the checksums are
1947/// recomputed from the committed files on disk (a missing down file is a
1948/// no-op; any other read error aborts with exit 1).
1949pub fn repair_checksum_drift_cmd(
1950    version: &str,
1951    app: Option<&str>,
1952    database: Option<&str>,
1953    checksum_up: Option<&str>,
1954    checksum_down: Option<&str>,
1955    workspace: Option<PathBuf>,
1956) -> ExitCode {
1957    let workspace = resolve_workspace(workspace);
1958    let runtime = match tokio::runtime::Builder::new_current_thread()
1959        .enable_all()
1960        .build()
1961    {
1962        Ok(r) => r,
1963        Err(e) => {
1964            eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
1965            return ExitCode::from(1);
1966        }
1967    };
1968    let exit = runtime.block_on(async {
1969        run_repair_checksum_drift(
1970            &workspace,
1971            version,
1972            app,
1973            database,
1974            checksum_up,
1975            checksum_down,
1976        )
1977        .await
1978    });
1979    ExitCode::from(exit as u8)
1980}
1981
1982/// Async body of [`repair_checksum_drift_cmd`]. Returns the desired exit code.
1983async fn run_repair_checksum_drift(
1984    workspace: &Path,
1985    version: &str,
1986    app: Option<&str>,
1987    database: Option<&str>,
1988    checksum_up: Option<&str>,
1989    checksum_down: Option<&str>,
1990) -> i32 {
1991    use djogi::config::DjogiConfig;
1992
1993    let config = match DjogiConfig::load_from_workspace(workspace) {
1994        Ok(c) => c,
1995        Err(e) => {
1996            eprintln!("djogi migrations repair checksum-drift: config load: {e}");
1997            return 1;
1998        }
1999    };
2000
2001    // Resolve the per-database URL BEFORE connecting: `--database
2002    // crud_log` / `event_log` operate on a different bucket's ledger than
2003    // the app DB, so connecting to `config.database.url` first would
2004    // silently mutate the wrong database.
2005    let db_name = resolve_database(database, &config);
2006    let url = match resolve_bucket_url(&config.database, &db_name) {
2007        Some(u) => u,
2008        None => {
2009            eprintln!(
2010                "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
2011            );
2012            return 2;
2013        }
2014    };
2015
2016    let mut ctx = match connect_and_check(&url).await {
2017        ContextOutcome::Ready(ctx) => ctx,
2018        ContextOutcome::UnsupportedVersion(e) => {
2019            crate::print_support_boundary_error("migrations repair checksum-drift", &e);
2020            return 2;
2021        }
2022        ContextOutcome::RuntimeError(msg) => {
2023            eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
2024            return 1;
2025        }
2026    };
2027
2028    let lock_path = workspace.join(LOCK_FILE_NAME);
2029    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2030        Ok(g) => g,
2031        Err(e) => {
2032            eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
2033            return 1;
2034        }
2035    };
2036
2037    let app_label = app.unwrap_or("");
2038    let bucket = BucketKey {
2039        database: db_name,
2040        app: app_label.to_string(),
2041    };
2042
2043    let new_checksum_up = match checksum_up {
2044        Some(c) => c.to_string(),
2045        None => {
2046            // Auto-compute from the committed up SQL file on disk. A
2047            // missing or unreadable up file is an environment I/O error,
2048            // not an operator-facing refusal — exit 1 (same class as the
2049            // down file's non-NotFound branch below), so a retry after
2050            // the file is restored can succeed.
2051            match compute_checksum_up_from_disk(workspace, &bucket, version) {
2052                Ok(cs) => cs,
2053                Err(e) => {
2054                    eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
2055                    return 1;
2056                }
2057            }
2058        }
2059    };
2060
2061    let resolved_checksum_down = match checksum_down {
2062        Some(c) => Some(c.to_string()),
2063        None => {
2064            // Auto-compute from the down file; a missing down file (or a
2065            // comment-only down file) is a no-op (no down checksum), other
2066            // read errors surface. NotFound is folded into `Ok(None)` by
2067            // `compute_checksum_down_from_disk`.
2068            match compute_checksum_down_from_disk(workspace, &bucket, version) {
2069                Ok(cs_opt) => cs_opt,
2070                Err(e) => {
2071                    eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
2072                    return 1;
2073                }
2074            }
2075        }
2076    };
2077
2078    match repair_checksum_drift(
2079        &mut ctx,
2080        &guard,
2081        &bucket,
2082        version,
2083        &new_checksum_up,
2084        resolved_checksum_down.as_deref(),
2085        RepairConfirmation::OperatorAcknowledged,
2086    )
2087    .await
2088    {
2089        Ok(report) => {
2090            render_repair_report(&report);
2091            0
2092        }
2093        Err(e) => {
2094            eprintln!("djogi migrations repair checksum-drift: {e}");
2095            repair_error_exit_code(&e)
2096        }
2097    }
2098}
2099
2100/// `djogi migrations repair partial-apply` entry point.
2101/// Resolves a partial-apply ledger row by rewriting its status to
2102/// `rolled_back`, `faked`, or `applied`. No SQL executes — only the
2103/// ledger row is mutated.
2104pub fn repair_partial_apply_cmd(
2105    version: &str,
2106    resolution: PartialApplyResolution,
2107    note: &str,
2108    app: Option<&str>,
2109    database: Option<&str>,
2110    workspace: Option<PathBuf>,
2111) -> ExitCode {
2112    let workspace = resolve_workspace(workspace);
2113    let runtime = match tokio::runtime::Builder::new_current_thread()
2114        .enable_all()
2115        .build()
2116    {
2117        Ok(r) => r,
2118        Err(e) => {
2119            eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
2120            return ExitCode::from(1);
2121        }
2122    };
2123    let exit = runtime.block_on(async {
2124        run_repair_partial_apply(&workspace, version, resolution, note, app, database).await
2125    });
2126    ExitCode::from(exit as u8)
2127}
2128
2129/// Async body of [`repair_partial_apply_cmd`]. Returns the desired exit code.
2130async fn run_repair_partial_apply(
2131    workspace: &Path,
2132    version: &str,
2133    resolution: PartialApplyResolution,
2134    note: &str,
2135    app: Option<&str>,
2136    database: Option<&str>,
2137) -> i32 {
2138    use djogi::config::DjogiConfig;
2139
2140    let config = match DjogiConfig::load_from_workspace(workspace) {
2141        Ok(c) => c,
2142        Err(e) => {
2143            eprintln!("djogi migrations repair partial-apply: config load: {e}");
2144            return 1;
2145        }
2146    };
2147
2148    // Resolve the per-database URL BEFORE connecting: `--database
2149    // crud_log` / `event_log` operate on a different bucket's ledger than
2150    // the app DB, so connecting to `config.database.url` first would
2151    // silently mutate the wrong database.
2152    let db_name = resolve_database(database, &config);
2153    let url = match resolve_bucket_url(&config.database, &db_name) {
2154        Some(u) => u,
2155        None => {
2156            eprintln!(
2157                "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
2158            );
2159            return 2;
2160        }
2161    };
2162
2163    let mut ctx = match connect_and_check(&url).await {
2164        ContextOutcome::Ready(ctx) => ctx,
2165        ContextOutcome::UnsupportedVersion(e) => {
2166            crate::print_support_boundary_error("migrations repair partial-apply", &e);
2167            return 2;
2168        }
2169        ContextOutcome::RuntimeError(msg) => {
2170            eprintln!("djogi migrations repair partial-apply: pool: {msg}");
2171            return 1;
2172        }
2173    };
2174
2175    let lock_path = workspace.join(LOCK_FILE_NAME);
2176    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2177        Ok(g) => g,
2178        Err(e) => {
2179            eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
2180            return 1;
2181        }
2182    };
2183
2184    let app_label = app.unwrap_or("");
2185    let bucket = BucketKey {
2186        database: db_name,
2187        app: app_label.to_string(),
2188    };
2189
2190    match repair_partial_apply(
2191        &mut ctx,
2192        &guard,
2193        &bucket,
2194        version,
2195        resolution,
2196        note,
2197        RepairConfirmation::OperatorAcknowledged,
2198    )
2199    .await
2200    {
2201        Ok(report) => {
2202            render_repair_report(&report);
2203            0
2204        }
2205        Err(e) => {
2206            eprintln!("djogi migrations repair partial-apply: {e}");
2207            repair_error_exit_code(&e)
2208        }
2209    }
2210}
2211
2212/// `djogi migrations repair resume-partial` entry point.
2213/// Resumes an interrupted non-transactional apply by loading the
2214/// committed `<version>.plan.json` and replaying its remaining steps.
2215/// Loads the committed plan directly (no CLI-level checksum pre-gate);
2216/// the library validates the plan against the ledger row internally.
2217pub fn repair_resume_partial_apply_cmd(
2218    version: &str,
2219    app: Option<&str>,
2220    database: Option<&str>,
2221    workspace: Option<PathBuf>,
2222) -> ExitCode {
2223    let workspace = resolve_workspace(workspace);
2224    let runtime = match tokio::runtime::Builder::new_current_thread()
2225        .enable_all()
2226        .build()
2227    {
2228        Ok(r) => r,
2229        Err(e) => {
2230            eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
2231            return ExitCode::from(1);
2232        }
2233    };
2234    let exit = runtime
2235        .block_on(async { run_repair_resume_partial(&workspace, version, app, database).await });
2236    ExitCode::from(exit as u8)
2237}
2238
2239/// Async body of [`repair_resume_partial_apply_cmd`]. Returns the desired exit code.
2240async fn run_repair_resume_partial(
2241    workspace: &Path,
2242    version: &str,
2243    app: Option<&str>,
2244    database: Option<&str>,
2245) -> i32 {
2246    use djogi::config::DjogiConfig;
2247
2248    let config = match DjogiConfig::load_from_workspace(workspace) {
2249        Ok(c) => c,
2250        Err(e) => {
2251            eprintln!("djogi migrations repair resume-partial: config load: {e}");
2252            return 1;
2253        }
2254    };
2255
2256    // Resolve the per-database URL BEFORE connecting: `--database
2257    // crud_log` / `event_log` operate on a different bucket's ledger than
2258    // the app DB, so connecting to `config.database.url` first would
2259    // silently mutate the wrong database.
2260    let db_name = resolve_database(database, &config);
2261    let url = match resolve_bucket_url(&config.database, &db_name) {
2262        Some(u) => u,
2263        None => {
2264            eprintln!(
2265                "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
2266            );
2267            return 2;
2268        }
2269    };
2270
2271    let mut ctx = match connect_and_check(&url).await {
2272        ContextOutcome::Ready(ctx) => ctx,
2273        ContextOutcome::UnsupportedVersion(e) => {
2274            crate::print_support_boundary_error("migrations repair resume-partial", &e);
2275            return 2;
2276        }
2277        ContextOutcome::RuntimeError(msg) => {
2278            eprintln!("djogi migrations repair resume-partial: pool: {msg}");
2279            return 1;
2280        }
2281    };
2282
2283    let lock_path = workspace.join(LOCK_FILE_NAME);
2284    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2285        Ok(g) => g,
2286        Err(e) => {
2287            eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
2288            return 1;
2289        }
2290    };
2291
2292    let app_label = app.unwrap_or("");
2293    let bucket = BucketKey {
2294        database: db_name,
2295        app: app_label.to_string(),
2296    };
2297
2298    // Load the committed replay plan directly from disk — no CLI-level
2299    // checksum pre-gate, because repair_resume_partial_apply validates
2300    // plan↔ledger checksums itself. Synthesizing a whole-file checksum
2301    // here would not match the per-statement-fragment checksums stored
2302    // in the plan JSON.
2303    let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
2304        Ok(p) => p,
2305        Err(e) => {
2306            eprintln!("djogi migrations repair resume-partial: load plan: {e}");
2307            return 2;
2308        }
2309    };
2310
2311    match repair_resume_partial_apply(
2312        &mut ctx,
2313        &guard,
2314        version,
2315        &plan,
2316        RepairConfirmation::OperatorAcknowledged,
2317    )
2318    .await
2319    {
2320        Ok(report) => {
2321            render_repair_report(&report);
2322            0
2323        }
2324        Err(e) => {
2325            eprintln!("djogi migrations repair resume-partial: {e}");
2326            repair_error_exit_code(&e)
2327        }
2328    }
2329}
2330
2331/// Load the committed `<version>.plan.json` for `resume-partial` without
2332/// the CLI-level checksum pre-gate.
2333/// [`repair_resume_partial_apply`] validates the plan against the ledger
2334/// row internally (`PlanVersionMismatch` / `PlanChecksumMismatch`), so
2335/// re-gating here with a hand-rolled whole-file checksum would be both
2336/// wrong (the plan stores per-statement-fragment checksums) and
2337/// redundant. This helper therefore deliberately does NOT reuse
2338/// [`load_replay_plan_from_disk`] (a pending-apply helper that DOES
2339/// checksum-gate) — it reuses only that function's `CliReplay*`
2340/// deserialization + segment-conversion shape.
2341/// Returns a human-readable error string on a missing/unparseable plan
2342/// file or a format-version mismatch. A missing plan file maps to exit 2
2343/// at the call site (the committed plan is a precondition of resume).
2344fn load_committed_plan_for_resume(
2345    workspace: &Path,
2346    bucket: &djogi::migrate::BucketKey,
2347    version: &str,
2348) -> Result<djogi::migrate::MigrationPlan, String> {
2349    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
2350    let plan_path = bucket_dir.join(format!("{version}.plan.json"));
2351    let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
2352    let stored: CliReplayPlan = serde_json::from_slice(&bytes)
2353        .map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
2354    if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
2355        return Err(format!(
2356            "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
2357            plan_path.display(),
2358            stored.format_version,
2359        ));
2360    }
2361    Ok(djogi::migrate::MigrationPlan {
2362        bucket: bucket.clone(),
2363        classification: stored.classification.into(),
2364        segments: stored
2365            .segments
2366            .into_iter()
2367            .map(|seg| djogi::migrate::Segment {
2368                kind: seg.kind.into(),
2369                statements: seg
2370                    .statements
2371                    .into_iter()
2372                    .map(|stmt| djogi::migrate::OperationSql {
2373                        label: stmt.label,
2374                        up: stmt.up,
2375                        down: String::new(),
2376                        lossy: None,
2377                    })
2378                    .collect(),
2379            })
2380            .collect(),
2381    })
2382}
2383
2384/// `djogi migrations repair snapshot-rebuild` entry point.
2385/// Rebuilds a bucket's schema snapshot by walking the ledger and
2386/// re-projecting from live database state. When `--snapshot-path` is
2387/// omitted, the path is derived from
2388/// `migrations/<database>/<app>/schema_snapshot.json`.
2389pub fn repair_snapshot_rebuild_cmd(
2390    app: Option<&str>,
2391    database: Option<&str>,
2392    snapshot_path: Option<&Path>,
2393    workspace: Option<PathBuf>,
2394) -> ExitCode {
2395    let workspace = resolve_workspace(workspace);
2396    let runtime = match tokio::runtime::Builder::new_current_thread()
2397        .enable_all()
2398        .build()
2399    {
2400        Ok(r) => r,
2401        Err(e) => {
2402            eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
2403            return ExitCode::from(1);
2404        }
2405    };
2406    let exit = runtime.block_on(async {
2407        run_repair_snapshot_rebuild(&workspace, app, database, snapshot_path).await
2408    });
2409    ExitCode::from(exit as u8)
2410}
2411
2412/// Async body of [`repair_snapshot_rebuild_cmd`]. Returns the desired exit code.
2413async fn run_repair_snapshot_rebuild(
2414    workspace: &Path,
2415    app: Option<&str>,
2416    database: Option<&str>,
2417    snapshot_path: Option<&Path>,
2418) -> i32 {
2419    use djogi::config::DjogiConfig;
2420
2421    let config = match DjogiConfig::load_from_workspace(workspace) {
2422        Ok(c) => c,
2423        Err(e) => {
2424            eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
2425            return 1;
2426        }
2427    };
2428
2429    // Resolve the per-database URL BEFORE connecting: `--database
2430    // crud_log` / `event_log` operate on a different bucket's ledger than
2431    // the app DB, so connecting to `config.database.url` first would
2432    // silently rebuild the snapshot from the wrong database.
2433    let db_name = resolve_database(database, &config);
2434    let url = match resolve_bucket_url(&config.database, &db_name) {
2435        Some(u) => u,
2436        None => {
2437            eprintln!(
2438                "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
2439            );
2440            return 2;
2441        }
2442    };
2443
2444    let mut ctx = match connect_and_check(&url).await {
2445        ContextOutcome::Ready(ctx) => ctx,
2446        ContextOutcome::UnsupportedVersion(e) => {
2447            crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
2448            return 2;
2449        }
2450        ContextOutcome::RuntimeError(msg) => {
2451            eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
2452            return 1;
2453        }
2454    };
2455
2456    let lock_path = workspace.join(LOCK_FILE_NAME);
2457    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2458        Ok(g) => g,
2459        Err(e) => {
2460            eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
2461            return 1;
2462        }
2463    };
2464
2465    let app_label = app.unwrap_or("");
2466    let bucket = BucketKey {
2467        database: db_name,
2468        app: app_label.to_string(),
2469    };
2470
2471    let snap_path = match snapshot_path {
2472        Some(p) => p.to_path_buf(),
2473        None => reconstruct_snapshot_path(workspace, &bucket),
2474    };
2475
2476    match repair_snapshot_rebuild(
2477        &mut ctx,
2478        &guard,
2479        &bucket,
2480        &snap_path,
2481        RepairConfirmation::OperatorAcknowledged,
2482    )
2483    .await
2484    {
2485        Ok(report) => {
2486            render_repair_report(&report);
2487            0
2488        }
2489        Err(e) => {
2490            eprintln!("djogi migrations repair snapshot-rebuild: {e}");
2491            repair_error_exit_code(&e)
2492        }
2493    }
2494}
2495
2496// ── baseline command ──────────────────────────────────────────────────────
2497
2498/// `djogi migrations baseline` entry point.
2499/// Establishes a baseline ledger row + snapshot for an existing
2500/// database adopted under Djogi's migration ledger. The schema already
2501/// exists, so `compose` + `apply` cannot run against the populated
2502/// database without a starting point; baseline projects the live
2503/// catalog into a single `baseline` ledger row (no SQL runs against
2504/// user tables) and persists the projected snapshot as the canonical
2505/// baseline so future migrations diff against the real DB state.
2506/// `--reason` is required and must be non-empty — it is recorded in the
2507/// ledger row's `partial_apply_note` for the audit trail. An empty
2508/// reason is a refusal (exit 2) caught before any DB work.
2509/// Exit codes: `0` success, `1` runtime error (config / pool / projection
2510/// failure), `2` refusal (empty `--reason`, unresolvable database URL,
2511/// duplicate version collision, snapshot-persist failure after ledger
2512/// insert, session-pinning correctness failure, or below PG 18).
2513pub fn baseline_cmd(
2514    version: &str,
2515    description: &str,
2516    reason: &str,
2517    app: Option<&str>,
2518    database: Option<&str>,
2519    workspace: Option<PathBuf>,
2520) -> ExitCode {
2521    // Validate --reason before any expensive work, mirroring the
2522    // `apply --fake --reason` empty-reason gate. The library's
2523    // baseline_plan does not itself reject an empty reason (it records
2524    // whatever string it is handed), so the CLI owns this guard.
2525    if reason.trim().is_empty() {
2526        eprintln!(
2527            "djogi migrations baseline: --reason must not be empty; \
2528             supply a non-empty reason why this baseline is being established \
2529             (e.g. 'schema pre-exists from prior tooling'). \
2530             This is recorded in the ledger audit trail."
2531        );
2532        return ExitCode::from(2);
2533    }
2534
2535    let workspace = resolve_workspace(workspace);
2536    let runtime = match tokio::runtime::Builder::new_current_thread()
2537        .enable_all()
2538        .build()
2539    {
2540        Ok(r) => r,
2541        Err(e) => {
2542            eprintln!("djogi migrations baseline: tokio runtime: {e}");
2543            return ExitCode::from(1);
2544        }
2545    };
2546    let exit = runtime.block_on(async {
2547        run_baseline(&workspace, version, description, reason, app, database).await
2548    });
2549    ExitCode::from(exit as u8)
2550}
2551
2552/// Async body of [`baseline_cmd`]. Returns the desired exit code.
2553/// Resolves the per-database URL BEFORE connecting (a `--database
2554/// crud_log` / `event_log` baseline targets a different bucket's ledger
2555/// than the app DB), connects + runs the PG-version preflight via
2556/// [`connect_and_check`], acquires the workspace file lock, then drives
2557/// [`baseline_plan`]. The runner projects the live schema itself and
2558/// computes the baseline checksum from that projection, so the
2559/// `RunnerCtx` is constructed with `snapshot: None` (requires the
2560/// caller NOT supply a snapshot) and an empty `checksum_up` (the
2561/// baseline path never reads it).
2562async fn run_baseline(
2563    workspace: &Path,
2564    version: &str,
2565    description: &str,
2566    reason: &str,
2567    app: Option<&str>,
2568    database: Option<&str>,
2569) -> i32 {
2570    use djogi::config::DjogiConfig;
2571
2572    let config = match DjogiConfig::load_from_workspace(workspace) {
2573        Ok(c) => c,
2574        Err(e) => {
2575            eprintln!("djogi migrations baseline: config load: {e}");
2576            return 1;
2577        }
2578    };
2579
2580    // Resolve the per-database URL BEFORE connecting: `--database
2581    // crud_log` / `event_log` operate on a different bucket's ledger
2582    // than the app DB, so connecting to `config.database.url` first
2583    // would silently baseline the wrong database.
2584    let db_name = resolve_database(database, &config);
2585    let url = match resolve_bucket_url(&config.database, &db_name) {
2586        Some(u) => u,
2587        None => {
2588            eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
2589            return 2;
2590        }
2591    };
2592
2593    let mut ctx = match connect_and_check(&url).await {
2594        ContextOutcome::Ready(ctx) => ctx,
2595        ContextOutcome::UnsupportedVersion(e) => {
2596            crate::print_support_boundary_error("migrations baseline", &e);
2597            return 2;
2598        }
2599        ContextOutcome::RuntimeError(msg) => {
2600            eprintln!("djogi migrations baseline: pool: {msg}");
2601            return 1;
2602        }
2603    };
2604
2605    let lock_path = workspace.join(LOCK_FILE_NAME);
2606    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2607        Ok(g) => g,
2608        Err(e) => {
2609            eprintln!("djogi migrations baseline: workspace lock: {e}");
2610            return 1;
2611        }
2612    };
2613
2614    let app_label = app.unwrap_or("");
2615    let bucket = BucketKey {
2616        database: db_name,
2617        app: app_label.to_string(),
2618    };
2619
2620    let runner_ctx = RunnerCtx {
2621        bucket: bucket.clone(),
2622        version: version.to_string(),
2623        description: description.to_string(),
2624        // baseline_plan computes checksum_up from the live projection;
2625        // this field is not read on the baseline code path.
2626        checksum_up: String::new(),
2627        checksum_down: None,
2628        // baseline_plan refuses a caller-supplied snapshot — it
2629        // projects the live DB itself. Leave this None; the projection
2630        // is persisted to `snapshot_path` below.
2631        snapshot: None,
2632        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
2633        // MigrateConfig does not derive Clone; construct from fields
2634        // (same pattern as apply_one_pending).
2635        config: djogi::config::MigrateConfig {
2636            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
2637            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
2638            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
2639            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
2640        },
2641        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
2642        audit_pool: match djogi::migrate::resolve_audit_url(&config) {
2643            Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
2644            Err(_) => None,
2645        },
2646    };
2647
2648    match baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await {
2649        Ok(report) => {
2650            println!(
2651                "djogi migrations baseline: established baseline `{}` \
2652                 (ledger_id={}) in {:.1}s",
2653                version,
2654                report.ledger_id,
2655                report.execution_time_ms as f64 / 1000.0
2656            );
2657            0
2658        }
2659        Err(e) => {
2660            eprintln!("djogi migrations baseline: {e}");
2661            baseline_error_exit_code(&e)
2662        }
2663    }
2664}
2665
2666/// Map a [`RunnerError`] produced by [`baseline_plan`] onto the CLI
2667/// exit-code contract.
2668/// The flat [`runner_error_exit_code`] (always `1`) is wrong for
2669/// baseline: a duplicate-version collision is a refusal the operator
2670/// must resolve by choosing a new version, and a blind retry hits the
2671/// same collision — that must surface as exit `2`, matching the
2672/// `migrations apply` doc-contract ("re-running reports
2673/// `VersionAlreadyApplied` (exit 2)") and the `repair` family's
2674/// [`repair_error_exit_code`] convention.
2675/// `RunnerError` is `#[non_exhaustive]`, so the wildcard arm is
2676/// load-bearing: any variant NOT named below defaults to exit `1`
2677/// (transient — a retry may succeed). That is the safe default for the
2678/// I/O- and connection-shaped variants the baseline path can hit
2679/// (projection failure, ledger bootstrap / write / query failure,
2680/// snapshot persist failure, pinned-session checkout failure,
2681/// advisory-lock contention). Only the genuine refusals are pulled out
2682/// into the exit-`2` arm.
2683fn baseline_error_exit_code(err: &RunnerError) -> i32 {
2684    match err {
2685        // ── Exit 2: refusals — the operator must intervene; a blind
2686        // retry hits the same condition.
2687        // - A duplicate version (terminal or non-terminal) means the
2688        // chosen baseline version is already taken; pick another.
2689        // - A caller-supplied snapshot is a programming error in the
2690        // wiring (the CLI always passes `snapshot: None`), surfaced
2691        // as a structural refusal rather than a retryable fault.
2692        // - An out-of-order rejection is a policy refusal identical to
2693        // the apply path's.
2694        // - AdvisoryUnlockReturnedFalse is a session-pinning correctness
2695        // failure (PG returned false for pg_advisory_unlock); it is not
2696        // transient — matches the repair family's exit-2 treatment.
2697        // - SnapshotPersistFailed in the baseline path is a post-ledger
2698        // failure: baseline_inner inserts the terminal ledger row BEFORE
2699        // writing the snapshot. A retry with the same version therefore
2700        // hits VersionAlreadyApplied (exit 2) before it can write the
2701        // snapshot. Exit 1 (retryable) would give false hope; exit 2
2702        // signals that operator intervention is needed (run
2703        // `repair snapshot-rebuild` or choose a new version).
2704        RunnerError::VersionAlreadyApplied { .. }
2705        | RunnerError::VersionCollisionNonTerminal { .. }
2706        | RunnerError::BaselineSnapshotShouldNotBeProvided
2707        | RunnerError::AdvisoryUnlockReturnedFalse { .. }
2708        | RunnerError::SnapshotPersistFailed { .. }
2709        | RunnerError::OutOfOrderRejected { .. } => 2,
2710        // ── Exit 1: everything else (transient I/O / connection / SQL /
2711        // projection failures). `#[non_exhaustive]` makes this wildcard
2712        // mandatory; new transient-shaped variants inherit the retryable
2713        // default.
2714        _ => 1,
2715    }
2716}
2717
2718#[cfg(test)]
2719mod tests {
2720    use super::*;
2721    use std::fs;
2722    use std::sync::atomic::{AtomicUsize, Ordering};
2723
2724    fn temp_workspace(tag: &str) -> std::path::PathBuf {
2725        static COUNTER: AtomicUsize = AtomicUsize::new(0);
2726        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2727        let nanos = std::time::SystemTime::now()
2728            .duration_since(std::time::UNIX_EPOCH)
2729            .unwrap()
2730            .as_nanos();
2731        let p = std::env::temp_dir().join(format!("djogi-cli-{tag}-{nanos}-{n}"));
2732        fs::create_dir_all(&p).unwrap();
2733        p
2734    }
2735
2736    /// The CLI's bucket-discovery walk must include directories that exist
2737    /// on disk but are absent from the current model inventory (the
2738    /// renamed-from case).
2739    #[test]
2740    fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
2741        let work = temp_workspace("b1_discover");
2742        // Lay down a `migrations/main/billing/schema_snapshot.json`
2743        // the OLD app's snapshot. The current model inventory
2744        // would NOT have this bucket because the app moved to
2745        // `invoicing` via `#[app(renamed_from = "billing")]`.
2746        let billing_dir = work.join("migrations/main/billing");
2747        fs::create_dir_all(&billing_dir).unwrap();
2748        fs::write(billing_dir.join("schema_snapshot.json"), "{}").unwrap();
2749        // A second bucket — the global one for the same database
2750        // exists too. Exercise the multi-bucket walk.
2751        let global_dir = work.join("migrations/main/_global_");
2752        fs::create_dir_all(&global_dir).unwrap();
2753        fs::write(global_dir.join("schema_snapshot.json"), "{}").unwrap();
2754        // A third on-disk directory WITHOUT a snapshot file — must
2755        // not be reported (we only union buckets that actually
2756        // shipped a snapshot).
2757        let no_snap_dir = work.join("migrations/main/empty_app");
2758        fs::create_dir_all(&no_snap_dir).unwrap();
2759
2760        let buckets = discover_snapshot_buckets_on_disk(&work);
2761        let labels: std::collections::BTreeSet<&str> =
2762            buckets.iter().map(|b| b.app.as_str()).collect();
2763        assert!(
2764            labels.contains("billing"),
2765            "must include the renamed-from bucket: {labels:?}"
2766        );
2767        assert!(
2768            labels.contains(""),
2769            "must include the global bucket: {labels:?}"
2770        );
2771        assert!(
2772            !labels.contains("empty_app"),
2773            "must not include directories without a snapshot: {labels:?}"
2774        );
2775        let _ = fs::remove_dir_all(&work);
2776    }
2777
2778    /// The resolved workspace flows into config loading.
2779    /// `load_from_workspace` must read `<workspace>/Djogi.toml` not
2780    /// the cwd's. We assert that by writing a custom config with a
2781    /// distinctive `database.url` and confirming the loader sees it.
2782    #[test]
2783    fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
2784        let work = temp_workspace("a1_workspace_config");
2785        let toml = "[database]\nurl = \"postgres://discovered-by-workspace-flag/test\"\n\
2786                    max_connections = 1\ndev_mode = false\n\
2787                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
2788        fs::write(work.join("Djogi.toml"), toml).unwrap();
2789        // Save and clear DATABASE_URL so the env override doesn't
2790        // mask the file value during this test.
2791        let prior = std::env::var("DATABASE_URL").ok();
2792        // SAFETY: tests run with --test-threads=1 per the project's
2793        // pre-commit policy, so concurrent env mutation is not a
2794        // concern in this configuration.
2795        unsafe { std::env::remove_var("DATABASE_URL") };
2796        let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
2797        assert_eq!(
2798            config.database.url,
2799            "postgres://discovered-by-workspace-flag/test"
2800        );
2801        assert_eq!(config.server.port, 1234);
2802        if let Some(v) = prior {
2803            unsafe { std::env::set_var("DATABASE_URL", v) };
2804        }
2805        let _ = fs::remove_dir_all(&work);
2806    }
2807
2808    /// Env override precedence: A `DATABASE_URL` in the environment
2809    /// must beat any value in
2810    /// `<workspace>/Djogi.toml`, matching the security contract that
2811    /// secrets only live in env vars.
2812    #[test]
2813    fn a1_round2_env_override_beats_workspace_toml() {
2814        let work = temp_workspace("a1r2_env_override");
2815        let toml = "[database]\nurl = \"postgres://from-toml/test\"\n\
2816                    max_connections = 1\ndev_mode = false\n\
2817                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
2818        fs::write(work.join("Djogi.toml"), toml).unwrap();
2819        let prior = std::env::var("DATABASE_URL").ok();
2820        // SAFETY: --test-threads=1; no concurrent env mutation.
2821        unsafe { std::env::set_var("DATABASE_URL", "postgres://from-env/test") };
2822        let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
2823        assert_eq!(
2824            config.database.url, "postgres://from-env/test",
2825            "env DATABASE_URL must win over workspace Djogi.toml"
2826        );
2827        match prior {
2828            Some(v) => unsafe { std::env::set_var("DATABASE_URL", v) },
2829            None => unsafe { std::env::remove_var("DATABASE_URL") },
2830        }
2831        let _ = fs::remove_dir_all(&work);
2832    }
2833
2834    /// `compose_with_inputs` must consume the disk-discovered buckets, not
2835    /// just the inventory's. We set up a
2836    /// `migrations/main/billing/schema_snapshot.json` with a `widgets`
2837    /// table, pass an EMPTY models map (simulating "billing app was
2838    /// removed from the workspace"), set `allow_destructive = true`,
2839    /// and assert the resulting up SQL contains `DROP TABLE
2840    /// "widgets"`. If the disk-walk regressed and `compose_with_inputs`
2841    /// only loaded snapshots for inventory-known buckets, the differ
2842    /// would never see billing's snapshot and the compose would exit
2843    /// `NothingToCompose` (no DROP, no SQL written).
2844    /// End-to-end regression guard.
2845    #[test]
2846    fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
2847        use djogi::migrate::projection::BucketKey;
2848        use djogi::migrate::schema::{
2849            ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
2850        };
2851        use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
2852        use std::collections::BTreeMap;
2853
2854        let work = temp_workspace("b1r2_compose_uses_discovery");
2855
2856        // Build a billing-bucket snapshot with one `widgets` table
2857        // and write it to disk under `migrations/main/billing/`.
2858        let billing_bucket = BucketKey {
2859            database: "main".into(),
2860            app: "billing".into(),
2861        };
2862        let mut billing_snap = AppliedSchema {
2863            djogi_version: env!("CARGO_PKG_VERSION").to_string(),
2864            enums: BTreeMap::new(),
2865            format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
2866            generated_at: "2026-04-25T00:00:00Z".to_string(),
2867            indexes: Vec::new(),
2868            models: BTreeMap::new(),
2869            registered_apps: vec!["billing".to_string()],
2870        };
2871        billing_snap.models.insert(
2872            "widgets".to_string(),
2873            TableSchema {
2874                app: Some("billing".to_string()),
2875                columns: vec![ColumnSchema {
2876                    check: None,
2877                    comment: None,
2878                    default_sql: Some("heerid_next_desc()".to_string()),
2879                    foreign_key: None,
2880                    generated: None,
2881                    identity: None,
2882                    index_type: None,
2883                    indexed: false,
2884                    max_length: None,
2885                    name: "id".to_string(),
2886                    nullable: false,
2887                    on_delete: None,
2888                    outbox_exclude: false,
2889                    rationale: None,
2890                    relation_kind: None,
2891                    renamed_from: None,
2892                    sequence_within: None,
2893                    sql_type: "BIGINT".to_string(),
2894                    unique: false,
2895                    type_change_using: None,
2896                }],
2897                exclusion_constraints: Vec::new(),
2898                fts: None,
2899                is_through: false,
2900                moved_from_app: None,
2901                partition: None,
2902                primary_key: PrimaryKeySchema {
2903                    columns: vec!["id".to_string()],
2904                    kind: PkKindSchema::HeerIdRecencyBiased,
2905                },
2906                rationale: None,
2907                renamed_from: None,
2908                rls_enabled: false,
2909                table: "widgets".to_string(),
2910                table_comment: None,
2911                storage_params: None,
2912                tablespace: None,
2913                tenant_key: None,
2914            },
2915        );
2916        let snap_path = snapshot_path(&work, &billing_bucket);
2917        save_snapshot(&billing_snap, &snap_path).expect("write snapshot");
2918
2919        // EMPTY models — simulates the billing crate having been
2920        // removed from the workspace. Without the disk-walk this
2921        // bucket would never reach the differ.
2922        let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
2923        let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
2924
2925        let exit = compose_with_inputs(
2926            &work,
2927            "drop billing remnant",
2928            true,  // allow_destructive — billing's snapshot will produce DROP ops
2929            false, // force_overwrite
2930            &empty_models,
2931            &[AppLifecycle {
2932                label: "billing".to_string(),
2933                database: "main".to_string(),
2934                renamed_from: None,
2935                tombstone: true, // intentional removal channel
2936            }],
2937            now,
2938            None, // pk_flip_join_table_option — no flip in this test
2939        );
2940        assert_eq!(exit, ExitCode::from(0), "compose must succeed");
2941
2942        // The composed up SQL must carry DROP TABLE for billing's
2943        // widgets — that is the whole point. Find the file and check.
2944        let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
2945        let mut up_path: Option<PathBuf> = None;
2946        for entry in fs::read_dir(&billing_dir).unwrap().flatten() {
2947            let n = entry.file_name().to_string_lossy().to_string();
2948            // Up file pattern: starts with "V", ends with ".sdjql", does
2949            // NOT contain ".down.".
2950            if n.starts_with('V') && n.ends_with(".sdjql") && !n.contains(".down.") {
2951                up_path = Some(entry.path());
2952                break;
2953            }
2954        }
2955        let up_path = up_path.expect("compose must have written an up SQL file");
2956        let up_sql = fs::read_to_string(&up_path).unwrap();
2957        assert!(
2958            up_sql.contains("DROP TABLE \"widgets\""),
2959            "compose must have seen the disk snapshot and emitted DROP TABLE — \
2960             this proves discover_snapshot_buckets_on_disk reached the differ. \
2961             SQL: {up_sql}"
2962        );
2963        let _ = fs::remove_dir_all(&work);
2964    }
2965
2966    /// `status_cmd` invokes its tokio runtime and
2967    /// fails fast on a malformed `Djogi.toml`. We don't need a live
2968    /// Postgres for this assertion — the test is that the workspace
2969    /// path is threaded through the loader and TOML errors surface
2970    /// promptly. (The earlier `a1_load_from_workspace_reads_path_specific_djogi_toml`
2971    /// covers the well-formed case; this is the malformed-input
2972    /// path-threading proof.)
2973    #[test]
2974    fn a1_round2_status_cmd_threads_workspace_to_config() {
2975        let work = temp_workspace("a1r2_status_workspace");
2976        // Write a deliberately malformed TOML so config load fails.
2977        // If the workspace path wasn't threaded, status_cmd would
2978        // try the cwd's Djogi.toml (typically absent) and silently
2979        // fall through to defaults, giving a different error code.
2980        fs::write(work.join("Djogi.toml"), "this is = not = valid toml ===").unwrap();
2981        let exit = status_cmd(Some(work.clone()));
2982        assert_eq!(
2983            exit,
2984            ExitCode::from(1),
2985            "malformed workspace Djogi.toml must surface as config load error"
2986        );
2987        let _ = fs::remove_dir_all(&work);
2988    }
2989
2990    // ── AttuneError → exit code matrix ──────────────────────────────
2991
2992    /// Every `AttuneError::Refused(_)` variant must map to exit code `2`
2993    /// per `docs/spec/configuration.md` §14. The pre-fix implementation
2994    /// flattened every error to `1`, so an operator running attune in CI
2995    /// could not distinguish "policy gate refused before any side effect"
2996    /// from "ran half a step and failed mid-flight".
2997    #[test]
2998    fn u3_attune_refusal_variants_map_to_exit_code_two() {
2999        use djogi::migrate::AttuneRefusal;
3000        let cases = [
3001            AttuneError::Refused(AttuneRefusal::SquashNotLocalhost {
3002                database_url: "postgres://prod.example.com/main".to_string(),
3003            }),
3004            AttuneError::Refused(AttuneRefusal::SquashNotDevProfile {
3005                profile: "production".to_string(),
3006            }),
3007            // Dev_mode and DJOGI_ENV gates both produce `AttuneError::Refused(_)`
3008            // so they share the exit-code-2 mapping.
3009            AttuneError::Refused(AttuneRefusal::SquashDevModeOff),
3010            AttuneError::Refused(AttuneRefusal::SquashEnvIsProduction {
3011                env_value: "production".to_string(),
3012            }),
3013            AttuneError::Refused(AttuneRefusal::SquashFromVersionNotFound {
3014                version: "V20260101000000__missing".to_string(),
3015            }),
3016            AttuneError::Refused(AttuneRefusal::SquashFromVersionAmbiguous {
3017                version: "V20260101000000__shared".to_string(),
3018                buckets: vec!["main/users".to_string(), "main/billing".to_string()],
3019            }),
3020        ];
3021        for err in &cases {
3022            assert_eq!(
3023                attune_error_exit_code(err),
3024                2,
3025                "refusal variant must map to exit 2: {err}"
3026            );
3027        }
3028    }
3029
3030    /// Every runtime `AttuneError` variant must map to exit code `1`
3031    /// per `docs/spec/configuration.md` §14. CI may safely retry runtime
3032    /// failures; a refusal (exit `2`) signals "operator must intervene"
3033    /// and retrying without operator action would just refuse again.
3034    #[test]
3035    fn u3_attune_runtime_variants_map_to_exit_code_one() {
3036        let cases = [
3037            AttuneError::FilesystemScanFailed {
3038                source: std::io::Error::other("disk full"),
3039            },
3040            AttuneError::SqlReadFailed {
3041                path: PathBuf::from("/tmp/x.sdjql"),
3042                source: std::io::Error::other("permission denied"),
3043            },
3044            AttuneError::SqlWriteFailed {
3045                path: PathBuf::from("/tmp/x.sdjql"),
3046                source: std::io::Error::other("read-only fs"),
3047            },
3048            AttuneError::SqlDeleteFailed {
3049                path: PathBuf::from("/tmp/x.sdjql"),
3050                source: std::io::Error::other("not found"),
3051            },
3052            AttuneError::GitPublishFailed {
3053                stderr: "fatal: refusing to push".to_string(),
3054                status_code: Some(128),
3055            },
3056        ];
3057        for err in &cases {
3058            assert_eq!(
3059                attune_error_exit_code(err),
3060                1,
3061                "runtime variant must map to exit 1: {err}"
3062            );
3063        }
3064    }
3065
3066    // ── issue #354: baseline exit-code mapping ──────────────────────────
3067
3068    /// The refusal-class `RunnerError` variants the baseline path can
3069    /// `baseline_cmd` validates the `--reason` guard before any DB
3070    /// work. An empty or whitespace-only reason must return exit 2
3071    /// without touching the filesystem or network — the guard fires
3072    /// on the CLI-owned string before the tokio runtime is even built.
3073    #[test]
3074    fn baseline_empty_reason_exits_code_2() {
3075        let result = baseline_cmd(
3076            "V00000000000000__baseline",
3077            "description",
3078            "",
3079            None,
3080            None,
3081            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3082        );
3083        assert_eq!(
3084            result,
3085            ExitCode::from(2),
3086            "empty --reason must exit 2 before any DB work"
3087        );
3088    }
3089
3090    #[test]
3091    fn baseline_whitespace_reason_exits_code_2() {
3092        let result = baseline_cmd(
3093            "V00000000000000__baseline",
3094            "description",
3095            "   ",
3096            None,
3097            None,
3098            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3099        );
3100        assert_eq!(
3101            result,
3102            ExitCode::from(2),
3103            "whitespace-only --reason must exit 2 before any DB work"
3104        );
3105    }
3106
3107    /// surface must map to exit `2` — a blind retry would hit the same
3108    /// condition, so CI must treat them as "operator must intervene"
3109    /// rather than retryable. A duplicate baseline version (terminal or
3110    /// non-terminal), a wiring bug that supplies a snapshot, and an
3111    /// out-of-order rejection are all refusals.
3112    #[test]
3113    fn baseline_refusal_variants_map_to_exit_code_two() {
3114        let cases = [
3115            RunnerError::VersionAlreadyApplied {
3116                version: "V00000000000000__baseline".to_string(),
3117                applied_at: None,
3118            },
3119            RunnerError::VersionCollisionNonTerminal {
3120                version: "V00000000000000__baseline".to_string(),
3121                status: LedgerStatus::Pending,
3122                run_id: 1,
3123            },
3124            RunnerError::BaselineSnapshotShouldNotBeProvided,
3125            RunnerError::AdvisoryUnlockReturnedFalse {
3126                bucket: BucketKey {
3127                    database: "main".to_string(),
3128                    app: String::new(),
3129                },
3130                key: 0x0102_0304_0506_0708,
3131            },
3132            RunnerError::OutOfOrderRejected {
3133                version: "V00000000000000__baseline".to_string(),
3134                conflicting_version: "V20260101000000__later".to_string(),
3135                conflicting_applied_at: None,
3136            },
3137        ];
3138        for err in &cases {
3139            assert_eq!(
3140                baseline_error_exit_code(err),
3141                2,
3142                "baseline refusal variant must map to exit 2: {err}"
3143            );
3144        }
3145    }
3146
3147    /// Transient `RunnerError` variants reachable from the baseline path
3148    /// must map to exit `1` (retryable). The `#[non_exhaustive]`
3149    /// wildcard arm guarantees any unnamed variant also lands on `1`;
3150    /// these representative cases pin the projection / ledger / snapshot
3151    /// failure shapes the baseline runner can actually emit.
3152    #[test]
3153    fn baseline_transient_variants_map_to_exit_code_one() {
3154        use djogi::error::{DbError, DjogiError};
3155        let cases = [
3156            RunnerError::LedgerBootstrapFailed {
3157                source: DjogiError::Db(DbError::other("create table failed")),
3158            },
3159            RunnerError::LedgerWriteFailed {
3160                version: "V00000000000000__baseline".to_string(),
3161                source: DjogiError::Db(DbError::other("insert failed")),
3162            },
3163            RunnerError::PinnedSessionCheckoutFailed {
3164                source: DjogiError::Db(DbError::other("pool exhausted")),
3165            },
3166            RunnerError::AdvisoryLockFailed {
3167                bucket: BucketKey {
3168                    database: "main".to_string(),
3169                    app: String::new(),
3170                },
3171                key: 0x0102_0304_0506_0708,
3172                attempts: 3,
3173            },
3174        ];
3175        for err in &cases {
3176            assert_eq!(
3177                baseline_error_exit_code(err),
3178                1,
3179                "baseline transient variant must map to exit 1: {err}"
3180            );
3181        }
3182    }
3183
3184    // ── REQ-326: --fake / --reason validation tests ─────────────────────
3185
3186    /// REQ-326-5: --fake without --reason must exit with code 2.
3187    #[test]
3188    fn fake_without_reason_exits_code_2() {
3189        let result = apply_cmd(
3190            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3191            true,
3192            None,
3193        );
3194        assert_eq!(
3195            result,
3196            ExitCode::from(2),
3197            "--fake without --reason must exit 2"
3198        );
3199    }
3200
3201    /// REQ-326-5: --fake with blank reason must exit with code 2.
3202    #[test]
3203    fn fake_with_empty_reason_exits_code_2() {
3204        let result = apply_cmd(
3205            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3206            true,
3207            Some(String::new()),
3208        );
3209        assert_eq!(
3210            result,
3211            ExitCode::from(2),
3212            "--fake with empty reason must exit 2"
3213        );
3214    }
3215
3216    /// REQ-326-5: --fake with whitespace-only reason must exit with code 2.
3217    #[test]
3218    fn fake_with_whitespace_reason_exits_code_2() {
3219        let result = apply_cmd(
3220            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3221            true,
3222            Some("   ".to_string()),
3223        );
3224        assert_eq!(
3225            result,
3226            ExitCode::from(2),
3227            "--fake with whitespace reason must exit 2"
3228        );
3229    }
3230
3231    /// --reason without --fake is accepted (silently ignored).
3232    #[test]
3233    fn reason_without_fake_is_accepted() {
3234        // This should NOT exit 2; it will proceed to config load which
3235        // may fail on nonexistent workspace, but the --reason flag itself
3236        // is accepted. We verify the function does not early-exit with code 2.
3237        let result = apply_cmd(
3238            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3239            false, // NOT fake
3240            Some("test reason".to_string()),
3241        );
3242        // Should be 1 (config error) not 2 (refusal)
3243        assert_ne!(
3244            result,
3245            ExitCode::from(2),
3246            "--reason without --fake should not refuse"
3247        );
3248    }
3249
3250    // ── render_verify_report ─────────────────────────────────────
3251    // `render_verify_report` returns `Vec<String>` so the rendering is
3252    // assertable without capturing stdout. Each test pins the exact lines
3253    // the operator sees for one report shape.
3254
3255    /// Build a bucket for render tests.
3256    fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
3257        djogi::migrate::BucketKey {
3258            database: database.to_string(),
3259            app: app.to_string(),
3260        }
3261    }
3262
3263    /// Construct a [`VerifyDiagnostic`] tersely for render tests.
3264    fn diag(
3265        code: &str,
3266        severity: djogi::migrate::VerifySeverity,
3267        message: &str,
3268        location: Option<&str>,
3269    ) -> djogi::migrate::VerifyDiagnostic {
3270        djogi::migrate::VerifyDiagnostic {
3271            code: code.to_string(),
3272            severity,
3273            message: message.to_string(),
3274            location: location.map(str::to_string),
3275        }
3276    }
3277
3278    #[test]
3279    fn render_verify_report_clean_output() {
3280        use djogi::migrate::VerifyReport;
3281
3282        let report = VerifyReport {
3283            diagnostics: vec![],
3284            latest_applied_version: Some("001_initial".to_string()),
3285            applied_count: 3,
3286            unfinished_count: 0,
3287        };
3288        let bucket = render_bucket("main", "");
3289
3290        let lines = render_verify_report(&report, &bucket);
3291
3292        assert!(
3293            lines.contains(&"Ledger: 3 applied, latest 001_initial".to_string()),
3294            "missing ledger line; got {lines:?}"
3295        );
3296        assert!(
3297            lines.contains(&"No drift detected. Schema is consistent.".to_string()),
3298            "missing clean line; got {lines:?}"
3299        );
3300        assert!(
3301            lines.iter().any(|l| l.contains("Result: PASSED")),
3302            "missing PASSED result; got {lines:?}"
3303        );
3304        assert!(
3305            !lines.iter().any(|l| l.contains("FAILED")),
3306            "clean report must not say FAILED; got {lines:?}"
3307        );
3308    }
3309
3310    #[test]
3311    fn render_verify_report_with_errors() {
3312        use djogi::migrate::{VerifyReport, VerifySeverity};
3313
3314        // Diagnostics are pre-sorted by `(code, location)` exactly as the
3315        // library returns them — render does not re-sort.
3316        let report = VerifyReport {
3317            diagnostics: vec![
3318                diag(
3319                    "D601",
3320                    VerifySeverity::Error,
3321                    "Snapshot table missing from live DB",
3322                    Some("users"),
3323                ),
3324                diag(
3325                    "D611",
3326                    VerifySeverity::Warning,
3327                    "Live index not present in snapshot",
3328                    Some("idx_posts_created"),
3329                ),
3330            ],
3331            latest_applied_version: Some("V20260501000000__add_users".to_string()),
3332            applied_count: 2,
3333            unfinished_count: 0,
3334        };
3335        let bucket = render_bucket("main", "myapp");
3336
3337        assert!(report.has_errors());
3338        let lines = render_verify_report(&report, &bucket);
3339
3340        assert!(
3341            lines
3342                .contains(&"[ERROR] D601 (users): Snapshot table missing from live DB".to_string()),
3343            "missing D601 line; got {lines:?}"
3344        );
3345        assert!(
3346            lines.contains(
3347                &"[WARN] D611 (idx_posts_created): Live index not present in snapshot".to_string()
3348            ),
3349            "missing D611 line; got {lines:?}"
3350        );
3351        assert!(
3352            lines.iter().any(|l| l.contains("Result: FAILED")),
3353            "error report must say FAILED; got {lines:?}"
3354        );
3355    }
3356
3357    #[test]
3358    fn render_verify_report_header_shows_global_and_named_app() {
3359        use djogi::migrate::VerifyReport;
3360
3361        let report = VerifyReport {
3362            diagnostics: vec![],
3363            latest_applied_version: None,
3364            applied_count: 0,
3365            unfinished_count: 0,
3366        };
3367
3368        // Empty app label → `_global_` in the header.
3369        let global = render_verify_report(&report, &render_bucket("main", ""));
3370        assert_eq!(
3371            global.first().map(String::as_str),
3372            Some("djogi migrations verify — main/_global_"),
3373            "global bucket header; got {global:?}"
3374        );
3375
3376        // Named app → the label verbatim in the header.
3377        let named = render_verify_report(&report, &render_bucket("crud_log", "billing"));
3378        assert_eq!(
3379            named.first().map(String::as_str),
3380            Some("djogi migrations verify — crud_log/billing"),
3381            "named bucket header; got {named:?}"
3382        );
3383    }
3384
3385    #[test]
3386    fn render_verify_report_warning_only_passes_with_warnings() {
3387        use djogi::migrate::{VerifyReport, VerifySeverity};
3388
3389        let report = VerifyReport {
3390            diagnostics: vec![diag(
3391                "D606",
3392                VerifySeverity::Warning,
3393                "type differs (advisory)",
3394                Some("users.age"),
3395            )],
3396            latest_applied_version: Some("001_initial".to_string()),
3397            applied_count: 1,
3398            unfinished_count: 0,
3399        };
3400        let lines = render_verify_report(&report, &render_bucket("main", ""));
3401
3402        assert!(
3403            lines
3404                .iter()
3405                .any(|l| l.contains("Result: PASSED with warnings")),
3406            "warning-only must PASS with warnings; got {lines:?}"
3407        );
3408        assert!(
3409            !lines.iter().any(|l| l.contains("FAILED")),
3410            "warning-only must not say FAILED; got {lines:?}"
3411        );
3412    }
3413
3414    #[test]
3415    fn render_verify_report_empty_ledger_line() {
3416        use djogi::migrate::VerifyReport;
3417
3418        let report = VerifyReport {
3419            diagnostics: vec![],
3420            latest_applied_version: None,
3421            applied_count: 0,
3422            unfinished_count: 0,
3423        };
3424        let lines = render_verify_report(&report, &render_bucket("main", ""));
3425
3426        assert!(
3427            lines.contains(&"Ledger: empty (no migrations applied yet)".to_string()),
3428            "empty ledger line; got {lines:?}"
3429        );
3430    }
3431
3432    #[test]
3433    fn render_verify_report_unfinished_ledger_line() {
3434        use djogi::migrate::VerifyReport;
3435
3436        let report = VerifyReport {
3437            diagnostics: vec![],
3438            latest_applied_version: Some("V20260501000000__add_users".to_string()),
3439            applied_count: 2,
3440            unfinished_count: 1,
3441        };
3442        let lines = render_verify_report(&report, &render_bucket("main", ""));
3443
3444        assert!(
3445            lines.contains(
3446                &"Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users".to_string()
3447            ),
3448            "unfinished ledger line; got {lines:?}"
3449        );
3450    }
3451
3452    #[test]
3453    fn render_verify_report_info_with_no_location_uses_dash() {
3454        use djogi::migrate::{VerifyReport, VerifySeverity};
3455
3456        // An Info diagnostic with `location: None` exercises the
3457        // `unwrap_or("-")` path, and the all-info summary line.
3458        let report = VerifyReport {
3459            diagnostics: vec![diag(
3460                "D692",
3461                VerifySeverity::Info,
3462                "enum type(s) declared; not yet checked",
3463                None,
3464            )],
3465            latest_applied_version: Some("001_initial".to_string()),
3466            applied_count: 1,
3467            unfinished_count: 0,
3468        };
3469        let lines = render_verify_report(&report, &render_bucket("main", ""));
3470
3471        assert!(
3472            lines.iter().any(|l| l.contains("(-)")),
3473            "location: None must render as (-); got {lines:?}"
3474        );
3475        assert!(
3476            lines.contains(&"Result: PASSED (1 info(s))".to_string()),
3477            "all-info summary; got {lines:?}"
3478        );
3479    }
3480
3481    // ── resolve_bucket_url (Class A) ─────────────────────────────────────
3482
3483    fn db_config(
3484        url: &str,
3485        crud_log_url: Option<&str>,
3486        event_log_url: Option<&str>,
3487    ) -> djogi::config::DatabaseConfig {
3488        djogi::config::DatabaseConfig {
3489            url: url.to_string(),
3490            crud_log_url: crud_log_url.map(str::to_string),
3491            event_log_url: event_log_url.map(str::to_string),
3492            max_connections: None,
3493            dev_mode: false,
3494        }
3495    }
3496
3497    #[test]
3498    fn resolve_bucket_url_main_uses_app_url_verbatim() {
3499        // "main" must use the app URL verbatim even when the path
3500        // component is not literally "main" — deriving would target a
3501        // database that does not exist.
3502        let cfg = db_config("postgres://user:pass@localhost:5432/myapp_prod", None, None);
3503        assert_eq!(
3504            resolve_bucket_url(&cfg, "main").as_deref(),
3505            Some("postgres://user:pass@localhost:5432/myapp_prod"),
3506            "main must return the app URL unchanged"
3507        );
3508    }
3509
3510    #[test]
3511    fn resolve_bucket_url_crud_log_prefers_explicit_url() {
3512        let cfg = db_config(
3513            "postgres://localhost/main",
3514            Some("postgres://localhost/explicit_crud"),
3515            None,
3516        );
3517        assert_eq!(
3518            resolve_bucket_url(&cfg, "crud_log").as_deref(),
3519            Some("postgres://localhost/explicit_crud"),
3520            "crud_log must prefer the explicit crud_log_url"
3521        );
3522    }
3523
3524    #[test]
3525    fn resolve_bucket_url_event_log_prefers_explicit_url() {
3526        let cfg = db_config(
3527            "postgres://localhost/main",
3528            None,
3529            Some("postgres://localhost/explicit_event"),
3530        );
3531        assert_eq!(
3532            resolve_bucket_url(&cfg, "event_log").as_deref(),
3533            Some("postgres://localhost/explicit_event"),
3534            "event_log must prefer the explicit event_log_url"
3535        );
3536    }
3537
3538    #[test]
3539    fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
3540        // An empty explicit URL is treated as absent — derive from the app
3541        // URL's path component instead.
3542        let cfg = db_config("postgres://localhost/main", Some(""), Some("   "));
3543        // crud_log: empty string → derive.
3544        assert_eq!(
3545            resolve_bucket_url(&cfg, "crud_log").as_deref(),
3546            Some("postgres://localhost/crud_log"),
3547            "empty crud_log_url must fall back to derived"
3548        );
3549        // event_log: whitespace is NOT empty, so it is used verbatim — the
3550        // emptiness check is a strict `is_empty`, matching the spec.
3551        assert_eq!(
3552            resolve_bucket_url(&cfg, "event_log").as_deref(),
3553            Some("   "),
3554            "non-empty (whitespace) event_log_url is used verbatim"
3555        );
3556    }
3557
3558    #[test]
3559    fn resolve_bucket_url_other_database_derives_from_app_url() {
3560        let cfg = db_config("postgres://user:pass@localhost:5432/main", None, None);
3561        assert_eq!(
3562            resolve_bucket_url(&cfg, "analytics").as_deref(),
3563            Some("postgres://user:pass@localhost:5432/analytics"),
3564            "an arbitrary database name derives by path splice"
3565        );
3566    }
3567
3568    #[test]
3569    fn resolve_bucket_url_pathless_url_returns_none() {
3570        // A URL with no recognisable path component cannot be derived.
3571        let cfg = db_config("postgres://localhost", None, None);
3572        assert_eq!(
3573            resolve_bucket_url(&cfg, "crud_log"),
3574            None,
3575            "pathless URL must yield None for a derived database"
3576        );
3577    }
3578
3579    #[test]
3580    fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
3581        // "main" short-circuits before derivation, so even a pathless URL
3582        // returns it verbatim — the app pool is the operator's to define.
3583        let cfg = db_config("postgres://localhost", None, None);
3584        assert_eq!(
3585            resolve_bucket_url(&cfg, "main").as_deref(),
3586            Some("postgres://localhost"),
3587            "main returns the app URL verbatim regardless of path"
3588        );
3589    }
3590}