1use std::path::{Path, PathBuf};
10use std::process::ExitCode;
11
12use djogi::apps::AppRegistry;
13use djogi::migrate::{
14 AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
15 DescriptorProvider, GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan,
16 RepairConfirmation, RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError,
17 VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
18 compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
19 repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
20};
21
22use djogi::migrate::LedgerStatus;
24
25use crate::{PartialApplyResolutionCli, RepairSubcommand};
28
/// Replay plan as this CLI reads it from a bucket's `{version}.plan.json`.
///
/// `load_replay_plan_from_disk` deserializes this with `serde_json` and turns
/// it into a `djogi::migrate::MigrationPlan`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
struct CliReplayPlan {
    /// Readers in this file reject the plan unless this equals
    /// `CLI_REPLAY_PLAN_FORMAT_VERSION`.
    format_version: String,
    /// Up checksum; readers compare it with the pending plan's up checksum.
    checksum_up: String,
    /// Down checksum, if any; readers compare it with the pending plan's down checksum.
    checksum_down: Option<String>,
    /// Classification carried over into the rebuilt migration plan.
    classification: CliClassification,
    /// Segments in stored order.
    segments: Vec<CliReplaySegment>,
}
44
/// Serde-facing mirror of `djogi::migrate::Classification`.
///
/// Serialized internally tagged: the variant name is stored under the `"kind"`
/// key in `snake_case`. The `From` impl in this file maps every variant to the
/// library variant of the same name.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
enum CliClassification {
    NoOp,
    Additive,
    Reversible,
    Destructive,
    Lossy,
    /// Carries a free-form `reason` string.
    Unsupported {
        reason: String,
    },
    /// Carries two flags that are copied through unchanged on conversion.
    PkTypeFlip {
        co_destructive: bool,
        co_lossy: bool,
    },
}
61
/// One segment of a stored replay plan: a segment kind plus its statements.
///
/// Converted into a `djogi::migrate::Segment` by `load_replay_plan_from_disk`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
struct CliReplaySegment {
    /// How the segment is to be executed (see `CliSegmentKind`).
    kind: CliSegmentKind,
    /// Statements in stored order.
    statements: Vec<CliReplayStatement>,
}
67
/// Serde-facing mirror of `djogi::migrate::SegmentKind`, serialized as a
/// `snake_case` string. The `From` impl in this file maps variants one-to-one.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
enum CliSegmentKind {
    Transactional,
    NonTransactional,
    MetadataOnly,
}
75
/// One stored statement of a replay plan.
///
/// Only the label and the up SQL are stored; when `load_replay_plan_from_disk`
/// rebuilds a `djogi::migrate::OperationSql` from this, `down` is left empty
/// and `lossy` is `None`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
struct CliReplayStatement {
    /// Label copied into the rebuilt operation.
    label: String,
    /// Up SQL text.
    up: String,
}
81
/// The only `format_version` value of a stored replay plan that the readers in
/// this file accept; any other value is reported as a format version mismatch.
const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
84
85fn load_replay_plan_from_disk(
91 workspace: &Path,
92 bucket: &djogi::migrate::BucketKey,
93 version: &str,
94 pending_checksum_up: &str,
95 pending_checksum_down: Option<&str>,
96) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
97 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
99 let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
100
101 if let Ok(bytes) = std::fs::read(&replay_plan_path) {
102 let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
103 Ok(s) => s,
104 Err(e) => {
105 return Err(ApplyReplayPlanError::Parse {
106 path: replay_plan_path.clone(),
107 source: e.to_string(),
108 });
109 }
110 };
111
112 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
113 return Err(ApplyReplayPlanError::FormatVersion {
114 found: stored.format_version,
115 path: replay_plan_path.clone(),
116 });
117 }
118
119 if stored.checksum_up != pending_checksum_up
121 || stored.checksum_down.as_deref() != pending_checksum_down
122 {
123 return Err(ApplyReplayPlanError::ChecksumMismatch);
124 }
125
126 let plan = djogi::migrate::MigrationPlan {
127 bucket: bucket.clone(),
128 classification: stored.classification.into(),
129 segments: stored
130 .segments
131 .into_iter()
132 .map(|seg| djogi::migrate::Segment {
133 kind: seg.kind.into(),
134 statements: seg
135 .statements
136 .into_iter()
137 .map(|stmt| djogi::migrate::OperationSql {
138 label: stmt.label,
139 up: stmt.up,
140 down: String::new(),
141 lossy: None,
142 })
143 .collect(),
144 })
145 .collect(),
146 };
147
148 return Ok((plan, stored.checksum_up, stored.checksum_down));
149 }
150
151 let up_filename = djogi::migrate::up_filename(version);
153 let down_filename = djogi::migrate::down_filename(version);
154 let up_path = bucket_dir.join(&up_filename);
155 let down_path = bucket_dir.join(&down_filename);
156
157 let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
158 path: up_path.clone(),
159 source: e.to_string(),
160 })?;
161
162 let down_sql = match std::fs::read_to_string(&down_path) {
163 Ok(sql) => sql,
164 Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
165 Err(e) => {
166 return Err(ApplyReplayPlanError::SqlRead {
167 path: down_path.clone(),
168 source: e.to_string(),
169 });
170 }
171 };
172
173 let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
177
178 let plan = djogi::migrate::MigrationPlan {
182 bucket: bucket.clone(),
183 classification: djogi::migrate::Classification::Additive,
184 segments: vec![djogi::migrate::Segment {
185 kind: djogi::migrate::SegmentKind::Transactional,
186 statements: vec![djogi::migrate::OperationSql {
187 label: format!("replay {version}"),
188 up: up_sql,
189 down: down_sql,
190 lossy: None,
191 }],
192 }],
193 };
194
195 Ok((plan, computed_checksum_up, None))
196}
197
/// Failure modes of `load_replay_plan_from_disk`.
///
/// `source` fields hold the underlying error already rendered to a string.
#[derive(Debug)]
enum ApplyReplayPlanError {
    /// The replay plan file at `path` could not be turned into a `CliReplayPlan`.
    Parse { path: PathBuf, source: String },
    /// The replay plan at `path` declares format version `found`, which is not
    /// `CLI_REPLAY_PLAN_FORMAT_VERSION`.
    FormatVersion { found: String, path: PathBuf },
    /// The replay plan's checksums differ from the pending plan's checksums.
    ChecksumMismatch,
    /// The SQL file at `path` could not be read.
    SqlRead { path: PathBuf, source: String },
}
206
207impl std::fmt::Display for ApplyReplayPlanError {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 match self {
210 Self::Parse { path, source } => {
211 write!(f, "parse replay plan {}: {source}", path.display())
212 }
213 Self::FormatVersion { found, path } => write!(
214 f,
215 "replay plan format version mismatch in {}: expected {}, found {}",
216 path.display(),
217 CLI_REPLAY_PLAN_FORMAT_VERSION,
218 found
219 ),
220 Self::ChecksumMismatch => {
221 write!(f, "checksum mismatch between pending JSON and replay plan")
222 }
223 Self::SqlRead { path, source } => {
224 write!(f, "read SQL file {}: {source}", path.display())
225 }
226 }
227 }
228}
229
// Marker impl so the error can be used wherever `std::error::Error` is
// expected; no `source()` is exposed because the underlying errors are stored
// only as strings.
impl std::error::Error for ApplyReplayPlanError {}
231
232fn classify_phase_zero_for_cleanup(
239 workspace: &Path,
240 bucket: &djogi::migrate::BucketKey,
241 version: &str,
242 pending_checksum_up: &str,
243 pending_checksum_down: Option<&str>,
244) -> Option<String> {
245 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
247 let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
248
249 if let Ok(bytes) = std::fs::read(&replay_plan_path) {
250 let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
251 Ok(s) => s,
252 Err(e) => {
253 return Some(format!("parse replay plan: {e}"));
254 }
255 };
256
257 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
258 return Some(format!(
259 "replay plan format version mismatch: expected {}, found {}",
260 CLI_REPLAY_PLAN_FORMAT_VERSION, stored.format_version
261 ));
262 }
263
264 if stored.checksum_up != pending_checksum_up
266 || stored.checksum_down.as_deref() != pending_checksum_down
267 {
268 return Some("checksum mismatch between pending JSON and replay plan".to_string());
269 }
270
271 let up_sql: String = stored
273 .segments
274 .iter()
275 .flat_map(|seg| seg.statements.iter())
276 .map(|stmt| stmt.up.as_str())
277 .collect::<Vec<&str>>()
278 .join("\n");
279
280 return classify_phase_zero_bytes(up_sql.as_bytes());
281 }
282
283 let up_filename = djogi::migrate::up_filename(version);
285 let up_path = bucket_dir.join(&up_filename);
286 match std::fs::read_to_string(&up_path) {
287 Ok(up_sql) => classify_phase_zero_bytes(up_sql.as_bytes()),
288 Err(e) => Some(format!("read up SQL file {}: {e}", up_path.display())),
289 }
290}
291
292fn classify_phase_zero_bytes(bytes: &[u8]) -> Option<String> {
295 match djogi::migrate::classify_phase_zero_artifact(bytes) {
296 djogi::migrate::PhaseZeroArtifactState::IdentityFreeCurrent => None,
297 djogi::migrate::PhaseZeroArtifactState::SeedCapableRuntimeCurrent => {
298 Some("seed-capable runtime-only artifact detected".to_string())
299 }
300 djogi::migrate::PhaseZeroArtifactState::SeedDmlNotRuntimeCurrent => {
301 Some("seed-dml non-runtime-current artifact detected".to_string())
302 }
303 djogi::migrate::PhaseZeroArtifactState::GeneratedStale => {
304 Some("generated-stale artifact detected".to_string())
305 }
306 djogi::migrate::PhaseZeroArtifactState::Ambiguous => {
307 Some("ambiguous or hand-edited artifact detected".to_string())
308 }
309 djogi::migrate::PhaseZeroArtifactState::Incomplete => {
310 Some("incomplete artifact (truncated generation)".to_string())
311 }
312 djogi::migrate::PhaseZeroArtifactState::Missing => Some("missing artifact".to_string()),
313 }
314}
315
316impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
319 fn from(kind: CliSegmentKind) -> Self {
320 match kind {
321 CliSegmentKind::Transactional => Self::Transactional,
322 CliSegmentKind::NonTransactional => Self::NonTransactional,
323 CliSegmentKind::MetadataOnly => Self::MetadataOnly,
324 }
325 }
326}
327
328impl From<CliClassification> for djogi::migrate::Classification {
329 fn from(classification: CliClassification) -> Self {
330 match classification {
331 CliClassification::NoOp => Self::NoOp,
332 CliClassification::Additive => Self::Additive,
333 CliClassification::Reversible => Self::Reversible,
334 CliClassification::Destructive => Self::Destructive,
335 CliClassification::Lossy => Self::Lossy,
336 CliClassification::Unsupported { reason } => Self::Unsupported { reason },
337 CliClassification::PkTypeFlip {
338 co_destructive,
339 co_lossy,
340 } => Self::PkTypeFlip {
341 co_destructive,
342 co_lossy,
343 },
344 }
345 }
346}
347
/// Picks the workspace directory: the explicit `workspace` when given,
/// otherwise the process's current directory, otherwise `"."` if the current
/// directory cannot be determined.
fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
    match workspace {
        Some(explicit) => explicit,
        None => match std::env::current_dir() {
            Ok(cwd) => cwd,
            Err(_) => PathBuf::from("."),
        },
    }
}
354
355fn discover_snapshot_buckets_on_disk(
367 workspace: &Path,
368) -> Vec<djogi::migrate::projection::BucketKey> {
369 let mut out = Vec::new();
370 let migrations_root = djogi::migrate::migrations_root(workspace);
371 let Ok(db_entries) = std::fs::read_dir(&migrations_root) else {
372 return out;
373 };
374 for db_entry in db_entries.flatten() {
375 let Ok(ft) = db_entry.file_type() else {
376 continue;
377 };
378 if !ft.is_dir() {
379 continue;
380 }
381 let Some(database) = db_entry.file_name().to_str().map(str::to_string) else {
382 continue;
383 };
384 let Ok(app_entries) = std::fs::read_dir(db_entry.path()) else {
385 continue;
386 };
387 for app_entry in app_entries.flatten() {
388 let Ok(ft) = app_entry.file_type() else {
389 continue;
390 };
391 if !ft.is_dir() {
392 continue;
393 }
394 let Some(dirname) = app_entry.file_name().to_str().map(str::to_string) else {
395 continue;
396 };
397 let snap_path = app_entry.path().join("schema_snapshot.json");
398 if !snap_path.exists() {
399 continue;
400 }
401 let label = djogi::migrate::app_label_from_dirname(&dirname).to_string();
402 out.push(djogi::migrate::projection::BucketKey {
403 database: database.clone(),
404 app: label,
405 });
406 }
407 }
408 out
409}
410
411pub fn compose_cmd(
413 provider: &dyn DescriptorProvider,
414 name: &str,
415 allow_destructive: bool,
416 force_overwrite: bool,
417 workspace: Option<PathBuf>,
418) -> ExitCode {
419 let workspace = resolve_workspace(workspace);
420 let models = match project_from_provider(provider) {
421 Ok(m) => m,
422 Err(e) => {
423 eprintln!("djogi migrations compose: projection error: {e}");
424 return ExitCode::from(1);
425 }
426 };
427 let apps: Vec<AppLifecycle> = provider
428 .apps()
429 .iter()
430 .map(|d| AppLifecycle {
431 label: d.label.to_string(),
432 database: d.database.to_string(),
433 renamed_from: d.renamed_from.map(str::to_string),
434 tombstone: d.tombstone,
435 })
436 .collect();
437 let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
442 Ok(c) => c,
443 Err(e) => {
444 eprintln!("djogi migrations compose: config load: {e}");
445 return ExitCode::from(1);
446 }
447 };
448 let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
449 djogi_config.migrate.pk_flip_join_table_option,
450 );
451 compose_with_inputs(
452 &workspace,
453 name,
454 allow_destructive,
455 force_overwrite,
456 &models,
457 &apps,
458 time::OffsetDateTime::now_utc(),
459 Some(pk_flip_option),
460 )
461}
462
463#[allow(clippy::too_many_arguments)]
478fn compose_with_inputs(
479 workspace: &Path,
480 name: &str,
481 allow_destructive: bool,
482 force_overwrite: bool,
483 models: &std::collections::BTreeMap<
484 djogi::migrate::projection::BucketKey,
485 djogi::migrate::AppliedSchema,
486 >,
487 apps: &[AppLifecycle],
488 now: time::OffsetDateTime,
489 pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
490) -> ExitCode {
491 let lock_path = workspace.join(LOCK_FILE_NAME);
492 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
493 Ok(g) => g,
494 Err(e) => {
495 eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
496 return ExitCode::from(1);
497 }
498 };
499
500 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::projection::BucketKey> =
508 models.keys().cloned().collect();
509 for bucket in discover_snapshot_buckets_on_disk(workspace) {
510 bucket_set.insert(bucket);
511 }
512
513 let mut snapshots: std::collections::BTreeMap<_, _> = std::collections::BTreeMap::new();
514 for bucket in &bucket_set {
515 let path = djogi::migrate::snapshot_path(workspace, bucket);
516 match djogi::migrate::load_snapshot(&path) {
517 Ok(s) => {
518 snapshots.insert(bucket.clone(), s);
519 }
520 Err(djogi::migrate::SnapshotError::Io { source, .. })
521 if source.kind() == std::io::ErrorKind::NotFound =>
522 {
523 }
525 Err(e) => {
526 eprintln!(
527 "djogi migrations compose: snapshot load failed at {}: {e}",
528 path.display()
529 );
530 return ExitCode::from(1);
531 }
532 }
533 }
534
535 let req = ComposeRequest {
536 workspace_root: workspace,
537 models,
538 snapshots: &snapshots,
539 apps,
540 name,
541 allow_destructive,
542 force_overwrite,
543 now,
544 _guard: &guard,
545 pk_flip_join_table_option,
546 skip_phase_zero_auto_emit: false,
552 };
553 match compose(req) {
554 Ok(report) => {
555 for emit in &report.emitted_phase_zero {
559 let ext_summary = if emit.extensions.is_empty() {
560 "no extensions".to_string()
561 } else {
562 format!(
563 "extensions: {}",
564 emit.extensions
565 .iter()
566 .cloned()
567 .collect::<Vec<_>>()
568 .join(", ")
569 )
570 };
571 println!(
572 "auto-emitted bootstrap migration: {database}/_global_ ({ext_summary})",
573 database = emit.database,
574 );
575 }
576 for cb in &report.composed_buckets {
577 println!(
578 "composed {database}/{app}: {version} ({classification:?})",
579 database = cb.bucket.database,
580 app = if cb.bucket.app.is_empty() {
581 "_global_"
582 } else {
583 cb.bucket.app.as_str()
584 },
585 version = cb.version,
586 classification = cb.classification,
587 );
588 }
589 for bucket in &report.converged_snapshot_buckets {
590 println!(
591 "snapshot converged: {database}/{app} — snapshot updated to scoped enum set, no migration needed",
592 database = bucket.database,
593 app = if bucket.app.is_empty() {
594 "_global_"
595 } else {
596 bucket.app.as_str()
597 },
598 );
599 }
600 ExitCode::from(0)
601 }
602 Err(ComposeError::NothingToCompose) => {
603 println!("nothing to compose — model state matches snapshot for every bucket");
604 ExitCode::from(0)
608 }
609 Err(ComposeError::LinkageDropWithoutModels { ref text, .. }) => {
610 eprintln!("djogi migrations compose: {text}");
611 ExitCode::from(2)
613 }
614 Err(e @ ComposeError::CrossBucketForeignKeyCycle { .. }) => {
615 eprintln!("djogi migrations compose: {e}");
616 ExitCode::from(2)
621 }
622 Err(e) => {
623 eprintln!("djogi migrations compose: {e}");
624 ExitCode::from(1)
625 }
626 }
627}
628
629pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
634 let workspace = resolve_workspace(workspace);
635
636 let runtime = match tokio::runtime::Builder::new_current_thread()
638 .enable_all()
639 .build()
640 {
641 Ok(r) => r,
642 Err(e) => {
643 eprintln!("djogi migrations status: tokio runtime: {e}");
644 return ExitCode::from(1);
645 }
646 };
647
648 let exit = runtime.block_on(async { run_status(&workspace).await });
649 ExitCode::from(exit as u8)
650}
651
652async fn run_status(workspace: &Path) -> i32 {
661 use djogi::config::DjogiConfig;
662
663 let config = match DjogiConfig::load_from_workspace(workspace) {
664 Ok(c) => c,
665 Err(e) => {
666 eprintln!("djogi migrations status: config load: {e}");
667 return 1;
668 }
669 };
670
671 let mut ctx = match connect_and_check(&config.database.url).await {
672 ContextOutcome::Ready(ctx) => ctx,
673 ContextOutcome::UnsupportedVersion(e) => {
674 crate::print_support_boundary_error("migrations status", &e);
675 return 2;
676 }
677 ContextOutcome::RuntimeError(msg) => {
678 eprintln!("djogi migrations status: pool: {msg}");
679 return 1;
680 }
681 };
682
683 let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
684 Ok(rows) => rows,
685 Err(e) => {
686 if e.to_string().contains("djogi_schema_migrations") {
689 println!("No migrations recorded.");
690 return 0;
691 }
692 eprintln!("djogi migrations status: ledger read: {e}");
693 return 1;
694 }
695 };
696
697 let registered: Vec<String> = AppRegistry::all()
698 .iter()
699 .map(|d| d.label.to_string())
700 .collect();
701 let report = djogi::migrate::render_status(&rows, ®istered);
702 for line in &report.lines {
703 println!("{line}");
704 }
705 report.exit_code
706}
707
/// Result of `connect_and_check`: either a usable context or the reason there
/// is none, split so callers can choose different exit codes.
// NOTE(review): the lint is silenced rather than boxing a variant; presumably
// `Ready` is much larger than the others — confirm that boxing is not wanted.
#[allow(clippy::large_enum_variant)]
enum ContextOutcome {
    /// Connection succeeded and the PostgreSQL version check passed.
    Ready(djogi::context::DjogiContext),
    /// The version check failed with `DjogiError::UnsupportedPostgresVersion`.
    UnsupportedVersion(djogi::error::DjogiError),
    /// Connecting failed, or the version check failed with any other error;
    /// holds the error rendered to a string.
    RuntimeError(String),
}
739
740async fn connect_and_check(url: &str) -> ContextOutcome {
748 let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
749 Ok(p) => p,
750 Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
751 };
752 match djogi::pg::preflight::check_postgres_version(&pool).await {
753 Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
754 Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
757 ContextOutcome::UnsupportedVersion(e)
758 }
759 Err(other) => ContextOutcome::RuntimeError(other.to_string()),
760 }
761}
762
763fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
783 if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
786 return Some(db_config.url.clone());
787 }
788 if database == "crud_log"
789 && let Some(u) = db_config.crud_log_url.as_deref()
790 && !u.is_empty()
791 {
792 return Some(u.to_string());
793 }
794 if database == "event_log"
795 && let Some(u) = db_config.event_log_url.as_deref()
796 && !u.is_empty()
797 {
798 return Some(u.to_string());
799 }
800 djogi::migrate::derive_per_database_url(&db_config.url, database)
801}
802
803pub fn apply_cmd(
813 workspace: Option<PathBuf>,
814 fake: bool,
815 reason: Option<String>,
816 node_id: Option<u32>,
817 single_node_dev: bool,
818) -> ExitCode {
819 let workspace = resolve_workspace(workspace);
820
821 let mode = if fake {
823 match reason {
824 Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
825 Some(_) => {
826 eprintln!(
827 "djogi migrations apply --fake: --reason must not be empty; \
828 supply a non-empty reason why these migrations are being \
829 faked (e.g. 'schema pre-exists from prior tooling')"
830 );
831 return ExitCode::from(2);
832 }
833 None => {
834 eprintln!(
835 "djogi migrations apply --fake: --reason is required; \
836 supply a reason why these migrations are being faked \
837 (e.g. 'schema pre-exists from prior tooling'). \
838 This is recorded in the ledger audit trail."
839 );
840 return ExitCode::from(2);
841 }
842 }
843 } else {
844 FakeMode::Real
845 };
846
847 let runtime = match tokio::runtime::Builder::new_current_thread()
848 .enable_all()
849 .build()
850 {
851 Ok(r) => r,
852 Err(e) => {
853 eprintln!("djogi migrations apply: tokio runtime: {e}");
854 return ExitCode::from(1);
855 }
856 };
857
858 let exit =
859 runtime.block_on(async { run_apply(&workspace, &mode, node_id, single_node_dev).await });
860 ExitCode::from(exit as u8)
861}
862
/// Whether `run_apply` performs a real apply or a fake one.
#[derive(Debug, Clone)]
enum FakeMode {
    /// Regular apply.
    Real,
    /// Fake apply. `apply_cmd` only constructs this with a `reason` that
    /// contains at least one non-whitespace character.
    Fake { reason: String },
}
872
/// Applies (or, in `FakeMode::Fake`, fake-applies) every pending plan found in
/// `workspace`, one at a time, in the order returned by pending discovery.
///
/// Sequence:
/// 1. load the workspace config and discover pending plans (no lock held yet);
/// 2. resolve the runner identity from `node_id` / `single_node_dev`;
/// 3. resolve one URL per target database and connect to each, including the
///    PostgreSQL version check;
/// 4. take the workspace lock, then reconcile the discovered plans again under
///    the lock;
/// 5. build the optional audit pool and process each pending plan with
///    `apply_one_pending`.
///
/// Returns a process exit code: `0` on success (including "nothing pending"),
/// `2` for config, discovery, identity, routing, unsupported-version and
/// refusal failures, `1` for connection, lock and missing-context failures,
/// and `runner_error_exit_code(&e)` for runner errors. Processing stops at the
/// first refusal or runner error.
async fn run_apply(
    workspace: &Path,
    mode: &FakeMode,
    node_id: Option<u32>,
    single_node_dev: bool,
) -> i32 {
    use djogi::config::DjogiConfig;

    // Verb used in diagnostics and passed to identity resolution.
    let action_verb = match mode {
        FakeMode::Real => "apply",
        FakeMode::Fake { .. } => "fake-apply",
    };
    // Verb used in the per-plan progress line.
    let progress_verb = match mode {
        FakeMode::Real => "applying",
        FakeMode::Fake { .. } => "faking",
    };

    let config = match DjogiConfig::load_from_workspace(workspace) {
        Ok(c) => c,
        Err(e) => {
            eprintln!("djogi migrations {action_verb}: config load: {e}");
            return 2;
        }
    };

    // First discovery pass, done before any connection or lock is taken so
    // that "nothing to do" exits early.
    let pending_files = match discover_pending_plans(workspace) {
        Ok(pending_files) => pending_files,
        Err(e) => {
            eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
            return 2;
        }
    };
    if pending_files.is_empty() {
        println!("No pending migrations to {action_verb}.");
        return 0;
    }

    // Always `Some(..)` past this point: the error arm returns.
    let runner_identity = match crate::identity::resolve_identity(
        node_id,
        single_node_dev,
        &config.profile,
        action_verb,
    ) {
        Ok(resolved) => Some(resolved.into_runner_identity()),
        Err(e) => {
            // The helper's return value is deliberately discarded; the exit
            // code is fixed to 2 here.
            let _ = crate::identity::print_identity_error(action_verb, &e);
            return 2;
        }
    };

    // One connection (with version check) per target database, keyed by
    // database name.
    let target_urls = match resolve_apply_target_urls(&pending_files, &config.database) {
        Ok(urls) => urls,
        Err(e) => {
            eprintln!("djogi migrations {action_verb}: target routing: {e}");
            return 2;
        }
    };
    let mut contexts = std::collections::BTreeMap::<String, djogi::context::DjogiContext>::new();
    for (database, url) in &target_urls {
        match connect_and_check(url).await {
            ContextOutcome::Ready(ctx) => {
                contexts.insert(database.clone(), ctx);
            }
            ContextOutcome::UnsupportedVersion(e) => {
                crate::print_support_boundary_error("migrations apply", &e);
                return 2;
            }
            ContextOutcome::RuntimeError(msg) => {
                eprintln!("djogi migrations {action_verb}: pool for '{database}': {msg}");
                return 1;
            }
        }
    }

    // The workspace lock is taken only after all connections succeeded and is
    // held until this function returns.
    let lock_path = workspace.join(LOCK_FILE_NAME);
    let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
        Ok(g) => g,
        Err(e) => {
            eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
            return 1;
        }
    };

    // Discovery ran without the lock; reconcile its result now that the lock
    // is held. NOTE(review): presumably this detects files that changed in
    // between — confirm against `reconcile_pending_plans_after_lock`.
    let pending_files = match reconcile_pending_plans_after_lock(workspace, &pending_files) {
        Ok(pending_files) => pending_files,
        Err(e) => {
            eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
            return 2;
        }
    };

    // Best-effort: a failure to resolve the audit URL or to build the pool
    // yields `None` and is not reported.
    let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
        Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
        Err(_) => None,
    };

    for pending_file in &pending_files {
        let bucket_database = &pending_file.bucket.database;
        let app_label = &pending_file.bucket.app;
        let Some(ctx) = contexts.get_mut(bucket_database) else {
            eprintln!(
                "djogi migrations {action_verb}: internal error: missing context for database '{bucket_database}'"
            );
            return 1;
        };
        println!(" {progress_verb} {bucket_database}/{app_label}...");
        let result = apply_one_pending(
            ctx,
            workspace,
            pending_file,
            &config,
            &guard,
            audit_pool.as_ref(),
            mode,
            runner_identity,
        )
        .await;

        match result {
            ApplyResult::Ok => match mode {
                FakeMode::Real => {
                    println!("Applied: {bucket_database}/{app_label}");
                }
                FakeMode::Fake { .. } => {
                    println!(
                        " faked {bucket_database}/{app_label}: \
 recorded in ledger with status = 'faked' (no SQL executed)"
                    );
                }
            },
            // A skipped plan does not stop the run.
            ApplyResult::Skipped(reason) => {
                println!("Skipped {bucket_database}/{app_label}: {reason}");
            }
            // NOTE(review): the two messages below always say "apply", even in
            // fake mode, unlike the `{action_verb}` diagnostics above.
            ApplyResult::Refused(reason) => {
                eprintln!(
                    "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
                );
                return 2;
            }
            ApplyResult::RunnerError(e) => {
                eprintln!(
                    "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
                );
                return runner_error_exit_code(&e);
            }
        }
    }

    let summary_verb = match mode {
        FakeMode::Real => "applied",
        FakeMode::Fake { .. } => "faked",
    };
    // NOTE(review): the count is the number of pending plans processed, so
    // skipped plans are included in it.
    println!("{summary_verb} {} migration(s).", pending_files.len());
    0
}
1045
/// Outcome of processing one pending plan, as consumed by `run_apply`.
#[derive(Debug)]
enum ApplyResult {
    /// The plan was applied (or faked); `run_apply` prints a success line.
    Ok,
    /// The plan was skipped for the given reason; `run_apply` prints it and
    /// continues with the next plan.
    Skipped(String),
    /// The plan was refused for the given reason; `run_apply` stops with exit
    /// code 2.
    Refused(String),
    /// The runner failed; `run_apply` stops with `runner_error_exit_code`.
    RunnerError(RunnerError),
}
1058
/// A pending plan file that passed path and content validation during
/// discovery.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DiscoveredPendingPlan {
    /// Location of the pending JSON file.
    path: PathBuf,
    /// Bucket derived from the file's location; an empty `app` is the global
    /// bucket.
    bucket: BucketKey,
    /// The pending plan loaded from `path`.
    plan: PendingPlan,
    /// `true` when the file was found in the hidden `.phase_zero` directory.
    is_phase_zero: bool,
}
1066
/// Returns `true` when `bytes` is an acceptable database / app name component
/// of a pending-plan path.
///
/// Accepted: 1 to 63 bytes, first byte an ASCII letter or `_`, every further
/// byte an ASCII letter, ASCII digit or `_`. In particular, names starting
/// with `.` or a digit, and names containing `-`, `/` or non-ASCII bytes, are
/// rejected.
// NOTE(review): the 63-byte cap presumably mirrors PostgreSQL's identifier
// length limit — confirm.
fn is_acceptable_pending_path_component(bytes: &[u8]) -> bool {
    const MAX_LEN: usize = 63;

    if bytes.len() > MAX_LEN {
        return false;
    }
    let Some((&head, tail)) = bytes.split_first() else {
        return false;
    };

    let head_ok = head == b'_' || head.is_ascii_alphabetic();
    head_ok
        && tail
            .iter()
            .all(|&byte| byte == b'_' || byte.is_ascii_alphanumeric())
}
1085
1086fn canonical_pending_filename(app_label: &str) -> String {
1087 format!("{}.json", djogi::migrate::app_dirname(app_label))
1088}
1089
1090fn validate_hidden_phase_zero_pending(
1091 path: PathBuf,
1092 database: &str,
1093) -> Result<DiscoveredPendingPlan, String> {
1094 let filename = path
1095 .file_name()
1096 .and_then(|f| f.to_str())
1097 .ok_or_else(|| format!("non-utf8 Phase 0 pending path {}", path.display()))?;
1098 let expected_filename = format!("{}.json", djogi::migrate::PHASE_ZERO_VERSION);
1099 if filename != expected_filename {
1100 return Err(format!(
1101 "hidden Phase 0 pending path {} must use canonical filename {}",
1102 path.display(),
1103 expected_filename
1104 ));
1105 }
1106 let plan = djogi::migrate::load_pending(&path)
1107 .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1108 if plan.bucket_database != database {
1109 return Err(format!(
1110 "pending JSON {} has bucket database {}, expected {} from path",
1111 path.display(),
1112 plan.bucket_database,
1113 database
1114 ));
1115 }
1116 if !plan.bucket_app.is_empty() {
1117 return Err(format!(
1118 "pending JSON {} must target the global bucket in hidden Phase 0 namespace",
1119 path.display()
1120 ));
1121 }
1122 if plan.version != djogi::migrate::PHASE_ZERO_VERSION {
1123 return Err(format!(
1124 "pending JSON {} must use Phase 0 version {}, found {}",
1125 path.display(),
1126 djogi::migrate::PHASE_ZERO_VERSION,
1127 plan.version
1128 ));
1129 }
1130 Ok(DiscoveredPendingPlan {
1131 path,
1132 bucket: BucketKey {
1133 database: database.to_string(),
1134 app: String::new(),
1135 },
1136 plan,
1137 is_phase_zero: true,
1138 })
1139}
1140
1141fn validate_normal_pending(
1142 path: PathBuf,
1143 database: &str,
1144 filename: &str,
1145) -> Result<DiscoveredPendingPlan, String> {
1146 let Some(stem) = filename.strip_suffix(".json") else {
1147 return Err(format!(
1148 "pending path {} must end with .json",
1149 path.display()
1150 ));
1151 };
1152 let app = if stem == "_global_" {
1153 String::new()
1154 } else {
1155 if !is_acceptable_pending_path_component(stem.as_bytes()) {
1156 return Err(format!(
1157 "pending path {} uses non-canonical app filename {}",
1158 path.display(),
1159 filename
1160 ));
1161 }
1162 stem.to_string()
1163 };
1164 let expected_filename = canonical_pending_filename(&app);
1165 if filename != expected_filename {
1166 return Err(format!(
1167 "pending path {} must use canonical filename {}",
1168 path.display(),
1169 expected_filename
1170 ));
1171 }
1172 let plan = djogi::migrate::load_pending(&path)
1173 .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1174 if plan.bucket_database != database {
1175 return Err(format!(
1176 "pending JSON {} has bucket database {}, expected {} from path",
1177 path.display(),
1178 plan.bucket_database,
1179 database
1180 ));
1181 }
1182 if plan.bucket_app != app {
1183 let expected_app = if app.is_empty() {
1184 "_global_"
1185 } else {
1186 app.as_str()
1187 };
1188 let found_app = if plan.bucket_app.is_empty() {
1189 "_global_"
1190 } else {
1191 plan.bucket_app.as_str()
1192 };
1193 return Err(format!(
1194 "pending JSON {} has bucket app {}, expected {} from path",
1195 path.display(),
1196 found_app,
1197 expected_app
1198 ));
1199 }
1200 if plan.version == djogi::migrate::PHASE_ZERO_VERSION {
1201 return Err(format!(
1202 "pending JSON {} must use the hidden .phase_zero namespace for Phase 0",
1203 path.display()
1204 ));
1205 }
1206 Ok(DiscoveredPendingPlan {
1207 path,
1208 bucket: BucketKey {
1209 database: database.to_string(),
1210 app,
1211 },
1212 is_phase_zero: false,
1213 plan,
1214 })
1215}
1216
1217fn discover_pending_plans(workspace: &Path) -> Result<Vec<DiscoveredPendingPlan>, String> {
1222 let pending_root = djogi::migrate::pending_root(workspace);
1223 let mut out = Vec::new();
1224 let mut seen_identities = std::collections::BTreeSet::new();
1225
1226 let Ok(db_entries) = std::fs::read_dir(&pending_root) else {
1227 return Ok(out);
1228 };
1229
1230 for db_entry in db_entries.flatten() {
1231 let db_name = match db_entry.file_name().to_str().map(str::to_string) {
1232 Some(n) => n,
1233 None => continue,
1234 };
1235 if !is_acceptable_pending_path_component(db_name.as_bytes()) {
1236 continue;
1237 }
1238
1239 let db_dir = db_entry.path();
1240 if !db_dir.is_dir() {
1241 continue;
1242 }
1243
1244 let Ok(app_entries) = std::fs::read_dir(&db_dir) else {
1245 continue;
1246 };
1247
1248 for app_entry in app_entries.flatten() {
1249 let path = app_entry.path();
1250 let file_type = match app_entry.file_type() {
1251 Ok(file_type) => file_type,
1252 Err(_) => continue,
1253 };
1254 if file_type.is_dir() {
1255 if app_entry.file_name().to_str() == Some(".phase_zero") {
1256 let Ok(phase_zero_entries) = std::fs::read_dir(&path) else {
1257 continue;
1258 };
1259 for phase_zero_entry in phase_zero_entries.flatten() {
1260 let phase_zero_path = phase_zero_entry.path();
1261 if !phase_zero_path.is_file() {
1262 continue;
1263 }
1264 let discovered =
1265 validate_hidden_phase_zero_pending(phase_zero_path, &db_name)?;
1266 let identity = (
1267 discovered.bucket.database.clone(),
1268 discovered.bucket.app.clone(),
1269 discovered.plan.version.clone(),
1270 );
1271 if !seen_identities.insert(identity.clone()) {
1272 return Err(format!(
1273 "duplicate pending identity discovered for {}/{}/{}",
1274 identity.0,
1275 if identity.1.is_empty() {
1276 "_global_"
1277 } else {
1278 identity.1.as_str()
1279 },
1280 identity.2
1281 ));
1282 }
1283 out.push(discovered);
1284 }
1285 }
1286 continue;
1287 }
1288 if !file_type.is_file() {
1289 continue;
1290 }
1291 let filename = match path.file_name().and_then(|f| f.to_str()) {
1292 Some(f) => f.to_string(),
1293 None => continue,
1294 };
1295 if !filename.ends_with(".json") {
1296 continue;
1297 }
1298 let discovered = validate_normal_pending(path, &db_name, &filename)?;
1299 let identity = (
1300 discovered.bucket.database.clone(),
1301 discovered.bucket.app.clone(),
1302 discovered.plan.version.clone(),
1303 );
1304 if !seen_identities.insert(identity.clone()) {
1305 return Err(format!(
1306 "duplicate pending identity discovered for {}/{}/{}",
1307 identity.0,
1308 if identity.1.is_empty() {
1309 "_global_"
1310 } else {
1311 identity.1.as_str()
1312 },
1313 identity.2
1314 ));
1315 }
1316 out.push(discovered);
1317 }
1318 }
1319
1320 out.sort_by(|a, b| {
1323 a.plan
1324 .version
1325 .cmp(&b.plan.version)
1326 .then_with(|| b.is_phase_zero.cmp(&a.is_phase_zero))
1327 .then_with(|| a.path.cmp(&b.path))
1328 });
1329
1330 let out = order_pending_groups_by_dependencies(out)?;
1336
1337 Ok(out)
1338}
1339
1340fn order_pending_groups_by_dependencies(
1351 out: Vec<DiscoveredPendingPlan>,
1352) -> Result<Vec<DiscoveredPendingPlan>, String> {
1353 let mut result = Vec::with_capacity(out.len());
1356 let mut i = 0;
1357 while i < out.len() {
1358 let mut j = i + 1;
1359 while j < out.len()
1360 && out[j].bucket.database == out[i].bucket.database
1361 && out[j].plan.version == out[i].plan.version
1362 && out[j].is_phase_zero == out[i].is_phase_zero
1363 {
1364 j += 1;
1365 }
1366
1367 for entry in &out[i..j] {
1374 for dep_app in &entry.plan.depends_on {
1375 if !is_acceptable_pending_path_component(dep_app.as_bytes()) {
1376 return Err(format!(
1377 "pending plan for {}/{} has invalid depends_on label {:?}",
1378 entry.bucket.database, entry.bucket.app, dep_app,
1379 ));
1380 }
1381 }
1382 }
1383
1384 if j - i <= 1 {
1386 result.append(&mut out[i..j].to_vec());
1388 i = j;
1389 continue;
1390 }
1391
1392 let database = &out[i].bucket.database;
1393 let version = &out[i].plan.version;
1394
1395 let group_len = j - i;
1397 let mut in_degree = vec![0usize; group_len];
1398 let mut reverse: Vec<Vec<usize>> = vec![Vec::new(); group_len];
1399
1400 let app_to_idx: std::collections::HashMap<&str, usize> = out[i..j]
1402 .iter()
1403 .enumerate()
1404 .map(|(idx, entry)| (entry.bucket.app.as_str(), idx))
1405 .collect();
1406
1407 for (k_idx, entry) in out[i..j].iter().enumerate() {
1408 for dep_app in &entry.plan.depends_on {
1409 let Some(&dep_idx) = app_to_idx.get(dep_app.as_str()) else {
1410 continue; };
1412 if dep_idx != k_idx {
1413 in_degree[k_idx] += 1;
1414 reverse[dep_idx].push(k_idx);
1415 }
1416 }
1417 }
1418
1419 let mut ready: std::collections::BTreeSet<usize> =
1421 (0..group_len).filter(|&idx| in_degree[idx] == 0).collect();
1422
1423 let mut ordered = Vec::with_capacity(group_len);
1424 while let Some(idx) = ready.iter().next().cloned() {
1425 ready.remove(&idx);
1426 ordered.push(idx);
1427 for &dependent in &reverse[idx] {
1428 in_degree[dependent] -= 1;
1429 if in_degree[dependent] == 0 {
1430 ready.insert(dependent);
1431 }
1432 }
1433 }
1434
1435 if ordered.len() != group_len {
1436 let mut chain: Vec<String> = (0..group_len)
1437 .filter(|&idx| in_degree[idx] > 0)
1438 .map(|idx| out[i + idx].bucket.app.clone())
1439 .collect();
1440 chain.sort();
1441 return Err(format!(
1442 "pending migrations for database `{database}` version `{version}` \
1443 declare a dependency cycle between apps: {chain:?}; \
1444 recompose or inspect hand-edited pending files"
1445 ));
1446 }
1447
1448 for idx in ordered {
1449 result.push(out[i + idx].clone());
1450 }
1451 i = j;
1452 }
1453
1454 Ok(result)
1455}
1456
1457fn load_verified_pending_for_apply(
1458 pending_file: &DiscoveredPendingPlan,
1459) -> Result<PendingPlan, String> {
1460 let pending_bytes =
1461 std::fs::read(&pending_file.path).map_err(|e| format!("read pending JSON: {e}"))?;
1462 let pending: PendingPlan =
1463 serde_json::from_slice(&pending_bytes).map_err(|e| format!("parse pending JSON: {e}"))?;
1464 if pending != pending_file.plan {
1465 return Err(format!(
1466 "pending JSON changed after discovery at {}; rerun the command",
1467 pending_file.path.display()
1468 ));
1469 }
1470 Ok(pending)
1471}
1472
1473fn resolve_apply_target_urls(
1474 pending_files: &[DiscoveredPendingPlan],
1475 db_config: &djogi::config::DatabaseConfig,
1476) -> Result<std::collections::BTreeMap<String, String>, String> {
1477 let mut urls = std::collections::BTreeMap::new();
1478 for pending_file in pending_files {
1479 let database = &pending_file.bucket.database;
1480 if urls.contains_key(database) {
1481 continue;
1482 }
1483 let Some(url) = resolve_bucket_url(db_config, database) else {
1484 return Err(format!("cannot derive a database URL for `{database}`"));
1485 };
1486 urls.insert(database.clone(), url);
1487 }
1488 Ok(urls)
1489}
1490
1491fn reconcile_pending_plans_after_lock(
1492 workspace: &Path,
1493 pre_lock_pending_files: &[DiscoveredPendingPlan],
1494) -> Result<Vec<DiscoveredPendingPlan>, String> {
1495 let locked_pending_files = discover_pending_plans(workspace)?;
1496 if locked_pending_files != pre_lock_pending_files {
1497 return Err(
1498 "pending migration set changed while waiting for the workspace lock; rerun the command"
1499 .to_string(),
1500 );
1501 }
1502 Ok(locked_pending_files)
1503}
1504
1505#[allow(clippy::too_many_arguments)]
1523#[djogi::deliberately_bypass_convention_with_raw_sql]
1524async fn apply_one_pending(
1531 ctx: &mut djogi::context::DjogiContext,
1532 workspace: &Path,
1533 pending_file: &DiscoveredPendingPlan,
1534 config: &djogi::config::DjogiConfig,
1535 guard: &djogi::migrate::WorkspaceGuard,
1536 audit_pool: Option<&deadpool_postgres::Pool>,
1537 mode: &FakeMode,
1538 runner_identity: Option<djogi::migrate::RunnerIdentity>,
1539) -> ApplyResult {
1540 let pending = match load_verified_pending_for_apply(pending_file) {
1542 Ok(pending) => pending,
1543 Err(e) => return ApplyResult::Refused(e),
1544 };
1545
1546 let bucket = pending_file.bucket.clone();
1547
1548 match check_ledger_state(ctx, &pending.version, &bucket.app).await {
1550 LedgerState::NotPresent => {} LedgerState::AlreadyApplied => {
1552 return ApplyResult::Skipped("already applied".to_string());
1553 }
1554 LedgerState::PendingOrPartial(existing_status) => {
1555 if existing_status == LedgerStatus::Failed
1559 || existing_status == LedgerStatus::RolledBack
1560 {
1561 if pending.version == djogi::migrate::PHASE_ZERO_VERSION {
1567 let cleanup_refusal = classify_phase_zero_for_cleanup(
1568 workspace,
1569 &bucket,
1570 &pending.version,
1571 &pending.checksum_up,
1572 pending.checksum_down.as_deref(),
1573 );
1574 if let Some(reason) = cleanup_refusal {
1575 return ApplyResult::Refused(format!(
1576 "Phase 0 cleanup refused: {reason}; \
1577 refusing before deleting {} row to prevent stale replay",
1578 existing_status.as_db_str()
1579 ));
1580 }
1581 }
1582
1583 if let Err(e) =
1587 delete_reapply_blocking_ledger_row(ctx, &pending.version, &bucket.app).await
1588 {
1589 return ApplyResult::Refused(format!(
1590 "clean {} ledger row: {e}",
1591 existing_status.as_db_str()
1592 ));
1593 }
1594 } else {
1595 return ApplyResult::Refused(format!(
1596 "version already in {} state — resolve before re-applying",
1597 existing_status.as_db_str()
1598 ));
1599 }
1600 }
1601 }
1602
1603 let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
1605 workspace,
1606 &bucket,
1607 &pending.version,
1608 &pending.checksum_up,
1609 pending.checksum_down.as_deref(),
1610 ) {
1611 Ok(result) => result,
1612 Err(e) => {
1613 return ApplyResult::Refused(format!("load replay plan: {e}"));
1614 }
1615 };
1616
1617 let runner_ctx = RunnerCtx {
1619 bucket: bucket.clone(),
1620 version: pending.version.clone(),
1621 description: pending.slug.clone(),
1622 checksum_up,
1623 checksum_down,
1624 snapshot: Some(pending.model_snapshot.clone()),
1625 snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
1626 config: djogi::config::MigrateConfig {
1628 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
1629 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
1630 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
1631 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
1632 },
1633 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
1634 audit_pool: audit_pool.cloned(),
1635 runner_identity,
1636 };
1637
1638 let runner_result = match mode {
1640 FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
1641 FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
1642 };
1643 match runner_result {
1644 Ok(_) => ApplyResult::Ok,
1645 Err(e) => ApplyResult::RunnerError(e),
1646 }
1647}
1648
1649#[derive(Debug)]
1651enum LedgerState {
1652 NotPresent,
1654 AlreadyApplied,
1656 PendingOrPartial(LedgerStatus),
1658}
1659
1660async fn check_ledger_state(
1662 ctx: &mut djogi::context::DjogiContext,
1663 version: &str,
1664 app_label: &str,
1665) -> LedgerState {
1666 let Ok(rows) = djogi::migrate::select_all_ledger_rows(ctx).await else {
1667 return LedgerState::NotPresent;
1670 };
1671
1672 let existing = rows
1673 .iter()
1674 .find(|r| r.version == version && r.app_label == app_label);
1675 match existing {
1676 None => LedgerState::NotPresent,
1677 Some(row) => match row.status {
1678 LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
1679 LedgerState::AlreadyApplied
1680 }
1681 LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
1682 LedgerState::PendingOrPartial(row.status)
1683 }
1684 },
1685 }
1686}
1687
1688fn runner_error_exit_code(_error: &RunnerError) -> i32 {
1693 1
1694}
1695
1696#[djogi::deliberately_bypass_convention_with_raw_sql]
1697async fn delete_reapply_blocking_ledger_row(
1703 ctx: &mut djogi::context::DjogiContext,
1704 version: &str,
1705 app_label: &str,
1706) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1707 ctx.raw_execute(
1708 "DELETE FROM djogi_schema_migrations \
1709 WHERE version = $1 AND app_label = $2",
1710 &[&version, &app_label],
1711 )
1712 .await?;
1713 Ok(())
1714}
1715
1716fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
1718 let migrations_root = djogi::migrate::migrations_root(workspace);
1719 migrations_root
1720 .join(&bucket.database)
1721 .join(djogi::migrate::app_dirname(&bucket.app))
1722 .join("schema_snapshot.json")
1723}
1724
1725#[allow(clippy::too_many_arguments)]
1752pub fn attune_cmd(
1753 target: Option<&str>,
1754 apply: bool,
1755 record: bool,
1756 record_ledger: bool,
1757 record_reason: &str,
1758 squash: bool,
1759 from: Option<&str>,
1760 publish: bool,
1761 app: Option<&str>,
1762 workspace: Option<PathBuf>,
1763) -> ExitCode {
1764 let workspace = resolve_workspace(workspace);
1765 let mode = match (record_ledger, squash) {
1766 (false, false) => AttuneMode::DiffOnly,
1767 (true, false) => AttuneMode::Record {
1768 reason: record_reason.to_string(),
1769 },
1770 (false, true) => match from {
1771 Some(v) if !v.is_empty() => AttuneMode::Squash {
1772 from: v.to_string(),
1773 publish,
1774 app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
1775 },
1776 _ => {
1777 eprintln!(
1778 "djogi migrations attune --squash requires --from <version> (e.g. \
1779 `--from V20260101000000__init`)"
1780 );
1781 return ExitCode::from(2);
1782 }
1783 },
1784 (true, true) => {
1785 eprintln!(
1788 "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
1789 );
1790 return ExitCode::from(2);
1791 }
1792 };
1793
1794 let runtime = match tokio::runtime::Builder::new_current_thread()
1795 .enable_all()
1796 .build()
1797 {
1798 Ok(r) => r,
1799 Err(e) => {
1800 eprintln!("djogi migrations attune: tokio runtime: {e}");
1801 return ExitCode::from(1);
1802 }
1803 };
1804
1805 let target_owned = target.map(str::to_string);
1806 let exit =
1807 runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
1808 ExitCode::from(exit as u8)
1809}
1810
1811async fn run_attune(
1814 workspace: &Path,
1815 mode: AttuneMode,
1816 target: Option<String>,
1817 apply: bool,
1818 record: bool,
1819) -> i32 {
1820 use djogi::config::DjogiConfig;
1821
1822 let config = match DjogiConfig::load_from_workspace(workspace) {
1823 Ok(c) => c,
1824 Err(e) => {
1825 eprintln!("djogi migrations attune: config load: {e}");
1826 return 1;
1827 }
1828 };
1829
1830 let mut ctx = match connect_and_check(&config.database.url).await {
1831 ContextOutcome::Ready(ctx) => ctx,
1832 ContextOutcome::UnsupportedVersion(e) => {
1833 crate::print_support_boundary_error("migrations attune", &e);
1834 return 2;
1835 }
1836 ContextOutcome::RuntimeError(msg) => {
1837 eprintln!("djogi migrations attune: pool: {msg}");
1838 return 1;
1839 }
1840 };
1841
1842 let lock_path = workspace.join(LOCK_FILE_NAME);
1846 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
1847 Ok(g) => g,
1848 Err(e) => {
1849 eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
1850 return 1;
1851 }
1852 };
1853
1854 let req = AttuneRequest {
1855 workspace_root: workspace,
1856 database_url: &config.database.url,
1857 profile: &config.profile,
1858 dev_mode: config.database.dev_mode,
1862 target: target.as_deref(),
1867 apply,
1868 record,
1869 mode,
1870 _guard: &guard,
1871 };
1872 match attune(&mut ctx, req).await {
1873 Ok(report) => {
1874 if report.entries.is_empty() {
1875 println!("attune: no drift");
1876 } else {
1877 for entry in &report.entries {
1878 let app_display = if entry.bucket.app.is_empty() {
1879 "_global_"
1880 } else {
1881 entry.bucket.app.as_str()
1882 };
1883 println!(
1884 " {kind:<10} {database}/{app} {version}",
1885 kind = entry.kind.as_str(),
1886 database = entry.bucket.database,
1887 app = app_display,
1888 version = entry.version,
1889 );
1890 }
1891 }
1892 for diag in &report.diagnostics {
1896 println!(" diagnostic: {diag}");
1897 }
1898 if let Some(sha) = &report.resolved_target {
1899 println!("resolved target: {sha}");
1900 }
1901 if let Some(squashed) = &report.squashed_to {
1902 println!("squashed to: {squashed}");
1903 }
1904 if report.published {
1905 println!("published to remote");
1906 }
1907 if report.parent_pointer_updated {
1908 println!("parent submodule pointer updated");
1909 }
1910 0
1911 }
1912 Err(e) => {
1913 eprintln!("djogi migrations attune: {e}");
1914 attune_error_exit_code(&e)
1915 }
1916 }
1917}
1918
1919fn attune_error_exit_code(err: &AttuneError) -> i32 {
1934 match err {
1935 AttuneError::Refused(_) => 2,
1936 AttuneError::FilesystemScanFailed { .. }
1937 | AttuneError::LedgerQueryFailed { .. }
1938 | AttuneError::SqlReadFailed { .. }
1939 | AttuneError::SqlWriteFailed { .. }
1940 | AttuneError::SqlDeleteFailed { .. }
1941 | AttuneError::GitPublishFailed { .. }
1942 | AttuneError::GitTargetResolveFailed { .. }
1943 | AttuneError::GitFetchFailed { .. }
1944 | AttuneError::GitUpdateSubmodulePointerFailed { .. } => 1,
1945 }
1946}
1947
1948pub fn verify_cmd(
1956 provider: &dyn DescriptorProvider,
1957 workspace: Option<PathBuf>,
1958 strict: bool,
1959) -> ExitCode {
1960 let workspace = resolve_workspace(workspace);
1961
1962 let runtime = match tokio::runtime::Builder::new_current_thread()
1963 .enable_all()
1964 .build()
1965 {
1966 Ok(r) => r,
1967 Err(e) => {
1968 eprintln!("djogi migrations verify: tokio runtime: {e}");
1969 return ExitCode::from(1);
1970 }
1971 };
1972
1973 let exit = runtime.block_on(async { run_verify(provider, &workspace, strict).await });
1974 ExitCode::from(exit as u8)
1975}
1976
1977async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
1992 use djogi::config::DjogiConfig;
1993
1994 if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
2006 crate::print_zero_descriptor_diagnostic("migrations verify");
2007 return 2;
2008 }
2009
2010 let config = match DjogiConfig::load_from_workspace(workspace) {
2012 Ok(c) => c,
2013 Err(e) => {
2014 eprintln!("djogi migrations verify: config load: {e}");
2015 return 1;
2016 }
2017 };
2018
2019 let models = match project_from_provider(provider) {
2021 Ok(m) => m,
2022 Err(e) => {
2023 eprintln!("djogi migrations verify: projection error: {e}");
2024 return 1;
2025 }
2026 };
2027
2028 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
2034 models.keys().cloned().collect();
2035 for bucket in discover_snapshot_buckets_on_disk(workspace) {
2036 bucket_set.insert(bucket);
2037 }
2038 if bucket_set.is_empty() {
2045 crate::print_zero_descriptor_diagnostic("migrations verify");
2046 return 2;
2047 }
2048
2049 let policy = djogi::config::PolicyConfig {
2051 strict_out_of_order: strict,
2052 };
2053
2054 let database_has_models: std::collections::HashSet<String> = bucket_set
2061 .iter()
2062 .filter(|b| {
2063 models
2064 .get(*b)
2065 .map(|s| !s.models.is_empty())
2066 .unwrap_or(false)
2067 })
2068 .map(|b| b.database.clone())
2069 .collect();
2070
2071 let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
2077 std::collections::BTreeMap::new();
2078 let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
2079 let mut exit_code: i32 = 0;
2080
2081 for bucket in &bucket_set {
2083 let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
2085 let bd = if bucket.app.is_empty() {
2086 "_global_"
2087 } else {
2088 &bucket.app
2089 };
2090 eprintln!(
2091 "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
2092 check that config.database.url has a valid path component",
2093 bucket.database, bucket.database, bd
2094 );
2095 exit_code = 1;
2096 continue;
2097 };
2098
2099 if !contexts.contains_key(&bucket.database) {
2103 match connect_and_check(&url).await {
2104 ContextOutcome::Ready(ctx) => {
2105 contexts.insert(bucket.database.clone(), ctx);
2106 }
2107 ContextOutcome::UnsupportedVersion(e) => {
2108 crate::print_support_boundary_error("migrations verify", &e);
2109 return 2;
2110 }
2111 ContextOutcome::RuntimeError(msg) => {
2112 eprintln!(
2113 "djogi migrations verify: pool for '{}': {msg}",
2114 bucket.database
2115 );
2116 exit_code = 1;
2117 continue;
2118 }
2119 }
2120 }
2121
2122 let snap_path = snapshot_path(workspace, bucket);
2127 let snapshot = match load_snapshot(&snap_path) {
2128 Ok(s) => s,
2129 Err(SnapshotError::Io { source, .. })
2130 if source.kind() == std::io::ErrorKind::NotFound =>
2131 {
2132 let bd = if bucket.app.is_empty() {
2133 "_global_"
2134 } else {
2135 &bucket.app
2136 };
2137 let has_models = models
2138 .get(bucket)
2139 .map(|s| !s.models.is_empty())
2140 .unwrap_or(false);
2141 if has_models {
2142 eprintln!(
2143 "djogi migrations verify: {}/{} has registered models but no \
2144 snapshot; run `djogi migrations compose` then \
2145 `djogi migrations apply` to record a baseline",
2146 bucket.database, bd
2147 );
2148 exit_code = 1;
2149 } else {
2150 println!("No snapshot found for bucket {}/{}", bucket.database, bd);
2151 }
2152 continue;
2153 }
2154 Err(e) => {
2155 let bd = if bucket.app.is_empty() {
2156 "_global_"
2157 } else {
2158 &bucket.app
2159 };
2160 eprintln!(
2161 "djogi migrations verify: load snapshot for {}/{}: {e}",
2162 bucket.database, bd
2163 );
2164 exit_code = 1;
2165 continue;
2166 }
2167 };
2168
2169 let db_has_models = database_has_models.contains(&bucket.database);
2174 let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
2175
2176 let ctx = contexts
2178 .get_mut(&bucket.database)
2179 .expect("context inserted above");
2180 let report = match djogi::migrate::verify_bucket(
2181 ctx,
2182 bucket,
2183 &snapshot,
2184 &policy,
2185 emit_ledger,
2186 db_has_models,
2187 )
2188 .await
2189 {
2190 Ok(r) => r,
2191 Err(e) => {
2192 let bd = if bucket.app.is_empty() {
2193 "_global_"
2194 } else {
2195 &bucket.app
2196 };
2197 eprintln!(
2198 "djogi migrations verify: error for {}/{}: {e}",
2199 bucket.database, bd
2200 );
2201 exit_code = 1;
2202 continue;
2203 }
2204 };
2205
2206 for line in render_verify_report(&report, bucket) {
2208 println!("{line}");
2209 }
2210 if report.has_errors() {
2211 exit_code = 1;
2212 }
2213 }
2214
2215 exit_code
2216}
2217
2218fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
2226 let mut lines: Vec<String> = Vec::new();
2227
2228 let app_display = if bucket.app.is_empty() {
2229 "_global_"
2230 } else {
2231 &bucket.app
2232 };
2233 lines.push(format!(
2234 "djogi migrations verify — {}/{}",
2235 bucket.database, app_display
2236 ));
2237 lines.push("──────────────────────────────────────────".to_string());
2238
2239 match (
2240 &report.latest_applied_version,
2241 report.applied_count,
2242 report.unfinished_count,
2243 ) {
2244 (Some(version), applied, 0) => {
2245 lines.push(format!("Ledger: {applied} applied, latest {version}"));
2246 }
2247 (Some(version), applied, unfinished) => {
2248 lines.push(format!(
2249 "Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
2250 ));
2251 }
2252 (None, 0, 0) => {
2253 lines.push("Ledger: empty (no migrations applied yet)".to_string());
2254 }
2255 _ => {}
2256 }
2257 lines.push(String::new());
2258
2259 if report.diagnostics.is_empty() {
2260 lines.push("No drift detected. Schema is consistent.".to_string());
2261 } else {
2262 for d in &report.diagnostics {
2263 let severity = match d.severity {
2264 VerifySeverity::Info => "INFO",
2265 VerifySeverity::Warning => "WARN",
2266 VerifySeverity::Error => "ERROR",
2267 };
2268 let location = d.location.as_deref().unwrap_or("-");
2269 lines.push(format!(
2270 "[{severity}] {code} ({loc}): {msg}",
2271 severity = severity,
2272 code = d.code,
2273 loc = location,
2274 msg = d.message
2275 ));
2276 }
2277 }
2278
2279 let errors = report
2280 .diagnostics
2281 .iter()
2282 .filter(|d| d.severity == VerifySeverity::Error)
2283 .count();
2284 let warnings = report
2285 .diagnostics
2286 .iter()
2287 .filter(|d| d.severity == VerifySeverity::Warning)
2288 .count();
2289 let infos = report
2290 .diagnostics
2291 .iter()
2292 .filter(|d| d.severity == VerifySeverity::Info)
2293 .count();
2294
2295 if errors > 0 {
2296 lines.push(String::new());
2297 lines.push(format!(
2298 "Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
2299 ));
2300 } else if warnings > 0 {
2301 lines.push(String::new());
2302 lines.push(format!(
2303 "Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
2304 ));
2305 } else {
2306 lines.push(String::new());
2307 lines.push(format!("Result: PASSED ({infos} info(s))"));
2308 }
2309
2310 lines
2311}
2312
2313impl From<PartialApplyResolutionCli> for PartialApplyResolution {
2316 fn from(cli: PartialApplyResolutionCli) -> Self {
2317 match cli {
2318 PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
2319 PartialApplyResolutionCli::Faked => Self::MarkFaked,
2320 PartialApplyResolutionCli::Applied => Self::MarkApplied,
2321 }
2322 }
2323}
2324
2325pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
2330 match command {
2331 RepairSubcommand::ChecksumDrift {
2332 version,
2333 app,
2334 database,
2335 checksum_up,
2336 checksum_down,
2337 workspace,
2338 } => repair_checksum_drift_cmd(
2339 &version,
2340 app.as_deref(),
2341 database.as_deref(),
2342 checksum_up.as_deref(),
2343 checksum_down.as_deref(),
2344 workspace,
2345 ),
2346 RepairSubcommand::PartialApply {
2347 version,
2348 resolution,
2349 note,
2350 app,
2351 database,
2352 workspace,
2353 } => repair_partial_apply_cmd(
2354 &version,
2355 resolution.into(),
2356 ¬e,
2357 app.as_deref(),
2358 database.as_deref(),
2359 workspace,
2360 ),
2361 RepairSubcommand::ResumePartial {
2362 version,
2363 app,
2364 database,
2365 workspace,
2366 node_id,
2367 single_node_dev,
2368 } => repair_resume_partial_apply_cmd(
2369 &version,
2370 app.as_deref(),
2371 database.as_deref(),
2372 workspace,
2373 node_id,
2374 single_node_dev,
2375 ),
2376 RepairSubcommand::SnapshotRebuild {
2377 app,
2378 database,
2379 snapshot_path,
2380 workspace,
2381 } => repair_snapshot_rebuild_cmd(
2382 app.as_deref(),
2383 database.as_deref(),
2384 snapshot_path.as_deref(),
2385 workspace,
2386 ),
2387 }
2388}
2389
2390fn render_repair_report(report: &RepairReport) {
2394 for action in &report.actions_taken {
2395 println!(" {action}");
2396 }
2397 if !report.ledger_changes.is_empty() {
2398 println!("Ledger changes:");
2399 for lc in &report.ledger_changes {
2400 println!(
2401 " {} | {} | {} -> {}",
2402 lc.version, lc.column, lc.before, lc.after,
2403 );
2404 }
2405 }
2406 if !report.snapshot_changes.is_empty() {
2407 println!("Snapshot changes:");
2408 for sc in &report.snapshot_changes {
2409 println!(" {} | {}", sc.path.display(), sc.description);
2410 }
2411 }
2412}
2413
2414fn repair_error_exit_code(err: &RepairError) -> i32 {
2428 match err {
2429 RepairError::LedgerIo { .. } | RepairError::SnapshotIo { .. } | RepairError::AdvisoryLockFailed { .. } | RepairError::AdvisoryLockQueryFailed { .. } | RepairError::PinnedSessionCheckoutFailed { .. } | RepairError::ResumeStepFailed { .. } | RepairError::ResumeProgressAckFailed { .. } => 1,
2440
2441 RepairError::VersionNotFound { .. }
2445 | RepairError::InsufficientConfirmation
2446 | RepairError::InvalidChecksum { .. }
2447 | RepairError::InvalidResolution { .. }
2448 | RepairError::BucketAppMismatch { .. }
2449 | RepairError::PlanVersionMismatch { .. }
2450 | RepairError::PlanChecksumMismatch { .. }
2451 | RepairError::LeafIdentityMismatch { .. }
2452 | RepairError::NothingToResume { .. }
2453 | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
2454 | RepairError::SuppliedSnapshotDiverges { .. }
2455 | RepairError::AdvisoryUnlockReturnedFalse { .. } | RepairError::ResumePlanShapeMismatch { .. }
2457 | RepairError::ReplayPlanShapeMismatch { .. }
2458 | RepairError::PhaseZeroArtifactRefused { .. } | RepairError::MissingResumeIdentity { .. } => 2,
2461 }
2462}
2463
2464fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
2471 database.unwrap_or("main").to_string()
2472}
2473
2474fn compute_checksum_up_from_disk(
2488 workspace: &Path,
2489 bucket: &djogi::migrate::BucketKey,
2490 version: &str,
2491) -> std::io::Result<String> {
2492 let path =
2493 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::up_filename(version));
2494 let sql = std::fs::read_to_string(&path)?;
2495 Ok(djogi::migrate::compute_committed_sql_checksum(
2496 &sql,
2497 djogi::migrate::ResetSqlSide::Up,
2498 ))
2499}
2500
2501fn compute_checksum_down_from_disk(
2511 workspace: &Path,
2512 bucket: &djogi::migrate::BucketKey,
2513 version: &str,
2514) -> std::io::Result<Option<String>> {
2515 let path =
2516 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::down_filename(version));
2517 let sql = match std::fs::read_to_string(&path) {
2518 Ok(s) => s,
2519 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
2520 Err(e) => return Err(e),
2521 };
2522 Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql))
2523}
2524
2525pub fn repair_checksum_drift_cmd(
2532 version: &str,
2533 app: Option<&str>,
2534 database: Option<&str>,
2535 checksum_up: Option<&str>,
2536 checksum_down: Option<&str>,
2537 workspace: Option<PathBuf>,
2538) -> ExitCode {
2539 let workspace = resolve_workspace(workspace);
2540 let runtime = match tokio::runtime::Builder::new_current_thread()
2541 .enable_all()
2542 .build()
2543 {
2544 Ok(r) => r,
2545 Err(e) => {
2546 eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
2547 return ExitCode::from(1);
2548 }
2549 };
2550 let exit = runtime.block_on(async {
2551 run_repair_checksum_drift(
2552 &workspace,
2553 version,
2554 app,
2555 database,
2556 checksum_up,
2557 checksum_down,
2558 )
2559 .await
2560 });
2561 ExitCode::from(exit as u8)
2562}
2563
2564async fn run_repair_checksum_drift(
2566 workspace: &Path,
2567 version: &str,
2568 app: Option<&str>,
2569 database: Option<&str>,
2570 checksum_up: Option<&str>,
2571 checksum_down: Option<&str>,
2572) -> i32 {
2573 use djogi::config::DjogiConfig;
2574
2575 let config = match DjogiConfig::load_from_workspace(workspace) {
2576 Ok(c) => c,
2577 Err(e) => {
2578 eprintln!("djogi migrations repair checksum-drift: config load: {e}");
2579 return 1;
2580 }
2581 };
2582
2583 let db_name = resolve_database(database, &config);
2588 let url = match resolve_bucket_url(&config.database, &db_name) {
2589 Some(u) => u,
2590 None => {
2591 eprintln!(
2592 "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
2593 );
2594 return 2;
2595 }
2596 };
2597
2598 let mut ctx = match connect_and_check(&url).await {
2599 ContextOutcome::Ready(ctx) => ctx,
2600 ContextOutcome::UnsupportedVersion(e) => {
2601 crate::print_support_boundary_error("migrations repair checksum-drift", &e);
2602 return 2;
2603 }
2604 ContextOutcome::RuntimeError(msg) => {
2605 eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
2606 return 1;
2607 }
2608 };
2609
2610 let lock_path = workspace.join(LOCK_FILE_NAME);
2611 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2612 Ok(g) => g,
2613 Err(e) => {
2614 eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
2615 return 1;
2616 }
2617 };
2618
2619 let app_label = app.unwrap_or("");
2620 let bucket = BucketKey {
2621 database: db_name,
2622 app: app_label.to_string(),
2623 };
2624
2625 let new_checksum_up = match checksum_up {
2626 Some(c) => c.to_string(),
2627 None => {
2628 match compute_checksum_up_from_disk(workspace, &bucket, version) {
2634 Ok(cs) => cs,
2635 Err(e) => {
2636 eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
2637 return 1;
2638 }
2639 }
2640 }
2641 };
2642
2643 let resolved_checksum_down = match checksum_down {
2644 Some(c) => Some(c.to_string()),
2645 None => {
2646 match compute_checksum_down_from_disk(workspace, &bucket, version) {
2651 Ok(cs_opt) => cs_opt,
2652 Err(e) => {
2653 eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
2654 return 1;
2655 }
2656 }
2657 }
2658 };
2659
2660 match repair_checksum_drift(
2661 &mut ctx,
2662 &guard,
2663 &bucket,
2664 version,
2665 workspace,
2666 &new_checksum_up,
2667 resolved_checksum_down.as_deref(),
2668 RepairConfirmation::OperatorAcknowledged,
2669 )
2670 .await
2671 {
2672 Ok(report) => {
2673 render_repair_report(&report);
2674 0
2675 }
2676 Err(e) => {
2677 eprintln!("djogi migrations repair checksum-drift: {e}");
2678 repair_error_exit_code(&e)
2679 }
2680 }
2681}
2682
2683pub fn repair_partial_apply_cmd(
2688 version: &str,
2689 resolution: PartialApplyResolution,
2690 note: &str,
2691 app: Option<&str>,
2692 database: Option<&str>,
2693 workspace: Option<PathBuf>,
2694) -> ExitCode {
2695 let workspace = resolve_workspace(workspace);
2696 let runtime = match tokio::runtime::Builder::new_current_thread()
2697 .enable_all()
2698 .build()
2699 {
2700 Ok(r) => r,
2701 Err(e) => {
2702 eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
2703 return ExitCode::from(1);
2704 }
2705 };
2706 let exit = runtime.block_on(async {
2707 run_repair_partial_apply(&workspace, version, resolution, note, app, database).await
2708 });
2709 ExitCode::from(exit as u8)
2710}
2711
2712async fn run_repair_partial_apply(
2714 workspace: &Path,
2715 version: &str,
2716 resolution: PartialApplyResolution,
2717 note: &str,
2718 app: Option<&str>,
2719 database: Option<&str>,
2720) -> i32 {
2721 use djogi::config::DjogiConfig;
2722
2723 let config = match DjogiConfig::load_from_workspace(workspace) {
2724 Ok(c) => c,
2725 Err(e) => {
2726 eprintln!("djogi migrations repair partial-apply: config load: {e}");
2727 return 1;
2728 }
2729 };
2730
2731 let db_name = resolve_database(database, &config);
2736 let url = match resolve_bucket_url(&config.database, &db_name) {
2737 Some(u) => u,
2738 None => {
2739 eprintln!(
2740 "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
2741 );
2742 return 2;
2743 }
2744 };
2745
2746 let mut ctx = match connect_and_check(&url).await {
2747 ContextOutcome::Ready(ctx) => ctx,
2748 ContextOutcome::UnsupportedVersion(e) => {
2749 crate::print_support_boundary_error("migrations repair partial-apply", &e);
2750 return 2;
2751 }
2752 ContextOutcome::RuntimeError(msg) => {
2753 eprintln!("djogi migrations repair partial-apply: pool: {msg}");
2754 return 1;
2755 }
2756 };
2757
2758 let lock_path = workspace.join(LOCK_FILE_NAME);
2759 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2760 Ok(g) => g,
2761 Err(e) => {
2762 eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
2763 return 1;
2764 }
2765 };
2766
2767 let app_label = app.unwrap_or("");
2768 let bucket = BucketKey {
2769 database: db_name,
2770 app: app_label.to_string(),
2771 };
2772
2773 match repair_partial_apply(
2774 &mut ctx,
2775 &guard,
2776 &bucket,
2777 version,
2778 workspace,
2779 resolution,
2780 note,
2781 RepairConfirmation::OperatorAcknowledged,
2782 )
2783 .await
2784 {
2785 Ok(report) => {
2786 render_repair_report(&report);
2787 0
2788 }
2789 Err(e) => {
2790 eprintln!("djogi migrations repair partial-apply: {e}");
2791 repair_error_exit_code(&e)
2792 }
2793 }
2794}
2795
2796pub fn repair_resume_partial_apply_cmd(
2802 version: &str,
2803 app: Option<&str>,
2804 database: Option<&str>,
2805 workspace: Option<PathBuf>,
2806 node_id: Option<u32>,
2807 single_node_dev: bool,
2808) -> ExitCode {
2809 let workspace = resolve_workspace(workspace);
2810 let runtime = match tokio::runtime::Builder::new_current_thread()
2811 .enable_all()
2812 .build()
2813 {
2814 Ok(r) => r,
2815 Err(e) => {
2816 eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
2817 return ExitCode::from(1);
2818 }
2819 };
2820 let exit = runtime.block_on(async {
2821 run_repair_resume_partial(&workspace, version, app, database, node_id, single_node_dev)
2822 .await
2823 });
2824 ExitCode::from(exit as u8)
2825}
2826
2827async fn run_repair_resume_partial(
2829 workspace: &Path,
2830 version: &str,
2831 app: Option<&str>,
2832 database: Option<&str>,
2833 node_id: Option<u32>,
2834 single_node_dev: bool,
2835) -> i32 {
2836 use djogi::config::DjogiConfig;
2837
2838 let config = match DjogiConfig::load_from_workspace(workspace) {
2839 Ok(c) => c,
2840 Err(e) => {
2841 eprintln!("djogi migrations repair resume-partial: config load: {e}");
2842 return 1;
2843 }
2844 };
2845
2846 let runner_identity = match crate::identity::resolve_identity(
2848 node_id,
2849 single_node_dev,
2850 &config.profile,
2851 "repair resume-partial",
2852 ) {
2853 Ok(resolved) => Some(resolved.into_runner_identity()),
2854 Err(e) => {
2855 let _ = crate::identity::print_identity_error("repair resume-partial", &e);
2856 return 2;
2857 }
2858 };
2859
2860 let db_name = resolve_database(database, &config);
2865 let url = match resolve_bucket_url(&config.database, &db_name) {
2866 Some(u) => u,
2867 None => {
2868 eprintln!(
2869 "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
2870 );
2871 return 2;
2872 }
2873 };
2874
2875 let mut ctx = match connect_and_check(&url).await {
2876 ContextOutcome::Ready(ctx) => ctx,
2877 ContextOutcome::UnsupportedVersion(e) => {
2878 crate::print_support_boundary_error("migrations repair resume-partial", &e);
2879 return 2;
2880 }
2881 ContextOutcome::RuntimeError(msg) => {
2882 eprintln!("djogi migrations repair resume-partial: pool: {msg}");
2883 return 1;
2884 }
2885 };
2886
2887 let lock_path = workspace.join(LOCK_FILE_NAME);
2888 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2889 Ok(g) => g,
2890 Err(e) => {
2891 eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
2892 return 1;
2893 }
2894 };
2895
2896 let app_label = app.unwrap_or("");
2897 let bucket = BucketKey {
2898 database: db_name,
2899 app: app_label.to_string(),
2900 };
2901
2902 let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
2908 Ok(p) => p,
2909 Err(e) => {
2910 eprintln!("djogi migrations repair resume-partial: load plan: {e}");
2911 return 2;
2912 }
2913 };
2914
2915 match repair_resume_partial_apply(
2916 &mut ctx,
2917 &guard,
2918 workspace,
2919 version,
2920 &plan,
2921 runner_identity,
2922 RepairConfirmation::OperatorAcknowledged,
2923 )
2924 .await
2925 {
2926 Ok(report) => {
2927 render_repair_report(&report);
2928 0
2929 }
2930 Err(e) => {
2931 eprintln!("djogi migrations repair resume-partial: {e}");
2932 repair_error_exit_code(&e)
2933 }
2934 }
2935}
2936
2937fn load_committed_plan_for_resume(
2951 workspace: &Path,
2952 bucket: &djogi::migrate::BucketKey,
2953 version: &str,
2954) -> Result<djogi::migrate::MigrationPlan, String> {
2955 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
2956 let plan_path = bucket_dir.join(format!("{version}.plan.json"));
2957 let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
2958 let stored: CliReplayPlan = serde_json::from_slice(&bytes)
2959 .map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
2960 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
2961 return Err(format!(
2962 "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
2963 plan_path.display(),
2964 stored.format_version,
2965 ));
2966 }
2967 Ok(djogi::migrate::MigrationPlan {
2968 bucket: bucket.clone(),
2969 classification: stored.classification.into(),
2970 segments: stored
2971 .segments
2972 .into_iter()
2973 .map(|seg| djogi::migrate::Segment {
2974 kind: seg.kind.into(),
2975 statements: seg
2976 .statements
2977 .into_iter()
2978 .map(|stmt| djogi::migrate::OperationSql {
2979 label: stmt.label,
2980 up: stmt.up,
2981 down: String::new(),
2982 lossy: None,
2983 })
2984 .collect(),
2985 })
2986 .collect(),
2987 })
2988}
2989
2990pub fn repair_snapshot_rebuild_cmd(
2996 app: Option<&str>,
2997 database: Option<&str>,
2998 snapshot_path: Option<&Path>,
2999 workspace: Option<PathBuf>,
3000) -> ExitCode {
3001 let workspace = resolve_workspace(workspace);
3002 let runtime = match tokio::runtime::Builder::new_current_thread()
3003 .enable_all()
3004 .build()
3005 {
3006 Ok(r) => r,
3007 Err(e) => {
3008 eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
3009 return ExitCode::from(1);
3010 }
3011 };
3012 let exit = runtime.block_on(async {
3013 run_repair_snapshot_rebuild(&workspace, app, database, snapshot_path).await
3014 });
3015 ExitCode::from(exit as u8)
3016}
3017
3018async fn run_repair_snapshot_rebuild(
3020 workspace: &Path,
3021 app: Option<&str>,
3022 database: Option<&str>,
3023 snapshot_path: Option<&Path>,
3024) -> i32 {
3025 use djogi::config::DjogiConfig;
3026
3027 let config = match DjogiConfig::load_from_workspace(workspace) {
3028 Ok(c) => c,
3029 Err(e) => {
3030 eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
3031 return 1;
3032 }
3033 };
3034
3035 let db_name = resolve_database(database, &config);
3040 let url = match resolve_bucket_url(&config.database, &db_name) {
3041 Some(u) => u,
3042 None => {
3043 eprintln!(
3044 "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
3045 );
3046 return 2;
3047 }
3048 };
3049
3050 let mut ctx = match connect_and_check(&url).await {
3051 ContextOutcome::Ready(ctx) => ctx,
3052 ContextOutcome::UnsupportedVersion(e) => {
3053 crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
3054 return 2;
3055 }
3056 ContextOutcome::RuntimeError(msg) => {
3057 eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
3058 return 1;
3059 }
3060 };
3061
3062 let lock_path = workspace.join(LOCK_FILE_NAME);
3063 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3064 Ok(g) => g,
3065 Err(e) => {
3066 eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
3067 return 1;
3068 }
3069 };
3070
3071 let app_label = app.unwrap_or("");
3072 let bucket = BucketKey {
3073 database: db_name,
3074 app: app_label.to_string(),
3075 };
3076
3077 let snap_path = match snapshot_path {
3078 Some(p) => p.to_path_buf(),
3079 None => reconstruct_snapshot_path(workspace, &bucket),
3080 };
3081
3082 match repair_snapshot_rebuild(
3083 &mut ctx,
3084 &guard,
3085 &bucket,
3086 &snap_path,
3087 RepairConfirmation::OperatorAcknowledged,
3088 )
3089 .await
3090 {
3091 Ok(report) => {
3092 render_repair_report(&report);
3093 0
3094 }
3095 Err(e) => {
3096 eprintln!("djogi migrations repair snapshot-rebuild: {e}");
3097 repair_error_exit_code(&e)
3098 }
3099 }
3100}
3101
3102#[expect(
3120 clippy::too_many_arguments,
3121 reason = "CLI command entry point mirrors clap arguments explicitly"
3122)]
3123pub fn baseline_cmd(
3124 version: &str,
3125 description: &str,
3126 reason: &str,
3127 app: Option<&str>,
3128 database: Option<&str>,
3129 workspace: Option<PathBuf>,
3130 node_id: Option<u32>,
3131 single_node_dev: bool,
3132) -> ExitCode {
3133 if reason.trim().is_empty() {
3138 eprintln!(
3139 "djogi migrations baseline: --reason must not be empty; \
3140 supply a non-empty reason why this baseline is being established \
3141 (e.g. 'schema pre-exists from prior tooling'). \
3142 This is recorded in the ledger audit trail."
3143 );
3144 return ExitCode::from(2);
3145 }
3146
3147 let workspace = resolve_workspace(workspace);
3148 let runtime = match tokio::runtime::Builder::new_current_thread()
3149 .enable_all()
3150 .build()
3151 {
3152 Ok(r) => r,
3153 Err(e) => {
3154 eprintln!("djogi migrations baseline: tokio runtime: {e}");
3155 return ExitCode::from(1);
3156 }
3157 };
3158 let exit = runtime.block_on(async {
3159 run_baseline(
3160 &workspace,
3161 version,
3162 description,
3163 reason,
3164 app,
3165 database,
3166 node_id,
3167 single_node_dev,
3168 )
3169 .await
3170 });
3171 ExitCode::from(exit as u8)
3172}
3173
3174#[expect(
3185 clippy::too_many_arguments,
3186 reason = "baseline async body keeps CLI arguments explicit through validation and connection setup"
3187)]
3188async fn run_baseline(
3189 workspace: &Path,
3190 version: &str,
3191 description: &str,
3192 reason: &str,
3193 app: Option<&str>,
3194 database: Option<&str>,
3195 node_id: Option<u32>,
3196 single_node_dev: bool,
3197) -> i32 {
3198 use djogi::config::DjogiConfig;
3199
3200 let config = match DjogiConfig::load_from_workspace(workspace) {
3201 Ok(c) => c,
3202 Err(e) => {
3203 eprintln!("djogi migrations baseline: config load: {e}");
3204 return 1;
3205 }
3206 };
3207
3208 let runner_identity = match crate::identity::resolve_identity(
3210 node_id,
3211 single_node_dev,
3212 &config.profile,
3213 "baseline",
3214 ) {
3215 Ok(resolved) => Some(resolved.into_runner_identity()),
3216 Err(e) => {
3217 let _ = crate::identity::print_identity_error("baseline", &e);
3218 return 2;
3219 }
3220 };
3221
3222 let db_name = resolve_database(database, &config);
3227 let url = match resolve_bucket_url(&config.database, &db_name) {
3228 Some(u) => u,
3229 None => {
3230 eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
3231 return 2;
3232 }
3233 };
3234
3235 let mut ctx = match connect_and_check(&url).await {
3236 ContextOutcome::Ready(ctx) => ctx,
3237 ContextOutcome::UnsupportedVersion(e) => {
3238 crate::print_support_boundary_error("migrations baseline", &e);
3239 return 2;
3240 }
3241 ContextOutcome::RuntimeError(msg) => {
3242 eprintln!("djogi migrations baseline: pool: {msg}");
3243 return 1;
3244 }
3245 };
3246
3247 let lock_path = workspace.join(LOCK_FILE_NAME);
3248 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3249 Ok(g) => g,
3250 Err(e) => {
3251 eprintln!("djogi migrations baseline: workspace lock: {e}");
3252 return 1;
3253 }
3254 };
3255
3256 let app_label = app.unwrap_or("");
3257 let bucket = BucketKey {
3258 database: db_name,
3259 app: app_label.to_string(),
3260 };
3261
3262 let runner_ctx = RunnerCtx {
3263 bucket: bucket.clone(),
3264 version: version.to_string(),
3265 description: description.to_string(),
3266 checksum_up: String::new(),
3269 checksum_down: None,
3270 snapshot: None,
3274 snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
3275 config: djogi::config::MigrateConfig {
3278 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
3279 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
3280 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
3281 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
3282 },
3283 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
3284 audit_pool: match djogi::migrate::resolve_audit_url(&config) {
3285 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
3286 Err(_) => None,
3287 },
3288 runner_identity,
3289 };
3290
3291 match baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await {
3292 Ok(report) => {
3293 println!(
3294 "djogi migrations baseline: established baseline `{}` \
3295 (ledger_id={}) in {:.1}s",
3296 version,
3297 report.ledger_id,
3298 report.execution_time_ms as f64 / 1000.0
3299 );
3300 0
3301 }
3302 Err(e) => {
3303 eprintln!("djogi migrations baseline: {e}");
3304 baseline_error_exit_code(&e)
3305 }
3306 }
3307}
3308
3309fn baseline_error_exit_code(err: &RunnerError) -> i32 {
3327 match err {
3328 RunnerError::VersionAlreadyApplied { .. }
3348 | RunnerError::VersionCollisionNonTerminal { .. }
3349 | RunnerError::BaselineSnapshotShouldNotBeProvided
3350 | RunnerError::AdvisoryUnlockReturnedFalse { .. }
3351 | RunnerError::SnapshotPersistFailed { .. }
3352 | RunnerError::OutOfOrderRejected { .. } => 2,
3353 _ => 1,
3358 }
3359}
3360
3361#[cfg(test)]
3362mod tests {
3363 use super::*;
3364 use djogi::__bypass::RawAccessExt as _;
3365 use std::fs;
3366 use std::sync::atomic::{AtomicUsize, Ordering};
3367
3368 struct DatabaseUrlEnvGuard {
3369 _lock: std::sync::MutexGuard<'static, ()>,
3370 prior: Option<String>,
3371 }
3372
3373 impl DatabaseUrlEnvGuard {
3374 fn new() -> Self {
3375 Self {
3376 _lock: crate::test_env_lock(),
3377 prior: std::env::var("DATABASE_URL").ok(),
3378 }
3379 }
3380
3381 fn set(&self, value: &str) {
3382 unsafe { std::env::set_var("DATABASE_URL", value) };
3383 }
3384
3385 fn remove(&self) {
3386 unsafe { std::env::remove_var("DATABASE_URL") };
3387 }
3388 }
3389
3390 impl Drop for DatabaseUrlEnvGuard {
3391 fn drop(&mut self) {
3392 match &self.prior {
3393 Some(value) => unsafe { std::env::set_var("DATABASE_URL", value) },
3394 None => unsafe { std::env::remove_var("DATABASE_URL") },
3395 }
3396 }
3397 }
3398
3399 fn temp_workspace(tag: &str) -> std::path::PathBuf {
3400 static COUNTER: AtomicUsize = AtomicUsize::new(0);
3401 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3402 let nanos = std::time::SystemTime::now()
3403 .duration_since(std::time::UNIX_EPOCH)
3404 .unwrap()
3405 .as_nanos();
3406 let p = std::env::temp_dir().join(format!("djogi-cli-{tag}-{nanos}-{n}"));
3407 fs::create_dir_all(&p).unwrap();
3408 p
3409 }
3410
3411 fn write_unreachable_config(work: &std::path::Path) {
3412 let toml = "[database]\nurl = \"postgres://localhost:1/djogi_unreachable\"\n\
3413 max_connections = 1\ndev_mode = false\n\
3414 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3415 fs::write(work.join("Djogi.toml"), toml).unwrap();
3416 }
3417
3418 fn without_database_url<T>(f: impl FnOnce() -> T) -> T {
3419 let env_guard = DatabaseUrlEnvGuard::new();
3420 env_guard.remove();
3421 f()
3422 }
3423
3424 #[test]
3425 fn database_url_env_guard_restores_prior_value() {
3426 let env_guard = DatabaseUrlEnvGuard::new();
3427 let expected = env_guard.prior.clone();
3428 let next = if expected.as_deref() == Some("postgres://from-env/test") {
3429 "postgres://temporary/test"
3430 } else {
3431 "postgres://from-env/test"
3432 };
3433 env_guard.set(next);
3434 drop(env_guard);
3435 assert_eq!(std::env::var("DATABASE_URL").ok(), expected);
3436 }
3437
3438 fn current_production_phase_zero_sql(tag: &str) -> String {
3439 let work = temp_workspace(tag);
3440 let lock_path = work.join(LOCK_FILE_NAME);
3441 let guard = acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT).expect("lock");
3442 let models: std::collections::BTreeMap<
3443 djogi::migrate::BucketKey,
3444 djogi::migrate::AppliedSchema,
3445 > = std::collections::BTreeMap::new();
3446 let apps = vec![AppLifecycle {
3447 label: "billing".to_string(),
3448 database: "main".to_string(),
3449 renamed_from: None,
3450 tombstone: false,
3451 }];
3452 let emitted = djogi::migrate::ensure_phase_zero_emitted(
3453 &work,
3454 &models,
3455 &apps,
3456 time::OffsetDateTime::now_utc(),
3457 &guard,
3458 )
3459 .expect("auto-emit Phase 0");
3460 let sql = fs::read_to_string(&emitted[0].up_sql_path).expect("read emitted Phase 0");
3461 drop(guard);
3462 let _ = fs::remove_dir_all(&work);
3463 sql
3464 }
3465
3466 fn markerless_seed_phase_zero_sql(tag: &str) -> String {
3467 let mut sql = current_production_phase_zero_sql(tag);
3468 sql.push_str("\nINSERT INTO heer.heer_nodes (id) VALUES (1);\n");
3469 sql
3470 }
3471
3472 fn phase_zero_with_seed_statement(tag: &str, statement: &str) -> String {
3473 let mut sql = current_production_phase_zero_sql(tag);
3474 sql.push('\n');
3475 sql.push_str(statement);
3476 sql.push('\n');
3477 sql
3478 }
3479
3480 fn extended_seed_statement_cases() -> [(&'static str, &'static str); 4] {
3481 [
3482 (
3483 "cte_insert",
3484 "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
3485 ),
3486 (
3487 "cte_delete",
3488 "WITH moved AS (DELETE FROM heer.heer_node_state RETURNING *) SELECT 1;",
3489 ),
3490 (
3491 "merge",
3492 "MERGE INTO heer.heer_nodes AS target USING incoming ON false WHEN NOT MATCHED THEN INSERT (id) VALUES (1);",
3493 ),
3494 (
3495 "copy_from",
3496 "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
3497 ),
3498 ]
3499 }
3500
3501 fn generated_stale_phase_zero_sql(tag: &str) -> String {
3502 let mut sql = current_production_phase_zero_sql(tag);
3503 sql.push_str(
3504 "\nALTER DATABASE \"mydb\" SET heer.node_id = '1';\n\
3505 ALTER DATABASE \"mydb\" SET heer.ranj_node_id = '1';\n\
3506 SET heer.node_id = '1';\n\
3507 SET heer.ranj_node_id = '1';\n",
3508 );
3509 sql
3510 }
3511
3512 fn seed_capable_phase_zero_sql() -> String {
3513 djogi::testing::phase_zero_sql_for_testing("main", true)
3514 .expect("compose seed-capable Phase 0")
3515 }
3516
3517 fn write_pending_json(
3518 path: &Path,
3519 database: &str,
3520 app: &str,
3521 version: &str,
3522 depends_on: &[&str],
3523 ) {
3524 let pending = PendingPlan {
3525 format_version: djogi::migrate::PENDING_FORMAT_VERSION.to_string(),
3526 bucket_database: database.to_string(),
3527 bucket_app: app.to_string(),
3528 version: version.to_string(),
3529 slug: "test".to_string(),
3530 model_snapshot: djogi::migrate::AppliedSchema {
3531 djogi_version: "0.1.0".to_string(),
3532 enums: std::collections::BTreeMap::new(),
3533 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3534 generated_at: "2026-06-06T00:00:00Z".to_string(),
3535 indexes: Vec::new(),
3536 models: std::collections::BTreeMap::new(),
3537 registered_apps: vec![app.to_string()],
3538 },
3539 checksum_up: "V1:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
3540 .to_string(),
3541 checksum_down: None,
3542 composed_at: "2026-06-06T00:00:00Z".to_string(),
3543 depends_on: depends_on.iter().map(|s| s.to_string()).collect(),
3544 };
3545 if let Some(parent) = path.parent() {
3546 fs::create_dir_all(parent).unwrap();
3547 }
3548 fs::write(path, serde_json::to_vec_pretty(&pending).unwrap()).unwrap();
3549 }
3550
3551 #[test]
3555 fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
3556 let work = temp_workspace("b1_discover");
3557 let billing_dir = work.join("migrations/main/billing");
3562 fs::create_dir_all(&billing_dir).unwrap();
3563 fs::write(billing_dir.join("schema_snapshot.json"), "{}").unwrap();
3564 let global_dir = work.join("migrations/main/_global_");
3567 fs::create_dir_all(&global_dir).unwrap();
3568 fs::write(global_dir.join("schema_snapshot.json"), "{}").unwrap();
3569 let no_snap_dir = work.join("migrations/main/empty_app");
3573 fs::create_dir_all(&no_snap_dir).unwrap();
3574
3575 let buckets = discover_snapshot_buckets_on_disk(&work);
3576 let labels: std::collections::BTreeSet<&str> =
3577 buckets.iter().map(|b| b.app.as_str()).collect();
3578 assert!(
3579 labels.contains("billing"),
3580 "must include the renamed-from bucket: {labels:?}"
3581 );
3582 assert!(
3583 labels.contains(""),
3584 "must include the global bucket: {labels:?}"
3585 );
3586 assert!(
3587 !labels.contains("empty_app"),
3588 "must not include directories without a snapshot: {labels:?}"
3589 );
3590 let _ = fs::remove_dir_all(&work);
3591 }
3592
3593 #[test]
3598 fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
3599 let work = temp_workspace("a1_workspace_config");
3600 let toml = "[database]\nurl = \"postgres://discovered-by-workspace-flag/test\"\n\
3601 max_connections = 1\ndev_mode = false\n\
3602 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3603 fs::write(work.join("Djogi.toml"), toml).unwrap();
3604 let env_guard = DatabaseUrlEnvGuard::new();
3605 env_guard.remove();
3606 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
3607 assert_eq!(
3608 config.database.url,
3609 "postgres://discovered-by-workspace-flag/test"
3610 );
3611 assert_eq!(config.server.port, 1234);
3612 let _ = fs::remove_dir_all(&work);
3613 }
3614
3615 #[test]
3620 fn a1_round2_env_override_beats_workspace_toml() {
3621 let work = temp_workspace("a1r2_env_override");
3622 let toml = "[database]\nurl = \"postgres://from-toml/test\"\n\
3623 max_connections = 1\ndev_mode = false\n\
3624 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3625 fs::write(work.join("Djogi.toml"), toml).unwrap();
3626 let env_guard = DatabaseUrlEnvGuard::new();
3627 env_guard.set("postgres://from-env/test");
3628 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
3629 assert_eq!(
3630 config.database.url, "postgres://from-env/test",
3631 "env DATABASE_URL must win over workspace Djogi.toml"
3632 );
3633 let _ = fs::remove_dir_all(&work);
3634 }
3635
3636 #[test]
3637 fn apply_no_pending_is_identity_free_and_skips_pool_connect() {
3638 let work = temp_workspace("apply_no_pending");
3639 write_unreachable_config(&work);
3640
3641 let exit = without_database_url(|| {
3642 let runtime = tokio::runtime::Builder::new_current_thread()
3643 .enable_all()
3644 .build()
3645 .expect("runtime");
3646 runtime.block_on(run_apply(&work, &FakeMode::Real, None, false))
3647 });
3648
3649 assert_eq!(
3650 exit, 0,
3651 "no-pending apply must return before identity resolution or pool checkout"
3652 );
3653 let _ = fs::remove_dir_all(&work);
3654 }
3655
3656 #[test]
3657 fn discover_pending_plans_orders_phase_zero_before_normal_global() {
3658 let work = temp_workspace("discover_pending_phase_zero_first");
3659 write_pending_json(
3660 &djogi::migrate::pending_json_path(
3661 &work,
3662 &BucketKey {
3663 database: "main".to_string(),
3664 app: String::new(),
3665 },
3666 ),
3667 "main",
3668 "",
3669 "V20260606010101__later_global",
3670 &[],
3671 );
3672 write_pending_json(
3673 &djogi::migrate::phase_zero_pending_json_path(
3674 &work,
3675 "main",
3676 djogi::migrate::PHASE_ZERO_VERSION,
3677 ),
3678 "main",
3679 "",
3680 djogi::migrate::PHASE_ZERO_VERSION,
3681 &[],
3682 );
3683
3684 let discovered = discover_pending_plans(&work).expect("discover");
3685 assert_eq!(discovered.len(), 2);
3686 assert_eq!(
3687 discovered[0].plan.version,
3688 djogi::migrate::PHASE_ZERO_VERSION
3689 );
3690 assert!(discovered[0].is_phase_zero);
3691 assert_eq!(discovered[1].plan.version, "V20260606010101__later_global");
3692 let _ = fs::remove_dir_all(&work);
3693 }
3694
3695 #[test]
3699 fn discover_orders_same_version_buckets_by_depends_on() {
3700 let work = temp_workspace("discover_pending_depends_on");
3701 write_pending_json(
3702 &djogi::migrate::pending_json_path(
3703 &work,
3704 &BucketKey {
3705 database: "main".to_string(),
3706 app: "system".to_string(),
3707 },
3708 ),
3709 "main",
3710 "system",
3711 "V20260609000000__initial",
3712 &["users"],
3713 );
3714 write_pending_json(
3715 &djogi::migrate::pending_json_path(
3716 &work,
3717 &BucketKey {
3718 database: "main".to_string(),
3719 app: "users".to_string(),
3720 },
3721 ),
3722 "main",
3723 "users",
3724 "V20260609000000__initial",
3725 &[],
3726 );
3727
3728 let plans = discover_pending_plans(&work).expect("discovers");
3729 let apps: Vec<&str> = plans.iter().map(|p| p.bucket.app.as_str()).collect();
3730 assert_eq!(apps, ["users", "system"]);
3731 let _ = fs::remove_dir_all(&work);
3732 }
3733
3734 #[test]
3737 fn discover_orders_no_dependency_buckets_alphabetically() {
3738 let work = temp_workspace("discover_pending_alpha_tiebreak");
3739 for app in &["charlie", "bravo", "alpha"] {
3741 write_pending_json(
3742 &djogi::migrate::pending_json_path(
3743 &work,
3744 &BucketKey {
3745 database: "main".to_string(),
3746 app: app.to_string(),
3747 },
3748 ),
3749 "main",
3750 app,
3751 "V20260609000000__initial",
3752 &[],
3753 );
3754 }
3755
3756 let plans = discover_pending_plans(&work).expect("discovers");
3757 let apps: Vec<&str> = plans.iter().map(|p| p.bucket.app.as_str()).collect();
3758 assert_eq!(apps, ["alpha", "bravo", "charlie"]);
3759 let _ = fs::remove_dir_all(&work);
3760 }
3761
3762 #[test]
3765 fn discover_depends_on_missing_bucket_is_ignored() {
3766 let work = temp_workspace("discover_pending_deps_missing");
3767 write_pending_json(
3769 &djogi::migrate::pending_json_path(
3770 &work,
3771 &BucketKey {
3772 database: "main".to_string(),
3773 app: "system".to_string(),
3774 },
3775 ),
3776 "main",
3777 "system",
3778 "V20260609000000__initial",
3779 &["billing"],
3780 );
3781
3782 let plans = discover_pending_plans(&work).expect("discovers");
3783 assert_eq!(plans.len(), 1);
3784 assert_eq!(plans[0].bucket.app, "system");
3785 let _ = fs::remove_dir_all(&work);
3786 }
3787
3788 #[test]
3792 fn discover_depends_on_cycle_is_refused() {
3793 let work = temp_workspace("discover_pending_deps_cycle");
3794 write_pending_json(
3795 &djogi::migrate::pending_json_path(
3796 &work,
3797 &BucketKey {
3798 database: "main".to_string(),
3799 app: "alpha".to_string(),
3800 },
3801 ),
3802 "main",
3803 "alpha",
3804 "V20260609000000__initial",
3805 &["beta"],
3806 );
3807 write_pending_json(
3808 &djogi::migrate::pending_json_path(
3809 &work,
3810 &BucketKey {
3811 database: "main".to_string(),
3812 app: "beta".to_string(),
3813 },
3814 ),
3815 "main",
3816 "beta",
3817 "V20260609000000__initial",
3818 &["alpha"],
3819 );
3820
3821 let err = discover_pending_plans(&work).expect_err("cycle must be refused");
3822 assert!(
3823 err.contains("alpha") && err.contains("beta") && err.contains("cycle"),
3824 "error should name both apps and mention cycle, got: {err}"
3825 );
3826 let _ = fs::remove_dir_all(&work);
3827 }
3828
3829 #[test]
3838 fn single_bucket_with_invalid_depends_on_is_refused() {
3839 let make_singleton = |dep: &str| -> Vec<DiscoveredPendingPlan> {
3840 let plan = PendingPlan {
3841 format_version: djogi::migrate::PENDING_FORMAT_VERSION.to_string(),
3842 bucket_database: "main".to_string(),
3843 bucket_app: "system".to_string(),
3844 version: "V20260609000000__initial".to_string(),
3845 slug: "test".to_string(),
3846 model_snapshot: djogi::migrate::AppliedSchema {
3847 djogi_version: "0.1.0".to_string(),
3848 enums: std::collections::BTreeMap::new(),
3849 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3850 generated_at: "2026-06-09T00:00:00Z".to_string(),
3851 indexes: Vec::new(),
3852 models: std::collections::BTreeMap::new(),
3853 registered_apps: vec!["system".to_string()],
3854 },
3855 checksum_up: "V1:".to_string() + &"a".repeat(64),
3856 checksum_down: None,
3857 composed_at: "2026-06-09T00:00:00Z".to_string(),
3858 depends_on: vec![dep.to_string()],
3859 };
3860 vec![DiscoveredPendingPlan {
3861 path: PathBuf::from("target/djogi_pending/main/system.json"),
3862 bucket: BucketKey {
3863 database: "main".to_string(),
3864 app: "system".to_string(),
3865 },
3866 plan,
3867 is_phase_zero: false,
3868 }]
3869 };
3870
3871 for bad_label in ["../traversal", "has space"] {
3872 let err = order_pending_groups_by_dependencies(make_singleton(bad_label))
3873 .expect_err("invalid singleton depends_on label must be refused");
3874 assert!(
3875 err.contains("invalid depends_on label")
3876 && err.contains("main")
3877 && err.contains("system"),
3878 "[{bad_label}] error must name database, app, and the invalid label: {err}"
3879 );
3880 }
3881 }
3882
3883 #[djogi::djogi_test]
3890 async fn cross_bucket_fk_applies_in_dependency_order(mut ctx: djogi::context::DjogiContext) {
3891 static E2E_COUNTER: AtomicUsize = AtomicUsize::new(0);
3893 let n = E2E_COUNTER.fetch_add(1, Ordering::SeqCst);
3894 let users_table = format!("e2e_users_{n}");
3895 let event_log_table = format!("e2e_event_log_{n}");
3896
3897 let work = temp_workspace("cross-bucket-fk-e2e");
3898 let guard = djogi::migrate::acquire_workspace_lock(
3899 &work.join(LOCK_FILE_NAME),
3900 std::time::Duration::from_secs(5),
3901 )
3902 .expect("lock workspace");
3903
3904 let mut models: std::collections::BTreeMap<
3906 djogi::migrate::BucketKey,
3907 djogi::migrate::AppliedSchema,
3908 > = std::collections::BTreeMap::new();
3909
3910 let users_bucket = BucketKey {
3911 database: "main".into(),
3912 app: "users".into(),
3913 };
3914 let system_bucket = BucketKey {
3915 database: "main".into(),
3916 app: "system".into(),
3917 };
3918
3919 {
3920 let mut users_schema = djogi::migrate::AppliedSchema {
3921 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
3922 enums: std::collections::BTreeMap::new(),
3923 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3924 generated_at: "2026-06-10T00:00:00Z".to_string(),
3925 indexes: Vec::new(),
3926 models: std::collections::BTreeMap::new(),
3927 registered_apps: vec!["users".to_string()],
3928 };
3929 users_schema.models.insert(
3930 users_table.clone(),
3931 djogi::migrate::TableSchema {
3932 app: Some("users".to_string()),
3933 columns: vec![djogi::migrate::ColumnSchema {
3934 name: "id".to_string(),
3935 sql_type: "BIGINT".to_string(),
3936 nullable: false,
3937 default_sql: Some("heerid_next_desc()".to_string()),
3938 ..default_col()
3939 }],
3940 primary_key: djogi::migrate::PrimaryKeySchema {
3941 columns: vec!["id".to_string()],
3942 kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
3943 },
3944 table: users_table.clone(),
3945 ..default_table()
3946 },
3947 );
3948 models.insert(users_bucket.clone(), users_schema);
3949 }
3950
3951 {
3952 let mut system_schema = djogi::migrate::AppliedSchema {
3953 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
3954 enums: std::collections::BTreeMap::new(),
3955 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3956 generated_at: "2026-06-10T00:00:00Z".to_string(),
3957 indexes: Vec::new(),
3958 models: std::collections::BTreeMap::new(),
3959 registered_apps: vec!["system".to_string()],
3960 };
3961 system_schema.models.insert(
3962 event_log_table.clone(),
3963 djogi::migrate::TableSchema {
3964 app: Some("system".to_string()),
3965 columns: vec![
3966 djogi::migrate::ColumnSchema {
3967 name: "id".to_string(),
3968 sql_type: "BIGINT".to_string(),
3969 nullable: false,
3970 default_sql: Some("heerid_next_desc()".to_string()),
3971 ..default_col()
3972 },
3973 djogi::migrate::ColumnSchema {
3974 name: "user_id".to_string(),
3975 sql_type: "BIGINT".to_string(),
3976 nullable: false,
3977 foreign_key: Some(djogi::migrate::ForeignKeySchema {
3978 deferrable: false,
3979 initially_deferred: false,
3980 on_delete: djogi::migrate::OnDeleteSchema::Restrict,
3981 ref_column: "id".to_string(),
3982 ref_table: users_table.clone(),
3983 }),
3984 ..default_col()
3985 },
3986 ],
3987 primary_key: djogi::migrate::PrimaryKeySchema {
3988 columns: vec!["id".to_string()],
3989 kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
3990 },
3991 table: event_log_table.clone(),
3992 ..default_table()
3993 },
3994 );
3995 models.insert(system_bucket.clone(), system_schema);
3996 }
3997
3998 let mut snapshots = std::collections::BTreeMap::new();
4000 for bucket in [&users_bucket, &system_bucket] {
4001 snapshots.insert(
4002 bucket.clone(),
4003 djogi::migrate::AppliedSchema {
4004 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4005 enums: std::collections::BTreeMap::new(),
4006 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4007 generated_at: "2026-06-10T00:00:00Z".to_string(),
4008 indexes: Vec::new(),
4009 models: std::collections::BTreeMap::new(),
4010 registered_apps: vec![bucket.app.clone()],
4011 },
4012 );
4013 }
4014
4015 let apps = vec![
4016 djogi::migrate::AppLifecycle {
4017 label: "users".into(),
4018 database: "main".into(),
4019 renamed_from: None,
4020 tombstone: false,
4021 },
4022 djogi::migrate::AppLifecycle {
4023 label: "system".into(),
4024 database: "main".into(),
4025 renamed_from: None,
4026 tombstone: false,
4027 },
4028 ];
4029
4030 let compose_req = djogi::migrate::ComposeRequest {
4032 workspace_root: &work,
4033 models: &models,
4034 snapshots: &snapshots,
4035 apps: &apps,
4036 name: "cross-bucket-fk",
4037 allow_destructive: false,
4038 force_overwrite: false,
4039 now: time::OffsetDateTime::UNIX_EPOCH
4040 + time::Duration::days(19726)
4041 + time::Duration::seconds(0),
4042 _guard: &guard,
4043 pk_flip_join_table_option: None,
4044 skip_phase_zero_auto_emit: false,
4049 };
4050
4051 let compose_report = djogi::migrate::compose(compose_req).expect("compose");
4052 assert!(
4053 !compose_report.composed_buckets.is_empty(),
4054 "compose should produce delta buckets"
4055 );
4056
4057 drop(guard);
4063
4064 let composed_version = &compose_report.composed_buckets[0].version;
4066
4067 let test_db = ctx
4071 .raw_scalar::<String>("SELECT current_database()", &[])
4072 .await
4073 .expect("current_database");
4074 let admin_url = std::env::var("DATABASE_URL").expect(
4075 "DATABASE_URL must be set for djogi_test \
4076 (e.g. postgres://djogi:djogi@localhost:5432/djogi_test)",
4077 );
4078 let test_db_url = replace_db_in_url(&admin_url, &test_db)
4079 .expect("construct per-test database URL from DATABASE_URL");
4080
4081 fs::write(
4083 work.join("Djogi.toml"),
4084 format!(
4085 "[database]\nurl = \"{test_db_url}\"\n\
4086 max_connections = 1\ndev_mode = false\n\
4087 [server]\nhost = \"127.0.0.1\"\nport = 8080\n"
4088 ),
4089 )
4090 .unwrap();
4091
4092 let db_url_guard = DatabaseUrlEnvGuard::new();
4100 db_url_guard.set(&test_db_url);
4101
4102 let exit = {
4107 let work = work.clone();
4108 tokio::task::spawn_blocking(move || {
4109 tokio::runtime::Builder::new_current_thread()
4110 .enable_all()
4111 .build()
4112 .expect("runtime")
4113 .block_on(run_apply(
4114 &work,
4115 &FakeMode::Real,
4116 None,
4117 true, ))
4119 })
4120 .await
4121 .expect("spawn_blocking join")
4122 };
4123
4124 drop(db_url_guard);
4127
4128 assert_eq!(
4129 exit, 0,
4130 "apply should succeed (tables created in FK dependency order)"
4131 );
4132
4133 let fk_rows = ctx
4135 .raw_rows(
4136 "SELECT c.conname \
4137 FROM pg_constraint c \
4138 JOIN pg_class r ON r.oid = c.conrelid \
4139 JOIN pg_class f ON f.oid = c.confrelid \
4140 WHERE r.relname = $1 AND c.contype = 'f' AND f.relname = $2",
4141 &[&event_log_table.as_str(), &users_table.as_str()],
4142 )
4143 .await
4144 .expect("query pg_constraint");
4145 assert!(
4146 !fk_rows.is_empty(),
4147 "FK constraint should exist from {event_log_table} → {users_table}"
4148 );
4149
4150 let ledger_rows = ctx
4154 .raw_rows(
4155 "SELECT app_label FROM djogi_schema_migrations \
4156 WHERE version = $1 AND status = 'applied'",
4157 &[&composed_version.as_str()],
4158 )
4159 .await
4160 .expect("query ledger");
4161 assert_eq!(
4162 ledger_rows.len(),
4163 2,
4164 "ledger should have exactly 2 rows for composed version {composed_version} \
4165 (users + system), got {} rows",
4166 ledger_rows.len()
4167 );
4168 let app_labels: Vec<String> = ledger_rows
4169 .iter()
4170 .map(|row| row.try_get(0).expect("decode app_label"))
4171 .collect();
4172 assert!(
4173 app_labels.contains(&"users".to_string()),
4174 "ledger should have 'users' bucket: {app_labels:?}"
4175 );
4176 assert!(
4177 app_labels.contains(&"system".to_string()),
4178 "ledger should have 'system' bucket: {app_labels:?}"
4179 );
4180
4181 let ordered_rows = ctx
4183 .raw_rows(
4184 "SELECT app_label, id FROM djogi_schema_migrations \
4185 WHERE version = $1 AND status = 'applied' ORDER BY id",
4186 &[&composed_version.as_str()],
4187 )
4188 .await
4189 .expect("query ledger ordered");
4190 assert_eq!(ordered_rows[0].try_get::<_, String>(0).unwrap(), "users");
4191 assert_eq!(ordered_rows[1].try_get::<_, String>(0).unwrap(), "system");
4192
4193 let _ = fs::remove_dir_all(&work);
4194
4195 }
4201
4202 #[djogi::djogi_test]
4206 async fn shared_enum_multi_slice_applies(mut ctx: djogi::context::DjogiContext) {
4207 static E2E_COUNTER: AtomicUsize = AtomicUsize::new(0);
4209 let n = E2E_COUNTER.fetch_add(1, Ordering::SeqCst);
4210 let posts_table = format!("e2e_posts_{n}");
4211 let comments_table = format!("e2e_comments_{n}");
4212
4213 let work = temp_workspace("shared-enum-e2e");
4214 let guard = djogi::migrate::acquire_workspace_lock(
4215 &work.join(LOCK_FILE_NAME),
4216 std::time::Duration::from_secs(5),
4217 )
4218 .expect("lock workspace");
4219
4220 let mut models: std::collections::BTreeMap<
4223 djogi::migrate::BucketKey,
4224 djogi::migrate::AppliedSchema,
4225 > = std::collections::BTreeMap::new();
4226
4227 let alpha_bucket = BucketKey {
4228 database: "main".into(),
4229 app: "alpha".into(),
4230 };
4231 let beta_bucket = BucketKey {
4232 database: "main".into(),
4233 app: "beta".into(),
4234 };
4235
4236 let mood_enum = djogi::migrate::schema::EnumSchema {
4238 name: "mood".to_string(),
4239 variants: vec!["happy".to_string(), "sad".to_string()],
4240 };
4241
4242 {
4244 let mut alpha_schema = djogi::migrate::AppliedSchema {
4245 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4246 enums: std::collections::BTreeMap::new(),
4247 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4248 generated_at: "2026-06-10T00:00:00Z".to_string(),
4249 indexes: Vec::new(),
4250 models: std::collections::BTreeMap::new(),
4251 registered_apps: vec!["alpha".to_string()],
4252 };
4253 alpha_schema
4254 .enums
4255 .insert("mood".to_string(), mood_enum.clone());
4256 alpha_schema.models.insert(
4257 posts_table.clone(),
4258 djogi::migrate::TableSchema {
4259 app: Some("alpha".to_string()),
4260 columns: vec![
4261 djogi::migrate::ColumnSchema {
4262 name: "id".to_string(),
4263 sql_type: "BIGINT".to_string(),
4264 nullable: false,
4265 default_sql: Some("heerid_next_desc()".to_string()),
4266 ..default_col()
4267 },
4268 djogi::migrate::ColumnSchema {
4269 name: "mood".to_string(),
4270 sql_type: "mood".to_string(),
4271 nullable: true,
4272 ..default_col()
4273 },
4274 ],
4275 primary_key: djogi::migrate::PrimaryKeySchema {
4276 columns: vec!["id".to_string()],
4277 kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
4278 },
4279 table: posts_table.clone(),
4280 ..default_table()
4281 },
4282 );
4283 models.insert(alpha_bucket.clone(), alpha_schema);
4284 }
4285
4286 {
4288 let mut beta_schema = djogi::migrate::AppliedSchema {
4289 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4290 enums: std::collections::BTreeMap::new(),
4291 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4292 generated_at: "2026-06-10T00:00:00Z".to_string(),
4293 indexes: Vec::new(),
4294 models: std::collections::BTreeMap::new(),
4295 registered_apps: vec!["beta".to_string()],
4296 };
4297 beta_schema.enums.insert("mood".to_string(), mood_enum);
4298 beta_schema.models.insert(
4299 comments_table.clone(),
4300 djogi::migrate::TableSchema {
4301 app: Some("beta".to_string()),
4302 columns: vec![
4303 djogi::migrate::ColumnSchema {
4304 name: "id".to_string(),
4305 sql_type: "BIGINT".to_string(),
4306 nullable: false,
4307 default_sql: Some("heerid_next_desc()".to_string()),
4308 ..default_col()
4309 },
4310 djogi::migrate::ColumnSchema {
4311 name: "post_id".to_string(),
4312 sql_type: "BIGINT".to_string(),
4313 nullable: false,
4314 foreign_key: Some(djogi::migrate::ForeignKeySchema {
4315 deferrable: false,
4316 initially_deferred: false,
4317 on_delete: djogi::migrate::OnDeleteSchema::Restrict,
4318 ref_column: "id".to_string(),
4319 ref_table: posts_table.clone(),
4320 }),
4321 ..default_col()
4322 },
4323 djogi::migrate::ColumnSchema {
4324 name: "author_mood".to_string(),
4325 sql_type: "mood".to_string(),
4326 nullable: true,
4327 ..default_col()
4328 },
4329 ],
4330 primary_key: djogi::migrate::PrimaryKeySchema {
4331 columns: vec!["id".to_string()],
4332 kind: djogi::migrate::PkKindSchema::HeerIdRecencyBiased,
4333 },
4334 table: comments_table.clone(),
4335 ..default_table()
4336 },
4337 );
4338 models.insert(beta_bucket.clone(), beta_schema);
4339 }
4340
4341 let mut snapshots = std::collections::BTreeMap::new();
4343 for bucket in [&alpha_bucket, &beta_bucket] {
4344 snapshots.insert(
4345 bucket.clone(),
4346 djogi::migrate::AppliedSchema {
4347 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
4348 enums: std::collections::BTreeMap::new(),
4349 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
4350 generated_at: "2026-06-10T00:00:00Z".to_string(),
4351 indexes: Vec::new(),
4352 models: std::collections::BTreeMap::new(),
4353 registered_apps: vec![bucket.app.clone()],
4354 },
4355 );
4356 }
4357
4358 let apps = vec![
4359 djogi::migrate::AppLifecycle {
4360 label: "alpha".into(),
4361 database: "main".into(),
4362 renamed_from: None,
4363 tombstone: false,
4364 },
4365 djogi::migrate::AppLifecycle {
4366 label: "beta".into(),
4367 database: "main".into(),
4368 renamed_from: None,
4369 tombstone: false,
4370 },
4371 ];
4372
4373 let compose_req = djogi::migrate::ComposeRequest {
4375 workspace_root: &work,
4376 models: &models,
4377 snapshots: &snapshots,
4378 apps: &apps,
4379 name: "shared-enum-multi-slice",
4380 allow_destructive: false,
4381 force_overwrite: false,
4382 now: time::OffsetDateTime::UNIX_EPOCH
4383 + time::Duration::days(19726)
4384 + time::Duration::seconds(0),
4385 _guard: &guard,
4386 pk_flip_join_table_option: None,
4387 skip_phase_zero_auto_emit: false,
4388 };
4389
4390 let compose_report = djogi::migrate::compose(compose_req).expect("compose");
4391 assert!(
4392 !compose_report.composed_buckets.is_empty(),
4393 "compose should produce delta buckets"
4394 );
4395
4396 drop(guard);
4398
4399 let composed_version = &compose_report.composed_buckets[0].version;
4401
4402 let test_db = ctx
4404 .raw_scalar::<String>("SELECT current_database()", &[])
4405 .await
4406 .expect("current_database");
4407 let admin_url = std::env::var("DATABASE_URL").expect(
4408 "DATABASE_URL must be set for djogi_test \
4409 (e.g. postgres://djogi:djogi@localhost:5432/djogi_test)",
4410 );
4411 let test_db_url = replace_db_in_url(&admin_url, &test_db)
4412 .expect("construct per-test database URL from DATABASE_URL");
4413
4414 fs::write(
4416 work.join("Djogi.toml"),
4417 format!(
4418 "[database]\nurl = \"{test_db_url}\"\n\
4419 max_connections = 1\ndev_mode = false\n\
4420 [server]\nhost = \"127.0.0.1\"\nport = 8080\n"
4421 ),
4422 )
4423 .unwrap();
4424
4425 let db_url_guard = DatabaseUrlEnvGuard::new();
4426 db_url_guard.set(&test_db_url);
4427
4428 let exit = {
4430 let work = work.clone();
4431 tokio::task::spawn_blocking(move || {
4432 tokio::runtime::Builder::new_current_thread()
4433 .enable_all()
4434 .build()
4435 .expect("runtime")
4436 .block_on(run_apply(
4437 &work,
4438 &FakeMode::Real,
4439 None,
4440 true, ))
4442 })
4443 .await
4444 .expect("spawn_blocking join")
4445 };
4446
4447 drop(db_url_guard);
4448
4449 assert_eq!(
4450 exit, 0,
4451 "apply should succeed (enum created once, tables in FK order)"
4452 );
4453
4454 let mood_count = ctx
4456 .raw_scalar::<i64>(
4457 "SELECT count(*) FROM pg_type WHERE typname = $1",
4458 &[&"mood"],
4459 )
4460 .await
4461 .expect("query pg_type for mood");
4462 assert_eq!(
4463 mood_count, 1,
4464 "exactly one mood enum type should exist in pg_type, got {mood_count}"
4465 );
4466
4467 let table_count = ctx
4469 .raw_scalar::<i64>(
4470 "SELECT count(*) FROM pg_class WHERE relname = $1 OR relname = $2",
4471 &[&posts_table.as_str(), &comments_table.as_str()],
4472 )
4473 .await
4474 .expect("query pg_class for tables");
4475 assert_eq!(
4476 table_count, 2,
4477 "both tables should exist ({posts_table}, {comments_table}), got {table_count}"
4478 );
4479
4480 let ledger_rows = ctx
4482 .raw_rows(
4483 "SELECT app_label FROM djogi_schema_migrations \
4484 WHERE version = $1 AND status = 'applied'",
4485 &[&composed_version.as_str()],
4486 )
4487 .await
4488 .expect("query ledger");
4489 assert_eq!(
4490 ledger_rows.len(),
4491 2,
4492 "ledger should have exactly 2 rows for composed version {composed_version} \
4493 (alpha + beta), got {} rows",
4494 ledger_rows.len()
4495 );
4496
4497 let _ = fs::remove_dir_all(&work);
4498 }
4499
/// Returns `url` with its database path segment replaced by `new_db`.
///
/// Accepts `postgres://` and `postgresql://` URLs and keeps the scheme as given.
/// Everything up to the first `/` (the authority) and everything from the first
/// `?` after that slash (the query string) is preserved verbatim; only the text
/// between them is swapped for `new_db`.
///
/// Returns `None` when:
/// * the URL uses neither of the two accepted schemes, or
/// * the URL has no `/` path separator after the authority. This includes URLs
///   whose query string follows the authority directly, such as
///   `postgres://localhost?host=/var/run/postgresql`: a slash inside the query
///   is not treated as the start of the database path.
///
/// `new_db` is inserted as-is; no percent-encoding is applied.
fn replace_db_in_url(url: &str, new_db: &str) -> Option<String> {
    let (scheme, body) = if let Some(rest) = url.strip_prefix("postgres://") {
        ("postgres://", rest)
    } else {
        ("postgresql://", url.strip_prefix("postgresql://")?)
    };

    // The authority ends at whichever comes first: `/` (a path follows) or `?`
    // (parameters follow directly, so there is no path to replace).
    let authority_end = body.find(|c: char| c == '/' || c == '?')?;
    if !body[authority_end..].starts_with('/') {
        return None;
    }

    let authority = &body[..authority_end];
    let after_slash = &body[authority_end + 1..];
    // Keep the query string (including its leading `?`) if one is present.
    let trailing = after_slash
        .find('?')
        .map_or("", |query_start| &after_slash[query_start..]);

    Some(format!("{scheme}{authority}/{new_db}{trailing}"))
}
4529
4530 fn default_col() -> djogi::migrate::ColumnSchema {
4531 djogi::migrate::ColumnSchema {
4532 check: None,
4533 codec: None,
4534 comment: None,
4535 default_sql: None,
4536 foreign_key: None,
4537 generated: None,
4538 identity: None,
4539 index_type: None,
4540 indexed: false,
4541 max_length: None,
4542 name: "".to_string(),
4543 nullable: false,
4544 on_delete: None,
4545 outbox_exclude: false,
4546 rationale: None,
4547 relation_kind: None,
4548 renamed_from: None,
4549 sequence_within: None,
4550 sql_type: "".to_string(),
4551 unique: false,
4552 type_change_using: None,
4553 }
4554 }
4555
4556 fn default_table() -> djogi::migrate::TableSchema {
4557 djogi::migrate::TableSchema {
4558 app: None,
4559 columns: Vec::new(),
4560 exclusion_constraints: Vec::new(),
4561 fts: None,
4562 is_through: false,
4563 moved_from_app: None,
4564 partition: None,
4565 primary_key: djogi::migrate::PrimaryKeySchema {
4566 columns: Vec::new(),
4567 kind: djogi::migrate::PkKindSchema::Composite,
4568 },
4569 rationale: None,
4570 renamed_from: None,
4571 rls_enabled: false,
4572 table: "".to_string(),
4573 table_comment: None,
4574 storage_params: None,
4575 tablespace: None,
4576 tenant_key: None,
4577 }
4578 }
4579
/// A pending file that is not valid JSON must make `discover_pending_plans`
/// fail with an error mentioning "parse pending JSON".
#[test]
fn discover_pending_plans_refuses_malformed_pending_json() {
    let work = temp_workspace("discover_pending_malformed");

    // Pending path for the `main` database with an empty app label.
    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let pending_file = djogi::migrate::pending_json_path(&work, &global_bucket);
    let pending_dir = pending_file.parent().unwrap();
    fs::create_dir_all(pending_dir).unwrap();
    fs::write(&pending_file, b"{ not json").unwrap();

    let refusal = discover_pending_plans(&work).expect_err("malformed pending must refuse");
    assert!(refusal.contains("parse pending JSON"));

    let _ = fs::remove_dir_all(&work);
}
4597
/// A Phase 0 pending file stored under the `main` path but written with
/// database `other_db` must be refused with an "expected main from path" error.
#[test]
fn discover_pending_plans_refuses_hidden_phase_zero_database_mismatch() {
    let work = temp_workspace("discover_pending_phase_zero_db_mismatch");

    let phase_zero_file = djogi::migrate::phase_zero_pending_json_path(
        &work,
        "main",
        djogi::migrate::PHASE_ZERO_VERSION,
    );
    // The path says `main`; the payload is written with `other_db`.
    write_pending_json(
        &phase_zero_file,
        "other_db",
        "",
        djogi::migrate::PHASE_ZERO_VERSION,
        &[],
    );

    let err = discover_pending_plans(&work).expect_err("hidden Phase 0 mismatch must refuse");
    let mentions_expected_db = err.contains("expected main from path");
    assert!(mentions_expected_db, "unexpected error: {err}");

    let _ = fs::remove_dir_all(&work);
}
4620
/// A pending file at the regular (non-Phase-0) path that carries the Phase 0
/// version must be refused; the error must mention both "Phase 0" and
/// ".phase_zero".
#[test]
fn discover_pending_plans_refuses_normal_global_phase_zero_pending() {
    let work = temp_workspace("discover_pending_normal_global_phase_zero");

    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let pending_file = djogi::migrate::pending_json_path(&work, &global_bucket);
    write_pending_json(
        &pending_file,
        "main",
        "",
        djogi::migrate::PHASE_ZERO_VERSION,
        &[],
    );

    let err = discover_pending_plans(&work).expect_err("normal-global Phase 0 must refuse");
    let names_phase_zero = err.contains("Phase 0");
    let names_hidden_dir = err.contains(".phase_zero");
    assert!(
        names_phase_zero && names_hidden_dir,
        "unexpected error: {err}"
    );

    let _ = fs::remove_dir_all(&work);
}
4640
/// A pending file stored under the `billing` path but written with app `audit`
/// must be refused with an "expected billing from path" error.
#[test]
fn discover_pending_plans_refuses_normal_pending_app_mismatch() {
    let work = temp_workspace("discover_pending_normal_app_mismatch");

    let billing_bucket = BucketKey {
        database: "main".to_string(),
        app: "billing".to_string(),
    };
    let pending_file = djogi::migrate::pending_json_path(&work, &billing_bucket);
    // The path says `billing`; the payload is written with `audit`.
    write_pending_json(
        &pending_file,
        "main",
        "audit",
        "V20260606010101__mismatch",
        &[],
    );

    let err = discover_pending_plans(&work).expect_err("normal app mismatch must refuse");
    let mentions_expected_app = err.contains("expected billing from path");
    assert!(mentions_expected_app, "unexpected error: {err}");

    let _ = fs::remove_dir_all(&work);
}
4660
/// A pending file named `bad-name.json` under the `main` pending directory must
/// be refused with a "non-canonical app filename" error.
#[test]
fn discover_pending_plans_refuses_noncanonical_normal_pending_filename() {
    let work = temp_workspace("discover_pending_noncanonical_filename");

    let odd_file = work.join("target/djogi_pending/main/bad-name.json");
    write_pending_json(
        &odd_file,
        "main",
        "bad-name",
        "V20260606010101__bad_name",
        &[],
    );

    let err = discover_pending_plans(&work).expect_err("non-canonical filename must refuse");
    let flags_filename = err.contains("non-canonical app filename");
    assert!(flags_filename, "unexpected error: {err}");

    let _ = fs::remove_dir_all(&work);
}
4674
/// If the pending file is rewritten with a different version after discovery,
/// `load_verified_pending_for_apply` must fail with a "changed after discovery"
/// error.
#[test]
fn load_verified_pending_for_apply_refuses_changed_artifact() {
    let work = temp_workspace("apply_pending_changed_after_discovery");

    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let pending_file = djogi::migrate::pending_json_path(&work, &global_bucket);
    write_pending_json(&pending_file, "main", "", "V20260606010101__stable", &[]);
    let discovered = discover_pending_plans(&work).expect("discover");

    // Overwrite the artifact with the same plan under a different version.
    let altered_plan = PendingPlan {
        version: "V20260606010102__changed".to_string(),
        ..discovered[0].plan.clone()
    };
    let altered_bytes = serde_json::to_vec_pretty(&altered_plan).unwrap();
    fs::write(&pending_file, altered_bytes).unwrap();

    let err = load_verified_pending_for_apply(&discovered[0])
        .expect_err("apply must refuse a changed pending artifact");
    let reports_change = err.contains("changed after discovery");
    assert!(reports_change, "unexpected error: {err}");

    let _ = fs::remove_dir_all(&work);
}
4705
/// If a new pending file appears between discovery and reconciliation,
/// `reconcile_pending_plans_after_lock` must fail with a "changed while waiting
/// for the workspace lock" error.
#[test]
fn reconcile_pending_plans_after_lock_refuses_added_artifact() {
    let work = temp_workspace("apply_pending_added_before_lock");

    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let pending_file = djogi::migrate::pending_json_path(&work, &global_bucket);
    write_pending_json(&pending_file, "main", "", "V20260606010101__stable", &[]);
    let discovered = discover_pending_plans(&work).expect("discover");

    // Add a Phase 0 pending file after the initial discovery pass.
    let late_phase_zero_file = djogi::migrate::phase_zero_pending_json_path(
        &work,
        "main",
        djogi::migrate::PHASE_ZERO_VERSION,
    );
    write_pending_json(
        &late_phase_zero_file,
        "main",
        "",
        djogi::migrate::PHASE_ZERO_VERSION,
        &[],
    );

    let err = reconcile_pending_plans_after_lock(&work, &discovered)
        .expect_err("locked reconciliation must refuse a changed pending set");
    let reports_change = err.contains("changed while waiting for the workspace lock");
    assert!(reports_change, "unexpected error: {err}");

    let _ = fs::remove_dir_all(&work);
}
4738
/// When nothing changes between discovery and reconciliation,
/// `reconcile_pending_plans_after_lock` must return a set equal to the
/// discovered one.
#[test]
fn reconcile_pending_plans_after_lock_accepts_unchanged_set() {
    let work = temp_workspace("apply_pending_stable_under_lock");

    let global_bucket = BucketKey {
        database: "main".to_string(),
        app: String::new(),
    };
    let pending_file = djogi::migrate::pending_json_path(&work, &global_bucket);
    write_pending_json(&pending_file, "main", "", "V20260606010101__stable", &[]);

    let discovered = discover_pending_plans(&work).expect("discover");
    let reconciled = reconcile_pending_plans_after_lock(&work, &discovered)
        .expect("unchanged set must reconcile");
    assert_eq!(reconciled, discovered);

    let _ = fs::remove_dir_all(&work);
}
4757
/// Runs `repair_checksum_drift_cmd` inside `without_database_url`, against a
/// workspace prepared by `write_unreachable_config`, and expects exit code 1.
#[test]
fn repair_checksum_drift_is_identity_free() {
    let work = temp_workspace("repair_checksum_identity_free");
    write_unreachable_config(&work);

    let workspace_arg = Some(work.clone());
    let exit = without_database_url(|| {
        repair_checksum_drift_cmd(
            "V20260601000000__repair_checksum",
            None,
            None,
            Some("V1:0000000000000000000000000000000000000000000000000000000000000000"),
            None,
            workspace_arg,
        )
    });

    let expected = ExitCode::from(1);
    assert_eq!(
        exit, expected,
        "checksum-drift should reach pool connection without shared identity validation"
    );

    let _ = fs::remove_dir_all(&work);
}
4781
/// Runs `repair_partial_apply_cmd` (resolution `MarkRolledBack`) inside
/// `without_database_url`, against a workspace prepared by
/// `write_unreachable_config`, and expects exit code 1.
#[test]
fn repair_partial_apply_is_identity_free() {
    let work = temp_workspace("repair_partial_identity_free");
    write_unreachable_config(&work);

    let workspace_arg = Some(work.clone());
    let exit = without_database_url(|| {
        repair_partial_apply_cmd(
            "V20260601000000__repair_partial",
            PartialApplyResolution::MarkRolledBack,
            "operator confirmed rollback",
            None,
            None,
            workspace_arg,
        )
    });

    let expected = ExitCode::from(1);
    assert_eq!(
        exit, expected,
        "partial-apply should reach pool connection without shared identity validation"
    );

    let _ = fs::remove_dir_all(&work);
}
4805
/// Runs `repair_snapshot_rebuild_cmd` inside `without_database_url`, against a
/// workspace prepared by `write_unreachable_config`, and expects exit code 1.
#[test]
fn repair_snapshot_rebuild_is_identity_free() {
    let work = temp_workspace("repair_snapshot_identity_free");
    write_unreachable_config(&work);

    let workspace_arg = Some(work.clone());
    let exit =
        without_database_url(|| repair_snapshot_rebuild_cmd(None, None, None, workspace_arg));

    let expected = ExitCode::from(1);
    assert_eq!(
        exit, expected,
        "snapshot-rebuild should reach pool connection without shared identity validation"
    );

    let _ = fs::remove_dir_all(&work);
}
4822
/// Compose must pick up a snapshot that exists only on disk.
///
/// A `billing` snapshot containing a `widgets` table is saved to the workspace,
/// then compose is run with an empty model map and `billing` marked as a
/// tombstone. The generated up-migration file must contain
/// `DROP TABLE "widgets"`.
#[test]
fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
    use djogi::migrate::projection::BucketKey;
    use djogi::migrate::schema::{
        ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
    };
    use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
    use std::collections::BTreeMap;

    let work = temp_workspace("b1r2_compose_uses_discovery");

    let billing_bucket = BucketKey {
        database: "main".into(),
        app: "billing".into(),
    };

    // widgets(id BIGINT NOT NULL DEFAULT heerid_next_desc()); every other
    // attribute takes the value supplied by the shared fixture helpers.
    let id_column = ColumnSchema {
        name: "id".to_string(),
        sql_type: "BIGINT".to_string(),
        default_sql: Some("heerid_next_desc()".to_string()),
        ..default_col()
    };
    let widgets_table = TableSchema {
        app: Some("billing".to_string()),
        columns: vec![id_column],
        primary_key: PrimaryKeySchema {
            columns: vec!["id".to_string()],
            kind: PkKindSchema::HeerIdRecencyBiased,
        },
        table: "widgets".to_string(),
        ..default_table()
    };

    let billing_snap = AppliedSchema {
        djogi_version: env!("CARGO_PKG_VERSION").to_string(),
        enums: BTreeMap::new(),
        format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
        generated_at: "2026-04-25T00:00:00Z".to_string(),
        indexes: Vec::new(),
        models: BTreeMap::from([("widgets".to_string(), widgets_table)]),
        registered_apps: vec!["billing".to_string()],
    };
    let snap_path = snapshot_path(&work, &billing_bucket);
    save_snapshot(&billing_snap, &snap_path).expect("write snapshot");

    // No models are passed in; the snapshot on disk is the only source for `billing`.
    let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
    let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
    let tombstoned_billing = [AppLifecycle {
        label: "billing".to_string(),
        database: "main".to_string(),
        renamed_from: None,
        tombstone: true,
    }];

    let exit = compose_with_inputs(
        &work,
        "drop billing remnant",
        true,
        false,
        &empty_models,
        &tombstoned_billing,
        now,
        None,
    );
    assert_eq!(exit, ExitCode::from(0), "compose must succeed");

    // First `V*.sdjql` file in the bucket directory that is not a `.down.` file.
    let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
    let up_path = fs::read_dir(&billing_dir)
        .unwrap()
        .flatten()
        .find(|entry| {
            let file_name = entry.file_name().to_string_lossy().to_string();
            file_name.starts_with('V')
                && file_name.ends_with(".sdjql")
                && !file_name.contains(".down.")
        })
        .map(|entry| entry.path())
        .expect("compose must have written an up SQL file");

    let up_sql = fs::read_to_string(&up_path).unwrap();
    assert!(
        up_sql.contains("DROP TABLE \"widgets\""),
        "compose must have seen the disk snapshot and emitted DROP TABLE — \
         this proves discover_snapshot_buckets_on_disk reached the differ. \
         SQL: {up_sql}"
    );

    let _ = fs::remove_dir_all(&work);
}
4955
/// Two buckets whose tables reference each other through foreign keys
/// (`table_a.b_id -> table_b.id`, `table_b.a_id -> table_a.id`) must make
/// compose exit with code 2.
#[test]
fn compose_cycle_exits_with_code_two() {
    use djogi::migrate::projection::BucketKey;
    use djogi::migrate::schema::{
        AppliedSchema, ColumnSchema, ForeignKeySchema, OnDeleteSchema, PkKindSchema,
        PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
    };
    use std::collections::BTreeMap;

    let work = temp_workspace("compose_cycle_exit_two");

    // Builds a one-table schema for `app`: `table(id, fk_name -> fk_target.id)`.
    let single_table_schema =
        |app: &str, table: &str, fk_name: &str, fk_target: &str| -> AppliedSchema {
            let id_column = ColumnSchema {
                name: "id".to_string(),
                sql_type: "BIGINT".to_string(),
                default_sql: Some("heerid_next_desc()".to_string()),
                ..default_col()
            };
            let reference_column = ColumnSchema {
                name: fk_name.to_string(),
                sql_type: "BIGINT".to_string(),
                foreign_key: Some(ForeignKeySchema {
                    deferrable: false,
                    initially_deferred: false,
                    on_delete: OnDeleteSchema::Restrict,
                    ref_column: "id".to_string(),
                    ref_table: fk_target.to_string(),
                }),
                ..default_col()
            };
            let table_schema = TableSchema {
                app: Some(app.to_string()),
                columns: vec![id_column, reference_column],
                primary_key: PrimaryKeySchema {
                    columns: vec!["id".to_string()],
                    kind: PkKindSchema::HeerIdRecencyBiased,
                },
                table: table.to_string(),
                ..default_table()
            };

            AppliedSchema {
                djogi_version: env!("CARGO_PKG_VERSION").to_string(),
                enums: BTreeMap::new(),
                format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
                generated_at: "2026-06-10T00:00:00Z".to_string(),
                indexes: Vec::new(),
                models: BTreeMap::from([(table.to_string(), table_schema)]),
                registered_apps: vec![app.to_string()],
            }
        };

    let a_bucket = BucketKey {
        database: "main".into(),
        app: "a".into(),
    };
    let b_bucket = BucketKey {
        database: "main".into(),
        app: "b".into(),
    };

    // a -> b and b -> a: a foreign-key cycle that spans the two buckets.
    let models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::from([
        (
            a_bucket,
            single_table_schema("a", "table_a", "b_id", "table_b"),
        ),
        (
            b_bucket,
            single_table_schema("b", "table_b", "a_id", "table_a"),
        ),
    ]);

    let live_apps = [
        AppLifecycle {
            label: "a".to_string(),
            database: "main".to_string(),
            renamed_from: None,
            tombstone: false,
        },
        AppLifecycle {
            label: "b".to_string(),
            database: "main".to_string(),
            renamed_from: None,
            tombstone: false,
        },
    ];

    let now = time::OffsetDateTime::from_unix_timestamp(1_749_513_600).unwrap();
    let exit = compose_with_inputs(
        &work,
        "cross-bucket cycle",
        false,
        false,
        &models,
        &live_apps,
        now,
        None,
    );

    let expected = ExitCode::from(2);
    assert_eq!(
        exit, expected,
        "a cross-bucket FK cycle must exit 2 (operator-actionable refusal), not 1"
    );

    let _ = fs::remove_dir_all(&work);
}
5076
/// `status_cmd` given a workspace whose `Djogi.toml` is not valid TOML must
/// exit with code 1.
#[test]
fn a1_round2_status_cmd_threads_workspace_to_config() {
    let work = temp_workspace("a1r2_status_workspace");

    let config_file = work.join("Djogi.toml");
    fs::write(config_file, "this is = not = valid toml ===").unwrap();

    let exit = status_cmd(Some(work.clone()));
    let expected = ExitCode::from(1);
    assert_eq!(
        exit, expected,
        "malformed workspace Djogi.toml must surface as config load error"
    );

    let _ = fs::remove_dir_all(&work);
}
5100
/// Every listed `AttuneError::Refused(..)` value must map to exit code 2 via
/// `attune_error_exit_code`.
#[test]
fn u3_attune_refusal_variants_map_to_exit_code_two() {
    use djogi::migrate::AttuneRefusal;

    // List the refusals first, then wrap each one in `AttuneError::Refused`.
    let cases = [
        AttuneRefusal::SquashNotLocalhost {
            database_url: "postgres://prod.example.com/main".to_string(),
        },
        AttuneRefusal::SquashNotDevProfile {
            profile: "production".to_string(),
        },
        AttuneRefusal::SquashDevModeOff,
        AttuneRefusal::SquashEnvIsProduction {
            env_value: "production".to_string(),
        },
        AttuneRefusal::SquashFromVersionNotFound {
            version: "V20260101000000__missing".to_string(),
        },
        AttuneRefusal::SquashFromVersionAmbiguous {
            version: "V20260101000000__shared".to_string(),
            buckets: vec!["main/users".to_string(), "main/billing".to_string()],
        },
    ]
    .map(AttuneError::Refused);

    for err in &cases {
        let code = attune_error_exit_code(err);
        assert_eq!(code, 2, "refusal variant must map to exit 2: {err}");
    }
}
5140
/// Every listed filesystem, SQL-file, and git-publish `AttuneError` value must
/// map to exit code 1 via `attune_error_exit_code`.
#[test]
fn u3_attune_runtime_variants_map_to_exit_code_one() {
    // All three SQL-file variants use the same path.
    let sql_file = || PathBuf::from("/tmp/x.sdjql");

    let cases = [
        AttuneError::FilesystemScanFailed {
            source: std::io::Error::other("disk full"),
        },
        AttuneError::SqlReadFailed {
            path: sql_file(),
            source: std::io::Error::other("permission denied"),
        },
        AttuneError::SqlWriteFailed {
            path: sql_file(),
            source: std::io::Error::other("read-only fs"),
        },
        AttuneError::SqlDeleteFailed {
            path: sql_file(),
            source: std::io::Error::other("not found"),
        },
        AttuneError::GitPublishFailed {
            stderr: "fatal: refusing to push".to_string(),
            status_code: Some(128),
        },
    ];

    for err in &cases {
        let code = attune_error_exit_code(err);
        assert_eq!(code, 1, "runtime variant must map to exit 1: {err}");
    }
}
5176
/// `baseline_cmd` called with an empty reason string must exit with code 2.
#[test]
fn baseline_empty_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let empty_reason = "";

    let result = baseline_cmd(
        "V00000000000000__baseline",
        "description",
        empty_reason,
        None,
        None,
        Some(workspace),
        None,
        false,
    );

    let expected = ExitCode::from(2);
    assert_eq!(
        result, expected,
        "empty --reason must exit 2 before any DB work"
    );
}
5202
/// `baseline_cmd` called with a reason consisting of a single space must exit
/// with code 2.
#[test]
fn baseline_whitespace_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let blank_reason = " ";

    let result = baseline_cmd(
        "V00000000000000__baseline",
        "description",
        blank_reason,
        None,
        None,
        Some(workspace),
        None,
        false,
    );

    let expected = ExitCode::from(2);
    assert_eq!(
        result, expected,
        "whitespace-only --reason must exit 2 before any DB work"
    );
}
5221
/// Each `RunnerError` variant listed here is expected to be translated by
/// `baseline_error_exit_code` into exit code 2.
#[test]
fn baseline_refusal_variants_map_to_exit_code_two() {
    let baseline_version = || String::from("V00000000000000__baseline");

    let cases = [
        RunnerError::VersionAlreadyApplied {
            version: baseline_version(),
            applied_at: None,
        },
        RunnerError::VersionCollisionNonTerminal {
            version: baseline_version(),
            status: LedgerStatus::Pending,
            run_id: 1,
        },
        RunnerError::BaselineSnapshotShouldNotBeProvided,
        RunnerError::AdvisoryUnlockReturnedFalse {
            bucket: BucketKey {
                database: String::from("main"),
                app: String::new(),
            },
            key: 0x0102_0304_0506_0708,
        },
        RunnerError::OutOfOrderRejected {
            version: baseline_version(),
            conflicting_version: String::from("V20260101000000__later"),
            conflicting_applied_at: None,
        },
    ];

    cases.iter().for_each(|err| {
        assert_eq!(
            baseline_error_exit_code(err),
            2,
            "baseline refusal variant must map to exit 2: {err}"
        );
    });
}
5261
/// Ledger, session-checkout and advisory-lock failures built below must be
/// translated by `baseline_error_exit_code` into exit code 1.
#[test]
fn baseline_transient_variants_map_to_exit_code_one() {
    use djogi::error::{DbError, DjogiError};

    // Wraps a detail string the same way for every DB-backed case.
    let db_failure = |detail: &'static str| DjogiError::Db(DbError::other(detail));

    let cases = [
        RunnerError::LedgerBootstrapFailed {
            source: db_failure("create table failed"),
        },
        RunnerError::LedgerWriteFailed {
            version: String::from("V00000000000000__baseline"),
            source: db_failure("insert failed"),
        },
        RunnerError::PinnedSessionCheckoutFailed {
            source: db_failure("pool exhausted"),
        },
        RunnerError::AdvisoryLockFailed {
            bucket: BucketKey {
                database: String::from("main"),
                app: String::new(),
            },
            key: 0x0102_0304_0506_0708,
            attempts: 3,
        },
    ];

    cases.iter().for_each(|err| {
        assert_eq!(
            baseline_error_exit_code(err),
            1,
            "baseline transient variant must map to exit 1: {err}"
        );
    });
}
5298
/// Calling `apply_cmd` with the fake flag set and no reason must return exit
/// code 2.
#[test]
fn fake_without_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let fake = true;
    let reason: Option<String> = None;

    let result = apply_cmd(Some(workspace), fake, reason, None, false);

    let refused = ExitCode::from(2);
    assert_eq!(result, refused, "--fake without --reason must exit 2");
}
5317
/// Calling `apply_cmd` with the fake flag set and an empty reason string must
/// return exit code 2.
#[test]
fn fake_with_empty_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let fake = true;
    let reason = Some(String::new());

    let result = apply_cmd(Some(workspace), fake, reason, None, false);

    let refused = ExitCode::from(2);
    assert_eq!(result, refused, "--fake with empty reason must exit 2");
}
5334
/// Calling `apply_cmd` with the fake flag set and a single-space reason must
/// return exit code 2.
#[test]
fn fake_with_whitespace_reason_exits_code_2() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let fake = true;
    let reason = Some(String::from(" "));

    let result = apply_cmd(Some(workspace), fake, reason, None, false);

    let refused = ExitCode::from(2);
    assert_eq!(
        result, refused,
        "--fake with whitespace reason must exit 2"
    );
}
5351
/// Supplying a reason while the fake flag is off must not produce the
/// exit-code-2 refusal; any other exit code is accepted by this test.
#[test]
fn reason_without_fake_is_accepted() {
    let workspace = PathBuf::from("/tmp/nonexistent_djogi_ws");
    let fake = false;
    let reason = Some(String::from("test reason"));

    let result = apply_cmd(Some(workspace), fake, reason, None, true);

    let refused = ExitCode::from(2);
    assert_ne!(
        result, refused,
        "--reason without --fake should not refuse"
    );
}
5372
5373 fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
5380 djogi::migrate::BucketKey {
5381 database: database.to_string(),
5382 app: app.to_string(),
5383 }
5384 }
5385
5386 fn diag(
5388 code: &str,
5389 severity: djogi::migrate::VerifySeverity,
5390 message: &str,
5391 location: Option<&str>,
5392 ) -> djogi::migrate::VerifyDiagnostic {
5393 djogi::migrate::VerifyDiagnostic {
5394 code: code.to_string(),
5395 severity,
5396 message: message.to_string(),
5397 location: location.map(str::to_string),
5398 }
5399 }
5400
/// A report with no diagnostics must render the ledger summary, the
/// "no drift" line and a PASSED result, and must never mention FAILED.
#[test]
fn render_verify_report_clean_output() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 3,
        unfinished_count: 0,
    };
    let bucket = render_bucket("main", "");
    let lines = render_verify_report(&report, &bucket);

    // Exact-line and substring probes over the rendered output.
    let has_line = |wanted: &str| lines.iter().any(|l| l == wanted);
    let mentions = |needle: &str| lines.iter().any(|l| l.contains(needle));

    assert!(
        has_line("Ledger: 3 applied, latest 001_initial"),
        "missing ledger line; got {lines:?}"
    );
    assert!(
        has_line("No drift detected. Schema is consistent."),
        "missing clean line; got {lines:?}"
    );
    assert!(
        mentions("Result: PASSED"),
        "missing PASSED result; got {lines:?}"
    );
    assert!(
        !mentions("FAILED"),
        "clean report must not say FAILED; got {lines:?}"
    );
}
5432
/// A report holding one error and one warning must report `has_errors()`,
/// render one formatted line per diagnostic, and end with a FAILED result.
#[test]
fn render_verify_report_with_errors() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let missing_table = diag(
        "D601",
        VerifySeverity::Error,
        "Snapshot table missing from live DB",
        Some("users"),
    );
    let extra_index = diag(
        "D611",
        VerifySeverity::Warning,
        "Live index not present in snapshot",
        Some("idx_posts_created"),
    );
    let report = VerifyReport {
        diagnostics: vec![missing_table, extra_index],
        latest_applied_version: Some(String::from("V20260501000000__add_users")),
        applied_count: 2,
        unfinished_count: 0,
    };
    let bucket = render_bucket("main", "myapp");

    assert!(report.has_errors());
    let lines = render_verify_report(&report, &bucket);

    let has_line = |wanted: &str| lines.iter().any(|l| l == wanted);

    assert!(
        has_line("[ERROR] D601 (users): Snapshot table missing from live DB"),
        "missing D601 line; got {lines:?}"
    );
    assert!(
        has_line("[WARN] D611 (idx_posts_created): Live index not present in snapshot"),
        "missing D611 line; got {lines:?}"
    );
    assert!(
        lines.iter().any(|l| l.contains("Result: FAILED")),
        "error report must say FAILED; got {lines:?}"
    );
}
5479
/// The first rendered line is the header: an empty app name is shown as
/// `_global_`, while a named app is shown verbatim after the database.
#[test]
fn render_verify_report_header_shows_global_and_named_app() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: None,
        applied_count: 0,
        unfinished_count: 0,
    };

    let global_bucket = render_bucket("main", "");
    let global = render_verify_report(&report, &global_bucket);
    let global_header = global.first().map(|line| line.as_str());
    assert_eq!(
        global_header,
        Some("djogi migrations verify — main/_global_"),
        "global bucket header; got {global:?}"
    );

    let named_bucket = render_bucket("crud_log", "billing");
    let named = render_verify_report(&report, &named_bucket);
    let named_header = named.first().map(|line| line.as_str());
    assert_eq!(
        named_header,
        Some("djogi migrations verify — crud_log/billing"),
        "named bucket header; got {named:?}"
    );
}
5507
/// A report whose only diagnostic is a warning must render
/// "PASSED with warnings" and must not mention FAILED.
#[test]
fn render_verify_report_warning_only_passes_with_warnings() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let advisory = diag(
        "D606",
        VerifySeverity::Warning,
        "type differs (advisory)",
        Some("users.age"),
    );
    let report = VerifyReport {
        diagnostics: vec![advisory],
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 1,
        unfinished_count: 0,
    };
    let bucket = render_bucket("main", "");
    let lines = render_verify_report(&report, &bucket);

    let mentions = |needle: &str| lines.iter().any(|l| l.contains(needle));

    assert!(
        mentions("Result: PASSED with warnings"),
        "warning-only must PASS with warnings; got {lines:?}"
    );
    assert!(
        !mentions("FAILED"),
        "warning-only must not say FAILED; got {lines:?}"
    );
}
5536
/// With zero applied migrations and no latest version, the rendered output
/// must contain the dedicated "empty ledger" line.
#[test]
fn render_verify_report_empty_ledger_line() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: None,
        applied_count: 0,
        unfinished_count: 0,
    };
    let bucket = render_bucket("main", "");
    let lines = render_verify_report(&report, &bucket);

    let found = lines
        .iter()
        .any(|l| l == "Ledger: empty (no migrations applied yet)");
    assert!(found, "empty ledger line; got {lines:?}");
}
5554
/// When the report carries a non-zero unfinished count, the ledger line must
/// include it between the applied count and the latest version.
#[test]
fn render_verify_report_unfinished_ledger_line() {
    use djogi::migrate::VerifyReport;

    let report = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: Some(String::from("V20260501000000__add_users")),
        applied_count: 2,
        unfinished_count: 1,
    };
    let bucket = render_bucket("main", "");
    let lines = render_verify_report(&report, &bucket);

    let found = lines
        .iter()
        .any(|l| l == "Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users");
    assert!(found, "unfinished ledger line; got {lines:?}");
}
5574
/// An info diagnostic without a location must render `(-)` in place of the
/// location, and an info-only report must summarise as `PASSED (1 info(s))`.
#[test]
fn render_verify_report_info_with_no_location_uses_dash() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let unlocated_info = diag(
        "D692",
        VerifySeverity::Info,
        "enum type(s) declared; not yet checked",
        None,
    );
    let report = VerifyReport {
        diagnostics: vec![unlocated_info],
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 1,
        unfinished_count: 0,
    };
    let bucket = render_bucket("main", "");
    let lines = render_verify_report(&report, &bucket);

    let shows_dash = lines.iter().any(|l| l.contains("(-)"));
    assert!(
        shows_dash,
        "location: None must render as (-); got {lines:?}"
    );

    let shows_summary = lines.iter().any(|l| l == "Result: PASSED (1 info(s))");
    assert!(shows_summary, "all-info summary; got {lines:?}");
}
5603
5604 fn db_config(
5607 url: &str,
5608 crud_log_url: Option<&str>,
5609 event_log_url: Option<&str>,
5610 ) -> djogi::config::DatabaseConfig {
5611 djogi::config::DatabaseConfig {
5612 url: url.to_string(),
5613 crud_log_url: crud_log_url.map(str::to_string),
5614 event_log_url: event_log_url.map(str::to_string),
5615 max_connections: None,
5616 dev_mode: false,
5617 }
5618 }
5619
/// Resolving the `main` database must hand back the configured app URL
/// exactly as written.
#[test]
fn resolve_bucket_url_main_uses_app_url_verbatim() {
    let app_url = "postgres://user:pass@localhost:5432/myapp_prod";
    let cfg = db_config(app_url, None, None);

    let resolved = resolve_bucket_url(&cfg, "main");

    assert_eq!(
        resolved.as_deref(),
        Some("postgres://user:pass@localhost:5432/myapp_prod"),
        "main must return the app URL unchanged"
    );
}
5632
/// When an explicit crud-log URL is configured, resolving `crud_log` must
/// return it instead of anything based on the main URL.
#[test]
fn resolve_bucket_url_crud_log_prefers_explicit_url() {
    let explicit_crud = Some("postgres://localhost/explicit_crud");
    let cfg = db_config("postgres://localhost/main", explicit_crud, None);

    let resolved = resolve_bucket_url(&cfg, "crud_log");

    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost/explicit_crud"),
        "crud_log must prefer the explicit crud_log_url"
    );
}
5646
/// When an explicit event-log URL is configured, resolving `event_log` must
/// return it instead of anything based on the main URL.
#[test]
fn resolve_bucket_url_event_log_prefers_explicit_url() {
    let explicit_event = Some("postgres://localhost/explicit_event");
    let cfg = db_config("postgres://localhost/main", None, explicit_event);

    let resolved = resolve_bucket_url(&cfg, "event_log");

    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost/explicit_event"),
        "event_log must prefer the explicit event_log_url"
    );
}
5660
/// Pins two behaviours of explicit log URLs: an empty string is ignored in
/// favour of the URL derived from the main one, whereas a single-space string
/// counts as non-empty and is returned as-is.
#[test]
fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
    let cfg = db_config("postgres://localhost/main", Some(""), Some(" "));

    let crud = resolve_bucket_url(&cfg, "crud_log");
    assert_eq!(
        crud.as_deref(),
        Some("postgres://localhost/crud_log"),
        "empty crud_log_url must fall back to derived"
    );

    let event = resolve_bucket_url(&cfg, "event_log");
    assert_eq!(
        event.as_deref(),
        Some(" "),
        "non-empty (whitespace) event_log_url is used verbatim"
    );
}
5680
/// A database name with no dedicated config entry must resolve to the app
/// URL with its final path component replaced by that name.
#[test]
fn resolve_bucket_url_other_database_derives_from_app_url() {
    let cfg = db_config("postgres://user:pass@localhost:5432/main", None, None);

    let resolved = resolve_bucket_url(&cfg, "analytics");

    assert_eq!(
        resolved.as_deref(),
        Some("postgres://user:pass@localhost:5432/analytics"),
        "an arbitrary database name derives by path splice"
    );
}
5690
/// If the app URL has no path component, a derived database URL cannot be
/// produced and the resolver must return `None`.
#[test]
fn resolve_bucket_url_pathless_url_returns_none() {
    let cfg = db_config("postgres://localhost", None, None);

    let resolved = resolve_bucket_url(&cfg, "crud_log");

    assert_eq!(
        resolved, None,
        "pathless URL must yield None for a derived database"
    );
}
5701
/// Even when the app URL has no path component, resolving `main` must still
/// return that URL unchanged.
#[test]
fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
    let cfg = db_config("postgres://localhost", None, None);

    let resolved = resolve_bucket_url(&cfg, "main");

    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost"),
        "main returns the app URL verbatim regardless of path"
    );
}
5713
/// Writes pending plans for two buckets that live in different databases and
/// checks that `resolve_apply_target_urls` returns one URL per database:
/// the app URL for `main` and the explicit crud-log URL for `crud_log`.
#[test]
fn resolve_apply_target_urls_uses_pending_bucket_databases() {
    let work = temp_workspace("apply_target_urls");

    // (database, app, version) for each pending plan, written in this order.
    let pending = [
        ("main", "", "V20260607010101__main_global"),
        ("crud_log", "audit", "V20260607010102__crud_log_audit"),
    ];
    for &(database, app, version) in pending.iter() {
        let key = BucketKey {
            database: database.to_string(),
            app: app.to_string(),
        };
        let json_path = djogi::migrate::pending_json_path(&work, &key);
        write_pending_json(&json_path, database, app, version, &[]);
    }

    let discovered = discover_pending_plans(&work).expect("discover");
    let cfg = db_config(
        "postgres://user:pass@localhost:5432/myapp_prod",
        Some("postgres://user:pass@localhost:5432/myapp_crud"),
        None,
    );

    let urls = resolve_apply_target_urls(&discovered, &cfg).expect("resolve");
    assert_eq!(
        urls.len(),
        2,
        "apply must preserve distinct target databases"
    );

    let main_url = urls.get("main").map(|u| u.as_str());
    assert_eq!(
        main_url,
        Some("postgres://user:pass@localhost:5432/myapp_prod"),
        "main pending plans must keep the app database URL"
    );

    let crud_url = urls.get("crud_log").map(|u| u.as_str());
    assert_eq!(
        crud_url,
        Some("postgres://user:pass@localhost:5432/myapp_crud"),
        "crud_log pending plans must route through the crud_log database URL"
    );

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
5769
/// With a pending plan for the `analytics` database and an app URL that has
/// no path, `resolve_apply_target_urls` must fail and name the database in
/// its error text.
#[test]
fn resolve_apply_target_urls_refuses_unresolvable_pending_database() {
    let work = temp_workspace("apply_target_urls_unresolvable");

    let key = BucketKey {
        database: String::from("analytics"),
        app: String::new(),
    };
    let json_path = djogi::migrate::pending_json_path(&work, &key);
    write_pending_json(
        &json_path,
        "analytics",
        "",
        "V20260607010103__analytics_global",
        &[],
    );

    let discovered = discover_pending_plans(&work).expect("discover");
    let cfg = db_config("postgres://localhost", None, None);

    let outcome = resolve_apply_target_urls(&discovered, &cfg);
    let err = outcome.expect_err("pathless app URL must refuse a derived pending database");
    assert!(err.contains("analytics"), "unexpected error: {err}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
5794
/// SQL produced by `current_production_phase_zero_sql` must pass the byte
/// classifier with no refusal.
#[test]
fn classify_phase_zero_bytes_identity_free_production_is_ok() {
    let sql = current_production_phase_zero_sql("current_bytes");

    let refusal = classify_phase_zero_bytes(sql.as_bytes());

    assert!(
        refusal.is_none(),
        "production Phase 0 should be identity-free replay-current (no refusal)"
    );
}
5805
/// SQL produced by `seed_capable_phase_zero_sql` must be refused by the byte
/// classifier, and the refusal text must mention `seed-capable`.
///
/// The refusal is unwrapped with `expect` and echoed in the second assertion
/// so a failing run shows the actual reason, matching the sibling
/// `..._extended_seed_dml_forms_are_refused` test.
#[test]
fn classify_phase_zero_bytes_seed_capable_is_refused() {
    let sql = seed_capable_phase_zero_sql();
    let msg = classify_phase_zero_bytes(sql.as_bytes())
        .expect("seed-capable Phase 0 should be refused by cleanup guard");
    assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
}
5816
/// SQL produced by `generated_stale_phase_zero_sql` must be refused by the
/// byte classifier, and the refusal text must mention `generated-stale`.
///
/// The refusal is unwrapped with `expect` and echoed in the second assertion
/// so a failing run shows the actual reason.
#[test]
fn classify_phase_zero_bytes_generated_stale_is_refused() {
    let sql = generated_stale_phase_zero_sql("stale_bytes");
    let msg = classify_phase_zero_bytes(sql.as_bytes())
        .expect("generated-stale Phase 0 should be refused");
    assert!(msg.contains("generated-stale"), "refusal reason: {msg}");
}
5827
/// SQL produced by `markerless_seed_phase_zero_sql` must be refused by the
/// byte classifier, and the refusal text must mention `seed-dml`.
///
/// The refusal is unwrapped with `expect` and echoed in the second assertion
/// so a failing run shows the actual reason.
#[test]
fn classify_phase_zero_bytes_markerless_seed_is_refused() {
    let sql = markerless_seed_phase_zero_sql("markerless_seed_bytes");
    let msg = classify_phase_zero_bytes(sql.as_bytes())
        .expect("markerless seed Phase 0 should be refused by cleanup guard");
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
}
5838
/// For every `(name, statement)` pair from `extended_seed_statement_cases`,
/// Phase 0 SQL containing that statement must be refused with a reason that
/// mentions `seed-dml`.
#[test]
fn classify_phase_zero_bytes_extended_seed_dml_forms_are_refused() {
    for (name, statement) in extended_seed_statement_cases() {
        let label = format!("extended_seed_bytes_{name}");
        let sql = phase_zero_with_seed_statement(&label, statement);

        let msg = classify_phase_zero_bytes(sql.as_bytes())
            .expect("extended seed Phase 0 should be refused");

        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
    }
}
5849
/// The hand-written two-statement SQL below must be refused by the byte
/// classifier, and the refusal text must mention `ambiguous`.
///
/// The refusal is unwrapped with `expect` and echoed in the second assertion
/// so a failing run shows the actual reason.
#[test]
fn classify_phase_zero_bytes_ambiguous_is_refused() {
    let sql = "CREATE SCHEMA IF NOT EXISTS heer;\n\
               ALTER DATABASE \"mydb\" SET heer.node_id = '1';\n";
    let msg =
        classify_phase_zero_bytes(sql.as_bytes()).expect("ambiguous Phase 0 should be refused");
    assert!(msg.contains("ambiguous"), "refusal reason: {msg}");
}
5859
/// Whitespace-only input must be refused by the byte classifier, and the
/// refusal text must mention `missing`.
///
/// The refusal is unwrapped with `expect` and echoed in the second assertion
/// so a failing run shows the actual reason.
#[test]
fn classify_phase_zero_bytes_missing_is_refused() {
    let msg = classify_phase_zero_bytes(b" \n\t ").expect("missing Phase 0 should be refused");
    assert!(msg.contains("missing"), "refusal reason: {msg}");
}
5866
/// Writes a replay-plan JSON whose only statement is generated-stale Phase 0
/// SQL and checks that the cleanup classifier refuses it with a reason that
/// mentions `generated-stale`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_stale_replay_plan() {
    let work = temp_workspace("stale_cleanup");
    let bucket_dir = work.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let bootstrap = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: generated_stale_phase_zero_sql("stale_replay"),
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_owned(),
        checksum_up: String::from("V1:aabbccdd"),
        checksum_down: None,
        classification: CliClassification::Additive,
        segments: vec![CliReplaySegment {
            kind: CliSegmentKind::Transactional,
            statements: vec![bootstrap],
        }],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&replay).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &work,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:aabbccdd",
        None,
    );
    let msg = refusal.expect("stale Phase 0 replay plan should be refused by cleanup guard");
    assert!(msg.contains("generated-stale"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
5913
/// Writes a replay-plan JSON whose only statement is current production
/// Phase 0 SQL and checks that the cleanup classifier returns no refusal.
#[test]
fn classify_phase_zero_for_cleanup_allows_current_replay_plan() {
    let work = temp_workspace("current_cleanup");
    let bucket_dir = work.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let bootstrap = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: current_production_phase_zero_sql("current_replay"),
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_owned(),
        checksum_up: String::from("V1:eeff0011"),
        checksum_down: None,
        classification: CliClassification::Additive,
        segments: vec![CliReplaySegment {
            kind: CliSegmentKind::Transactional,
            statements: vec![bootstrap],
        }],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&replay).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &work,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:eeff0011",
        None,
    );
    assert!(
        refusal.is_none(),
        "identity-free Phase 0 should be allowed by cleanup guard; got: {refusal:?}"
    );

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
5958
/// Writes a replay-plan JSON whose only statement is seed-capable Phase 0
/// SQL and checks that the cleanup classifier refuses it with a reason that
/// mentions `seed-capable`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_seed_capable_replay_plan() {
    let work = temp_workspace("seed_cleanup_replay_plan");
    let bucket_dir = work.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let bootstrap = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: seed_capable_phase_zero_sql(),
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_owned(),
        checksum_up: String::from("V1:11223344"),
        checksum_down: None,
        classification: CliClassification::Additive,
        segments: vec![CliReplaySegment {
            kind: CliSegmentKind::Transactional,
            statements: vec![bootstrap],
        }],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&replay).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let msg = classify_phase_zero_for_cleanup(
        &work,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:11223344",
        None,
    )
    .expect("seed-capable replay plan must refuse");
    assert!(msg.contains("seed-capable"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
6000
/// Writes a replay-plan JSON whose only statement is markerless-seed Phase 0
/// SQL and checks that the cleanup classifier refuses it with a reason that
/// mentions `seed-dml`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_markerless_seed_replay_plan() {
    let work = temp_workspace("markerless_seed_cleanup_replay_plan");
    let bucket_dir = work.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let bootstrap = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: markerless_seed_phase_zero_sql("markerless_seed_replay"),
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_owned(),
        checksum_up: String::from("V1:55667788"),
        checksum_down: None,
        classification: CliClassification::Additive,
        segments: vec![CliReplaySegment {
            kind: CliSegmentKind::Transactional,
            statements: vec![bootstrap],
        }],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&replay).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let msg = classify_phase_zero_for_cleanup(
        &work,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:55667788",
        None,
    )
    .expect("markerless seed replay plan must refuse");
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
6042
/// Writes a replay-plan JSON whose Phase 0 SQL embeds a `WITH ... INSERT`
/// statement and checks that the cleanup classifier refuses it with a reason
/// that mentions `seed-dml`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_cte_seed_dml_replay_plan() {
    let work = temp_workspace("cte_seed_cleanup_replay_plan");
    let bucket_dir = work.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let cte_insert = "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);";
    let bootstrap = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: phase_zero_with_seed_statement("cte_seed_cleanup_replay", cte_insert),
    };
    let replay = CliReplayPlan {
        format_version: CLI_REPLAY_PLAN_FORMAT_VERSION.to_owned(),
        checksum_up: String::from("V1:66778899"),
        checksum_down: None,
        classification: CliClassification::Additive,
        segments: vec![CliReplaySegment {
            kind: CliSegmentKind::Transactional,
            statements: vec![bootstrap],
        }],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&replay).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let msg = classify_phase_zero_for_cleanup(
        &work,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:66778899",
        None,
    )
    .expect("CTE seed replay plan must refuse");
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
6087
/// With no replay-plan JSON written, only the Phase 0 "up" SQL file is
/// present in the bucket directory; when it holds current production SQL the
/// cleanup classifier must return no refusal.
#[test]
fn classify_phase_zero_for_cleanup_fallback_sql_file() {
    let work = temp_workspace("fallback_cleanup");
    let bucket_dir = work.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let up_sql = current_production_phase_zero_sql("fallback_sql");
    let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let up_path = bucket_dir.join(&up_filename);
    fs::write(up_path, up_sql).unwrap();

    let bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &work,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    );
    assert!(
        refusal.is_none(),
        "identity-free Phase 0 fallback SQL should be allowed; got: {refusal:?}"
    );

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
6116
/// With only a Phase 0 "up" SQL file on disk containing seed-capable SQL,
/// the cleanup classifier must refuse with a reason mentioning `seed-capable`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_seed_capable_fallback_sql_file() {
    let work = temp_workspace("seed_cleanup_fallback");
    let bucket_dir = work.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let up_path = bucket_dir.join(&up_filename);
    fs::write(up_path, seed_capable_phase_zero_sql()).unwrap();

    let bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let msg = classify_phase_zero_for_cleanup(
        &work,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    )
    .expect("seed-capable fallback SQL must refuse");
    assert!(msg.contains("seed-capable"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
6142
/// With only a Phase 0 "up" SQL file on disk containing markerless-seed SQL,
/// the cleanup classifier must refuse with a reason mentioning `seed-dml`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_markerless_seed_fallback_sql_file() {
    let work = temp_workspace("markerless_seed_cleanup_fallback");
    let bucket_dir = work.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let up_path = bucket_dir.join(&up_filename);
    let up_sql = markerless_seed_phase_zero_sql("markerless_seed_fallback");
    fs::write(up_path, up_sql).unwrap();

    let bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let msg = classify_phase_zero_for_cleanup(
        &work,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    )
    .expect("markerless seed fallback SQL must refuse");
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
6172
/// With only a Phase 0 "up" SQL file on disk that embeds a `COPY ... FROM
/// STDIN` statement, the cleanup classifier must refuse with a reason
/// mentioning `seed-dml`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_copy_from_seed_fallback_sql_file() {
    let work = temp_workspace("copy_seed_cleanup_fallback");
    let bucket_dir = work.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let up_filename = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let up_path = bucket_dir.join(&up_filename);
    let copy_statement = "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;";
    let up_sql = phase_zero_with_seed_statement("copy_seed_cleanup_fallback", copy_statement);
    fs::write(up_path, up_sql).unwrap();

    let bucket = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let msg = classify_phase_zero_for_cleanup(
        &work,
        &bucket,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    )
    .expect("COPY FROM seed fallback SQL must refuse");
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&work);
}
6205
6206 #[djogi::djogi_test]
6209 async fn check_ledger_state_is_app_scoped(mut ctx: djogi::context::DjogiContext) {
6210 use djogi::migrate::{ExecutionMode, LedgerRow, LedgerStatus};
6211
6212 djogi::migrate::bootstrap_ledger(&mut ctx)
6214 .await
6215 .expect("bootstrap");
6216
6217 let row = LedgerRow {
6219 version: "V20260609000000__t397".into(),
6220 description: "test migration".into(),
6221 checksum_up: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
6222 checksum_down: Some(
6223 "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".into(),
6224 ),
6225 execution_mode: ExecutionMode::Transactional,
6226 status: LedgerStatus::Pending,
6227 execution_time_ms: 0,
6228 out_of_order_flag: false,
6229 applied_steps_count: 0,
6230 total_steps: None,
6231 partial_apply_note: None,
6232 run_id: 1,
6233 snapshot_version: "0".into(),
6234 app_label: "users".into(),
6235 leaf_identity: None,
6236 };
6237 let ledger_id = djogi::migrate::insert_pending_ledger_row(&mut ctx, &row)
6238 .await
6239 .expect("insert pending");
6240 djogi::migrate::mark_ledger_applied(&mut ctx, ledger_id, 10, 1)
6241 .await
6242 .expect("mark applied");
6243
6244 let state = check_ledger_state(&mut ctx, "V20260609000000__t397", "system").await;
6246 assert!(
6247 matches!(state, LedgerState::NotPresent),
6248 "different app stream must be NotPresent, got {state:?}",
6249 );
6250
6251 let state = check_ledger_state(&mut ctx, "V20260609000000__t397", "users").await;
6253 assert!(
6254 matches!(state, LedgerState::AlreadyApplied),
6255 "same app stream must be AlreadyApplied, got {state:?}",
6256 );
6257 }
6258}