Skip to main content

djogi_cli/
migrations.rs

1//! `djogi migrations` subcommand glue
2//! Two leaves: `compose` and `status`. Both flow through the public
3//! `djogi::migrate` API. Compose acquires the workspace file lock for
4//! the duration of the call; status is read-only and does not.
5//! The CLI surface here is intentionally thin — all the real logic
6//! lives in the library so integration tests can exercise it without
7//! spawning subprocesses.
8
9use std::path::{Path, PathBuf};
10use std::process::ExitCode;
11
12use djogi::apps::AppRegistry;
13use djogi::migrate::{
14    AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
15    DescriptorProvider, GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan,
16    RepairConfirmation, RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError,
17    VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
18    compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
19    repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
20};
21
22// Re-export for the apply command's ledger state machine.
23use djogi::migrate::LedgerStatus;
24
25// CLI-side enums declared at the crate root (`main.rs` is the binary's
26// root module — there is no `mod main`), reached here as `crate::*`.
27use crate::{PartialApplyResolutionCli, RepairSubcommand};
28
29// ── Replay plan deserialization ──────────────────────────────────────────
30
31/// Local mirror of `StoredReplayPlan` (pub(crate) in the library).
32/// The committed replay plan JSON written by `compose` at
33/// `migrations/<database>/<app>/<version>.plan.json`. This struct
34/// allows the CLI to parse it and construct a proper [`MigrationPlan`]
35/// with correct segment structure and checksums.
36#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
37struct CliReplayPlan {
38    format_version: String,
39    checksum_up: String,
40    checksum_down: Option<String>,
41    classification: CliClassification,
42    segments: Vec<CliReplaySegment>,
43}
44
45#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
46#[serde(tag = "kind", rename_all = "snake_case")]
47enum CliClassification {
48    NoOp,
49    Additive,
50    Reversible,
51    Destructive,
52    Lossy,
53    Unsupported {
54        reason: String,
55    },
56    PkTypeFlip {
57        co_destructive: bool,
58        co_lossy: bool,
59    },
60}
61
62#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
63struct CliReplaySegment {
64    kind: CliSegmentKind,
65    statements: Vec<CliReplayStatement>,
66}
67
68#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
69#[serde(rename_all = "snake_case")]
70enum CliSegmentKind {
71    Transactional,
72    NonTransactional,
73    MetadataOnly,
74}
75
76#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
77struct CliReplayStatement {
78    label: String,
79    up: String,
80}
81
82/// Expected format version for the committed replay plan JSON.
83const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
84
85/// Load the committed replay plan from disk and convert to a
86/// [`djogi::migrate::MigrationPlan`]. Returns `(plan, checksum_up, checksum_down)`.
87/// Falls back to reading the up/down SQL files and constructing a
88/// single-segment transactional plan when the replay plan JSON is
89/// absent or invalid. This mirrors the reset.rs fallback path.
90fn load_replay_plan_from_disk(
91    workspace: &Path,
92    bucket: &djogi::migrate::BucketKey,
93    version: &str,
94    pending_checksum_up: &str,
95    pending_checksum_down: Option<&str>,
96) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
97    // Try to load the committed replay plan JSON first.
98    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
99    let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
100
101    if let Ok(bytes) = std::fs::read(&replay_plan_path) {
102        let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
103            Ok(s) => s,
104            Err(e) => {
105                return Err(ApplyReplayPlanError::Parse {
106                    path: replay_plan_path.clone(),
107                    source: e.to_string(),
108                });
109            }
110        };
111
112        if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
113            return Err(ApplyReplayPlanError::FormatVersion {
114                found: stored.format_version,
115                path: replay_plan_path.clone(),
116            });
117        }
118
119        // Verify checksums match the pending plan.
120        if stored.checksum_up != pending_checksum_up
121            || stored.checksum_down.as_deref() != pending_checksum_down
122        {
123            return Err(ApplyReplayPlanError::ChecksumMismatch);
124        }
125
126        let plan = djogi::migrate::MigrationPlan {
127            bucket: bucket.clone(),
128            classification: stored.classification.into(),
129            segments: stored
130                .segments
131                .into_iter()
132                .map(|seg| djogi::migrate::Segment {
133                    kind: seg.kind.into(),
134                    statements: seg
135                        .statements
136                        .into_iter()
137                        .map(|stmt| djogi::migrate::OperationSql {
138                            label: stmt.label,
139                            up: stmt.up,
140                            down: String::new(),
141                            lossy: None,
142                        })
143                        .collect(),
144                })
145                .collect(),
146        };
147
148        return Ok((plan, stored.checksum_up, stored.checksum_down));
149    }
150
151    // Fallback: read SQL files and construct single-segment plan.
152    let up_filename = djogi::migrate::up_filename(version);
153    let down_filename = djogi::migrate::down_filename(version);
154    let up_path = bucket_dir.join(&up_filename);
155    let down_path = bucket_dir.join(&down_filename);
156
157    let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
158        path: up_path.clone(),
159        source: e.to_string(),
160    })?;
161
162    let down_sql = match std::fs::read_to_string(&down_path) {
163        Ok(sql) => sql,
164        Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
165        Err(e) => {
166            return Err(ApplyReplayPlanError::SqlRead {
167                path: down_path.clone(),
168                source: e.to_string(),
169            });
170        }
171    };
172
173    // Compute checksum for the single-segment fallback. The runner
174    // recomputes from the plan's SQL fragments and verifies against
175    // what we provide in RunnerCtx, so they must match.
176    let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
177
178    // Build a single-transactional-segment plan. This is correct for
179    // most migrations — only CONCURRENTLY indexes require non-tx
180    // segments, and those always have a replay plan JSON.
181    let plan = djogi::migrate::MigrationPlan {
182        bucket: bucket.clone(),
183        classification: djogi::migrate::Classification::Additive,
184        segments: vec![djogi::migrate::Segment {
185            kind: djogi::migrate::SegmentKind::Transactional,
186            statements: vec![djogi::migrate::OperationSql {
187                label: format!("replay {version}"),
188                up: up_sql,
189                down: down_sql,
190                lossy: None,
191            }],
192        }],
193    };
194
195    Ok((plan, computed_checksum_up, None))
196}
197
198/// Errors from [`load_replay_plan_from_disk`].
199#[derive(Debug)]
200enum ApplyReplayPlanError {
201    Parse { path: PathBuf, source: String },
202    FormatVersion { found: String, path: PathBuf },
203    ChecksumMismatch,
204    SqlRead { path: PathBuf, source: String },
205}
206
207impl std::fmt::Display for ApplyReplayPlanError {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        match self {
210            Self::Parse { path, source } => {
211                write!(f, "parse replay plan {}: {source}", path.display())
212            }
213            Self::FormatVersion { found, path } => write!(
214                f,
215                "replay plan format version mismatch in {}: expected {}, found {}",
216                path.display(),
217                CLI_REPLAY_PLAN_FORMAT_VERSION,
218                found
219            ),
220            Self::ChecksumMismatch => {
221                write!(f, "checksum mismatch between pending JSON and replay plan")
222            }
223            Self::SqlRead { path, source } => {
224                write!(f, "read SQL file {}: {source}", path.display())
225            }
226        }
227    }
228}
229
230impl std::error::Error for ApplyReplayPlanError {}
231
232/// Classify a Phase 0 artifact for the CLI cleanup path (#386).
233/// Loads the committed replay plan JSON or falls back to the SQL file,
234/// classifies the up SQL using [`djogi::migrate::classify_phase_zero_artifact`],
235/// and returns `Some(reason)` unless the artifact is identity-free
236/// replay-current.
237/// Returns `None` when the artifact is safe for migration replay.
238fn classify_phase_zero_for_cleanup(
239    workspace: &Path,
240    bucket: &djogi::migrate::BucketKey,
241    version: &str,
242    pending_checksum_up: &str,
243    pending_checksum_down: Option<&str>,
244) -> Option<String> {
245    // Try to load the committed replay plan JSON first.
246    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
247    let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
248
249    if let Ok(bytes) = std::fs::read(&replay_plan_path) {
250        let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
251            Ok(s) => s,
252            Err(e) => {
253                return Some(format!("parse replay plan: {e}"));
254            }
255        };
256
257        if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
258            return Some(format!(
259                "replay plan format version mismatch: expected {}, found {}",
260                CLI_REPLAY_PLAN_FORMAT_VERSION, stored.format_version
261            ));
262        }
263
264        // Verify checksums match the pending plan.
265        if stored.checksum_up != pending_checksum_up
266            || stored.checksum_down.as_deref() != pending_checksum_down
267        {
268            return Some("checksum mismatch between pending JSON and replay plan".to_string());
269        }
270
271        // Reconstruct the up SQL from the replay plan segments for classification.
272        let up_sql: String = stored
273            .segments
274            .iter()
275            .flat_map(|seg| seg.statements.iter())
276            .map(|stmt| stmt.up.as_str())
277            .collect::<Vec<&str>>()
278            .join("\n");
279
280        return classify_phase_zero_bytes(up_sql.as_bytes());
281    }
282
283    // Fallback: read the up SQL file directly.
284    let up_filename = djogi::migrate::up_filename(version);
285    let up_path = bucket_dir.join(&up_filename);
286    match std::fs::read_to_string(&up_path) {
287        Ok(up_sql) => classify_phase_zero_bytes(up_sql.as_bytes()),
288        Err(e) => Some(format!("read up SQL file {}: {e}", up_path.display())),
289    }
290}
291
292/// Classify raw bytes as Phase 0 artifact and return refusal reason unless it
293/// is identity-free replay-current.
294fn classify_phase_zero_bytes(bytes: &[u8]) -> Option<String> {
295    match djogi::migrate::classify_phase_zero_artifact(bytes) {
296        djogi::migrate::PhaseZeroArtifactState::IdentityFreeCurrent => None,
297        djogi::migrate::PhaseZeroArtifactState::SeedCapableRuntimeCurrent => {
298            Some("seed-capable runtime-only artifact detected".to_string())
299        }
300        djogi::migrate::PhaseZeroArtifactState::SeedDmlNotRuntimeCurrent => {
301            Some("seed-dml non-runtime-current artifact detected".to_string())
302        }
303        djogi::migrate::PhaseZeroArtifactState::GeneratedStale => {
304            Some("generated-stale artifact detected".to_string())
305        }
306        djogi::migrate::PhaseZeroArtifactState::Ambiguous => {
307            Some("ambiguous or hand-edited artifact detected".to_string())
308        }
309        djogi::migrate::PhaseZeroArtifactState::Incomplete => {
310            Some("incomplete artifact (truncated generation)".to_string())
311        }
312        djogi::migrate::PhaseZeroArtifactState::Missing => Some("missing artifact".to_string()),
313    }
314}
315
316// ── Type conversions from CLI-local types to library types ────────────────
317
318impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
319    fn from(kind: CliSegmentKind) -> Self {
320        match kind {
321            CliSegmentKind::Transactional => Self::Transactional,
322            CliSegmentKind::NonTransactional => Self::NonTransactional,
323            CliSegmentKind::MetadataOnly => Self::MetadataOnly,
324        }
325    }
326}
327
328impl From<CliClassification> for djogi::migrate::Classification {
329    fn from(classification: CliClassification) -> Self {
330        match classification {
331            CliClassification::NoOp => Self::NoOp,
332            CliClassification::Additive => Self::Additive,
333            CliClassification::Reversible => Self::Reversible,
334            CliClassification::Destructive => Self::Destructive,
335            CliClassification::Lossy => Self::Lossy,
336            CliClassification::Unsupported { reason } => Self::Unsupported { reason },
337            CliClassification::PkTypeFlip {
338                co_destructive,
339                co_lossy,
340            } => Self::PkTypeFlip {
341                co_destructive,
342                co_lossy,
343            },
344        }
345    }
346}
347
348/// Resolve the workspace root from the `--workspace` flag. When the
349/// flag is absent we use the current working directory — the typical
350/// invocation pattern is `cd <project>` then `djogi migrations …`.
351fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
352    workspace.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")))
353}
354
355/// Walk the on-disk `migrations/<database>/<app>/` tree and return the
356/// set of buckets that already have a `schema_snapshot.json` file.
357/// Compose's `snapshots` map must include the OLD bucket of any
358/// renamed app — and that bucket is guaranteed to be absent from the
359/// current `models` inventory because the `#[app(renamed_from =
360/// "old")]` annotation lives on the NEW app. Walking disk directly
361/// recovers those orphaned snapshots so the differ sees both sides of
362/// a rename.
363/// Each entry maps to a [`djogi::migrate::projection::BucketKey`]
364/// using the inverse of [`djogi::migrate::app_dirname`] (synthetic
365/// `_global_` directory → empty-string label).
366fn discover_snapshot_buckets_on_disk(
367    workspace: &Path,
368) -> Vec<djogi::migrate::projection::BucketKey> {
369    let mut out = Vec::new();
370    let migrations_root = djogi::migrate::migrations_root(workspace);
371    let Ok(db_entries) = std::fs::read_dir(&migrations_root) else {
372        return out;
373    };
374    for db_entry in db_entries.flatten() {
375        let Ok(ft) = db_entry.file_type() else {
376            continue;
377        };
378        if !ft.is_dir() {
379            continue;
380        }
381        let Some(database) = db_entry.file_name().to_str().map(str::to_string) else {
382            continue;
383        };
384        let Ok(app_entries) = std::fs::read_dir(db_entry.path()) else {
385            continue;
386        };
387        for app_entry in app_entries.flatten() {
388            let Ok(ft) = app_entry.file_type() else {
389                continue;
390            };
391            if !ft.is_dir() {
392                continue;
393            }
394            let Some(dirname) = app_entry.file_name().to_str().map(str::to_string) else {
395                continue;
396            };
397            let snap_path = app_entry.path().join("schema_snapshot.json");
398            if !snap_path.exists() {
399                continue;
400            }
401            let label = djogi::migrate::app_label_from_dirname(&dirname).to_string();
402            out.push(djogi::migrate::projection::BucketKey {
403                database: database.clone(),
404                app: label,
405            });
406        }
407    }
408    out
409}
410
411/// `djogi migrations compose` entry point.
412pub fn compose_cmd(
413    provider: &dyn DescriptorProvider,
414    name: &str,
415    allow_destructive: bool,
416    force_overwrite: bool,
417    workspace: Option<PathBuf>,
418) -> ExitCode {
419    let workspace = resolve_workspace(workspace);
420    let models = match project_from_provider(provider) {
421        Ok(m) => m,
422        Err(e) => {
423            eprintln!("djogi migrations compose: projection error: {e}");
424            return ExitCode::from(1);
425        }
426    };
427    let apps: Vec<AppLifecycle> = provider
428        .apps()
429        .iter()
430        .map(|d| AppLifecycle {
431            label: d.label.to_string(),
432            database: d.database.to_string(),
433            renamed_from: d.renamed_from.map(str::to_string),
434            tombstone: d.tombstone,
435        })
436        .collect();
437    // The resolved workspace flows into config loading. Compose consumes
438    // the [`MigrateConfig::pk_flip_join_table_option`] knob so we no
439    // longer drop the parsed config — the join-table layout
440    // selected in `Djogi.toml` reaches the differ via this path.
441    let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
442        Ok(c) => c,
443        Err(e) => {
444            eprintln!("djogi migrations compose: config load: {e}");
445            return ExitCode::from(1);
446        }
447    };
448    let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
449        djogi_config.migrate.pk_flip_join_table_option,
450    );
451    compose_with_inputs(
452        &workspace,
453        name,
454        allow_destructive,
455        force_overwrite,
456        &models,
457        &apps,
458        time::OffsetDateTime::now_utc(),
459        Some(pk_flip_option),
460    )
461}
462
463/// Shared compose body — separated from [`compose_cmd`] so tests can
464/// drive it with explicit `models` and `apps` (the production entry
465/// point sources both from `inventory::iter` and `AppRegistry::all`,
466/// which are global state and thus not directly addressable from a
467/// unit test).
468/// Acquires the workspace lock, walks the on-disk migration tree to
469/// recover orphaned snapshots (renamed-from buckets), and
470/// invokes [`djogi::migrate::compose`].
471// Compose has 8 inputs because it sits at the bridge between
472// CLI flag parsing (workspace / name / flags / clock) and the
473// engine (`models` / `apps` / `pk_flip_join_table_option`).
474// Folding these into a struct would push the same fields onto
475// the caller; the CLI tests already pass them positionally and
476// a struct-based refactor would be churn for no clarity gain.
477#[allow(clippy::too_many_arguments)]
478fn compose_with_inputs(
479    workspace: &Path,
480    name: &str,
481    allow_destructive: bool,
482    force_overwrite: bool,
483    models: &std::collections::BTreeMap<
484        djogi::migrate::projection::BucketKey,
485        djogi::migrate::AppliedSchema,
486    >,
487    apps: &[AppLifecycle],
488    now: time::OffsetDateTime,
489    pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
490) -> ExitCode {
491    let lock_path = workspace.join(LOCK_FILE_NAME);
492    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
493        Ok(g) => g,
494        Err(e) => {
495            eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
496            return ExitCode::from(1);
497        }
498    };
499
500    // Read snapshots from disk. The bucket set we load is the UNION of
501    // (a) every bucket the current projection knows about and (b) every
502    // on-disk bucket that has a snapshot file.
503    // Without (b) a renamed-from app's old snapshot is missed
504    // entirely (the new app's `BucketKey` differs and the differ
505    // never sees the old schema, breaking compose's rename + drop +
506    // move emission).
507    let mut bucket_set: std::collections::BTreeSet<djogi::migrate::projection::BucketKey> =
508        models.keys().cloned().collect();
509    for bucket in discover_snapshot_buckets_on_disk(workspace) {
510        bucket_set.insert(bucket);
511    }
512
513    let mut snapshots: std::collections::BTreeMap<_, _> = std::collections::BTreeMap::new();
514    for bucket in &bucket_set {
515        let path = djogi::migrate::snapshot_path(workspace, bucket);
516        match djogi::migrate::load_snapshot(&path) {
517            Ok(s) => {
518                snapshots.insert(bucket.clone(), s);
519            }
520            Err(djogi::migrate::SnapshotError::Io { source, .. })
521                if source.kind() == std::io::ErrorKind::NotFound =>
522            {
523                // Fresh app — no prior snapshot.
524            }
525            Err(e) => {
526                eprintln!(
527                    "djogi migrations compose: snapshot load failed at {}: {e}",
528                    path.display()
529                );
530                return ExitCode::from(1);
531            }
532        }
533    }
534
535    let req = ComposeRequest {
536        workspace_root: workspace,
537        models,
538        snapshots: &snapshots,
539        apps,
540        name,
541        allow_destructive,
542        force_overwrite,
543        now,
544        _guard: &guard,
545        pk_flip_join_table_option,
546        // Production: always run auto-emit. The flag is a
547        // test-only escape hatch for unit tests that exercise
548        // compose's lower-level write/rollback machinery in
549        // isolation; the CLI / production path always goes through
550        // the full bootstrap flow.
551        skip_phase_zero_auto_emit: false,
552    };
553    match compose(req) {
554        Ok(report) => {
555            // Surface auto-emitted bootstraps before
556            // the regular composed buckets so the operator sees the
557            // bootstrap context before the per-bucket changes.
558            for emit in &report.emitted_phase_zero {
559                let ext_summary = if emit.extensions.is_empty() {
560                    "no extensions".to_string()
561                } else {
562                    format!(
563                        "extensions: {}",
564                        emit.extensions
565                            .iter()
566                            .cloned()
567                            .collect::<Vec<_>>()
568                            .join(", ")
569                    )
570                };
571                println!(
572                    "auto-emitted bootstrap migration: {database}/_global_ ({ext_summary})",
573                    database = emit.database,
574                );
575            }
576            for cb in &report.composed_buckets {
577                println!(
578                    "composed {database}/{app}: {version} ({classification:?})",
579                    database = cb.bucket.database,
580                    app = if cb.bucket.app.is_empty() {
581                        "_global_"
582                    } else {
583                        cb.bucket.app.as_str()
584                    },
585                    version = cb.version,
586                    classification = cb.classification,
587                );
588            }
589            for bucket in &report.converged_snapshot_buckets {
590                println!(
591                    "snapshot converged: {database}/{app} — snapshot updated to scoped enum set, no migration needed",
592                    database = bucket.database,
593                    app = if bucket.app.is_empty() {
594                        "_global_"
595                    } else {
596                        bucket.app.as_str()
597                    },
598                );
599            }
600            ExitCode::from(0)
601        }
602        Err(ComposeError::NothingToCompose) => {
603            println!("nothing to compose — model state matches snapshot for every bucket");
604            // Per the inline-decisions: nothing-to-compose is
605            // not an error. The status command is the one that
606            // signals out-of-sync state via exit code.
607            ExitCode::from(0)
608        }
609        Err(ComposeError::LinkageDropWithoutModels { ref text, .. }) => {
610            eprintln!("djogi migrations compose: {text}");
611            // Exit 2 — refusal: models must be compiled in before dropping app linkage.
612            ExitCode::from(2)
613        }
614        Err(e @ ComposeError::CrossBucketForeignKeyCycle { .. }) => {
615            eprintln!("djogi migrations compose: {e}");
616            // Exit 2 — operator-actionable refusal: the operator resolves the
617            // cycle (merge the apps or drop one FK direction). A blind retry
618            // would refuse identically, so this is exit 2, not the exit-1
619            // unexpected-error catch-all below.
620            ExitCode::from(2)
621        }
622        Err(e) => {
623            eprintln!("djogi migrations compose: {e}");
624            ExitCode::from(1)
625        }
626    }
627}
628
629/// `djogi migrations status` entry point.
630/// Read-only — does not acquire the workspace lock. Reads the
631/// migration ledger from the active database via
632/// [`djogi::context::DjogiContext`].
633pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
634    let workspace = resolve_workspace(workspace);
635
636    // Build a tokio runtime so we can drive the async ledger query.
637    let runtime = match tokio::runtime::Builder::new_current_thread()
638        .enable_all()
639        .build()
640    {
641        Ok(r) => r,
642        Err(e) => {
643            eprintln!("djogi migrations status: tokio runtime: {e}");
644            return ExitCode::from(1);
645        }
646    };
647
648    let exit = runtime.block_on(async { run_status(&workspace).await });
649    ExitCode::from(exit as u8)
650}
651
652/// Async body of [`status_cmd`]. Returns the desired exit code.
653/// The resolved `workspace` path feeds
654/// [`djogi::config::DjogiConfig::load_from_workspace`] so a
655/// `--workspace /custom/path` actually reads `/<custom>/Djogi.toml`
656/// instead of always picking up the cwd's config. Production callers
657/// running from inside the project root (the typical case) get the
658/// previous behaviour for free — `resolve_workspace(None)` returns
659/// `cwd`.
660async fn run_status(workspace: &Path) -> i32 {
661    use djogi::config::DjogiConfig;
662
663    let config = match DjogiConfig::load_from_workspace(workspace) {
664        Ok(c) => c,
665        Err(e) => {
666            eprintln!("djogi migrations status: config load: {e}");
667            return 1;
668        }
669    };
670
671    let mut ctx = match connect_and_check(&config.database.url).await {
672        ContextOutcome::Ready(ctx) => ctx,
673        ContextOutcome::UnsupportedVersion(e) => {
674            crate::print_support_boundary_error("migrations status", &e);
675            return 2;
676        }
677        ContextOutcome::RuntimeError(msg) => {
678            eprintln!("djogi migrations status: pool: {msg}");
679            return 1;
680        }
681    };
682
683    let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
684        Ok(rows) => rows,
685        Err(e) => {
686            // A missing ledger table is treated as "no migrations
687            // applied" — print the empty state and exit 0.
688            if e.to_string().contains("djogi_schema_migrations") {
689                println!("No migrations recorded.");
690                return 0;
691            }
692            eprintln!("djogi migrations status: ledger read: {e}");
693            return 1;
694        }
695    };
696
697    let registered: Vec<String> = AppRegistry::all()
698        .iter()
699        .map(|d| d.label.to_string())
700        .collect();
701    let report = djogi::migrate::render_status(&rows, &registered);
702    for line in &report.lines {
703        println!("{line}");
704    }
705    report.exit_code
706}
707
708/// Outcome of [`connect_and_check`] — connecting a pool and running the
709/// Postgres-version preflight, with the support-boundary refusal kept
710/// distinct from ordinary runtime failures.
711/// The three arms drive different exit codes at the call site:
712/// - [`ContextOutcome::Ready`] — pool connected and PG ≥ 18; proceed.
713/// - [`ContextOutcome::UnsupportedVersion`] — PG < 18. The caller renders
714///   the support-boundary message via
715///   [`crate::print_support_boundary_error`] and exits `2` (refusal: the
716///   operator must upgrade Postgres; retrying changes nothing).
717/// - [`ContextOutcome::RuntimeError`] — pool connect failed, the preflight
718///   query errored, or any other non-version `DjogiError`. The caller
719///   prints the message and exits `1` (transient: CI may retry).
720// The `Ready` variant holds a `DjogiContext` (large — it wraps a
721// `DjogiPool`), while the other two variants are small (`DjogiError` /
722// `String`). Boxing `Ready` would add a heap allocation on the success
723// path; this value is constructed and immediately matched at each call
724// site (never stored in a collection), so the wider stack value is a
725// transient one-off, not a per-element penalty. Same trade-off and
726// rationale as `ContextInner` in `djogi::context` (see its
727// `large_enum_variant` allow).
728#[allow(clippy::large_enum_variant)]
729enum ContextOutcome {
730    /// Pool connected and the PG-version preflight passed.
731    Ready(djogi::context::DjogiContext),
732    /// The PG-version preflight refused — server is below the minimum
733    /// supported major version.
734    UnsupportedVersion(djogi::error::DjogiError),
735    /// A runtime failure (connect / preflight / other) — already rendered
736    /// to a string so the call site need not re-match.
737    RuntimeError(String),
738}
739
740/// Connect a pool from `url` and run the Postgres-version preflight,
741/// returning a typed [`ContextOutcome`].
742/// Splits the support-boundary refusal (PG < 18, exit `2`) from runtime
743/// failures (connect / query errors, exit `1`) so each call site can map
744/// the outcome onto the documented exit-code matrix. Connects via the
745/// public `DjogiPool::connect` entry point, then hands the pool to the
746/// public `DjogiContext::from_pool` API once the version check passes.
747async fn connect_and_check(url: &str) -> ContextOutcome {
748    let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
749        Ok(p) => p,
750        Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
751    };
752    match djogi::pg::preflight::check_postgres_version(&pool).await {
753        Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
754        // `DjogiError` is `#[non_exhaustive]`, so the `@`-bound
755        // `UnsupportedPostgresVersion` arm needs the trailing `_` catch-all.
756        Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
757            ContextOutcome::UnsupportedVersion(e)
758        }
759        Err(other) => ContextOutcome::RuntimeError(other.to_string()),
760    }
761}
762
763/// Resolve the connection URL for a single migration-bucket database.
764/// Verify routes each bucket to the pool for its `database` component.
765/// The mapping mirrors Djogi's three-database architecture:
766/// - `"main"` ([`djogi::apps::AppDescriptor::GLOBAL_DATABASE`]) always uses
767///   the app URL verbatim. We do NOT derive it by splicing `"main"` into
768///   the path, because the operator's app URL may carry a path component
769///   that is not literally named `main` (e.g. `…/myapp_prod`); deriving
770///   would target a database that does not exist.
771/// - `"crud_log"` / `"event_log"` prefer the explicit
772///   [`djogi::config::DatabaseConfig::crud_log_url`] /
773///   [`event_log_url`](djogi::config::DatabaseConfig::event_log_url) when
774///   set to a non-empty value, matching how the audit / event pools are
775///   resolved elsewhere.
776/// - Any other database name (and the log databases when their explicit
777///   URL is absent) is derived by splicing the name into the app URL's
778///   path component via [`djogi::migrate::derive_per_database_url`].
779///   Returns `None` when derivation fails (the app URL has no recognisable
780///   path component); the caller surfaces that as a runtime error for the
781///   affected bucket.
782fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
783    // "main" always uses the app URL verbatim — do NOT derive, as the app
784    // URL may not have a path component named "main".
785    if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
786        return Some(db_config.url.clone());
787    }
788    if database == "crud_log"
789        && let Some(u) = db_config.crud_log_url.as_deref()
790        && !u.is_empty()
791    {
792        return Some(u.to_string());
793    }
794    if database == "event_log"
795        && let Some(u) = db_config.event_log_url.as_deref()
796        && !u.is_empty()
797    {
798        return Some(u.to_string());
799    }
800    djogi::migrate::derive_per_database_url(&db_config.url, database)
801}
802
803/// `djogi migrations apply` entry point.
804/// Discovers pending JSON files under `target/djogi_pending/`, loads the
805/// committed replay plan for each, and drives [`djogi::migrate::apply_plan`]
806/// through the library runner after CLI-side ledger-state classification.
807/// `Pending` rows require operator resolution. Caller-gated `Failed`/`RolledBack`
808/// rows are reapply-blocking cleanup candidates before runner invocation. Phase
809/// 0 cleanup is identity-free replay-current-only: seed-capable runtime,
810/// seed-DML non-runtime-current, missing, incomplete, generated-stale, or
811/// ambiguous artifacts refuse before delete.
812pub fn apply_cmd(
813    workspace: Option<PathBuf>,
814    fake: bool,
815    reason: Option<String>,
816    node_id: Option<u32>,
817    single_node_dev: bool,
818) -> ExitCode {
819    let workspace = resolve_workspace(workspace);
820
821    // Validate --fake / --reason pairing before doing any expensive work.
822    let mode = if fake {
823        match reason {
824            Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
825            Some(_) => {
826                eprintln!(
827                    "djogi migrations apply --fake: --reason must not be empty; \
828                     supply a non-empty reason why these migrations are being \
829                     faked (e.g. 'schema pre-exists from prior tooling')"
830                );
831                return ExitCode::from(2);
832            }
833            None => {
834                eprintln!(
835                    "djogi migrations apply --fake: --reason is required; \
836                     supply a reason why these migrations are being faked \
837                     (e.g. 'schema pre-exists from prior tooling'). \
838                     This is recorded in the ledger audit trail."
839                );
840                return ExitCode::from(2);
841            }
842        }
843    } else {
844        FakeMode::Real
845    };
846
847    let runtime = match tokio::runtime::Builder::new_current_thread()
848        .enable_all()
849        .build()
850    {
851        Ok(r) => r,
852        Err(e) => {
853            eprintln!("djogi migrations apply: tokio runtime: {e}");
854            return ExitCode::from(1);
855        }
856    };
857
858    let exit =
859        runtime.block_on(async { run_apply(&workspace, &mode, node_id, single_node_dev).await });
860    ExitCode::from(exit as u8)
861}
862
863/// Controls whether `apply_one_pending` executes SQL or records a
864/// fake-apply row in the ledger.
865#[derive(Debug, Clone)]
866enum FakeMode {
867    /// Execute DDL via `apply_plan`. Normal migration apply.
868    Real,
869    /// Skip DDL; record `status = 'faked'` via `fake_apply_plan`.
870    Fake { reason: String },
871}
872
873/// Async body of [`apply_cmd`]. Returns the desired exit code.
874async fn run_apply(
875    workspace: &Path,
876    mode: &FakeMode,
877    node_id: Option<u32>,
878    single_node_dev: bool,
879) -> i32 {
880    use djogi::config::DjogiConfig;
881
882    let action_verb = match mode {
883        FakeMode::Real => "apply",
884        FakeMode::Fake { .. } => "fake-apply",
885    };
886    let progress_verb = match mode {
887        FakeMode::Real => "applying",
888        FakeMode::Fake { .. } => "faking",
889    };
890
891    // 1. Load config.
892    let config = match DjogiConfig::load_from_workspace(workspace) {
893        Ok(c) => c,
894        Err(e) => {
895            eprintln!("djogi migrations {action_verb}: config load: {e}");
896            return 2;
897        }
898    };
899
900    // 2. Discover pending JSONs before resolving identity or connecting to DB.
901    // No-pending apply (zero pending files) is an identity-free inverse —
902    // skip the resolver and pool connection entirely when no pending plans exist.
903    let pending_files = match discover_pending_plans(workspace) {
904        Ok(pending_files) => pending_files,
905        Err(e) => {
906            eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
907            return 2;
908        }
909    };
910    if pending_files.is_empty() {
911        println!("No pending migrations to {action_verb}.");
912        return 0;
913    }
914
915    // 3. Resolve node identity for identity-bearing operations (only when work exists).
916    // Both real apply and fake-apply are identity-bearing (run-id generation + ledger).
917    let runner_identity = match crate::identity::resolve_identity(
918        node_id,
919        single_node_dev,
920        &config.profile,
921        action_verb,
922    ) {
923        Ok(resolved) => Some(resolved.into_runner_identity()),
924        Err(e) => {
925            let _ = crate::identity::print_identity_error(action_verb, &e);
926            return 2;
927        }
928    };
929
930    // 4. Resolve one URL per pending database target, then connect and
931    // preflight a dedicated context for each database before taking the
932    // workspace lock. The runner routes queries through the supplied
933    // context pool, so apply must bind one context per bucket.database.
934    let target_urls = match resolve_apply_target_urls(&pending_files, &config.database) {
935        Ok(urls) => urls,
936        Err(e) => {
937            eprintln!("djogi migrations {action_verb}: target routing: {e}");
938            return 2;
939        }
940    };
941    let mut contexts = std::collections::BTreeMap::<String, djogi::context::DjogiContext>::new();
942    for (database, url) in &target_urls {
943        match connect_and_check(url).await {
944            ContextOutcome::Ready(ctx) => {
945                contexts.insert(database.clone(), ctx);
946            }
947            ContextOutcome::UnsupportedVersion(e) => {
948                crate::print_support_boundary_error("migrations apply", &e);
949                return 2;
950            }
951            ContextOutcome::RuntimeError(msg) => {
952                eprintln!("djogi migrations {action_verb}: pool for '{database}': {msg}");
953                return 1;
954            }
955        }
956    }
957
958    // 5. Acquire workspace lock.
959    let lock_path = workspace.join(LOCK_FILE_NAME);
960    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
961        Ok(g) => g,
962        Err(e) => {
963            eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
964            return 1;
965        }
966    };
967
968    // 6. Reconcile the pending set under the lock before any cleanup/apply work.
969    let pending_files = match reconcile_pending_plans_after_lock(workspace, &pending_files) {
970        Ok(pending_files) => pending_files,
971        Err(e) => {
972            eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
973            return 2;
974        }
975    };
976
977    // 7. Build audit pool (optional — silently skipped if unavailable).
978    let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
979        Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
980        Err(_) => None,
981    };
982
983    // 8. Apply each pending migration through the context for its
984    // bucket database. The pending discovery sweep already deduped and
985    // preflighted the target database set above.
986    for pending_file in &pending_files {
987        let bucket_database = &pending_file.bucket.database;
988        let app_label = &pending_file.bucket.app;
989        let Some(ctx) = contexts.get_mut(bucket_database) else {
990            eprintln!(
991                "djogi migrations {action_verb}: internal error: missing context for database '{bucket_database}'"
992            );
993            return 1;
994        };
995        println!("  {progress_verb} {bucket_database}/{app_label}...");
996        let result = apply_one_pending(
997            ctx,
998            workspace,
999            pending_file,
1000            &config,
1001            &guard,
1002            audit_pool.as_ref(),
1003            mode,
1004            runner_identity,
1005        )
1006        .await;
1007
1008        match result {
1009            ApplyResult::Ok => match mode {
1010                FakeMode::Real => {
1011                    println!("Applied: {bucket_database}/{app_label}");
1012                }
1013                FakeMode::Fake { .. } => {
1014                    println!(
1015                        "  faked {bucket_database}/{app_label}: \
1016                             recorded in ledger with status = 'faked' (no SQL executed)"
1017                    );
1018                }
1019            },
1020            ApplyResult::Skipped(reason) => {
1021                println!("Skipped {bucket_database}/{app_label}: {reason}");
1022            }
1023            ApplyResult::Refused(reason) => {
1024                eprintln!(
1025                    "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
1026                );
1027                return 2;
1028            }
1029            ApplyResult::RunnerError(e) => {
1030                eprintln!(
1031                    "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
1032                );
1033                return runner_error_exit_code(&e);
1034            }
1035        }
1036    }
1037
1038    let summary_verb = match mode {
1039        FakeMode::Real => "applied",
1040        FakeMode::Fake { .. } => "faked",
1041    };
1042    println!("{summary_verb} {} migration(s).", pending_files.len());
1043    0
1044}
1045
1046/// Outcome of applying a single pending migration.
1047#[derive(Debug)]
1048enum ApplyResult {
1049    /// Migration applied successfully.
1050    Ok,
1051    /// Migration skipped (already applied or no-op).
1052    Skipped(String),
1053    /// User-facing refusal — exit code 2.
1054    Refused(String),
1055    /// Runner error — exit code 1.
1056    RunnerError(RunnerError),
1057}
1058
1059#[derive(Debug, Clone, PartialEq, Eq)]
1060struct DiscoveredPendingPlan {
1061    path: PathBuf,
1062    bucket: BucketKey,
1063    plan: PendingPlan,
1064    is_phase_zero: bool,
1065}
1066
1067fn is_acceptable_pending_path_component(bytes: &[u8]) -> bool {
1068    if bytes.is_empty() || bytes.len() > 63 {
1069        return false;
1070    }
1071    if bytes[0] == b'.' {
1072        return false;
1073    }
1074    let first = bytes[0];
1075    if first != b'_' && !first.is_ascii_alphabetic() {
1076        return false;
1077    }
1078    for &b in &bytes[1..] {
1079        if b != b'_' && !b.is_ascii_alphanumeric() {
1080            return false;
1081        }
1082    }
1083    true
1084}
1085
1086fn canonical_pending_filename(app_label: &str) -> String {
1087    format!("{}.json", djogi::migrate::app_dirname(app_label))
1088}
1089
1090fn validate_hidden_phase_zero_pending(
1091    path: PathBuf,
1092    database: &str,
1093) -> Result<DiscoveredPendingPlan, String> {
1094    let filename = path
1095        .file_name()
1096        .and_then(|f| f.to_str())
1097        .ok_or_else(|| format!("non-utf8 Phase 0 pending path {}", path.display()))?;
1098    let expected_filename = format!("{}.json", djogi::migrate::PHASE_ZERO_VERSION);
1099    if filename != expected_filename {
1100        return Err(format!(
1101            "hidden Phase 0 pending path {} must use canonical filename {}",
1102            path.display(),
1103            expected_filename
1104        ));
1105    }
1106    let plan = djogi::migrate::load_pending(&path)
1107        .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1108    if plan.bucket_database != database {
1109        return Err(format!(
1110            "pending JSON {} has bucket database {}, expected {} from path",
1111            path.display(),
1112            plan.bucket_database,
1113            database
1114        ));
1115    }
1116    if !plan.bucket_app.is_empty() {
1117        return Err(format!(
1118            "pending JSON {} must target the global bucket in hidden Phase 0 namespace",
1119            path.display()
1120        ));
1121    }
1122    if plan.version != djogi::migrate::PHASE_ZERO_VERSION {
1123        return Err(format!(
1124            "pending JSON {} must use Phase 0 version {}, found {}",
1125            path.display(),
1126            djogi::migrate::PHASE_ZERO_VERSION,
1127            plan.version
1128        ));
1129    }
1130    Ok(DiscoveredPendingPlan {
1131        path,
1132        bucket: BucketKey {
1133            database: database.to_string(),
1134            app: String::new(),
1135        },
1136        plan,
1137        is_phase_zero: true,
1138    })
1139}
1140
1141fn validate_normal_pending(
1142    path: PathBuf,
1143    database: &str,
1144    filename: &str,
1145) -> Result<DiscoveredPendingPlan, String> {
1146    let Some(stem) = filename.strip_suffix(".json") else {
1147        return Err(format!(
1148            "pending path {} must end with .json",
1149            path.display()
1150        ));
1151    };
1152    let app = if stem == "_global_" {
1153        String::new()
1154    } else {
1155        if !is_acceptable_pending_path_component(stem.as_bytes()) {
1156            return Err(format!(
1157                "pending path {} uses non-canonical app filename {}",
1158                path.display(),
1159                filename
1160            ));
1161        }
1162        stem.to_string()
1163    };
1164    let expected_filename = canonical_pending_filename(&app);
1165    if filename != expected_filename {
1166        return Err(format!(
1167            "pending path {} must use canonical filename {}",
1168            path.display(),
1169            expected_filename
1170        ));
1171    }
1172    let plan = djogi::migrate::load_pending(&path)
1173        .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1174    if plan.bucket_database != database {
1175        return Err(format!(
1176            "pending JSON {} has bucket database {}, expected {} from path",
1177            path.display(),
1178            plan.bucket_database,
1179            database
1180        ));
1181    }
1182    if plan.bucket_app != app {
1183        let expected_app = if app.is_empty() {
1184            "_global_"
1185        } else {
1186            app.as_str()
1187        };
1188        let found_app = if plan.bucket_app.is_empty() {
1189            "_global_"
1190        } else {
1191            plan.bucket_app.as_str()
1192        };
1193        return Err(format!(
1194            "pending JSON {} has bucket app {}, expected {} from path",
1195            path.display(),
1196            found_app,
1197            expected_app
1198        ));
1199    }
1200    if plan.version == djogi::migrate::PHASE_ZERO_VERSION {
1201        return Err(format!(
1202            "pending JSON {} must use the hidden .phase_zero namespace for Phase 0",
1203            path.display()
1204        ));
1205    }
1206    Ok(DiscoveredPendingPlan {
1207        path,
1208        bucket: BucketKey {
1209            database: database.to_string(),
1210            app,
1211        },
1212        is_phase_zero: false,
1213        plan,
1214    })
1215}
1216
1217/// Scan `target/djogi_pending/` for pending JSON files.
1218/// Returns parsed pending plans sorted by version so Phase 0 runs
1219/// before later normal-global work. Malformed or duplicate pending
1220/// identities refuse rather than being guessed from filenames.
1221fn discover_pending_plans(workspace: &Path) -> Result<Vec<DiscoveredPendingPlan>, String> {
1222    let pending_root = djogi::migrate::pending_root(workspace);
1223    let mut out = Vec::new();
1224    let mut seen_identities = std::collections::BTreeSet::new();
1225
1226    let Ok(db_entries) = std::fs::read_dir(&pending_root) else {
1227        return Ok(out);
1228    };
1229
1230    for db_entry in db_entries.flatten() {
1231        let db_name = match db_entry.file_name().to_str().map(str::to_string) {
1232            Some(n) => n,
1233            None => continue,
1234        };
1235        if !is_acceptable_pending_path_component(db_name.as_bytes()) {
1236            continue;
1237        }
1238
1239        let db_dir = db_entry.path();
1240        if !db_dir.is_dir() {
1241            continue;
1242        }
1243
1244        let Ok(app_entries) = std::fs::read_dir(&db_dir) else {
1245            continue;
1246        };
1247
1248        for app_entry in app_entries.flatten() {
1249            let path = app_entry.path();
1250            let file_type = match app_entry.file_type() {
1251                Ok(file_type) => file_type,
1252                Err(_) => continue,
1253            };
1254            if file_type.is_dir() {
1255                if app_entry.file_name().to_str() == Some(".phase_zero") {
1256                    let Ok(phase_zero_entries) = std::fs::read_dir(&path) else {
1257                        continue;
1258                    };
1259                    for phase_zero_entry in phase_zero_entries.flatten() {
1260                        let phase_zero_path = phase_zero_entry.path();
1261                        if !phase_zero_path.is_file() {
1262                            continue;
1263                        }
1264                        let discovered =
1265                            validate_hidden_phase_zero_pending(phase_zero_path, &db_name)?;
1266                        let identity = (
1267                            discovered.bucket.database.clone(),
1268                            discovered.bucket.app.clone(),
1269                            discovered.plan.version.clone(),
1270                        );
1271                        if !seen_identities.insert(identity.clone()) {
1272                            return Err(format!(
1273                                "duplicate pending identity discovered for {}/{}/{}",
1274                                identity.0,
1275                                if identity.1.is_empty() {
1276                                    "_global_"
1277                                } else {
1278                                    identity.1.as_str()
1279                                },
1280                                identity.2
1281                            ));
1282                        }
1283                        out.push(discovered);
1284                    }
1285                }
1286                continue;
1287            }
1288            if !file_type.is_file() {
1289                continue;
1290            }
1291            let filename = match path.file_name().and_then(|f| f.to_str()) {
1292                Some(f) => f.to_string(),
1293                None => continue,
1294            };
1295            if !filename.ends_with(".json") {
1296                continue;
1297            }
1298            let discovered = validate_normal_pending(path, &db_name, &filename)?;
1299            let identity = (
1300                discovered.bucket.database.clone(),
1301                discovered.bucket.app.clone(),
1302                discovered.plan.version.clone(),
1303            );
1304            if !seen_identities.insert(identity.clone()) {
1305                return Err(format!(
1306                    "duplicate pending identity discovered for {}/{}/{}",
1307                    identity.0,
1308                    if identity.1.is_empty() {
1309                        "_global_"
1310                    } else {
1311                        identity.1.as_str()
1312                    },
1313                    identity.2
1314                ));
1315            }
1316            out.push(discovered);
1317        }
1318    }
1319
1320    // Stage 1 — global stable order: version, then phase-zero precedence,
1321    // then path (also the within-group tiebreak seed).
1322    out.sort_by(|a, b| {
1323        a.plan
1324            .version
1325            .cmp(&b.plan.version)
1326            .then_with(|| b.is_phase_zero.cmp(&a.is_phase_zero))
1327            .then_with(|| a.path.cmp(&b.path))
1328    });
1329
1330    // Stage 2: within each (database, version, is_phase_zero) group,
1331    // reorder by the recorded depends_on (Kahn; stage-1 alphabetical order
1332    // is the deterministic tiebreak). Dependencies naming buckets outside
1333    // the group are ignored — their migrations applied in an earlier run.
1334    // A cycle is a compose bug or a hand-edited pending file; refuse loudly.
1335    let out = order_pending_groups_by_dependencies(out)?;
1336
1337    Ok(out)
1338}
1339
1340/// Within each same-(database, version, is_phase_zero) group, reorder by
1341/// the recorded depends_on list using Kahn's algorithm. The stage-1 sort
1342/// provides a deterministic alphabetical tiebreak for nodes with equal
1343/// in-degree. Dependencies on buckets not present in the current group
1344/// are ignored (their migrations already applied). Returns an error on
1345/// cycle — the compose side should have caught this, but apply guards
1346/// against hand-edited or corrupted pending files.
1347///
1348/// Algorithmic twin of `order_buckets` in compose.rs; kept local because
1349/// the CLI cannot call private compose helpers across crates.
1350fn order_pending_groups_by_dependencies(
1351    out: Vec<DiscoveredPendingPlan>,
1352) -> Result<Vec<DiscoveredPendingPlan>, String> {
1353    // Group by (database, version, is_phase_zero). Since stage 1 already
1354    // sorted by these keys, consecutive entries share the same group.
1355    let mut result = Vec::with_capacity(out.len());
1356    let mut i = 0;
1357    while i < out.len() {
1358        let mut j = i + 1;
1359        while j < out.len()
1360            && out[j].bucket.database == out[i].bucket.database
1361            && out[j].plan.version == out[i].plan.version
1362            && out[j].is_phase_zero == out[i].is_phase_zero
1363        {
1364            j += 1;
1365        }
1366
1367        // Validate depends_on labels for all entries in this group before
1368        // any topo-sort (including the singleton fast-path that bypasses it).
1369        // Discovery validates pending *filenames*, but depends_on labels live
1370        // inside the pending JSON and are otherwise unchecked — a hand-edited
1371        // or corrupted label (path traversal, whitespace) would slip through
1372        // the singleton fast-path silently.
1373        for entry in &out[i..j] {
1374            for dep_app in &entry.plan.depends_on {
1375                if !is_acceptable_pending_path_component(dep_app.as_bytes()) {
1376                    return Err(format!(
1377                        "pending plan for {}/{} has invalid depends_on label {:?}",
1378                        entry.bucket.database, entry.bucket.app, dep_app,
1379                    ));
1380                }
1381            }
1382        }
1383
1384        // Process the group [i..j)
1385        if j - i <= 1 {
1386            // Single-element or empty group: no reordering needed.
1387            result.append(&mut out[i..j].to_vec());
1388            i = j;
1389            continue;
1390        }
1391
1392        let database = &out[i].bucket.database;
1393        let version = &out[i].plan.version;
1394
1395        // Build the dependency graph within this group.
1396        let group_len = j - i;
1397        let mut in_degree = vec![0usize; group_len];
1398        let mut reverse: Vec<Vec<usize>> = vec![Vec::new(); group_len];
1399
1400        // Build app→index lookup for this group (O(n)).
1401        let app_to_idx: std::collections::HashMap<&str, usize> = out[i..j]
1402            .iter()
1403            .enumerate()
1404            .map(|(idx, entry)| (entry.bucket.app.as_str(), idx))
1405            .collect();
1406
1407        for (k_idx, entry) in out[i..j].iter().enumerate() {
1408            for dep_app in &entry.plan.depends_on {
1409                let Some(&dep_idx) = app_to_idx.get(dep_app.as_str()) else {
1410                    continue; // outside group — ignore (REQ-398-6)
1411                };
1412                if dep_idx != k_idx {
1413                    in_degree[k_idx] += 1;
1414                    reverse[dep_idx].push(k_idx);
1415                }
1416            }
1417        }
1418
1419        // Kahn's algorithm with BTreeSet for deterministic (min-first) tiebreak.
1420        let mut ready: std::collections::BTreeSet<usize> =
1421            (0..group_len).filter(|&idx| in_degree[idx] == 0).collect();
1422
1423        let mut ordered = Vec::with_capacity(group_len);
1424        while let Some(idx) = ready.iter().next().cloned() {
1425            ready.remove(&idx);
1426            ordered.push(idx);
1427            for &dependent in &reverse[idx] {
1428                in_degree[dependent] -= 1;
1429                if in_degree[dependent] == 0 {
1430                    ready.insert(dependent);
1431                }
1432            }
1433        }
1434
1435        if ordered.len() != group_len {
1436            let mut chain: Vec<String> = (0..group_len)
1437                .filter(|&idx| in_degree[idx] > 0)
1438                .map(|idx| out[i + idx].bucket.app.clone())
1439                .collect();
1440            chain.sort();
1441            return Err(format!(
1442                "pending migrations for database `{database}` version `{version}` \
1443                 declare a dependency cycle between apps: {chain:?}; \
1444                 recompose or inspect hand-edited pending files"
1445            ));
1446        }
1447
1448        for idx in ordered {
1449            result.push(out[i + idx].clone());
1450        }
1451        i = j;
1452    }
1453
1454    Ok(result)
1455}
1456
1457fn load_verified_pending_for_apply(
1458    pending_file: &DiscoveredPendingPlan,
1459) -> Result<PendingPlan, String> {
1460    let pending_bytes =
1461        std::fs::read(&pending_file.path).map_err(|e| format!("read pending JSON: {e}"))?;
1462    let pending: PendingPlan =
1463        serde_json::from_slice(&pending_bytes).map_err(|e| format!("parse pending JSON: {e}"))?;
1464    if pending != pending_file.plan {
1465        return Err(format!(
1466            "pending JSON changed after discovery at {}; rerun the command",
1467            pending_file.path.display()
1468        ));
1469    }
1470    Ok(pending)
1471}
1472
1473fn resolve_apply_target_urls(
1474    pending_files: &[DiscoveredPendingPlan],
1475    db_config: &djogi::config::DatabaseConfig,
1476) -> Result<std::collections::BTreeMap<String, String>, String> {
1477    let mut urls = std::collections::BTreeMap::new();
1478    for pending_file in pending_files {
1479        let database = &pending_file.bucket.database;
1480        if urls.contains_key(database) {
1481            continue;
1482        }
1483        let Some(url) = resolve_bucket_url(db_config, database) else {
1484            return Err(format!("cannot derive a database URL for `{database}`"));
1485        };
1486        urls.insert(database.clone(), url);
1487    }
1488    Ok(urls)
1489}
1490
1491fn reconcile_pending_plans_after_lock(
1492    workspace: &Path,
1493    pre_lock_pending_files: &[DiscoveredPendingPlan],
1494) -> Result<Vec<DiscoveredPendingPlan>, String> {
1495    let locked_pending_files = discover_pending_plans(workspace)?;
1496    if locked_pending_files != pre_lock_pending_files {
1497        return Err(
1498            "pending migration set changed while waiting for the workspace lock; rerun the command"
1499                .to_string(),
1500        );
1501    }
1502    Ok(locked_pending_files)
1503}
1504
1505/// Apply a single pending migration.
1506/// Re-loads the pending JSON after discovery and refuses if the bytes no
1507/// longer match the path-verified artifact, then checks the ledger-state
1508/// classification, loads the committed replay plan (or falls back to a
1509/// single-segment plan from the SQL file), and drives
1510/// [`djogi::migrate::apply_plan`]. `Pending` rows require operator resolution;
1511/// caller-gated `Failed`/`RolledBack` rows are reapply-blocking cleanup
1512/// candidates before runner invocation. Phase 0 cleanup refuses anything other
1513/// than identity-free replay-current before delete.
1514/// Uses the bypass attribute because deleting reapply-blocking
1515/// Failed/RolledBack ledger rows requires raw SQL that is not exposed through
1516/// the public typed API.
1517// apply_one_pending carries 9 arguments because it sits at the bridge
1518// between the CLI dispatch (workspace, path, bucket info) and the
1519// library runner (config, guard, audit pool, mode). Folding these into a
1520// struct would push the same fields onto the caller and add churn for
1521// no clarity gain — the pattern matches compose_with_inputs and attune.
1522#[allow(clippy::too_many_arguments)]
1523#[djogi::deliberately_bypass_convention_with_raw_sql]
1524// JUSTIFICATION (PIN): apply_one_pending owns the shared cleanup path for
1525// caller-gated Failed/RolledBack rows via
1526// `DELETE FROM djogi_schema_migrations WHERE version = $1 AND app_label = $2`.
1527// The public API has no delete operation — `select_all_ledger_rows` is read-only
1528// and `insert_pending` is write-only. This is the minimal raw SQL surface for
1529// reapply-blocking ledger-row cleanup.
1530async fn apply_one_pending(
1531    ctx: &mut djogi::context::DjogiContext,
1532    workspace: &Path,
1533    pending_file: &DiscoveredPendingPlan,
1534    config: &djogi::config::DjogiConfig,
1535    guard: &djogi::migrate::WorkspaceGuard,
1536    audit_pool: Option<&deadpool_postgres::Pool>,
1537    mode: &FakeMode,
1538    runner_identity: Option<djogi::migrate::RunnerIdentity>,
1539) -> ApplyResult {
1540    // 1. Parse pending JSON to get bucket + version + checksums.
1541    let pending = match load_verified_pending_for_apply(pending_file) {
1542        Ok(pending) => pending,
1543        Err(e) => return ApplyResult::Refused(e),
1544    };
1545
1546    let bucket = pending_file.bucket.clone();
1547
1548    // 2. Check ledger state machine for this (version, app_label) stream.
1549    match check_ledger_state(ctx, &pending.version, &bucket.app).await {
1550        LedgerState::NotPresent => {} /* normal path */
1551        LedgerState::AlreadyApplied => {
1552            return ApplyResult::Skipped("already applied".to_string());
1553        }
1554        LedgerState::PendingOrPartial(existing_status) => {
1555            // Pending rows require explicit operator resolution.
1556            // Caller-gated Failed and RolledBack rows are reapply-blocking
1557            // cleanup candidates before runner invocation.
1558            if existing_status == LedgerStatus::Failed
1559                || existing_status == LedgerStatus::RolledBack
1560            {
1561                // #386: Phase 0 cleanup must classify before deleting.
1562                // Load the committed replay plan or fallback SQL first,
1563                // and refuse any non-identity-free Phase 0 artifact before
1564                // removing the failed/rolled_back row. This applies to both
1565                // real apply and fake apply paths.
1566                if pending.version == djogi::migrate::PHASE_ZERO_VERSION {
1567                    let cleanup_refusal = classify_phase_zero_for_cleanup(
1568                        workspace,
1569                        &bucket,
1570                        &pending.version,
1571                        &pending.checksum_up,
1572                        pending.checksum_down.as_deref(),
1573                    );
1574                    if let Some(reason) = cleanup_refusal {
1575                        return ApplyResult::Refused(format!(
1576                            "Phase 0 cleanup refused: {reason}; \
1577                             refusing before deleting {} row to prevent stale replay",
1578                            existing_status.as_db_str()
1579                        ));
1580                    }
1581                }
1582
1583                // Failed and RolledBack rows both block re-apply, but callers
1584                // gate which statuses may be cleaned before reaching this
1585                // status-agnostic DELETE helper.
1586                if let Err(e) =
1587                    delete_reapply_blocking_ledger_row(ctx, &pending.version, &bucket.app).await
1588                {
1589                    return ApplyResult::Refused(format!(
1590                        "clean {} ledger row: {e}",
1591                        existing_status.as_db_str()
1592                    ));
1593                }
1594            } else {
1595                return ApplyResult::Refused(format!(
1596                    "version already in {} state — resolve before re-applying",
1597                    existing_status.as_db_str()
1598                ));
1599            }
1600        }
1601    }
1602
1603    // 3. Load committed replay plan (or fall back to single-segment).
1604    let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
1605        workspace,
1606        &bucket,
1607        &pending.version,
1608        &pending.checksum_up,
1609        pending.checksum_down.as_deref(),
1610    ) {
1611        Ok(result) => result,
1612        Err(e) => {
1613            return ApplyResult::Refused(format!("load replay plan: {e}"));
1614        }
1615    };
1616
1617    // 4. Construct RunnerCtx.
1618    let runner_ctx = RunnerCtx {
1619        bucket: bucket.clone(),
1620        version: pending.version.clone(),
1621        description: pending.slug.clone(),
1622        checksum_up,
1623        checksum_down,
1624        snapshot: Some(pending.model_snapshot.clone()),
1625        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
1626        // MigrateConfig does not derive Clone; construct from fields.
1627        config: djogi::config::MigrateConfig {
1628            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
1629            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
1630            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
1631            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
1632        },
1633        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
1634        audit_pool: audit_pool.cloned(),
1635        runner_identity,
1636    };
1637
1638    // 5. Apply (or fake-apply) the plan through the library runner.
1639    let runner_result = match mode {
1640        FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
1641        FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
1642    };
1643    match runner_result {
1644        Ok(_) => ApplyResult::Ok,
1645        Err(e) => ApplyResult::RunnerError(e),
1646    }
1647}
1648
1649/// Ledger state for a given migration version.
1650#[derive(Debug)]
1651enum LedgerState {
1652    /// No row exists — first apply.
1653    NotPresent,
1654    /// Row exists and is in terminal applied state.
1655    AlreadyApplied,
1656    /// Row exists in a non-terminal state with the specific status.
1657    PendingOrPartial(LedgerStatus),
1658}
1659
1660/// Check the ledger for an existing row matching `(version, app_label)`.
1661async fn check_ledger_state(
1662    ctx: &mut djogi::context::DjogiContext,
1663    version: &str,
1664    app_label: &str,
1665) -> LedgerState {
1666    let Ok(rows) = djogi::migrate::select_all_ledger_rows(ctx).await else {
1667        // Ledger table might not exist yet — treat as NotPresent so
1668        // the runner can bootstrap it.
1669        return LedgerState::NotPresent;
1670    };
1671
1672    let existing = rows
1673        .iter()
1674        .find(|r| r.version == version && r.app_label == app_label);
1675    match existing {
1676        None => LedgerState::NotPresent,
1677        Some(row) => match row.status {
1678            LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
1679                LedgerState::AlreadyApplied
1680            }
1681            LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
1682                LedgerState::PendingOrPartial(row.status)
1683            }
1684        },
1685    }
1686}
1687
1688/// Map a [`RunnerError`] to an exit code.
1689/// All runner errors map to exit code 1 (apply failure). Exit code 2
1690/// is reserved for user-facing refusals that happen before the runner
1691/// is invoked.
1692fn runner_error_exit_code(_error: &RunnerError) -> i32 {
1693    1
1694}
1695
1696#[djogi::deliberately_bypass_convention_with_raw_sql]
1697// JUSTIFICATION (PIN): delete_reapply_blocking_ledger_row removes a caller-
1698// gated Failed or RolledBack row so the migration can be retried. The public
1699// API has no delete operation for ledger rows — only select_all_ledger_rows
1700// and insert_pending are exposed. This DELETE is the minimal raw SQL for
1701// reapply-blocking ledger-row cleanup.
1702async fn delete_reapply_blocking_ledger_row(
1703    ctx: &mut djogi::context::DjogiContext,
1704    version: &str,
1705    app_label: &str,
1706) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1707    ctx.raw_execute(
1708        "DELETE FROM djogi_schema_migrations \
1709         WHERE version = $1 AND app_label = $2",
1710        &[&version, &app_label],
1711    )
1712    .await?;
1713    Ok(())
1714}
1715
1716/// Reconstruct the snapshot path for a bucket: `migrations/<database>/<app>/schema_snapshot.json`.
1717fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
1718    let migrations_root = djogi::migrate::migrations_root(workspace);
1719    migrations_root
1720        .join(&bucket.database)
1721        .join(djogi::migrate::app_dirname(&bucket.app))
1722        .join("schema_snapshot.json")
1723}
1724
1725/// `djogi migrations attune` entry point.
1726/// Mode selection (per CLI flags):
1727/// | `--record-ledger` | `--squash` | resolved mode |
1728/// |-----------|-----------|---------------|
1729/// | false | false | [`AttuneMode::DiffOnly`] (read-only diff) |
1730/// | true | false | [`AttuneMode::Record`] |
1731/// | false | true | [`AttuneMode::Squash { from, publish, app }`] |
1732/// | true | true | rejected by clap (`conflicts_with`) |
1733/// Argument semantics:
1734/// - `target` is an optional positional Git target (commit / tag /
1735///   branch). When supplied, attune resolves it (local first, fetch
1736///   on miss) before any DB / disk mutation.
1737/// - `apply` gates DB / disk mutation. Without it, every mode is a
1738///   dry-run.
1739/// - `record` controls the parent repo's recorded submodule pointer
1740///   (separate from `record_ledger`, which controls the
1741///   `djogi_schema_migrations` ledger inserts).
1742///   `--squash` requires `--from <ver>`; an absent `from` while
1743///   `--squash` is set surfaces as a CLI error before any work happens.
1744// The CLI dispatch carries 11 inputs because the attune surface is
1745// the broadest in the migrations CLI — target
1746// resolution + dry-run + record-ledger + record-pointer + squash +
1747// publish all live on the same command. Folding them into a struct
1748// would push the same fields onto the caller; the dispatch above
1749// already passes them positionally and a struct refactor would be
1750// churn for no clarity gain.
1751#[allow(clippy::too_many_arguments)]
1752pub fn attune_cmd(
1753    target: Option<&str>,
1754    apply: bool,
1755    record: bool,
1756    record_ledger: bool,
1757    record_reason: &str,
1758    squash: bool,
1759    from: Option<&str>,
1760    publish: bool,
1761    app: Option<&str>,
1762    workspace: Option<PathBuf>,
1763) -> ExitCode {
1764    let workspace = resolve_workspace(workspace);
1765    let mode = match (record_ledger, squash) {
1766        (false, false) => AttuneMode::DiffOnly,
1767        (true, false) => AttuneMode::Record {
1768            reason: record_reason.to_string(),
1769        },
1770        (false, true) => match from {
1771            Some(v) if !v.is_empty() => AttuneMode::Squash {
1772                from: v.to_string(),
1773                publish,
1774                app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
1775            },
1776            _ => {
1777                eprintln!(
1778                    "djogi migrations attune --squash requires --from <version> (e.g. \
1779                     `--from V20260101000000__init`)"
1780                );
1781                return ExitCode::from(2);
1782            }
1783        },
1784        (true, true) => {
1785            // Already rejected by clap's `conflicts_with`; this branch
1786            // is defensive in case the flag is added programmatically.
1787            eprintln!(
1788                "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
1789            );
1790            return ExitCode::from(2);
1791        }
1792    };
1793
1794    let runtime = match tokio::runtime::Builder::new_current_thread()
1795        .enable_all()
1796        .build()
1797    {
1798        Ok(r) => r,
1799        Err(e) => {
1800            eprintln!("djogi migrations attune: tokio runtime: {e}");
1801            return ExitCode::from(1);
1802        }
1803    };
1804
1805    let target_owned = target.map(str::to_string);
1806    let exit =
1807        runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
1808    ExitCode::from(exit as u8)
1809}
1810
1811/// Async body of [`attune_cmd`]. Loads config, builds the context,
1812/// acquires the workspace lock, invokes the library entry point.
1813async fn run_attune(
1814    workspace: &Path,
1815    mode: AttuneMode,
1816    target: Option<String>,
1817    apply: bool,
1818    record: bool,
1819) -> i32 {
1820    use djogi::config::DjogiConfig;
1821
1822    let config = match DjogiConfig::load_from_workspace(workspace) {
1823        Ok(c) => c,
1824        Err(e) => {
1825            eprintln!("djogi migrations attune: config load: {e}");
1826            return 1;
1827        }
1828    };
1829
1830    let mut ctx = match connect_and_check(&config.database.url).await {
1831        ContextOutcome::Ready(ctx) => ctx,
1832        ContextOutcome::UnsupportedVersion(e) => {
1833            crate::print_support_boundary_error("migrations attune", &e);
1834            return 2;
1835        }
1836        ContextOutcome::RuntimeError(msg) => {
1837            eprintln!("djogi migrations attune: pool: {msg}");
1838            return 1;
1839        }
1840    };
1841
1842    // All three modes acquire the workspace lock per the v3 file-lock
1843    // contract — even DiffOnly takes the lock so a concurrent compose
1844    // / apply cannot mutate the tree mid-scan.
1845    let lock_path = workspace.join(LOCK_FILE_NAME);
1846    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
1847        Ok(g) => g,
1848        Err(e) => {
1849            eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
1850            return 1;
1851        }
1852    };
1853
1854    let req = AttuneRequest {
1855        workspace_root: workspace,
1856        database_url: &config.database.url,
1857        profile: &config.profile,
1858        // Thread `[database].dev_mode` to the squash gate. Read-only modes
1859        // (`DiffOnly`, `Record`) ignore it; `Squash` mode refuses unless
1860        // this is `true`.
1861        dev_mode: config.database.dev_mode,
1862        // The operator-supplied target + the `--apply` / `--record` gates
1863        // flow through to the library
1864        // entry point. The library owns the resolution + parent-pointer
1865        // update; the CLI is just plumbing.
1866        target: target.as_deref(),
1867        apply,
1868        record,
1869        mode,
1870        _guard: &guard,
1871    };
1872    match attune(&mut ctx, req).await {
1873        Ok(report) => {
1874            if report.entries.is_empty() {
1875                println!("attune: no drift");
1876            } else {
1877                for entry in &report.entries {
1878                    let app_display = if entry.bucket.app.is_empty() {
1879                        "_global_"
1880                    } else {
1881                        entry.bucket.app.as_str()
1882                    };
1883                    println!(
1884                        "  {kind:<10}  {database}/{app}  {version}",
1885                        kind = entry.kind.as_str(),
1886                        database = entry.bucket.database,
1887                        app = app_display,
1888                        version = entry.version,
1889                    );
1890                }
1891            }
1892            // Surface structured diagnostics — today this carries the
1893            // LedgerTableMissing notice when DiffOnly runs on a
1894            // fresh database.
1895            for diag in &report.diagnostics {
1896                println!("  diagnostic: {diag}");
1897            }
1898            if let Some(sha) = &report.resolved_target {
1899                println!("resolved target: {sha}");
1900            }
1901            if let Some(squashed) = &report.squashed_to {
1902                println!("squashed to: {squashed}");
1903            }
1904            if report.published {
1905                println!("published to remote");
1906            }
1907            if report.parent_pointer_updated {
1908                println!("parent submodule pointer updated");
1909            }
1910            0
1911        }
1912        Err(e) => {
1913            eprintln!("djogi migrations attune: {e}");
1914            attune_error_exit_code(&e)
1915        }
1916    }
1917}
1918
1919/// Map an [`AttuneError`] variant onto the documented exit-code
1920/// matrix (`docs/spec/configuration.md` §14):
1921/// - Refusal variants → exit code `2` ("operator must intervene;
1922///   nothing happened"). Today every refusal flows through
1923///   [`AttuneError::Refused`]; the localhost gate, the dev-profile
1924///   gate, the missing-version refusal, and the ambiguous-version
1925///   refusal are all reachable through that variant.
1926/// - Runtime variants → exit code `1` ("we tried; something broke"
1927///   filesystem scan, ledger query, SQL read/write/delete, git
1928///   publish). CI may safely retry these.
1929///   Pulled out as a free function so unit tests can pin every variant
1930///   without spinning a Tokio runtime. Operators rely on the 1-vs-2
1931///   distinction to tell "refused before any side effect" from "ran and
1932///   failed mid-flight".
1933fn attune_error_exit_code(err: &AttuneError) -> i32 {
1934    match err {
1935        AttuneError::Refused(_) => 2,
1936        AttuneError::FilesystemScanFailed { .. }
1937        | AttuneError::LedgerQueryFailed { .. }
1938        | AttuneError::SqlReadFailed { .. }
1939        | AttuneError::SqlWriteFailed { .. }
1940        | AttuneError::SqlDeleteFailed { .. }
1941        | AttuneError::GitPublishFailed { .. }
1942        | AttuneError::GitTargetResolveFailed { .. }
1943        | AttuneError::GitFetchFailed { .. }
1944        | AttuneError::GitUpdateSubmodulePointerFailed { .. } => 1,
1945    }
1946}
1947
1948/// `djogi migrations verify` entry point.
1949/// Read-only — does not acquire the workspace lock. Reads the live
1950/// Postgres catalog via [`djogi::context::DjogiContext`] and compares
1951/// against the projected schema from the descriptor inventory.
1952/// Exit codes: 0 on success (no error-level diagnostics), 1 on runtime
1953/// error (config / network / SQL / projection), 2 on refusal
1954/// (below PG 18).
1955pub fn verify_cmd(
1956    provider: &dyn DescriptorProvider,
1957    workspace: Option<PathBuf>,
1958    strict: bool,
1959) -> ExitCode {
1960    let workspace = resolve_workspace(workspace);
1961
1962    let runtime = match tokio::runtime::Builder::new_current_thread()
1963        .enable_all()
1964        .build()
1965    {
1966        Ok(r) => r,
1967        Err(e) => {
1968            eprintln!("djogi migrations verify: tokio runtime: {e}");
1969            return ExitCode::from(1);
1970        }
1971    };
1972
1973    let exit = runtime.block_on(async { run_verify(provider, &workspace, strict).await });
1974    ExitCode::from(exit as u8)
1975}
1976
1977/// Async body of [`verify_cmd`]. Returns the desired exit code.
1978/// Verify is multi-database aware: each `(database, app)` bucket is routed
1979/// to the pool for its `database` component via [`resolve_bucket_url`], and
1980/// the per-database context is connected lazily and cached so a database
1981/// with several app buckets connects once. The bucket set is the UNION of
1982/// the inventory projection and the on-disk snapshot tree, so an orphaned
1983/// snapshot (a removed app's snapshot still on disk) is verified and
1984/// surfaces drift rather than being silently skipped .
1985/// Exit codes:
1986/// - `0` — every bucket verified with no error-severity diagnostic.
1987/// - `1` — at least one runtime failure (pool / snapshot / verify error)
1988///   or at least one bucket reported an error-severity diagnostic.
1989/// - `2` — the server is below the minimum supported Postgres version
1990///   (a server-global refusal: verify returns immediately).
1991async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
1992    use djogi::config::DjogiConfig;
1993
1994    // 0. Zero-descriptor refusal (§5.6 / REQ-370-8). `verify` refuses with
1995    // the dual-cause diagnostic + exit 2 ONLY when there are NEITHER
1996    // descriptors NOR on-disk snapshots — the genuinely unusable state
1997    // (a standalone binary with nothing to verify against). When
1998    // snapshots exist, verify DEGRADES to snapshot-only (the union below
1999    // enumerates the disk buckets), so we must not refuse here.
2000    // Guard on `provider.models().is_empty()` rather than the projected
2001    // `bucket_set`: projection always seeds the synthetic global bucket
2002    // (`(main, "")`), so the bucket set is never empty and is the wrong
2003    // signal for "no descriptors". This is the same guard the
2004    // compose/schema/docs gates in `lib.rs` use.
2005    if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
2006        crate::print_zero_descriptor_diagnostic("migrations verify");
2007        return 2;
2008    }
2009
2010    // 1. Load config from workspace.
2011    let config = match DjogiConfig::load_from_workspace(workspace) {
2012        Ok(c) => c,
2013        Err(e) => {
2014            eprintln!("djogi migrations verify: config load: {e}");
2015            return 1;
2016        }
2017    };
2018
2019    // 2. Project schema from descriptor provider.
2020    let models = match project_from_provider(provider) {
2021        Ok(m) => m,
2022        Err(e) => {
2023            eprintln!("djogi migrations verify: projection error: {e}");
2024            return 1;
2025        }
2026    };
2027
2028    // 3. Build the bucket set as the UNION of the inventory projection and
2029    // the on-disk snapshot tree . An orphaned snapshot
2030    // a removed app whose snapshot still sits on disk — is absent from
2031    // `models` but present on disk; without the union it would never be
2032    // verified and out-of-band drift would go unreported.
2033    let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
2034        models.keys().cloned().collect();
2035    for bucket in discover_snapshot_buckets_on_disk(workspace) {
2036        bucket_set.insert(bucket);
2037    }
2038    // The zero-descriptor refusal (step 0) already returned for the only
2039    // state that yields an empty bucket set (no descriptors + no snapshots).
2040    // Projection always seeds the synthetic global bucket, so reaching here
2041    // with an empty set is impossible; if a future projection change ever
2042    // breaks that invariant, fail closed with the dual-cause refusal rather
2043    // than silently reporting success on a binary that verified nothing.
2044    if bucket_set.is_empty() {
2045        crate::print_zero_descriptor_diagnostic("migrations verify");
2046        return 2;
2047    }
2048
2049    // 4. Policy configuration for the --strict flag.
2050    let policy = djogi::config::PolicyConfig {
2051        strict_out_of_order: strict,
2052    };
2053
2054    // 5. Pre-compute the set of databases that have at least one INVENTORY
2055    // bucket with non-empty models. Orphan-only databases (snapshots on
2056    // disk but no registered models) are excluded — `unwrap_or(false)`
2057    // treats a disk-only bucket as model-less. This gates D699 inside
2058    // `verify_bucket`: an orphan-only database has no live tables to
2059    // miss, so D601 is the actionable signal instead.
2060    let database_has_models: std::collections::HashSet<String> = bucket_set
2061        .iter()
2062        .filter(|b| {
2063            models
2064                .get(*b)
2065                .map(|s| !s.models.is_empty())
2066                .unwrap_or(false)
2067        })
2068        .map(|b| b.database.clone())
2069        .collect();
2070
2071    // 6. Per-database context cache + dedup sets. Contexts are connected
2072    // lazily (only for databases that have a bucket needing a live read)
2073    // and reused across that database's app buckets. `seen_ledger_databases`
2074    // ensures the ledger-lifecycle diagnostics (D621/D622/D699) are
2075    // emitted once per database, not once per app bucket.
2076    let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
2077        std::collections::BTreeMap::new();
2078    let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
2079    let mut exit_code: i32 = 0;
2080
2081    // 7. Verify each bucket.
2082    for bucket in &bucket_set {
2083        // a. Resolve the per-database URL.
2084        let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
2085            let bd = if bucket.app.is_empty() {
2086                "_global_"
2087            } else {
2088                &bucket.app
2089            };
2090            eprintln!(
2091                "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
2092                 check that config.database.url has a valid path component",
2093                bucket.database, bucket.database, bd
2094            );
2095            exit_code = 1;
2096            continue;
2097        };
2098
2099        // b. Connect (lazily, once per distinct database). PG < 18 is a
2100        // server-global refusal — there is no point continuing to other
2101        // buckets, so we return 2 immediately.
2102        if !contexts.contains_key(&bucket.database) {
2103            match connect_and_check(&url).await {
2104                ContextOutcome::Ready(ctx) => {
2105                    contexts.insert(bucket.database.clone(), ctx);
2106                }
2107                ContextOutcome::UnsupportedVersion(e) => {
2108                    crate::print_support_boundary_error("migrations verify", &e);
2109                    return 2;
2110                }
2111                ContextOutcome::RuntimeError(msg) => {
2112                    eprintln!(
2113                        "djogi migrations verify: pool for '{}': {msg}",
2114                        bucket.database
2115                    );
2116                    exit_code = 1;
2117                    continue;
2118                }
2119            }
2120        }
2121
2122        // c. Load the snapshot. A missing snapshot for a bucket that HAS
2123        // registered models is a hard error (exit 1) — the operator must
2124        // record a baseline; a missing snapshot for a model-less bucket
2125        // is informational.
2126        let snap_path = snapshot_path(workspace, bucket);
2127        let snapshot = match load_snapshot(&snap_path) {
2128            Ok(s) => s,
2129            Err(SnapshotError::Io { source, .. })
2130                if source.kind() == std::io::ErrorKind::NotFound =>
2131            {
2132                let bd = if bucket.app.is_empty() {
2133                    "_global_"
2134                } else {
2135                    &bucket.app
2136                };
2137                let has_models = models
2138                    .get(bucket)
2139                    .map(|s| !s.models.is_empty())
2140                    .unwrap_or(false);
2141                if has_models {
2142                    eprintln!(
2143                        "djogi migrations verify: {}/{} has registered models but no \
2144                         snapshot; run `djogi migrations compose` then \
2145                         `djogi migrations apply` to record a baseline",
2146                        bucket.database, bd
2147                    );
2148                    exit_code = 1;
2149                } else {
2150                    println!("No snapshot found for bucket {}/{}", bucket.database, bd);
2151                }
2152                continue;
2153            }
2154            Err(e) => {
2155                let bd = if bucket.app.is_empty() {
2156                    "_global_"
2157                } else {
2158                    &bucket.app
2159                };
2160                eprintln!(
2161                    "djogi migrations verify: load snapshot for {}/{}: {e}",
2162                    bucket.database, bd
2163                );
2164                exit_code = 1;
2165                continue;
2166            }
2167        };
2168
2169        // d. Compute ledger-emission flags. The ledger is shared per
2170        // database; emit its lifecycle diagnostics once per database
2171        // (the first bucket of each database that reaches this point),
2172        // and only for databases that actually have registered models.
2173        let db_has_models = database_has_models.contains(&bucket.database);
2174        let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
2175
2176        // e. Run the bucket-scoped verify against the routed context.
2177        let ctx = contexts
2178            .get_mut(&bucket.database)
2179            .expect("context inserted above");
2180        let report = match djogi::migrate::verify_bucket(
2181            ctx,
2182            bucket,
2183            &snapshot,
2184            &policy,
2185            emit_ledger,
2186            db_has_models,
2187        )
2188        .await
2189        {
2190            Ok(r) => r,
2191            Err(e) => {
2192                let bd = if bucket.app.is_empty() {
2193                    "_global_"
2194                } else {
2195                    &bucket.app
2196                };
2197                eprintln!(
2198                    "djogi migrations verify: error for {}/{}: {e}",
2199                    bucket.database, bd
2200                );
2201                exit_code = 1;
2202                continue;
2203            }
2204        };
2205
2206        // f. Render and fold the bucket's error state into the exit code.
2207        for line in render_verify_report(&report, bucket) {
2208            println!("{line}");
2209        }
2210        if report.has_errors() {
2211            exit_code = 1;
2212        }
2213    }
2214
2215    exit_code
2216}
2217
2218/// Render a [`VerifyReport`] to a vector of output lines.
2219/// Format: one line per diagnostic with severity prefix, code, location,
2220/// and message. Summary line at the end. Output is deterministic because
2221/// `report.diagnostics` is already sorted by `(code, location)`.
2222/// Returns the lines instead of printing directly so the rendering is unit-
2223/// testable ; the caller iterates the returned vector and prints each
2224/// line. Blank separator lines are returned as empty strings.
2225fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
2226    let mut lines: Vec<String> = Vec::new();
2227
2228    let app_display = if bucket.app.is_empty() {
2229        "_global_"
2230    } else {
2231        &bucket.app
2232    };
2233    lines.push(format!(
2234        "djogi migrations verify — {}/{}",
2235        bucket.database, app_display
2236    ));
2237    lines.push("──────────────────────────────────────────".to_string());
2238
2239    match (
2240        &report.latest_applied_version,
2241        report.applied_count,
2242        report.unfinished_count,
2243    ) {
2244        (Some(version), applied, 0) => {
2245            lines.push(format!("Ledger: {applied} applied, latest {version}"));
2246        }
2247        (Some(version), applied, unfinished) => {
2248            lines.push(format!(
2249                "Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
2250            ));
2251        }
2252        (None, 0, 0) => {
2253            lines.push("Ledger: empty (no migrations applied yet)".to_string());
2254        }
2255        _ => {}
2256    }
2257    lines.push(String::new());
2258
2259    if report.diagnostics.is_empty() {
2260        lines.push("No drift detected. Schema is consistent.".to_string());
2261    } else {
2262        for d in &report.diagnostics {
2263            let severity = match d.severity {
2264                VerifySeverity::Info => "INFO",
2265                VerifySeverity::Warning => "WARN",
2266                VerifySeverity::Error => "ERROR",
2267            };
2268            let location = d.location.as_deref().unwrap_or("-");
2269            lines.push(format!(
2270                "[{severity}] {code} ({loc}): {msg}",
2271                severity = severity,
2272                code = d.code,
2273                loc = location,
2274                msg = d.message
2275            ));
2276        }
2277    }
2278
2279    let errors = report
2280        .diagnostics
2281        .iter()
2282        .filter(|d| d.severity == VerifySeverity::Error)
2283        .count();
2284    let warnings = report
2285        .diagnostics
2286        .iter()
2287        .filter(|d| d.severity == VerifySeverity::Warning)
2288        .count();
2289    let infos = report
2290        .diagnostics
2291        .iter()
2292        .filter(|d| d.severity == VerifySeverity::Info)
2293        .count();
2294
2295    if errors > 0 {
2296        lines.push(String::new());
2297        lines.push(format!(
2298            "Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
2299        ));
2300    } else if warnings > 0 {
2301        lines.push(String::new());
2302        lines.push(format!(
2303            "Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
2304        ));
2305    } else {
2306        lines.push(String::new());
2307        lines.push(format!("Result: PASSED ({infos} info(s))"));
2308    }
2309
2310    lines
2311}
2312
2313// ── repair subcommand dispatch ────────────────────────────────────────────
2314
2315impl From<PartialApplyResolutionCli> for PartialApplyResolution {
2316    fn from(cli: PartialApplyResolutionCli) -> Self {
2317        match cli {
2318            PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
2319            PartialApplyResolutionCli::Faked => Self::MarkFaked,
2320            PartialApplyResolutionCli::Applied => Self::MarkApplied,
2321        }
2322    }
2323}
2324
2325/// `djogi migrations repair <subcommand>` entry point.
2326/// Routes each subcommand to its glue function. The glue functions own
2327/// the runtime / config / pool / lock / report-render lifecycle; this
2328/// router only destructures the parsed clap variant.
2329pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
2330    match command {
2331        RepairSubcommand::ChecksumDrift {
2332            version,
2333            app,
2334            database,
2335            checksum_up,
2336            checksum_down,
2337            workspace,
2338        } => repair_checksum_drift_cmd(
2339            &version,
2340            app.as_deref(),
2341            database.as_deref(),
2342            checksum_up.as_deref(),
2343            checksum_down.as_deref(),
2344            workspace,
2345        ),
2346        RepairSubcommand::PartialApply {
2347            version,
2348            resolution,
2349            note,
2350            app,
2351            database,
2352            workspace,
2353        } => repair_partial_apply_cmd(
2354            &version,
2355            resolution.into(),
2356            &note,
2357            app.as_deref(),
2358            database.as_deref(),
2359            workspace,
2360        ),
2361        RepairSubcommand::ResumePartial {
2362            version,
2363            app,
2364            database,
2365            workspace,
2366            node_id,
2367            single_node_dev,
2368        } => repair_resume_partial_apply_cmd(
2369            &version,
2370            app.as_deref(),
2371            database.as_deref(),
2372            workspace,
2373            node_id,
2374            single_node_dev,
2375        ),
2376        RepairSubcommand::SnapshotRebuild {
2377            app,
2378            database,
2379            snapshot_path,
2380            workspace,
2381        } => repair_snapshot_rebuild_cmd(
2382            app.as_deref(),
2383            database.as_deref(),
2384            snapshot_path.as_deref(),
2385            workspace,
2386        ),
2387    }
2388}
2389
2390/// Render a [`RepairReport`] to stdout. Shared across all four repair
2391/// glue functions so the operator sees a consistent action / ledger /
2392/// snapshot summary regardless of which repair ran.
2393fn render_repair_report(report: &RepairReport) {
2394    for action in &report.actions_taken {
2395        println!("  {action}");
2396    }
2397    if !report.ledger_changes.is_empty() {
2398        println!("Ledger changes:");
2399        for lc in &report.ledger_changes {
2400            println!(
2401                "  {} | {} | {} -> {}",
2402                lc.version, lc.column, lc.before, lc.after,
2403            );
2404        }
2405    }
2406    if !report.snapshot_changes.is_empty() {
2407        println!("Snapshot changes:");
2408        for sc in &report.snapshot_changes {
2409            println!("  {} | {}", sc.path.display(), sc.description);
2410        }
2411    }
2412}
2413
2414/// Map a [`RepairError`] onto the CLI exit-code contract.
2415/// `RepairError` is NOT `#[non_exhaustive]`, so this match is
2416/// **exhaustive with NO `_ =>` wildcard** by deliberate design: a future
2417/// variant breaks compilation here, forcing a conscious exit-code
2418/// classification rather than silently bucketing an unclassified error.
2419/// Classification rule — when a new variant is added, classify it the
2420/// same way:
2421/// - **Exit 1 (retryable):** variants wrapping a transient I/O /
2422///   connection / pool / SQL failure (a `source: DjogiError`, snapshot
2423///   filesystem I/O, or advisory-lock contention). A retry may succeed.
2424/// - **Exit 2 (refusal):** structural refusals and ledger-logic guards
2425///   that require operator intervention. A blind retry hits the same
2426///   refusal.
2427fn repair_error_exit_code(err: &RepairError) -> i32 {
2428    match err {
2429        // ── Exit 1: transient I/O / connection / pool / SQL failures.
2430        // These wrap a DjogiError (network, connection, query) or a
2431        // filesystem error and may succeed on retry.
2432        RepairError::LedgerIo { .. }                  // ledger DB I/O
2433        | RepairError::SnapshotIo { .. }              // snapshot filesystem I/O
2434        | RepairError::AdvisoryLockFailed { .. }      // lock held by a concurrent runner; retry after it releases
2435        | RepairError::AdvisoryLockQueryFailed { .. } // pg_try_advisory_lock query itself errored
2436        | RepairError::PinnedSessionCheckoutFailed { .. } // could not check out a pinned session from the pool
2437        | RepairError::ResumeStepFailed { .. }        // a replayed statement failed; partial state recorded, retryable
2438        | RepairError::ResumeProgressAckFailed { .. } // step committed but the progress ack write failed; retryable
2439        => 1,
2440
2441        // ── Exit 2: refusals and structural / ledger-logic guards.
2442        // The operator must investigate and intervene; a blind retry
2443        // would hit the same refusal.
2444        RepairError::VersionNotFound { .. }
2445        | RepairError::InsufficientConfirmation
2446        | RepairError::InvalidChecksum { .. }
2447        | RepairError::InvalidResolution { .. }
2448        | RepairError::BucketAppMismatch { .. }
2449        | RepairError::PlanVersionMismatch { .. }
2450        | RepairError::PlanChecksumMismatch { .. }
2451        | RepairError::LeafIdentityMismatch { .. }
2452        | RepairError::NothingToResume { .. }
2453        | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
2454        | RepairError::SuppliedSnapshotDiverges { .. }
2455        | RepairError::AdvisoryUnlockReturnedFalse { .. } // session-pinning correctness failure — not a blind retry
2456        | RepairError::ResumePlanShapeMismatch { .. }
2457        | RepairError::ReplayPlanShapeMismatch { .. }
2458        | RepairError::PhaseZeroArtifactRefused { .. }  // #386: refusal — operator must replace the stale file
2459        | RepairError::MissingResumeIdentity { .. }     // #386: refusal — operator must supply identity for resume
2460        => 2,
2461    }
2462}
2463
2464/// Resolve the database name for bucket construction. Uses the explicit
2465/// `--database` flag if provided, otherwise defaults to `"main"` (the
2466/// global database name — see [`djogi::apps::AppDescriptor::GLOBAL_DATABASE`]).
2467/// `_config` is threaded so this single helper can grow a config-driven
2468/// default database (should `DjogiConfig` gain one) without changing
2469/// every call site.
2470fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
2471    database.unwrap_or("main").to_string()
2472}
2473
2474/// Compute the `V1:`-prefixed checksum of a committed up SQL file on disk,
2475/// using the canonical fragment-level domain (strips the composed-file
2476/// header and label comments, matching what compose stores in the ledger).
2477/// The naive whole-file checksum is WRONG here: compose stores checksums
2478/// computed over the [`djogi::migrate::OperationSql`] fragments only,
2479/// without the rendered file's `-- Djogi composed migration — up` header
2480/// or the per-statement label comment lines. Recomputing over the full
2481/// file content would never match the ledger value, so the drift repair
2482/// would write a checksum that immediately re-drifts. Delegating to
2483/// [`djogi::migrate::compute_committed_sql_checksum`] keeps the CLI's
2484/// recompute path in the same domain as compose.
2485/// Returns the underlying [`std::io::Error`] unchanged so the caller can
2486/// surface a missing/unreadable up file as a retryable I/O error.
2487fn compute_checksum_up_from_disk(
2488    workspace: &Path,
2489    bucket: &djogi::migrate::BucketKey,
2490    version: &str,
2491) -> std::io::Result<String> {
2492    let path =
2493        djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::up_filename(version));
2494    let sql = std::fs::read_to_string(&path)?;
2495    Ok(djogi::migrate::compute_committed_sql_checksum(
2496        &sql,
2497        djogi::migrate::ResetSqlSide::Up,
2498    ))
2499}
2500
2501/// Compute the canonical checksum of a committed down SQL file on disk,
2502/// using the same fragment-level domain as compose (see
2503/// [`compute_checksum_up_from_disk`] for why the whole-file checksum is
2504/// wrong).
2505/// Returns `Ok(None)` when the file is absent
2506/// ([`std::io::ErrorKind::NotFound`]) or when the file contains only SQL
2507/// comments — both map onto compose's `NULL` `checksum_down` sentinel for
2508/// comment-only down files. Returns `Err` for any other I/O failure so a
2509/// retry after the file is restored can succeed.
2510fn compute_checksum_down_from_disk(
2511    workspace: &Path,
2512    bucket: &djogi::migrate::BucketKey,
2513    version: &str,
2514) -> std::io::Result<Option<String>> {
2515    let path =
2516        djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::down_filename(version));
2517    let sql = match std::fs::read_to_string(&path) {
2518        Ok(s) => s,
2519        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
2520        Err(e) => return Err(e),
2521    };
2522    Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql))
2523}
2524
2525/// `djogi migrations repair checksum-drift` entry point.
2526/// Updates the `checksum_up` / `checksum_down` columns of an
2527/// already-applied ledger row after its committed SQL was edited. When
2528/// `--checksum-up` / `--checksum-down` are omitted, the checksums are
2529/// recomputed from the committed files on disk (a missing down file is a
2530/// no-op; any other read error aborts with exit 1).
2531pub fn repair_checksum_drift_cmd(
2532    version: &str,
2533    app: Option<&str>,
2534    database: Option<&str>,
2535    checksum_up: Option<&str>,
2536    checksum_down: Option<&str>,
2537    workspace: Option<PathBuf>,
2538) -> ExitCode {
2539    let workspace = resolve_workspace(workspace);
2540    let runtime = match tokio::runtime::Builder::new_current_thread()
2541        .enable_all()
2542        .build()
2543    {
2544        Ok(r) => r,
2545        Err(e) => {
2546            eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
2547            return ExitCode::from(1);
2548        }
2549    };
2550    let exit = runtime.block_on(async {
2551        run_repair_checksum_drift(
2552            &workspace,
2553            version,
2554            app,
2555            database,
2556            checksum_up,
2557            checksum_down,
2558        )
2559        .await
2560    });
2561    ExitCode::from(exit as u8)
2562}
2563
2564/// Async body of [`repair_checksum_drift_cmd`]. Returns the desired exit code.
2565async fn run_repair_checksum_drift(
2566    workspace: &Path,
2567    version: &str,
2568    app: Option<&str>,
2569    database: Option<&str>,
2570    checksum_up: Option<&str>,
2571    checksum_down: Option<&str>,
2572) -> i32 {
2573    use djogi::config::DjogiConfig;
2574
2575    let config = match DjogiConfig::load_from_workspace(workspace) {
2576        Ok(c) => c,
2577        Err(e) => {
2578            eprintln!("djogi migrations repair checksum-drift: config load: {e}");
2579            return 1;
2580        }
2581    };
2582
2583    // Resolve the per-database URL BEFORE connecting: `--database
2584    // crud_log` / `event_log` operate on a different bucket's ledger than
2585    // the app DB, so connecting to `config.database.url` first would
2586    // silently mutate the wrong database.
2587    let db_name = resolve_database(database, &config);
2588    let url = match resolve_bucket_url(&config.database, &db_name) {
2589        Some(u) => u,
2590        None => {
2591            eprintln!(
2592                "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
2593            );
2594            return 2;
2595        }
2596    };
2597
2598    let mut ctx = match connect_and_check(&url).await {
2599        ContextOutcome::Ready(ctx) => ctx,
2600        ContextOutcome::UnsupportedVersion(e) => {
2601            crate::print_support_boundary_error("migrations repair checksum-drift", &e);
2602            return 2;
2603        }
2604        ContextOutcome::RuntimeError(msg) => {
2605            eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
2606            return 1;
2607        }
2608    };
2609
2610    let lock_path = workspace.join(LOCK_FILE_NAME);
2611    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2612        Ok(g) => g,
2613        Err(e) => {
2614            eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
2615            return 1;
2616        }
2617    };
2618
2619    let app_label = app.unwrap_or("");
2620    let bucket = BucketKey {
2621        database: db_name,
2622        app: app_label.to_string(),
2623    };
2624
2625    let new_checksum_up = match checksum_up {
2626        Some(c) => c.to_string(),
2627        None => {
2628            // Auto-compute from the committed up SQL file on disk. A
2629            // missing or unreadable up file is an environment I/O error,
2630            // not an operator-facing refusal — exit 1 (same class as the
2631            // down file's non-NotFound branch below), so a retry after
2632            // the file is restored can succeed.
2633            match compute_checksum_up_from_disk(workspace, &bucket, version) {
2634                Ok(cs) => cs,
2635                Err(e) => {
2636                    eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
2637                    return 1;
2638                }
2639            }
2640        }
2641    };
2642
2643    let resolved_checksum_down = match checksum_down {
2644        Some(c) => Some(c.to_string()),
2645        None => {
2646            // Auto-compute from the down file; a missing down file (or a
2647            // comment-only down file) is a no-op (no down checksum), other
2648            // read errors surface. NotFound is folded into `Ok(None)` by
2649            // `compute_checksum_down_from_disk`.
2650            match compute_checksum_down_from_disk(workspace, &bucket, version) {
2651                Ok(cs_opt) => cs_opt,
2652                Err(e) => {
2653                    eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
2654                    return 1;
2655                }
2656            }
2657        }
2658    };
2659
2660    match repair_checksum_drift(
2661        &mut ctx,
2662        &guard,
2663        &bucket,
2664        version,
2665        workspace,
2666        &new_checksum_up,
2667        resolved_checksum_down.as_deref(),
2668        RepairConfirmation::OperatorAcknowledged,
2669    )
2670    .await
2671    {
2672        Ok(report) => {
2673            render_repair_report(&report);
2674            0
2675        }
2676        Err(e) => {
2677            eprintln!("djogi migrations repair checksum-drift: {e}");
2678            repair_error_exit_code(&e)
2679        }
2680    }
2681}
2682
2683/// `djogi migrations repair partial-apply` entry point.
2684/// Resolves a partial-apply ledger row by rewriting its status to
2685/// `rolled_back`, `faked`, or `applied`. No SQL executes — only the
2686/// ledger row is mutated.
2687pub fn repair_partial_apply_cmd(
2688    version: &str,
2689    resolution: PartialApplyResolution,
2690    note: &str,
2691    app: Option<&str>,
2692    database: Option<&str>,
2693    workspace: Option<PathBuf>,
2694) -> ExitCode {
2695    let workspace = resolve_workspace(workspace);
2696    let runtime = match tokio::runtime::Builder::new_current_thread()
2697        .enable_all()
2698        .build()
2699    {
2700        Ok(r) => r,
2701        Err(e) => {
2702            eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
2703            return ExitCode::from(1);
2704        }
2705    };
2706    let exit = runtime.block_on(async {
2707        run_repair_partial_apply(&workspace, version, resolution, note, app, database).await
2708    });
2709    ExitCode::from(exit as u8)
2710}
2711
2712/// Async body of [`repair_partial_apply_cmd`]. Returns the desired exit code.
2713async fn run_repair_partial_apply(
2714    workspace: &Path,
2715    version: &str,
2716    resolution: PartialApplyResolution,
2717    note: &str,
2718    app: Option<&str>,
2719    database: Option<&str>,
2720) -> i32 {
2721    use djogi::config::DjogiConfig;
2722
2723    let config = match DjogiConfig::load_from_workspace(workspace) {
2724        Ok(c) => c,
2725        Err(e) => {
2726            eprintln!("djogi migrations repair partial-apply: config load: {e}");
2727            return 1;
2728        }
2729    };
2730
2731    // Resolve the per-database URL BEFORE connecting: `--database
2732    // crud_log` / `event_log` operate on a different bucket's ledger than
2733    // the app DB, so connecting to `config.database.url` first would
2734    // silently mutate the wrong database.
2735    let db_name = resolve_database(database, &config);
2736    let url = match resolve_bucket_url(&config.database, &db_name) {
2737        Some(u) => u,
2738        None => {
2739            eprintln!(
2740                "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
2741            );
2742            return 2;
2743        }
2744    };
2745
2746    let mut ctx = match connect_and_check(&url).await {
2747        ContextOutcome::Ready(ctx) => ctx,
2748        ContextOutcome::UnsupportedVersion(e) => {
2749            crate::print_support_boundary_error("migrations repair partial-apply", &e);
2750            return 2;
2751        }
2752        ContextOutcome::RuntimeError(msg) => {
2753            eprintln!("djogi migrations repair partial-apply: pool: {msg}");
2754            return 1;
2755        }
2756    };
2757
2758    let lock_path = workspace.join(LOCK_FILE_NAME);
2759    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2760        Ok(g) => g,
2761        Err(e) => {
2762            eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
2763            return 1;
2764        }
2765    };
2766
2767    let app_label = app.unwrap_or("");
2768    let bucket = BucketKey {
2769        database: db_name,
2770        app: app_label.to_string(),
2771    };
2772
2773    match repair_partial_apply(
2774        &mut ctx,
2775        &guard,
2776        &bucket,
2777        version,
2778        workspace,
2779        resolution,
2780        note,
2781        RepairConfirmation::OperatorAcknowledged,
2782    )
2783    .await
2784    {
2785        Ok(report) => {
2786            render_repair_report(&report);
2787            0
2788        }
2789        Err(e) => {
2790            eprintln!("djogi migrations repair partial-apply: {e}");
2791            repair_error_exit_code(&e)
2792        }
2793    }
2794}
2795
2796/// `djogi migrations repair resume-partial` entry point.
2797/// Resumes an interrupted non-transactional apply by loading the
2798/// committed `<version>.plan.json` and replaying its remaining steps.
2799/// Loads the committed plan directly (no CLI-level checksum pre-gate);
2800/// the library validates the plan against the ledger row internally.
2801pub fn repair_resume_partial_apply_cmd(
2802    version: &str,
2803    app: Option<&str>,
2804    database: Option<&str>,
2805    workspace: Option<PathBuf>,
2806    node_id: Option<u32>,
2807    single_node_dev: bool,
2808) -> ExitCode {
2809    let workspace = resolve_workspace(workspace);
2810    let runtime = match tokio::runtime::Builder::new_current_thread()
2811        .enable_all()
2812        .build()
2813    {
2814        Ok(r) => r,
2815        Err(e) => {
2816            eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
2817            return ExitCode::from(1);
2818        }
2819    };
2820    let exit = runtime.block_on(async {
2821        run_repair_resume_partial(&workspace, version, app, database, node_id, single_node_dev)
2822            .await
2823    });
2824    ExitCode::from(exit as u8)
2825}
2826
2827/// Async body of [`repair_resume_partial_apply_cmd`]. Returns the desired exit code.
2828async fn run_repair_resume_partial(
2829    workspace: &Path,
2830    version: &str,
2831    app: Option<&str>,
2832    database: Option<&str>,
2833    node_id: Option<u32>,
2834    single_node_dev: bool,
2835) -> i32 {
2836    use djogi::config::DjogiConfig;
2837
2838    let config = match DjogiConfig::load_from_workspace(workspace) {
2839        Ok(c) => c,
2840        Err(e) => {
2841            eprintln!("djogi migrations repair resume-partial: config load: {e}");
2842            return 1;
2843        }
2844    };
2845
2846    // Resolve node identity before any DB work.
2847    let runner_identity = match crate::identity::resolve_identity(
2848        node_id,
2849        single_node_dev,
2850        &config.profile,
2851        "repair resume-partial",
2852    ) {
2853        Ok(resolved) => Some(resolved.into_runner_identity()),
2854        Err(e) => {
2855            let _ = crate::identity::print_identity_error("repair resume-partial", &e);
2856            return 2;
2857        }
2858    };
2859
2860    // Resolve the per-database URL BEFORE connecting: `--database
2861    // crud_log` / `event_log` operate on a different bucket's ledger than
2862    // the app DB, so connecting to `config.database.url` first would
2863    // silently mutate the wrong database.
2864    let db_name = resolve_database(database, &config);
2865    let url = match resolve_bucket_url(&config.database, &db_name) {
2866        Some(u) => u,
2867        None => {
2868            eprintln!(
2869                "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
2870            );
2871            return 2;
2872        }
2873    };
2874
2875    let mut ctx = match connect_and_check(&url).await {
2876        ContextOutcome::Ready(ctx) => ctx,
2877        ContextOutcome::UnsupportedVersion(e) => {
2878            crate::print_support_boundary_error("migrations repair resume-partial", &e);
2879            return 2;
2880        }
2881        ContextOutcome::RuntimeError(msg) => {
2882            eprintln!("djogi migrations repair resume-partial: pool: {msg}");
2883            return 1;
2884        }
2885    };
2886
2887    let lock_path = workspace.join(LOCK_FILE_NAME);
2888    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2889        Ok(g) => g,
2890        Err(e) => {
2891            eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
2892            return 1;
2893        }
2894    };
2895
2896    let app_label = app.unwrap_or("");
2897    let bucket = BucketKey {
2898        database: db_name,
2899        app: app_label.to_string(),
2900    };
2901
2902    // Load the committed replay plan directly from disk — no CLI-level
2903    // checksum pre-gate, because repair_resume_partial_apply validates
2904    // plan↔ledger checksums itself. Synthesizing a whole-file checksum
2905    // here would not match the per-statement-fragment checksums stored
2906    // in the plan JSON.
2907    let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
2908        Ok(p) => p,
2909        Err(e) => {
2910            eprintln!("djogi migrations repair resume-partial: load plan: {e}");
2911            return 2;
2912        }
2913    };
2914
2915    match repair_resume_partial_apply(
2916        &mut ctx,
2917        &guard,
2918        workspace,
2919        version,
2920        &plan,
2921        runner_identity,
2922        RepairConfirmation::OperatorAcknowledged,
2923    )
2924    .await
2925    {
2926        Ok(report) => {
2927            render_repair_report(&report);
2928            0
2929        }
2930        Err(e) => {
2931            eprintln!("djogi migrations repair resume-partial: {e}");
2932            repair_error_exit_code(&e)
2933        }
2934    }
2935}
2936
2937/// Load the committed `<version>.plan.json` for `resume-partial` without
2938/// the CLI-level checksum pre-gate.
2939/// [`repair_resume_partial_apply`] validates the plan against the ledger
2940/// row internally (`PlanVersionMismatch` / `PlanChecksumMismatch`), so
2941/// re-gating here with a hand-rolled whole-file checksum would be both
2942/// wrong (the plan stores per-statement-fragment checksums) and
2943/// redundant. This helper therefore deliberately does NOT reuse
2944/// [`load_replay_plan_from_disk`] (a pending-apply helper that DOES
2945/// checksum-gate) — it reuses only that function's `CliReplay*`
2946/// deserialization + segment-conversion shape.
2947/// Returns a human-readable error string on a missing/unparseable plan
2948/// file or a format-version mismatch. A missing plan file maps to exit 2
2949/// at the call site (the committed plan is a precondition of resume).
2950fn load_committed_plan_for_resume(
2951    workspace: &Path,
2952    bucket: &djogi::migrate::BucketKey,
2953    version: &str,
2954) -> Result<djogi::migrate::MigrationPlan, String> {
2955    let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
2956    let plan_path = bucket_dir.join(format!("{version}.plan.json"));
2957    let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
2958    let stored: CliReplayPlan = serde_json::from_slice(&bytes)
2959        .map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
2960    if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
2961        return Err(format!(
2962            "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
2963            plan_path.display(),
2964            stored.format_version,
2965        ));
2966    }
2967    Ok(djogi::migrate::MigrationPlan {
2968        bucket: bucket.clone(),
2969        classification: stored.classification.into(),
2970        segments: stored
2971            .segments
2972            .into_iter()
2973            .map(|seg| djogi::migrate::Segment {
2974                kind: seg.kind.into(),
2975                statements: seg
2976                    .statements
2977                    .into_iter()
2978                    .map(|stmt| djogi::migrate::OperationSql {
2979                        label: stmt.label,
2980                        up: stmt.up,
2981                        down: String::new(),
2982                        lossy: None,
2983                    })
2984                    .collect(),
2985            })
2986            .collect(),
2987    })
2988}
2989
2990/// `djogi migrations repair snapshot-rebuild` entry point.
2991/// Rebuilds a bucket's schema snapshot by walking the ledger and
2992/// re-projecting from live database state. When `--snapshot-path` is
2993/// omitted, the path is derived from
2994/// `migrations/<database>/<app>/schema_snapshot.json`.
2995pub fn repair_snapshot_rebuild_cmd(
2996    app: Option<&str>,
2997    database: Option<&str>,
2998    snapshot_path: Option<&Path>,
2999    workspace: Option<PathBuf>,
3000) -> ExitCode {
3001    let workspace = resolve_workspace(workspace);
3002    let runtime = match tokio::runtime::Builder::new_current_thread()
3003        .enable_all()
3004        .build()
3005    {
3006        Ok(r) => r,
3007        Err(e) => {
3008            eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
3009            return ExitCode::from(1);
3010        }
3011    };
3012    let exit = runtime.block_on(async {
3013        run_repair_snapshot_rebuild(&workspace, app, database, snapshot_path).await
3014    });
3015    ExitCode::from(exit as u8)
3016}
3017
3018/// Async body of [`repair_snapshot_rebuild_cmd`]. Returns the desired exit code.
3019async fn run_repair_snapshot_rebuild(
3020    workspace: &Path,
3021    app: Option<&str>,
3022    database: Option<&str>,
3023    snapshot_path: Option<&Path>,
3024) -> i32 {
3025    use djogi::config::DjogiConfig;
3026
3027    let config = match DjogiConfig::load_from_workspace(workspace) {
3028        Ok(c) => c,
3029        Err(e) => {
3030            eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
3031            return 1;
3032        }
3033    };
3034
3035    // Resolve the per-database URL BEFORE connecting: `--database
3036    // crud_log` / `event_log` operate on a different bucket's ledger than
3037    // the app DB, so connecting to `config.database.url` first would
3038    // silently rebuild the snapshot from the wrong database.
3039    let db_name = resolve_database(database, &config);
3040    let url = match resolve_bucket_url(&config.database, &db_name) {
3041        Some(u) => u,
3042        None => {
3043            eprintln!(
3044                "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
3045            );
3046            return 2;
3047        }
3048    };
3049
3050    let mut ctx = match connect_and_check(&url).await {
3051        ContextOutcome::Ready(ctx) => ctx,
3052        ContextOutcome::UnsupportedVersion(e) => {
3053            crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
3054            return 2;
3055        }
3056        ContextOutcome::RuntimeError(msg) => {
3057            eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
3058            return 1;
3059        }
3060    };
3061
3062    let lock_path = workspace.join(LOCK_FILE_NAME);
3063    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3064        Ok(g) => g,
3065        Err(e) => {
3066            eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
3067            return 1;
3068        }
3069    };
3070
3071    let app_label = app.unwrap_or("");
3072    let bucket = BucketKey {
3073        database: db_name,
3074        app: app_label.to_string(),
3075    };
3076
3077    let snap_path = match snapshot_path {
3078        Some(p) => p.to_path_buf(),
3079        None => reconstruct_snapshot_path(workspace, &bucket),
3080    };
3081
3082    match repair_snapshot_rebuild(
3083        &mut ctx,
3084        &guard,
3085        &bucket,
3086        &snap_path,
3087        RepairConfirmation::OperatorAcknowledged,
3088    )
3089    .await
3090    {
3091        Ok(report) => {
3092            render_repair_report(&report);
3093            0
3094        }
3095        Err(e) => {
3096            eprintln!("djogi migrations repair snapshot-rebuild: {e}");
3097            repair_error_exit_code(&e)
3098        }
3099    }
3100}
3101
3102// ── baseline command ──────────────────────────────────────────────────────
3103
3104/// `djogi migrations baseline` entry point.
3105/// Establishes a baseline ledger row + snapshot for an existing
3106/// database adopted under Djogi's migration ledger. The schema already
3107/// exists, so `compose` + `apply` cannot run against the populated
3108/// database without a starting point; baseline projects the live
3109/// catalog into a single `baseline` ledger row (no SQL runs against
3110/// user tables) and persists the projected snapshot as the canonical
3111/// baseline so future migrations diff against the real DB state.
3112/// `--reason` is required and must be non-empty — it is recorded in the
3113/// ledger row's `partial_apply_note` for the audit trail. An empty
3114/// reason is a refusal (exit 2) caught before any DB work.
3115/// Exit codes: `0` success, `1` runtime error (config / pool / projection
3116/// failure), `2` refusal (empty `--reason`, unresolvable database URL,
3117/// duplicate version collision, snapshot-persist failure after ledger
3118/// insert, session-pinning correctness failure, or below PG 18).
3119#[expect(
3120    clippy::too_many_arguments,
3121    reason = "CLI command entry point mirrors clap arguments explicitly"
3122)]
3123pub fn baseline_cmd(
3124    version: &str,
3125    description: &str,
3126    reason: &str,
3127    app: Option<&str>,
3128    database: Option<&str>,
3129    workspace: Option<PathBuf>,
3130    node_id: Option<u32>,
3131    single_node_dev: bool,
3132) -> ExitCode {
3133    // Validate --reason before any expensive work, mirroring the
3134    // `apply --fake --reason` empty-reason gate. The library's
3135    // baseline_plan does not itself reject an empty reason (it records
3136    // whatever string it is handed), so the CLI owns this guard.
3137    if reason.trim().is_empty() {
3138        eprintln!(
3139            "djogi migrations baseline: --reason must not be empty; \
3140             supply a non-empty reason why this baseline is being established \
3141             (e.g. 'schema pre-exists from prior tooling'). \
3142             This is recorded in the ledger audit trail."
3143        );
3144        return ExitCode::from(2);
3145    }
3146
3147    let workspace = resolve_workspace(workspace);
3148    let runtime = match tokio::runtime::Builder::new_current_thread()
3149        .enable_all()
3150        .build()
3151    {
3152        Ok(r) => r,
3153        Err(e) => {
3154            eprintln!("djogi migrations baseline: tokio runtime: {e}");
3155            return ExitCode::from(1);
3156        }
3157    };
3158    let exit = runtime.block_on(async {
3159        run_baseline(
3160            &workspace,
3161            version,
3162            description,
3163            reason,
3164            app,
3165            database,
3166            node_id,
3167            single_node_dev,
3168        )
3169        .await
3170    });
3171    ExitCode::from(exit as u8)
3172}
3173
3174/// Async body of [`baseline_cmd`]. Returns the desired exit code.
3175/// Resolves the per-database URL BEFORE connecting (a `--database
3176/// crud_log` / `event_log` baseline targets a different bucket's ledger
3177/// than the app DB), connects + runs the PG-version preflight via
3178/// [`connect_and_check`], acquires the workspace file lock, then drives
3179/// [`baseline_plan`]. The runner projects the live schema itself and
3180/// computes the baseline checksum from that projection, so the
3181/// `RunnerCtx` is constructed with `snapshot: None` (requires the
3182/// caller NOT supply a snapshot) and an empty `checksum_up` (the
3183/// baseline path never reads it).
3184#[expect(
3185    clippy::too_many_arguments,
3186    reason = "baseline async body keeps CLI arguments explicit through validation and connection setup"
3187)]
3188async fn run_baseline(
3189    workspace: &Path,
3190    version: &str,
3191    description: &str,
3192    reason: &str,
3193    app: Option<&str>,
3194    database: Option<&str>,
3195    node_id: Option<u32>,
3196    single_node_dev: bool,
3197) -> i32 {
3198    use djogi::config::DjogiConfig;
3199
3200    let config = match DjogiConfig::load_from_workspace(workspace) {
3201        Ok(c) => c,
3202        Err(e) => {
3203            eprintln!("djogi migrations baseline: config load: {e}");
3204            return 1;
3205        }
3206    };
3207
3208    // Resolve node identity before any DB work.
3209    let runner_identity = match crate::identity::resolve_identity(
3210        node_id,
3211        single_node_dev,
3212        &config.profile,
3213        "baseline",
3214    ) {
3215        Ok(resolved) => Some(resolved.into_runner_identity()),
3216        Err(e) => {
3217            let _ = crate::identity::print_identity_error("baseline", &e);
3218            return 2;
3219        }
3220    };
3221
3222    // Resolve the per-database URL BEFORE connecting: `--database
3223    // crud_log` / `event_log` operate on a different bucket's ledger
3224    // than the app DB, so connecting to `config.database.url` first
3225    // would silently baseline the wrong database.
3226    let db_name = resolve_database(database, &config);
3227    let url = match resolve_bucket_url(&config.database, &db_name) {
3228        Some(u) => u,
3229        None => {
3230            eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
3231            return 2;
3232        }
3233    };
3234
3235    let mut ctx = match connect_and_check(&url).await {
3236        ContextOutcome::Ready(ctx) => ctx,
3237        ContextOutcome::UnsupportedVersion(e) => {
3238            crate::print_support_boundary_error("migrations baseline", &e);
3239            return 2;
3240        }
3241        ContextOutcome::RuntimeError(msg) => {
3242            eprintln!("djogi migrations baseline: pool: {msg}");
3243            return 1;
3244        }
3245    };
3246
3247    let lock_path = workspace.join(LOCK_FILE_NAME);
3248    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3249        Ok(g) => g,
3250        Err(e) => {
3251            eprintln!("djogi migrations baseline: workspace lock: {e}");
3252            return 1;
3253        }
3254    };
3255
3256    let app_label = app.unwrap_or("");
3257    let bucket = BucketKey {
3258        database: db_name,
3259        app: app_label.to_string(),
3260    };
3261
3262    let runner_ctx = RunnerCtx {
3263        bucket: bucket.clone(),
3264        version: version.to_string(),
3265        description: description.to_string(),
3266        // baseline_plan computes checksum_up from the live projection;
3267        // this field is not read on the baseline code path.
3268        checksum_up: String::new(),
3269        checksum_down: None,
3270        // baseline_plan refuses a caller-supplied snapshot — it
3271        // projects the live DB itself. Leave this None; the projection
3272        // is persisted to `snapshot_path` below.
3273        snapshot: None,
3274        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
3275        // MigrateConfig does not derive Clone; construct from fields
3276        // (same pattern as apply_one_pending).
3277        config: djogi::config::MigrateConfig {
3278            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
3279            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
3280            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
3281            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
3282        },
3283        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
3284        audit_pool: match djogi::migrate::resolve_audit_url(&config) {
3285            Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
3286            Err(_) => None,
3287        },
3288        runner_identity,
3289    };
3290
3291    match baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await {
3292        Ok(report) => {
3293            println!(
3294                "djogi migrations baseline: established baseline `{}` \
3295                 (ledger_id={}) in {:.1}s",
3296                version,
3297                report.ledger_id,
3298                report.execution_time_ms as f64 / 1000.0
3299            );
3300            0
3301        }
3302        Err(e) => {
3303            eprintln!("djogi migrations baseline: {e}");
3304            baseline_error_exit_code(&e)
3305        }
3306    }
3307}
3308
3309/// Map a [`RunnerError`] produced by [`baseline_plan`] onto the CLI
3310/// exit-code contract.
3311/// The flat [`runner_error_exit_code`] (always `1`) is wrong for
3312/// baseline: a duplicate-version collision is a refusal the operator
3313/// must resolve by choosing a new version, and a blind retry hits the
3314/// same collision — that must surface as exit `2`, matching the
3315/// `migrations apply` doc-contract ("re-running reports
3316/// `VersionAlreadyApplied` (exit 2)") and the `repair` family's
3317/// [`repair_error_exit_code`] convention.
3318/// `RunnerError` is `#[non_exhaustive]`, so the wildcard arm is
3319/// load-bearing: any variant NOT named below defaults to exit `1`
3320/// (transient — a retry may succeed). That is the safe default for the
3321/// I/O- and connection-shaped variants the baseline path can hit
3322/// (projection failure, ledger bootstrap / write / query failure,
3323/// snapshot persist failure, pinned-session checkout failure,
3324/// advisory-lock contention). Only the genuine refusals are pulled out
3325/// into the exit-`2` arm.
3326fn baseline_error_exit_code(err: &RunnerError) -> i32 {
3327    match err {
3328        // ── Exit 2: refusals — the operator must intervene; a blind
3329        // retry hits the same condition.
3330        // - A duplicate version (terminal or non-terminal) means the
3331        // chosen baseline version is already taken; pick another.
3332        // - A caller-supplied snapshot is a programming error in the
3333        // wiring (the CLI always passes `snapshot: None`), surfaced
3334        // as a structural refusal rather than a retryable fault.
3335        // - An out-of-order rejection is a policy refusal identical to
3336        // the apply path's.
3337        // - AdvisoryUnlockReturnedFalse is a session-pinning correctness
3338        // failure (PG returned false for pg_advisory_unlock); it is not
3339        // transient — matches the repair family's exit-2 treatment.
3340        // - SnapshotPersistFailed in the baseline path is a post-ledger
3341        // failure: baseline_inner inserts the terminal ledger row BEFORE
3342        // writing the snapshot. A retry with the same version therefore
3343        // hits VersionAlreadyApplied (exit 2) before it can write the
3344        // snapshot. Exit 1 (retryable) would give false hope; exit 2
3345        // signals that operator intervention is needed (run
3346        // `repair snapshot-rebuild` or choose a new version).
3347        RunnerError::VersionAlreadyApplied { .. }
3348        | RunnerError::VersionCollisionNonTerminal { .. }
3349        | RunnerError::BaselineSnapshotShouldNotBeProvided
3350        | RunnerError::AdvisoryUnlockReturnedFalse { .. }
3351        | RunnerError::SnapshotPersistFailed { .. }
3352        | RunnerError::OutOfOrderRejected { .. } => 2,
3353        // ── Exit 1: everything else (transient I/O / connection / SQL /
3354        // projection failures). `#[non_exhaustive]` makes this wildcard
3355        // mandatory; new transient-shaped variants inherit the retryable
3356        // default.
3357        _ => 1,
3358    }
3359}
3360
3361#[cfg(test)]
3362mod tests {
3363    use super::*;
3364    use djogi::__bypass::RawAccessExt as _;
3365    use std::fs;
3366    use std::sync::atomic::{AtomicUsize, Ordering};
3367
3368    struct DatabaseUrlEnvGuard {
3369        _lock: std::sync::MutexGuard<'static, ()>,
3370        prior: Option<String>,
3371    }
3372
3373    impl DatabaseUrlEnvGuard {
3374        fn new() -> Self {
3375            Self {
3376                _lock: crate::test_env_lock(),
3377                prior: std::env::var("DATABASE_URL").ok(),
3378            }
3379        }
3380
3381        fn set(&self, value: &str) {
3382            unsafe { std::env::set_var("DATABASE_URL", value) };
3383        }
3384
3385        fn remove(&self) {
3386            unsafe { std::env::remove_var("DATABASE_URL") };
3387        }
3388    }
3389
3390    impl Drop for DatabaseUrlEnvGuard {
3391        fn drop(&mut self) {
3392            match &self.prior {
3393                Some(value) => unsafe { std::env::set_var("DATABASE_URL", value) },
3394                None => unsafe { std::env::remove_var("DATABASE_URL") },
3395            }
3396        }
3397    }
3398
3399    fn temp_workspace(tag: &str) -> std::path::PathBuf {
3400        static COUNTER: AtomicUsize = AtomicUsize::new(0);
3401        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3402        let nanos = std::time::SystemTime::now()
3403            .duration_since(std::time::UNIX_EPOCH)
3404            .unwrap()
3405            .as_nanos();
3406        let p = std::env::temp_dir().join(format!("djogi-cli-{tag}-{nanos}-{n}"));
3407        fs::create_dir_all(&p).unwrap();
3408        p
3409    }
3410
3411    fn write_unreachable_config(work: &std::path::Path) {
3412        let toml = "[database]\nurl = \"postgres://localhost:1/djogi_unreachable\"\n\
3413                    max_connections = 1\ndev_mode = false\n\
3414                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3415        fs::write(work.join("Djogi.toml"), toml).unwrap();
3416    }
3417
3418    fn without_database_url<T>(f: impl FnOnce() -> T) -> T {
3419        let env_guard = DatabaseUrlEnvGuard::new();
3420        env_guard.remove();
3421        f()
3422    }
3423
3424    #[test]
3425    fn database_url_env_guard_restores_prior_value() {
3426        let env_guard = DatabaseUrlEnvGuard::new();
3427        let expected = env_guard.prior.clone();
3428        let next = if expected.as_deref() == Some("postgres://from-env/test") {
3429            "postgres://temporary/test"
3430        } else {
3431            "postgres://from-env/test"
3432        };
3433        env_guard.set(next);
3434        drop(env_guard);
3435        assert_eq!(std::env::var("DATABASE_URL").ok(), expected);
3436    }
3437
3438    fn current_production_phase_zero_sql(tag: &str) -> String {
3439        let work = temp_workspace(tag);
3440        let lock_path = work.join(LOCK_FILE_NAME);
3441        let guard = acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT).expect("lock");
3442        let models: std::collections::BTreeMap<
3443            djogi::migrate::BucketKey,
3444            djogi::migrate::AppliedSchema,
3445        > = std::collections::BTreeMap::new();
3446        let apps = vec![AppLifecycle {
3447            label: "billing".to_string(),
3448            database: "main".to_string(),
3449            renamed_from: None,
3450            tombstone: false,
3451        }];
3452        let emitted = djogi::migrate::ensure_phase_zero_emitted(
3453            &work,
3454            &models,
3455            &apps,
3456            time::OffsetDateTime::now_utc(),
3457            &guard,
3458        )
3459        .expect("auto-emit Phase 0");
3460        let sql = fs::read_to_string(&emitted[0].up_sql_path).expect("read emitted Phase 0");
3461        drop(guard);
3462        let _ = fs::remove_dir_all(&work);
3463        sql
3464    }
3465
3466    fn markerless_seed_phase_zero_sql(tag: &str) -> String {
3467        let mut sql = current_production_phase_zero_sql(tag);
3468        sql.push_str("\nINSERT INTO heer.heer_nodes (id) VALUES (1);\n");
3469        sql
3470    }
3471
3472    fn phase_zero_with_seed_statement(tag: &str, statement: &str) -> String {
3473        let mut sql = current_production_phase_zero_sql(tag);
3474        sql.push('\n');
3475        sql.push_str(statement);
3476        sql.push('\n');
3477        sql
3478    }
3479
3480    fn extended_seed_statement_cases() -> [(&'static str, &'static str); 4] {
3481        [
3482            (
3483                "cte_insert",
3484                "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
3485            ),
3486            (
3487                "cte_delete",
3488                "WITH moved AS (DELETE FROM heer.heer_node_state RETURNING *) SELECT 1;",
3489            ),
3490            (
3491                "merge",
3492                "MERGE INTO heer.heer_nodes AS target USING incoming ON false WHEN NOT MATCHED THEN INSERT (id) VALUES (1);",
3493            ),
3494            (
3495                "copy_from",
3496                "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
3497            ),
3498        ]
3499    }
3500
3501    fn generated_stale_phase_zero_sql(tag: &str) -> String {
3502        let mut sql = current_production_phase_zero_sql(tag);
3503        sql.push_str(
3504            "\nALTER DATABASE \"mydb\" SET heer.node_id = '1';\n\
3505             ALTER DATABASE \"mydb\" SET heer.ranj_node_id = '1';\n\
3506             SET heer.node_id = '1';\n\
3507             SET heer.ranj_node_id = '1';\n",
3508        );
3509        sql
3510    }
3511
3512    fn seed_capable_phase_zero_sql() -> String {
3513        djogi::testing::phase_zero_sql_for_testing("main", true)
3514            .expect("compose seed-capable Phase 0")
3515    }
3516
3517    fn write_pending_json(
3518        path: &Path,
3519        database: &str,
3520        app: &str,
3521        version: &str,
3522        depends_on: &[&str],
3523    ) {
3524        let pending = PendingPlan {
3525            format_version: djogi::migrate::PENDING_FORMAT_VERSION.to_string(),
3526            bucket_database: database.to_string(),
3527            bucket_app: app.to_string(),
3528            version: version.to_string(),
3529            slug: "test".to_string(),
3530            model_snapshot: djogi::migrate::AppliedSchema {
3531                djogi_version: "0.1.0".to_string(),
3532                enums: std::collections::BTreeMap::new(),
3533                format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3534                generated_at: "2026-06-06T00:00:00Z".to_string(),
3535                indexes: Vec::new(),
3536                models: std::collections::BTreeMap::new(),
3537                registered_apps: vec![app.to_string()],
3538            },
3539            checksum_up: "V1:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
3540                .to_string(),
3541            checksum_down: None,
3542            composed_at: "2026-06-06T00:00:00Z".to_string(),
3543            depends_on: depends_on.iter().map(|s| s.to_string()).collect(),
3544        };
3545        if let Some(parent) = path.parent() {
3546            fs::create_dir_all(parent).unwrap();
3547        }
3548        fs::write(path, serde_json::to_vec_pretty(&pending).unwrap()).unwrap();
3549    }
3550
3551    /// The CLI's bucket-discovery walk must include directories that exist
3552    /// on disk but are absent from the current model inventory (the
3553    /// renamed-from case).
3554    #[test]
3555    fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
3556        let work = temp_workspace("b1_discover");
3557        // Lay down a `migrations/main/billing/schema_snapshot.json`
3558        // the OLD app's snapshot. The current model inventory
3559        // would NOT have this bucket because the app moved to
3560        // `invoicing` via `#[app(renamed_from = "billing")]`.
3561        let billing_dir = work.join("migrations/main/billing");
3562        fs::create_dir_all(&billing_dir).unwrap();
3563        fs::write(billing_dir.join("schema_snapshot.json"), "{}").unwrap();
3564        // A second bucket — the global one for the same database
3565        // exists too. Exercise the multi-bucket walk.
3566        let global_dir = work.join("migrations/main/_global_");
3567        fs::create_dir_all(&global_dir).unwrap();
3568        fs::write(global_dir.join("schema_snapshot.json"), "{}").unwrap();
3569        // A third on-disk directory WITHOUT a snapshot file — must
3570        // not be reported (we only union buckets that actually
3571        // shipped a snapshot).
3572        let no_snap_dir = work.join("migrations/main/empty_app");
3573        fs::create_dir_all(&no_snap_dir).unwrap();
3574
3575        let buckets = discover_snapshot_buckets_on_disk(&work);
3576        let labels: std::collections::BTreeSet<&str> =
3577            buckets.iter().map(|b| b.app.as_str()).collect();
3578        assert!(
3579            labels.contains("billing"),
3580            "must include the renamed-from bucket: {labels:?}"
3581        );
3582        assert!(
3583            labels.contains(""),
3584            "must include the global bucket: {labels:?}"
3585        );
3586        assert!(
3587            !labels.contains("empty_app"),
3588            "must not include directories without a snapshot: {labels:?}"
3589        );
3590        let _ = fs::remove_dir_all(&work);
3591    }
3592
3593    /// The resolved workspace flows into config loading.
3594    /// `load_from_workspace` must read `<workspace>/Djogi.toml` not
3595    /// the cwd's. We assert that by writing a custom config with a
3596    /// distinctive `database.url` and confirming the loader sees it.
3597    #[test]
3598    fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
3599        let work = temp_workspace("a1_workspace_config");
3600        let toml = "[database]\nurl = \"postgres://discovered-by-workspace-flag/test\"\n\
3601                    max_connections = 1\ndev_mode = false\n\
3602                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3603        fs::write(work.join("Djogi.toml"), toml).unwrap();
3604        let env_guard = DatabaseUrlEnvGuard::new();
3605        env_guard.remove();
3606        let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
3607        assert_eq!(
3608            config.database.url,
3609            "postgres://discovered-by-workspace-flag/test"
3610        );
3611        assert_eq!(config.server.port, 1234);
3612        let _ = fs::remove_dir_all(&work);
3613    }
3614
3615    /// Env override precedence: A `DATABASE_URL` in the environment
3616    /// must beat any value in
3617    /// `<workspace>/Djogi.toml`, matching the security contract that
3618    /// secrets only live in env vars.
3619    #[test]
3620    fn a1_round2_env_override_beats_workspace_toml() {
3621        let work = temp_workspace("a1r2_env_override");
3622        let toml = "[database]\nurl = \"postgres://from-toml/test\"\n\
3623                    max_connections = 1\ndev_mode = false\n\
3624                    [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3625        fs::write(work.join("Djogi.toml"), toml).unwrap();
3626        let env_guard = DatabaseUrlEnvGuard::new();
3627        env_guard.set("postgres://from-env/test");
3628        let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
3629        assert_eq!(
3630            config.database.url, "postgres://from-env/test",
3631            "env DATABASE_URL must win over workspace Djogi.toml"
3632        );
3633        let _ = fs::remove_dir_all(&work);
3634    }
3635
3636    #[test]
3637    fn apply_no_pending_is_identity_free_and_skips_pool_connect() {
3638        let work = temp_workspace("apply_no_pending");
3639        write_unreachable_config(&work);
3640
3641        let exit = without_database_url(|| {
3642            let runtime = tokio::runtime::Builder::new_current_thread()
3643                .enable_all()
3644                .build()
3645                .expect("runtime");
3646            runtime.block_on(run_apply(&work, &FakeMode::Real, None, false))
3647        });
3648
3649        assert_eq!(
3650            exit, 0,
3651            "no-pending apply must return before identity resolution or pool checkout"
3652        );
3653        let _ = fs::remove_dir_all(&work);
3654    }
3655
3656    #[test]
3657    fn discover_pending_plans_orders_phase_zero_before_normal_global() {
3658        let work = temp_workspace("discover_pending_phase_zero_first");
3659        write_pending_json(
3660            &djogi::migrate::pending_json_path(
3661                &work,
3662                &BucketKey {
3663                    database: "main".to_string(),
3664                    app: String::new(),
3665                },
3666            ),
3667            "main",
3668            "",
3669            "V20260606010101__later_global",
3670            &[],
3671        );
3672        write_pending_json(
3673            &djogi::migrate::phase_zero_pending_json_path(
3674                &work,
3675                "main",
3676                djogi::migrate::PHASE_ZERO_VERSION,
3677            ),
3678            "main",
3679            "",
3680            djogi::migrate::PHASE_ZERO_VERSION,
3681            &[],
3682        );
3683
3684        let discovered = discover_pending_plans(&work).expect("discover");
3685        assert_eq!(discovered.len(), 2);
3686        assert_eq!(
3687            discovered[0].plan.version,
3688            djogi::migrate::PHASE_ZERO_VERSION
3689        );
3690        assert!(discovered[0].is_phase_zero);
3691        assert_eq!(discovered[1].plan.version, "V20260606010101__later_global");
3692        let _ = fs::remove_dir_all(&work);
3693    }
3694
3695    /// Same-version buckets order by recorded depends_on, not path order.
3696    /// `system` depends on `users`, so `users` must come first even though
3697    /// `system` sorts earlier alphabetically.
3698    #[test]
3699    fn discover_orders_same_version_buckets_by_depends_on() {
3700        let work = temp_workspace("discover_pending_depends_on");
3701        write_pending_json(
3702            &djogi::migrate::pending_json_path(
3703                &work,
3704                &BucketKey {
3705                    database: "main".to_string(),
3706                    app: "system".to_string(),
3707                },
3708            ),
3709            "main",
3710            "system",
3711            "V20260609000000__initial",
3712            &["users"],
3713        );
3714        write_pending_json(
3715            &djogi::migrate::pending_json_path(
3716                &work,
3717                &BucketKey {
3718                    database: "main".to_string(),
3719                    app: "users".to_string(),
3720                },
3721            ),
3722            "main",
3723            "users",
3724            "V20260609000000__initial",
3725            &[],
3726        );
3727
3728        let plans = discover_pending_plans(&work).expect("discovers");
3729        let apps: Vec<&str> = plans.iter().map(|p| p.bucket.app.as_str()).collect();
3730        assert_eq!(apps, ["users", "system"]);
3731        let _ = fs::remove_dir_all(&work);
3732    }
3733
3734    /// Buckets with no dependencies should be ordered alphabetically by app
3735    /// name as a deterministic tiebreak in Kahn's topological sort.
3736    #[test]
3737    fn discover_orders_no_dependency_buckets_alphabetically() {
3738        let work = temp_workspace("discover_pending_alpha_tiebreak");
3739        // Three buckets, same version, no dependencies — should emit alpha, bravo, charlie
3740        for app in &["charlie", "bravo", "alpha"] {
3741            write_pending_json(
3742                &djogi::migrate::pending_json_path(
3743                    &work,
3744                    &BucketKey {
3745                        database: "main".to_string(),
3746                        app: app.to_string(),
3747                    },
3748                ),
3749                "main",
3750                app,
3751                "V20260609000000__initial",
3752                &[],
3753            );
3754        }
3755
3756        let plans = discover_pending_plans(&work).expect("discovers");
3757        let apps: Vec<&str> = plans.iter().map(|p| p.bucket.app.as_str()).collect();
3758        assert_eq!(apps, ["alpha", "bravo", "charlie"]);
3759        let _ = fs::remove_dir_all(&work);
3760    }
3761
3762    /// depends_on referencing a bucket NOT in the current pending set is
3763    /// silently ignored (REQ-398-6: already applied earlier / no delta this run).
3764    #[test]
3765    fn discover_depends_on_missing_bucket_is_ignored() {
3766        let work = temp_workspace("discover_pending_deps_missing");
3767        // system depends on billing, but billing has no pending file
3768        write_pending_json(
3769            &djogi::migrate::pending_json_path(
3770                &work,
3771                &BucketKey {
3772                    database: "main".to_string(),
3773                    app: "system".to_string(),
3774                },
3775            ),
3776            "main",
3777            "system",
3778            "V20260609000000__initial",
3779            &["billing"],
3780        );
3781
3782        let plans = discover_pending_plans(&work).expect("discovers");
3783        assert_eq!(plans.len(), 1);
3784        assert_eq!(plans[0].bucket.app, "system");
3785        let _ = fs::remove_dir_all(&work);
3786    }
3787
3788    /// Same-version buckets with a dependency cycle are refused at apply time
3789    /// (REQ-398-7 defensive half — compose should have caught this, but apply
3790    /// guards against hand-edited or corrupted pending files).
3791    #[test]
3792    fn discover_depends_on_cycle_is_refused() {
3793        let work = temp_workspace("discover_pending_deps_cycle");
3794        write_pending_json(
3795            &djogi::migrate::pending_json_path(
3796                &work,
3797                &BucketKey {
3798                    database: "main".to_string(),
3799                    app: "alpha".to_string(),
3800                },
3801            ),
3802            "main",
3803            "alpha",
3804            "V20260609000000__initial",
3805            &["beta"],
3806        );
3807        write_pending_json(
3808            &djogi::migrate::pending_json_path(
3809                &work,
3810                &BucketKey {
3811                    database: "main".to_string(),
3812                    app: "beta".to_string(),
3813                },
3814            ),
3815            "main",
3816            "beta",
3817            "V20260609000000__initial",
3818            &["alpha"],
3819        );
3820
3821        let err = discover_pending_plans(&work).expect_err("cycle must be refused");
3822        assert!(
3823            err.contains("alpha") && err.contains("beta") && err.contains("cycle"),
3824            "error should name both apps and mention cycle, got: {err}"
3825        );
3826        let _ = fs::remove_dir_all(&work);
3827    }
3828
3829    /// A singleton pending group whose `depends_on` carries a label that
3830    /// fails `is_acceptable_pending_path_component` must be refused. The
3831    /// singleton fast-path bypasses the topo-sort, so without the
3832    /// pre-fast-path validation loop a hand-edited or corrupted label
3833    /// (path traversal, embedded whitespace) would slip through silently.
3834    /// Drives `order_pending_groups_by_dependencies` directly because the
3835    /// invalid label lives inside the pending JSON, not in the filename
3836    /// that discovery already validates.
3837    #[test]
3838    fn single_bucket_with_invalid_depends_on_is_refused() {
3839        let make_singleton = |dep: &str| -> Vec<DiscoveredPendingPlan> {
3840            let plan = PendingPlan {
3841                format_version: djogi::migrate::PENDING_FORMAT_VERSION.to_string(),
3842                bucket_database: "main".to_string(),
3843                bucket_app: "system".to_string(),
3844                version: "V20260609000000__initial".to_string(),
3845                slug: "test".to_string(),
3846                model_snapshot: djogi::migrate::AppliedSchema {
3847                    djogi_version: "0.1.0".to_string(),
3848                    enums: std::collections::BTreeMap::new(),
3849                    format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3850                    generated_at: "2026-06-09T00:00:00Z".to_string(),
3851                    indexes: Vec::new(),
3852                    models: std::collections::BTreeMap::new(),
3853                    registered_apps: vec!["system".to_string()],
3854                },
3855                checksum_up: "V1:".to_string() + &"a".repeat(64),
3856                checksum_down: None,
3857                composed_at: "2026-06-09T00:00:00Z".to_string(),
3858                depends_on: vec![dep.to_string()],
3859            };
3860            vec![DiscoveredPendingPlan {
3861                path: PathBuf::from("target/djogi_pending/main/system.json"),
3862                bucket: BucketKey {
3863                    database: "main".to_string(),
3864                    app: "system".to_string(),
3865                },
3866                plan,
3867                is_phase_zero: false,
3868            }]
3869        };
3870
3871        for bad_label in ["../traversal", "has space"] {
3872            let err = order_pending_groups_by_dependencies(make_singleton(bad_label))
3873                .expect_err("invalid singleton depends_on label must be refused");
3874            assert!(
3875                err.contains("invalid depends_on label")
3876                    && err.contains("main")
3877                    && err.contains("system"),
3878                "[{bad_label}] error must name database, app, and the invalid label: {err}"
3879            );
3880        }
3881    }
3882
3883    /// End-to-end test: two buckets, same version, `system.event_log`
3884    /// FK→`users.users`, composed and applied through real Postgres.
3885    /// Asserts both tables exist, the FK constraint exists in pg_constraint,
3886    /// and the ledger has exactly two rows for the composed version.
3887    /// Uses `#[djogi_test]` for per-test database isolation (the macro drops
3888    /// the test database on normal return or caught panic).
3889    #[djogi::djogi_test]
3890    async fn cross_bucket_fk_applies_in_dependency_order(mut ctx: djogi::context::DjogiContext) {
3891        // Unique suffix for table names — avoids collisions when tests run in parallel.
3892        static E2E_COUNTER: AtomicUsize = AtomicUsize::new(0);
3893        let n = E2E_COUNTER.fetch_add(1, Ordering::SeqCst);
3894        let users_table = format!("e2e_users_{n}");
3895        let event_log_table = format!("e2e_event_log_{n}");
3896
3897        let work = temp_workspace("cross-bucket-fk-e2e");
3898        let guard = djogi::migrate::acquire_workspace_lock(
3899            &work.join(LOCK_FILE_NAME),
3900            std::time::Duration::from_secs(5),
3901        )
3902        .expect("lock workspace");
3903
3904        // Construct models: users bucket (PK only) + system bucket (FK→users).
3905        let mut models: std::collections::BTreeMap<
3906            djogi::migrate::BucketKey,
3907            djogi::migrate::AppliedSchema,
3908        > = std::collections::BTreeMap::new();
3909
3910        let users_bucket = BucketKey {
3911            database: "main".into(),
3912            app: "users".into(),
3913        };
3914        let system_bucket = BucketKey {
3915            database: "main".into(),
3916            app: "system".into(),
3917        };
3918
3919        {
3920            let mut users_schema = djogi::migrate::AppliedSchema {
3921                djogi_version: env!("CARGO_PKG_VERSION").to_string(),
3922                enums: std::collections::BTreeMap::new(),
3923                format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3924                generated_at: "2026-06-10T00:00:00Z".to_string(),
3925                indexes: Vec::new(),
3926                models: std::collections::BTreeMap::new(),
3927                registered_apps: vec!["users".to_string()],
3928            };
3929            users_schema.models.insert(
3930                users_table.clone(),
3931                djogi::migrate::TableSchema {
3932                    app: Some("users".to_string()),
3933                    columns: vec![djogi::migrate::ColumnSchema {
3934                        name: "id".to_string(),
3935                        sql_type: "BIGINT".to_string(),
3936                        nullable: false,
3937                        default_sql: Some("heerid_next_desc()".to_string()),
3938                        ..default_col()
3939                    }],
3940                    primary_key: djogi::migrate::PrimaryKeySchema {
3941                        columns: vec!["id".to_string()],
3942                        kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
3943                    },
3944                    table: users_table.clone(),
3945                    ..default_table()
3946                },
3947            );
3948            models.insert(users_bucket.clone(), users_schema);
3949        }
3950
3951        {
3952            let mut system_schema = djogi::migrate::AppliedSchema {
3953                djogi_version: env!("CARGO_PKG_VERSION").to_string(),
3954                enums: std::collections::BTreeMap::new(),
3955                format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3956                generated_at: "2026-06-10T00:00:00Z".to_string(),
3957                indexes: Vec::new(),
3958                models: std::collections::BTreeMap::new(),
3959                registered_apps: vec!["system".to_string()],
3960            };
3961            system_schema.models.insert(
3962                event_log_table.clone(),
3963                djogi::migrate::TableSchema {
3964                    app: Some("system".to_string()),
3965                    columns: vec![
3966                        djogi::migrate::ColumnSchema {
3967                            name: "id".to_string(),
3968                            sql_type: "BIGINT".to_string(),
3969                            nullable: false,
3970                            default_sql: Some("heerid_next_desc()".to_string()),
3971                            ..default_col()
3972                        },
3973                        djogi::migrate::ColumnSchema {
3974                            name: "user_id".to_string(),
3975                            sql_type: "BIGINT".to_string(),
3976                            nullable: false,
3977                            foreign_key: Some(djogi::migrate::ForeignKeySchema {
3978                                deferrable: false,
3979                                initially_deferred: false,
3980                                on_delete: djogi::migrate::OnDeleteSchema::Restrict,
3981                                ref_column: "id".to_string(),
3982                                ref_table: users_table.clone(),
3983                            }),
3984                            ..default_col()
3985                        },
3986                    ],
3987                    primary_key: djogi::migrate::PrimaryKeySchema {
3988                        columns: vec!["id".to_string()],
3989                        kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
3990                    },
3991                    table: event_log_table.clone(),
3992                    ..default_table()
3993                },
3994            );
3995            models.insert(system_bucket.clone(), system_schema);
3996        }
3997
3998        // Empty snapshots — fresh compose so differ sees all tables as new.
3999        let mut snapshots = std::collections::BTreeMap::new();
4000        for bucket in [&users_bucket, &system_bucket] {
4001            snapshots.insert(
4002                bucket.clone(),
4003                djogi::migrate::AppliedSchema {
4004                    djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4005                    enums: std::collections::BTreeMap::new(),
4006                    format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4007                    generated_at: "2026-06-10T00:00:00Z".to_string(),
4008                    indexes: Vec::new(),
4009                    models: std::collections::BTreeMap::new(),
4010                    registered_apps: vec![bucket.app.clone()],
4011                },
4012            );
4013        }
4014
4015        let apps = vec![
4016            djogi::migrate::AppLifecycle {
4017                label: "users".into(),
4018                database: "main".into(),
4019                renamed_from: None,
4020                tombstone: false,
4021            },
4022            djogi::migrate::AppLifecycle {
4023                label: "system".into(),
4024                database: "main".into(),
4025                renamed_from: None,
4026                tombstone: false,
4027            },
4028        ];
4029
4030        // Compose — generates pending files + migration SQL.
4031        let compose_req = djogi::migrate::ComposeRequest {
4032            workspace_root: &work,
4033            models: &models,
4034            snapshots: &snapshots,
4035            apps: &apps,
4036            name: "cross-bucket-fk",
4037            allow_destructive: false,
4038            force_overwrite: false,
4039            now: time::OffsetDateTime::UNIX_EPOCH
4040                + time::Duration::days(19726)
4041                + time::Duration::seconds(0),
4042            _guard: &guard,
4043            pk_flip_join_table_option: None,
4044            // Emit Phase 0 so the runner's SingleNodeDev provisioning sets the
4045            // heer.node_id GUC on the migration session before binding node 1.
4046            // Phase 0 lands at PHASE_ZERO_VERSION (different from composed_version)
4047            // so the version-scoped ledger assertions below are unaffected.
4048            skip_phase_zero_auto_emit: false,
4049        };
4050
4051        let compose_report = djogi::migrate::compose(compose_req).expect("compose");
4052        assert!(
4053            !compose_report.composed_buckets.is_empty(),
4054            "compose should produce delta buckets"
4055        );
4056
4057        // Release the workspace lock before driving run_apply: run_apply acquires the
4058        // same lock internally (step 5). The lock was only needed for the compose
4059        // phase; holding it through the spawn_blocking call causes flock(LOCK_EX|LOCK_NB)
4060        // to return EWOULDBLOCK on the second open-file-description, blocking run_apply
4061        // for the full GUARD_DEFAULT_TIMEOUT (30 s) and returning exit code 1.
4062        drop(guard);
4063
4064        // Extract the composed version from the report.
4065        let composed_version = &compose_report.composed_buckets[0].version;
4066
4067        // `run_apply` reads config from a Djogi.toml file rather than accepting
4068        // a DjogiContext, so we construct the per-test database URL by querying
4069        // current_database() and replacing it in the admin DATABASE_URL.
4070        let test_db = ctx
4071            .raw_scalar::<String>("SELECT current_database()", &[])
4072            .await
4073            .expect("current_database");
4074        let admin_url = std::env::var("DATABASE_URL").expect(
4075            "DATABASE_URL must be set for djogi_test \
4076             (e.g. postgres://djogi:djogi@localhost:5432/djogi_test)",
4077        );
4078        let test_db_url = replace_db_in_url(&admin_url, &test_db)
4079            .expect("construct per-test database URL from DATABASE_URL");
4080
4081        // Write minimal workspace config for run_apply.
4082        fs::write(
4083            work.join("Djogi.toml"),
4084            format!(
4085                "[database]\nurl = \"{test_db_url}\"\n\
4086                 max_connections = 1\ndev_mode = false\n\
4087                 [server]\nhost = \"127.0.0.1\"\nport = 8080\n"
4088            ),
4089        )
4090        .unwrap();
4091
4092        // Override DATABASE_URL to the per-test database for the duration of
4093        // run_apply. load_from_workspace unconditionally replaces config.database.url
4094        // with DATABASE_URL when the env var is present, so without this the
4095        // migrations land in the admin database rather than the per-test one.
4096        // DatabaseUrlEnvGuard holds the process-wide env mutex and restores the
4097        // prior value on Drop. #[djogi_test] does not acquire this mutex, so
4098        // there is no deadlock risk.
4099        let db_url_guard = DatabaseUrlEnvGuard::new();
4100        db_url_guard.set(&test_db_url);
4101
4102        // Drive the apply loop through run_apply (same path as `djogi migrations apply`).
4103        // spawn_blocking avoids a nested-runtime panic: djogi_test already owns a
4104        // tokio runtime; creating another with block_on from inside async context
4105        // panics. A blocking thread has no runtime, so the new runtime is safe there.
4106        let exit = {
4107            let work = work.clone();
4108            tokio::task::spawn_blocking(move || {
4109                tokio::runtime::Builder::new_current_thread()
4110                    .enable_all()
4111                    .build()
4112                    .expect("runtime")
4113                    .block_on(run_apply(
4114                        &work,
4115                        &FakeMode::Real,
4116                        None,
4117                        true, // single_node_dev: select SingleNodeDev identity so Phase 0 seeds + binds node 1
4118                    ))
4119            })
4120            .await
4121            .expect("spawn_blocking join")
4122        };
4123
4124        // Restore DATABASE_URL before the assertions; the per-test ctx already
4125        // points at the correct database and does not use DATABASE_URL.
4126        drop(db_url_guard);
4127
4128        assert_eq!(
4129            exit, 0,
4130            "apply should succeed (tables created in FK dependency order)"
4131        );
4132
4133        // Assert 1: FK constraint exists on event_log → users.
4134        let fk_rows = ctx
4135            .raw_rows(
4136                "SELECT c.conname \
4137                 FROM pg_constraint c \
4138                 JOIN pg_class r ON r.oid = c.conrelid \
4139                 JOIN pg_class f ON f.oid = c.confrelid \
4140                 WHERE r.relname = $1 AND c.contype = 'f' AND f.relname = $2",
4141                &[&event_log_table.as_str(), &users_table.as_str()],
4142            )
4143            .await
4144            .expect("query pg_constraint");
4145        assert!(
4146            !fk_rows.is_empty(),
4147            "FK constraint should exist from {event_log_table} → {users_table}"
4148        );
4149
4150        // Assert 2: Ledger has exactly TWO rows for the composed version
4151        // (one per bucket: users and system). Do NOT assert total row count —
4152        // phase-zero row also exists at PHASE_ZERO_VERSION.
4153        let ledger_rows = ctx
4154            .raw_rows(
4155                "SELECT app_label FROM djogi_schema_migrations \
4156                 WHERE version = $1 AND status = 'applied'",
4157                &[&composed_version.as_str()],
4158            )
4159            .await
4160            .expect("query ledger");
4161        assert_eq!(
4162            ledger_rows.len(),
4163            2,
4164            "ledger should have exactly 2 rows for composed version {composed_version} \
4165             (users + system), got {} rows",
4166            ledger_rows.len()
4167        );
4168        let app_labels: Vec<String> = ledger_rows
4169            .iter()
4170            .map(|row| row.try_get(0).expect("decode app_label"))
4171            .collect();
4172        assert!(
4173            app_labels.contains(&"users".to_string()),
4174            "ledger should have 'users' bucket: {app_labels:?}"
4175        );
4176        assert!(
4177            app_labels.contains(&"system".to_string()),
4178            "ledger should have 'system' bucket: {app_labels:?}"
4179        );
4180
4181        // Assert 3: Verify ordering — users applied before system.
4182        let ordered_rows = ctx
4183            .raw_rows(
4184                "SELECT app_label, id FROM djogi_schema_migrations \
4185                 WHERE version = $1 AND status = 'applied' ORDER BY id",
4186                &[&composed_version.as_str()],
4187            )
4188            .await
4189            .expect("query ledger ordered");
4190        assert_eq!(ordered_rows[0].try_get::<_, String>(0).unwrap(), "users");
4191        assert_eq!(ordered_rows[1].try_get::<_, String>(0).unwrap(), "system");
4192
4193        let _ = fs::remove_dir_all(&work);
4194
4195        // Note: reverting the stage-2 topo sort in discover_pending_plans
4196        // (removing the order_pending_groups_by_dependencies call) would cause
4197        // this test to fail — `system` sorts before `users` alphabetically,
4198        // so the FK constraint on event_log.user_id → users.id would fire
4199        // before the users table exists (SQLSTATE 42P01 undefined_table).
4200    }
4201
4202    /// End-to-end test: two buckets sharing an enum type (`mood`) plus
4203    /// a cross-bucket FK. Compose + apply, then assert exactly one `CREATE TYPE`
4204    /// in Postgres, both tables exist, and no error during apply.
4205    #[djogi::djogi_test]
4206    async fn shared_enum_multi_slice_applies(mut ctx: djogi::context::DjogiContext) {
4207        // Unique suffix for table names — avoids collisions when tests run in parallel.
4208        static E2E_COUNTER: AtomicUsize = AtomicUsize::new(0);
4209        let n = E2E_COUNTER.fetch_add(1, Ordering::SeqCst);
4210        let posts_table = format!("e2e_posts_{n}");
4211        let comments_table = format!("e2e_comments_{n}");
4212
4213        let work = temp_workspace("shared-enum-e2e");
4214        let guard = djogi::migrate::acquire_workspace_lock(
4215            &work.join(LOCK_FILE_NAME),
4216            std::time::Duration::from_secs(5),
4217        )
4218        .expect("lock workspace");
4219
4220        // Construct models: alpha bucket (owns enum + posts table) +
4221        // beta bucket (references same enum + FK → alpha.posts).
4222        let mut models: std::collections::BTreeMap<
4223            djogi::migrate::BucketKey,
4224            djogi::migrate::AppliedSchema,
4225        > = std::collections::BTreeMap::new();
4226
4227        let alpha_bucket = BucketKey {
4228            database: "main".into(),
4229            app: "alpha".into(),
4230        };
4231        let beta_bucket = BucketKey {
4232            database: "main".into(),
4233            app: "beta".into(),
4234        };
4235
4236        // Shared enum schema.
4237        let mood_enum = djogi::migrate::schema::EnumSchema {
4238            name: "mood".to_string(),
4239            variants: vec!["happy".to_string(), "sad".to_string()],
4240        };
4241
4242        // Alpha: posts table with mood column (enum owner by alphabetical order).
4243        {
4244            let mut alpha_schema = djogi::migrate::AppliedSchema {
4245                djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4246                enums: std::collections::BTreeMap::new(),
4247                format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4248                generated_at: "2026-06-10T00:00:00Z".to_string(),
4249                indexes: Vec::new(),
4250                models: std::collections::BTreeMap::new(),
4251                registered_apps: vec!["alpha".to_string()],
4252            };
4253            alpha_schema
4254                .enums
4255                .insert("mood".to_string(), mood_enum.clone());
4256            alpha_schema.models.insert(
4257                posts_table.clone(),
4258                djogi::migrate::TableSchema {
4259                    app: Some("alpha".to_string()),
4260                    columns: vec![
4261                        djogi::migrate::ColumnSchema {
4262                            name: "id".to_string(),
4263                            sql_type: "BIGINT".to_string(),
4264                            nullable: false,
4265                            default_sql: Some("heerid_next_desc()".to_string()),
4266                            ..default_col()
4267                        },
4268                        djogi::migrate::ColumnSchema {
4269                            name: "mood".to_string(),
4270                            sql_type: "mood".to_string(),
4271                            nullable: true,
4272                            ..default_col()
4273                        },
4274                    ],
4275                    primary_key: djogi::migrate::PrimaryKeySchema {
4276                        columns: vec!["id".to_string()],
4277                        kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
4278                    },
4279                    table: posts_table.clone(),
4280                    ..default_table()
4281                },
4282            );
4283            models.insert(alpha_bucket.clone(), alpha_schema);
4284        }
4285
4286        // Beta: comments table with mood column + FK → alpha.posts.
4287        {
4288            let mut beta_schema = djogi::migrate::AppliedSchema {
4289                djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4290                enums: std::collections::BTreeMap::new(),
4291                format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4292                generated_at: "2026-06-10T00:00:00Z".to_string(),
4293                indexes: Vec::new(),
4294                models: std::collections::BTreeMap::new(),
4295                registered_apps: vec!["beta".to_string()],
4296            };
4297            beta_schema.enums.insert("mood".to_string(), mood_enum);
4298            beta_schema.models.insert(
4299                comments_table.clone(),
4300                djogi::migrate::TableSchema {
4301                    app: Some("beta".to_string()),
4302                    columns: vec![
4303                        djogi::migrate::ColumnSchema {
4304                            name: "id".to_string(),
4305                            sql_type: "BIGINT".to_string(),
4306                            nullable: false,
4307                            default_sql: Some("heerid_next_desc()".to_string()),
4308                            ..default_col()
4309                        },
4310                        djogi::migrate::ColumnSchema {
4311                            name: "post_id".to_string(),
4312                            sql_type: "BIGINT".to_string(),
4313                            nullable: false,
4314                            foreign_key: Some(djogi::migrate::ForeignKeySchema {
4315                                deferrable: false,
4316                                initially_deferred: false,
4317                                on_delete: djogi::migrate::OnDeleteSchema::Restrict,
4318                                ref_column: "id".to_string(),
4319                                ref_table: posts_table.clone(),
4320                            }),
4321                            ..default_col()
4322                        },
4323                        djogi::migrate::ColumnSchema {
4324                            name: "author_mood".to_string(),
4325                            sql_type: "mood".to_string(),
4326                            nullable: true,
4327                            ..default_col()
4328                        },
4329                    ],
4330                    primary_key: djogi::migrate::PrimaryKeySchema {
4331                        columns: vec!["id".to_string()],
4332                        kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
4333                    },
4334                    table: comments_table.clone(),
4335                    ..default_table()
4336                },
4337            );
4338            models.insert(beta_bucket.clone(), beta_schema);
4339        }
4340
4341        // Empty snapshots — fresh compose so differ sees all tables + enum as new.
4342        let mut snapshots = std::collections::BTreeMap::new();
4343        for bucket in [&alpha_bucket, &beta_bucket] {
4344            snapshots.insert(
4345                bucket.clone(),
4346                djogi::migrate::AppliedSchema {
4347                    djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4348                    enums: std::collections::BTreeMap::new(),
4349                    format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4350                    generated_at: "2026-06-10T00:00:00Z".to_string(),
4351                    indexes: Vec::new(),
4352                    models: std::collections::BTreeMap::new(),
4353                    registered_apps: vec![bucket.app.clone()],
4354                },
4355            );
4356        }
4357
4358        let apps = vec![
4359            djogi::migrate::AppLifecycle {
4360                label: "alpha".into(),
4361                database: "main".into(),
4362                renamed_from: None,
4363                tombstone: false,
4364            },
4365            djogi::migrate::AppLifecycle {
4366                label: "beta".into(),
4367                database: "main".into(),
4368                renamed_from: None,
4369                tombstone: false,
4370            },
4371        ];
4372
4373        // Compose — generates pending files + migration SQL.
4374        let compose_req = djogi::migrate::ComposeRequest {
4375            workspace_root: &work,
4376            models: &models,
4377            snapshots: &snapshots,
4378            apps: &apps,
4379            name: "shared-enum-multi-slice",
4380            allow_destructive: false,
4381            force_overwrite: false,
4382            now: time::OffsetDateTime::UNIX_EPOCH
4383                + time::Duration::days(19726)
4384                + time::Duration::seconds(0),
4385            _guard: &guard,
4386            pk_flip_join_table_option: None,
4387            skip_phase_zero_auto_emit: false,
4388        };
4389
4390        let compose_report = djogi::migrate::compose(compose_req).expect("compose");
4391        assert!(
4392            !compose_report.composed_buckets.is_empty(),
4393            "compose should produce delta buckets"
4394        );
4395
4396        // Release the workspace lock before driving run_apply.
4397        drop(guard);
4398
4399        // Extract the composed version from the report.
4400        let composed_version = &compose_report.composed_buckets[0].version;
4401
4402        // Construct per-test database URL from DATABASE_URL.
4403        let test_db = ctx
4404            .raw_scalar::<String>("SELECT current_database()", &[])
4405            .await
4406            .expect("current_database");
4407        let admin_url = std::env::var("DATABASE_URL").expect(
4408            "DATABASE_URL must be set for djogi_test \
4409             (e.g. postgres://djogi:djogi@localhost:5432/djogi_test)",
4410        );
4411        let test_db_url = replace_db_in_url(&admin_url, &test_db)
4412            .expect("construct per-test database URL from DATABASE_URL");
4413
4414        // Write minimal workspace config for run_apply.
4415        fs::write(
4416            work.join("Djogi.toml"),
4417            format!(
4418                "[database]\nurl = \"{test_db_url}\"\n\
4419                 max_connections = 1\ndev_mode = false\n\
4420                 [server]\nhost = \"127.0.0.1\"\nport = 8080\n"
4421            ),
4422        )
4423        .unwrap();
4424
4425        let db_url_guard = DatabaseUrlEnvGuard::new();
4426        db_url_guard.set(&test_db_url);
4427
4428        // Drive the apply loop through run_apply.
4429        let exit = {
4430            let work = work.clone();
4431            tokio::task::spawn_blocking(move || {
4432                tokio::runtime::Builder::new_current_thread()
4433                    .enable_all()
4434                    .build()
4435                    .expect("runtime")
4436                    .block_on(run_apply(
4437                        &work,
4438                        &FakeMode::Real,
4439                        None,
4440                        true, // single_node_dev
4441                    ))
4442            })
4443            .await
4444            .expect("spawn_blocking join")
4445        };
4446
4447        drop(db_url_guard);
4448
4449        assert_eq!(
4450            exit, 0,
4451            "apply should succeed (enum created once, tables in FK order)"
4452        );
4453
4454        // Assert 1: Exactly one `mood` enum type in pg_type.
4455        let mood_count = ctx
4456            .raw_scalar::<i64>(
4457                "SELECT count(*) FROM pg_type WHERE typname = $1",
4458                &[&"mood"],
4459            )
4460            .await
4461            .expect("query pg_type for mood");
4462        assert_eq!(
4463            mood_count, 1,
4464            "exactly one mood enum type should exist in pg_type, got {mood_count}"
4465        );
4466
4467        // Assert 2: Both tables exist.
4468        let table_count = ctx
4469            .raw_scalar::<i64>(
4470                "SELECT count(*) FROM pg_class WHERE relname = $1 OR relname = $2",
4471                &[&posts_table.as_str(), &comments_table.as_str()],
4472            )
4473            .await
4474            .expect("query pg_class for tables");
4475        assert_eq!(
4476            table_count, 2,
4477            "both tables should exist ({posts_table}, {comments_table}), got {table_count}"
4478        );
4479
4480        // Assert 3: Ledger has exactly two rows for the composed version.
4481        let ledger_rows = ctx
4482            .raw_rows(
4483                "SELECT app_label FROM djogi_schema_migrations \
4484                 WHERE version = $1 AND status = 'applied'",
4485                &[&composed_version.as_str()],
4486            )
4487            .await
4488            .expect("query ledger");
4489        assert_eq!(
4490            ledger_rows.len(),
4491            2,
4492            "ledger should have exactly 2 rows for composed version {composed_version} \
4493             (alpha + beta), got {} rows",
4494            ledger_rows.len()
4495        );
4496
4497        let _ = fs::remove_dir_all(&work);
4498    }
4499
4500    /// Replace the database component of a Postgres URL with a new name.
4501    /// Mirrors `djogi::migrate::reset::replace_db_in_url`; inlined here
4502    /// so the test module does not depend on that internal path.
4503    fn replace_db_in_url(url: &str, new_db: &str) -> Option<String> {
4504        let body = url
4505            .strip_prefix("postgres://")
4506            .or_else(|| url.strip_prefix("postgresql://"))?;
4507        let scheme = if url.starts_with("postgres://") {
4508            "postgres://"
4509        } else {
4510            "postgresql://"
4511        };
4512        let mut idx = 0usize;
4513        let body_bytes = body.as_bytes();
4514        while idx < body_bytes.len() && body_bytes[idx] != b'/' {
4515            idx += 1;
4516        }
4517        if idx >= body_bytes.len() {
4518            return None;
4519        }
4520        let authority = &body[..idx];
4521        let path_start = idx + 1;
4522        let mut path_end = path_start;
4523        while path_end < body_bytes.len() && body_bytes[path_end] != b'?' {
4524            path_end += 1;
4525        }
4526        let trailing = &body[path_end..];
4527        Some(format!("{scheme}{authority}/{new_db}{trailing}"))
4528    }
4529
4530    fn default_col() -> djogi::migrate::ColumnSchema {
4531        djogi::migrate::ColumnSchema {
4532            check: None,
4533            codec: None,
4534            comment: None,
4535            default_sql: None,
4536            foreign_key: None,
4537            generated: None,
4538            identity: None,
4539            index_type: None,
4540            indexed: false,
4541            max_length: None,
4542            name: "".to_string(),
4543            nullable: false,
4544            on_delete: None,
4545            outbox_exclude: false,
4546            rationale: None,
4547            relation_kind: None,
4548            renamed_from: None,
4549            sequence_within: None,
4550            sql_type: "".to_string(),
4551            unique: false,
4552            type_change_using: None,
4553        }
4554    }
4555
4556    fn default_table() -> djogi::migrate::TableSchema {
4557        djogi::migrate::TableSchema {
4558            app: None,
4559            columns: Vec::new(),
4560            exclusion_constraints: Vec::new(),
4561            fts: None,
4562            is_through: false,
4563            moved_from_app: None,
4564            partition: None,
4565            primary_key: djogi::migrate::PrimaryKeySchema {
4566                columns: Vec::new(),
4567                kind: djogi::migrate::PkKindSchema::Composite,
4568            },
4569            rationale: None,
4570            renamed_from: None,
4571            rls_enabled: false,
4572            table: "".to_string(),
4573            table_comment: None,
4574            storage_params: None,
4575            tablespace: None,
4576            tenant_key: None,
4577        }
4578    }
4579
4580    #[test]
4581    fn discover_pending_plans_refuses_malformed_pending_json() {
4582        let work = temp_workspace("discover_pending_malformed");
4583        let path = djogi::migrate::pending_json_path(
4584            &work,
4585            &BucketKey {
4586                database: "main".to_string(),
4587                app: String::new(),
4588            },
4589        );
4590        fs::create_dir_all(path.parent().unwrap()).unwrap();
4591        fs::write(&path, b"{ not json").unwrap();
4592
4593        let err = discover_pending_plans(&work).expect_err("malformed pending must refuse");
4594        assert!(err.contains("parse pending JSON"));
4595        let _ = fs::remove_dir_all(&work);
4596    }
4597
4598    #[test]
4599    fn discover_pending_plans_refuses_hidden_phase_zero_database_mismatch() {
4600        let work = temp_workspace("discover_pending_phase_zero_db_mismatch");
4601        write_pending_json(
4602            &djogi::migrate::phase_zero_pending_json_path(
4603                &work,
4604                "main",
4605                djogi::migrate::PHASE_ZERO_VERSION,
4606            ),
4607            "other_db",
4608            "",
4609            djogi::migrate::PHASE_ZERO_VERSION,
4610            &[],
4611        );
4612
4613        let err = discover_pending_plans(&work).expect_err("hidden Phase 0 mismatch must refuse");
4614        assert!(
4615            err.contains("expected main from path"),
4616            "unexpected error: {err}"
4617        );
4618        let _ = fs::remove_dir_all(&work);
4619    }
4620
4621    #[test]
4622    fn discover_pending_plans_refuses_normal_global_phase_zero_pending() {
4623        let work = temp_workspace("discover_pending_normal_global_phase_zero");
4624        let path = djogi::migrate::pending_json_path(
4625            &work,
4626            &BucketKey {
4627                database: "main".to_string(),
4628                app: String::new(),
4629            },
4630        );
4631        write_pending_json(&path, "main", "", djogi::migrate::PHASE_ZERO_VERSION, &[]);
4632
4633        let err = discover_pending_plans(&work).expect_err("normal-global Phase 0 must refuse");
4634        assert!(
4635            err.contains("Phase 0") && err.contains(".phase_zero"),
4636            "unexpected error: {err}"
4637        );
4638        let _ = fs::remove_dir_all(&work);
4639    }
4640
4641    #[test]
4642    fn discover_pending_plans_refuses_normal_pending_app_mismatch() {
4643        let work = temp_workspace("discover_pending_normal_app_mismatch");
4644        let path = djogi::migrate::pending_json_path(
4645            &work,
4646            &BucketKey {
4647                database: "main".to_string(),
4648                app: "billing".to_string(),
4649            },
4650        );
4651        write_pending_json(&path, "main", "audit", "V20260606010101__mismatch", &[]);
4652
4653        let err = discover_pending_plans(&work).expect_err("normal app mismatch must refuse");
4654        assert!(
4655            err.contains("expected billing from path"),
4656            "unexpected error: {err}"
4657        );
4658        let _ = fs::remove_dir_all(&work);
4659    }
4660
4661    #[test]
4662    fn discover_pending_plans_refuses_noncanonical_normal_pending_filename() {
4663        let work = temp_workspace("discover_pending_noncanonical_filename");
4664        let path = work.join("target/djogi_pending/main/bad-name.json");
4665        write_pending_json(&path, "main", "bad-name", "V20260606010101__bad_name", &[]);
4666
4667        let err = discover_pending_plans(&work).expect_err("non-canonical filename must refuse");
4668        assert!(
4669            err.contains("non-canonical app filename"),
4670            "unexpected error: {err}"
4671        );
4672        let _ = fs::remove_dir_all(&work);
4673    }
4674
4675    #[test]
4676    fn load_verified_pending_for_apply_refuses_changed_artifact() {
4677        let work = temp_workspace("apply_pending_changed_after_discovery");
4678        let path = djogi::migrate::pending_json_path(
4679            &work,
4680            &BucketKey {
4681                database: "main".to_string(),
4682                app: String::new(),
4683            },
4684        );
4685        write_pending_json(&path, "main", "", "V20260606010101__stable", &[]);
4686        let discovered = discover_pending_plans(&work).expect("discover");
4687        fs::write(
4688            &path,
4689            serde_json::to_vec_pretty(&PendingPlan {
4690                version: "V20260606010102__changed".to_string(),
4691                ..discovered[0].plan.clone()
4692            })
4693            .unwrap(),
4694        )
4695        .unwrap();
4696
4697        let err = load_verified_pending_for_apply(&discovered[0])
4698            .expect_err("apply must refuse a changed pending artifact");
4699        assert!(
4700            err.contains("changed after discovery"),
4701            "unexpected error: {err}"
4702        );
4703        let _ = fs::remove_dir_all(&work);
4704    }
4705
4706    #[test]
4707    fn reconcile_pending_plans_after_lock_refuses_added_artifact() {
4708        let work = temp_workspace("apply_pending_added_before_lock");
4709        let path = djogi::migrate::pending_json_path(
4710            &work,
4711            &BucketKey {
4712                database: "main".to_string(),
4713                app: String::new(),
4714            },
4715        );
4716        write_pending_json(&path, "main", "", "V20260606010101__stable", &[]);
4717        let discovered = discover_pending_plans(&work).expect("discover");
4718        write_pending_json(
4719            &djogi::migrate::phase_zero_pending_json_path(
4720                &work,
4721                "main",
4722                djogi::migrate::PHASE_ZERO_VERSION,
4723            ),
4724            "main",
4725            "",
4726            djogi::migrate::PHASE_ZERO_VERSION,
4727            &[],
4728        );
4729
4730        let err = reconcile_pending_plans_after_lock(&work, &discovered)
4731            .expect_err("locked reconciliation must refuse a changed pending set");
4732        assert!(
4733            err.contains("changed while waiting for the workspace lock"),
4734            "unexpected error: {err}"
4735        );
4736        let _ = fs::remove_dir_all(&work);
4737    }
4738
4739    #[test]
4740    fn reconcile_pending_plans_after_lock_accepts_unchanged_set() {
4741        let work = temp_workspace("apply_pending_stable_under_lock");
4742        let path = djogi::migrate::pending_json_path(
4743            &work,
4744            &BucketKey {
4745                database: "main".to_string(),
4746                app: String::new(),
4747            },
4748        );
4749        write_pending_json(&path, "main", "", "V20260606010101__stable", &[]);
4750        let discovered = discover_pending_plans(&work).expect("discover");
4751
4752        let locked = reconcile_pending_plans_after_lock(&work, &discovered)
4753            .expect("unchanged set must reconcile");
4754        assert_eq!(locked, discovered);
4755        let _ = fs::remove_dir_all(&work);
4756    }
4757
4758    #[test]
4759    fn repair_checksum_drift_is_identity_free() {
4760        let work = temp_workspace("repair_checksum_identity_free");
4761        write_unreachable_config(&work);
4762
4763        let exit = without_database_url(|| {
4764            repair_checksum_drift_cmd(
4765                "V20260601000000__repair_checksum",
4766                None,
4767                None,
4768                Some("V1:0000000000000000000000000000000000000000000000000000000000000000"),
4769                None,
4770                Some(work.clone()),
4771            )
4772        });
4773
4774        assert_eq!(
4775            exit,
4776            ExitCode::from(1),
4777            "checksum-drift should reach pool connection without shared identity validation"
4778        );
4779        let _ = fs::remove_dir_all(&work);
4780    }
4781
4782    #[test]
4783    fn repair_partial_apply_is_identity_free() {
4784        let work = temp_workspace("repair_partial_identity_free");
4785        write_unreachable_config(&work);
4786
4787        let exit = without_database_url(|| {
4788            repair_partial_apply_cmd(
4789                "V20260601000000__repair_partial",
4790                PartialApplyResolution::MarkRolledBack,
4791                "operator confirmed rollback",
4792                None,
4793                None,
4794                Some(work.clone()),
4795            )
4796        });
4797
4798        assert_eq!(
4799            exit,
4800            ExitCode::from(1),
4801            "partial-apply should reach pool connection without shared identity validation"
4802        );
4803        let _ = fs::remove_dir_all(&work);
4804    }
4805
4806    #[test]
4807    fn repair_snapshot_rebuild_is_identity_free() {
4808        let work = temp_workspace("repair_snapshot_identity_free");
4809        write_unreachable_config(&work);
4810
4811        let exit = without_database_url(|| {
4812            repair_snapshot_rebuild_cmd(None, None, None, Some(work.clone()))
4813        });
4814
4815        assert_eq!(
4816            exit,
4817            ExitCode::from(1),
4818            "snapshot-rebuild should reach pool connection without shared identity validation"
4819        );
4820        let _ = fs::remove_dir_all(&work);
4821    }
4822
4823    /// `compose_with_inputs` must consume the disk-discovered buckets, not
4824    /// just the inventory's. We set up a
4825    /// `migrations/main/billing/schema_snapshot.json` with a `widgets`
4826    /// table, pass an EMPTY models map (simulating "billing app was
4827    /// removed from the workspace"), set `allow_destructive = true`,
4828    /// and assert the resulting up SQL contains `DROP TABLE
4829    /// "widgets"`. If the disk-walk regressed and `compose_with_inputs`
4830    /// only loaded snapshots for inventory-known buckets, the differ
4831    /// would never see billing's snapshot and the compose would exit
4832    /// `NothingToCompose` (no DROP, no SQL written).
4833    /// End-to-end regression guard.
4834    #[test]
4835    fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
4836        use djogi::migrate::projection::BucketKey;
4837        use djogi::migrate::schema::{
4838            ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
4839        };
4840        use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
4841        use std::collections::BTreeMap;
4842
4843        let work = temp_workspace("b1r2_compose_uses_discovery");
4844
4845        // Build a billing-bucket snapshot with one `widgets` table
4846        // and write it to disk under `migrations/main/billing/`.
4847        let billing_bucket = BucketKey {
4848            database: "main".into(),
4849            app: "billing".into(),
4850        };
4851        let mut billing_snap = AppliedSchema {
4852            djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4853            enums: BTreeMap::new(),
4854            format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
4855            generated_at: "2026-04-25T00:00:00Z".to_string(),
4856            indexes: Vec::new(),
4857            models: BTreeMap::new(),
4858            registered_apps: vec!["billing".to_string()],
4859        };
4860        billing_snap.models.insert(
4861            "widgets".to_string(),
4862            TableSchema {
4863                app: Some("billing".to_string()),
4864                columns: vec![ColumnSchema {
4865                    check: None,
4866                    codec: None,
4867                    comment: None,
4868                    default_sql: Some("heerid_next_desc()".to_string()),
4869                    foreign_key: None,
4870                    generated: None,
4871                    identity: None,
4872                    index_type: None,
4873                    indexed: false,
4874                    max_length: None,
4875                    name: "id".to_string(),
4876                    nullable: false,
4877                    on_delete: None,
4878                    outbox_exclude: false,
4879                    rationale: None,
4880                    relation_kind: None,
4881                    renamed_from: None,
4882                    sequence_within: None,
4883                    sql_type: "BIGINT".to_string(),
4884                    unique: false,
4885                    type_change_using: None,
4886                }],
4887                exclusion_constraints: Vec::new(),
4888                fts: None,
4889                is_through: false,
4890                moved_from_app: None,
4891                partition: None,
4892                primary_key: PrimaryKeySchema {
4893                    columns: vec!["id".to_string()],
4894                    kind: PkKindSchema::HeerIdRecencyBiased,
4895                },
4896                rationale: None,
4897                renamed_from: None,
4898                rls_enabled: false,
4899                table: "widgets".to_string(),
4900                table_comment: None,
4901                storage_params: None,
4902                tablespace: None,
4903                tenant_key: None,
4904            },
4905        );
4906        let snap_path = snapshot_path(&work, &billing_bucket);
4907        save_snapshot(&billing_snap, &snap_path).expect("write snapshot");
4908
4909        // EMPTY models — simulates the billing crate having been
4910        // removed from the workspace. Without the disk-walk this
4911        // bucket would never reach the differ.
4912        let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
4913        let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
4914
4915        let exit = compose_with_inputs(
4916            &work,
4917            "drop billing remnant",
4918            true,  // allow_destructive — billing's snapshot will produce DROP ops
4919            false, // force_overwrite
4920            &empty_models,
4921            &[AppLifecycle {
4922                label: "billing".to_string(),
4923                database: "main".to_string(),
4924                renamed_from: None,
4925                tombstone: true, // intentional removal channel
4926            }],
4927            now,
4928            None, // pk_flip_join_table_option — no flip in this test
4929        );
4930        assert_eq!(exit, ExitCode::from(0), "compose must succeed");
4931
4932        // The composed up SQL must carry DROP TABLE for billing's
4933        // widgets — that is the whole point. Find the file and check.
4934        let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
4935        let mut up_path: Option<PathBuf> = None;
4936        for entry in fs::read_dir(&billing_dir).unwrap().flatten() {
4937            let n = entry.file_name().to_string_lossy().to_string();
4938            // Up file pattern: starts with "V", ends with ".sdjql", does
4939            // NOT contain ".down.".
4940            if n.starts_with('V') && n.ends_with(".sdjql") && !n.contains(".down.") {
4941                up_path = Some(entry.path());
4942                break;
4943            }
4944        }
4945        let up_path = up_path.expect("compose must have written an up SQL file");
4946        let up_sql = fs::read_to_string(&up_path).unwrap();
4947        assert!(
4948            up_sql.contains("DROP TABLE \"widgets\""),
4949            "compose must have seen the disk snapshot and emitted DROP TABLE — \
4950             this proves discover_snapshot_buckets_on_disk reached the differ. \
4951             SQL: {up_sql}"
4952        );
4953        let _ = fs::remove_dir_all(&work);
4954    }
4955
4956    /// A cross-bucket foreign-key cycle surfaced by `compose` must map to
4957    /// exit code 2 through `compose_with_inputs` — an operator-actionable
4958    /// refusal, not the exit-1 unexpected-error catch-all. The cycle is
4959    /// injected at the model level: app `a`'s table references app `b`'s
4960    /// table and vice versa, so no slice apply order satisfies both FKs
4961    /// and `compose` returns
4962    /// [`ComposeError::CrossBucketForeignKeyCycle`]. Before the dedicated
4963    /// arm was added this fell through to the catch-all and exited 1.
4964    #[test]
4965    fn compose_cycle_exits_with_code_two() {
4966        use djogi::migrate::projection::BucketKey;
4967        use djogi::migrate::schema::{
4968            AppliedSchema, ColumnSchema, ForeignKeySchema, OnDeleteSchema, PkKindSchema,
4969            PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
4970        };
4971        use std::collections::BTreeMap;
4972
4973        let work = temp_workspace("compose_cycle_exit_two");
4974
4975        // A column that foreign-keys to `target_table.id`.
4976        let fk_col = |name: &str, target_table: &str| -> ColumnSchema {
4977            ColumnSchema {
4978                name: name.to_string(),
4979                sql_type: "BIGINT".to_string(),
4980                foreign_key: Some(ForeignKeySchema {
4981                    deferrable: false,
4982                    initially_deferred: false,
4983                    on_delete: OnDeleteSchema::Restrict,
4984                    ref_column: "id".to_string(),
4985                    ref_table: target_table.to_string(),
4986                }),
4987                ..default_col()
4988            }
4989        };
4990
4991        // A table with a HeerId PK `id` column and one FK column.
4992        let table_with_fk =
4993            |app: &str, table: &str, fk_name: &str, fk_target: &str| -> TableSchema {
4994                let id_col = ColumnSchema {
4995                    name: "id".to_string(),
4996                    sql_type: "BIGINT".to_string(),
4997                    default_sql: Some("heerid_next_desc()".to_string()),
4998                    ..default_col()
4999                };
5000                TableSchema {
5001                    app: Some(app.to_string()),
5002                    columns: vec![id_col, fk_col(fk_name, fk_target)],
5003                    primary_key: PrimaryKeySchema {
5004                        columns: vec!["id".to_string()],
5005                        kind: PkKindSchema::HeerIdRecencyBiased,
5006                    },
5007                    table: table.to_string(),
5008                    ..default_table()
5009                }
5010            };
5011
5012        let schema_for =
5013            |app: &str, table: &str, fk_name: &str, fk_target: &str| -> AppliedSchema {
5014                let mut models = BTreeMap::new();
5015                models.insert(
5016                    table.to_string(),
5017                    table_with_fk(app, table, fk_name, fk_target),
5018                );
5019                AppliedSchema {
5020                    djogi_version: env!("CARGO_PKG_VERSION").to_string(),
5021                    enums: BTreeMap::new(),
5022                    format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
5023                    generated_at: "2026-06-10T00:00:00Z".to_string(),
5024                    indexes: Vec::new(),
5025                    models,
5026                    registered_apps: vec![app.to_string()],
5027                }
5028            };
5029
5030        let a_bucket = BucketKey {
5031            database: "main".into(),
5032            app: "a".into(),
5033        };
5034        let b_bucket = BucketKey {
5035            database: "main".into(),
5036            app: "b".into(),
5037        };
5038
5039        // a.table_a.b_id → b.table_b ; b.table_b.a_id → a.table_a (cycle).
5040        let mut models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
5041        models.insert(a_bucket, schema_for("a", "table_a", "b_id", "table_b"));
5042        models.insert(b_bucket, schema_for("b", "table_b", "a_id", "table_a"));
5043
5044        let now = time::OffsetDateTime::from_unix_timestamp(1_749_513_600).unwrap();
5045        let exit = compose_with_inputs(
5046            &work,
5047            "cross-bucket cycle",
5048            false, // allow_destructive — irrelevant; the cycle refuses first
5049            false, // force_overwrite
5050            &models,
5051            &[
5052                AppLifecycle {
5053                    label: "a".to_string(),
5054                    database: "main".to_string(),
5055                    renamed_from: None,
5056                    tombstone: false,
5057                },
5058                AppLifecycle {
5059                    label: "b".to_string(),
5060                    database: "main".to_string(),
5061                    renamed_from: None,
5062                    tombstone: false,
5063                },
5064            ],
5065            now,
5066            None, // pk_flip_join_table_option — no flip in this test
5067        );
5068
5069        assert_eq!(
5070            exit,
5071            ExitCode::from(2),
5072            "a cross-bucket FK cycle must exit 2 (operator-actionable refusal), not 1"
5073        );
5074        let _ = fs::remove_dir_all(&work);
5075    }
5076
5077    /// `status_cmd` invokes its tokio runtime and
5078    /// fails fast on a malformed `Djogi.toml`. We don't need a live
5079    /// Postgres for this assertion — the test is that the workspace
5080    /// path is threaded through the loader and TOML errors surface
5081    /// promptly. (The earlier `a1_load_from_workspace_reads_path_specific_djogi_toml`
5082    /// covers the well-formed case; this is the malformed-input
5083    /// path-threading proof.)
5084    #[test]
5085    fn a1_round2_status_cmd_threads_workspace_to_config() {
5086        let work = temp_workspace("a1r2_status_workspace");
5087        // Write a deliberately malformed TOML so config load fails.
5088        // If the workspace path wasn't threaded, status_cmd would
5089        // try the cwd's Djogi.toml (typically absent) and silently
5090        // fall through to defaults, giving a different error code.
5091        fs::write(work.join("Djogi.toml"), "this is = not = valid toml ===").unwrap();
5092        let exit = status_cmd(Some(work.clone()));
5093        assert_eq!(
5094            exit,
5095            ExitCode::from(1),
5096            "malformed workspace Djogi.toml must surface as config load error"
5097        );
5098        let _ = fs::remove_dir_all(&work);
5099    }
5100
5101    // ── AttuneError → exit code matrix ──────────────────────────────
5102
5103    /// Every `AttuneError::Refused(_)` variant must map to exit code `2`
5104    /// per `docs/spec/configuration.md` §14. The pre-fix implementation
5105    /// flattened every error to `1`, so an operator running attune in CI
5106    /// could not distinguish "policy gate refused before any side effect"
5107    /// from "ran half a step and failed mid-flight".
5108    #[test]
5109    fn u3_attune_refusal_variants_map_to_exit_code_two() {
5110        use djogi::migrate::AttuneRefusal;
5111        let cases = [
5112            AttuneError::Refused(AttuneRefusal::SquashNotLocalhost {
5113                database_url: "postgres://prod.example.com/main".to_string(),
5114            }),
5115            AttuneError::Refused(AttuneRefusal::SquashNotDevProfile {
5116                profile: "production".to_string(),
5117            }),
5118            // Dev_mode and DJOGI_ENV gates both produce `AttuneError::Refused(_)`
5119            // so they share the exit-code-2 mapping.
5120            AttuneError::Refused(AttuneRefusal::SquashDevModeOff),
5121            AttuneError::Refused(AttuneRefusal::SquashEnvIsProduction {
5122                env_value: "production".to_string(),
5123            }),
5124            AttuneError::Refused(AttuneRefusal::SquashFromVersionNotFound {
5125                version: "V20260101000000__missing".to_string(),
5126            }),
5127            AttuneError::Refused(AttuneRefusal::SquashFromVersionAmbiguous {
5128                version: "V20260101000000__shared".to_string(),
5129                buckets: vec!["main/users".to_string(), "main/billing".to_string()],
5130            }),
5131        ];
5132        for err in &cases {
5133            assert_eq!(
5134                attune_error_exit_code(err),
5135                2,
5136                "refusal variant must map to exit 2: {err}"
5137            );
5138        }
5139    }
5140
5141    /// Every runtime `AttuneError` variant must map to exit code `1`
5142    /// per `docs/spec/configuration.md` §14. CI may safely retry runtime
5143    /// failures; a refusal (exit `2`) signals "operator must intervene"
5144    /// and retrying without operator action would just refuse again.
5145    #[test]
5146    fn u3_attune_runtime_variants_map_to_exit_code_one() {
5147        let cases = [
5148            AttuneError::FilesystemScanFailed {
5149                source: std::io::Error::other("disk full"),
5150            },
5151            AttuneError::SqlReadFailed {
5152                path: PathBuf::from("/tmp/x.sdjql"),
5153                source: std::io::Error::other("permission denied"),
5154            },
5155            AttuneError::SqlWriteFailed {
5156                path: PathBuf::from("/tmp/x.sdjql"),
5157                source: std::io::Error::other("read-only fs"),
5158            },
5159            AttuneError::SqlDeleteFailed {
5160                path: PathBuf::from("/tmp/x.sdjql"),
5161                source: std::io::Error::other("not found"),
5162            },
5163            AttuneError::GitPublishFailed {
5164                stderr: "fatal: refusing to push".to_string(),
5165                status_code: Some(128),
5166            },
5167        ];
5168        for err in &cases {
5169            assert_eq!(
5170                attune_error_exit_code(err),
5171                1,
5172                "runtime variant must map to exit 1: {err}"
5173            );
5174        }
5175    }
5176
5177    // ── issue #354: baseline exit-code mapping ──────────────────────────
5178
5179    /// The refusal-class `RunnerError` variants the baseline path can
5180    /// `baseline_cmd` validates the `--reason` guard before any DB
5181    /// work. An empty or whitespace-only reason must return exit 2
5182    /// without touching the filesystem or network — the guard fires
5183    /// on the CLI-owned string before the tokio runtime is even built.
5184    #[test]
5185    fn baseline_empty_reason_exits_code_2() {
5186        let result = baseline_cmd(
5187            "V00000000000000__baseline",
5188            "description",
5189            "",
5190            None,
5191            None,
5192            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
5193            None,  // node_id
5194            false, // single_node_dev
5195        );
5196        assert_eq!(
5197            result,
5198            ExitCode::from(2),
5199            "empty --reason must exit 2 before any DB work"
5200        );
5201    }
5202
5203    #[test]
5204    fn baseline_whitespace_reason_exits_code_2() {
5205        let result = baseline_cmd(
5206            "V00000000000000__baseline",
5207            "description",
5208            "   ",
5209            None,
5210            None,
5211            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
5212            None,  // node_id
5213            false, // single_node_dev
5214        );
5215        assert_eq!(
5216            result,
5217            ExitCode::from(2),
5218            "whitespace-only --reason must exit 2 before any DB work"
5219        );
5220    }
5221
5222    /// surface must map to exit `2` — a blind retry would hit the same
5223    /// condition, so CI must treat them as "operator must intervene"
5224    /// rather than retryable. A duplicate baseline version (terminal or
5225    /// non-terminal), a wiring bug that supplies a snapshot, and an
5226    /// out-of-order rejection are all refusals.
5227    #[test]
5228    fn baseline_refusal_variants_map_to_exit_code_two() {
5229        let cases = [
5230            RunnerError::VersionAlreadyApplied {
5231                version: "V00000000000000__baseline".to_string(),
5232                applied_at: None,
5233            },
5234            RunnerError::VersionCollisionNonTerminal {
5235                version: "V00000000000000__baseline".to_string(),
5236                status: LedgerStatus::Pending,
5237                run_id: 1,
5238            },
5239            RunnerError::BaselineSnapshotShouldNotBeProvided,
5240            RunnerError::AdvisoryUnlockReturnedFalse {
5241                bucket: BucketKey {
5242                    database: "main".to_string(),
5243                    app: String::new(),
5244                },
5245                key: 0x0102_0304_0506_0708,
5246            },
5247            RunnerError::OutOfOrderRejected {
5248                version: "V00000000000000__baseline".to_string(),
5249                conflicting_version: "V20260101000000__later".to_string(),
5250                conflicting_applied_at: None,
5251            },
5252        ];
5253        for err in &cases {
5254            assert_eq!(
5255                baseline_error_exit_code(err),
5256                2,
5257                "baseline refusal variant must map to exit 2: {err}"
5258            );
5259        }
5260    }
5261
5262    /// Transient `RunnerError` variants reachable from the baseline path
5263    /// must map to exit `1` (retryable). The `#[non_exhaustive]`
5264    /// wildcard arm guarantees any unnamed variant also lands on `1`;
5265    /// these representative cases pin the projection / ledger / snapshot
5266    /// failure shapes the baseline runner can actually emit.
5267    #[test]
5268    fn baseline_transient_variants_map_to_exit_code_one() {
5269        use djogi::error::{DbError, DjogiError};
5270        let cases = [
5271            RunnerError::LedgerBootstrapFailed {
5272                source: DjogiError::Db(DbError::other("create table failed")),
5273            },
5274            RunnerError::LedgerWriteFailed {
5275                version: "V00000000000000__baseline".to_string(),
5276                source: DjogiError::Db(DbError::other("insert failed")),
5277            },
5278            RunnerError::PinnedSessionCheckoutFailed {
5279                source: DjogiError::Db(DbError::other("pool exhausted")),
5280            },
5281            RunnerError::AdvisoryLockFailed {
5282                bucket: BucketKey {
5283                    database: "main".to_string(),
5284                    app: String::new(),
5285                },
5286                key: 0x0102_0304_0506_0708,
5287                attempts: 3,
5288            },
5289        ];
5290        for err in &cases {
5291            assert_eq!(
5292                baseline_error_exit_code(err),
5293                1,
5294                "baseline transient variant must map to exit 1: {err}"
5295            );
5296        }
5297    }
5298
5299    // ── REQ-326: --fake / --reason validation tests ─────────────────────
5300
5301    /// REQ-326-5: --fake without --reason must exit with code 2.
5302    #[test]
5303    fn fake_without_reason_exits_code_2() {
5304        let result = apply_cmd(
5305            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
5306            true,
5307            None,
5308            None,  // node_id
5309            false, // single_node_dev
5310        );
5311        assert_eq!(
5312            result,
5313            ExitCode::from(2),
5314            "--fake without --reason must exit 2"
5315        );
5316    }
5317
5318    /// REQ-326-5: --fake with blank reason must exit with code 2.
5319    #[test]
5320    fn fake_with_empty_reason_exits_code_2() {
5321        let result = apply_cmd(
5322            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
5323            true,
5324            Some(String::new()),
5325            None,  // node_id
5326            false, // single_node_dev
5327        );
5328        assert_eq!(
5329            result,
5330            ExitCode::from(2),
5331            "--fake with empty reason must exit 2"
5332        );
5333    }
5334
5335    /// REQ-326-5: --fake with whitespace-only reason must exit with code 2.
5336    #[test]
5337    fn fake_with_whitespace_reason_exits_code_2() {
5338        let result = apply_cmd(
5339            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
5340            true,
5341            Some("   ".to_string()),
5342            None,  // node_id
5343            false, // single_node_dev
5344        );
5345        assert_eq!(
5346            result,
5347            ExitCode::from(2),
5348            "--fake with whitespace reason must exit 2"
5349        );
5350    }
5351
5352    /// --reason without --fake is accepted (silently ignored).
5353    #[test]
5354    fn reason_without_fake_is_accepted() {
5355        // This should NOT exit 2; it will proceed to config load which
5356        // may fail on nonexistent workspace, but the --reason flag itself
5357        // is accepted. We verify the function does not early-exit with code 2.
5358        let result = apply_cmd(
5359            Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
5360            false, // NOT fake
5361            Some("test reason".to_string()),
5362            None, // node_id — identity resolution is tested separately;
5363            true, // single_node_dev — provide explicit dev mode to bypass resolver
5364        );
5365        // Should be 1 (config error) not 2 (refusal)
5366        assert_ne!(
5367            result,
5368            ExitCode::from(2),
5369            "--reason without --fake should not refuse"
5370        );
5371    }
5372
5373    // ── render_verify_report ─────────────────────────────────────
5374    // `render_verify_report` returns `Vec<String>` so the rendering is
5375    // assertable without capturing stdout. Each test pins the exact lines
5376    // the operator sees for one report shape.
5377
5378    /// Build a bucket for render tests.
5379    fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
5380        djogi::migrate::BucketKey {
5381            database: database.to_string(),
5382            app: app.to_string(),
5383        }
5384    }
5385
5386    /// Construct a [`VerifyDiagnostic`] tersely for render tests.
5387    fn diag(
5388        code: &str,
5389        severity: djogi::migrate::VerifySeverity,
5390        message: &str,
5391        location: Option<&str>,
5392    ) -> djogi::migrate::VerifyDiagnostic {
5393        djogi::migrate::VerifyDiagnostic {
5394            code: code.to_string(),
5395            severity,
5396            message: message.to_string(),
5397            location: location.map(str::to_string),
5398        }
5399    }
5400
5401    #[test]
5402    fn render_verify_report_clean_output() {
5403        use djogi::migrate::VerifyReport;
5404
5405        let report = VerifyReport {
5406            diagnostics: vec![],
5407            latest_applied_version: Some("001_initial".to_string()),
5408            applied_count: 3,
5409            unfinished_count: 0,
5410        };
5411        let bucket = render_bucket("main", "");
5412
5413        let lines = render_verify_report(&report, &bucket);
5414
5415        assert!(
5416            lines.contains(&"Ledger: 3 applied, latest 001_initial".to_string()),
5417            "missing ledger line; got {lines:?}"
5418        );
5419        assert!(
5420            lines.contains(&"No drift detected. Schema is consistent.".to_string()),
5421            "missing clean line; got {lines:?}"
5422        );
5423        assert!(
5424            lines.iter().any(|l| l.contains("Result: PASSED")),
5425            "missing PASSED result; got {lines:?}"
5426        );
5427        assert!(
5428            !lines.iter().any(|l| l.contains("FAILED")),
5429            "clean report must not say FAILED; got {lines:?}"
5430        );
5431    }
5432
5433    #[test]
5434    fn render_verify_report_with_errors() {
5435        use djogi::migrate::{VerifyReport, VerifySeverity};
5436
5437        // Diagnostics are pre-sorted by `(code, location)` exactly as the
5438        // library returns them — render does not re-sort.
5439        let report = VerifyReport {
5440            diagnostics: vec![
5441                diag(
5442                    "D601",
5443                    VerifySeverity::Error,
5444                    "Snapshot table missing from live DB",
5445                    Some("users"),
5446                ),
5447                diag(
5448                    "D611",
5449                    VerifySeverity::Warning,
5450                    "Live index not present in snapshot",
5451                    Some("idx_posts_created"),
5452                ),
5453            ],
5454            latest_applied_version: Some("V20260501000000__add_users".to_string()),
5455            applied_count: 2,
5456            unfinished_count: 0,
5457        };
5458        let bucket = render_bucket("main", "myapp");
5459
5460        assert!(report.has_errors());
5461        let lines = render_verify_report(&report, &bucket);
5462
5463        assert!(
5464            lines
5465                .contains(&"[ERROR] D601 (users): Snapshot table missing from live DB".to_string()),
5466            "missing D601 line; got {lines:?}"
5467        );
5468        assert!(
5469            lines.contains(
5470                &"[WARN] D611 (idx_posts_created): Live index not present in snapshot".to_string()
5471            ),
5472            "missing D611 line; got {lines:?}"
5473        );
5474        assert!(
5475            lines.iter().any(|l| l.contains("Result: FAILED")),
5476            "error report must say FAILED; got {lines:?}"
5477        );
5478    }
5479
5480    #[test]
5481    fn render_verify_report_header_shows_global_and_named_app() {
5482        use djogi::migrate::VerifyReport;
5483
5484        let report = VerifyReport {
5485            diagnostics: vec![],
5486            latest_applied_version: None,
5487            applied_count: 0,
5488            unfinished_count: 0,
5489        };
5490
5491        // Empty app label → `_global_` in the header.
5492        let global = render_verify_report(&report, &render_bucket("main", ""));
5493        assert_eq!(
5494            global.first().map(String::as_str),
5495            Some("djogi migrations verify — main/_global_"),
5496            "global bucket header; got {global:?}"
5497        );
5498
5499        // Named app → the label verbatim in the header.
5500        let named = render_verify_report(&report, &render_bucket("crud_log", "billing"));
5501        assert_eq!(
5502            named.first().map(String::as_str),
5503            Some("djogi migrations verify — crud_log/billing"),
5504            "named bucket header; got {named:?}"
5505        );
5506    }
5507
5508    #[test]
5509    fn render_verify_report_warning_only_passes_with_warnings() {
5510        use djogi::migrate::{VerifyReport, VerifySeverity};
5511
5512        let report = VerifyReport {
5513            diagnostics: vec![diag(
5514                "D606",
5515                VerifySeverity::Warning,
5516                "type differs (advisory)",
5517                Some("users.age"),
5518            )],
5519            latest_applied_version: Some("001_initial".to_string()),
5520            applied_count: 1,
5521            unfinished_count: 0,
5522        };
5523        let lines = render_verify_report(&report, &render_bucket("main", ""));
5524
5525        assert!(
5526            lines
5527                .iter()
5528                .any(|l| l.contains("Result: PASSED with warnings")),
5529            "warning-only must PASS with warnings; got {lines:?}"
5530        );
5531        assert!(
5532            !lines.iter().any(|l| l.contains("FAILED")),
5533            "warning-only must not say FAILED; got {lines:?}"
5534        );
5535    }
5536
5537    #[test]
5538    fn render_verify_report_empty_ledger_line() {
5539        use djogi::migrate::VerifyReport;
5540
5541        let report = VerifyReport {
5542            diagnostics: vec![],
5543            latest_applied_version: None,
5544            applied_count: 0,
5545            unfinished_count: 0,
5546        };
5547        let lines = render_verify_report(&report, &render_bucket("main", ""));
5548
5549        assert!(
5550            lines.contains(&"Ledger: empty (no migrations applied yet)".to_string()),
5551            "empty ledger line; got {lines:?}"
5552        );
5553    }
5554
5555    #[test]
5556    fn render_verify_report_unfinished_ledger_line() {
5557        use djogi::migrate::VerifyReport;
5558
5559        let report = VerifyReport {
5560            diagnostics: vec![],
5561            latest_applied_version: Some("V20260501000000__add_users".to_string()),
5562            applied_count: 2,
5563            unfinished_count: 1,
5564        };
5565        let lines = render_verify_report(&report, &render_bucket("main", ""));
5566
5567        assert!(
5568            lines.contains(
5569                &"Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users".to_string()
5570            ),
5571            "unfinished ledger line; got {lines:?}"
5572        );
5573    }
5574
5575    #[test]
5576    fn render_verify_report_info_with_no_location_uses_dash() {
5577        use djogi::migrate::{VerifyReport, VerifySeverity};
5578
5579        // An Info diagnostic with `location: None` exercises the
5580        // `unwrap_or("-")` path, and the all-info summary line.
5581        let report = VerifyReport {
5582            diagnostics: vec![diag(
5583                "D692",
5584                VerifySeverity::Info,
5585                "enum type(s) declared; not yet checked",
5586                None,
5587            )],
5588            latest_applied_version: Some("001_initial".to_string()),
5589            applied_count: 1,
5590            unfinished_count: 0,
5591        };
5592        let lines = render_verify_report(&report, &render_bucket("main", ""));
5593
5594        assert!(
5595            lines.iter().any(|l| l.contains("(-)")),
5596            "location: None must render as (-); got {lines:?}"
5597        );
5598        assert!(
5599            lines.contains(&"Result: PASSED (1 info(s))".to_string()),
5600            "all-info summary; got {lines:?}"
5601        );
5602    }
5603
5604    // ── resolve_bucket_url (Class A) ─────────────────────────────────────
5605
5606    fn db_config(
5607        url: &str,
5608        crud_log_url: Option<&str>,
5609        event_log_url: Option<&str>,
5610    ) -> djogi::config::DatabaseConfig {
5611        djogi::config::DatabaseConfig {
5612            url: url.to_string(),
5613            crud_log_url: crud_log_url.map(str::to_string),
5614            event_log_url: event_log_url.map(str::to_string),
5615            max_connections: None,
5616            dev_mode: false,
5617        }
5618    }
5619
5620    #[test]
5621    fn resolve_bucket_url_main_uses_app_url_verbatim() {
5622        // "main" must use the app URL verbatim even when the path
5623        // component is not literally "main" — deriving would target a
5624        // database that does not exist.
5625        let cfg = db_config("postgres://user:pass@localhost:5432/myapp_prod", None, None);
5626        assert_eq!(
5627            resolve_bucket_url(&cfg, "main").as_deref(),
5628            Some("postgres://user:pass@localhost:5432/myapp_prod"),
5629            "main must return the app URL unchanged"
5630        );
5631    }
5632
5633    #[test]
5634    fn resolve_bucket_url_crud_log_prefers_explicit_url() {
5635        let cfg = db_config(
5636            "postgres://localhost/main",
5637            Some("postgres://localhost/explicit_crud"),
5638            None,
5639        );
5640        assert_eq!(
5641            resolve_bucket_url(&cfg, "crud_log").as_deref(),
5642            Some("postgres://localhost/explicit_crud"),
5643            "crud_log must prefer the explicit crud_log_url"
5644        );
5645    }
5646
5647    #[test]
5648    fn resolve_bucket_url_event_log_prefers_explicit_url() {
5649        let cfg = db_config(
5650            "postgres://localhost/main",
5651            None,
5652            Some("postgres://localhost/explicit_event"),
5653        );
5654        assert_eq!(
5655            resolve_bucket_url(&cfg, "event_log").as_deref(),
5656            Some("postgres://localhost/explicit_event"),
5657            "event_log must prefer the explicit event_log_url"
5658        );
5659    }
5660
5661    #[test]
5662    fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
5663        // An empty explicit URL is treated as absent — derive from the app
5664        // URL's path component instead.
5665        let cfg = db_config("postgres://localhost/main", Some(""), Some("   "));
5666        // crud_log: empty string → derive.
5667        assert_eq!(
5668            resolve_bucket_url(&cfg, "crud_log").as_deref(),
5669            Some("postgres://localhost/crud_log"),
5670            "empty crud_log_url must fall back to derived"
5671        );
5672        // event_log: whitespace is NOT empty, so it is used verbatim — the
5673        // emptiness check is a strict `is_empty`, matching the spec.
5674        assert_eq!(
5675            resolve_bucket_url(&cfg, "event_log").as_deref(),
5676            Some("   "),
5677            "non-empty (whitespace) event_log_url is used verbatim"
5678        );
5679    }
5680
5681    #[test]
5682    fn resolve_bucket_url_other_database_derives_from_app_url() {
5683        let cfg = db_config("postgres://user:pass@localhost:5432/main", None, None);
5684        assert_eq!(
5685            resolve_bucket_url(&cfg, "analytics").as_deref(),
5686            Some("postgres://user:pass@localhost:5432/analytics"),
5687            "an arbitrary database name derives by path splice"
5688        );
5689    }
5690
5691    #[test]
5692    fn resolve_bucket_url_pathless_url_returns_none() {
5693        // A URL with no recognisable path component cannot be derived.
5694        let cfg = db_config("postgres://localhost", None, None);
5695        assert_eq!(
5696            resolve_bucket_url(&cfg, "crud_log"),
5697            None,
5698            "pathless URL must yield None for a derived database"
5699        );
5700    }
5701
5702    #[test]
5703    fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
5704        // "main" short-circuits before derivation, so even a pathless URL
5705        // returns it verbatim — the app pool is the operator's to define.
5706        let cfg = db_config("postgres://localhost", None, None);
5707        assert_eq!(
5708            resolve_bucket_url(&cfg, "main").as_deref(),
5709            Some("postgres://localhost"),
5710            "main returns the app URL verbatim regardless of path"
5711        );
5712    }
5713
5714    #[test]
5715    fn resolve_apply_target_urls_uses_pending_bucket_databases() {
5716        let work = temp_workspace("apply_target_urls");
5717        write_pending_json(
5718            &djogi::migrate::pending_json_path(
5719                &work,
5720                &BucketKey {
5721                    database: "main".to_string(),
5722                    app: String::new(),
5723                },
5724            ),
5725            "main",
5726            "",
5727            "V20260607010101__main_global",
5728            &[],
5729        );
5730        write_pending_json(
5731            &djogi::migrate::pending_json_path(
5732                &work,
5733                &BucketKey {
5734                    database: "crud_log".to_string(),
5735                    app: "audit".to_string(),
5736                },
5737            ),
5738            "crud_log",
5739            "audit",
5740            "V20260607010102__crud_log_audit",
5741            &[],
5742        );
5743
5744        let discovered = discover_pending_plans(&work).expect("discover");
5745        let cfg = db_config(
5746            "postgres://user:pass@localhost:5432/myapp_prod",
5747            Some("postgres://user:pass@localhost:5432/myapp_crud"),
5748            None,
5749        );
5750
5751        let urls = resolve_apply_target_urls(&discovered, &cfg).expect("resolve");
5752        assert_eq!(
5753            urls.len(),
5754            2,
5755            "apply must preserve distinct target databases"
5756        );
5757        assert_eq!(
5758            urls.get("main").map(String::as_str),
5759            Some("postgres://user:pass@localhost:5432/myapp_prod"),
5760            "main pending plans must keep the app database URL"
5761        );
5762        assert_eq!(
5763            urls.get("crud_log").map(String::as_str),
5764            Some("postgres://user:pass@localhost:5432/myapp_crud"),
5765            "crud_log pending plans must route through the crud_log database URL"
5766        );
5767        let _ = fs::remove_dir_all(&work);
5768    }
5769
5770    #[test]
5771    fn resolve_apply_target_urls_refuses_unresolvable_pending_database() {
5772        let work = temp_workspace("apply_target_urls_unresolvable");
5773        write_pending_json(
5774            &djogi::migrate::pending_json_path(
5775                &work,
5776                &BucketKey {
5777                    database: "analytics".to_string(),
5778                    app: String::new(),
5779                },
5780            ),
5781            "analytics",
5782            "",
5783            "V20260607010103__analytics_global",
5784            &[],
5785        );
5786
5787        let discovered = discover_pending_plans(&work).expect("discover");
5788        let cfg = db_config("postgres://localhost", None, None);
5789        let err = resolve_apply_target_urls(&discovered, &cfg)
5790            .expect_err("pathless app URL must refuse a derived pending database");
5791        assert!(err.contains("analytics"), "unexpected error: {err}");
5792        let _ = fs::remove_dir_all(&work);
5793    }
5794
5795    // ── Stage 4D: CLI cleanup identity-free Phase 0 guard ─────────────
5796
5797    #[test]
5798    fn classify_phase_zero_bytes_identity_free_production_is_ok() {
5799        let sql = current_production_phase_zero_sql("current_bytes");
5800        assert!(
5801            classify_phase_zero_bytes(sql.as_bytes()).is_none(),
5802            "production Phase 0 should be identity-free replay-current (no refusal)"
5803        );
5804    }
5805
5806    #[test]
5807    fn classify_phase_zero_bytes_seed_capable_is_refused() {
5808        let sql = seed_capable_phase_zero_sql();
5809        let refusal = classify_phase_zero_bytes(sql.as_bytes());
5810        assert!(
5811            refusal.is_some(),
5812            "seed-capable Phase 0 should be refused by cleanup guard"
5813        );
5814        assert!(refusal.unwrap().contains("seed-capable"));
5815    }
5816
5817    #[test]
5818    fn classify_phase_zero_bytes_generated_stale_is_refused() {
5819        let sql = generated_stale_phase_zero_sql("stale_bytes");
5820        let refusal = classify_phase_zero_bytes(sql.as_bytes());
5821        assert!(
5822            refusal.is_some(),
5823            "generated-stale Phase 0 should be refused"
5824        );
5825        assert!(refusal.unwrap().contains("generated-stale"));
5826    }
5827
5828    #[test]
5829    fn classify_phase_zero_bytes_markerless_seed_is_refused() {
5830        let sql = markerless_seed_phase_zero_sql("markerless_seed_bytes");
5831        let refusal = classify_phase_zero_bytes(sql.as_bytes());
5832        assert!(
5833            refusal.is_some(),
5834            "markerless seed Phase 0 should be refused by cleanup guard"
5835        );
5836        assert!(refusal.unwrap().contains("seed-dml"));
5837    }
5838
5839    #[test]
5840    fn classify_phase_zero_bytes_extended_seed_dml_forms_are_refused() {
5841        for (name, statement) in extended_seed_statement_cases() {
5842            let sql =
5843                phase_zero_with_seed_statement(&format!("extended_seed_bytes_{name}"), statement);
5844            let refusal = classify_phase_zero_bytes(sql.as_bytes());
5845            let msg = refusal.expect("extended seed Phase 0 should be refused");
5846            assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
5847        }
5848    }
5849
5850    #[test]
5851    fn classify_phase_zero_bytes_ambiguous_is_refused() {
5852        // Hand-edited or ambiguous Phase 0.
5853        let sql = "CREATE SCHEMA IF NOT EXISTS heer;\n\
5854                   ALTER DATABASE \"mydb\" SET heer.node_id = '1';\n";
5855        let refusal = classify_phase_zero_bytes(sql.as_bytes());
5856        assert!(refusal.is_some(), "ambiguous Phase 0 should be refused");
5857        assert!(refusal.unwrap().contains("ambiguous"));
5858    }
5859
5860    #[test]
5861    fn classify_phase_zero_bytes_missing_is_refused() {
5862        let refusal = classify_phase_zero_bytes(b"  \n\t  ");
5863        assert!(refusal.is_some(), "missing Phase 0 should be refused");
5864        assert!(refusal.unwrap().contains("missing"));
5865    }
5866
5867    #[test]
5868    fn classify_phase_zero_for_cleanup_refuses_stale_replay_plan() {
5869        let work = temp_workspace("stale_cleanup");
5870        let bucket_dir = work.join("migrations/main/_global_");
5871        fs::create_dir_all(&bucket_dir).unwrap();
5872
5873        // Write a stale replay plan JSON.
5874        let replay = CliReplayPlan {
5875            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
5876            classification: CliClassification::Additive,
5877            checksum_up: "V1:aabbccdd".to_string(),
5878            checksum_down: None,
5879            segments: vec![CliReplaySegment {
5880                kind: CliSegmentKind::Transactional,
5881                statements: vec![CliReplayStatement {
5882                    label: "phase_zero_bootstrap".to_string(),
5883                    up: generated_stale_phase_zero_sql("stale_replay"),
5884                }],
5885            }],
5886        };
5887        fs::write(
5888            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
5889            serde_json::to_string(&replay).unwrap(),
5890        )
5891        .unwrap();
5892
5893        let bucket = djogi::migrate::BucketKey {
5894            database: "main".to_string(),
5895            app: String::new(),
5896        };
5897        let refusal = classify_phase_zero_for_cleanup(
5898            &work,
5899            &bucket,
5900            djogi::migrate::PHASE_ZERO_VERSION,
5901            "V1:aabbccdd",
5902            None,
5903        );
5904        assert!(
5905            refusal.is_some(),
5906            "stale Phase 0 replay plan should be refused by cleanup guard"
5907        );
5908        let msg = refusal.unwrap();
5909        assert!(msg.contains("generated-stale"), "refusal reason: {msg}");
5910
5911        let _ = fs::remove_dir_all(&work);
5912    }
5913
5914    #[test]
5915    fn classify_phase_zero_for_cleanup_allows_current_replay_plan() {
5916        let work = temp_workspace("current_cleanup");
5917        let bucket_dir = work.join("migrations/main/_global_");
5918        fs::create_dir_all(&bucket_dir).unwrap();
5919
5920        // Write a current (production) replay plan JSON.
5921        let replay = CliReplayPlan {
5922            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
5923            classification: CliClassification::Additive,
5924            checksum_up: "V1:eeff0011".to_string(),
5925            checksum_down: None,
5926            segments: vec![CliReplaySegment {
5927                kind: CliSegmentKind::Transactional,
5928                statements: vec![CliReplayStatement {
5929                    label: "phase_zero_bootstrap".to_string(),
5930                    up: current_production_phase_zero_sql("current_replay"),
5931                }],
5932            }],
5933        };
5934        fs::write(
5935            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
5936            serde_json::to_string(&replay).unwrap(),
5937        )
5938        .unwrap();
5939
5940        let bucket = djogi::migrate::BucketKey {
5941            database: "main".to_string(),
5942            app: String::new(),
5943        };
5944        let refusal = classify_phase_zero_for_cleanup(
5945            &work,
5946            &bucket,
5947            djogi::migrate::PHASE_ZERO_VERSION,
5948            "V1:eeff0011",
5949            None,
5950        );
5951        assert!(
5952            refusal.is_none(),
5953            "identity-free Phase 0 should be allowed by cleanup guard; got: {refusal:?}"
5954        );
5955
5956        let _ = fs::remove_dir_all(&work);
5957    }
5958
5959    #[test]
5960    fn classify_phase_zero_for_cleanup_refuses_seed_capable_replay_plan() {
5961        let work = temp_workspace("seed_cleanup_replay_plan");
5962        let bucket_dir = work.join("migrations/main/_global_");
5963        fs::create_dir_all(&bucket_dir).unwrap();
5964
5965        let replay = CliReplayPlan {
5966            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
5967            classification: CliClassification::Additive,
5968            checksum_up: "V1:11223344".to_string(),
5969            checksum_down: None,
5970            segments: vec![CliReplaySegment {
5971                kind: CliSegmentKind::Transactional,
5972                statements: vec![CliReplayStatement {
5973                    label: "phase_zero_bootstrap".to_string(),
5974                    up: seed_capable_phase_zero_sql(),
5975                }],
5976            }],
5977        };
5978        fs::write(
5979            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
5980            serde_json::to_string(&replay).unwrap(),
5981        )
5982        .unwrap();
5983
5984        let bucket = djogi::migrate::BucketKey {
5985            database: "main".to_string(),
5986            app: String::new(),
5987        };
5988        let refusal = classify_phase_zero_for_cleanup(
5989            &work,
5990            &bucket,
5991            djogi::migrate::PHASE_ZERO_VERSION,
5992            "V1:11223344",
5993            None,
5994        );
5995        let msg = refusal.expect("seed-capable replay plan must refuse");
5996        assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
5997
5998        let _ = fs::remove_dir_all(&work);
5999    }
6000
6001    #[test]
6002    fn classify_phase_zero_for_cleanup_refuses_markerless_seed_replay_plan() {
6003        let work = temp_workspace("markerless_seed_cleanup_replay_plan");
6004        let bucket_dir = work.join("migrations/main/_global_");
6005        fs::create_dir_all(&bucket_dir).unwrap();
6006
6007        let replay = CliReplayPlan {
6008            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
6009            classification: CliClassification::Additive,
6010            checksum_up: "V1:55667788".to_string(),
6011            checksum_down: None,
6012            segments: vec![CliReplaySegment {
6013                kind: CliSegmentKind::Transactional,
6014                statements: vec![CliReplayStatement {
6015                    label: "phase_zero_bootstrap".to_string(),
6016                    up: markerless_seed_phase_zero_sql("markerless_seed_replay"),
6017                }],
6018            }],
6019        };
6020        fs::write(
6021            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
6022            serde_json::to_string(&replay).unwrap(),
6023        )
6024        .unwrap();
6025
6026        let bucket = djogi::migrate::BucketKey {
6027            database: "main".to_string(),
6028            app: String::new(),
6029        };
6030        let refusal = classify_phase_zero_for_cleanup(
6031            &work,
6032            &bucket,
6033            djogi::migrate::PHASE_ZERO_VERSION,
6034            "V1:55667788",
6035            None,
6036        );
6037        let msg = refusal.expect("markerless seed replay plan must refuse");
6038        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
6039
6040        let _ = fs::remove_dir_all(&work);
6041    }
6042
6043    #[test]
6044    fn classify_phase_zero_for_cleanup_refuses_cte_seed_dml_replay_plan() {
6045        let work = temp_workspace("cte_seed_cleanup_replay_plan");
6046        let bucket_dir = work.join("migrations/main/_global_");
6047        fs::create_dir_all(&bucket_dir).unwrap();
6048
6049        let replay = CliReplayPlan {
6050            format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
6051            classification: CliClassification::Additive,
6052            checksum_up: "V1:66778899".to_string(),
6053            checksum_down: None,
6054            segments: vec![CliReplaySegment {
6055                kind: CliSegmentKind::Transactional,
6056                statements: vec![CliReplayStatement {
6057                    label: "phase_zero_bootstrap".to_string(),
6058                    up: phase_zero_with_seed_statement(
6059                        "cte_seed_cleanup_replay",
6060                        "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
6061                    ),
6062                }],
6063            }],
6064        };
6065        fs::write(
6066            bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
6067            serde_json::to_string(&replay).unwrap(),
6068        )
6069        .unwrap();
6070
6071        let bucket = djogi::migrate::BucketKey {
6072            database: "main".to_string(),
6073            app: String::new(),
6074        };
6075        let refusal = classify_phase_zero_for_cleanup(
6076            &work,
6077            &bucket,
6078            djogi::migrate::PHASE_ZERO_VERSION,
6079            "V1:66778899",
6080            None,
6081        );
6082        let msg = refusal.expect("CTE seed replay plan must refuse");
6083        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
6084
6085        let _ = fs::remove_dir_all(&work);
6086    }
6087
6088    #[test]
6089    fn classify_phase_zero_for_cleanup_fallback_sql_file() {
6090        let work = temp_workspace("fallback_cleanup");
6091        let bucket_dir = work.join("migrations/main/_global_");
6092        fs::create_dir_all(&bucket_dir).unwrap();
6093
6094        let up_sql = current_production_phase_zero_sql("fallback_sql");
6095        let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
6096        fs::write(bucket_dir.join(&up_filename), up_sql).unwrap();
6097
6098        let bucket = djogi::migrate::BucketKey {
6099            database: "main".to_string(),
6100            app: String::new(),
6101        };
6102        let refusal = classify_phase_zero_for_cleanup(
6103            &work,
6104            &bucket,
6105            djogi::migrate::PHASE_ZERO_VERSION,
6106            "V1:anychecksum",
6107            None,
6108        );
6109        assert!(
6110            refusal.is_none(),
6111            "identity-free Phase 0 fallback SQL should be allowed; got: {refusal:?}"
6112        );
6113
6114        let _ = fs::remove_dir_all(&work);
6115    }
6116
6117    #[test]
6118    fn classify_phase_zero_for_cleanup_refuses_seed_capable_fallback_sql_file() {
6119        let work = temp_workspace("seed_cleanup_fallback");
6120        let bucket_dir = work.join("migrations/main/_global_");
6121        fs::create_dir_all(&bucket_dir).unwrap();
6122
6123        let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
6124        fs::write(bucket_dir.join(&up_filename), seed_capable_phase_zero_sql()).unwrap();
6125
6126        let bucket = djogi::migrate::BucketKey {
6127            database: "main".to_string(),
6128            app: String::new(),
6129        };
6130        let refusal = classify_phase_zero_for_cleanup(
6131            &work,
6132            &bucket,
6133            djogi::migrate::PHASE_ZERO_VERSION,
6134            "V1:anychecksum",
6135            None,
6136        );
6137        let msg = refusal.expect("seed-capable fallback SQL must refuse");
6138        assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
6139
6140        let _ = fs::remove_dir_all(&work);
6141    }
6142
6143    #[test]
6144    fn classify_phase_zero_for_cleanup_refuses_markerless_seed_fallback_sql_file() {
6145        let work = temp_workspace("markerless_seed_cleanup_fallback");
6146        let bucket_dir = work.join("migrations/main/_global_");
6147        fs::create_dir_all(&bucket_dir).unwrap();
6148
6149        let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
6150        fs::write(
6151            bucket_dir.join(&up_filename),
6152            markerless_seed_phase_zero_sql("markerless_seed_fallback"),
6153        )
6154        .unwrap();
6155
6156        let bucket = djogi::migrate::BucketKey {
6157            database: "main".to_string(),
6158            app: String::new(),
6159        };
6160        let refusal = classify_phase_zero_for_cleanup(
6161            &work,
6162            &bucket,
6163            djogi::migrate::PHASE_ZERO_VERSION,
6164            "V1:anychecksum",
6165            None,
6166        );
6167        let msg = refusal.expect("markerless seed fallback SQL must refuse");
6168        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
6169
6170        let _ = fs::remove_dir_all(&work);
6171    }
6172
6173    #[test]
6174    fn classify_phase_zero_for_cleanup_refuses_copy_from_seed_fallback_sql_file() {
6175        let work = temp_workspace("copy_seed_cleanup_fallback");
6176        let bucket_dir = work.join("migrations/main/_global_");
6177        fs::create_dir_all(&bucket_dir).unwrap();
6178
6179        let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
6180        fs::write(
6181            bucket_dir.join(&up_filename),
6182            phase_zero_with_seed_statement(
6183                "copy_seed_cleanup_fallback",
6184                "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
6185            ),
6186        )
6187        .unwrap();
6188
6189        let bucket = djogi::migrate::BucketKey {
6190            database: "main".to_string(),
6191            app: String::new(),
6192        };
6193        let refusal = classify_phase_zero_for_cleanup(
6194            &work,
6195            &bucket,
6196            djogi::migrate::PHASE_ZERO_VERSION,
6197            "V1:anychecksum",
6198            None,
6199        );
6200        let msg = refusal.expect("COPY FROM seed fallback SQL must refuse");
6201        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
6202
6203        let _ = fs::remove_dir_all(&work);
6204    }
6205
6206    // ── per-app version-stream test ─────────────────────────────────
6207
6208    #[djogi::djogi_test]
6209    async fn check_ledger_state_is_app_scoped(mut ctx: djogi::context::DjogiContext) {
6210        use djogi::migrate::{ExecutionMode, LedgerRow, LedgerStatus};
6211
6212        // Bootstrap the ledger table so insert_pending works.
6213        djogi::migrate::bootstrap_ledger(&mut ctx)
6214            .await
6215            .expect("bootstrap");
6216
6217        // Seed one applied row for app "users" at version V.
6218        let row = LedgerRow {
6219            version: "V20260609000000__t397".into(),
6220            description: "test migration".into(),
6221            checksum_up: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
6222            checksum_down: Some(
6223                "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".into(),
6224            ),
6225            execution_mode: ExecutionMode::Transactional,
6226            status: LedgerStatus::Pending,
6227            execution_time_ms: 0,
6228            out_of_order_flag: false,
6229            applied_steps_count: 0,
6230            total_steps: None,
6231            partial_apply_note: None,
6232            run_id: 1,
6233            snapshot_version: "0".into(),
6234            app_label: "users".into(),
6235            leaf_identity: None,
6236        };
6237        let ledger_id = djogi::migrate::insert_pending_ledger_row(&mut ctx, &row)
6238            .await
6239            .expect("insert pending");
6240        djogi::migrate::mark_ledger_applied(&mut ctx, ledger_id, 10, 1)
6241            .await
6242            .expect("mark applied");
6243
6244        // Different app stream must be NotPresent.
6245        let state = check_ledger_state(&mut ctx, "V20260609000000__t397", "system").await;
6246        assert!(
6247            matches!(state, LedgerState::NotPresent),
6248            "different app stream must be NotPresent, got {state:?}",
6249        );
6250
6251        // Same app stream must be AlreadyApplied.
6252        let state = check_ledger_state(&mut ctx, "V20260609000000__t397", "users").await;
6253        assert!(
6254            matches!(state, LedgerState::AlreadyApplied),
6255            "same app stream must be AlreadyApplied, got {state:?}",
6256        );
6257    }
6258}