1use std::path::{Path, PathBuf};
10use std::process::ExitCode;
11
12use djogi::apps::AppRegistry;
13use djogi::migrate::{
14 AppLifecycle, AttuneError, AttuneMode, AttuneRequest, AutoEmitError, BootstrapError, BucketKey,
15 ComposeError, ComposeRequest, DescriptorProvider, DiffError, DriftBaseline,
16 GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan, RepairConfirmation,
17 RepairError, RepairReport, RollbackError, RunnerCtx, RunnerError, SnapshotError, SqlEmitError,
18 VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
19 compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
20 repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
21};
22
23use djogi::migrate::LedgerStatus;
25
26use crate::{PartialApplyResolutionCli, RepairSubcommand};
29
30#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
38struct CliReplayPlan {
39 format_version: String,
40 checksum_up: String,
41 checksum_down: Option<String>,
42 classification: CliClassification,
43 segments: Vec<CliReplaySegment>,
44}
45
46#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
47#[serde(tag = "kind", rename_all = "snake_case")]
48enum CliClassification {
49 NoOp,
50 Additive,
51 Reversible,
52 Destructive,
53 Lossy,
54 Unsupported {
55 reason: String,
56 },
57 PkTypeFlip {
58 co_destructive: bool,
59 co_lossy: bool,
60 },
61}
62
63#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
64struct CliReplaySegment {
65 kind: CliSegmentKind,
66 statements: Vec<CliReplayStatement>,
67}
68
69#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
70#[serde(rename_all = "snake_case")]
71enum CliSegmentKind {
72 Transactional,
73 NonTransactional,
74 MetadataOnly,
75}
76
77#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
78struct CliReplayStatement {
79 label: String,
80 up: String,
81}
82
83const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
85
86fn load_replay_plan_from_disk(
91 workspace: &Path,
92 bucket: &djogi::migrate::BucketKey,
93 version: &str,
94 pending_checksum_up: &str,
95 pending_checksum_down: Option<&str>,
96) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
97 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
99 let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
100
101 let sidecar_bytes = match std::fs::read(&replay_plan_path) {
102 Ok(bytes) => Some(bytes),
103 Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
104 Err(e) => {
105 return Err(ApplyReplayPlanError::PlanRead {
106 path: replay_plan_path.clone(),
107 source: e.to_string(),
108 });
109 }
110 };
111
112 if let Some(bytes) = sidecar_bytes {
113 let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
114 Ok(s) => s,
115 Err(e) => {
116 return Err(ApplyReplayPlanError::Parse {
117 path: replay_plan_path.clone(),
118 source: e.to_string(),
119 });
120 }
121 };
122
123 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
124 return Err(ApplyReplayPlanError::FormatVersion {
125 found: stored.format_version,
126 path: replay_plan_path.clone(),
127 });
128 }
129
130 if stored.checksum_up != pending_checksum_up
132 || stored.checksum_down.as_deref() != pending_checksum_down
133 {
134 return Err(ApplyReplayPlanError::ChecksumMismatch);
135 }
136
137 let plan = djogi::migrate::MigrationPlan {
138 bucket: bucket.clone(),
139 classification: stored.classification.into(),
140 segments: stored
141 .segments
142 .into_iter()
143 .map(|seg| djogi::migrate::Segment {
144 kind: seg.kind.into(),
145 statements: seg
146 .statements
147 .into_iter()
148 .map(|stmt| djogi::migrate::OperationSql {
149 label: stmt.label,
150 up: stmt.up,
151 down: String::new(),
152 lossy: None,
153 })
154 .collect(),
155 })
156 .collect(),
157 };
158
159 return Ok((plan, stored.checksum_up, stored.checksum_down));
160 }
161
162 let up_filename = djogi::migrate::up_filename(version);
165 let down_filename = djogi::migrate::down_filename(version);
166 let up_path = bucket_dir.join(&up_filename);
167 let down_path = bucket_dir.join(&down_filename);
168
169 let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
170 path: up_path.clone(),
171 source: e.to_string(),
172 })?;
173
174 let down_sql = match std::fs::read_to_string(&down_path) {
175 Ok(sql) => sql,
176 Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
177 Err(e) => {
178 return Err(ApplyReplayPlanError::SqlRead {
179 path: down_path.clone(),
180 source: e.to_string(),
181 });
182 }
183 };
184
185 let fallback =
188 djogi::migrate::canonical_fallback_replay_plan(bucket, version, &up_sql, &down_sql)
189 .map_err(|e| match e {
190 djogi::migrate::FallbackReplayPlanError::NonTransactionalStatement { shape } => {
191 ApplyReplayPlanError::NonTransactionalWithoutReplayPlan {
192 shape,
193 path: replay_plan_path.clone(),
194 }
195 }
196 })?;
197
198 if fallback.checksum_up != pending_checksum_up {
201 return Err(ApplyReplayPlanError::FallbackChecksumMismatch {
202 side: "up",
203 computed: fallback.checksum_up,
204 pending: pending_checksum_up.to_string(),
205 });
206 }
207
208 if fallback.checksum_down.as_deref() != pending_checksum_down {
213 return Err(ApplyReplayPlanError::FallbackChecksumMismatch {
214 side: "down",
215 computed: fallback.checksum_down.unwrap_or_default(),
216 pending: pending_checksum_down.unwrap_or_default().to_string(),
217 });
218 }
219
220 Ok((fallback.plan, fallback.checksum_up, fallback.checksum_down))
221}
222
223#[derive(Debug)]
225enum ApplyReplayPlanError {
226 PlanRead {
227 path: PathBuf,
228 source: String,
229 },
230 Parse {
231 path: PathBuf,
232 source: String,
233 },
234 FormatVersion {
235 found: String,
236 path: PathBuf,
237 },
238 ChecksumMismatch,
239 NonTransactionalWithoutReplayPlan {
240 shape: &'static str,
241 path: PathBuf,
242 },
243 SqlRead {
244 path: PathBuf,
245 source: String,
246 },
247 FallbackChecksumMismatch {
248 side: &'static str,
249 computed: String,
250 pending: String,
251 },
252}
253
254impl std::fmt::Display for ApplyReplayPlanError {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 match self {
257 Self::PlanRead { path, source } => {
258 write!(f, "read replay plan {}: {source}", path.display())
259 }
260 Self::Parse { path, source } => {
261 write!(f, "parse replay plan {}: {source}", path.display())
262 }
263 Self::FormatVersion { found, path } => write!(
264 f,
265 "replay plan format version mismatch in {}: expected {}, found {}",
266 path.display(),
267 CLI_REPLAY_PLAN_FORMAT_VERSION,
268 found
269 ),
270 Self::ChecksumMismatch => {
271 write!(f, "checksum mismatch between pending JSON and replay plan")
272 }
273 Self::NonTransactionalWithoutReplayPlan { shape, path } => write!(
274 f,
275 "migration contains `{shape}`, which cannot replay as a single \
276 transaction and requires its committed replay plan; restore {} \
277 (or re-run `djogi migrations compose`) and retry",
278 path.display()
279 ),
280 Self::SqlRead { path, source } => {
281 write!(f, "read SQL file {}: {source}", path.display())
282 }
283 Self::FallbackChecksumMismatch {
284 side,
285 computed,
286 pending,
287 } => write!(
288 f,
289 "committed {side} SQL checksum {computed} does not match the pending \
290 plan's {pending}; the file changed after compose — re-run \
291 `djogi migrations compose` (or restore the committed file)"
292 ),
293 }
294 }
295}
296
297impl std::error::Error for ApplyReplayPlanError {}
298
299fn classify_phase_zero_for_cleanup(
306 workspace: &Path,
307 bucket: &djogi::migrate::BucketKey,
308 version: &str,
309 pending_checksum_up: &str,
310 pending_checksum_down: Option<&str>,
311) -> Option<String> {
312 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
314 let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
315
316 if let Ok(bytes) = std::fs::read(&replay_plan_path) {
317 let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
318 Ok(s) => s,
319 Err(e) => {
320 return Some(format!("parse replay plan: {e}"));
321 }
322 };
323
324 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
325 return Some(format!(
326 "replay plan format version mismatch: expected {}, found {}",
327 CLI_REPLAY_PLAN_FORMAT_VERSION, stored.format_version
328 ));
329 }
330
331 if stored.checksum_up != pending_checksum_up
333 || stored.checksum_down.as_deref() != pending_checksum_down
334 {
335 return Some("checksum mismatch between pending JSON and replay plan".to_string());
336 }
337
338 let up_sql: String = stored
340 .segments
341 .iter()
342 .flat_map(|seg| seg.statements.iter())
343 .map(|stmt| stmt.up.as_str())
344 .collect::<Vec<&str>>()
345 .join("\n");
346
347 return classify_phase_zero_bytes(up_sql.as_bytes());
348 }
349
350 let up_filename = djogi::migrate::up_filename(version);
352 let up_path = bucket_dir.join(&up_filename);
353 match std::fs::read_to_string(&up_path) {
354 Ok(up_sql) => classify_phase_zero_bytes(up_sql.as_bytes()),
355 Err(e) => Some(format!("read up SQL file {}: {e}", up_path.display())),
356 }
357}
358
359fn classify_phase_zero_bytes(bytes: &[u8]) -> Option<String> {
362 match djogi::migrate::classify_phase_zero_artifact(bytes) {
363 djogi::migrate::PhaseZeroArtifactState::IdentityFreeCurrent => None,
364 djogi::migrate::PhaseZeroArtifactState::SeedCapableRuntimeCurrent => {
365 Some("seed-capable runtime-only artifact detected".to_string())
366 }
367 djogi::migrate::PhaseZeroArtifactState::SeedDmlNotRuntimeCurrent => {
368 Some("seed-dml non-runtime-current artifact detected".to_string())
369 }
370 djogi::migrate::PhaseZeroArtifactState::GeneratedStale => {
371 Some("generated-stale artifact detected".to_string())
372 }
373 djogi::migrate::PhaseZeroArtifactState::Ambiguous => {
374 Some("ambiguous or hand-edited artifact detected".to_string())
375 }
376 djogi::migrate::PhaseZeroArtifactState::Incomplete => {
377 Some("incomplete artifact (truncated generation)".to_string())
378 }
379 djogi::migrate::PhaseZeroArtifactState::Missing => Some("missing artifact".to_string()),
380 }
381}
382
383impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
386 fn from(kind: CliSegmentKind) -> Self {
387 match kind {
388 CliSegmentKind::Transactional => Self::Transactional,
389 CliSegmentKind::NonTransactional => Self::NonTransactional,
390 CliSegmentKind::MetadataOnly => Self::MetadataOnly,
391 }
392 }
393}
394
395impl From<CliClassification> for djogi::migrate::Classification {
396 fn from(classification: CliClassification) -> Self {
397 match classification {
398 CliClassification::NoOp => Self::NoOp,
399 CliClassification::Additive => Self::Additive,
400 CliClassification::Reversible => Self::Reversible,
401 CliClassification::Destructive => Self::Destructive,
402 CliClassification::Lossy => Self::Lossy,
403 CliClassification::Unsupported { reason } => Self::Unsupported { reason },
404 CliClassification::PkTypeFlip {
405 co_destructive,
406 co_lossy,
407 } => Self::PkTypeFlip {
408 co_destructive,
409 co_lossy,
410 },
411 }
412 }
413}
414
415fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
419 workspace.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")))
420}
421
422fn discover_snapshot_buckets_on_disk(
434 workspace: &Path,
435) -> Vec<djogi::migrate::projection::BucketKey> {
436 let mut out = Vec::new();
437 let migrations_root = djogi::migrate::migrations_root(workspace);
438 let Ok(db_entries) = std::fs::read_dir(&migrations_root) else {
439 return out;
440 };
441 for db_entry in db_entries.flatten() {
442 let Ok(ft) = db_entry.file_type() else {
443 continue;
444 };
445 if !ft.is_dir() {
446 continue;
447 }
448 let Some(database) = db_entry.file_name().to_str().map(str::to_string) else {
449 continue;
450 };
451 let Ok(app_entries) = std::fs::read_dir(db_entry.path()) else {
452 continue;
453 };
454 for app_entry in app_entries.flatten() {
455 let Ok(ft) = app_entry.file_type() else {
456 continue;
457 };
458 if !ft.is_dir() {
459 continue;
460 }
461 let Some(dirname) = app_entry.file_name().to_str().map(str::to_string) else {
462 continue;
463 };
464 let snap_path = app_entry.path().join("schema_snapshot.json");
465 if !snap_path.exists() {
466 continue;
467 }
468 let label = djogi::migrate::app_label_from_dirname(&dirname).to_string();
469 out.push(djogi::migrate::projection::BucketKey {
470 database: database.clone(),
471 app: label,
472 });
473 }
474 }
475 out
476}
477
478pub fn compose_cmd(
480 provider: &dyn DescriptorProvider,
481 name: &str,
482 allow_destructive: bool,
483 force_overwrite: bool,
484 workspace: Option<PathBuf>,
485) -> ExitCode {
486 let workspace = resolve_workspace(workspace);
487 let models = match project_from_provider(provider) {
488 Ok(m) => m,
489 Err(e) => {
490 eprintln!("djogi migrations compose: projection error: {e}");
491 return ExitCode::from(1);
492 }
493 };
494 let apps: Vec<AppLifecycle> = provider
495 .apps()
496 .iter()
497 .map(|d| AppLifecycle {
498 label: d.label.to_string(),
499 database: d.database.to_string(),
500 renamed_from: d.renamed_from.map(str::to_string),
501 tombstone: d.tombstone,
502 })
503 .collect();
504 let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
509 Ok(c) => c,
510 Err(e) => {
511 eprintln!("djogi migrations compose: config load: {e}");
512 return ExitCode::from(1);
513 }
514 };
515 let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
516 djogi_config.migrate.pk_flip_join_table_option,
517 );
518 compose_with_inputs(
519 &workspace,
520 name,
521 allow_destructive,
522 force_overwrite,
523 &models,
524 &apps,
525 time::OffsetDateTime::now_utc(),
526 Some(pk_flip_option),
527 )
528}
529
530#[allow(clippy::too_many_arguments)]
545fn compose_with_inputs(
546 workspace: &Path,
547 name: &str,
548 allow_destructive: bool,
549 force_overwrite: bool,
550 models: &std::collections::BTreeMap<
551 djogi::migrate::projection::BucketKey,
552 djogi::migrate::AppliedSchema,
553 >,
554 apps: &[AppLifecycle],
555 now: time::OffsetDateTime,
556 pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
557) -> ExitCode {
558 let lock_path = workspace.join(LOCK_FILE_NAME);
559 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
560 Ok(g) => g,
561 Err(e) => {
562 eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
563 return ExitCode::from(1);
564 }
565 };
566
567 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::projection::BucketKey> =
575 models.keys().cloned().collect();
576 for bucket in discover_snapshot_buckets_on_disk(workspace) {
577 bucket_set.insert(bucket);
578 }
579
580 let mut snapshots: std::collections::BTreeMap<_, _> = std::collections::BTreeMap::new();
581 for bucket in &bucket_set {
582 let path = djogi::migrate::snapshot_path(workspace, bucket);
583 match djogi::migrate::load_snapshot(&path) {
584 Ok(s) => {
585 snapshots.insert(bucket.clone(), s);
586 }
587 Err(djogi::migrate::SnapshotError::Io { source, .. })
588 if source.kind() == std::io::ErrorKind::NotFound =>
589 {
590 }
592 Err(e) => {
593 eprintln!(
594 "djogi migrations compose: snapshot load failed at {}: {e}",
595 path.display()
596 );
597 return ExitCode::from(1);
598 }
599 }
600 }
601
602 let req = ComposeRequest {
603 workspace_root: workspace,
604 models,
605 snapshots: &snapshots,
606 apps,
607 name,
608 allow_destructive,
609 force_overwrite,
610 now,
611 _guard: &guard,
612 pk_flip_join_table_option,
613 skip_phase_zero_auto_emit: false,
619 };
620 match compose(req) {
621 Ok(report) => {
622 for emit in &report.emitted_phase_zero {
626 let ext_summary = if emit.extensions.is_empty() {
627 "no extensions".to_string()
628 } else {
629 format!(
630 "extensions: {}",
631 emit.extensions
632 .iter()
633 .cloned()
634 .collect::<Vec<_>>()
635 .join(", ")
636 )
637 };
638 println!(
639 "auto-emitted bootstrap migration: {database}/_global_ ({ext_summary})",
640 database = emit.database,
641 );
642 }
643 for cb in &report.composed_buckets {
644 println!(
645 "composed {database}/{app}: {version} ({classification:?})",
646 database = cb.bucket.database,
647 app = if cb.bucket.app.is_empty() {
648 "_global_"
649 } else {
650 cb.bucket.app.as_str()
651 },
652 version = cb.version,
653 classification = cb.classification,
654 );
655 }
656 for bucket in &report.converged_snapshot_buckets {
657 println!(
658 "snapshot converged: {database}/{app} — snapshot updated to scoped enum set, no migration needed",
659 database = bucket.database,
660 app = if bucket.app.is_empty() {
661 "_global_"
662 } else {
663 bucket.app.as_str()
664 },
665 );
666 }
667 ExitCode::from(0)
668 }
669 Err(ComposeError::NothingToCompose) => {
670 println!("nothing to compose — model state matches snapshot for every bucket");
671 ExitCode::from(0)
675 }
676 Err(e) => {
677 eprintln!("djogi migrations compose: {e}");
678 ExitCode::from(compose_error_exit_code(&e) as u8)
679 }
680 }
681}
682
683fn compose_error_exit_code(error: &ComposeError) -> i32 {
702 match error {
703 ComposeError::NothingToCompose => 0,
704
705 ComposeError::DestructiveRequiresAllowDestructive { .. }
707 | ComposeError::TombstonedAppRequiresAllowDestructive { .. }
708 | ComposeError::UnsupportedDelta { .. }
709 | ComposeError::HandEditedMigrationWouldBeOverwritten { .. }
710 | ComposeError::PendingJsonWouldBeOverwritten { .. }
711 | ComposeError::FolderRenameTargetCollision { .. }
712 | ComposeError::LinkageDropWithoutModels { .. }
713 | ComposeError::CrossBucketForeignKeyCycle { .. } => 2,
714
715 ComposeError::SqlEmit(e) => match e {
721 SqlEmitError::Unsupported { .. }
722 | SqlEmitError::UnsupportedPartitionChange { .. }
723 | SqlEmitError::InvalidStorageParams { .. } => 2,
724 SqlEmitError::Diff(diff_err) => diff_exit_code(diff_err),
725 SqlEmitError::PkTypeFlipMustRouteToT9 { .. } => 1,
728 },
729
730 ComposeError::Diff(e) => diff_exit_code(e),
732
733 ComposeError::PhaseZeroAutoEmit(e) => match e {
737 AutoEmitError::Compose(BootstrapError::InvalidExtensionName { .. })
738 | AutoEmitError::Compose(BootstrapError::UnknownExtension { .. }) => 2,
739 AutoEmitError::Compose(BootstrapError::Db { .. })
740 | AutoEmitError::Io { .. }
741 | AutoEmitError::PendingJson(_) => 1,
742 },
743
744 ComposeError::Io { .. } | ComposeError::SerializeFailed(_) => 1,
747
748 _ => 1,
752 }
753}
754
755fn diff_exit_code(error: &DiffError) -> i32 {
763 match error {
764 DiffError::PkFlipCascadeDepthExceeded { .. }
767 | DiffError::PartitionedMultiParentClusterUnsupported { .. } => 2,
768 DiffError::PkFlipMalformedSelfFkMetadata(_) => 1,
771 }
772}
773
774pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
779 let workspace = resolve_workspace(workspace);
780
781 let runtime = match tokio::runtime::Builder::new_current_thread()
783 .enable_all()
784 .build()
785 {
786 Ok(r) => r,
787 Err(e) => {
788 eprintln!("djogi migrations status: tokio runtime: {e}");
789 return ExitCode::from(1);
790 }
791 };
792
793 let exit = runtime.block_on(async { run_status(&workspace).await });
794 ExitCode::from(exit as u8)
795}
796
797async fn run_status(workspace: &Path) -> i32 {
806 use djogi::config::DjogiConfig;
807
808 let config = match DjogiConfig::load_from_workspace(workspace) {
809 Ok(c) => c,
810 Err(e) => {
811 eprintln!("djogi migrations status: config load: {e}");
812 return 1;
813 }
814 };
815
816 let mut ctx = match connect_and_check(&config.database.url).await {
817 ContextOutcome::Ready(ctx) => ctx,
818 ContextOutcome::UnsupportedVersion(e) => {
819 crate::print_support_boundary_error("migrations status", &e);
820 return 2;
821 }
822 ContextOutcome::RuntimeError(msg) => {
823 eprintln!("djogi migrations status: pool: {msg}");
824 return 1;
825 }
826 };
827
828 let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
829 Ok(rows) => rows,
830 Err(e) => {
831 if e.to_string().contains("djogi_schema_migrations") {
834 println!("No migrations recorded.");
835 return 0;
836 }
837 eprintln!("djogi migrations status: ledger read: {e}");
838 return 1;
839 }
840 };
841
842 let registered: Vec<String> = AppRegistry::all()
843 .iter()
844 .map(|d| d.label.to_string())
845 .collect();
846 let report = djogi::migrate::render_status(&rows, ®istered);
847 for line in &report.lines {
848 println!("{line}");
849 }
850 report.exit_code
851}
852
853#[allow(clippy::large_enum_variant)]
874enum ContextOutcome {
875 Ready(djogi::context::DjogiContext),
877 UnsupportedVersion(djogi::error::DjogiError),
880 RuntimeError(String),
883}
884
885async fn connect_and_check(url: &str) -> ContextOutcome {
893 let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
894 Ok(p) => p,
895 Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
896 };
897 match djogi::pg::preflight::check_postgres_version(&pool).await {
898 Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
899 Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
902 ContextOutcome::UnsupportedVersion(e)
903 }
904 Err(other) => ContextOutcome::RuntimeError(other.to_string()),
905 }
906}
907
908fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
928 if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
931 return Some(db_config.url.clone());
932 }
933 if database == "crud_log"
934 && let Some(u) = db_config.crud_log_url.as_deref()
935 && !u.is_empty()
936 {
937 return Some(u.to_string());
938 }
939 if database == "event_log"
940 && let Some(u) = db_config.event_log_url.as_deref()
941 && !u.is_empty()
942 {
943 return Some(u.to_string());
944 }
945 djogi::migrate::derive_per_database_url(&db_config.url, database)
946}
947
948pub fn apply_cmd(
958 workspace: Option<PathBuf>,
959 fake: bool,
960 reason: Option<String>,
961 node_id: Option<u32>,
962 single_node_dev: bool,
963) -> ExitCode {
964 let workspace = resolve_workspace(workspace);
965
966 let mode = if fake {
968 match reason {
969 Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
970 Some(_) => {
971 eprintln!(
972 "djogi migrations apply --fake: --reason must not be empty; \
973 supply a non-empty reason why these migrations are being \
974 faked (e.g. 'schema pre-exists from prior tooling')"
975 );
976 return ExitCode::from(2);
977 }
978 None => {
979 eprintln!(
980 "djogi migrations apply --fake: --reason is required; \
981 supply a reason why these migrations are being faked \
982 (e.g. 'schema pre-exists from prior tooling'). \
983 This is recorded in the ledger audit trail."
984 );
985 return ExitCode::from(2);
986 }
987 }
988 } else {
989 FakeMode::Real
990 };
991
992 let runtime = match tokio::runtime::Builder::new_current_thread()
993 .enable_all()
994 .build()
995 {
996 Ok(r) => r,
997 Err(e) => {
998 eprintln!("djogi migrations apply: tokio runtime: {e}");
999 return ExitCode::from(1);
1000 }
1001 };
1002
1003 let exit =
1004 runtime.block_on(async { run_apply(&workspace, &mode, node_id, single_node_dev).await });
1005 ExitCode::from(exit as u8)
1006}
1007
1008#[derive(Debug, Clone)]
1011enum FakeMode {
1012 Real,
1014 Fake { reason: String },
1016}
1017
1018async fn run_apply(
1020 workspace: &Path,
1021 mode: &FakeMode,
1022 node_id: Option<u32>,
1023 single_node_dev: bool,
1024) -> i32 {
1025 use djogi::config::DjogiConfig;
1026
1027 let action_verb = match mode {
1028 FakeMode::Real => "apply",
1029 FakeMode::Fake { .. } => "fake-apply",
1030 };
1031 let progress_verb = match mode {
1032 FakeMode::Real => "applying",
1033 FakeMode::Fake { .. } => "faking",
1034 };
1035
1036 let config = match DjogiConfig::load_from_workspace(workspace) {
1038 Ok(c) => c,
1039 Err(e) => {
1040 eprintln!("djogi migrations {action_verb}: config load: {e}");
1041 return 2;
1042 }
1043 };
1044
1045 let pending_files = match discover_pending_plans(workspace) {
1049 Ok(pending_files) => pending_files,
1050 Err(e) => {
1051 eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
1052 return 2;
1053 }
1054 };
1055 if pending_files.is_empty() {
1056 println!("No pending migrations to {action_verb}.");
1057 return 0;
1058 }
1059
1060 let runner_identity = match crate::identity::resolve_identity(
1063 node_id,
1064 single_node_dev,
1065 &config.profile,
1066 action_verb,
1067 ) {
1068 Ok(resolved) => Some(resolved.into_runner_identity()),
1069 Err(e) => {
1070 let _ = crate::identity::print_identity_error(action_verb, &e);
1071 return 2;
1072 }
1073 };
1074
1075 let target_urls = match resolve_apply_target_urls(&pending_files, &config.database) {
1080 Ok(urls) => urls,
1081 Err(e) => {
1082 eprintln!("djogi migrations {action_verb}: target routing: {e}");
1083 return 2;
1084 }
1085 };
1086 let mut contexts = std::collections::BTreeMap::<String, djogi::context::DjogiContext>::new();
1087 for (database, url) in &target_urls {
1088 match connect_and_check(url).await {
1089 ContextOutcome::Ready(ctx) => {
1090 contexts.insert(database.clone(), ctx);
1091 }
1092 ContextOutcome::UnsupportedVersion(e) => {
1093 crate::print_support_boundary_error("migrations apply", &e);
1094 return 2;
1095 }
1096 ContextOutcome::RuntimeError(msg) => {
1097 eprintln!("djogi migrations {action_verb}: pool for '{database}': {msg}");
1098 return 1;
1099 }
1100 }
1101 }
1102
1103 let lock_path = workspace.join(LOCK_FILE_NAME);
1105 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
1106 Ok(g) => g,
1107 Err(e) => {
1108 eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
1109 return 1;
1110 }
1111 };
1112
1113 let pending_files = match reconcile_pending_plans_after_lock(workspace, &pending_files) {
1115 Ok(pending_files) => pending_files,
1116 Err(e) => {
1117 eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
1118 return 2;
1119 }
1120 };
1121
1122 let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
1124 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
1125 Err(_) => None,
1126 };
1127
1128 for pending_file in &pending_files {
1132 let bucket_database = &pending_file.bucket.database;
1133 let app_label = &pending_file.bucket.app;
1134 let Some(ctx) = contexts.get_mut(bucket_database) else {
1135 eprintln!(
1136 "djogi migrations {action_verb}: internal error: missing context for database '{bucket_database}'"
1137 );
1138 return 1;
1139 };
1140 println!(" {progress_verb} {bucket_database}/{app_label}...");
1141 let result = apply_one_pending(
1142 ctx,
1143 workspace,
1144 pending_file,
1145 &config,
1146 &guard,
1147 audit_pool.as_ref(),
1148 mode,
1149 runner_identity,
1150 )
1151 .await;
1152
1153 match result {
1154 ApplyResult::Ok => match mode {
1155 FakeMode::Real => {
1156 println!("Applied: {bucket_database}/{app_label}");
1157 }
1158 FakeMode::Fake { .. } => {
1159 println!(
1160 " faked {bucket_database}/{app_label}: \
1161 recorded in ledger with status = 'faked' (no SQL executed)"
1162 );
1163 }
1164 },
1165 ApplyResult::Skipped(reason) => {
1166 println!("Skipped {bucket_database}/{app_label}: {reason}");
1167 }
1168 ApplyResult::Refused(reason) => {
1169 eprintln!(
1170 "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
1171 );
1172 return 2;
1173 }
1174 ApplyResult::RunnerError(e) => {
1175 match &e {
1176 RunnerError::DriftDetected { bucket, report } => {
1177 for line in render_drift_refusal(bucket, report) {
1178 eprintln!("{line}");
1179 }
1180 }
1181 _ => eprintln!(
1182 "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
1183 ),
1184 }
1185 return runner_error_exit_code(&e);
1186 }
1187 }
1188 }
1189
1190 let summary_verb = match mode {
1191 FakeMode::Real => "applied",
1192 FakeMode::Fake { .. } => "faked",
1193 };
1194 println!("{summary_verb} {} migration(s).", pending_files.len());
1195 0
1196}
1197
1198#[derive(Debug)]
1200enum ApplyResult {
1201 Ok,
1203 Skipped(String),
1205 Refused(String),
1207 RunnerError(RunnerError),
1210}
1211
1212#[derive(Debug, Clone, PartialEq, Eq)]
1213struct DiscoveredPendingPlan {
1214 path: PathBuf,
1215 bucket: BucketKey,
1216 plan: PendingPlan,
1217 is_phase_zero: bool,
1218}
1219
1220fn is_acceptable_pending_path_component(bytes: &[u8]) -> bool {
1221 if bytes.is_empty() || bytes.len() > 63 {
1222 return false;
1223 }
1224 if bytes[0] == b'.' {
1225 return false;
1226 }
1227 let first = bytes[0];
1228 if first != b'_' && !first.is_ascii_alphabetic() {
1229 return false;
1230 }
1231 for &b in &bytes[1..] {
1232 if b != b'_' && !b.is_ascii_alphanumeric() {
1233 return false;
1234 }
1235 }
1236 true
1237}
1238
1239fn canonical_pending_filename(app_label: &str) -> String {
1240 format!("{}.json", djogi::migrate::app_dirname(app_label))
1241}
1242
1243fn validate_hidden_phase_zero_pending(
1244 path: PathBuf,
1245 database: &str,
1246) -> Result<DiscoveredPendingPlan, String> {
1247 let filename = path
1248 .file_name()
1249 .and_then(|f| f.to_str())
1250 .ok_or_else(|| format!("non-utf8 Phase 0 pending path {}", path.display()))?;
1251 let expected_filename = format!("{}.json", djogi::migrate::PHASE_ZERO_VERSION);
1252 if filename != expected_filename {
1253 return Err(format!(
1254 "hidden Phase 0 pending path {} must use canonical filename {}",
1255 path.display(),
1256 expected_filename
1257 ));
1258 }
1259 let plan = djogi::migrate::load_pending(&path)
1260 .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1261 if plan.bucket_database != database {
1262 return Err(format!(
1263 "pending JSON {} has bucket database {}, expected {} from path",
1264 path.display(),
1265 plan.bucket_database,
1266 database
1267 ));
1268 }
1269 if !plan.bucket_app.is_empty() {
1270 return Err(format!(
1271 "pending JSON {} must target the global bucket in hidden Phase 0 namespace",
1272 path.display()
1273 ));
1274 }
1275 if plan.version != djogi::migrate::PHASE_ZERO_VERSION {
1276 return Err(format!(
1277 "pending JSON {} must use Phase 0 version {}, found {}",
1278 path.display(),
1279 djogi::migrate::PHASE_ZERO_VERSION,
1280 plan.version
1281 ));
1282 }
1283 Ok(DiscoveredPendingPlan {
1284 path,
1285 bucket: BucketKey {
1286 database: database.to_string(),
1287 app: String::new(),
1288 },
1289 plan,
1290 is_phase_zero: true,
1291 })
1292}
1293
1294fn validate_normal_pending(
1295 path: PathBuf,
1296 database: &str,
1297 filename: &str,
1298) -> Result<DiscoveredPendingPlan, String> {
1299 let Some(stem) = filename.strip_suffix(".json") else {
1300 return Err(format!(
1301 "pending path {} must end with .json",
1302 path.display()
1303 ));
1304 };
1305 let app = if stem == "_global_" {
1306 String::new()
1307 } else {
1308 if !is_acceptable_pending_path_component(stem.as_bytes()) {
1309 return Err(format!(
1310 "pending path {} uses non-canonical app filename {}",
1311 path.display(),
1312 filename
1313 ));
1314 }
1315 stem.to_string()
1316 };
1317 let expected_filename = canonical_pending_filename(&app);
1318 if filename != expected_filename {
1319 return Err(format!(
1320 "pending path {} must use canonical filename {}",
1321 path.display(),
1322 expected_filename
1323 ));
1324 }
1325 let plan = djogi::migrate::load_pending(&path)
1326 .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1327 if plan.bucket_database != database {
1328 return Err(format!(
1329 "pending JSON {} has bucket database {}, expected {} from path",
1330 path.display(),
1331 plan.bucket_database,
1332 database
1333 ));
1334 }
1335 if plan.bucket_app != app {
1336 let expected_app = if app.is_empty() {
1337 "_global_"
1338 } else {
1339 app.as_str()
1340 };
1341 let found_app = if plan.bucket_app.is_empty() {
1342 "_global_"
1343 } else {
1344 plan.bucket_app.as_str()
1345 };
1346 return Err(format!(
1347 "pending JSON {} has bucket app {}, expected {} from path",
1348 path.display(),
1349 found_app,
1350 expected_app
1351 ));
1352 }
1353 if plan.version == djogi::migrate::PHASE_ZERO_VERSION {
1354 return Err(format!(
1355 "pending JSON {} must use the hidden .phase_zero namespace for Phase 0",
1356 path.display()
1357 ));
1358 }
1359 Ok(DiscoveredPendingPlan {
1360 path,
1361 bucket: BucketKey {
1362 database: database.to_string(),
1363 app,
1364 },
1365 is_phase_zero: false,
1366 plan,
1367 })
1368}
1369
1370fn discover_pending_plans(workspace: &Path) -> Result<Vec<DiscoveredPendingPlan>, String> {
1375 let pending_root = djogi::migrate::pending_root(workspace);
1376 let mut out = Vec::new();
1377 let mut seen_identities = std::collections::BTreeSet::new();
1378
1379 let Ok(db_entries) = std::fs::read_dir(&pending_root) else {
1380 return Ok(out);
1381 };
1382
1383 for db_entry in db_entries.flatten() {
1384 let db_name = match db_entry.file_name().to_str().map(str::to_string) {
1385 Some(n) => n,
1386 None => continue,
1387 };
1388 if !is_acceptable_pending_path_component(db_name.as_bytes()) {
1389 continue;
1390 }
1391
1392 let db_dir = db_entry.path();
1393 if !db_dir.is_dir() {
1394 continue;
1395 }
1396
1397 let Ok(app_entries) = std::fs::read_dir(&db_dir) else {
1398 continue;
1399 };
1400
1401 for app_entry in app_entries.flatten() {
1402 let path = app_entry.path();
1403 let file_type = match app_entry.file_type() {
1404 Ok(file_type) => file_type,
1405 Err(_) => continue,
1406 };
1407 if file_type.is_dir() {
1408 if app_entry.file_name().to_str() == Some(".phase_zero") {
1409 let Ok(phase_zero_entries) = std::fs::read_dir(&path) else {
1410 continue;
1411 };
1412 for phase_zero_entry in phase_zero_entries.flatten() {
1413 let phase_zero_path = phase_zero_entry.path();
1414 if !phase_zero_path.is_file() {
1415 continue;
1416 }
1417 let discovered =
1418 validate_hidden_phase_zero_pending(phase_zero_path, &db_name)?;
1419 let identity = (
1420 discovered.bucket.database.clone(),
1421 discovered.bucket.app.clone(),
1422 discovered.plan.version.clone(),
1423 );
1424 if !seen_identities.insert(identity.clone()) {
1425 return Err(format!(
1426 "duplicate pending identity discovered for {}/{}/{}",
1427 identity.0,
1428 if identity.1.is_empty() {
1429 "_global_"
1430 } else {
1431 identity.1.as_str()
1432 },
1433 identity.2
1434 ));
1435 }
1436 out.push(discovered);
1437 }
1438 }
1439 continue;
1440 }
1441 if !file_type.is_file() {
1442 continue;
1443 }
1444 let filename = match path.file_name().and_then(|f| f.to_str()) {
1445 Some(f) => f.to_string(),
1446 None => continue,
1447 };
1448 if !filename.ends_with(".json") {
1449 continue;
1450 }
1451 let discovered = validate_normal_pending(path, &db_name, &filename)?;
1452 let identity = (
1453 discovered.bucket.database.clone(),
1454 discovered.bucket.app.clone(),
1455 discovered.plan.version.clone(),
1456 );
1457 if !seen_identities.insert(identity.clone()) {
1458 return Err(format!(
1459 "duplicate pending identity discovered for {}/{}/{}",
1460 identity.0,
1461 if identity.1.is_empty() {
1462 "_global_"
1463 } else {
1464 identity.1.as_str()
1465 },
1466 identity.2
1467 ));
1468 }
1469 out.push(discovered);
1470 }
1471 }
1472
1473 out.sort_by(|a, b| {
1476 a.plan
1477 .version
1478 .cmp(&b.plan.version)
1479 .then_with(|| b.is_phase_zero.cmp(&a.is_phase_zero))
1480 .then_with(|| a.path.cmp(&b.path))
1481 });
1482
1483 let out = order_pending_groups_by_dependencies(out)?;
1489
1490 Ok(out)
1491}
1492
1493fn order_pending_groups_by_dependencies(
1504 out: Vec<DiscoveredPendingPlan>,
1505) -> Result<Vec<DiscoveredPendingPlan>, String> {
1506 let mut result = Vec::with_capacity(out.len());
1509 let mut i = 0;
1510 while i < out.len() {
1511 let mut j = i + 1;
1512 while j < out.len()
1513 && out[j].bucket.database == out[i].bucket.database
1514 && out[j].plan.version == out[i].plan.version
1515 && out[j].is_phase_zero == out[i].is_phase_zero
1516 {
1517 j += 1;
1518 }
1519
1520 for entry in &out[i..j] {
1527 for dep_app in &entry.plan.depends_on {
1528 if !is_acceptable_pending_path_component(dep_app.as_bytes()) {
1529 return Err(format!(
1530 "pending plan for {}/{} has invalid depends_on label {:?}",
1531 entry.bucket.database, entry.bucket.app, dep_app,
1532 ));
1533 }
1534 }
1535 }
1536
1537 if j - i <= 1 {
1539 result.append(&mut out[i..j].to_vec());
1541 i = j;
1542 continue;
1543 }
1544
1545 let database = &out[i].bucket.database;
1546 let version = &out[i].plan.version;
1547
1548 let group_len = j - i;
1550 let mut in_degree = vec![0usize; group_len];
1551 let mut reverse: Vec<Vec<usize>> = vec![Vec::new(); group_len];
1552
1553 let app_to_idx: std::collections::HashMap<&str, usize> = out[i..j]
1555 .iter()
1556 .enumerate()
1557 .map(|(idx, entry)| (entry.bucket.app.as_str(), idx))
1558 .collect();
1559
1560 for (k_idx, entry) in out[i..j].iter().enumerate() {
1561 for dep_app in &entry.plan.depends_on {
1562 let Some(&dep_idx) = app_to_idx.get(dep_app.as_str()) else {
1563 continue; };
1565 if dep_idx != k_idx {
1566 in_degree[k_idx] += 1;
1567 reverse[dep_idx].push(k_idx);
1568 }
1569 }
1570 }
1571
1572 let mut ready: std::collections::BTreeSet<usize> =
1574 (0..group_len).filter(|&idx| in_degree[idx] == 0).collect();
1575
1576 let mut ordered = Vec::with_capacity(group_len);
1577 while let Some(idx) = ready.iter().next().cloned() {
1578 ready.remove(&idx);
1579 ordered.push(idx);
1580 for &dependent in &reverse[idx] {
1581 in_degree[dependent] -= 1;
1582 if in_degree[dependent] == 0 {
1583 ready.insert(dependent);
1584 }
1585 }
1586 }
1587
1588 if ordered.len() != group_len {
1589 let mut chain: Vec<String> = (0..group_len)
1590 .filter(|&idx| in_degree[idx] > 0)
1591 .map(|idx| out[i + idx].bucket.app.clone())
1592 .collect();
1593 chain.sort();
1594 return Err(format!(
1595 "pending migrations for database `{database}` version `{version}` \
1596 declare a dependency cycle between apps: {chain:?}; \
1597 recompose or inspect hand-edited pending files"
1598 ));
1599 }
1600
1601 for idx in ordered {
1602 result.push(out[i + idx].clone());
1603 }
1604 i = j;
1605 }
1606
1607 Ok(result)
1608}
1609
1610fn load_verified_pending_for_apply(
1611 pending_file: &DiscoveredPendingPlan,
1612) -> Result<PendingPlan, String> {
1613 let pending_bytes =
1614 std::fs::read(&pending_file.path).map_err(|e| format!("read pending JSON: {e}"))?;
1615 let pending: PendingPlan =
1616 serde_json::from_slice(&pending_bytes).map_err(|e| format!("parse pending JSON: {e}"))?;
1617 if pending != pending_file.plan {
1618 return Err(format!(
1619 "pending JSON changed after discovery at {}; rerun the command",
1620 pending_file.path.display()
1621 ));
1622 }
1623 Ok(pending)
1624}
1625
1626fn resolve_apply_target_urls(
1627 pending_files: &[DiscoveredPendingPlan],
1628 db_config: &djogi::config::DatabaseConfig,
1629) -> Result<std::collections::BTreeMap<String, String>, String> {
1630 let mut urls = std::collections::BTreeMap::new();
1631 for pending_file in pending_files {
1632 let database = &pending_file.bucket.database;
1633 if urls.contains_key(database) {
1634 continue;
1635 }
1636 let Some(url) = resolve_bucket_url(db_config, database) else {
1637 return Err(format!("cannot derive a database URL for `{database}`"));
1638 };
1639 urls.insert(database.clone(), url);
1640 }
1641 Ok(urls)
1642}
1643
1644fn reconcile_pending_plans_after_lock(
1645 workspace: &Path,
1646 pre_lock_pending_files: &[DiscoveredPendingPlan],
1647) -> Result<Vec<DiscoveredPendingPlan>, String> {
1648 let locked_pending_files = discover_pending_plans(workspace)?;
1649 if locked_pending_files != pre_lock_pending_files {
1650 return Err(
1651 "pending migration set changed while waiting for the workspace lock; rerun the command"
1652 .to_string(),
1653 );
1654 }
1655 Ok(locked_pending_files)
1656}
1657
1658#[allow(clippy::too_many_arguments)]
1676#[djogi::deliberately_bypass_convention_with_raw_sql]
1677async fn apply_one_pending(
1684 ctx: &mut djogi::context::DjogiContext,
1685 workspace: &Path,
1686 pending_file: &DiscoveredPendingPlan,
1687 config: &djogi::config::DjogiConfig,
1688 guard: &djogi::migrate::WorkspaceGuard,
1689 audit_pool: Option<&deadpool_postgres::Pool>,
1690 mode: &FakeMode,
1691 runner_identity: Option<djogi::migrate::RunnerIdentity>,
1692) -> ApplyResult {
1693 let pending = match load_verified_pending_for_apply(pending_file) {
1695 Ok(pending) => pending,
1696 Err(e) => return ApplyResult::Refused(e),
1697 };
1698
1699 let bucket = pending_file.bucket.clone();
1700
1701 match check_ledger_state(ctx, &pending.version, &bucket.app).await {
1703 LedgerState::NotPresent => {} LedgerState::AlreadyApplied => {
1705 return ApplyResult::Skipped("already applied".to_string());
1706 }
1707 LedgerState::PendingOrPartial(existing_status) => {
1708 if existing_status == LedgerStatus::Failed
1712 || existing_status == LedgerStatus::RolledBack
1713 {
1714 if pending.version == djogi::migrate::PHASE_ZERO_VERSION {
1720 let cleanup_refusal = classify_phase_zero_for_cleanup(
1721 workspace,
1722 &bucket,
1723 &pending.version,
1724 &pending.checksum_up,
1725 pending.checksum_down.as_deref(),
1726 );
1727 if let Some(reason) = cleanup_refusal {
1728 return ApplyResult::Refused(format!(
1729 "Phase 0 cleanup refused: {reason}; \
1730 refusing before deleting {} row to prevent stale replay",
1731 existing_status.as_db_str()
1732 ));
1733 }
1734 }
1735
1736 if let Err(e) =
1740 delete_reapply_blocking_ledger_row(ctx, &pending.version, &bucket.app).await
1741 {
1742 return ApplyResult::Refused(format!(
1743 "clean {} ledger row: {e}",
1744 existing_status.as_db_str()
1745 ));
1746 }
1747 } else {
1748 return ApplyResult::Refused(format!(
1749 "version already in {} state — resolve before re-applying",
1750 existing_status.as_db_str()
1751 ));
1752 }
1753 }
1754 }
1755
1756 let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
1758 workspace,
1759 &bucket,
1760 &pending.version,
1761 &pending.checksum_up,
1762 pending.checksum_down.as_deref(),
1763 ) {
1764 Ok(result) => result,
1765 Err(e) => {
1766 return ApplyResult::Refused(format!("load replay plan: {e}"));
1767 }
1768 };
1769
1770 let snap_path = reconstruct_snapshot_path(workspace, &bucket);
1771 let drift_baseline = load_drift_baseline(mode, &snap_path);
1772
1773 let runner_ctx = RunnerCtx {
1775 bucket: bucket.clone(),
1776 version: pending.version.clone(),
1777 description: pending.slug.clone(),
1778 checksum_up,
1779 checksum_down,
1780 snapshot: Some(pending.model_snapshot.clone()),
1781 snapshot_path: Some(snap_path),
1782 config: djogi::config::MigrateConfig {
1784 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
1785 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
1786 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
1787 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
1788 },
1789 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
1790 audit_pool: audit_pool.cloned(),
1791 runner_identity,
1792 drift_baseline,
1793 };
1794
1795 let runner_result = match mode {
1797 FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
1798 FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
1799 };
1800 match runner_result {
1801 Ok(_) => ApplyResult::Ok,
1802 Err(e) => ApplyResult::RunnerError(e),
1803 }
1804}
1805
1806#[derive(Debug)]
1808enum LedgerState {
1809 NotPresent,
1811 AlreadyApplied,
1813 PendingOrPartial(LedgerStatus),
1815}
1816
1817async fn check_ledger_state(
1819 ctx: &mut djogi::context::DjogiContext,
1820 version: &str,
1821 app_label: &str,
1822) -> LedgerState {
1823 let Ok(rows) = djogi::migrate::select_all_ledger_rows(ctx).await else {
1824 return LedgerState::NotPresent;
1827 };
1828
1829 let existing = rows
1830 .iter()
1831 .find(|r| r.version == version && r.app_label == app_label);
1832 match existing {
1833 None => LedgerState::NotPresent,
1834 Some(row) => match row.status {
1835 LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
1836 LedgerState::AlreadyApplied
1837 }
1838 LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
1839 LedgerState::PendingOrPartial(row.status)
1840 }
1841 },
1842 }
1843}
1844
1845fn runner_error_exit_code(error: &RunnerError) -> i32 {
1855 if error.is_operator_actionable() { 2 } else { 1 }
1856}
1857
1858#[djogi::deliberately_bypass_convention_with_raw_sql]
1859async fn delete_reapply_blocking_ledger_row(
1865 ctx: &mut djogi::context::DjogiContext,
1866 version: &str,
1867 app_label: &str,
1868) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1869 ctx.raw_execute(
1870 "DELETE FROM djogi_schema_migrations \
1871 WHERE version = $1 AND app_label = $2",
1872 &[&version, &app_label],
1873 )
1874 .await?;
1875 Ok(())
1876}
1877
1878fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
1880 let migrations_root = djogi::migrate::migrations_root(workspace);
1881 migrations_root
1882 .join(&bucket.database)
1883 .join(djogi::migrate::app_dirname(&bucket.app))
1884 .join("schema_snapshot.json")
1885}
1886
1887fn load_drift_baseline(mode: &FakeMode, snap_path: &Path) -> DriftBaseline {
1900 if let FakeMode::Fake { .. } = mode {
1901 return DriftBaseline::Disabled;
1902 }
1903 match load_snapshot(snap_path) {
1904 Ok(snapshot) => DriftBaseline::Snapshot(snapshot),
1905 Err(SnapshotError::Io { source, .. }) if source.kind() == std::io::ErrorKind::NotFound => {
1906 DriftBaseline::Missing
1907 }
1908 Err(e) => DriftBaseline::Corrupted(e.to_string()),
1909 }
1910}
1911
1912#[allow(clippy::too_many_arguments)]
1939pub fn attune_cmd(
1940 target: Option<&str>,
1941 apply: bool,
1942 record: bool,
1943 record_ledger: bool,
1944 record_reason: &str,
1945 squash: bool,
1946 from: Option<&str>,
1947 publish: bool,
1948 app: Option<&str>,
1949 workspace: Option<PathBuf>,
1950) -> ExitCode {
1951 let workspace = resolve_workspace(workspace);
1952 let mode = match (record_ledger, squash) {
1953 (false, false) => AttuneMode::DiffOnly,
1954 (true, false) => AttuneMode::Record {
1955 reason: record_reason.to_string(),
1956 },
1957 (false, true) => match from {
1958 Some(v) if !v.is_empty() => AttuneMode::Squash {
1959 from: v.to_string(),
1960 publish,
1961 app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
1962 },
1963 _ => {
1964 eprintln!(
1965 "djogi migrations attune --squash requires --from <version> (e.g. \
1966 `--from V20260101000000__init`)"
1967 );
1968 return ExitCode::from(2);
1969 }
1970 },
1971 (true, true) => {
1972 eprintln!(
1975 "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
1976 );
1977 return ExitCode::from(2);
1978 }
1979 };
1980
1981 let runtime = match tokio::runtime::Builder::new_current_thread()
1982 .enable_all()
1983 .build()
1984 {
1985 Ok(r) => r,
1986 Err(e) => {
1987 eprintln!("djogi migrations attune: tokio runtime: {e}");
1988 return ExitCode::from(1);
1989 }
1990 };
1991
1992 let target_owned = target.map(str::to_string);
1993 let exit =
1994 runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
1995 ExitCode::from(exit as u8)
1996}
1997
1998async fn run_attune(
2001 workspace: &Path,
2002 mode: AttuneMode,
2003 target: Option<String>,
2004 apply: bool,
2005 record: bool,
2006) -> i32 {
2007 use djogi::config::DjogiConfig;
2008
2009 let config = match DjogiConfig::load_from_workspace(workspace) {
2010 Ok(c) => c,
2011 Err(e) => {
2012 eprintln!("djogi migrations attune: config load: {e}");
2013 return 1;
2014 }
2015 };
2016
2017 let mut ctx = match connect_and_check(&config.database.url).await {
2018 ContextOutcome::Ready(ctx) => ctx,
2019 ContextOutcome::UnsupportedVersion(e) => {
2020 crate::print_support_boundary_error("migrations attune", &e);
2021 return 2;
2022 }
2023 ContextOutcome::RuntimeError(msg) => {
2024 eprintln!("djogi migrations attune: pool: {msg}");
2025 return 1;
2026 }
2027 };
2028
2029 let lock_path = workspace.join(LOCK_FILE_NAME);
2033 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2034 Ok(g) => g,
2035 Err(e) => {
2036 eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
2037 return 1;
2038 }
2039 };
2040
2041 let req = AttuneRequest {
2042 workspace_root: workspace,
2043 database_url: &config.database.url,
2044 profile: &config.profile,
2045 dev_mode: config.database.dev_mode,
2049 target: target.as_deref(),
2054 apply,
2055 record,
2056 mode,
2057 _guard: &guard,
2058 };
2059 match attune(&mut ctx, req).await {
2060 Ok(report) => {
2061 if report.entries.is_empty() {
2062 println!("attune: no drift");
2063 } else {
2064 for entry in &report.entries {
2065 let app_display = if entry.bucket.app.is_empty() {
2066 "_global_"
2067 } else {
2068 entry.bucket.app.as_str()
2069 };
2070 println!(
2071 " {kind:<10} {database}/{app} {version}",
2072 kind = entry.kind.as_str(),
2073 database = entry.bucket.database,
2074 app = app_display,
2075 version = entry.version,
2076 );
2077 }
2078 }
2079 for diag in &report.diagnostics {
2083 println!(" diagnostic: {diag}");
2084 }
2085 if let Some(sha) = &report.resolved_target {
2086 println!("resolved target: {sha}");
2087 }
2088 if let Some(squashed) = &report.squashed_to {
2089 println!("squashed to: {squashed}");
2090 }
2091 if report.published {
2092 println!("published to remote");
2093 }
2094 if report.parent_pointer_updated {
2095 println!("parent submodule pointer updated");
2096 }
2097 0
2098 }
2099 Err(e) => {
2100 eprintln!("djogi migrations attune: {e}");
2101 attune_error_exit_code(&e)
2102 }
2103 }
2104}
2105
2106fn attune_error_exit_code(err: &AttuneError) -> i32 {
2121 match err {
2122 AttuneError::Refused(_) => 2,
2123 AttuneError::FilesystemScanFailed { .. }
2124 | AttuneError::LedgerQueryFailed { .. }
2125 | AttuneError::SqlReadFailed { .. }
2126 | AttuneError::SqlWriteFailed { .. }
2127 | AttuneError::SqlDeleteFailed { .. }
2128 | AttuneError::GitPublishFailed { .. }
2129 | AttuneError::GitTargetResolveFailed { .. }
2130 | AttuneError::GitFetchFailed { .. }
2131 | AttuneError::GitUpdateSubmodulePointerFailed { .. } => 1,
2132 }
2133}
2134
2135pub fn verify_cmd(
2143 provider: &dyn DescriptorProvider,
2144 workspace: Option<PathBuf>,
2145 strict: bool,
2146) -> ExitCode {
2147 let workspace = resolve_workspace(workspace);
2148
2149 let runtime = match tokio::runtime::Builder::new_current_thread()
2150 .enable_all()
2151 .build()
2152 {
2153 Ok(r) => r,
2154 Err(e) => {
2155 eprintln!("djogi migrations verify: tokio runtime: {e}");
2156 return ExitCode::from(1);
2157 }
2158 };
2159
2160 let exit = runtime.block_on(async { run_verify(provider, &workspace, strict).await });
2161 ExitCode::from(exit as u8)
2162}
2163
2164async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
2179 use djogi::config::DjogiConfig;
2180
2181 if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
2193 crate::print_zero_descriptor_diagnostic("migrations verify");
2194 return 2;
2195 }
2196
2197 let config = match DjogiConfig::load_from_workspace(workspace) {
2199 Ok(c) => c,
2200 Err(e) => {
2201 eprintln!("djogi migrations verify: config load: {e}");
2202 return 1;
2203 }
2204 };
2205
2206 let models = match project_from_provider(provider) {
2208 Ok(m) => m,
2209 Err(e) => {
2210 eprintln!("djogi migrations verify: projection error: {e}");
2211 return 1;
2212 }
2213 };
2214
2215 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
2221 models.keys().cloned().collect();
2222 for bucket in discover_snapshot_buckets_on_disk(workspace) {
2223 bucket_set.insert(bucket);
2224 }
2225 if bucket_set.is_empty() {
2232 crate::print_zero_descriptor_diagnostic("migrations verify");
2233 return 2;
2234 }
2235
2236 let policy = djogi::config::PolicyConfig {
2238 strict_out_of_order: strict,
2239 };
2240
2241 let database_has_models: std::collections::HashSet<String> = bucket_set
2248 .iter()
2249 .filter(|b| {
2250 models
2251 .get(*b)
2252 .map(|s| !s.models.is_empty())
2253 .unwrap_or(false)
2254 })
2255 .map(|b| b.database.clone())
2256 .collect();
2257
2258 let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
2264 std::collections::BTreeMap::new();
2265 let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
2266 let mut exit_code: i32 = 0;
2267
2268 for bucket in &bucket_set {
2270 let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
2272 let bd = if bucket.app.is_empty() {
2273 "_global_"
2274 } else {
2275 &bucket.app
2276 };
2277 eprintln!(
2278 "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
2279 check that config.database.url has a valid path component",
2280 bucket.database, bucket.database, bd
2281 );
2282 exit_code = 1;
2283 continue;
2284 };
2285
2286 if !contexts.contains_key(&bucket.database) {
2290 match connect_and_check(&url).await {
2291 ContextOutcome::Ready(ctx) => {
2292 contexts.insert(bucket.database.clone(), ctx);
2293 }
2294 ContextOutcome::UnsupportedVersion(e) => {
2295 crate::print_support_boundary_error("migrations verify", &e);
2296 return 2;
2297 }
2298 ContextOutcome::RuntimeError(msg) => {
2299 eprintln!(
2300 "djogi migrations verify: pool for '{}': {msg}",
2301 bucket.database
2302 );
2303 exit_code = 1;
2304 continue;
2305 }
2306 }
2307 }
2308
2309 let snap_path = snapshot_path(workspace, bucket);
2314 let snapshot = match load_snapshot(&snap_path) {
2315 Ok(s) => s,
2316 Err(SnapshotError::Io { source, .. })
2317 if source.kind() == std::io::ErrorKind::NotFound =>
2318 {
2319 let bd = if bucket.app.is_empty() {
2320 "_global_"
2321 } else {
2322 &bucket.app
2323 };
2324 let has_models = models
2325 .get(bucket)
2326 .map(|s| !s.models.is_empty())
2327 .unwrap_or(false);
2328 if has_models {
2329 eprintln!(
2330 "djogi migrations verify: {}/{} has registered models but no \
2331 snapshot; run `djogi migrations compose` then \
2332 `djogi migrations apply` to record a baseline",
2333 bucket.database, bd
2334 );
2335 exit_code = 1;
2336 } else {
2337 println!("No snapshot found for bucket {}/{}", bucket.database, bd);
2338 }
2339 continue;
2340 }
2341 Err(e) => {
2342 let bd = if bucket.app.is_empty() {
2343 "_global_"
2344 } else {
2345 &bucket.app
2346 };
2347 eprintln!(
2348 "djogi migrations verify: load snapshot for {}/{}: {e}",
2349 bucket.database, bd
2350 );
2351 exit_code = 1;
2352 continue;
2353 }
2354 };
2355
2356 let db_has_models = database_has_models.contains(&bucket.database);
2361 let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
2362
2363 let ctx = contexts
2365 .get_mut(&bucket.database)
2366 .expect("context inserted above");
2367 let report = match djogi::migrate::verify_bucket(
2368 ctx,
2369 bucket,
2370 &snapshot,
2371 &policy,
2372 emit_ledger,
2373 db_has_models,
2374 )
2375 .await
2376 {
2377 Ok(r) => r,
2378 Err(e) => {
2379 let bd = if bucket.app.is_empty() {
2380 "_global_"
2381 } else {
2382 &bucket.app
2383 };
2384 eprintln!(
2385 "djogi migrations verify: error for {}/{}: {e}",
2386 bucket.database, bd
2387 );
2388 exit_code = 1;
2389 continue;
2390 }
2391 };
2392
2393 for line in render_verify_report(&report, bucket) {
2395 println!("{line}");
2396 }
2397 if report.has_errors() {
2398 exit_code = 1;
2399 }
2400 }
2401
2402 exit_code
2403}
2404
2405fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
2413 let mut lines: Vec<String> = Vec::new();
2414
2415 let app_display = if bucket.app.is_empty() {
2416 "_global_"
2417 } else {
2418 &bucket.app
2419 };
2420 lines.push(format!(
2421 "djogi migrations verify — {}/{}",
2422 bucket.database, app_display
2423 ));
2424 lines.push("──────────────────────────────────────────".to_string());
2425
2426 match (
2427 &report.latest_applied_version,
2428 report.applied_count,
2429 report.unfinished_count,
2430 ) {
2431 (Some(version), applied, 0) => {
2432 lines.push(format!("Ledger: {applied} applied, latest {version}"));
2433 }
2434 (Some(version), applied, unfinished) => {
2435 lines.push(format!(
2436 "Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
2437 ));
2438 }
2439 (None, 0, 0) => {
2440 lines.push("Ledger: empty (no migrations applied yet)".to_string());
2441 }
2442 _ => {}
2443 }
2444 lines.push(String::new());
2445
2446 if report.diagnostics.is_empty() {
2447 lines.push("No drift detected. Schema is consistent.".to_string());
2448 } else {
2449 for d in &report.diagnostics {
2450 let severity = match d.severity {
2451 VerifySeverity::Info => "INFO",
2452 VerifySeverity::Warning => "WARN",
2453 VerifySeverity::Error => "ERROR",
2454 };
2455 let location = d.location.as_deref().unwrap_or("-");
2456 lines.push(format!(
2457 "[{severity}] {code} ({loc}): {msg}",
2458 severity = severity,
2459 code = d.code,
2460 loc = location,
2461 msg = d.message
2462 ));
2463 }
2464 }
2465
2466 let errors = report
2467 .diagnostics
2468 .iter()
2469 .filter(|d| d.severity == VerifySeverity::Error)
2470 .count();
2471 let warnings = report
2472 .diagnostics
2473 .iter()
2474 .filter(|d| d.severity == VerifySeverity::Warning)
2475 .count();
2476 let infos = report
2477 .diagnostics
2478 .iter()
2479 .filter(|d| d.severity == VerifySeverity::Info)
2480 .count();
2481
2482 if errors > 0 {
2483 lines.push(String::new());
2484 lines.push(format!(
2485 "Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
2486 ));
2487 } else if warnings > 0 {
2488 lines.push(String::new());
2489 lines.push(format!(
2490 "Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
2491 ));
2492 } else {
2493 lines.push(String::new());
2494 lines.push(format!("Result: PASSED ({infos} info(s))"));
2495 }
2496
2497 lines
2498}
2499
2500fn render_drift_refusal(report_bucket: &BucketKey, report: &VerifyReport) -> Vec<String> {
2501 let mut lines = render_verify_report(report, report_bucket);
2502 lines.push(String::new());
2503 lines.push(
2504 "Apply refused before any migration SQL ran because error-severity drift was detected."
2505 .to_string(),
2506 );
2507 lines.push(
2508 "Next steps: inspect with `djogi migrations verify`, reconcile intentional drift \
2509 with `djogi migrations attune`, or if drift is from partial non-transactional \
2510 progress, resume with `djogi migrations repair resume-partial`."
2511 .to_string(),
2512 );
2513 lines
2514}
2515
2516impl From<PartialApplyResolutionCli> for PartialApplyResolution {
2519 fn from(cli: PartialApplyResolutionCli) -> Self {
2520 match cli {
2521 PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
2522 PartialApplyResolutionCli::Faked => Self::MarkFaked,
2523 PartialApplyResolutionCli::Applied => Self::MarkApplied,
2524 }
2525 }
2526}
2527
2528pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
2533 match command {
2534 RepairSubcommand::ChecksumDrift {
2535 version,
2536 app,
2537 database,
2538 checksum_up,
2539 checksum_down,
2540 workspace,
2541 } => repair_checksum_drift_cmd(
2542 &version,
2543 app.as_deref(),
2544 database.as_deref(),
2545 checksum_up.as_deref(),
2546 checksum_down.as_deref(),
2547 workspace,
2548 ),
2549 RepairSubcommand::PartialApply {
2550 version,
2551 resolution,
2552 note,
2553 app,
2554 database,
2555 workspace,
2556 } => repair_partial_apply_cmd(
2557 &version,
2558 resolution.into(),
2559 ¬e,
2560 app.as_deref(),
2561 database.as_deref(),
2562 workspace,
2563 ),
2564 RepairSubcommand::ResumePartial {
2565 version,
2566 app,
2567 database,
2568 workspace,
2569 node_id,
2570 single_node_dev,
2571 } => repair_resume_partial_apply_cmd(
2572 &version,
2573 app.as_deref(),
2574 database.as_deref(),
2575 workspace,
2576 node_id,
2577 single_node_dev,
2578 ),
2579 RepairSubcommand::SnapshotRebuild {
2580 app,
2581 database,
2582 snapshot_path,
2583 workspace,
2584 } => repair_snapshot_rebuild_cmd(
2585 app.as_deref(),
2586 database.as_deref(),
2587 snapshot_path.as_deref(),
2588 workspace,
2589 ),
2590 }
2591}
2592
2593fn render_repair_report(report: &RepairReport) {
2597 for action in &report.actions_taken {
2598 println!(" {action}");
2599 }
2600 if !report.ledger_changes.is_empty() {
2601 println!("Ledger changes:");
2602 for lc in &report.ledger_changes {
2603 println!(
2604 " {} | {} | {} -> {}",
2605 lc.version, lc.column, lc.before, lc.after,
2606 );
2607 }
2608 }
2609 if !report.snapshot_changes.is_empty() {
2610 println!("Snapshot changes:");
2611 for sc in &report.snapshot_changes {
2612 println!(" {} | {}", sc.path.display(), sc.description);
2613 }
2614 }
2615}
2616
2617fn repair_error_exit_code(err: &RepairError) -> i32 {
2631 match err {
2632 RepairError::LedgerIo { .. } | RepairError::SnapshotIo { .. } | RepairError::AdvisoryLockFailed { .. } | RepairError::AdvisoryLockQueryFailed { .. } | RepairError::PinnedSessionCheckoutFailed { .. } | RepairError::ResumeStepFailed { .. } | RepairError::ResumeProgressAckFailed { .. } | RepairError::Runner(..) => 1,
2644
2645 RepairError::VersionNotFound { .. }
2649 | RepairError::InsufficientConfirmation
2650 | RepairError::InvalidChecksum { .. }
2651 | RepairError::InvalidResolution { .. }
2652 | RepairError::BucketAppMismatch { .. }
2653 | RepairError::PlanVersionMismatch { .. }
2654 | RepairError::PlanChecksumMismatch { .. }
2655 | RepairError::LeafIdentityMismatch { .. }
2656 | RepairError::NothingToResume { .. }
2657 | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
2658 | RepairError::SuppliedSnapshotDiverges { .. }
2659 | RepairError::AdvisoryUnlockReturnedFalse { .. } | RepairError::ResumePlanShapeMismatch { .. }
2661 | RepairError::ReplayPlanShapeMismatch { .. }
2662 | RepairError::PhaseZeroArtifactRefused { .. } | RepairError::MissingResumeIdentity { .. } => 2,
2665 }
2666}
2667
2668fn rollback_error_exit_code(error: &RollbackError) -> i32 {
2672 match error {
2673 RollbackError::Runner { source, .. } => runner_error_exit_code(source),
2674 RollbackError::LossyRollbackRefused { .. }
2675 | RollbackError::VersionNotRollbackable { .. }
2676 | RollbackError::VersionNotFound { .. }
2677 | RollbackError::BucketAppMismatch { .. }
2678 | RollbackError::ChecksumDrift { .. }
2679 | RollbackError::PriorSnapshotMissing
2680 | RollbackError::LeafIdentityMismatch { .. }
2681 | RollbackError::StalePhaseZeroDown { .. }
2682 | RollbackError::SnapshotPersistFailed { .. }
2688 | RollbackError::MissingRollbackIdentity { .. } => 2,
2689 RollbackError::DownStatementFailed { .. } => 1,
2690 }
2691}
2692
2693#[allow(clippy::too_many_arguments)]
2695pub fn rollback_cmd(
2696 to: Option<String>,
2697 dry_run: bool,
2698 allow_data_loss: bool,
2699 reason: Option<String>,
2700 app: Option<&str>,
2701 database: Option<&str>,
2702 workspace: Option<PathBuf>,
2703 node_id: Option<u32>,
2704 single_node_dev: bool,
2705) -> ExitCode {
2706 if allow_data_loss {
2707 match reason.as_deref() {
2708 Some(reason) if !reason.trim().is_empty() => {}
2709 Some(_) => {
2710 eprintln!(
2711 "djogi migrations rollback --allow-data-loss: --reason must not be empty; \
2712 supply a non-empty reason why lossy rollback is acceptable"
2713 );
2714 return ExitCode::from(2);
2715 }
2716 None => {
2717 eprintln!(
2718 "djogi migrations rollback --allow-data-loss: --reason is required; \
2719 supply a reason why lossy rollback is acceptable. \
2720 This is recorded in the ledger audit trail."
2721 );
2722 return ExitCode::from(2);
2723 }
2724 }
2725 }
2726
2727 let workspace = resolve_workspace(workspace);
2728 let runtime = match tokio::runtime::Builder::new_current_thread()
2729 .enable_all()
2730 .build()
2731 {
2732 Ok(r) => r,
2733 Err(e) => {
2734 eprintln!("djogi migrations rollback: tokio runtime: {e}");
2735 return ExitCode::from(1);
2736 }
2737 };
2738
2739 let exit = runtime.block_on(async {
2740 run_rollback(
2741 &workspace,
2742 to.as_deref(),
2743 dry_run,
2744 allow_data_loss,
2745 reason.as_deref(),
2746 app,
2747 database,
2748 node_id,
2749 single_node_dev,
2750 )
2751 .await
2752 });
2753 ExitCode::from(exit as u8)
2754}
2755
2756#[allow(clippy::too_many_arguments)]
2757async fn run_rollback(
2758 workspace: &Path,
2759 to: Option<&str>,
2760 dry_run: bool,
2761 allow_data_loss: bool,
2762 reason: Option<&str>,
2763 app: Option<&str>,
2764 database: Option<&str>,
2765 node_id: Option<u32>,
2766 single_node_dev: bool,
2767) -> i32 {
2768 use djogi::config::DjogiConfig;
2769
2770 let config = match DjogiConfig::load_from_workspace(workspace) {
2771 Ok(config) => config,
2772 Err(e) => {
2773 eprintln!("djogi migrations rollback: config load: {e}");
2774 return 1;
2775 }
2776 };
2777
2778 let runner_identity: Option<djogi::migrate::RunnerIdentity> = if dry_run {
2779 None
2780 } else {
2781 match crate::identity::resolve_identity(
2782 node_id,
2783 single_node_dev,
2784 &config.profile,
2785 "rollback",
2786 ) {
2787 Ok(resolved) => Some(resolved.into_runner_identity()),
2788 Err(e) => {
2789 let _ = crate::identity::print_identity_error("rollback", &e);
2790 return 2;
2791 }
2792 }
2793 };
2794
2795 let db_name = resolve_database(database, &config);
2796 let url = match resolve_bucket_url(&config.database, &db_name) {
2797 Some(url) => url,
2798 None => {
2799 eprintln!("djogi migrations rollback: cannot derive a database URL for `{db_name}`");
2800 return 2;
2801 }
2802 };
2803
2804 let mut ctx = match connect_and_check(&url).await {
2805 ContextOutcome::Ready(ctx) => ctx,
2806 ContextOutcome::UnsupportedVersion(e) => {
2807 crate::print_support_boundary_error("migrations rollback", &e);
2808 return 2;
2809 }
2810 ContextOutcome::RuntimeError(msg) => {
2811 eprintln!("djogi migrations rollback: pool: {msg}");
2812 return 1;
2813 }
2814 };
2815
2816 let app_label = app.unwrap_or("");
2817 let bucket = BucketKey {
2818 database: db_name,
2819 app: app_label.to_string(),
2820 };
2821
2822 let pre_lock_rows = match read_ledger_rows_or_empty(&mut ctx).await {
2823 Ok(rows) => rows,
2824 Err(msg) => {
2825 eprintln!("djogi migrations rollback: ledger read: {msg}");
2826 return 1;
2827 }
2828 };
2829 let pre_lock_targets = match select_rollback_targets(&pre_lock_rows, app_label, to) {
2830 Ok(targets) => targets,
2831 Err(msg) => {
2832 eprintln!("djogi migrations rollback: {msg}");
2833 return 2;
2834 }
2835 };
2836 if pre_lock_targets.is_empty() {
2837 println!("Nothing to roll back.");
2838 return 0;
2839 }
2840
2841 if dry_run {
2842 let gated_targets = match gate_rollback_targets(workspace, &bucket, &pre_lock_targets) {
2843 Ok(targets) => targets,
2844 Err(RollbackCliGateError::Refusal(msg)) => {
2845 eprintln!("djogi migrations rollback: {msg}");
2846 return 2;
2847 }
2848 Err(RollbackCliGateError::Io(msg)) => {
2849 eprintln!("djogi migrations rollback: {msg}");
2850 return 1;
2851 }
2852 };
2853 if !allow_data_loss && let Some((version, markers)) = first_lossy_target(&gated_targets) {
2854 eprintln!("djogi migrations rollback: rollback refused for `{version}`:");
2855 for marker in markers {
2856 eprintln!(" {marker}");
2857 }
2858 eprintln!("pass --allow-data-loss with --reason to proceed");
2859 return 2;
2860 }
2861 print_rollback_data_loss_warning();
2862 for target in &gated_targets {
2863 println!(
2864 "-- rollback {} ({}/{})",
2865 target.row.version,
2866 bucket.database,
2867 display_bucket_app(&bucket.app)
2868 );
2869 print!("{}", target.down_sql);
2870 if !target.down_sql.ends_with('\n') {
2871 println!();
2872 }
2873 }
2874 println!(
2875 "preview of the current ledger state; the real run re-reads the ledger under the workspace lock"
2876 );
2877 println!("dry run — nothing executed.");
2878 return 0;
2879 }
2880
2881 let lock_path = workspace.join(LOCK_FILE_NAME);
2882 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2883 Ok(guard) => guard,
2884 Err(e) => {
2885 eprintln!("djogi migrations rollback: workspace lock: {e}");
2886 return 1;
2887 }
2888 };
2889
2890 let locked_rows = match read_ledger_rows_or_empty(&mut ctx).await {
2891 Ok(rows) => rows,
2892 Err(msg) => {
2893 eprintln!("djogi migrations rollback: ledger read: {msg}");
2894 return 1;
2895 }
2896 };
2897 let locked_targets = match select_rollback_targets(&locked_rows, app_label, to) {
2898 Ok(targets) => targets,
2899 Err(msg) => {
2900 eprintln!("djogi migrations rollback: {msg}");
2901 return 2;
2902 }
2903 };
2904 if let Err(msg) = ensure_no_target_drift(&pre_lock_targets, &locked_targets) {
2905 eprintln!("djogi migrations rollback: {msg}");
2906 return 2;
2907 }
2908
2909 let gated_targets = match gate_rollback_targets(workspace, &bucket, &locked_targets) {
2910 Ok(targets) => targets,
2911 Err(RollbackCliGateError::Refusal(msg)) => {
2912 eprintln!("djogi migrations rollback: {msg}");
2913 return 2;
2914 }
2915 Err(RollbackCliGateError::Io(msg)) => {
2916 eprintln!("djogi migrations rollback: {msg}");
2917 return 1;
2918 }
2919 };
2920
2921 if !allow_data_loss && let Some((version, markers)) = first_lossy_target(&gated_targets) {
2922 eprintln!("djogi migrations rollback: rollback refused for `{version}`:");
2923 for marker in markers {
2924 eprintln!(" {marker}");
2925 }
2926 eprintln!("pass --allow-data-loss with --reason to proceed");
2927 return 2;
2928 }
2929
2930 print_rollback_data_loss_warning();
2931
2932 let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
2933 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
2934 Err(_) => None,
2935 };
2936 let mut rolled_back_count = 0usize;
2937 let mut loop_failure: Option<djogi::migrate::RollbackError> = None;
2938 let lossy_reason = reason.map(str::to_string);
2939
2940 for target in gated_targets {
2941 let plan = djogi::migrate::MigrationPlan {
2942 bucket: bucket.clone(),
2943 classification: djogi::migrate::Classification::Additive,
2944 segments: vec![djogi::migrate::Segment {
2945 kind: djogi::migrate::SegmentKind::Transactional,
2946 statements: vec![djogi::migrate::OperationSql {
2947 label: format!("rollback {}", target.row.version),
2948 up: target.up_sql.clone(),
2949 down: target.down_sql.clone(),
2950 lossy: None,
2951 }],
2952 }],
2953 };
2954 let runner_ctx = RunnerCtx {
2955 bucket: bucket.clone(),
2956 version: target.row.version.clone(),
2957 description: target.row.description.clone(),
2958 checksum_up: target.checksum_up.clone(),
2959 checksum_down: target.checksum_down.clone(),
2960 snapshot: None,
2961 snapshot_path: None,
2962 config: djogi::config::MigrateConfig {
2963 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
2964 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
2965 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
2966 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
2967 },
2968 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
2969 audit_pool: audit_pool.clone(),
2970 runner_identity,
2971 drift_baseline: DriftBaseline::Disabled,
2976 };
2977 let policy = match lossy_reason.as_deref() {
2978 Some(reason) => djogi::migrate::LossyRollbackPolicy::Allow {
2979 reason: reason.to_string(),
2980 },
2981 None => djogi::migrate::LossyRollbackPolicy::Refuse,
2982 };
2983
2984 println!(" rolling back {}...", target.row.version);
2985 match djogi::migrate::rollback_plan(&mut ctx, &plan, &runner_ctx, &guard, policy, None)
2986 .await
2987 {
2988 Ok(report) => {
2989 if let Some(lossy_reason) = report.lossy_reason.as_deref() {
2990 println!(
2991 " rolled back {} (lossy reason: {lossy_reason})",
2992 target.row.version
2993 );
2994 } else {
2995 println!(" rolled back {}", target.row.version);
2996 }
2997 rolled_back_count += 1;
2998 }
2999 Err(e) => {
3000 eprintln!("djogi migrations rollback: {e}");
3001 loop_failure = Some(e);
3002 break;
3003 }
3004 }
3005 }
3006
3007 let snapshot_path = reconstruct_snapshot_path(workspace, &bucket);
3008 let live_db_mutated = rolled_back_count > 0
3009 || loop_failure
3010 .as_ref()
3011 .is_some_and(djogi::migrate::RollbackError::live_db_committed);
3012
3013 match loop_failure {
3014 None => {
3015 match repair_snapshot_rebuild(
3016 &mut ctx,
3017 &guard,
3018 &bucket,
3019 &snapshot_path,
3020 RepairConfirmation::OperatorAcknowledged,
3021 )
3022 .await
3023 {
3024 Ok(_) => {
3025 println!(
3026 "rolled back {rolled_back_count} migration(s); snapshot re-projected."
3027 );
3028 0
3029 }
3030 Err(e) => {
3031 eprintln!(
3032 "djogi migrations rollback: rollback recorded; snapshot rebuild failed: {e} — run `djogi migrations repair snapshot-rebuild --app {} --database {}` to restore the snapshot",
3033 bucket.app, bucket.database,
3034 );
3035 2
3036 }
3037 }
3038 }
3039 Some(e) if live_db_mutated => {
3040 match repair_snapshot_rebuild(
3041 &mut ctx,
3042 &guard,
3043 &bucket,
3044 &snapshot_path,
3045 RepairConfirmation::OperatorAcknowledged,
3046 )
3047 .await
3048 {
3049 Ok(_) => {
3050 println!("snapshot re-projected to match committed rollback work.");
3051 }
3052 Err(rebuild_error) => {
3053 eprintln!(
3054 "djogi migrations rollback: snapshot may be stale: {rebuild_error} — run `djogi migrations repair snapshot-rebuild --app {} --database {}` to restore the snapshot",
3055 bucket.app, bucket.database,
3056 );
3057 }
3058 }
3059 rollback_error_exit_code(&e)
3060 }
3061 Some(e) => rollback_error_exit_code(&e),
3062 }
3063}
3064
3065fn select_rollback_targets<'a>(
3068 rows: &'a [djogi::migrate::LedgerSummaryRow],
3069 app_label: &str,
3070 to: Option<&str>,
3071) -> Result<Vec<&'a djogi::migrate::LedgerSummaryRow>, String> {
3072 use djogi::migrate::LedgerStatus;
3073
3074 let mut bucket_rows: Vec<&djogi::migrate::LedgerSummaryRow> = rows
3075 .iter()
3076 .filter(|row| row.app_label == app_label)
3077 .collect();
3078 bucket_rows.sort_by_key(|row| std::cmp::Reverse(row.id));
3079
3080 let floor_id = match to {
3081 None => None,
3082 Some(version) => {
3083 let target = bucket_rows
3084 .iter()
3085 .find(|row| row.version == version)
3086 .ok_or_else(|| {
3087 format!("--to version `{version}` is not present in this bucket's ledger")
3088 })?;
3089 if !matches!(
3090 target.status,
3091 LedgerStatus::Applied | LedgerStatus::Faked | LedgerStatus::Baseline
3092 ) {
3093 return Err(format!(
3094 "--to version `{version}` has status `{status}`; the rollback \
3095 target must remain applied (applied / faked / baseline)",
3096 status = target.status.as_db_str(),
3097 ));
3098 }
3099 Some(target.id)
3100 }
3101 };
3102
3103 let mut targets = Vec::new();
3104 for row in &bucket_rows {
3105 if let Some(floor) = floor_id
3106 && row.id <= floor
3107 {
3108 break;
3109 }
3110 match row.status {
3111 LedgerStatus::RolledBack => continue,
3112 LedgerStatus::Applied | LedgerStatus::Faked => {
3113 targets.push(*row);
3114 if floor_id.is_none() {
3115 break;
3116 }
3117 }
3118 LedgerStatus::Pending | LedgerStatus::Failed => {
3119 return Err(format!(
3120 "ledger row `{version}` has status `{status}`; resolve it with \
3121 `djogi migrations repair` before rolling back past it",
3122 version = row.version,
3123 status = row.status.as_db_str(),
3124 ));
3125 }
3126 LedgerStatus::Baseline => {
3127 if floor_id.is_none() {
3128 break;
3129 }
3130 return Err(format!(
3131 "cannot roll back past baseline row `{version}`",
3132 version = row.version,
3133 ));
3134 }
3135 }
3136 }
3137
3138 Ok(targets)
3139}
3140
3141fn ensure_no_target_drift(
3144 pre_lock: &[&djogi::migrate::LedgerSummaryRow],
3145 locked: &[&djogi::migrate::LedgerSummaryRow],
3146) -> Result<(), String> {
3147 let key = |set: &[&djogi::migrate::LedgerSummaryRow]| -> Vec<(i64, String, String)> {
3154 set.iter()
3155 .map(|row| {
3156 (
3157 row.id,
3158 row.version.clone(),
3159 row.status.as_db_str().to_string(),
3160 )
3161 })
3162 .collect()
3163 };
3164 if key(pre_lock) != key(locked) {
3165 return Err(
3166 "ledger changed while waiting for the workspace lock; rerun the command".to_string(),
3167 );
3168 }
3169 Ok(())
3170}
3171
3172fn scan_lossy_down_markers(down_sql: &str) -> Vec<String> {
3174 down_sql
3175 .lines()
3176 .map(str::trim_start)
3177 .filter(|line| line.starts_with("-- LOSSY"))
3178 .map(str::to_string)
3179 .collect()
3180}
3181
3182#[derive(Debug)]
3183enum RollbackCliGateError {
3184 Refusal(String),
3185 Io(String),
3186}
3187
3188#[derive(Debug)]
3189struct GatedRollbackTarget<'a> {
3190 row: &'a djogi::migrate::LedgerSummaryRow,
3191 up_sql: String,
3192 down_sql: String,
3193 checksum_up: String,
3194 checksum_down: Option<String>,
3195 lossy_markers: Vec<String>,
3196}
3197
3198async fn read_ledger_rows_or_empty(
3199 ctx: &mut djogi::context::DjogiContext,
3200) -> Result<Vec<djogi::migrate::LedgerSummaryRow>, String> {
3201 match djogi::migrate::select_all_ledger_rows(ctx).await {
3202 Ok(rows) => Ok(rows),
3203 Err(e) => {
3204 if e.to_string().contains("djogi_schema_migrations") {
3205 Ok(Vec::new())
3206 } else {
3207 Err(e.to_string())
3208 }
3209 }
3210 }
3211}
3212
3213fn gate_rollback_targets<'a>(
3214 workspace: &Path,
3215 bucket: &BucketKey,
3216 rows: &[&'a djogi::migrate::LedgerSummaryRow],
3217) -> Result<Vec<GatedRollbackTarget<'a>>, RollbackCliGateError> {
3218 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
3219 let mut gated = Vec::with_capacity(rows.len());
3220
3221 for row in rows {
3222 let down_path = bucket_dir.join(djogi::migrate::down_filename(&row.version));
3223 let down_sql = match std::fs::read_to_string(&down_path) {
3224 Ok(sql) => sql,
3225 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3226 return Err(RollbackCliGateError::Refusal(format!(
3227 "version `{}` has no committed down file; use `djogi migrations repair partial-apply {} rolled-back` if the rollback already happened out of band",
3228 row.version, row.version
3229 )));
3230 }
3231 Err(e) => {
3232 return Err(RollbackCliGateError::Io(format!(
3233 "read down SQL {}: {e}",
3234 down_path.display()
3235 )));
3236 }
3237 };
3238 let checksum_down = djogi::migrate::compute_committed_down_sql_checksum(&down_sql);
3239 if checksum_down.is_none() {
3240 return Err(RollbackCliGateError::Refusal(format!(
3241 "version `{}` has no executable down SQL; use `djogi migrations repair partial-apply {} rolled-back` if the rollback already happened out of band",
3242 row.version, row.version
3243 )));
3244 }
3245 if let Some(shape) = djogi::migrate::find_non_transactional_statement_shape(&down_sql) {
3246 return Err(RollbackCliGateError::Refusal(format!(
3247 "version `{}` contains non-transactional down SQL (`{shape}`); use the library rollback entry point for this migration",
3248 row.version
3249 )));
3250 }
3251
3252 let up_path = bucket_dir.join(djogi::migrate::up_filename(&row.version));
3253 let up_sql = std::fs::read_to_string(&up_path).map_err(|e| {
3254 if e.kind() == std::io::ErrorKind::NotFound {
3255 RollbackCliGateError::Refusal(format!(
3256 "version `{}` is missing its committed up file",
3257 row.version
3258 ))
3259 } else {
3260 RollbackCliGateError::Io(format!("read up SQL {}: {e}", up_path.display()))
3261 }
3262 })?;
3263 let checksum_up = djogi::migrate::compute_committed_sql_checksum(
3264 &up_sql,
3265 djogi::migrate::ResetSqlSide::Up,
3266 );
3267 let lossy_markers = scan_lossy_down_markers(&down_sql);
3268
3269 gated.push(GatedRollbackTarget {
3270 row,
3271 up_sql,
3272 down_sql,
3273 checksum_up,
3274 checksum_down,
3275 lossy_markers,
3276 });
3277 }
3278
3279 Ok(gated)
3280}
3281
3282fn first_lossy_target<'a>(
3283 targets: &'a [GatedRollbackTarget<'a>],
3284) -> Option<(&'a str, &'a [String])> {
3285 targets
3286 .iter()
3287 .find(|target| !target.lossy_markers.is_empty())
3288 .map(|target| (target.row.version.as_str(), target.lossy_markers.as_slice()))
3289}
3290
3291fn display_bucket_app(app_label: &str) -> &str {
3292 if app_label.is_empty() {
3293 "_global_"
3294 } else {
3295 app_label
3296 }
3297}
3298
3299fn print_rollback_data_loss_warning() {
3300 eprintln!(
3301 "WARNING: rollback executes committed down SQL and may permanently remove data or schema state."
3302 );
3303}
3304
3305fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
3312 database.unwrap_or("main").to_string()
3313}
3314
3315fn compute_checksum_up_from_disk(
3329 workspace: &Path,
3330 bucket: &djogi::migrate::BucketKey,
3331 version: &str,
3332) -> std::io::Result<String> {
3333 let path =
3334 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::up_filename(version));
3335 let sql = std::fs::read_to_string(&path)?;
3336 Ok(djogi::migrate::compute_committed_sql_checksum(
3337 &sql,
3338 djogi::migrate::ResetSqlSide::Up,
3339 ))
3340}
3341
3342fn compute_checksum_down_from_disk(
3352 workspace: &Path,
3353 bucket: &djogi::migrate::BucketKey,
3354 version: &str,
3355) -> std::io::Result<Option<String>> {
3356 let path =
3357 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::down_filename(version));
3358 let sql = match std::fs::read_to_string(&path) {
3359 Ok(s) => s,
3360 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
3361 Err(e) => return Err(e),
3362 };
3363 Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql))
3364}
3365
3366pub fn repair_checksum_drift_cmd(
3373 version: &str,
3374 app: Option<&str>,
3375 database: Option<&str>,
3376 checksum_up: Option<&str>,
3377 checksum_down: Option<&str>,
3378 workspace: Option<PathBuf>,
3379) -> ExitCode {
3380 let workspace = resolve_workspace(workspace);
3381 let runtime = match tokio::runtime::Builder::new_current_thread()
3382 .enable_all()
3383 .build()
3384 {
3385 Ok(r) => r,
3386 Err(e) => {
3387 eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
3388 return ExitCode::from(1);
3389 }
3390 };
3391 let exit = runtime.block_on(async {
3392 run_repair_checksum_drift(
3393 &workspace,
3394 version,
3395 app,
3396 database,
3397 checksum_up,
3398 checksum_down,
3399 )
3400 .await
3401 });
3402 ExitCode::from(exit as u8)
3403}
3404
3405async fn run_repair_checksum_drift(
3407 workspace: &Path,
3408 version: &str,
3409 app: Option<&str>,
3410 database: Option<&str>,
3411 checksum_up: Option<&str>,
3412 checksum_down: Option<&str>,
3413) -> i32 {
3414 use djogi::config::DjogiConfig;
3415
3416 let config = match DjogiConfig::load_from_workspace(workspace) {
3417 Ok(c) => c,
3418 Err(e) => {
3419 eprintln!("djogi migrations repair checksum-drift: config load: {e}");
3420 return 1;
3421 }
3422 };
3423
3424 let db_name = resolve_database(database, &config);
3429 let url = match resolve_bucket_url(&config.database, &db_name) {
3430 Some(u) => u,
3431 None => {
3432 eprintln!(
3433 "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
3434 );
3435 return 2;
3436 }
3437 };
3438
3439 let mut ctx = match connect_and_check(&url).await {
3440 ContextOutcome::Ready(ctx) => ctx,
3441 ContextOutcome::UnsupportedVersion(e) => {
3442 crate::print_support_boundary_error("migrations repair checksum-drift", &e);
3443 return 2;
3444 }
3445 ContextOutcome::RuntimeError(msg) => {
3446 eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
3447 return 1;
3448 }
3449 };
3450
3451 let lock_path = workspace.join(LOCK_FILE_NAME);
3452 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3453 Ok(g) => g,
3454 Err(e) => {
3455 eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
3456 return 1;
3457 }
3458 };
3459
3460 let app_label = app.unwrap_or("");
3461 let bucket = BucketKey {
3462 database: db_name,
3463 app: app_label.to_string(),
3464 };
3465
3466 let new_checksum_up = match checksum_up {
3467 Some(c) => c.to_string(),
3468 None => {
3469 match compute_checksum_up_from_disk(workspace, &bucket, version) {
3475 Ok(cs) => cs,
3476 Err(e) => {
3477 eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
3478 return 1;
3479 }
3480 }
3481 }
3482 };
3483
3484 let resolved_checksum_down = match checksum_down {
3485 Some(c) => Some(c.to_string()),
3486 None => {
3487 match compute_checksum_down_from_disk(workspace, &bucket, version) {
3492 Ok(cs_opt) => cs_opt,
3493 Err(e) => {
3494 eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
3495 return 1;
3496 }
3497 }
3498 }
3499 };
3500
3501 match repair_checksum_drift(
3502 &mut ctx,
3503 &guard,
3504 &bucket,
3505 version,
3506 workspace,
3507 &new_checksum_up,
3508 resolved_checksum_down.as_deref(),
3509 RepairConfirmation::OperatorAcknowledged,
3510 )
3511 .await
3512 {
3513 Ok(report) => {
3514 render_repair_report(&report);
3515 0
3516 }
3517 Err(e) => {
3518 eprintln!("djogi migrations repair checksum-drift: {e}");
3519 repair_error_exit_code(&e)
3520 }
3521 }
3522}
3523
3524pub fn repair_partial_apply_cmd(
3529 version: &str,
3530 resolution: PartialApplyResolution,
3531 note: &str,
3532 app: Option<&str>,
3533 database: Option<&str>,
3534 workspace: Option<PathBuf>,
3535) -> ExitCode {
3536 let workspace = resolve_workspace(workspace);
3537 let runtime = match tokio::runtime::Builder::new_current_thread()
3538 .enable_all()
3539 .build()
3540 {
3541 Ok(r) => r,
3542 Err(e) => {
3543 eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
3544 return ExitCode::from(1);
3545 }
3546 };
3547 let exit = runtime.block_on(async {
3548 run_repair_partial_apply(&workspace, version, resolution, note, app, database).await
3549 });
3550 ExitCode::from(exit as u8)
3551}
3552
3553async fn run_repair_partial_apply(
3555 workspace: &Path,
3556 version: &str,
3557 resolution: PartialApplyResolution,
3558 note: &str,
3559 app: Option<&str>,
3560 database: Option<&str>,
3561) -> i32 {
3562 use djogi::config::DjogiConfig;
3563
3564 let config = match DjogiConfig::load_from_workspace(workspace) {
3565 Ok(c) => c,
3566 Err(e) => {
3567 eprintln!("djogi migrations repair partial-apply: config load: {e}");
3568 return 1;
3569 }
3570 };
3571
3572 let db_name = resolve_database(database, &config);
3577 let url = match resolve_bucket_url(&config.database, &db_name) {
3578 Some(u) => u,
3579 None => {
3580 eprintln!(
3581 "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
3582 );
3583 return 2;
3584 }
3585 };
3586
3587 let mut ctx = match connect_and_check(&url).await {
3588 ContextOutcome::Ready(ctx) => ctx,
3589 ContextOutcome::UnsupportedVersion(e) => {
3590 crate::print_support_boundary_error("migrations repair partial-apply", &e);
3591 return 2;
3592 }
3593 ContextOutcome::RuntimeError(msg) => {
3594 eprintln!("djogi migrations repair partial-apply: pool: {msg}");
3595 return 1;
3596 }
3597 };
3598
3599 let lock_path = workspace.join(LOCK_FILE_NAME);
3600 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3601 Ok(g) => g,
3602 Err(e) => {
3603 eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
3604 return 1;
3605 }
3606 };
3607
3608 let app_label = app.unwrap_or("");
3609 let bucket = BucketKey {
3610 database: db_name,
3611 app: app_label.to_string(),
3612 };
3613
3614 match repair_partial_apply(
3615 &mut ctx,
3616 &guard,
3617 &bucket,
3618 version,
3619 workspace,
3620 resolution,
3621 note,
3622 RepairConfirmation::OperatorAcknowledged,
3623 )
3624 .await
3625 {
3626 Ok(report) => {
3627 render_repair_report(&report);
3628 0
3629 }
3630 Err(e) => {
3631 eprintln!("djogi migrations repair partial-apply: {e}");
3632 repair_error_exit_code(&e)
3633 }
3634 }
3635}
3636
3637pub fn repair_resume_partial_apply_cmd(
3643 version: &str,
3644 app: Option<&str>,
3645 database: Option<&str>,
3646 workspace: Option<PathBuf>,
3647 node_id: Option<u32>,
3648 single_node_dev: bool,
3649) -> ExitCode {
3650 let workspace = resolve_workspace(workspace);
3651 let runtime = match tokio::runtime::Builder::new_current_thread()
3652 .enable_all()
3653 .build()
3654 {
3655 Ok(r) => r,
3656 Err(e) => {
3657 eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
3658 return ExitCode::from(1);
3659 }
3660 };
3661 let exit = runtime.block_on(async {
3662 run_repair_resume_partial(&workspace, version, app, database, node_id, single_node_dev)
3663 .await
3664 });
3665 ExitCode::from(exit as u8)
3666}
3667
3668async fn run_repair_resume_partial(
3670 workspace: &Path,
3671 version: &str,
3672 app: Option<&str>,
3673 database: Option<&str>,
3674 node_id: Option<u32>,
3675 single_node_dev: bool,
3676) -> i32 {
3677 use djogi::config::DjogiConfig;
3678
3679 let config = match DjogiConfig::load_from_workspace(workspace) {
3680 Ok(c) => c,
3681 Err(e) => {
3682 eprintln!("djogi migrations repair resume-partial: config load: {e}");
3683 return 1;
3684 }
3685 };
3686
3687 let runner_identity = match crate::identity::resolve_identity(
3689 node_id,
3690 single_node_dev,
3691 &config.profile,
3692 "repair resume-partial",
3693 ) {
3694 Ok(resolved) => Some(resolved.into_runner_identity()),
3695 Err(e) => {
3696 let _ = crate::identity::print_identity_error("repair resume-partial", &e);
3697 return 2;
3698 }
3699 };
3700
3701 let db_name = resolve_database(database, &config);
3706 let url = match resolve_bucket_url(&config.database, &db_name) {
3707 Some(u) => u,
3708 None => {
3709 eprintln!(
3710 "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
3711 );
3712 return 2;
3713 }
3714 };
3715
3716 let mut ctx = match connect_and_check(&url).await {
3717 ContextOutcome::Ready(ctx) => ctx,
3718 ContextOutcome::UnsupportedVersion(e) => {
3719 crate::print_support_boundary_error("migrations repair resume-partial", &e);
3720 return 2;
3721 }
3722 ContextOutcome::RuntimeError(msg) => {
3723 eprintln!("djogi migrations repair resume-partial: pool: {msg}");
3724 return 1;
3725 }
3726 };
3727
3728 let lock_path = workspace.join(LOCK_FILE_NAME);
3729 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3730 Ok(g) => g,
3731 Err(e) => {
3732 eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
3733 return 1;
3734 }
3735 };
3736
3737 let app_label = app.unwrap_or("");
3738 let bucket = BucketKey {
3739 database: db_name,
3740 app: app_label.to_string(),
3741 };
3742
3743 let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
3749 Ok(p) => p,
3750 Err(e) => {
3751 eprintln!("djogi migrations repair resume-partial: load plan: {e}");
3752 return 2;
3753 }
3754 };
3755
3756 match repair_resume_partial_apply(
3757 &mut ctx,
3758 &guard,
3759 workspace,
3760 version,
3761 &plan,
3762 runner_identity,
3763 RepairConfirmation::OperatorAcknowledged,
3764 )
3765 .await
3766 {
3767 Ok(report) => {
3768 render_repair_report(&report);
3769 0
3770 }
3771 Err(e) => {
3772 eprintln!("djogi migrations repair resume-partial: {e}");
3773 repair_error_exit_code(&e)
3774 }
3775 }
3776}
3777
3778fn load_committed_plan_for_resume(
3792 workspace: &Path,
3793 bucket: &djogi::migrate::BucketKey,
3794 version: &str,
3795) -> Result<djogi::migrate::MigrationPlan, String> {
3796 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
3797 let plan_path = bucket_dir.join(format!("{version}.plan.json"));
3798 let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
3799 let stored: CliReplayPlan = serde_json::from_slice(&bytes)
3800 .map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
3801 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
3802 return Err(format!(
3803 "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
3804 plan_path.display(),
3805 stored.format_version,
3806 ));
3807 }
3808 Ok(djogi::migrate::MigrationPlan {
3809 bucket: bucket.clone(),
3810 classification: stored.classification.into(),
3811 segments: stored
3812 .segments
3813 .into_iter()
3814 .map(|seg| djogi::migrate::Segment {
3815 kind: seg.kind.into(),
3816 statements: seg
3817 .statements
3818 .into_iter()
3819 .map(|stmt| djogi::migrate::OperationSql {
3820 label: stmt.label,
3821 up: stmt.up,
3822 down: String::new(),
3823 lossy: None,
3824 })
3825 .collect(),
3826 })
3827 .collect(),
3828 })
3829}
3830
3831pub fn repair_snapshot_rebuild_cmd(
3837 app: Option<&str>,
3838 database: Option<&str>,
3839 snapshot_path: Option<&Path>,
3840 workspace: Option<PathBuf>,
3841) -> ExitCode {
3842 let workspace = resolve_workspace(workspace);
3843 let runtime = match tokio::runtime::Builder::new_current_thread()
3844 .enable_all()
3845 .build()
3846 {
3847 Ok(r) => r,
3848 Err(e) => {
3849 eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
3850 return ExitCode::from(1);
3851 }
3852 };
3853 let exit = runtime.block_on(async {
3854 run_repair_snapshot_rebuild(&workspace, app, database, snapshot_path).await
3855 });
3856 ExitCode::from(exit as u8)
3857}
3858
3859async fn run_repair_snapshot_rebuild(
3861 workspace: &Path,
3862 app: Option<&str>,
3863 database: Option<&str>,
3864 snapshot_path: Option<&Path>,
3865) -> i32 {
3866 use djogi::config::DjogiConfig;
3867
3868 let config = match DjogiConfig::load_from_workspace(workspace) {
3869 Ok(c) => c,
3870 Err(e) => {
3871 eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
3872 return 1;
3873 }
3874 };
3875
3876 let db_name = resolve_database(database, &config);
3881 let url = match resolve_bucket_url(&config.database, &db_name) {
3882 Some(u) => u,
3883 None => {
3884 eprintln!(
3885 "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
3886 );
3887 return 2;
3888 }
3889 };
3890
3891 let mut ctx = match connect_and_check(&url).await {
3892 ContextOutcome::Ready(ctx) => ctx,
3893 ContextOutcome::UnsupportedVersion(e) => {
3894 crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
3895 return 2;
3896 }
3897 ContextOutcome::RuntimeError(msg) => {
3898 eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
3899 return 1;
3900 }
3901 };
3902
3903 let lock_path = workspace.join(LOCK_FILE_NAME);
3904 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3905 Ok(g) => g,
3906 Err(e) => {
3907 eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
3908 return 1;
3909 }
3910 };
3911
3912 let app_label = app.unwrap_or("");
3913 let bucket = BucketKey {
3914 database: db_name,
3915 app: app_label.to_string(),
3916 };
3917
3918 let snap_path = match snapshot_path {
3919 Some(p) => p.to_path_buf(),
3920 None => reconstruct_snapshot_path(workspace, &bucket),
3921 };
3922
3923 match repair_snapshot_rebuild(
3924 &mut ctx,
3925 &guard,
3926 &bucket,
3927 &snap_path,
3928 RepairConfirmation::OperatorAcknowledged,
3929 )
3930 .await
3931 {
3932 Ok(report) => {
3933 render_repair_report(&report);
3934 0
3935 }
3936 Err(e) => {
3937 eprintln!("djogi migrations repair snapshot-rebuild: {e}");
3938 repair_error_exit_code(&e)
3939 }
3940 }
3941}
3942
3943#[expect(
3961 clippy::too_many_arguments,
3962 reason = "CLI command entry point mirrors clap arguments explicitly"
3963)]
3964pub fn baseline_cmd(
3965 version: &str,
3966 description: &str,
3967 reason: &str,
3968 app: Option<&str>,
3969 database: Option<&str>,
3970 workspace: Option<PathBuf>,
3971 node_id: Option<u32>,
3972 single_node_dev: bool,
3973) -> ExitCode {
3974 if reason.trim().is_empty() {
3979 eprintln!(
3980 "djogi migrations baseline: --reason must not be empty; \
3981 supply a non-empty reason why this baseline is being established \
3982 (e.g. 'schema pre-exists from prior tooling'). \
3983 This is recorded in the ledger audit trail."
3984 );
3985 return ExitCode::from(2);
3986 }
3987
3988 let workspace = resolve_workspace(workspace);
3989 let runtime = match tokio::runtime::Builder::new_current_thread()
3990 .enable_all()
3991 .build()
3992 {
3993 Ok(r) => r,
3994 Err(e) => {
3995 eprintln!("djogi migrations baseline: tokio runtime: {e}");
3996 return ExitCode::from(1);
3997 }
3998 };
3999 let exit = runtime.block_on(async {
4000 run_baseline(
4001 &workspace,
4002 version,
4003 description,
4004 reason,
4005 app,
4006 database,
4007 node_id,
4008 single_node_dev,
4009 )
4010 .await
4011 });
4012 ExitCode::from(exit as u8)
4013}
4014
4015#[expect(
4026 clippy::too_many_arguments,
4027 reason = "baseline async body keeps CLI arguments explicit through validation and connection setup"
4028)]
4029async fn run_baseline(
4030 workspace: &Path,
4031 version: &str,
4032 description: &str,
4033 reason: &str,
4034 app: Option<&str>,
4035 database: Option<&str>,
4036 node_id: Option<u32>,
4037 single_node_dev: bool,
4038) -> i32 {
4039 use djogi::config::DjogiConfig;
4040
4041 let config = match DjogiConfig::load_from_workspace(workspace) {
4042 Ok(c) => c,
4043 Err(e) => {
4044 eprintln!("djogi migrations baseline: config load: {e}");
4045 return 1;
4046 }
4047 };
4048
4049 let runner_identity = match crate::identity::resolve_identity(
4051 node_id,
4052 single_node_dev,
4053 &config.profile,
4054 "baseline",
4055 ) {
4056 Ok(resolved) => Some(resolved.into_runner_identity()),
4057 Err(e) => {
4058 let _ = crate::identity::print_identity_error("baseline", &e);
4059 return 2;
4060 }
4061 };
4062
4063 let db_name = resolve_database(database, &config);
4068 let url = match resolve_bucket_url(&config.database, &db_name) {
4069 Some(u) => u,
4070 None => {
4071 eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
4072 return 2;
4073 }
4074 };
4075
4076 let mut ctx = match connect_and_check(&url).await {
4077 ContextOutcome::Ready(ctx) => ctx,
4078 ContextOutcome::UnsupportedVersion(e) => {
4079 crate::print_support_boundary_error("migrations baseline", &e);
4080 return 2;
4081 }
4082 ContextOutcome::RuntimeError(msg) => {
4083 eprintln!("djogi migrations baseline: pool: {msg}");
4084 return 1;
4085 }
4086 };
4087
4088 let lock_path = workspace.join(LOCK_FILE_NAME);
4089 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
4090 Ok(g) => g,
4091 Err(e) => {
4092 eprintln!("djogi migrations baseline: workspace lock: {e}");
4093 return 1;
4094 }
4095 };
4096
4097 let app_label = app.unwrap_or("");
4098 let bucket = BucketKey {
4099 database: db_name,
4100 app: app_label.to_string(),
4101 };
4102
4103 let runner_ctx = RunnerCtx {
4104 bucket: bucket.clone(),
4105 version: version.to_string(),
4106 description: description.to_string(),
4107 checksum_up: String::new(),
4110 checksum_down: None,
4111 snapshot: None,
4115 snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
4116 config: djogi::config::MigrateConfig {
4119 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
4120 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
4121 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
4122 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
4123 },
4124 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
4125 audit_pool: match djogi::migrate::resolve_audit_url(&config) {
4126 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
4127 Err(_) => None,
4128 },
4129 runner_identity,
4130 drift_baseline: DriftBaseline::Disabled,
4131 };
4132
4133 match baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await {
4134 Ok(report) => {
4135 println!(
4136 "djogi migrations baseline: established baseline `{}` \
4137 (ledger_id={}) in {:.1}s",
4138 version,
4139 report.ledger_id,
4140 report.execution_time_ms as f64 / 1000.0
4141 );
4142 0
4143 }
4144 Err(e) => {
4145 eprintln!("djogi migrations baseline: {e}");
4146 runner_error_exit_code(&e)
4147 }
4148 }
4149}
4150
4151#[cfg(test)]
4152mod tests {
4153 use super::*;
4154 use djogi::__bypass::RawAccessExt as _;
4155 use std::fs;
4156 use std::sync::atomic::{AtomicUsize, Ordering};
4157
4158 struct DatabaseUrlEnvGuard {
4159 _lock: std::sync::MutexGuard<'static, ()>,
4160 prior: Option<String>,
4161 }
4162
4163 impl DatabaseUrlEnvGuard {
4164 fn new() -> Self {
4165 Self {
4166 _lock: crate::test_env_lock(),
4167 prior: std::env::var("DATABASE_URL").ok(),
4168 }
4169 }
4170
4171 fn set(&self, value: &str) {
4172 unsafe { std::env::set_var("DATABASE_URL", value) };
4173 }
4174
4175 fn remove(&self) {
4176 unsafe { std::env::remove_var("DATABASE_URL") };
4177 }
4178 }
4179
4180 impl Drop for DatabaseUrlEnvGuard {
4181 fn drop(&mut self) {
4182 match &self.prior {
4183 Some(value) => unsafe { std::env::set_var("DATABASE_URL", value) },
4184 None => unsafe { std::env::remove_var("DATABASE_URL") },
4185 }
4186 }
4187 }
4188
4189 fn temp_workspace(tag: &str) -> std::path::PathBuf {
4190 static COUNTER: AtomicUsize = AtomicUsize::new(0);
4191 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
4192 let nanos = std::time::SystemTime::now()
4193 .duration_since(std::time::UNIX_EPOCH)
4194 .unwrap()
4195 .as_nanos();
4196 let p = std::env::temp_dir().join(format!("djogi-cli-{tag}-{nanos}-{n}"));
4197 fs::create_dir_all(&p).unwrap();
4198 p
4199 }
4200
4201 fn ledger_row(
4202 id: i64,
4203 version: &str,
4204 status: LedgerStatus,
4205 app_label: &str,
4206 ) -> djogi::migrate::LedgerSummaryRow {
4207 djogi::migrate::LedgerSummaryRow {
4208 id,
4209 version: version.to_string(),
4210 description: format!("desc {version}"),
4211 status,
4212 execution_time_ms: 0,
4213 applied_at_rfc3339: "2026-01-01T00:00:00Z".to_string(),
4214 applied_by: "test".to_string(),
4215 run_id: id,
4216 partial_apply_note: None,
4217 app_label: app_label.to_string(),
4218 out_of_order_flag: false,
4219 }
4220 }
4221
4222 fn write_unreachable_config(work: &std::path::Path) {
4223 let toml = "[database]\nurl = \"postgres://localhost:1/djogi_unreachable\"\n\
4224 max_connections = 1\ndev_mode = false\n\
4225 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
4226 fs::write(work.join("Djogi.toml"), toml).unwrap();
4227 }
4228
4229 fn without_database_url<T>(f: impl FnOnce() -> T) -> T {
4230 let env_guard = DatabaseUrlEnvGuard::new();
4231 env_guard.remove();
4232 f()
4233 }
4234
4235 #[test]
4236 fn database_url_env_guard_restores_prior_value() {
4237 let env_guard = DatabaseUrlEnvGuard::new();
4238 let expected = env_guard.prior.clone();
4239 let next = if expected.as_deref() == Some("postgres://from-env/test") {
4240 "postgres://temporary/test"
4241 } else {
4242 "postgres://from-env/test"
4243 };
4244 env_guard.set(next);
4245 drop(env_guard);
4246 assert_eq!(std::env::var("DATABASE_URL").ok(), expected);
4247 }
4248
4249 fn current_production_phase_zero_sql(tag: &str) -> String {
4250 let work = temp_workspace(tag);
4251 let lock_path = work.join(LOCK_FILE_NAME);
4252 let guard = acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT).expect("lock");
4253 let models: std::collections::BTreeMap<
4254 djogi::migrate::BucketKey,
4255 djogi::migrate::AppliedSchema,
4256 > = std::collections::BTreeMap::new();
4257 let apps = vec![AppLifecycle {
4258 label: "billing".to_string(),
4259 database: "main".to_string(),
4260 renamed_from: None,
4261 tombstone: false,
4262 }];
4263 let emitted = djogi::migrate::ensure_phase_zero_emitted(
4264 &work,
4265 &models,
4266 &apps,
4267 time::OffsetDateTime::now_utc(),
4268 &guard,
4269 )
4270 .expect("auto-emit Phase 0");
4271 let sql = fs::read_to_string(&emitted[0].up_sql_path).expect("read emitted Phase 0");
4272 drop(guard);
4273 let _ = fs::remove_dir_all(&work);
4274 sql
4275 }
4276
4277 fn markerless_seed_phase_zero_sql(tag: &str) -> String {
4278 let mut sql = current_production_phase_zero_sql(tag);
4279 sql.push_str("\nINSERT INTO heer.heer_nodes (id) VALUES (1);\n");
4280 sql
4281 }
4282
4283 fn phase_zero_with_seed_statement(tag: &str, statement: &str) -> String {
4284 let mut sql = current_production_phase_zero_sql(tag);
4285 sql.push('\n');
4286 sql.push_str(statement);
4287 sql.push('\n');
4288 sql
4289 }
4290
4291 fn extended_seed_statement_cases() -> [(&'static str, &'static str); 4] {
4292 [
4293 (
4294 "cte_insert",
4295 "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
4296 ),
4297 (
4298 "cte_delete",
4299 "WITH moved AS (DELETE FROM heer.heer_node_state RETURNING *) SELECT 1;",
4300 ),
4301 (
4302 "merge",
4303 "MERGE INTO heer.heer_nodes AS target USING incoming ON false WHEN NOT MATCHED THEN INSERT (id) VALUES (1);",
4304 ),
4305 (
4306 "copy_from",
4307 "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
4308 ),
4309 ]
4310 }
4311
4312 fn generated_stale_phase_zero_sql(tag: &str) -> String {
4313 let mut sql = current_production_phase_zero_sql(tag);
4314 sql.push_str(
4315 "\nALTER DATABASE \"mydb\" SET heer.node_id = '1';\n\
4316 ALTER DATABASE \"mydb\" SET heer.ranj_node_id = '1';\n\
4317 SET heer.node_id = '1';\n\
4318 SET heer.ranj_node_id = '1';\n",
4319 );
4320 sql
4321 }
4322
4323 fn seed_capable_phase_zero_sql() -> String {
4324 djogi::testing::phase_zero_sql_for_testing("main", true)
4325 .expect("compose seed-capable Phase 0")
4326 }
4327
4328 fn write_pending_json(
4329 path: &Path,
4330 database: &str,
4331 app: &str,
4332 version: &str,
4333 depends_on: &[&str],
4334 ) {
4335 let pending = PendingPlan {
4336 format_version: djogi::migrate::PENDING_FORMAT_VERSION.to_string(),
4337 bucket_database: database.to_string(),
4338 bucket_app: app.to_string(),
4339 version: version.to_string(),
4340 slug: "test".to_string(),
4341 model_snapshot: djogi::migrate::AppliedSchema {
4342 djogi_version: "0.1.0".to_string(),
4343 enums: std::collections::BTreeMap::new(),
4344 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4345 generated_at: "2026-06-06T00:00:00Z".to_string(),
4346 indexes: Vec::new(),
4347 models: std::collections::BTreeMap::new(),
4348 registered_apps: vec![app.to_string()],
4349 },
4350 checksum_up: "V1:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
4351 .to_string(),
4352 checksum_down: None,
4353 composed_at: "2026-06-06T00:00:00Z".to_string(),
4354 depends_on: depends_on.iter().map(|s| s.to_string()).collect(),
4355 };
4356 if let Some(parent) = path.parent() {
4357 fs::create_dir_all(parent).unwrap();
4358 }
4359 fs::write(path, serde_json::to_vec_pretty(&pending).unwrap()).unwrap();
4360 }
4361
4362 fn write_fallback_migration_files(
4363 work: &std::path::Path,
4364 version: &str,
4365 up: &str,
4366 down: Option<&str>,
4367 ) -> djogi::migrate::BucketKey {
4368 let bucket = djogi::migrate::BucketKey {
4369 database: "main".to_string(),
4370 app: String::new(),
4371 };
4372 let dir = djogi::migrate::bucket_dir(work, &bucket);
4373 fs::create_dir_all(&dir).unwrap();
4374 fs::write(dir.join(djogi::migrate::up_filename(version)), up).unwrap();
4375 if let Some(down) = down {
4376 fs::write(dir.join(djogi::migrate::down_filename(version)), down).unwrap();
4377 }
4378 bucket
4379 }
4380
4381 const COMPOSED_UP_FIXTURE: &str = "-- Djogi composed migration — up\n\
4382 -- Version: V20260612000000__add_widgets\n\
4383 -- Bucket: main/_global_\n\
4384 -- Classification: Additive\n\
4385 --\n\
4386 -- DO NOT EDIT — regenerate via `djogi migrations compose`.\n\
4387 \n\
4388 -- CreateModel widgets\n\
4389 CREATE TABLE \"widgets\" (\"id\" BIGINT PRIMARY KEY);\n\
4390 \n\
4391 -- AddIndex widgets_id_idx\n\
4392 CREATE INDEX \"widgets_id_idx\" ON \"widgets\" (\"id\");\n\
4393 \n";
4394
4395 const COMPOSED_DOWN_FIXTURE: &str = "-- Djogi composed migration — down\n\
4396 -- Version: V20260612000000__add_widgets\n\
4397 -- Bucket: main/_global_\n\
4398 --\n\
4399 -- DO NOT EDIT — regenerate via `djogi migrations compose`.\n\
4400 \n\
4401 -- AddIndex widgets_id_idx\n\
4402 DROP INDEX \"widgets_id_idx\";\n\
4403 \n\
4404 -- CreateModel widgets\n\
4405 DROP TABLE \"widgets\";\n\
4406 \n";
4407
4408 #[test]
4409 fn fallback_checksums_match_canonical_domain_for_composed_file() {
4410 let work = temp_workspace("fallback-canonical");
4411 let version = "V20260612000000__add_widgets";
4412 let bucket = write_fallback_migration_files(
4413 &work,
4414 version,
4415 COMPOSED_UP_FIXTURE,
4416 Some(COMPOSED_DOWN_FIXTURE),
4417 );
4418 let canonical_up = djogi::migrate::compute_committed_sql_checksum(
4419 COMPOSED_UP_FIXTURE,
4420 djogi::migrate::ResetSqlSide::Up,
4421 );
4422 let canonical_down =
4423 djogi::migrate::compute_committed_down_sql_checksum(COMPOSED_DOWN_FIXTURE);
4424
4425 let (plan, checksum_up, checksum_down) = load_replay_plan_from_disk(
4426 &work,
4427 &bucket,
4428 version,
4429 &canonical_up,
4430 canonical_down.as_deref(),
4431 )
4432 .expect("fallback must load");
4433
4434 assert_eq!(checksum_up, canonical_up);
4435 assert_eq!(checksum_down, canonical_down);
4436 assert!(checksum_down.is_some());
4437
4438 let rehash = djogi::migrate::compute_checksum(
4439 plan.segments
4440 .iter()
4441 .flat_map(|segment| segment.statements.iter())
4442 .map(|stmt| stmt.up.as_str()),
4443 );
4444 assert_eq!(rehash, checksum_up);
4445 }
4446
4447 #[test]
4448 fn fallback_down_checksum_none_for_comment_only_down() {
4449 let work = temp_workspace("fallback-comment-down");
4450 let version = "V20260612000001__no_rollback";
4451 let up = "CREATE TABLE plain (id BIGINT);\n";
4452 let bucket = write_fallback_migration_files(&work, version, up, Some("-- no rollback\n"));
4453 let pending_up = djogi::migrate::compute_checksum([up]);
4454 let (_, _, checksum_down) =
4455 load_replay_plan_from_disk(&work, &bucket, version, &pending_up, None).expect("load");
4456 assert_eq!(checksum_down, None);
4457 }
4458
4459 #[test]
4460 fn fallback_down_checksum_some_for_executable_hand_authored_down() {
4461 let work = temp_workspace("fallback-plain-down");
4462 let version = "V20260612000002__plain";
4463 let up = "CREATE TABLE plain (id BIGINT);\n";
4464 let down = "DROP TABLE plain;\n";
4465 let bucket = write_fallback_migration_files(&work, version, up, Some(down));
4466 let pending_up = djogi::migrate::compute_checksum([up]);
4467 let pending_down = djogi::migrate::compute_checksum([down]);
4468 let (_, checksum_up, checksum_down) =
4469 load_replay_plan_from_disk(&work, &bucket, version, &pending_up, Some(&pending_down))
4470 .expect("load");
4471 assert_eq!(checksum_up, pending_up);
4472 assert_eq!(checksum_down, Some(pending_down));
4473 }
4474
4475 #[test]
4476 fn fallback_refuses_non_transactional_shape_without_replay_plan() {
4477 let work = temp_workspace("fallback-nontx");
4478 let version = "V20260612000003__conc_idx";
4479 let up = "CREATE INDEX CONCURRENTLY widgets_idx ON widgets (id);";
4480 let bucket = write_fallback_migration_files(&work, version, up, None);
4481 let pending_up = djogi::migrate::compute_checksum([up]);
4482 let err = load_replay_plan_from_disk(&work, &bucket, version, &pending_up, None)
4483 .expect_err("non-transactional migrations should refuse fallback");
4484 let rendered = err.to_string();
4485 assert!(
4486 rendered.contains("CREATE INDEX CONCURRENTLY"),
4487 "actionable shape in: {rendered}"
4488 );
4489 }
4490
4491 #[test]
4492 fn fallback_refuses_when_up_file_diverges_from_pending_checksum() {
4493 let work = temp_workspace("fallback-tamper");
4494 let version = "V20260612000004__tampered";
4495 let bucket = write_fallback_migration_files(&work, version, COMPOSED_UP_FIXTURE, None);
4496 let stale_pending = djogi::migrate::compute_checksum(["something else entirely"]);
4497 let err = load_replay_plan_from_disk(&work, &bucket, version, &stale_pending, None)
4498 .expect_err("up-domain mismatch must not be silently ignored");
4499 assert!(err.to_string().contains("checksum"), "actionable: {err}");
4500 }
4501
4502 #[test]
4508 fn fallback_refuses_when_down_file_diverges_from_pending_checksum() {
4509 let work = temp_workspace("fallback-tamper-down");
4510 let version = "V20260612000004__tampered_down";
4511 let up = "CREATE TABLE downcheck (id BIGINT);\n";
4512 let original_down = "DROP TABLE downcheck;\n";
4513 let tampered_down = "DROP TABLE downcheck; -- tampered\n";
4514
4515 let bucket = write_fallback_migration_files(&work, version, up, Some(tampered_down));
4517
4518 let pending_up = djogi::migrate::compute_checksum([up]);
4520 let pending_down = djogi::migrate::compute_checksum([original_down]);
4521
4522 let err =
4525 load_replay_plan_from_disk(&work, &bucket, version, &pending_up, Some(&pending_down))
4526 .expect_err("down-domain mismatch must not be silently ignored");
4527
4528 let rendered = err.to_string();
4529 assert!(
4530 rendered.contains("checksum"),
4531 "error message must mention checksum: {rendered}"
4532 );
4533 assert!(
4534 rendered.contains("down"),
4535 "error message must identify the down side: {rendered}"
4536 );
4537 }
4538
4539 #[test]
4540 fn fallback_unreadable_replay_plan_sidecar_is_an_error_not_a_silent_fallback() {
4541 let work = temp_workspace("fallback-badplan");
4542 let version = "V20260612000005__badplan";
4543 let bucket =
4544 write_fallback_migration_files(&work, version, "CREATE TABLE t (id BIGINT);\n", None);
4545 let plan_path =
4546 djogi::migrate::bucket_dir(&work, &bucket).join(format!("{version}.plan.json"));
4547 fs::create_dir_all(&plan_path).unwrap();
4548 let pending_up = djogi::migrate::compute_checksum(["CREATE TABLE t (id BIGINT);\n"]);
4549 load_replay_plan_from_disk(&work, &bucket, version, &pending_up, None)
4550 .expect_err("non-NotFound sidecar read errors must surface");
4551 }
4552
4553 #[test]
4557 fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
4558 let work = temp_workspace("b1_discover");
4559 let billing_dir = work.join("migrations/main/billing");
4564 fs::create_dir_all(&billing_dir).unwrap();
4565 fs::write(billing_dir.join("schema_snapshot.json"), "{}").unwrap();
4566 let global_dir = work.join("migrations/main/_global_");
4569 fs::create_dir_all(&global_dir).unwrap();
4570 fs::write(global_dir.join("schema_snapshot.json"), "{}").unwrap();
4571 let no_snap_dir = work.join("migrations/main/empty_app");
4575 fs::create_dir_all(&no_snap_dir).unwrap();
4576
4577 let buckets = discover_snapshot_buckets_on_disk(&work);
4578 let labels: std::collections::BTreeSet<&str> =
4579 buckets.iter().map(|b| b.app.as_str()).collect();
4580 assert!(
4581 labels.contains("billing"),
4582 "must include the renamed-from bucket: {labels:?}"
4583 );
4584 assert!(
4585 labels.contains(""),
4586 "must include the global bucket: {labels:?}"
4587 );
4588 assert!(
4589 !labels.contains("empty_app"),
4590 "must not include directories without a snapshot: {labels:?}"
4591 );
4592 let _ = fs::remove_dir_all(&work);
4593 }
4594
4595 #[test]
4600 fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
4601 let work = temp_workspace("a1_workspace_config");
4602 let toml = "[database]\nurl = \"postgres://discovered-by-workspace-flag/test\"\n\
4603 max_connections = 1\ndev_mode = false\n\
4604 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
4605 fs::write(work.join("Djogi.toml"), toml).unwrap();
4606 let env_guard = DatabaseUrlEnvGuard::new();
4607 env_guard.remove();
4608 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
4609 assert_eq!(
4610 config.database.url,
4611 "postgres://discovered-by-workspace-flag/test"
4612 );
4613 assert_eq!(config.server.port, 1234);
4614 let _ = fs::remove_dir_all(&work);
4615 }
4616
4617 #[test]
4622 fn a1_round2_env_override_beats_workspace_toml() {
4623 let work = temp_workspace("a1r2_env_override");
4624 let toml = "[database]\nurl = \"postgres://from-toml/test\"\n\
4625 max_connections = 1\ndev_mode = false\n\
4626 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
4627 fs::write(work.join("Djogi.toml"), toml).unwrap();
4628 let env_guard = DatabaseUrlEnvGuard::new();
4629 env_guard.set("postgres://from-env/test");
4630 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
4631 assert_eq!(
4632 config.database.url, "postgres://from-env/test",
4633 "env DATABASE_URL must win over workspace Djogi.toml"
4634 );
4635 let _ = fs::remove_dir_all(&work);
4636 }
4637
4638 #[test]
4639 fn apply_no_pending_is_identity_free_and_skips_pool_connect() {
4640 let work = temp_workspace("apply_no_pending");
4641 write_unreachable_config(&work);
4642
4643 let exit = without_database_url(|| {
4644 let runtime = tokio::runtime::Builder::new_current_thread()
4645 .enable_all()
4646 .build()
4647 .expect("runtime");
4648 runtime.block_on(run_apply(&work, &FakeMode::Real, None, false))
4649 });
4650
4651 assert_eq!(
4652 exit, 0,
4653 "no-pending apply must return before identity resolution or pool checkout"
4654 );
4655 let _ = fs::remove_dir_all(&work);
4656 }
4657
4658 #[test]
4659 fn discover_pending_plans_orders_phase_zero_before_normal_global() {
4660 let work = temp_workspace("discover_pending_phase_zero_first");
4661 write_pending_json(
4662 &djogi::migrate::pending_json_path(
4663 &work,
4664 &BucketKey {
4665 database: "main".to_string(),
4666 app: String::new(),
4667 },
4668 ),
4669 "main",
4670 "",
4671 "V20260606010101__later_global",
4672 &[],
4673 );
4674 write_pending_json(
4675 &djogi::migrate::phase_zero_pending_json_path(
4676 &work,
4677 "main",
4678 djogi::migrate::PHASE_ZERO_VERSION,
4679 ),
4680 "main",
4681 "",
4682 djogi::migrate::PHASE_ZERO_VERSION,
4683 &[],
4684 );
4685
4686 let discovered = discover_pending_plans(&work).expect("discover");
4687 assert_eq!(discovered.len(), 2);
4688 assert_eq!(
4689 discovered[0].plan.version,
4690 djogi::migrate::PHASE_ZERO_VERSION
4691 );
4692 assert!(discovered[0].is_phase_zero);
4693 assert_eq!(discovered[1].plan.version, "V20260606010101__later_global");
4694 let _ = fs::remove_dir_all(&work);
4695 }
4696
4697 #[test]
4701 fn discover_orders_same_version_buckets_by_depends_on() {
4702 let work = temp_workspace("discover_pending_depends_on");
4703 write_pending_json(
4704 &djogi::migrate::pending_json_path(
4705 &work,
4706 &BucketKey {
4707 database: "main".to_string(),
4708 app: "system".to_string(),
4709 },
4710 ),
4711 "main",
4712 "system",
4713 "V20260609000000__initial",
4714 &["users"],
4715 );
4716 write_pending_json(
4717 &djogi::migrate::pending_json_path(
4718 &work,
4719 &BucketKey {
4720 database: "main".to_string(),
4721 app: "users".to_string(),
4722 },
4723 ),
4724 "main",
4725 "users",
4726 "V20260609000000__initial",
4727 &[],
4728 );
4729
4730 let plans = discover_pending_plans(&work).expect("discovers");
4731 let apps: Vec<&str> = plans.iter().map(|p| p.bucket.app.as_str()).collect();
4732 assert_eq!(apps, ["users", "system"]);
4733 let _ = fs::remove_dir_all(&work);
4734 }
4735
4736 #[test]
4739 fn discover_orders_no_dependency_buckets_alphabetically() {
4740 let work = temp_workspace("discover_pending_alpha_tiebreak");
4741 for app in &["charlie", "bravo", "alpha"] {
4743 write_pending_json(
4744 &djogi::migrate::pending_json_path(
4745 &work,
4746 &BucketKey {
4747 database: "main".to_string(),
4748 app: app.to_string(),
4749 },
4750 ),
4751 "main",
4752 app,
4753 "V20260609000000__initial",
4754 &[],
4755 );
4756 }
4757
4758 let plans = discover_pending_plans(&work).expect("discovers");
4759 let apps: Vec<&str> = plans.iter().map(|p| p.bucket.app.as_str()).collect();
4760 assert_eq!(apps, ["alpha", "bravo", "charlie"]);
4761 let _ = fs::remove_dir_all(&work);
4762 }
4763
4764 #[test]
4767 fn discover_depends_on_missing_bucket_is_ignored() {
4768 let work = temp_workspace("discover_pending_deps_missing");
4769 write_pending_json(
4771 &djogi::migrate::pending_json_path(
4772 &work,
4773 &BucketKey {
4774 database: "main".to_string(),
4775 app: "system".to_string(),
4776 },
4777 ),
4778 "main",
4779 "system",
4780 "V20260609000000__initial",
4781 &["billing"],
4782 );
4783
4784 let plans = discover_pending_plans(&work).expect("discovers");
4785 assert_eq!(plans.len(), 1);
4786 assert_eq!(plans[0].bucket.app, "system");
4787 let _ = fs::remove_dir_all(&work);
4788 }
4789
4790 #[test]
4794 fn discover_depends_on_cycle_is_refused() {
4795 let work = temp_workspace("discover_pending_deps_cycle");
4796 write_pending_json(
4797 &djogi::migrate::pending_json_path(
4798 &work,
4799 &BucketKey {
4800 database: "main".to_string(),
4801 app: "alpha".to_string(),
4802 },
4803 ),
4804 "main",
4805 "alpha",
4806 "V20260609000000__initial",
4807 &["beta"],
4808 );
4809 write_pending_json(
4810 &djogi::migrate::pending_json_path(
4811 &work,
4812 &BucketKey {
4813 database: "main".to_string(),
4814 app: "beta".to_string(),
4815 },
4816 ),
4817 "main",
4818 "beta",
4819 "V20260609000000__initial",
4820 &["alpha"],
4821 );
4822
4823 let err = discover_pending_plans(&work).expect_err("cycle must be refused");
4824 assert!(
4825 err.contains("alpha") && err.contains("beta") && err.contains("cycle"),
4826 "error should name both apps and mention cycle, got: {err}"
4827 );
4828 let _ = fs::remove_dir_all(&work);
4829 }
4830
4831 #[test]
4840 fn single_bucket_with_invalid_depends_on_is_refused() {
4841 let make_singleton = |dep: &str| -> Vec<DiscoveredPendingPlan> {
4842 let plan = PendingPlan {
4843 format_version: djogi::migrate::PENDING_FORMAT_VERSION.to_string(),
4844 bucket_database: "main".to_string(),
4845 bucket_app: "system".to_string(),
4846 version: "V20260609000000__initial".to_string(),
4847 slug: "test".to_string(),
4848 model_snapshot: djogi::migrate::AppliedSchema {
4849 djogi_version: "0.1.0".to_string(),
4850 enums: std::collections::BTreeMap::new(),
4851 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4852 generated_at: "2026-06-09T00:00:00Z".to_string(),
4853 indexes: Vec::new(),
4854 models: std::collections::BTreeMap::new(),
4855 registered_apps: vec!["system".to_string()],
4856 },
4857 checksum_up: "V1:".to_string() + &"a".repeat(64),
4858 checksum_down: None,
4859 composed_at: "2026-06-09T00:00:00Z".to_string(),
4860 depends_on: vec![dep.to_string()],
4861 };
4862 vec![DiscoveredPendingPlan {
4863 path: PathBuf::from("target/djogi_pending/main/system.json"),
4864 bucket: BucketKey {
4865 database: "main".to_string(),
4866 app: "system".to_string(),
4867 },
4868 plan,
4869 is_phase_zero: false,
4870 }]
4871 };
4872
4873 for bad_label in ["../traversal", "has space"] {
4874 let err = order_pending_groups_by_dependencies(make_singleton(bad_label))
4875 .expect_err("invalid singleton depends_on label must be refused");
4876 assert!(
4877 err.contains("invalid depends_on label")
4878 && err.contains("main")
4879 && err.contains("system"),
4880 "[{bad_label}] error must name database, app, and the invalid label: {err}"
4881 );
4882 }
4883 }
4884
4885 #[djogi::djogi_test]
4892 async fn cross_bucket_fk_applies_in_dependency_order(mut ctx: djogi::context::DjogiContext) {
4893 static E2E_COUNTER: AtomicUsize = AtomicUsize::new(0);
4895 let n = E2E_COUNTER.fetch_add(1, Ordering::SeqCst);
4896 let users_table = format!("e2e_users_{n}");
4897 let event_log_table = format!("e2e_event_log_{n}");
4898
4899 let work = temp_workspace("cross-bucket-fk-e2e");
4900 let guard = djogi::migrate::acquire_workspace_lock(
4901 &work.join(LOCK_FILE_NAME),
4902 std::time::Duration::from_secs(5),
4903 )
4904 .expect("lock workspace");
4905
4906 let mut models: std::collections::BTreeMap<
4908 djogi::migrate::BucketKey,
4909 djogi::migrate::AppliedSchema,
4910 > = std::collections::BTreeMap::new();
4911
4912 let users_bucket = BucketKey {
4913 database: "main".into(),
4914 app: "users".into(),
4915 };
4916 let system_bucket = BucketKey {
4917 database: "main".into(),
4918 app: "system".into(),
4919 };
4920
4921 {
4922 let mut users_schema = djogi::migrate::AppliedSchema {
4923 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4924 enums: std::collections::BTreeMap::new(),
4925 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4926 generated_at: "2026-06-10T00:00:00Z".to_string(),
4927 indexes: Vec::new(),
4928 models: std::collections::BTreeMap::new(),
4929 registered_apps: vec!["users".to_string()],
4930 };
4931 users_schema.models.insert(
4932 users_table.clone(),
4933 djogi::migrate::TableSchema {
4934 app: Some("users".to_string()),
4935 columns: vec![djogi::migrate::ColumnSchema {
4936 name: "id".to_string(),
4937 sql_type: "BIGINT".to_string(),
4938 nullable: false,
4939 default_sql: Some("heerid_next_desc()".to_string()),
4940 ..default_col()
4941 }],
4942 primary_key: djogi::migrate::PrimaryKeySchema {
4943 columns: vec!["id".to_string()],
4944 kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
4945 },
4946 table: users_table.clone(),
4947 ..default_table()
4948 },
4949 );
4950 models.insert(users_bucket.clone(), users_schema);
4951 }
4952
4953 {
4954 let mut system_schema = djogi::migrate::AppliedSchema {
4955 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4956 enums: std::collections::BTreeMap::new(),
4957 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4958 generated_at: "2026-06-10T00:00:00Z".to_string(),
4959 indexes: Vec::new(),
4960 models: std::collections::BTreeMap::new(),
4961 registered_apps: vec!["system".to_string()],
4962 };
4963 system_schema.models.insert(
4964 event_log_table.clone(),
4965 djogi::migrate::TableSchema {
4966 app: Some("system".to_string()),
4967 columns: vec![
4968 djogi::migrate::ColumnSchema {
4969 name: "id".to_string(),
4970 sql_type: "BIGINT".to_string(),
4971 nullable: false,
4972 default_sql: Some("heerid_next_desc()".to_string()),
4973 ..default_col()
4974 },
4975 djogi::migrate::ColumnSchema {
4976 name: "user_id".to_string(),
4977 sql_type: "BIGINT".to_string(),
4978 nullable: false,
4979 foreign_key: Some(djogi::migrate::ForeignKeySchema {
4980 deferrable: false,
4981 initially_deferred: false,
4982 on_delete: djogi::migrate::OnDeleteSchema::Restrict,
4983 ref_column: "id".to_string(),
4984 ref_table: users_table.clone(),
4985 }),
4986 ..default_col()
4987 },
4988 ],
4989 primary_key: djogi::migrate::PrimaryKeySchema {
4990 columns: vec!["id".to_string()],
4991 kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
4992 },
4993 table: event_log_table.clone(),
4994 ..default_table()
4995 },
4996 );
4997 models.insert(system_bucket.clone(), system_schema);
4998 }
4999
5000 let mut snapshots = std::collections::BTreeMap::new();
5002 for bucket in [&users_bucket, &system_bucket] {
5003 snapshots.insert(
5004 bucket.clone(),
5005 djogi::migrate::AppliedSchema {
5006 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
5007 enums: std::collections::BTreeMap::new(),
5008 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
5009 generated_at: "2026-06-10T00:00:00Z".to_string(),
5010 indexes: Vec::new(),
5011 models: std::collections::BTreeMap::new(),
5012 registered_apps: vec![bucket.app.clone()],
5013 },
5014 );
5015 }
5016
5017 let apps = vec![
5018 djogi::migrate::AppLifecycle {
5019 label: "users".into(),
5020 database: "main".into(),
5021 renamed_from: None,
5022 tombstone: false,
5023 },
5024 djogi::migrate::AppLifecycle {
5025 label: "system".into(),
5026 database: "main".into(),
5027 renamed_from: None,
5028 tombstone: false,
5029 },
5030 ];
5031
5032 let compose_req = djogi::migrate::ComposeRequest {
5034 workspace_root: &work,
5035 models: &models,
5036 snapshots: &snapshots,
5037 apps: &apps,
5038 name: "cross-bucket-fk",
5039 allow_destructive: false,
5040 force_overwrite: false,
5041 now: time::OffsetDateTime::UNIX_EPOCH
5042 + time::Duration::days(19726)
5043 + time::Duration::seconds(0),
5044 _guard: &guard,
5045 pk_flip_join_table_option: None,
5046 skip_phase_zero_auto_emit: false,
5051 };
5052
5053 let compose_report = djogi::migrate::compose(compose_req).expect("compose");
5054 assert!(
5055 !compose_report.composed_buckets.is_empty(),
5056 "compose should produce delta buckets"
5057 );
5058
5059 drop(guard);
5065
5066 let composed_version = &compose_report.composed_buckets[0].version;
5068
5069 let test_db = ctx
5073 .raw_scalar::<String>("SELECT current_database()", &[])
5074 .await
5075 .expect("current_database");
5076 let admin_url = std::env::var("DATABASE_URL").expect(
5077 "DATABASE_URL must be set for djogi_test \
5078 (e.g. postgres://djogi:djogi@localhost:5432/djogi_test)",
5079 );
5080 let test_db_url = replace_db_in_url(&admin_url, &test_db)
5081 .expect("construct per-test database URL from DATABASE_URL");
5082
5083 fs::write(
5085 work.join("Djogi.toml"),
5086 format!(
5087 "[database]\nurl = \"{test_db_url}\"\n\
5088 max_connections = 1\ndev_mode = false\n\
5089 [server]\nhost = \"127.0.0.1\"\nport = 8080\n"
5090 ),
5091 )
5092 .unwrap();
5093
5094 let db_url_guard = DatabaseUrlEnvGuard::new();
5102 db_url_guard.set(&test_db_url);
5103
5104 let exit = {
5109 let work = work.clone();
5110 tokio::task::spawn_blocking(move || {
5111 tokio::runtime::Builder::new_current_thread()
5112 .enable_all()
5113 .build()
5114 .expect("runtime")
5115 .block_on(run_apply(
5116 &work,
5117 &FakeMode::Real,
5118 None,
5119 true, ))
5121 })
5122 .await
5123 .expect("spawn_blocking join")
5124 };
5125
5126 drop(db_url_guard);
5129
5130 assert_eq!(
5131 exit, 0,
5132 "apply should succeed (tables created in FK dependency order)"
5133 );
5134
5135 let fk_rows = ctx
5137 .raw_rows(
5138 "SELECT c.conname \
5139 FROM pg_constraint c \
5140 JOIN pg_class r ON r.oid = c.conrelid \
5141 JOIN pg_class f ON f.oid = c.confrelid \
5142 WHERE r.relname = $1 AND c.contype = 'f' AND f.relname = $2",
5143 &[&event_log_table.as_str(), &users_table.as_str()],
5144 )
5145 .await
5146 .expect("query pg_constraint");
5147 assert!(
5148 !fk_rows.is_empty(),
5149 "FK constraint should exist from {event_log_table} → {users_table}"
5150 );
5151
5152 let ledger_rows = ctx
5156 .raw_rows(
5157 "SELECT app_label FROM djogi_schema_migrations \
5158 WHERE version = $1 AND status = 'applied'",
5159 &[&composed_version.as_str()],
5160 )
5161 .await
5162 .expect("query ledger");
5163 assert_eq!(
5164 ledger_rows.len(),
5165 2,
5166 "ledger should have exactly 2 rows for composed version {composed_version} \
5167 (users + system), got {} rows",
5168 ledger_rows.len()
5169 );
5170 let app_labels: Vec<String> = ledger_rows
5171 .iter()
5172 .map(|row| row.try_get(0).expect("decode app_label"))
5173 .collect();
5174 assert!(
5175 app_labels.contains(&"users".to_string()),
5176 "ledger should have 'users' bucket: {app_labels:?}"
5177 );
5178 assert!(
5179 app_labels.contains(&"system".to_string()),
5180 "ledger should have 'system' bucket: {app_labels:?}"
5181 );
5182
5183 let ordered_rows = ctx
5185 .raw_rows(
5186 "SELECT app_label, id FROM djogi_schema_migrations \
5187 WHERE version = $1 AND status = 'applied' ORDER BY id",
5188 &[&composed_version.as_str()],
5189 )
5190 .await
5191 .expect("query ledger ordered");
5192 assert_eq!(ordered_rows[0].try_get::<_, String>(0).unwrap(), "users");
5193 assert_eq!(ordered_rows[1].try_get::<_, String>(0).unwrap(), "system");
5194
5195 let _ = fs::remove_dir_all(&work);
5196
5197 }
5203
5204 #[djogi::djogi_test]
5208 async fn shared_enum_multi_slice_applies(mut ctx: djogi::context::DjogiContext) {
5209 static E2E_COUNTER: AtomicUsize = AtomicUsize::new(0);
5211 let n = E2E_COUNTER.fetch_add(1, Ordering::SeqCst);
5212 let posts_table = format!("e2e_posts_{n}");
5213 let comments_table = format!("e2e_comments_{n}");
5214
5215 let work = temp_workspace("shared-enum-e2e");
5216 let guard = djogi::migrate::acquire_workspace_lock(
5217 &work.join(LOCK_FILE_NAME),
5218 std::time::Duration::from_secs(5),
5219 )
5220 .expect("lock workspace");
5221
5222 let mut models: std::collections::BTreeMap<
5225 djogi::migrate::BucketKey,
5226 djogi::migrate::AppliedSchema,
5227 > = std::collections::BTreeMap::new();
5228
5229 let alpha_bucket = BucketKey {
5230 database: "main".into(),
5231 app: "alpha".into(),
5232 };
5233 let beta_bucket = BucketKey {
5234 database: "main".into(),
5235 app: "beta".into(),
5236 };
5237
5238 let mood_enum = djogi::migrate::schema::EnumSchema {
5240 name: "mood".to_string(),
5241 variants: vec!["happy".to_string(), "sad".to_string()],
5242 };
5243
5244 {
5246 let mut alpha_schema = djogi::migrate::AppliedSchema {
5247 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
5248 enums: std::collections::BTreeMap::new(),
5249 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
5250 generated_at: "2026-06-10T00:00:00Z".to_string(),
5251 indexes: Vec::new(),
5252 models: std::collections::BTreeMap::new(),
5253 registered_apps: vec!["alpha".to_string()],
5254 };
5255 alpha_schema
5256 .enums
5257 .insert("mood".to_string(), mood_enum.clone());
5258 alpha_schema.models.insert(
5259 posts_table.clone(),
5260 djogi::migrate::TableSchema {
5261 app: Some("alpha".to_string()),
5262 columns: vec![
5263 djogi::migrate::ColumnSchema {
5264 name: "id".to_string(),
5265 sql_type: "BIGINT".to_string(),
5266 nullable: false,
5267 default_sql: Some("heerid_next_desc()".to_string()),
5268 ..default_col()
5269 },
5270 djogi::migrate::ColumnSchema {
5271 name: "mood".to_string(),
5272 sql_type: "mood".to_string(),
5273 nullable: true,
5274 ..default_col()
5275 },
5276 ],
5277 primary_key: djogi::migrate::PrimaryKeySchema {
5278 columns: vec!["id".to_string()],
5279 kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
5280 },
5281 table: posts_table.clone(),
5282 ..default_table()
5283 },
5284 );
5285 models.insert(alpha_bucket.clone(), alpha_schema);
5286 }
5287
5288 {
5290 let mut beta_schema = djogi::migrate::AppliedSchema {
5291 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
5292 enums: std::collections::BTreeMap::new(),
5293 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
5294 generated_at: "2026-06-10T00:00:00Z".to_string(),
5295 indexes: Vec::new(),
5296 models: std::collections::BTreeMap::new(),
5297 registered_apps: vec!["beta".to_string()],
5298 };
5299 beta_schema.enums.insert("mood".to_string(), mood_enum);
5300 beta_schema.models.insert(
5301 comments_table.clone(),
5302 djogi::migrate::TableSchema {
5303 app: Some("beta".to_string()),
5304 columns: vec![
5305 djogi::migrate::ColumnSchema {
5306 name: "id".to_string(),
5307 sql_type: "BIGINT".to_string(),
5308 nullable: false,
5309 default_sql: Some("heerid_next_desc()".to_string()),
5310 ..default_col()
5311 },
5312 djogi::migrate::ColumnSchema {
5313 name: "post_id".to_string(),
5314 sql_type: "BIGINT".to_string(),
5315 nullable: false,
5316 foreign_key: Some(djogi::migrate::ForeignKeySchema {
5317 deferrable: false,
5318 initially_deferred: false,
5319 on_delete: djogi::migrate::OnDeleteSchema::Restrict,
5320 ref_column: "id".to_string(),
5321 ref_table: posts_table.clone(),
5322 }),
5323 ..default_col()
5324 },
5325 djogi::migrate::ColumnSchema {
5326 name: "author_mood".to_string(),
5327 sql_type: "mood".to_string(),
5328 nullable: true,
5329 ..default_col()
5330 },
5331 ],
5332 primary_key: djogi::migrate::PrimaryKeySchema {
5333 columns: vec!["id".to_string()],
5334 kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
5335 },
5336 table: comments_table.clone(),
5337 ..default_table()
5338 },
5339 );
5340 models.insert(beta_bucket.clone(), beta_schema);
5341 }
5342
5343 let mut snapshots = std::collections::BTreeMap::new();
5345 for bucket in [&alpha_bucket, &beta_bucket] {
5346 snapshots.insert(
5347 bucket.clone(),
5348 djogi::migrate::AppliedSchema {
5349 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
5350 enums: std::collections::BTreeMap::new(),
5351 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
5352 generated_at: "2026-06-10T00:00:00Z".to_string(),
5353 indexes: Vec::new(),
5354 models: std::collections::BTreeMap::new(),
5355 registered_apps: vec![bucket.app.clone()],
5356 },
5357 );
5358 }
5359
5360 let apps = vec![
5361 djogi::migrate::AppLifecycle {
5362 label: "alpha".into(),
5363 database: "main".into(),
5364 renamed_from: None,
5365 tombstone: false,
5366 },
5367 djogi::migrate::AppLifecycle {
5368 label: "beta".into(),
5369 database: "main".into(),
5370 renamed_from: None,
5371 tombstone: false,
5372 },
5373 ];
5374
5375 let compose_req = djogi::migrate::ComposeRequest {
5377 workspace_root: &work,
5378 models: &models,
5379 snapshots: &snapshots,
5380 apps: &apps,
5381 name: "shared-enum-multi-slice",
5382 allow_destructive: false,
5383 force_overwrite: false,
5384 now: time::OffsetDateTime::UNIX_EPOCH
5385 + time::Duration::days(19726)
5386 + time::Duration::seconds(0),
5387 _guard: &guard,
5388 pk_flip_join_table_option: None,
5389 skip_phase_zero_auto_emit: false,
5390 };
5391
5392 let compose_report = djogi::migrate::compose(compose_req).expect("compose");
5393 assert!(
5394 !compose_report.composed_buckets.is_empty(),
5395 "compose should produce delta buckets"
5396 );
5397
5398 drop(guard);
5400
5401 let composed_version = &compose_report.composed_buckets[0].version;
5403
5404 let test_db = ctx
5406 .raw_scalar::<String>("SELECT current_database()", &[])
5407 .await
5408 .expect("current_database");
5409 let admin_url = std::env::var("DATABASE_URL").expect(
5410 "DATABASE_URL must be set for djogi_test \
5411 (e.g. postgres://djogi:djogi@localhost:5432/djogi_test)",
5412 );
5413 let test_db_url = replace_db_in_url(&admin_url, &test_db)
5414 .expect("construct per-test database URL from DATABASE_URL");
5415
5416 fs::write(
5418 work.join("Djogi.toml"),
5419 format!(
5420 "[database]\nurl = \"{test_db_url}\"\n\
5421 max_connections = 1\ndev_mode = false\n\
5422 [server]\nhost = \"127.0.0.1\"\nport = 8080\n"
5423 ),
5424 )
5425 .unwrap();
5426
5427 let db_url_guard = DatabaseUrlEnvGuard::new();
5428 db_url_guard.set(&test_db_url);
5429
5430 let exit = {
5432 let work = work.clone();
5433 tokio::task::spawn_blocking(move || {
5434 tokio::runtime::Builder::new_current_thread()
5435 .enable_all()
5436 .build()
5437 .expect("runtime")
5438 .block_on(run_apply(
5439 &work,
5440 &FakeMode::Real,
5441 None,
5442 true, ))
5444 })
5445 .await
5446 .expect("spawn_blocking join")
5447 };
5448
5449 drop(db_url_guard);
5450
5451 assert_eq!(
5452 exit, 0,
5453 "apply should succeed (enum created once, tables in FK order)"
5454 );
5455
5456 let mood_count = ctx
5458 .raw_scalar::<i64>(
5459 "SELECT count(*) FROM pg_type WHERE typname = $1",
5460 &[&"mood"],
5461 )
5462 .await
5463 .expect("query pg_type for mood");
5464 assert_eq!(
5465 mood_count, 1,
5466 "exactly one mood enum type should exist in pg_type, got {mood_count}"
5467 );
5468
5469 let table_count = ctx
5471 .raw_scalar::<i64>(
5472 "SELECT count(*) FROM pg_class WHERE relname = $1 OR relname = $2",
5473 &[&posts_table.as_str(), &comments_table.as_str()],
5474 )
5475 .await
5476 .expect("query pg_class for tables");
5477 assert_eq!(
5478 table_count, 2,
5479 "both tables should exist ({posts_table}, {comments_table}), got {table_count}"
5480 );
5481
5482 let ledger_rows = ctx
5484 .raw_rows(
5485 "SELECT app_label FROM djogi_schema_migrations \
5486 WHERE version = $1 AND status = 'applied'",
5487 &[&composed_version.as_str()],
5488 )
5489 .await
5490 .expect("query ledger");
5491 assert_eq!(
5492 ledger_rows.len(),
5493 2,
5494 "ledger should have exactly 2 rows for composed version {composed_version} \
5495 (alpha + beta), got {} rows",
5496 ledger_rows.len()
5497 );
5498
5499 let _ = fs::remove_dir_all(&work);
5500 }
5501
5502 fn bucket_snapshot(
5503 app: &str,
5504 models: std::collections::BTreeMap<String, djogi::migrate::TableSchema>,
5505 indexes: Vec<djogi::migrate::IndexSchema>,
5506 ) -> djogi::migrate::AppliedSchema {
5507 djogi::migrate::AppliedSchema {
5508 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
5509 enums: std::collections::BTreeMap::new(),
5510 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
5511 generated_at: "2026-06-13T00:00:00Z".to_string(),
5512 indexes,
5513 models,
5514 registered_apps: vec![app.to_string()],
5515 }
5516 }
5517
5518 fn simple_table(
5519 app: &str,
5520 table: &str,
5521 mut columns: Vec<djogi::migrate::ColumnSchema>,
5522 ) -> djogi::migrate::TableSchema {
5523 for column in &mut columns {
5524 if column.name == "id" && column.default_sql.is_none() {
5525 column.default_sql = Some("heerid_next_desc()".to_string());
5526 }
5527 }
5528 djogi::migrate::TableSchema {
5529 app: Some(app.to_string()),
5530 columns,
5531 primary_key: djogi::migrate::PrimaryKeySchema {
5532 columns: vec!["id".to_string()],
5533 kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
5534 },
5535 table: table.to_string(),
5536 ..default_table()
5537 }
5538 }
5539
5540 fn id_col() -> djogi::migrate::ColumnSchema {
5541 djogi::migrate::ColumnSchema {
5542 name: "id".to_string(),
5543 sql_type: "BIGINT".to_string(),
5544 nullable: false,
5545 default_sql: Some("heerid_next_desc()".to_string()),
5546 ..default_col()
5547 }
5548 }
5549
5550 fn text_col(name: &str, nullable: bool) -> djogi::migrate::ColumnSchema {
5551 djogi::migrate::ColumnSchema {
5552 name: name.to_string(),
5553 sql_type: "TEXT".to_string(),
5554 nullable,
5555 ..default_col()
5556 }
5557 }
5558
5559 fn bigint_fk_col(
5560 name: &str,
5561 ref_table: &str,
5562 on_delete: djogi::migrate::OnDeleteSchema,
5563 ) -> djogi::migrate::ColumnSchema {
5564 djogi::migrate::ColumnSchema {
5565 name: name.to_string(),
5566 sql_type: "BIGINT".to_string(),
5567 nullable: false,
5568 foreign_key: Some(djogi::migrate::ForeignKeySchema {
5569 deferrable: false,
5570 initially_deferred: false,
5571 on_delete,
5572 ref_column: "id".to_string(),
5573 ref_table: ref_table.to_string(),
5574 }),
5575 ..default_col()
5576 }
5577 }
5578
5579 fn btree_index(table: &str, name: &str, column: &str) -> djogi::migrate::IndexSchema {
5580 djogi::migrate::IndexSchema {
5581 extension_dependency: None,
5582 include: vec![],
5583 index_type: djogi::migrate::IndexTypeSchema::BTree,
5584 kind: djogi::migrate::IndexKindSchema::NonUnique,
5585 name: name.to_string(),
5586 nulls_not_distinct: false,
5587 predicate: None,
5588 requires_out_of_transaction: false,
5589 table: table.to_string(),
5590 target: djogi::migrate::IndexTargetSchema::Columns(vec![
5591 djogi::migrate::IndexColumnSchema {
5592 name: column.to_string(),
5593 nulls: djogi::migrate::IndexNullsOrderSchema::Default,
5594 opclass: None,
5595 order: djogi::migrate::IndexOrderSchema::Asc,
5596 },
5597 ]),
5598 }
5599 }
5600
5601 async fn run_apply_in_test_db(
5602 ctx: &mut djogi::context::DjogiContext,
5603 work: &std::path::Path,
5604 mode: FakeMode,
5605 ) -> i32 {
5606 let test_db = ctx
5607 .raw_scalar::<String>("SELECT current_database()", &[])
5608 .await
5609 .expect("current_database");
5610 let admin_url = std::env::var("DATABASE_URL").expect(
5611 "DATABASE_URL must be set for djogi_test \
5612 (e.g. postgres://djogi:djogi@localhost:5432/djogi_test)",
5613 );
5614 let test_db_url = replace_db_in_url(&admin_url, &test_db)
5615 .expect("construct per-test database URL from DATABASE_URL");
5616
5617 fs::write(
5618 work.join("Djogi.toml"),
5619 format!(
5620 "[database]\nurl = \"{test_db_url}\"\n\
5621 max_connections = 1\ndev_mode = false\n\
5622 [server]\nhost = \"127.0.0.1\"\nport = 8080\n"
5623 ),
5624 )
5625 .unwrap();
5626
5627 let db_url_guard = DatabaseUrlEnvGuard::new();
5628 db_url_guard.set(&test_db_url);
5629
5630 let exit = {
5631 let work = work.to_path_buf();
5632 tokio::task::spawn_blocking(move || {
5633 tokio::runtime::Builder::new_current_thread()
5634 .enable_all()
5635 .build()
5636 .expect("runtime")
5637 .block_on(run_apply(&work, &mode, None, true))
5638 })
5639 .await
5640 .expect("spawn_blocking join")
5641 };
5642
5643 drop(db_url_guard);
5644 exit
5645 }
5646
5647 fn compose_bucket_migration(
5648 work: &std::path::Path,
5649 bucket: &BucketKey,
5650 current: djogi::migrate::AppliedSchema,
5651 previous: djogi::migrate::AppliedSchema,
5652 name: &str,
5653 now: time::OffsetDateTime,
5654 ) -> djogi::migrate::ComposeReport {
5655 let guard = djogi::migrate::acquire_workspace_lock(
5656 &work.join(LOCK_FILE_NAME),
5657 std::time::Duration::from_secs(5),
5658 )
5659 .expect("lock workspace");
5660
5661 let mut models = std::collections::BTreeMap::new();
5662 models.insert(bucket.clone(), current);
5663
5664 let mut snapshots = std::collections::BTreeMap::new();
5665 snapshots.insert(bucket.clone(), previous);
5666
5667 let apps = vec![djogi::migrate::AppLifecycle {
5668 label: bucket.app.clone(),
5669 database: bucket.database.clone(),
5670 renamed_from: None,
5671 tombstone: false,
5672 }];
5673
5674 let report = djogi::migrate::compose(djogi::migrate::ComposeRequest {
5675 workspace_root: work,
5676 models: &models,
5677 snapshots: &snapshots,
5678 apps: &apps,
5679 name,
5680 allow_destructive: false,
5681 force_overwrite: false,
5682 now,
5683 _guard: &guard,
5684 pk_flip_join_table_option: None,
5685 skip_phase_zero_auto_emit: false,
5686 })
5687 .expect("compose");
5688 drop(guard);
5689 report
5690 }
5691
5692 #[djogi::djogi_test]
5693 async fn apply_refuses_on_column_drift_before_second_migration(
5694 mut ctx: djogi::context::DjogiContext,
5695 ) {
5696 static DRIFT_COUNTER: AtomicUsize = AtomicUsize::new(0);
5697 let n = DRIFT_COUNTER.fetch_add(1, Ordering::SeqCst);
5698 let table = format!("drift_users_{n}");
5699 let bucket = BucketKey {
5700 database: "main".into(),
5701 app: "billing".into(),
5702 };
5703 let work = temp_workspace("drift-column-refusal");
5704
5705 let v1_models = std::collections::BTreeMap::from([(
5706 table.clone(),
5707 simple_table(&bucket.app, &table, vec![id_col(), text_col("name", false)]),
5708 )]);
5709 let v1_snapshot = bucket_snapshot(&bucket.app, v1_models.clone(), vec![]);
5710 let empty_snapshot =
5711 bucket_snapshot(&bucket.app, std::collections::BTreeMap::new(), vec![]);
5712 let first_report = compose_bucket_migration(
5713 &work,
5714 &bucket,
5715 v1_snapshot.clone(),
5716 empty_snapshot,
5717 "drift-column-v1",
5718 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19729),
5719 );
5720 assert!(
5721 !first_report.composed_buckets.is_empty(),
5722 "first compose must emit"
5723 );
5724 let first_version = first_report.composed_buckets[0].version.clone();
5725
5726 let first_exit = run_apply_in_test_db(&mut ctx, &work, FakeMode::Real).await;
5727 assert_eq!(first_exit, 0, "initial apply must succeed");
5728
5729 ctx.raw_execute(
5730 &format!("ALTER TABLE {table} RENAME COLUMN name TO full_name"),
5731 &[],
5732 )
5733 .await
5734 .expect("rename live column out of band");
5735
5736 let v2_models = std::collections::BTreeMap::from([(
5737 table.clone(),
5738 simple_table(
5739 &bucket.app,
5740 &table,
5741 vec![id_col(), text_col("name", false), text_col("email", true)],
5742 ),
5743 )]);
5744 let v2_snapshot = bucket_snapshot(&bucket.app, v2_models, vec![]);
5745 let second_report = compose_bucket_migration(
5746 &work,
5747 &bucket,
5748 v2_snapshot,
5749 v1_snapshot,
5750 "drift-column-v2",
5751 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19730),
5752 );
5753 let second_version = second_report.composed_buckets[0].version.clone();
5754
5755 let second_exit = run_apply_in_test_db(&mut ctx, &work, FakeMode::Real).await;
5756 assert_eq!(second_exit, 2, "drifted apply must refuse");
5757
5758 let applied_count = ctx
5759 .raw_scalar::<i64>(
5760 "SELECT count(*) FROM djogi_schema_migrations \
5761 WHERE version = $1 AND app_label = $2",
5762 &[&second_version, &bucket.app],
5763 )
5764 .await
5765 .expect("count second-version rows");
5766 assert_eq!(
5767 applied_count, 0,
5768 "refusal must not insert second ledger row"
5769 );
5770
5771 let email_exists = ctx
5772 .raw_scalar::<bool>(
5773 "SELECT EXISTS(
5774 SELECT 1
5775 FROM information_schema.columns
5776 WHERE table_name = $1 AND column_name = 'email'
5777 )",
5778 &[&table],
5779 )
5780 .await
5781 .expect("check email column");
5782 assert!(!email_exists, "refusal must precede second migration SQL");
5783
5784 let first_rows = ctx
5785 .raw_scalar::<i64>(
5786 "SELECT count(*) FROM djogi_schema_migrations \
5787 WHERE version = $1 AND app_label = $2 AND status = 'applied'",
5788 &[&first_version, &bucket.app],
5789 )
5790 .await
5791 .expect("count first-version rows");
5792 assert_eq!(first_rows, 1, "baseline apply should remain intact");
5793 }
5794
5795 #[djogi::djogi_test]
5796 async fn apply_refuses_on_missing_snapshot_for_applied_bucket(
5797 mut ctx: djogi::context::DjogiContext,
5798 ) {
5799 static DRIFT_COUNTER: AtomicUsize = AtomicUsize::new(0);
5800 let n = DRIFT_COUNTER.fetch_add(1, Ordering::SeqCst);
5801 let table = format!("drift_missing_snapshot_{n}");
5802 let bucket = BucketKey {
5803 database: "main".into(),
5804 app: "billing".into(),
5805 };
5806 let work = temp_workspace("drift-missing-snapshot");
5807
5808 let v1_models = std::collections::BTreeMap::from([(
5809 table.clone(),
5810 simple_table(&bucket.app, &table, vec![id_col(), text_col("name", false)]),
5811 )]);
5812 let v1_snapshot = bucket_snapshot(&bucket.app, v1_models.clone(), vec![]);
5813 let empty_snapshot =
5814 bucket_snapshot(&bucket.app, std::collections::BTreeMap::new(), vec![]);
5815 compose_bucket_migration(
5816 &work,
5817 &bucket,
5818 v1_snapshot.clone(),
5819 empty_snapshot,
5820 "drift-missing-v1",
5821 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19731),
5822 );
5823 assert_eq!(
5824 run_apply_in_test_db(&mut ctx, &work, FakeMode::Real).await,
5825 0
5826 );
5827
5828 let v2_models = std::collections::BTreeMap::from([(
5829 table.clone(),
5830 simple_table(
5831 &bucket.app,
5832 &table,
5833 vec![id_col(), text_col("name", false), text_col("email", true)],
5834 ),
5835 )]);
5836 compose_bucket_migration(
5837 &work,
5838 &bucket,
5839 bucket_snapshot(&bucket.app, v2_models, vec![]),
5840 v1_snapshot,
5841 "drift-missing-v2",
5842 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19732),
5843 );
5844
5845 let snap_path = reconstruct_snapshot_path(&work, &bucket);
5846 fs::remove_file(&snap_path).expect("delete recorded snapshot");
5847
5848 let exit = run_apply_in_test_db(&mut ctx, &work, FakeMode::Real).await;
5849 assert_eq!(exit, 2, "missing baseline must refuse on applied bucket");
5850 }
5851
5852 #[djogi::djogi_test]
5853 async fn apply_refuses_on_dropped_index_drift(mut ctx: djogi::context::DjogiContext) {
5854 static DRIFT_COUNTER: AtomicUsize = AtomicUsize::new(0);
5855 let n = DRIFT_COUNTER.fetch_add(1, Ordering::SeqCst);
5856 let table = format!("drift_index_{n}");
5857 let index_name = format!("{table}_name_idx");
5858 let bucket = BucketKey {
5859 database: "main".into(),
5860 app: "billing".into(),
5861 };
5862 let work = temp_workspace("drift-index-refusal");
5863
5864 let base_table = simple_table(&bucket.app, &table, vec![id_col(), text_col("name", false)]);
5865 let v1_snapshot = bucket_snapshot(
5866 &bucket.app,
5867 std::collections::BTreeMap::from([(table.clone(), base_table.clone())]),
5868 vec![btree_index(&table, &index_name, "name")],
5869 );
5870 let empty_snapshot =
5871 bucket_snapshot(&bucket.app, std::collections::BTreeMap::new(), vec![]);
5872 compose_bucket_migration(
5873 &work,
5874 &bucket,
5875 v1_snapshot.clone(),
5876 empty_snapshot,
5877 "drift-index-v1",
5878 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19733),
5879 );
5880 assert_eq!(
5881 run_apply_in_test_db(&mut ctx, &work, FakeMode::Real).await,
5882 0
5883 );
5884
5885 ctx.raw_execute(&format!("DROP INDEX {index_name}"), &[])
5886 .await
5887 .expect("drop index out of band");
5888
5889 let v2_snapshot = bucket_snapshot(
5890 &bucket.app,
5891 std::collections::BTreeMap::from([(
5892 table.clone(),
5893 simple_table(
5894 &bucket.app,
5895 &table,
5896 vec![id_col(), text_col("name", false), text_col("email", true)],
5897 ),
5898 )]),
5899 vec![btree_index(&table, &index_name, "name")],
5900 );
5901 compose_bucket_migration(
5902 &work,
5903 &bucket,
5904 v2_snapshot,
5905 v1_snapshot,
5906 "drift-index-v2",
5907 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19734),
5908 );
5909
5910 let exit = run_apply_in_test_db(&mut ctx, &work, FakeMode::Real).await;
5911 assert_eq!(exit, 2, "dropped index drift must refuse");
5912 }
5913
5914 #[djogi::djogi_test]
5915 async fn apply_refuses_on_foreign_key_shape_drift(mut ctx: djogi::context::DjogiContext) {
5916 static DRIFT_COUNTER: AtomicUsize = AtomicUsize::new(0);
5917 let n = DRIFT_COUNTER.fetch_add(1, Ordering::SeqCst);
5918 let users = format!("drift_fk_users_{n}");
5919 let posts = format!("drift_fk_posts_{n}");
5920 let bucket = BucketKey {
5921 database: "main".into(),
5922 app: "billing".into(),
5923 };
5924 let work = temp_workspace("drift-fk-refusal");
5925
5926 let v1_models = std::collections::BTreeMap::from([
5927 (
5928 users.clone(),
5929 simple_table(&bucket.app, &users, vec![id_col(), text_col("name", false)]),
5930 ),
5931 (
5932 posts.clone(),
5933 simple_table(
5934 &bucket.app,
5935 &posts,
5936 vec![
5937 id_col(),
5938 bigint_fk_col("user_id", &users, djogi::migrate::OnDeleteSchema::Restrict),
5939 ],
5940 ),
5941 ),
5942 ]);
5943 let v1_snapshot = bucket_snapshot(&bucket.app, v1_models.clone(), vec![]);
5944 let empty_snapshot =
5945 bucket_snapshot(&bucket.app, std::collections::BTreeMap::new(), vec![]);
5946 compose_bucket_migration(
5947 &work,
5948 &bucket,
5949 v1_snapshot.clone(),
5950 empty_snapshot,
5951 "drift-fk-v1",
5952 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19735),
5953 );
5954 assert_eq!(
5955 run_apply_in_test_db(&mut ctx, &work, FakeMode::Real).await,
5956 0
5957 );
5958
5959 let fk_name = ctx
5960 .raw_scalar::<String>(
5961 "SELECT c.conname
5962 FROM pg_constraint c
5963 JOIN pg_class t ON t.oid = c.conrelid
5964 WHERE t.relname = $1 AND c.contype = 'f'
5965 LIMIT 1",
5966 &[&posts.as_str()],
5967 )
5968 .await
5969 .expect("lookup live FK name");
5970 ctx.raw_ddl(&format!(
5971 "ALTER TABLE {posts} DROP CONSTRAINT {fk_name}; \
5972 ALTER TABLE {posts} ADD CONSTRAINT {fk_name} \
5973 FOREIGN KEY (user_id) REFERENCES {users}(id) ON DELETE CASCADE"
5974 ))
5975 .await
5976 .expect("rewrite FK out of band");
5977
5978 let v2_models = std::collections::BTreeMap::from([
5979 (
5980 users.clone(),
5981 simple_table(&bucket.app, &users, vec![id_col(), text_col("name", false)]),
5982 ),
5983 (
5984 posts.clone(),
5985 simple_table(
5986 &bucket.app,
5987 &posts,
5988 vec![
5989 id_col(),
5990 bigint_fk_col("user_id", &users, djogi::migrate::OnDeleteSchema::Restrict),
5991 text_col("note", true),
5992 ],
5993 ),
5994 ),
5995 ]);
5996 compose_bucket_migration(
5997 &work,
5998 &bucket,
5999 bucket_snapshot(&bucket.app, v2_models, vec![]),
6000 v1_snapshot,
6001 "drift-fk-v2",
6002 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19736),
6003 );
6004
6005 let exit = run_apply_in_test_db(&mut ctx, &work, FakeMode::Real).await;
6006 assert_eq!(exit, 2, "FK shape drift must refuse");
6007 }
6008
6009 #[djogi::djogi_test]
6010 async fn fake_apply_succeeds_with_corrupt_snapshot_on_disk(
6011 mut ctx: djogi::context::DjogiContext,
6012 ) {
6013 static DRIFT_COUNTER: AtomicUsize = AtomicUsize::new(0);
6014 let n = DRIFT_COUNTER.fetch_add(1, Ordering::SeqCst);
6015 let table = format!("fake_corrupt_snapshot_{n}");
6016 let bucket = BucketKey {
6017 database: "main".into(),
6018 app: "billing".into(),
6019 };
6020 let work = temp_workspace("fake-corrupt-snapshot");
6021
6022 let v1_models = std::collections::BTreeMap::from([(
6023 table.clone(),
6024 simple_table(&bucket.app, &table, vec![id_col(), text_col("name", false)]),
6025 )]);
6026 let v1_snapshot = bucket_snapshot(&bucket.app, v1_models.clone(), vec![]);
6027 let empty_snapshot =
6028 bucket_snapshot(&bucket.app, std::collections::BTreeMap::new(), vec![]);
6029 compose_bucket_migration(
6030 &work,
6031 &bucket,
6032 v1_snapshot.clone(),
6033 empty_snapshot,
6034 "fake-corrupt-v1",
6035 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19737),
6036 );
6037 assert_eq!(
6038 run_apply_in_test_db(&mut ctx, &work, FakeMode::Real).await,
6039 0
6040 );
6041
6042 let v2_models = std::collections::BTreeMap::from([(
6043 table.clone(),
6044 simple_table(
6045 &bucket.app,
6046 &table,
6047 vec![id_col(), text_col("name", false), text_col("email", true)],
6048 ),
6049 )]);
6050 let v2_snapshot = bucket_snapshot(&bucket.app, v2_models, vec![]);
6051 let report = compose_bucket_migration(
6052 &work,
6053 &bucket,
6054 v2_snapshot.clone(),
6055 v1_snapshot,
6056 "fake-corrupt-v2",
6057 time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(19738),
6058 );
6059 let second_version = report.composed_buckets[0].version.clone();
6060
6061 let snap_path = reconstruct_snapshot_path(&work, &bucket);
6062 fs::write(&snap_path, b"not json").expect("corrupt snapshot");
6063
6064 let exit = run_apply_in_test_db(
6065 &mut ctx,
6066 &work,
6067 FakeMode::Fake {
6068 reason: "schema pre-exists (corrupt-snapshot guard)".to_string(),
6069 },
6070 )
6071 .await;
6072 assert_eq!(exit, 0, "fake apply must ignore corrupt snapshot");
6073
6074 let status = ctx
6075 .raw_scalar::<String>(
6076 "SELECT status FROM djogi_schema_migrations \
6077 WHERE version = $1 AND app_label = $2",
6078 &[&second_version, &bucket.app],
6079 )
6080 .await
6081 .expect("query fake row status");
6082 assert_eq!(status, "faked");
6083
6084 let repaired_snapshot = djogi::migrate::load_snapshot(&snap_path)
6085 .expect("fake apply should rewrite a valid snapshot");
6086 assert!(
6087 repaired_snapshot.models.contains_key(&table),
6088 "fake apply should persist the caller-supplied snapshot forward"
6089 );
6090 }
6091
6092 fn replace_db_in_url(url: &str, new_db: &str) -> Option<String> {
6096 let body = url
6097 .strip_prefix("postgres://")
6098 .or_else(|| url.strip_prefix("postgresql://"))?;
6099 let scheme = if url.starts_with("postgres://") {
6100 "postgres://"
6101 } else {
6102 "postgresql://"
6103 };
6104 let mut idx = 0usize;
6105 let body_bytes = body.as_bytes();
6106 while idx < body_bytes.len() && body_bytes[idx] != b'/' {
6107 idx += 1;
6108 }
6109 if idx >= body_bytes.len() {
6110 return None;
6111 }
6112 let authority = &body[..idx];
6113 let path_start = idx + 1;
6114 let mut path_end = path_start;
6115 while path_end < body_bytes.len() && body_bytes[path_end] != b'?' {
6116 path_end += 1;
6117 }
6118 let trailing = &body[path_end..];
6119 Some(format!("{scheme}{authority}/{new_db}{trailing}"))
6120 }
6121
6122 fn default_col() -> djogi::migrate::ColumnSchema {
6123 djogi::migrate::ColumnSchema {
6124 check: None,
6125 codec: None,
6126 comment: None,
6127 default_sql: None,
6128 foreign_key: None,
6129 generated: None,
6130 identity: None,
6131 index_type: None,
6132 indexed: false,
6133 max_length: None,
6134 name: "".to_string(),
6135 nullable: false,
6136 on_delete: None,
6137 outbox_exclude: false,
6138 rationale: None,
6139 relation_kind: None,
6140 renamed_from: None,
6141 sequence_within: None,
6142 sql_type: "".to_string(),
6143 unique: false,
6144 type_change_using: None,
6145 }
6146 }
6147
6148 fn default_table() -> djogi::migrate::TableSchema {
6149 djogi::migrate::TableSchema {
6150 app: None,
6151 columns: Vec::new(),
6152 exclusion_constraints: Vec::new(),
6153 fts: None,
6154 is_through: false,
6155 moved_from_app: None,
6156 partition: None,
6157 primary_key: djogi::migrate::PrimaryKeySchema {
6158 columns: Vec::new(),
6159 kind: djogi::migrate::PkKindSchema::Composite,
6160 },
6161 rationale: None,
6162 renamed_from: None,
6163 rls_enabled: false,
6164 table: "".to_string(),
6165 table_comment: None,
6166 storage_params: None,
6167 tablespace: None,
6168 tenant_key: None,
6169 }
6170 }
6171
6172 #[test]
6173 fn discover_pending_plans_refuses_malformed_pending_json() {
6174 let work = temp_workspace("discover_pending_malformed");
6175 let path = djogi::migrate::pending_json_path(
6176 &work,
6177 &BucketKey {
6178 database: "main".to_string(),
6179 app: String::new(),
6180 },
6181 );
6182 fs::create_dir_all(path.parent().unwrap()).unwrap();
6183 fs::write(&path, b"{ not json").unwrap();
6184
6185 let err = discover_pending_plans(&work).expect_err("malformed pending must refuse");
6186 assert!(err.contains("parse pending JSON"));
6187 let _ = fs::remove_dir_all(&work);
6188 }
6189
6190 #[test]
6191 fn discover_pending_plans_refuses_hidden_phase_zero_database_mismatch() {
6192 let work = temp_workspace("discover_pending_phase_zero_db_mismatch");
6193 write_pending_json(
6194 &djogi::migrate::phase_zero_pending_json_path(
6195 &work,
6196 "main",
6197 djogi::migrate::PHASE_ZERO_VERSION,
6198 ),
6199 "other_db",
6200 "",
6201 djogi::migrate::PHASE_ZERO_VERSION,
6202 &[],
6203 );
6204
6205 let err = discover_pending_plans(&work).expect_err("hidden Phase 0 mismatch must refuse");
6206 assert!(
6207 err.contains("expected main from path"),
6208 "unexpected error: {err}"
6209 );
6210 let _ = fs::remove_dir_all(&work);
6211 }
6212
6213 #[test]
6214 fn discover_pending_plans_refuses_normal_global_phase_zero_pending() {
6215 let work = temp_workspace("discover_pending_normal_global_phase_zero");
6216 let path = djogi::migrate::pending_json_path(
6217 &work,
6218 &BucketKey {
6219 database: "main".to_string(),
6220 app: String::new(),
6221 },
6222 );
6223 write_pending_json(&path, "main", "", djogi::migrate::PHASE_ZERO_VERSION, &[]);
6224
6225 let err = discover_pending_plans(&work).expect_err("normal-global Phase 0 must refuse");
6226 assert!(
6227 err.contains("Phase 0") && err.contains(".phase_zero"),
6228 "unexpected error: {err}"
6229 );
6230 let _ = fs::remove_dir_all(&work);
6231 }
6232
6233 #[test]
6234 fn discover_pending_plans_refuses_normal_pending_app_mismatch() {
6235 let work = temp_workspace("discover_pending_normal_app_mismatch");
6236 let path = djogi::migrate::pending_json_path(
6237 &work,
6238 &BucketKey {
6239 database: "main".to_string(),
6240 app: "billing".to_string(),
6241 },
6242 );
6243 write_pending_json(&path, "main", "audit", "V20260606010101__mismatch", &[]);
6244
6245 let err = discover_pending_plans(&work).expect_err("normal app mismatch must refuse");
6246 assert!(
6247 err.contains("expected billing from path"),
6248 "unexpected error: {err}"
6249 );
6250 let _ = fs::remove_dir_all(&work);
6251 }
6252
6253 #[test]
6254 fn discover_pending_plans_refuses_noncanonical_normal_pending_filename() {
6255 let work = temp_workspace("discover_pending_noncanonical_filename");
6256 let path = work.join("target/djogi_pending/main/bad-name.json");
6257 write_pending_json(&path, "main", "bad-name", "V20260606010101__bad_name", &[]);
6258
6259 let err = discover_pending_plans(&work).expect_err("non-canonical filename must refuse");
6260 assert!(
6261 err.contains("non-canonical app filename"),
6262 "unexpected error: {err}"
6263 );
6264 let _ = fs::remove_dir_all(&work);
6265 }
6266
6267 #[test]
6268 fn load_verified_pending_for_apply_refuses_changed_artifact() {
6269 let work = temp_workspace("apply_pending_changed_after_discovery");
6270 let path = djogi::migrate::pending_json_path(
6271 &work,
6272 &BucketKey {
6273 database: "main".to_string(),
6274 app: String::new(),
6275 },
6276 );
6277 write_pending_json(&path, "main", "", "V20260606010101__stable", &[]);
6278 let discovered = discover_pending_plans(&work).expect("discover");
6279 fs::write(
6280 &path,
6281 serde_json::to_vec_pretty(&PendingPlan {
6282 version: "V20260606010102__changed".to_string(),
6283 ..discovered[0].plan.clone()
6284 })
6285 .unwrap(),
6286 )
6287 .unwrap();
6288
6289 let err = load_verified_pending_for_apply(&discovered[0])
6290 .expect_err("apply must refuse a changed pending artifact");
6291 assert!(
6292 err.contains("changed after discovery"),
6293 "unexpected error: {err}"
6294 );
6295 let _ = fs::remove_dir_all(&work);
6296 }
6297
6298 #[test]
6299 fn reconcile_pending_plans_after_lock_refuses_added_artifact() {
6300 let work = temp_workspace("apply_pending_added_before_lock");
6301 let path = djogi::migrate::pending_json_path(
6302 &work,
6303 &BucketKey {
6304 database: "main".to_string(),
6305 app: String::new(),
6306 },
6307 );
6308 write_pending_json(&path, "main", "", "V20260606010101__stable", &[]);
6309 let discovered = discover_pending_plans(&work).expect("discover");
6310 write_pending_json(
6311 &djogi::migrate::phase_zero_pending_json_path(
6312 &work,
6313 "main",
6314 djogi::migrate::PHASE_ZERO_VERSION,
6315 ),
6316 "main",
6317 "",
6318 djogi::migrate::PHASE_ZERO_VERSION,
6319 &[],
6320 );
6321
6322 let err = reconcile_pending_plans_after_lock(&work, &discovered)
6323 .expect_err("locked reconciliation must refuse a changed pending set");
6324 assert!(
6325 err.contains("changed while waiting for the workspace lock"),
6326 "unexpected error: {err}"
6327 );
6328 let _ = fs::remove_dir_all(&work);
6329 }
6330
6331 #[test]
6332 fn reconcile_pending_plans_after_lock_accepts_unchanged_set() {
6333 let work = temp_workspace("apply_pending_stable_under_lock");
6334 let path = djogi::migrate::pending_json_path(
6335 &work,
6336 &BucketKey {
6337 database: "main".to_string(),
6338 app: String::new(),
6339 },
6340 );
6341 write_pending_json(&path, "main", "", "V20260606010101__stable", &[]);
6342 let discovered = discover_pending_plans(&work).expect("discover");
6343
6344 let locked = reconcile_pending_plans_after_lock(&work, &discovered)
6345 .expect("unchanged set must reconcile");
6346 assert_eq!(locked, discovered);
6347 let _ = fs::remove_dir_all(&work);
6348 }
6349
6350 #[test]
6351 fn repair_checksum_drift_is_identity_free() {
6352 let work = temp_workspace("repair_checksum_identity_free");
6353 write_unreachable_config(&work);
6354
6355 let exit = without_database_url(|| {
6356 repair_checksum_drift_cmd(
6357 "V20260601000000__repair_checksum",
6358 None,
6359 None,
6360 Some("V1:0000000000000000000000000000000000000000000000000000000000000000"),
6361 None,
6362 Some(work.clone()),
6363 )
6364 });
6365
6366 assert_eq!(
6367 exit,
6368 ExitCode::from(1),
6369 "checksum-drift should reach pool connection without shared identity validation"
6370 );
6371 let _ = fs::remove_dir_all(&work);
6372 }
6373
6374 #[test]
6375 fn repair_partial_apply_is_identity_free() {
6376 let work = temp_workspace("repair_partial_identity_free");
6377 write_unreachable_config(&work);
6378
6379 let exit = without_database_url(|| {
6380 repair_partial_apply_cmd(
6381 "V20260601000000__repair_partial",
6382 PartialApplyResolution::MarkRolledBack,
6383 "operator confirmed rollback",
6384 None,
6385 None,
6386 Some(work.clone()),
6387 )
6388 });
6389
6390 assert_eq!(
6391 exit,
6392 ExitCode::from(1),
6393 "partial-apply should reach pool connection without shared identity validation"
6394 );
6395 let _ = fs::remove_dir_all(&work);
6396 }
6397
6398 #[test]
6399 fn repair_snapshot_rebuild_is_identity_free() {
6400 let work = temp_workspace("repair_snapshot_identity_free");
6401 write_unreachable_config(&work);
6402
6403 let exit = without_database_url(|| {
6404 repair_snapshot_rebuild_cmd(None, None, None, Some(work.clone()))
6405 });
6406
6407 assert_eq!(
6408 exit,
6409 ExitCode::from(1),
6410 "snapshot-rebuild should reach pool connection without shared identity validation"
6411 );
6412 let _ = fs::remove_dir_all(&work);
6413 }
6414
6415 #[test]
6427 fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
6428 use djogi::migrate::projection::BucketKey;
6429 use djogi::migrate::schema::{
6430 ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
6431 };
6432 use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
6433 use std::collections::BTreeMap;
6434
6435 let work = temp_workspace("b1r2_compose_uses_discovery");
6436
6437 let billing_bucket = BucketKey {
6440 database: "main".into(),
6441 app: "billing".into(),
6442 };
6443 let mut billing_snap = AppliedSchema {
6444 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
6445 enums: BTreeMap::new(),
6446 format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
6447 generated_at: "2026-04-25T00:00:00Z".to_string(),
6448 indexes: Vec::new(),
6449 models: BTreeMap::new(),
6450 registered_apps: vec!["billing".to_string()],
6451 };
6452 billing_snap.models.insert(
6453 "widgets".to_string(),
6454 TableSchema {
6455 app: Some("billing".to_string()),
6456 columns: vec![ColumnSchema {
6457 check: None,
6458 codec: None,
6459 comment: None,
6460 default_sql: Some("heerid_next_desc()".to_string()),
6461 foreign_key: None,
6462 generated: None,
6463 identity: None,
6464 index_type: None,
6465 indexed: false,
6466 max_length: None,
6467 name: "id".to_string(),
6468 nullable: false,
6469 on_delete: None,
6470 outbox_exclude: false,
6471 rationale: None,
6472 relation_kind: None,
6473 renamed_from: None,
6474 sequence_within: None,
6475 sql_type: "BIGINT".to_string(),
6476 unique: false,
6477 type_change_using: None,
6478 }],
6479 exclusion_constraints: Vec::new(),
6480 fts: None,
6481 is_through: false,
6482 moved_from_app: None,
6483 partition: None,
6484 primary_key: PrimaryKeySchema {
6485 columns: vec!["id".to_string()],
6486 kind: PkKindSchema::HeerIdRecencyBiased,
6487 },
6488 rationale: None,
6489 renamed_from: None,
6490 rls_enabled: false,
6491 table: "widgets".to_string(),
6492 table_comment: None,
6493 storage_params: None,
6494 tablespace: None,
6495 tenant_key: None,
6496 },
6497 );
6498 let snap_path = snapshot_path(&work, &billing_bucket);
6499 save_snapshot(&billing_snap, &snap_path).expect("write snapshot");
6500
6501 let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
6505 let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
6506
6507 let exit = compose_with_inputs(
6508 &work,
6509 "drop billing remnant",
6510 true, false, &empty_models,
6513 &[AppLifecycle {
6514 label: "billing".to_string(),
6515 database: "main".to_string(),
6516 renamed_from: None,
6517 tombstone: true, }],
6519 now,
6520 None, );
6522 assert_eq!(exit, ExitCode::from(0), "compose must succeed");
6523
6524 let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
6527 let mut up_path: Option<PathBuf> = None;
6528 for entry in fs::read_dir(&billing_dir).unwrap().flatten() {
6529 let n = entry.file_name().to_string_lossy().to_string();
6530 if n.starts_with('V') && n.ends_with(".sdjql") && !n.contains(".down.") {
6533 up_path = Some(entry.path());
6534 break;
6535 }
6536 }
6537 let up_path = up_path.expect("compose must have written an up SQL file");
6538 let up_sql = fs::read_to_string(&up_path).unwrap();
6539 assert!(
6540 up_sql.contains("DROP TABLE \"widgets\""),
6541 "compose must have seen the disk snapshot and emitted DROP TABLE — \
6542 this proves discover_snapshot_buckets_on_disk reached the differ. \
6543 SQL: {up_sql}"
6544 );
6545 let _ = fs::remove_dir_all(&work);
6546 }
6547
6548 #[test]
6557 fn compose_cycle_exits_with_code_two() {
6558 use djogi::migrate::projection::BucketKey;
6559 use djogi::migrate::schema::{
6560 AppliedSchema, ColumnSchema, ForeignKeySchema, OnDeleteSchema, PkKindSchema,
6561 PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
6562 };
6563 use std::collections::BTreeMap;
6564
6565 let work = temp_workspace("compose_cycle_exit_two");
6566
6567 let fk_col = |name: &str, target_table: &str| -> ColumnSchema {
6569 ColumnSchema {
6570 name: name.to_string(),
6571 sql_type: "BIGINT".to_string(),
6572 foreign_key: Some(ForeignKeySchema {
6573 deferrable: false,
6574 initially_deferred: false,
6575 on_delete: OnDeleteSchema::Restrict,
6576 ref_column: "id".to_string(),
6577 ref_table: target_table.to_string(),
6578 }),
6579 ..default_col()
6580 }
6581 };
6582
6583 let table_with_fk =
6585 |app: &str, table: &str, fk_name: &str, fk_target: &str| -> TableSchema {
6586 let id_col = ColumnSchema {
6587 name: "id".to_string(),
6588 sql_type: "BIGINT".to_string(),
6589 default_sql: Some("heerid_next_desc()".to_string()),
6590 ..default_col()
6591 };
6592 TableSchema {
6593 app: Some(app.to_string()),
6594 columns: vec![id_col, fk_col(fk_name, fk_target)],
6595 primary_key: PrimaryKeySchema {
6596 columns: vec!["id".to_string()],
6597 kind: PkKindSchema::HeerIdRecencyBiased,
6598 },
6599 table: table.to_string(),
6600 ..default_table()
6601 }
6602 };
6603
6604 let schema_for =
6605 |app: &str, table: &str, fk_name: &str, fk_target: &str| -> AppliedSchema {
6606 let mut models = BTreeMap::new();
6607 models.insert(
6608 table.to_string(),
6609 table_with_fk(app, table, fk_name, fk_target),
6610 );
6611 AppliedSchema {
6612 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
6613 enums: BTreeMap::new(),
6614 format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
6615 generated_at: "2026-06-10T00:00:00Z".to_string(),
6616 indexes: Vec::new(),
6617 models,
6618 registered_apps: vec![app.to_string()],
6619 }
6620 };
6621
6622 let a_bucket = BucketKey {
6623 database: "main".into(),
6624 app: "a".into(),
6625 };
6626 let b_bucket = BucketKey {
6627 database: "main".into(),
6628 app: "b".into(),
6629 };
6630
6631 let mut models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
6633 models.insert(a_bucket, schema_for("a", "table_a", "b_id", "table_b"));
6634 models.insert(b_bucket, schema_for("b", "table_b", "a_id", "table_a"));
6635
6636 let now = time::OffsetDateTime::from_unix_timestamp(1_749_513_600).unwrap();
6637 let exit = compose_with_inputs(
6638 &work,
6639 "cross-bucket cycle",
6640 false, false, &models,
6643 &[
6644 AppLifecycle {
6645 label: "a".to_string(),
6646 database: "main".to_string(),
6647 renamed_from: None,
6648 tombstone: false,
6649 },
6650 AppLifecycle {
6651 label: "b".to_string(),
6652 database: "main".to_string(),
6653 renamed_from: None,
6654 tombstone: false,
6655 },
6656 ],
6657 now,
6658 None, );
6660
6661 assert_eq!(
6662 exit,
6663 ExitCode::from(2),
6664 "a cross-bucket FK cycle must exit 2 (operator-actionable refusal), not 1"
6665 );
6666 let _ = fs::remove_dir_all(&work);
6667 }
6668
6669 #[test]
6672 fn u4_compose_refusal_variants_map_to_exit_code_two() {
6673 use djogi::migrate::{BucketKey, Classification, ComposeError};
6674
6675 let bucket = BucketKey {
6676 database: "main".to_string(),
6677 app: "billing".to_string(),
6678 };
6679
6680 let cases = [
6681 ComposeError::TombstonedAppRequiresAllowDestructive {
6682 app_label: "billing".to_string(),
6683 database: "main".to_string(),
6684 text: "D011: tombstoned app".to_string(),
6685 },
6686 ComposeError::DestructiveRequiresAllowDestructive {
6687 bucket: bucket.clone(),
6688 classification: Classification::Lossy,
6689 },
6690 ComposeError::UnsupportedDelta {
6691 bucket: bucket.clone(),
6692 reason: "unsupported transition".to_string(),
6693 },
6694 ComposeError::HandEditedMigrationWouldBeOverwritten {
6695 bucket: bucket.clone(),
6696 path: std::path::PathBuf::from("/tmp/example.sdjql"),
6697 text: "D013: hand-edited migration would be overwritten".to_string(),
6698 },
6699 ComposeError::PendingJsonWouldBeOverwritten {
6700 path: std::path::PathBuf::from("/tmp/example.pending"),
6701 text: "pending json mismatch".to_string(),
6702 },
6703 ComposeError::FolderRenameTargetCollision {
6704 from: std::path::PathBuf::from("/tmp/old"),
6705 to: std::path::PathBuf::from("/tmp/new"),
6706 offending_entry: "users.sdjql".to_string(),
6707 },
6708 ComposeError::LinkageDropWithoutModels {
6709 app_label: "billing".to_string(),
6710 database: "main".to_string(),
6711 text: "D010: linkage drop without models".to_string(),
6712 },
6713 ComposeError::CrossBucketForeignKeyCycle {
6714 database: "main".to_string(),
6715 chain: vec!["a".to_string(), "b".to_string()],
6716 },
6717 ];
6718
6719 for case in &cases {
6720 assert_eq!(
6721 compose_error_exit_code(case),
6722 2,
6723 "compose refusal variant must map to exit 2: {case}"
6724 );
6725 }
6726 }
6727
6728 #[test]
6734 fn u4_compose_nothing_to_compose_maps_to_exit_code_zero() {
6735 use djogi::migrate::ComposeError;
6736
6737 assert_eq!(
6738 compose_error_exit_code(&ComposeError::NothingToCompose),
6739 0,
6740 "NothingToCompose must map to exit 0 in the helper, not the wildcard's 1"
6741 );
6742 }
6743
6744 #[test]
6752 fn u4_compose_runtime_variants_map_to_exit_code_one() {
6753 use djogi::migrate::schema::PkKindSchema;
6754 use djogi::migrate::{
6755 AutoEmitError, ComposeError, DiffError, PkFlipError, SnapshotError, SqlEmitError,
6756 };
6757
6758 let cases = [
6759 ComposeError::Io {
6761 path: std::path::PathBuf::from("/tmp/io-failure"),
6762 source: std::io::Error::other("io failure"),
6763 },
6764 ComposeError::SqlEmit(SqlEmitError::PkTypeFlipMustRouteToT9 {
6768 table: "orders".to_string(),
6769 from: PkKindSchema::HeerIdRecencyBiased,
6770 to: PkKindSchema::HeerId,
6771 }),
6772 ComposeError::Diff(DiffError::PkFlipMalformedSelfFkMetadata(
6778 PkFlipError::MalformedSelfFkMetadata {
6779 parent_table: "nodes".to_string(),
6780 fk_columns: 2,
6781 fk_constraint_names: 1,
6782 fk_deferrable: 2,
6783 fk_initially_deferred: 2,
6784 },
6785 )),
6786 ComposeError::PhaseZeroAutoEmit(AutoEmitError::Io {
6788 path: std::path::PathBuf::from("/tmp/bootstrap.sdjql"),
6789 source: std::io::Error::other("disk full"),
6790 }),
6791 ComposeError::PhaseZeroAutoEmit(AutoEmitError::PendingJson(
6793 serde_json::from_str::<serde_json::Value>("not-json-at-all").unwrap_err(),
6794 )),
6795 ComposeError::SerializeFailed(SnapshotError::Parse {
6797 path: None,
6798 source: serde_json::from_str::<serde_json::Value>("not-json").unwrap_err(),
6799 }),
6800 ];
6801
6802 for case in &cases {
6803 assert_eq!(
6804 compose_error_exit_code(case),
6805 1,
6806 "compose runtime variant must map to exit 1: {case}"
6807 );
6808 }
6809 }
6810
6811 #[test]
6817 fn u4_compose_nested_refusal_variants_map_to_exit_code_two() {
6818 use djogi::migrate::{
6819 AutoEmitError, BootstrapError, ComposeError, DiffError, SqlEmitError,
6820 };
6821
6822 let cases = [
6823 ComposeError::SqlEmit(SqlEmitError::Unsupported {
6825 reason: "enum-variant removal requires hand-written migration".to_string(),
6826 }),
6827 ComposeError::SqlEmit(SqlEmitError::UnsupportedPartitionChange {
6828 table: "events".to_string(),
6829 detail: "changing partition method requires full table rebuild".to_string(),
6830 }),
6831 ComposeError::SqlEmit(SqlEmitError::InvalidStorageParams {
6832 params: "fillfactor=invalid".to_string(),
6833 reason: "fillfactor must be an integer 10..100".to_string(),
6834 }),
6835 ComposeError::Diff(DiffError::PkFlipCascadeDepthExceeded {
6837 parent_table: "vehicles".to_string(),
6838 chain: vec!["vehicles".to_string(), "parts".to_string()],
6839 max_depth: 65,
6840 }),
6841 ComposeError::Diff(DiffError::PartitionedMultiParentClusterUnsupported {
6842 partitioned_parents: vec!["invoices".to_string()],
6843 cross_flipping_partners: vec!["invoices".to_string(), "line_items".to_string()],
6844 }),
6845 ComposeError::PhaseZeroAutoEmit(AutoEmitError::Compose(
6847 BootstrapError::InvalidExtensionName {
6848 name: "bad-name!".to_string(),
6849 },
6850 )),
6851 ComposeError::PhaseZeroAutoEmit(AutoEmitError::Compose(
6852 BootstrapError::UnknownExtension {
6853 name: "not_in_allowlist".to_string(),
6854 },
6855 )),
6856 ComposeError::SqlEmit(SqlEmitError::Diff(DiffError::PkFlipCascadeDepthExceeded {
6859 parent_table: "widgets".to_string(),
6860 chain: vec!["widgets".to_string()],
6861 max_depth: 65,
6862 })),
6863 ];
6864
6865 for case in &cases {
6866 assert_eq!(
6867 compose_error_exit_code(case),
6868 2,
6869 "nested operator-actionable compose refusal must map to exit 2: {case}"
6870 );
6871 }
6872 }
6873
6874 #[test]
6882 fn a1_round2_status_cmd_threads_workspace_to_config() {
6883 let work = temp_workspace("a1r2_status_workspace");
6884 fs::write(work.join("Djogi.toml"), "this is = not = valid toml ===").unwrap();
6889 let exit = status_cmd(Some(work.clone()));
6890 assert_eq!(
6891 exit,
6892 ExitCode::from(1),
6893 "malformed workspace Djogi.toml must surface as config load error"
6894 );
6895 let _ = fs::remove_dir_all(&work);
6896 }
6897
6898 #[test]
6906 fn u3_attune_refusal_variants_map_to_exit_code_two() {
6907 use djogi::migrate::AttuneRefusal;
6908 let cases = [
6909 AttuneError::Refused(AttuneRefusal::SquashNotLocalhost {
6910 database_url: "postgres://prod.example.com/main".to_string(),
6911 }),
6912 AttuneError::Refused(AttuneRefusal::SquashNotDevProfile {
6913 profile: "production".to_string(),
6914 }),
6915 AttuneError::Refused(AttuneRefusal::SquashDevModeOff),
6918 AttuneError::Refused(AttuneRefusal::SquashEnvIsProduction {
6919 env_value: "production".to_string(),
6920 }),
6921 AttuneError::Refused(AttuneRefusal::SquashFromVersionNotFound {
6922 version: "V20260101000000__missing".to_string(),
6923 }),
6924 AttuneError::Refused(AttuneRefusal::SquashFromVersionAmbiguous {
6925 version: "V20260101000000__shared".to_string(),
6926 buckets: vec!["main/users".to_string(), "main/billing".to_string()],
6927 }),
6928 ];
6929 for err in &cases {
6930 assert_eq!(
6931 attune_error_exit_code(err),
6932 2,
6933 "refusal variant must map to exit 2: {err}"
6934 );
6935 }
6936 }
6937
6938 #[test]
6943 fn u3_attune_runtime_variants_map_to_exit_code_one() {
6944 let cases = [
6945 AttuneError::FilesystemScanFailed {
6946 source: std::io::Error::other("disk full"),
6947 },
6948 AttuneError::SqlReadFailed {
6949 path: PathBuf::from("/tmp/x.sdjql"),
6950 source: std::io::Error::other("permission denied"),
6951 },
6952 AttuneError::SqlWriteFailed {
6953 path: PathBuf::from("/tmp/x.sdjql"),
6954 source: std::io::Error::other("read-only fs"),
6955 },
6956 AttuneError::SqlDeleteFailed {
6957 path: PathBuf::from("/tmp/x.sdjql"),
6958 source: std::io::Error::other("not found"),
6959 },
6960 AttuneError::GitPublishFailed {
6961 stderr: "fatal: refusing to push".to_string(),
6962 status_code: Some(128),
6963 },
6964 ];
6965 for err in &cases {
6966 assert_eq!(
6967 attune_error_exit_code(err),
6968 1,
6969 "runtime variant must map to exit 1: {err}"
6970 );
6971 }
6972 }
6973
6974 #[test]
6977 fn rollback_lossy_opt_in_requires_non_empty_reason() {
6978 let workspace = Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws"));
6979
6980 let missing = rollback_cmd(
6981 None,
6982 false,
6983 true,
6984 None,
6985 None,
6986 None,
6987 workspace.clone(),
6988 None,
6989 false,
6990 );
6991 assert_eq!(
6992 missing,
6993 ExitCode::from(2),
6994 "--allow-data-loss without --reason must exit 2 before any DB work"
6995 );
6996
6997 let blank = rollback_cmd(
6998 None,
6999 false,
7000 true,
7001 Some(" ".to_string()),
7002 None,
7003 None,
7004 workspace,
7005 None,
7006 false,
7007 );
7008 assert_eq!(
7009 blank,
7010 ExitCode::from(2),
7011 "blank --reason with --allow-data-loss must exit 2 before any DB work"
7012 );
7013 }
7014
7015 #[test]
7016 fn rollback_allow_data_loss_with_whitespace_reason_exits_code_2() {
7017 let work = temp_workspace("rollback-empty-reason");
7018 write_unreachable_config(&work);
7019 let exit = without_database_url(|| {
7020 rollback_cmd(
7021 None,
7022 false,
7023 true,
7024 Some(" ".to_string()),
7025 None,
7026 None,
7027 Some(work.clone()),
7028 None,
7029 true,
7030 )
7031 });
7032 assert_eq!(exit, ExitCode::from(2));
7033 let _ = std::fs::remove_dir_all(&work);
7034 }
7035
7036 #[test]
7037 fn rollback_missing_identity_exits_code_2_before_db_work() {
7038 let work = temp_workspace("rollback-no-identity");
7039 write_unreachable_config(&work);
7040 let exit = without_database_url(|| {
7041 rollback_cmd(
7042 None,
7043 false,
7044 false,
7045 None,
7046 None,
7047 None,
7048 Some(work.clone()),
7049 None,
7050 false,
7051 )
7052 });
7053 assert_eq!(exit, ExitCode::from(2));
7054 let _ = std::fs::remove_dir_all(&work);
7055 }
7056
7057 #[test]
7058 fn rollback_unreachable_database_exits_code_1() {
7059 let work = temp_workspace("rollback-unreachable");
7060 write_unreachable_config(&work);
7061 let exit = without_database_url(|| {
7062 rollback_cmd(
7063 None,
7064 false,
7065 false,
7066 None,
7067 None,
7068 None,
7069 Some(work.clone()),
7070 None,
7071 true,
7072 )
7073 });
7074 assert_eq!(exit, ExitCode::from(1));
7075 let _ = std::fs::remove_dir_all(&work);
7076 }
7077
7078 #[test]
7079 fn rollback_refusal_variants_map_to_exit_code_two() {
7080 use djogi::migrate::LossyRollbackKind;
7081
7082 let cases = [
7083 RollbackError::LossyRollbackRefused {
7084 offending_labels: vec!["DropTable widgets".to_string()],
7085 kinds: vec![LossyRollbackKind::DropTable],
7086 },
7087 RollbackError::VersionNotRollbackable {
7088 version: "V20260101000000__widgets".to_string(),
7089 current_status: LedgerStatus::Pending,
7090 },
7091 RollbackError::VersionNotFound {
7092 version: "V20260101000000__widgets".to_string(),
7093 },
7094 RollbackError::MissingRollbackIdentity {
7095 version: "V20260101000000__widgets".to_string(),
7096 },
7097 ];
7098
7099 for err in &cases {
7100 assert_eq!(
7101 rollback_error_exit_code(err),
7102 2,
7103 "rollback refusal variant must map to exit 2: {err}"
7104 );
7105 }
7106 }
7107
7108 #[test]
7109 fn rollback_runner_refusal_variants_map_to_exit_code_two() {
7110 let err = RollbackError::Runner {
7111 source: RunnerError::PartitionExpansionNoLeaves {
7112 parent: "public.events".to_string(),
7113 statement_label: "expand_partitions".to_string(),
7114 },
7115 live_db_committed: false,
7116 };
7117 assert_eq!(
7118 rollback_error_exit_code(&err),
7119 2,
7120 "rollback should classify replay-strict refusal variants as exit 2"
7121 );
7122 }
7123
7124 #[test]
7125 fn rollback_transient_variants_map_to_exit_code_one() {
7126 use djogi::error::{DbError, DjogiError};
7127
7128 let cases = [
7129 RollbackError::Runner {
7130 source: RunnerError::PinnedSessionCheckoutFailed {
7131 source: DjogiError::Db(DbError::other("pool exhausted")),
7132 },
7133 live_db_committed: false,
7134 },
7135 RollbackError::DownStatementFailed {
7136 segment_index: 0,
7137 statement_label: "DROP TABLE widgets".to_string(),
7138 live_db_committed: false,
7139 source: DjogiError::Db(DbError::other("syntax error")),
7140 },
7141 ];
7142
7143 for err in &cases {
7144 assert_eq!(
7145 rollback_error_exit_code(err),
7146 1,
7147 "rollback transient variant must map to exit 1: {err}"
7148 );
7149 }
7150 }
7151
7152 #[test]
7153 fn rollback_snapshot_persist_failed_maps_to_exit_two() {
7154 use djogi::migrate::SnapshotError;
7155
7156 let err = RollbackError::SnapshotPersistFailed {
7161 path: PathBuf::from("/tmp/schema_snapshot.json"),
7162 source: SnapshotError::Io {
7163 path: Some(PathBuf::from("/tmp/schema_snapshot.json")),
7164 source: std::io::Error::new(std::io::ErrorKind::PermissionDenied, "denied"),
7165 },
7166 };
7167 assert_eq!(
7168 rollback_error_exit_code(&err),
7169 2,
7170 "SnapshotPersistFailed must map to exit 2 (post-commit repair signal)"
7171 );
7172 }
7173
7174 #[test]
7175 fn rollback_targets_without_to_selects_newest_applied_row_only() {
7176 let rows = vec![
7177 ledger_row(10, "V20260101000001__old", LedgerStatus::Applied, ""),
7178 ledger_row(11, "V20260101000002__new", LedgerStatus::Applied, ""),
7179 ledger_row(
7180 12,
7181 "V20260101000003__other_app",
7182 LedgerStatus::Applied,
7183 "billing",
7184 ),
7185 ];
7186
7187 let selected = select_rollback_targets(&rows, "", None).expect("selection ok");
7188 assert_eq!(selected.len(), 1);
7189 assert_eq!(selected[0].version, "V20260101000002__new");
7190 }
7191
7192 #[test]
7193 fn rollback_targets_with_to_selects_every_newer_non_rolled_back_row() {
7194 let rows = vec![
7195 ledger_row(10, "V20260101000001__base", LedgerStatus::Applied, ""),
7196 ledger_row(11, "V20260101000002__middle", LedgerStatus::Faked, ""),
7197 ledger_row(12, "V20260101000003__newest", LedgerStatus::Applied, ""),
7198 ];
7199
7200 let selected = select_rollback_targets(&rows, "", Some("V20260101000001__base"))
7201 .expect("selection ok");
7202 assert_eq!(
7203 selected
7204 .iter()
7205 .map(|row| row.version.as_str())
7206 .collect::<Vec<_>>(),
7207 vec!["V20260101000003__newest", "V20260101000002__middle"]
7208 );
7209 }
7210
7211 #[test]
7212 fn rollback_targets_refuse_pending_row_inside_to_range() {
7213 let rows = vec![
7214 ledger_row(10, "V20260101000001__base", LedgerStatus::Applied, ""),
7215 ledger_row(11, "V20260101000002__pending", LedgerStatus::Pending, ""),
7216 ledger_row(12, "V20260101000003__newest", LedgerStatus::Applied, ""),
7217 ];
7218
7219 let err = select_rollback_targets(&rows, "", Some("V20260101000001__base"))
7220 .expect_err("pending row must refuse");
7221 assert!(err.contains("resolve it with `djogi migrations repair`"));
7222 assert!(err.contains("V20260101000002__pending"));
7223 }
7224
7225 #[test]
7226 fn rollback_targets_refuse_missing_to_version() {
7227 let rows = vec![ledger_row(
7228 10,
7229 "V20260101000001__base",
7230 LedgerStatus::Applied,
7231 "",
7232 )];
7233
7234 let err = select_rollback_targets(&rows, "", Some("V20260101000099__missing"))
7235 .expect_err("missing --to must refuse");
7236 assert!(err.contains("is not present in this bucket's ledger"));
7237 }
7238
7239 #[test]
7240 fn rollback_targets_skip_rolled_back_rows_inside_to_range() {
7241 let rows = vec![
7244 ledger_row(10, "V20260101000001__base", LedgerStatus::Applied, ""),
7245 ledger_row(11, "V20260101000002__undone", LedgerStatus::RolledBack, ""),
7246 ledger_row(12, "V20260101000003__newest", LedgerStatus::Applied, ""),
7247 ];
7248
7249 let selected = select_rollback_targets(&rows, "", Some("V20260101000001__base"))
7250 .expect("selection ok");
7251 assert_eq!(
7252 selected
7253 .iter()
7254 .map(|row| row.version.as_str())
7255 .collect::<Vec<_>>(),
7256 vec!["V20260101000003__newest"]
7259 );
7260 }
7261
7262 #[test]
7263 fn rollback_targets_without_to_stop_at_newest_baseline_row() {
7264 let rows = vec![
7267 ledger_row(10, "V20260101000001__older", LedgerStatus::Applied, ""),
7268 ledger_row(11, "V20260101000002__baseline", LedgerStatus::Baseline, ""),
7269 ];
7270
7271 let selected = select_rollback_targets(&rows, "", None).expect("selection ok");
7272 assert!(
7273 selected.is_empty(),
7274 "baseline as newest row must yield no rollback targets, got {selected:?}"
7275 );
7276 }
7277
7278 #[test]
7279 fn rollback_targets_refuse_baseline_above_floor() {
7280 let rows = vec![
7283 ledger_row(10, "V20260101000001__base", LedgerStatus::Applied, ""),
7284 ledger_row(11, "V20260101000002__baseline", LedgerStatus::Baseline, ""),
7285 ledger_row(12, "V20260101000003__newest", LedgerStatus::Applied, ""),
7286 ];
7287
7288 let err = select_rollback_targets(&rows, "", Some("V20260101000001__base"))
7289 .expect_err("baseline above floor must refuse");
7290 assert!(err.contains("cannot roll back past baseline"));
7291 assert!(err.contains("V20260101000002__baseline"));
7292 }
7293
7294 #[test]
7295 fn rollback_targets_refuse_failed_row_inside_to_range() {
7296 let rows = vec![
7299 ledger_row(10, "V20260101000001__base", LedgerStatus::Applied, ""),
7300 ledger_row(11, "V20260101000002__failed", LedgerStatus::Failed, ""),
7301 ledger_row(12, "V20260101000003__newest", LedgerStatus::Applied, ""),
7302 ];
7303
7304 let err = select_rollback_targets(&rows, "", Some("V20260101000001__base"))
7305 .expect_err("failed row must refuse");
7306 assert!(err.contains("resolve it with `djogi migrations repair`"));
7307 assert!(err.contains("V20260101000002__failed"));
7308 }
7309
7310 #[test]
7311 fn rollback_drift_identical_sets_pass() {
7312 let rows = [ledger_row(1, "V1__a", LedgerStatus::Applied, "")];
7313 let pre: Vec<_> = rows.iter().collect();
7314 let locked: Vec<_> = rows.iter().collect();
7315 assert!(ensure_no_target_drift(&pre, &locked).is_ok());
7316 }
7317
7318 #[test]
7319 fn rollback_drift_between_reads_refuses_with_rerun() {
7320 let pre_rows = [ledger_row(1, "V1__a", LedgerStatus::Applied, "")];
7321 let locked_rows = [
7322 ledger_row(1, "V1__a", LedgerStatus::Applied, ""),
7323 ledger_row(2, "V2__b", LedgerStatus::Applied, ""),
7324 ];
7325 let pre: Vec<_> = pre_rows.iter().collect();
7326 let locked: Vec<_> = locked_rows.iter().collect();
7327 let err = ensure_no_target_drift(&pre, &locked).expect_err("drift must refuse");
7328 assert!(err.contains("rerun"));
7329 }
7330
7331 #[test]
7332 fn lossy_scan_detects_both_marker_spellings() {
7333 let down = "-- Djogi composed migration — down\n\
7334 -- LOSSY: DroppedColumn — data in `horsepower` is lost\n\
7335 ALTER TABLE vehicles DROP COLUMN horsepower;\n\
7336 -- LOSSY ROLLBACK: cannot recreate table `probe` from the diff.\n";
7337 let hits = scan_lossy_down_markers(down);
7338 assert_eq!(hits.len(), 2);
7339 assert!(hits[0].starts_with("-- LOSSY:"));
7340 assert!(hits[1].starts_with("-- LOSSY ROLLBACK:"));
7341 }
7342
7343 #[test]
7344 fn lossy_scan_ignores_plain_comments_and_sql() {
7345 let down = "-- ordinary comment\nDROP TABLE probe;\n";
7346 assert!(scan_lossy_down_markers(down).is_empty());
7347 }
7348
7349 #[test]
7350 fn non_transactional_down_shape_is_detectable_via_public_api() {
7351 assert_eq!(
7352 djogi::migrate::find_non_transactional_statement_shape(
7353 "CREATE INDEX CONCURRENTLY idx_probe ON widgets (id);"
7354 ),
7355 Some("CREATE INDEX CONCURRENTLY"),
7356 );
7357 assert_eq!(
7358 djogi::migrate::find_non_transactional_statement_shape("DROP TABLE widgets;"),
7359 None,
7360 );
7361 }
7362
7363 #[test]
7371 fn baseline_empty_reason_exits_code_2() {
7372 let result = baseline_cmd(
7373 "V00000000000000__baseline",
7374 "description",
7375 "",
7376 None,
7377 None,
7378 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
7379 None, false, );
7382 assert_eq!(
7383 result,
7384 ExitCode::from(2),
7385 "empty --reason must exit 2 before any DB work"
7386 );
7387 }
7388
7389 #[test]
7390 fn baseline_whitespace_reason_exits_code_2() {
7391 let result = baseline_cmd(
7392 "V00000000000000__baseline",
7393 "description",
7394 " ",
7395 None,
7396 None,
7397 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
7398 None, false, );
7401 assert_eq!(
7402 result,
7403 ExitCode::from(2),
7404 "whitespace-only --reason must exit 2 before any DB work"
7405 );
7406 }
7407
7408 #[test]
7415 fn baseline_refusal_variants_map_to_exit_code_two() {
7416 let cases = [
7417 RunnerError::VersionAlreadyApplied {
7418 version: "V00000000000000__baseline".to_string(),
7419 applied_at: None,
7420 },
7421 RunnerError::VersionCollisionNonTerminal {
7422 version: "V00000000000000__baseline".to_string(),
7423 status: LedgerStatus::Pending,
7424 run_id: 1,
7425 },
7426 RunnerError::BaselineSnapshotShouldNotBeProvided,
7427 RunnerError::SnapshotPersistFailed {
7428 path: std::path::PathBuf::from("/tmp/schema_snapshot.json"),
7429 source: djogi::migrate::snapshot::SnapshotError::Io {
7430 path: None,
7431 source: std::io::Error::other("disk full"),
7432 },
7433 },
7434 RunnerError::AdvisoryUnlockReturnedFalse {
7435 bucket: BucketKey {
7436 database: "main".to_string(),
7437 app: String::new(),
7438 },
7439 key: 0x0102_0304_0506_0708,
7440 },
7441 RunnerError::OutOfOrderRejected {
7442 version: "V00000000000000__baseline".to_string(),
7443 conflicting_version: "V20260101000000__later".to_string(),
7444 conflicting_applied_at: None,
7445 },
7446 ];
7447 for err in &cases {
7448 assert_eq!(
7449 runner_error_exit_code(err),
7450 2,
7451 "baseline refusal variant must map to exit 2: {err}"
7452 );
7453 }
7454 }
7455
7456 #[test]
7462 fn baseline_transient_variants_map_to_exit_code_one() {
7463 use djogi::error::{DbError, DjogiError};
7464 let cases = [
7465 RunnerError::LedgerBootstrapFailed {
7466 source: DjogiError::Db(DbError::other("create table failed")),
7467 },
7468 RunnerError::LedgerWriteFailed {
7469 version: "V00000000000000__baseline".to_string(),
7470 source: DjogiError::Db(DbError::other("insert failed")),
7471 },
7472 RunnerError::PinnedSessionCheckoutFailed {
7473 source: DjogiError::Db(DbError::other("pool exhausted")),
7474 },
7475 RunnerError::AdvisoryLockFailed {
7476 bucket: BucketKey {
7477 database: "main".to_string(),
7478 app: String::new(),
7479 },
7480 key: 0x0102_0304_0506_0708,
7481 attempts: 3,
7482 },
7483 ];
7484 for err in &cases {
7485 assert_eq!(
7486 runner_error_exit_code(err),
7487 1,
7488 "baseline transient variant must map to exit 1: {err}"
7489 );
7490 }
7491 }
7492
7493 #[test]
7497 fn apply_runner_refusal_variants_map_to_exit_code_two() {
7498 let cases = [
7499 RunnerError::VersionAlreadyApplied {
7500 version: "V20260101000000__add_users".to_string(),
7501 applied_at: None,
7502 },
7503 RunnerError::VersionCollisionNonTerminal {
7504 version: "V20260101000000__add_users".to_string(),
7505 status: LedgerStatus::Pending,
7506 run_id: 1,
7507 },
7508 RunnerError::StalePhaseZeroArtifact {
7509 version: "V00000000000000__phase_zero".to_string(),
7510 refusal_reason: "generated-stale",
7511 },
7512 RunnerError::OutOfOrderRejected {
7513 version: "V20260101000000__add_users".to_string(),
7514 conflicting_version: "V20260201000000__add_more".to_string(),
7515 conflicting_applied_at: Some("2026-01-01T00:00:00Z".to_string()),
7516 },
7517 RunnerError::SnapshotPersistFailed {
7518 path: std::path::PathBuf::from("/tmp/schema_snapshot.json"),
7519 source: djogi::migrate::snapshot::SnapshotError::Io {
7520 path: None,
7521 source: std::io::Error::other("disk full"),
7522 },
7523 },
7524 RunnerError::PkFlipHazardDisabledTriggers {
7525 table: "public.events".to_string(),
7526 triggers: vec![("zzz_rv_events_id".to_string(), 'D')],
7527 },
7528 RunnerError::PartitionExpansionNoLeaves {
7529 parent: "public.events".to_string(),
7530 statement_label: "expand_partitions".to_string(),
7531 },
7532 ];
7533 for err in &cases {
7534 assert_eq!(
7535 runner_error_exit_code(err),
7536 2,
7537 "runner refusal variant must map to exit 2: {err}"
7538 );
7539 }
7540 }
7541
7542 #[test]
7543 fn apply_runner_drift_variants_map_to_expected_exit_codes() {
7544 use djogi::error::{DbError, DjogiError};
7545
7546 let bucket = BucketKey {
7547 database: "main".to_string(),
7548 app: "billing".to_string(),
7549 };
7550 let report = VerifyReport {
7551 diagnostics: vec![djogi::migrate::VerifyDiagnostic {
7552 code: "D601".to_string(),
7553 severity: VerifySeverity::Error,
7554 message: "Snapshot table missing from live DB".to_string(),
7555 location: Some("billing.invoices".to_string()),
7556 }],
7557 latest_applied_version: Some("V20260601000000__billing".to_string()),
7558 applied_count: 2,
7559 unfinished_count: 0,
7560 };
7561 let cases = [
7562 (
7563 RunnerError::DriftDetected {
7564 bucket: bucket.clone(),
7565 report,
7566 },
7567 2,
7568 ),
7569 (
7570 RunnerError::DriftBaselineMissing {
7571 bucket: bucket.clone(),
7572 },
7573 2,
7574 ),
7575 (
7576 RunnerError::DriftBaselineCorrupted {
7577 bucket: bucket.clone(),
7578 reason: "unexpected end of input".to_string(),
7579 },
7580 2,
7581 ),
7582 (
7583 RunnerError::DriftPreflightFailed {
7584 source: Box::new(djogi::migrate::verify::VerifyRunError::CatalogQueryFailed {
7585 query_label: "columns",
7586 source: DjogiError::Db(DbError::other("catalog read failed")),
7587 }),
7588 },
7589 1,
7590 ),
7591 ];
7592 for (err, expected) in cases {
7593 assert_eq!(
7594 runner_error_exit_code(&err),
7595 expected,
7596 "drift variant must map to exit code {expected}: {err}"
7597 );
7598 }
7599 }
7600
7601 #[test]
7605 fn apply_runner_transient_variants_map_to_exit_code_one() {
7606 use djogi::error::{DbError, DjogiError};
7607 let cases = [
7608 RunnerError::LockTimeout {
7609 path: std::path::PathBuf::from("/tmp/.djogi-migrations-lock"),
7610 holder_pid: None,
7611 },
7612 RunnerError::LedgerWriteFailed {
7613 version: "V20260101000000__add_users".to_string(),
7614 source: DjogiError::Db(DbError::other("insert failed")),
7615 },
7616 RunnerError::CatalogQueryFailed {
7617 query_label: "pg_class relpages",
7618 source: DjogiError::Db(DbError::other("query failed")),
7619 },
7620 RunnerError::PinnedSessionCheckoutFailed {
7621 source: DjogiError::Db(DbError::other("pool unavailable")),
7622 },
7623 RunnerError::AdvisoryLockFailed {
7624 bucket: BucketKey {
7625 database: "main".to_string(),
7626 app: String::new(),
7627 },
7628 key: 0x0102_0304_0506_0708,
7629 attempts: 3,
7630 },
7631 ];
7632 for err in &cases {
7633 assert_eq!(
7634 runner_error_exit_code(err),
7635 1,
7636 "runner transient variant must map to exit 1: {err}"
7637 );
7638 }
7639 }
7640
7641 #[test]
7645 fn fake_without_reason_exits_code_2() {
7646 let result = apply_cmd(
7647 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
7648 true,
7649 None,
7650 None, false, );
7653 assert_eq!(
7654 result,
7655 ExitCode::from(2),
7656 "--fake without --reason must exit 2"
7657 );
7658 }
7659
7660 #[test]
7662 fn fake_with_empty_reason_exits_code_2() {
7663 let result = apply_cmd(
7664 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
7665 true,
7666 Some(String::new()),
7667 None, false, );
7670 assert_eq!(
7671 result,
7672 ExitCode::from(2),
7673 "--fake with empty reason must exit 2"
7674 );
7675 }
7676
7677 #[test]
7679 fn fake_with_whitespace_reason_exits_code_2() {
7680 let result = apply_cmd(
7681 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
7682 true,
7683 Some(" ".to_string()),
7684 None, false, );
7687 assert_eq!(
7688 result,
7689 ExitCode::from(2),
7690 "--fake with whitespace reason must exit 2"
7691 );
7692 }
7693
7694 #[test]
7696 fn reason_without_fake_is_accepted() {
7697 let result = apply_cmd(
7701 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
7702 false, Some("test reason".to_string()),
7704 None, true, );
7707 assert_ne!(
7709 result,
7710 ExitCode::from(2),
7711 "--reason without --fake should not refuse"
7712 );
7713 }
7714
7715 #[test]
7716 fn load_drift_baseline_fake_skips_filesystem_and_real_missing_maps_missing() {
7717 let work = temp_workspace("load-drift-baseline");
7718 let missing = work.join("schema_snapshot.json");
7719
7720 assert!(
7721 matches!(
7722 load_drift_baseline(
7723 &FakeMode::Fake {
7724 reason: "adopt existing schema".to_string(),
7725 },
7726 &missing,
7727 ),
7728 DriftBaseline::Disabled
7729 ),
7730 "fake apply must not touch the snapshot path"
7731 );
7732 assert!(
7733 matches!(
7734 load_drift_baseline(&FakeMode::Real, &missing),
7735 DriftBaseline::Missing
7736 ),
7737 "real apply must surface missing snapshot as a typed baseline state"
7738 );
7739 }
7740
7741 #[test]
7742 fn load_drift_baseline_real_corrupt_snapshot_maps_to_corrupted() {
7743 let work = temp_workspace("load-drift-baseline-corrupt");
7744 let path = work.join("schema_snapshot.json");
7745 fs::write(&path, b"{ not json").unwrap();
7746 let baseline = load_drift_baseline(&FakeMode::Real, &path);
7747 assert!(
7748 matches!(baseline, DriftBaseline::Corrupted(_)),
7749 "corrupt snapshot must map to DriftBaseline::Corrupted, got: {baseline:?}"
7750 );
7751 }
7752
7753 fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
7760 djogi::migrate::BucketKey {
7761 database: database.to_string(),
7762 app: app.to_string(),
7763 }
7764 }
7765
7766 fn diag(
7768 code: &str,
7769 severity: djogi::migrate::VerifySeverity,
7770 message: &str,
7771 location: Option<&str>,
7772 ) -> djogi::migrate::VerifyDiagnostic {
7773 djogi::migrate::VerifyDiagnostic {
7774 code: code.to_string(),
7775 severity,
7776 message: message.to_string(),
7777 location: location.map(str::to_string),
7778 }
7779 }
7780
7781 #[test]
7782 fn render_verify_report_clean_output() {
7783 use djogi::migrate::VerifyReport;
7784
7785 let report = VerifyReport {
7786 diagnostics: vec![],
7787 latest_applied_version: Some("001_initial".to_string()),
7788 applied_count: 3,
7789 unfinished_count: 0,
7790 };
7791 let bucket = render_bucket("main", "");
7792
7793 let lines = render_verify_report(&report, &bucket);
7794
7795 assert!(
7796 lines.contains(&"Ledger: 3 applied, latest 001_initial".to_string()),
7797 "missing ledger line; got {lines:?}"
7798 );
7799 assert!(
7800 lines.contains(&"No drift detected. Schema is consistent.".to_string()),
7801 "missing clean line; got {lines:?}"
7802 );
7803 assert!(
7804 lines.iter().any(|l| l.contains("Result: PASSED")),
7805 "missing PASSED result; got {lines:?}"
7806 );
7807 assert!(
7808 !lines.iter().any(|l| l.contains("FAILED")),
7809 "clean report must not say FAILED; got {lines:?}"
7810 );
7811 }
7812
7813 #[test]
7814 fn render_verify_report_with_errors() {
7815 use djogi::migrate::{VerifyReport, VerifySeverity};
7816
7817 let report = VerifyReport {
7820 diagnostics: vec![
7821 diag(
7822 "D601",
7823 VerifySeverity::Error,
7824 "Snapshot table missing from live DB",
7825 Some("users"),
7826 ),
7827 diag(
7828 "D611",
7829 VerifySeverity::Warning,
7830 "Live index not present in snapshot",
7831 Some("idx_posts_created"),
7832 ),
7833 ],
7834 latest_applied_version: Some("V20260501000000__add_users".to_string()),
7835 applied_count: 2,
7836 unfinished_count: 0,
7837 };
7838 let bucket = render_bucket("main", "myapp");
7839
7840 assert!(report.has_errors());
7841 let lines = render_verify_report(&report, &bucket);
7842
7843 assert!(
7844 lines
7845 .contains(&"[ERROR] D601 (users): Snapshot table missing from live DB".to_string()),
7846 "missing D601 line; got {lines:?}"
7847 );
7848 assert!(
7849 lines.contains(
7850 &"[WARN] D611 (idx_posts_created): Live index not present in snapshot".to_string()
7851 ),
7852 "missing D611 line; got {lines:?}"
7853 );
7854 assert!(
7855 lines.iter().any(|l| l.contains("Result: FAILED")),
7856 "error report must say FAILED; got {lines:?}"
7857 );
7858 }
7859
7860 #[test]
7861 fn render_verify_report_header_shows_global_and_named_app() {
7862 use djogi::migrate::VerifyReport;
7863
7864 let report = VerifyReport {
7865 diagnostics: vec![],
7866 latest_applied_version: None,
7867 applied_count: 0,
7868 unfinished_count: 0,
7869 };
7870
7871 let global = render_verify_report(&report, &render_bucket("main", ""));
7873 assert_eq!(
7874 global.first().map(String::as_str),
7875 Some("djogi migrations verify — main/_global_"),
7876 "global bucket header; got {global:?}"
7877 );
7878
7879 let named = render_verify_report(&report, &render_bucket("crud_log", "billing"));
7881 assert_eq!(
7882 named.first().map(String::as_str),
7883 Some("djogi migrations verify — crud_log/billing"),
7884 "named bucket header; got {named:?}"
7885 );
7886 }
7887
7888 #[test]
7889 fn render_verify_report_warning_only_passes_with_warnings() {
7890 use djogi::migrate::{VerifyReport, VerifySeverity};
7891
7892 let report = VerifyReport {
7893 diagnostics: vec![diag(
7894 "D606",
7895 VerifySeverity::Warning,
7896 "type differs (advisory)",
7897 Some("users.age"),
7898 )],
7899 latest_applied_version: Some("001_initial".to_string()),
7900 applied_count: 1,
7901 unfinished_count: 0,
7902 };
7903 let lines = render_verify_report(&report, &render_bucket("main", ""));
7904
7905 assert!(
7906 lines
7907 .iter()
7908 .any(|l| l.contains("Result: PASSED with warnings")),
7909 "warning-only must PASS with warnings; got {lines:?}"
7910 );
7911 assert!(
7912 !lines.iter().any(|l| l.contains("FAILED")),
7913 "warning-only must not say FAILED; got {lines:?}"
7914 );
7915 }
7916
7917 #[test]
7918 fn render_verify_report_empty_ledger_line() {
7919 use djogi::migrate::VerifyReport;
7920
7921 let report = VerifyReport {
7922 diagnostics: vec![],
7923 latest_applied_version: None,
7924 applied_count: 0,
7925 unfinished_count: 0,
7926 };
7927 let lines = render_verify_report(&report, &render_bucket("main", ""));
7928
7929 assert!(
7930 lines.contains(&"Ledger: empty (no migrations applied yet)".to_string()),
7931 "empty ledger line; got {lines:?}"
7932 );
7933 }
7934
7935 #[test]
7936 fn render_verify_report_unfinished_ledger_line() {
7937 use djogi::migrate::VerifyReport;
7938
7939 let report = VerifyReport {
7940 diagnostics: vec![],
7941 latest_applied_version: Some("V20260501000000__add_users".to_string()),
7942 applied_count: 2,
7943 unfinished_count: 1,
7944 };
7945 let lines = render_verify_report(&report, &render_bucket("main", ""));
7946
7947 assert!(
7948 lines.contains(
7949 &"Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users".to_string()
7950 ),
7951 "unfinished ledger line; got {lines:?}"
7952 );
7953 }
7954
7955 #[test]
7956 fn render_verify_report_info_with_no_location_uses_dash() {
7957 use djogi::migrate::{VerifyReport, VerifySeverity};
7958
7959 let report = VerifyReport {
7962 diagnostics: vec![diag(
7963 "D692",
7964 VerifySeverity::Info,
7965 "enum type(s) declared; not yet checked",
7966 None,
7967 )],
7968 latest_applied_version: Some("001_initial".to_string()),
7969 applied_count: 1,
7970 unfinished_count: 0,
7971 };
7972 let lines = render_verify_report(&report, &render_bucket("main", ""));
7973
7974 assert!(
7975 lines.iter().any(|l| l.contains("(-)")),
7976 "location: None must render as (-); got {lines:?}"
7977 );
7978 assert!(
7979 lines.contains(&"Result: PASSED (1 info(s))".to_string()),
7980 "all-info summary; got {lines:?}"
7981 );
7982 }
7983
7984 #[test]
7985 fn render_drift_refusal_appends_next_steps_trailer() {
7986 use djogi::migrate::{VerifyReport, VerifySeverity};
7987
7988 let report = VerifyReport {
7989 diagnostics: vec![diag(
7990 "D601",
7991 VerifySeverity::Error,
7992 "Snapshot table missing from live DB",
7993 Some("billing.invoices"),
7994 )],
7995 latest_applied_version: Some("V20260601000000__billing".to_string()),
7996 applied_count: 2,
7997 unfinished_count: 0,
7998 };
7999 let lines = render_drift_refusal(&render_bucket("main", "billing"), &report);
8000
8001 assert!(
8002 lines
8003 .iter()
8004 .any(|line| line.contains("Apply refused before any migration SQL ran")),
8005 "missing refusal trailer: {lines:?}"
8006 );
8007 assert!(
8008 lines.iter().any(|l| l.contains("djogi migrations verify")),
8009 "missing verify guidance: {lines:?}"
8010 );
8011 assert!(
8012 lines.iter().any(|l| l.contains("djogi migrations attune")),
8013 "missing attune guidance: {lines:?}"
8014 );
8015 assert!(
8016 lines.iter().any(|l| l.contains("repair resume-partial")),
8017 "missing resume-partial guidance: {lines:?}"
8018 );
8019 assert!(
8020 !lines.iter().any(|l| l.contains("repair snapshot-rebuild")),
8021 "DriftDetected trailer must not mention snapshot-rebuild (that is for DriftBaselineMissing): {lines:?}"
8022 );
8023 }
8024
8025 fn db_config(
8028 url: &str,
8029 crud_log_url: Option<&str>,
8030 event_log_url: Option<&str>,
8031 ) -> djogi::config::DatabaseConfig {
8032 djogi::config::DatabaseConfig {
8033 url: url.to_string(),
8034 crud_log_url: crud_log_url.map(str::to_string),
8035 event_log_url: event_log_url.map(str::to_string),
8036 max_connections: None,
8037 dev_mode: false,
8038 }
8039 }
8040
8041 #[test]
8042 fn resolve_bucket_url_main_uses_app_url_verbatim() {
8043 let cfg = db_config("postgres://user:pass@localhost:5432/myapp_prod", None, None);
8047 assert_eq!(
8048 resolve_bucket_url(&cfg, "main").as_deref(),
8049 Some("postgres://user:pass@localhost:5432/myapp_prod"),
8050 "main must return the app URL unchanged"
8051 );
8052 }
8053
8054 #[test]
8055 fn resolve_bucket_url_crud_log_prefers_explicit_url() {
8056 let cfg = db_config(
8057 "postgres://localhost/main",
8058 Some("postgres://localhost/explicit_crud"),
8059 None,
8060 );
8061 assert_eq!(
8062 resolve_bucket_url(&cfg, "crud_log").as_deref(),
8063 Some("postgres://localhost/explicit_crud"),
8064 "crud_log must prefer the explicit crud_log_url"
8065 );
8066 }
8067
8068 #[test]
8069 fn resolve_bucket_url_event_log_prefers_explicit_url() {
8070 let cfg = db_config(
8071 "postgres://localhost/main",
8072 None,
8073 Some("postgres://localhost/explicit_event"),
8074 );
8075 assert_eq!(
8076 resolve_bucket_url(&cfg, "event_log").as_deref(),
8077 Some("postgres://localhost/explicit_event"),
8078 "event_log must prefer the explicit event_log_url"
8079 );
8080 }
8081
8082 #[test]
8083 fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
8084 let cfg = db_config("postgres://localhost/main", Some(""), Some(" "));
8087 assert_eq!(
8089 resolve_bucket_url(&cfg, "crud_log").as_deref(),
8090 Some("postgres://localhost/crud_log"),
8091 "empty crud_log_url must fall back to derived"
8092 );
8093 assert_eq!(
8096 resolve_bucket_url(&cfg, "event_log").as_deref(),
8097 Some(" "),
8098 "non-empty (whitespace) event_log_url is used verbatim"
8099 );
8100 }
8101
8102 #[test]
8103 fn resolve_bucket_url_other_database_derives_from_app_url() {
8104 let cfg = db_config("postgres://user:pass@localhost:5432/main", None, None);
8105 assert_eq!(
8106 resolve_bucket_url(&cfg, "analytics").as_deref(),
8107 Some("postgres://user:pass@localhost:5432/analytics"),
8108 "an arbitrary database name derives by path splice"
8109 );
8110 }
8111
8112 #[test]
8113 fn resolve_bucket_url_pathless_url_returns_none() {
8114 let cfg = db_config("postgres://localhost", None, None);
8116 assert_eq!(
8117 resolve_bucket_url(&cfg, "crud_log"),
8118 None,
8119 "pathless URL must yield None for a derived database"
8120 );
8121 }
8122
8123 #[test]
8124 fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
8125 let cfg = db_config("postgres://localhost", None, None);
8128 assert_eq!(
8129 resolve_bucket_url(&cfg, "main").as_deref(),
8130 Some("postgres://localhost"),
8131 "main returns the app URL verbatim regardless of path"
8132 );
8133 }
8134
8135 #[test]
8136 fn resolve_apply_target_urls_uses_pending_bucket_databases() {
8137 let work = temp_workspace("apply_target_urls");
8138 write_pending_json(
8139 &djogi::migrate::pending_json_path(
8140 &work,
8141 &BucketKey {
8142 database: "main".to_string(),
8143 app: String::new(),
8144 },
8145 ),
8146 "main",
8147 "",
8148 "V20260607010101__main_global",
8149 &[],
8150 );
8151 write_pending_json(
8152 &djogi::migrate::pending_json_path(
8153 &work,
8154 &BucketKey {
8155 database: "crud_log".to_string(),
8156 app: "audit".to_string(),
8157 },
8158 ),
8159 "crud_log",
8160 "audit",
8161 "V20260607010102__crud_log_audit",
8162 &[],
8163 );
8164
8165 let discovered = discover_pending_plans(&work).expect("discover");
8166 let cfg = db_config(
8167 "postgres://user:pass@localhost:5432/myapp_prod",
8168 Some("postgres://user:pass@localhost:5432/myapp_crud"),
8169 None,
8170 );
8171
8172 let urls = resolve_apply_target_urls(&discovered, &cfg).expect("resolve");
8173 assert_eq!(
8174 urls.len(),
8175 2,
8176 "apply must preserve distinct target databases"
8177 );
8178 assert_eq!(
8179 urls.get("main").map(String::as_str),
8180 Some("postgres://user:pass@localhost:5432/myapp_prod"),
8181 "main pending plans must keep the app database URL"
8182 );
8183 assert_eq!(
8184 urls.get("crud_log").map(String::as_str),
8185 Some("postgres://user:pass@localhost:5432/myapp_crud"),
8186 "crud_log pending plans must route through the crud_log database URL"
8187 );
8188 let _ = fs::remove_dir_all(&work);
8189 }
8190
8191 #[test]
8192 fn resolve_apply_target_urls_refuses_unresolvable_pending_database() {
8193 let work = temp_workspace("apply_target_urls_unresolvable");
8194 write_pending_json(
8195 &djogi::migrate::pending_json_path(
8196 &work,
8197 &BucketKey {
8198 database: "analytics".to_string(),
8199 app: String::new(),
8200 },
8201 ),
8202 "analytics",
8203 "",
8204 "V20260607010103__analytics_global",
8205 &[],
8206 );
8207
8208 let discovered = discover_pending_plans(&work).expect("discover");
8209 let cfg = db_config("postgres://localhost", None, None);
8210 let err = resolve_apply_target_urls(&discovered, &cfg)
8211 .expect_err("pathless app URL must refuse a derived pending database");
8212 assert!(err.contains("analytics"), "unexpected error: {err}");
8213 let _ = fs::remove_dir_all(&work);
8214 }
8215
8216 #[test]
8219 fn classify_phase_zero_bytes_identity_free_production_is_ok() {
8220 let sql = current_production_phase_zero_sql("current_bytes");
8221 assert!(
8222 classify_phase_zero_bytes(sql.as_bytes()).is_none(),
8223 "production Phase 0 should be identity-free replay-current (no refusal)"
8224 );
8225 }
8226
8227 #[test]
8228 fn classify_phase_zero_bytes_seed_capable_is_refused() {
8229 let sql = seed_capable_phase_zero_sql();
8230 let refusal = classify_phase_zero_bytes(sql.as_bytes());
8231 assert!(
8232 refusal.is_some(),
8233 "seed-capable Phase 0 should be refused by cleanup guard"
8234 );
8235 assert!(refusal.unwrap().contains("seed-capable"));
8236 }
8237
8238 #[test]
8239 fn classify_phase_zero_bytes_generated_stale_is_refused() {
8240 let sql = generated_stale_phase_zero_sql("stale_bytes");
8241 let refusal = classify_phase_zero_bytes(sql.as_bytes());
8242 assert!(
8243 refusal.is_some(),
8244 "generated-stale Phase 0 should be refused"
8245 );
8246 assert!(refusal.unwrap().contains("generated-stale"));
8247 }
8248
8249 #[test]
8250 fn classify_phase_zero_bytes_markerless_seed_is_refused() {
8251 let sql = markerless_seed_phase_zero_sql("markerless_seed_bytes");
8252 let refusal = classify_phase_zero_bytes(sql.as_bytes());
8253 assert!(
8254 refusal.is_some(),
8255 "markerless seed Phase 0 should be refused by cleanup guard"
8256 );
8257 assert!(refusal.unwrap().contains("seed-dml"));
8258 }
8259
8260 #[test]
8261 fn classify_phase_zero_bytes_extended_seed_dml_forms_are_refused() {
8262 for (name, statement) in extended_seed_statement_cases() {
8263 let sql =
8264 phase_zero_with_seed_statement(&format!("extended_seed_bytes_{name}"), statement);
8265 let refusal = classify_phase_zero_bytes(sql.as_bytes());
8266 let msg = refusal.expect("extended seed Phase 0 should be refused");
8267 assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
8268 }
8269 }
8270
8271 #[test]
8272 fn classify_phase_zero_bytes_ambiguous_is_refused() {
8273 let sql = "CREATE SCHEMA IF NOT EXISTS heer;\n\
8275 ALTER DATABASE \"mydb\" SET heer.node_id = '1';\n";
8276 let refusal = classify_phase_zero_bytes(sql.as_bytes());
8277 assert!(refusal.is_some(), "ambiguous Phase 0 should be refused");
8278 assert!(refusal.unwrap().contains("ambiguous"));
8279 }
8280
8281 #[test]
8282 fn classify_phase_zero_bytes_missing_is_refused() {
8283 let refusal = classify_phase_zero_bytes(b" \n\t ");
8284 assert!(refusal.is_some(), "missing Phase 0 should be refused");
8285 assert!(refusal.unwrap().contains("missing"));
8286 }
8287
8288 #[test]
8289 fn classify_phase_zero_for_cleanup_refuses_stale_replay_plan() {
8290 let work = temp_workspace("stale_cleanup");
8291 let bucket_dir = work.join("migrations/main/_global_");
8292 fs::create_dir_all(&bucket_dir).unwrap();
8293
8294 let replay = CliReplayPlan {
8296 format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
8297 classification: CliClassification::Additive,
8298 checksum_up: "V1:aabbccdd".to_string(),
8299 checksum_down: None,
8300 segments: vec![CliReplaySegment {
8301 kind: CliSegmentKind::Transactional,
8302 statements: vec![CliReplayStatement {
8303 label: "phase_zero_bootstrap".to_string(),
8304 up: generated_stale_phase_zero_sql("stale_replay"),
8305 }],
8306 }],
8307 };
8308 fs::write(
8309 bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
8310 serde_json::to_string(&replay).unwrap(),
8311 )
8312 .unwrap();
8313
8314 let bucket = djogi::migrate::BucketKey {
8315 database: "main".to_string(),
8316 app: String::new(),
8317 };
8318 let refusal = classify_phase_zero_for_cleanup(
8319 &work,
8320 &bucket,
8321 djogi::migrate::PHASE_ZERO_VERSION,
8322 "V1:aabbccdd",
8323 None,
8324 );
8325 assert!(
8326 refusal.is_some(),
8327 "stale Phase 0 replay plan should be refused by cleanup guard"
8328 );
8329 let msg = refusal.unwrap();
8330 assert!(msg.contains("generated-stale"), "refusal reason: {msg}");
8331
8332 let _ = fs::remove_dir_all(&work);
8333 }
8334
8335 #[test]
8336 fn classify_phase_zero_for_cleanup_allows_current_replay_plan() {
8337 let work = temp_workspace("current_cleanup");
8338 let bucket_dir = work.join("migrations/main/_global_");
8339 fs::create_dir_all(&bucket_dir).unwrap();
8340
8341 let replay = CliReplayPlan {
8343 format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
8344 classification: CliClassification::Additive,
8345 checksum_up: "V1:eeff0011".to_string(),
8346 checksum_down: None,
8347 segments: vec![CliReplaySegment {
8348 kind: CliSegmentKind::Transactional,
8349 statements: vec![CliReplayStatement {
8350 label: "phase_zero_bootstrap".to_string(),
8351 up: current_production_phase_zero_sql("current_replay"),
8352 }],
8353 }],
8354 };
8355 fs::write(
8356 bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
8357 serde_json::to_string(&replay).unwrap(),
8358 )
8359 .unwrap();
8360
8361 let bucket = djogi::migrate::BucketKey {
8362 database: "main".to_string(),
8363 app: String::new(),
8364 };
8365 let refusal = classify_phase_zero_for_cleanup(
8366 &work,
8367 &bucket,
8368 djogi::migrate::PHASE_ZERO_VERSION,
8369 "V1:eeff0011",
8370 None,
8371 );
8372 assert!(
8373 refusal.is_none(),
8374 "identity-free Phase 0 should be allowed by cleanup guard; got: {refusal:?}"
8375 );
8376
8377 let _ = fs::remove_dir_all(&work);
8378 }
8379
8380 #[test]
8381 fn classify_phase_zero_for_cleanup_refuses_seed_capable_replay_plan() {
8382 let work = temp_workspace("seed_cleanup_replay_plan");
8383 let bucket_dir = work.join("migrations/main/_global_");
8384 fs::create_dir_all(&bucket_dir).unwrap();
8385
8386 let replay = CliReplayPlan {
8387 format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
8388 classification: CliClassification::Additive,
8389 checksum_up: "V1:11223344".to_string(),
8390 checksum_down: None,
8391 segments: vec![CliReplaySegment {
8392 kind: CliSegmentKind::Transactional,
8393 statements: vec![CliReplayStatement {
8394 label: "phase_zero_bootstrap".to_string(),
8395 up: seed_capable_phase_zero_sql(),
8396 }],
8397 }],
8398 };
8399 fs::write(
8400 bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
8401 serde_json::to_string(&replay).unwrap(),
8402 )
8403 .unwrap();
8404
8405 let bucket = djogi::migrate::BucketKey {
8406 database: "main".to_string(),
8407 app: String::new(),
8408 };
8409 let refusal = classify_phase_zero_for_cleanup(
8410 &work,
8411 &bucket,
8412 djogi::migrate::PHASE_ZERO_VERSION,
8413 "V1:11223344",
8414 None,
8415 );
8416 let msg = refusal.expect("seed-capable replay plan must refuse");
8417 assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
8418
8419 let _ = fs::remove_dir_all(&work);
8420 }
8421
8422 #[test]
8423 fn classify_phase_zero_for_cleanup_refuses_markerless_seed_replay_plan() {
8424 let work = temp_workspace("markerless_seed_cleanup_replay_plan");
8425 let bucket_dir = work.join("migrations/main/_global_");
8426 fs::create_dir_all(&bucket_dir).unwrap();
8427
8428 let replay = CliReplayPlan {
8429 format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
8430 classification: CliClassification::Additive,
8431 checksum_up: "V1:55667788".to_string(),
8432 checksum_down: None,
8433 segments: vec![CliReplaySegment {
8434 kind: CliSegmentKind::Transactional,
8435 statements: vec![CliReplayStatement {
8436 label: "phase_zero_bootstrap".to_string(),
8437 up: markerless_seed_phase_zero_sql("markerless_seed_replay"),
8438 }],
8439 }],
8440 };
8441 fs::write(
8442 bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
8443 serde_json::to_string(&replay).unwrap(),
8444 )
8445 .unwrap();
8446
8447 let bucket = djogi::migrate::BucketKey {
8448 database: "main".to_string(),
8449 app: String::new(),
8450 };
8451 let refusal = classify_phase_zero_for_cleanup(
8452 &work,
8453 &bucket,
8454 djogi::migrate::PHASE_ZERO_VERSION,
8455 "V1:55667788",
8456 None,
8457 );
8458 let msg = refusal.expect("markerless seed replay plan must refuse");
8459 assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
8460
8461 let _ = fs::remove_dir_all(&work);
8462 }
8463
8464 #[test]
8465 fn classify_phase_zero_for_cleanup_refuses_cte_seed_dml_replay_plan() {
8466 let work = temp_workspace("cte_seed_cleanup_replay_plan");
8467 let bucket_dir = work.join("migrations/main/_global_");
8468 fs::create_dir_all(&bucket_dir).unwrap();
8469
8470 let replay = CliReplayPlan {
8471 format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_string(),
8472 classification: CliClassification::Additive,
8473 checksum_up: "V1:66778899".to_string(),
8474 checksum_down: None,
8475 segments: vec![CliReplaySegment {
8476 kind: CliSegmentKind::Transactional,
8477 statements: vec![CliReplayStatement {
8478 label: "phase_zero_bootstrap".to_string(),
8479 up: phase_zero_with_seed_statement(
8480 "cte_seed_cleanup_replay",
8481 "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
8482 ),
8483 }],
8484 }],
8485 };
8486 fs::write(
8487 bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json"),
8488 serde_json::to_string(&replay).unwrap(),
8489 )
8490 .unwrap();
8491
8492 let bucket = djogi::migrate::BucketKey {
8493 database: "main".to_string(),
8494 app: String::new(),
8495 };
8496 let refusal = classify_phase_zero_for_cleanup(
8497 &work,
8498 &bucket,
8499 djogi::migrate::PHASE_ZERO_VERSION,
8500 "V1:66778899",
8501 None,
8502 );
8503 let msg = refusal.expect("CTE seed replay plan must refuse");
8504 assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
8505
8506 let _ = fs::remove_dir_all(&work);
8507 }
8508
8509 #[test]
8510 fn classify_phase_zero_for_cleanup_fallback_sql_file() {
8511 let work = temp_workspace("fallback_cleanup");
8512 let bucket_dir = work.join("migrations/main/_global_");
8513 fs::create_dir_all(&bucket_dir).unwrap();
8514
8515 let up_sql = current_production_phase_zero_sql("fallback_sql");
8516 let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
8517 fs::write(bucket_dir.join(&up_filename), up_sql).unwrap();
8518
8519 let bucket = djogi::migrate::BucketKey {
8520 database: "main".to_string(),
8521 app: String::new(),
8522 };
8523 let refusal = classify_phase_zero_for_cleanup(
8524 &work,
8525 &bucket,
8526 djogi::migrate::PHASE_ZERO_VERSION,
8527 "V1:anychecksum",
8528 None,
8529 );
8530 assert!(
8531 refusal.is_none(),
8532 "identity-free Phase 0 fallback SQL should be allowed; got: {refusal:?}"
8533 );
8534
8535 let _ = fs::remove_dir_all(&work);
8536 }
8537
8538 #[test]
8539 fn classify_phase_zero_for_cleanup_refuses_seed_capable_fallback_sql_file() {
8540 let work = temp_workspace("seed_cleanup_fallback");
8541 let bucket_dir = work.join("migrations/main/_global_");
8542 fs::create_dir_all(&bucket_dir).unwrap();
8543
8544 let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
8545 fs::write(bucket_dir.join(&up_filename), seed_capable_phase_zero_sql()).unwrap();
8546
8547 let bucket = djogi::migrate::BucketKey {
8548 database: "main".to_string(),
8549 app: String::new(),
8550 };
8551 let refusal = classify_phase_zero_for_cleanup(
8552 &work,
8553 &bucket,
8554 djogi::migrate::PHASE_ZERO_VERSION,
8555 "V1:anychecksum",
8556 None,
8557 );
8558 let msg = refusal.expect("seed-capable fallback SQL must refuse");
8559 assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
8560
8561 let _ = fs::remove_dir_all(&work);
8562 }
8563
8564 #[test]
8565 fn classify_phase_zero_for_cleanup_refuses_markerless_seed_fallback_sql_file() {
8566 let work = temp_workspace("markerless_seed_cleanup_fallback");
8567 let bucket_dir = work.join("migrations/main/_global_");
8568 fs::create_dir_all(&bucket_dir).unwrap();
8569
8570 let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
8571 fs::write(
8572 bucket_dir.join(&up_filename),
8573 markerless_seed_phase_zero_sql("markerless_seed_fallback"),
8574 )
8575 .unwrap();
8576
8577 let bucket = djogi::migrate::BucketKey {
8578 database: "main".to_string(),
8579 app: String::new(),
8580 };
8581 let refusal = classify_phase_zero_for_cleanup(
8582 &work,
8583 &bucket,
8584 djogi::migrate::PHASE_ZERO_VERSION,
8585 "V1:anychecksum",
8586 None,
8587 );
8588 let msg = refusal.expect("markerless seed fallback SQL must refuse");
8589 assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
8590
8591 let _ = fs::remove_dir_all(&work);
8592 }
8593
8594 #[test]
8595 fn classify_phase_zero_for_cleanup_refuses_copy_from_seed_fallback_sql_file() {
8596 let work = temp_workspace("copy_seed_cleanup_fallback");
8597 let bucket_dir = work.join("migrations/main/_global_");
8598 fs::create_dir_all(&bucket_dir).unwrap();
8599
8600 let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
8601 fs::write(
8602 bucket_dir.join(&up_filename),
8603 phase_zero_with_seed_statement(
8604 "copy_seed_cleanup_fallback",
8605 "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
8606 ),
8607 )
8608 .unwrap();
8609
8610 let bucket = djogi::migrate::BucketKey {
8611 database: "main".to_string(),
8612 app: String::new(),
8613 };
8614 let refusal = classify_phase_zero_for_cleanup(
8615 &work,
8616 &bucket,
8617 djogi::migrate::PHASE_ZERO_VERSION,
8618 "V1:anychecksum",
8619 None,
8620 );
8621 let msg = refusal.expect("COPY FROM seed fallback SQL must refuse");
8622 assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
8623
8624 let _ = fs::remove_dir_all(&work);
8625 }
8626
8627 #[djogi::djogi_test]
8630 async fn check_ledger_state_is_app_scoped(mut ctx: djogi::context::DjogiContext) {
8631 use djogi::migrate::{ExecutionMode, LedgerRow, LedgerStatus};
8632
8633 djogi::migrate::bootstrap_ledger(&mut ctx)
8635 .await
8636 .expect("bootstrap");
8637
8638 let row = LedgerRow {
8640 version: "V20260609000000__t397".into(),
8641 description: "test migration".into(),
8642 checksum_up: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
8643 checksum_down: Some(
8644 "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".into(),
8645 ),
8646 execution_mode: ExecutionMode::Transactional,
8647 status: LedgerStatus::Pending,
8648 execution_time_ms: 0,
8649 out_of_order_flag: false,
8650 applied_steps_count: 0,
8651 total_steps: None,
8652 partial_apply_note: None,
8653 run_id: 1,
8654 snapshot_version: "0".into(),
8655 app_label: "users".into(),
8656 leaf_identity: None,
8657 };
8658 let ledger_id = djogi::migrate::insert_pending_ledger_row(&mut ctx, &row)
8659 .await
8660 .expect("insert pending");
8661 djogi::migrate::mark_ledger_applied(&mut ctx, ledger_id, 10, 1)
8662 .await
8663 .expect("mark applied");
8664
8665 let state = check_ledger_state(&mut ctx, "V20260609000000__t397", "system").await;
8667 assert!(
8668 matches!(state, LedgerState::NotPresent),
8669 "different app stream must be NotPresent, got {state:?}",
8670 );
8671
8672 let state = check_ledger_state(&mut ctx, "V20260609000000__t397", "users").await;
8674 assert!(
8675 matches!(state, LedgerState::AlreadyApplied),
8676 "same app stream must be AlreadyApplied, got {state:?}",
8677 );
8678 }
8679}