1use std::path::{Path, PathBuf};
10use std::process::ExitCode;
11
12use djogi::apps::AppRegistry;
13use djogi::migrate::{
14 AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
15 DescriptorProvider, GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan,
16 RepairConfirmation, RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError,
17 VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
18 compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
19 repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
20};
21
22use djogi::migrate::LedgerStatus;
24
25use crate::{PartialApplyResolutionCli, RepairSubcommand};
28
29#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
37struct CliReplayPlan {
38 format_version: String,
39 checksum_up: String,
40 checksum_down: Option<String>,
41 classification: CliClassification,
42 segments: Vec<CliReplaySegment>,
43}
44
45#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
46#[serde(tag = "kind", rename_all = "snake_case")]
47enum CliClassification {
48 NoOp,
49 Additive,
50 Reversible,
51 Destructive,
52 Lossy,
53 Unsupported {
54 reason: String,
55 },
56 PkTypeFlip {
57 co_destructive: bool,
58 co_lossy: bool,
59 },
60}
61
62#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
63struct CliReplaySegment {
64 kind: CliSegmentKind,
65 statements: Vec<CliReplayStatement>,
66}
67
68#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
69#[serde(rename_all = "snake_case")]
70enum CliSegmentKind {
71 Transactional,
72 NonTransactional,
73 MetadataOnly,
74}
75
76#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
77struct CliReplayStatement {
78 label: String,
79 up: String,
80}
81
/// The only `format_version` value accepted when reading a replay plan file;
/// any other value is reported as a format version mismatch.
const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
84
85fn load_replay_plan_from_disk(
91 workspace: &Path,
92 bucket: &djogi::migrate::BucketKey,
93 version: &str,
94 pending_checksum_up: &str,
95 pending_checksum_down: Option<&str>,
96) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
97 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
99 let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
100
101 if let Ok(bytes) = std::fs::read(&replay_plan_path) {
102 let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
103 Ok(s) => s,
104 Err(e) => {
105 return Err(ApplyReplayPlanError::Parse {
106 path: replay_plan_path.clone(),
107 source: e.to_string(),
108 });
109 }
110 };
111
112 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
113 return Err(ApplyReplayPlanError::FormatVersion {
114 found: stored.format_version,
115 path: replay_plan_path.clone(),
116 });
117 }
118
119 if stored.checksum_up != pending_checksum_up
121 || stored.checksum_down.as_deref() != pending_checksum_down
122 {
123 return Err(ApplyReplayPlanError::ChecksumMismatch);
124 }
125
126 let plan = djogi::migrate::MigrationPlan {
127 bucket: bucket.clone(),
128 classification: stored.classification.into(),
129 segments: stored
130 .segments
131 .into_iter()
132 .map(|seg| djogi::migrate::Segment {
133 kind: seg.kind.into(),
134 statements: seg
135 .statements
136 .into_iter()
137 .map(|stmt| djogi::migrate::OperationSql {
138 label: stmt.label,
139 up: stmt.up,
140 down: String::new(),
141 lossy: None,
142 })
143 .collect(),
144 })
145 .collect(),
146 };
147
148 return Ok((plan, stored.checksum_up, stored.checksum_down));
149 }
150
151 let up_filename = djogi::migrate::up_filename(version);
153 let down_filename = djogi::migrate::down_filename(version);
154 let up_path = bucket_dir.join(&up_filename);
155 let down_path = bucket_dir.join(&down_filename);
156
157 let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
158 path: up_path.clone(),
159 source: e.to_string(),
160 })?;
161
162 let down_sql = match std::fs::read_to_string(&down_path) {
163 Ok(sql) => sql,
164 Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
165 Err(e) => {
166 return Err(ApplyReplayPlanError::SqlRead {
167 path: down_path.clone(),
168 source: e.to_string(),
169 });
170 }
171 };
172
173 let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
177
178 let plan = djogi::migrate::MigrationPlan {
182 bucket: bucket.clone(),
183 classification: djogi::migrate::Classification::Additive,
184 segments: vec![djogi::migrate::Segment {
185 kind: djogi::migrate::SegmentKind::Transactional,
186 statements: vec![djogi::migrate::OperationSql {
187 label: format!("replay {version}"),
188 up: up_sql,
189 down: down_sql,
190 lossy: None,
191 }],
192 }],
193 };
194
195 Ok((plan, computed_checksum_up, None))
196}
197
/// Failures reported while loading a replay plan from disk.
#[derive(Debug)]
enum ApplyReplayPlanError {
    /// The plan file was read but is not valid replay-plan JSON.
    Parse {
        path: PathBuf,
        source: String,
    },
    /// The plan file declares a format version other than the supported one.
    FormatVersion {
        found: String,
        path: PathBuf,
    },
    /// Stored checksums differ from the pending plan's checksums.
    ChecksumMismatch,
    /// A SQL file could not be read.
    SqlRead {
        path: PathBuf,
        source: String,
    },
}
206
207impl std::fmt::Display for ApplyReplayPlanError {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 match self {
210 Self::Parse { path, source } => {
211 write!(f, "parse replay plan {}: {source}", path.display())
212 }
213 Self::FormatVersion { found, path } => write!(
214 f,
215 "replay plan format version mismatch in {}: expected {}, found {}",
216 path.display(),
217 CLI_REPLAY_PLAN_FORMAT_VERSION,
218 found
219 ),
220 Self::ChecksumMismatch => {
221 write!(f, "checksum mismatch between pending JSON and replay plan")
222 }
223 Self::SqlRead { path, source } => {
224 write!(f, "read SQL file {}: {source}", path.display())
225 }
226 }
227 }
228}
229
230impl std::error::Error for ApplyReplayPlanError {}
231
232fn classify_phase_zero_for_cleanup(
239 workspace: &Path,
240 bucket: &djogi::migrate::BucketKey,
241 version: &str,
242 pending_checksum_up: &str,
243 pending_checksum_down: Option<&str>,
244) -> Option<String> {
245 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
247 let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
248
249 if let Ok(bytes) = std::fs::read(&replay_plan_path) {
250 let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
251 Ok(s) => s,
252 Err(e) => {
253 return Some(format!("parse replay plan: {e}"));
254 }
255 };
256
257 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
258 return Some(format!(
259 "replay plan format version mismatch: expected {}, found {}",
260 CLI_REPLAY_PLAN_FORMAT_VERSION, stored.format_version
261 ));
262 }
263
264 if stored.checksum_up != pending_checksum_up
266 || stored.checksum_down.as_deref() != pending_checksum_down
267 {
268 return Some("checksum mismatch between pending JSON and replay plan".to_string());
269 }
270
271 let up_sql: String = stored
273 .segments
274 .iter()
275 .flat_map(|seg| seg.statements.iter())
276 .map(|stmt| stmt.up.as_str())
277 .collect::<Vec<&str>>()
278 .join("\n");
279
280 return classify_phase_zero_bytes(up_sql.as_bytes());
281 }
282
283 let up_filename = djogi::migrate::up_filename(version);
285 let up_path = bucket_dir.join(&up_filename);
286 match std::fs::read_to_string(&up_path) {
287 Ok(up_sql) => classify_phase_zero_bytes(up_sql.as_bytes()),
288 Err(e) => Some(format!("read up SQL file {}: {e}", up_path.display())),
289 }
290}
291
292fn classify_phase_zero_bytes(bytes: &[u8]) -> Option<String> {
295 match djogi::migrate::classify_phase_zero_artifact(bytes) {
296 djogi::migrate::PhaseZeroArtifactState::IdentityFreeCurrent => None,
297 djogi::migrate::PhaseZeroArtifactState::SeedCapableRuntimeCurrent => {
298 Some("seed-capable runtime-only artifact detected".to_string())
299 }
300 djogi::migrate::PhaseZeroArtifactState::SeedDmlNotRuntimeCurrent => {
301 Some("seed-dml non-runtime-current artifact detected".to_string())
302 }
303 djogi::migrate::PhaseZeroArtifactState::GeneratedStale => {
304 Some("generated-stale artifact detected".to_string())
305 }
306 djogi::migrate::PhaseZeroArtifactState::Ambiguous => {
307 Some("ambiguous or hand-edited artifact detected".to_string())
308 }
309 djogi::migrate::PhaseZeroArtifactState::Incomplete => {
310 Some("incomplete artifact (truncated generation)".to_string())
311 }
312 djogi::migrate::PhaseZeroArtifactState::Missing => Some("missing artifact".to_string()),
313 }
314}
315
316impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
319 fn from(kind: CliSegmentKind) -> Self {
320 match kind {
321 CliSegmentKind::Transactional => Self::Transactional,
322 CliSegmentKind::NonTransactional => Self::NonTransactional,
323 CliSegmentKind::MetadataOnly => Self::MetadataOnly,
324 }
325 }
326}
327
328impl From<CliClassification> for djogi::migrate::Classification {
329 fn from(classification: CliClassification) -> Self {
330 match classification {
331 CliClassification::NoOp => Self::NoOp,
332 CliClassification::Additive => Self::Additive,
333 CliClassification::Reversible => Self::Reversible,
334 CliClassification::Destructive => Self::Destructive,
335 CliClassification::Lossy => Self::Lossy,
336 CliClassification::Unsupported { reason } => Self::Unsupported { reason },
337 CliClassification::PkTypeFlip {
338 co_destructive,
339 co_lossy,
340 } => Self::PkTypeFlip {
341 co_destructive,
342 co_lossy,
343 },
344 }
345 }
346}
347
/// Picks the workspace root: the explicit path when given, otherwise the
/// process's current directory, otherwise the relative path `.` if the
/// current directory cannot be determined.
fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
    match workspace {
        Some(explicit) => explicit,
        None => match std::env::current_dir() {
            Ok(cwd) => cwd,
            Err(_) => PathBuf::from("."),
        },
    }
}
354
355fn discover_snapshot_buckets_on_disk(
367 workspace: &Path,
368) -> Vec<djogi::migrate::projection::BucketKey> {
369 let mut out = Vec::new();
370 let migrations_root = djogi::migrate::migrations_root(workspace);
371 let Ok(db_entries) = std::fs::read_dir(&migrations_root) else {
372 return out;
373 };
374 for db_entry in db_entries.flatten() {
375 let Ok(ft) = db_entry.file_type() else {
376 continue;
377 };
378 if !ft.is_dir() {
379 continue;
380 }
381 let Some(database) = db_entry.file_name().to_str().map(str::to_string) else {
382 continue;
383 };
384 let Ok(app_entries) = std::fs::read_dir(db_entry.path()) else {
385 continue;
386 };
387 for app_entry in app_entries.flatten() {
388 let Ok(ft) = app_entry.file_type() else {
389 continue;
390 };
391 if !ft.is_dir() {
392 continue;
393 }
394 let Some(dirname) = app_entry.file_name().to_str().map(str::to_string) else {
395 continue;
396 };
397 let snap_path = app_entry.path().join("schema_snapshot.json");
398 if !snap_path.exists() {
399 continue;
400 }
401 let label = djogi::migrate::app_label_from_dirname(&dirname).to_string();
402 out.push(djogi::migrate::projection::BucketKey {
403 database: database.clone(),
404 app: label,
405 });
406 }
407 }
408 out
409}
410
411pub fn compose_cmd(
413 provider: &dyn DescriptorProvider,
414 name: &str,
415 allow_destructive: bool,
416 force_overwrite: bool,
417 workspace: Option<PathBuf>,
418) -> ExitCode {
419 let workspace = resolve_workspace(workspace);
420 let models = match project_from_provider(provider) {
421 Ok(m) => m,
422 Err(e) => {
423 eprintln!("djogi migrations compose: projection error: {e}");
424 return ExitCode::from(1);
425 }
426 };
427 let apps: Vec<AppLifecycle> = provider
428 .apps()
429 .iter()
430 .map(|d| AppLifecycle {
431 label: d.label.to_string(),
432 database: d.database.to_string(),
433 renamed_from: d.renamed_from.map(str::to_string),
434 tombstone: d.tombstone,
435 })
436 .collect();
437 let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
442 Ok(c) => c,
443 Err(e) => {
444 eprintln!("djogi migrations compose: config load: {e}");
445 return ExitCode::from(1);
446 }
447 };
448 let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
449 djogi_config.migrate.pk_flip_join_table_option,
450 );
451 compose_with_inputs(
452 &workspace,
453 name,
454 allow_destructive,
455 force_overwrite,
456 &models,
457 &apps,
458 time::OffsetDateTime::now_utc(),
459 Some(pk_flip_option),
460 )
461}
462
463#[allow(clippy::too_many_arguments)]
478fn compose_with_inputs(
479 workspace: &Path,
480 name: &str,
481 allow_destructive: bool,
482 force_overwrite: bool,
483 models: &std::collections::BTreeMap<
484 djogi::migrate::projection::BucketKey,
485 djogi::migrate::AppliedSchema,
486 >,
487 apps: &[AppLifecycle],
488 now: time::OffsetDateTime,
489 pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
490) -> ExitCode {
491 let lock_path = workspace.join(LOCK_FILE_NAME);
492 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
493 Ok(g) => g,
494 Err(e) => {
495 eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
496 return ExitCode::from(1);
497 }
498 };
499
500 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::projection::BucketKey> =
508 models.keys().cloned().collect();
509 for bucket in discover_snapshot_buckets_on_disk(workspace) {
510 bucket_set.insert(bucket);
511 }
512
513 let mut snapshots: std::collections::BTreeMap<_, _> = std::collections::BTreeMap::new();
514 for bucket in &bucket_set {
515 let path = djogi::migrate::snapshot_path(workspace, bucket);
516 match djogi::migrate::load_snapshot(&path) {
517 Ok(s) => {
518 snapshots.insert(bucket.clone(), s);
519 }
520 Err(djogi::migrate::SnapshotError::Io { source, .. })
521 if source.kind() == std::io::ErrorKind::NotFound =>
522 {
523 }
525 Err(e) => {
526 eprintln!(
527 "djogi migrations compose: snapshot load failed at {}: {e}",
528 path.display()
529 );
530 return ExitCode::from(1);
531 }
532 }
533 }
534
535 let req = ComposeRequest {
536 workspace_root: workspace,
537 models,
538 snapshots: &snapshots,
539 apps,
540 name,
541 allow_destructive,
542 force_overwrite,
543 now,
544 _guard: &guard,
545 pk_flip_join_table_option,
546 skip_phase_zero_auto_emit: false,
552 };
553 match compose(req) {
554 Ok(report) => {
555 for emit in &report.emitted_phase_zero {
559 let ext_summary = if emit.extensions.is_empty() {
560 "no extensions".to_string()
561 } else {
562 format!(
563 "extensions: {}",
564 emit.extensions
565 .iter()
566 .cloned()
567 .collect::<Vec<_>>()
568 .join(", ")
569 )
570 };
571 println!(
572 "auto-emitted bootstrap migration: {database}/_global_ ({ext_summary})",
573 database = emit.database,
574 );
575 }
576 for cb in &report.composed_buckets {
577 println!(
578 "composed {database}/{app}: {version} ({classification:?})",
579 database = cb.bucket.database,
580 app = if cb.bucket.app.is_empty() {
581 "_global_"
582 } else {
583 cb.bucket.app.as_str()
584 },
585 version = cb.version,
586 classification = cb.classification,
587 );
588 }
589 ExitCode::from(0)
590 }
591 Err(ComposeError::NothingToCompose) => {
592 println!("nothing to compose — model state matches snapshot for every bucket");
593 ExitCode::from(0)
597 }
598 Err(ComposeError::LinkageDropWithoutModels { ref text, .. }) => {
599 eprintln!("djogi migrations compose: {text}");
600 ExitCode::from(2)
602 }
603 Err(e) => {
604 eprintln!("djogi migrations compose: {e}");
605 ExitCode::from(1)
606 }
607 }
608}
609
610pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
615 let workspace = resolve_workspace(workspace);
616
617 let runtime = match tokio::runtime::Builder::new_current_thread()
619 .enable_all()
620 .build()
621 {
622 Ok(r) => r,
623 Err(e) => {
624 eprintln!("djogi migrations status: tokio runtime: {e}");
625 return ExitCode::from(1);
626 }
627 };
628
629 let exit = runtime.block_on(async { run_status(&workspace).await });
630 ExitCode::from(exit as u8)
631}
632
633async fn run_status(workspace: &Path) -> i32 {
642 use djogi::config::DjogiConfig;
643
644 let config = match DjogiConfig::load_from_workspace(workspace) {
645 Ok(c) => c,
646 Err(e) => {
647 eprintln!("djogi migrations status: config load: {e}");
648 return 1;
649 }
650 };
651
652 let mut ctx = match connect_and_check(&config.database.url).await {
653 ContextOutcome::Ready(ctx) => ctx,
654 ContextOutcome::UnsupportedVersion(e) => {
655 crate::print_support_boundary_error("migrations status", &e);
656 return 2;
657 }
658 ContextOutcome::RuntimeError(msg) => {
659 eprintln!("djogi migrations status: pool: {msg}");
660 return 1;
661 }
662 };
663
664 let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
665 Ok(rows) => rows,
666 Err(e) => {
667 if e.to_string().contains("djogi_schema_migrations") {
670 println!("No migrations recorded.");
671 return 0;
672 }
673 eprintln!("djogi migrations status: ledger read: {e}");
674 return 1;
675 }
676 };
677
678 let registered: Vec<String> = AppRegistry::all()
679 .iter()
680 .map(|d| d.label.to_string())
681 .collect();
682 let report = djogi::migrate::render_status(&rows, ®istered);
683 for line in &report.lines {
684 println!("{line}");
685 }
686 report.exit_code
687}
688
689#[allow(clippy::large_enum_variant)]
710enum ContextOutcome {
711 Ready(djogi::context::DjogiContext),
713 UnsupportedVersion(djogi::error::DjogiError),
716 RuntimeError(String),
719}
720
721async fn connect_and_check(url: &str) -> ContextOutcome {
729 let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
730 Ok(p) => p,
731 Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
732 };
733 match djogi::pg::preflight::check_postgres_version(&pool).await {
734 Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
735 Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
738 ContextOutcome::UnsupportedVersion(e)
739 }
740 Err(other) => ContextOutcome::RuntimeError(other.to_string()),
741 }
742}
743
744fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
764 if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
767 return Some(db_config.url.clone());
768 }
769 if database == "crud_log"
770 && let Some(u) = db_config.crud_log_url.as_deref()
771 && !u.is_empty()
772 {
773 return Some(u.to_string());
774 }
775 if database == "event_log"
776 && let Some(u) = db_config.event_log_url.as_deref()
777 && !u.is_empty()
778 {
779 return Some(u.to_string());
780 }
781 djogi::migrate::derive_per_database_url(&db_config.url, database)
782}
783
784pub fn apply_cmd(
794 workspace: Option<PathBuf>,
795 fake: bool,
796 reason: Option<String>,
797 node_id: Option<u32>,
798 single_node_dev: bool,
799) -> ExitCode {
800 let workspace = resolve_workspace(workspace);
801
802 let mode = if fake {
804 match reason {
805 Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
806 Some(_) => {
807 eprintln!(
808 "djogi migrations apply --fake: --reason must not be empty; \
809 supply a non-empty reason why these migrations are being \
810 faked (e.g. 'schema pre-exists from prior tooling')"
811 );
812 return ExitCode::from(2);
813 }
814 None => {
815 eprintln!(
816 "djogi migrations apply --fake: --reason is required; \
817 supply a reason why these migrations are being faked \
818 (e.g. 'schema pre-exists from prior tooling'). \
819 This is recorded in the ledger audit trail."
820 );
821 return ExitCode::from(2);
822 }
823 }
824 } else {
825 FakeMode::Real
826 };
827
828 let runtime = match tokio::runtime::Builder::new_current_thread()
829 .enable_all()
830 .build()
831 {
832 Ok(r) => r,
833 Err(e) => {
834 eprintln!("djogi migrations apply: tokio runtime: {e}");
835 return ExitCode::from(1);
836 }
837 };
838
839 let exit =
840 runtime.block_on(async { run_apply(&workspace, &mode, node_id, single_node_dev).await });
841 ExitCode::from(exit as u8)
842}
843
/// Whether `apply` executes migrations or only records them as faked.
#[derive(Clone, Debug)]
enum FakeMode {
    /// Normal apply.
    Real,
    /// Fake apply; `reason` is the non-blank justification supplied on the
    /// command line.
    Fake {
        reason: String,
    },
}
853
854async fn run_apply(
856 workspace: &Path,
857 mode: &FakeMode,
858 node_id: Option<u32>,
859 single_node_dev: bool,
860) -> i32 {
861 use djogi::config::DjogiConfig;
862
863 let action_verb = match mode {
864 FakeMode::Real => "apply",
865 FakeMode::Fake { .. } => "fake-apply",
866 };
867 let progress_verb = match mode {
868 FakeMode::Real => "applying",
869 FakeMode::Fake { .. } => "faking",
870 };
871
872 let config = match DjogiConfig::load_from_workspace(workspace) {
874 Ok(c) => c,
875 Err(e) => {
876 eprintln!("djogi migrations {action_verb}: config load: {e}");
877 return 2;
878 }
879 };
880
881 let pending_files = match discover_pending_plans(workspace) {
885 Ok(pending_files) => pending_files,
886 Err(e) => {
887 eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
888 return 2;
889 }
890 };
891 if pending_files.is_empty() {
892 println!("No pending migrations to {action_verb}.");
893 return 0;
894 }
895
896 let runner_identity = match crate::identity::resolve_identity(
899 node_id,
900 single_node_dev,
901 &config.profile,
902 action_verb,
903 ) {
904 Ok(resolved) => Some(resolved.into_runner_identity()),
905 Err(e) => {
906 let _ = crate::identity::print_identity_error(action_verb, &e);
907 return 2;
908 }
909 };
910
911 let target_urls = match resolve_apply_target_urls(&pending_files, &config.database) {
916 Ok(urls) => urls,
917 Err(e) => {
918 eprintln!("djogi migrations {action_verb}: target routing: {e}");
919 return 2;
920 }
921 };
922 let mut contexts = std::collections::BTreeMap::<String, djogi::context::DjogiContext>::new();
923 for (database, url) in &target_urls {
924 match connect_and_check(url).await {
925 ContextOutcome::Ready(ctx) => {
926 contexts.insert(database.clone(), ctx);
927 }
928 ContextOutcome::UnsupportedVersion(e) => {
929 crate::print_support_boundary_error("migrations apply", &e);
930 return 2;
931 }
932 ContextOutcome::RuntimeError(msg) => {
933 eprintln!("djogi migrations {action_verb}: pool for '{database}': {msg}");
934 return 1;
935 }
936 }
937 }
938
939 let lock_path = workspace.join(LOCK_FILE_NAME);
941 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
942 Ok(g) => g,
943 Err(e) => {
944 eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
945 return 1;
946 }
947 };
948
949 let pending_files = match reconcile_pending_plans_after_lock(workspace, &pending_files) {
951 Ok(pending_files) => pending_files,
952 Err(e) => {
953 eprintln!("djogi migrations {action_verb}: pending discovery: {e}");
954 return 2;
955 }
956 };
957
958 let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
960 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
961 Err(_) => None,
962 };
963
964 for pending_file in &pending_files {
968 let bucket_database = &pending_file.bucket.database;
969 let app_label = &pending_file.bucket.app;
970 let Some(ctx) = contexts.get_mut(bucket_database) else {
971 eprintln!(
972 "djogi migrations {action_verb}: internal error: missing context for database '{bucket_database}'"
973 );
974 return 1;
975 };
976 println!(" {progress_verb} {bucket_database}/{app_label}...");
977 let result = apply_one_pending(
978 ctx,
979 workspace,
980 pending_file,
981 &config,
982 &guard,
983 audit_pool.as_ref(),
984 mode,
985 runner_identity,
986 )
987 .await;
988
989 match result {
990 ApplyResult::Ok => match mode {
991 FakeMode::Real => {
992 println!("Applied: {bucket_database}/{app_label}");
993 }
994 FakeMode::Fake { .. } => {
995 println!(
996 " faked {bucket_database}/{app_label}: \
997 recorded in ledger with status = 'faked' (no SQL executed)"
998 );
999 }
1000 },
1001 ApplyResult::Skipped(reason) => {
1002 println!("Skipped {bucket_database}/{app_label}: {reason}");
1003 }
1004 ApplyResult::Refused(reason) => {
1005 eprintln!(
1006 "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
1007 );
1008 return 2;
1009 }
1010 ApplyResult::RunnerError(e) => {
1011 eprintln!(
1012 "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
1013 );
1014 return runner_error_exit_code(&e);
1015 }
1016 }
1017 }
1018
1019 let summary_verb = match mode {
1020 FakeMode::Real => "applied",
1021 FakeMode::Fake { .. } => "faked",
1022 };
1023 println!("{summary_verb} {} migration(s).", pending_files.len());
1024 0
1025}
1026
1027#[derive(Debug)]
1029enum ApplyResult {
1030 Ok,
1032 Skipped(String),
1034 Refused(String),
1036 RunnerError(RunnerError),
1038}
1039
1040#[derive(Debug, Clone, PartialEq, Eq)]
1041struct DiscoveredPendingPlan {
1042 path: PathBuf,
1043 bucket: BucketKey,
1044 plan: PendingPlan,
1045 is_phase_zero: bool,
1046}
1047
/// Returns `true` when `bytes` is an acceptable path component: 1 to 63
/// bytes, starting with `_` or an ASCII letter, followed only by `_` or
/// ASCII letters and digits.
///
/// A leading `.` is rejected by the first-byte rule, so hidden names never
/// pass.
fn is_acceptable_pending_path_component(bytes: &[u8]) -> bool {
    if bytes.len() > 63 {
        return false;
    }
    // An empty slice has no first byte and is rejected here.
    let Some((&head, tail)) = bytes.split_first() else {
        return false;
    };
    let head_ok = head == b'_' || head.is_ascii_alphabetic();
    head_ok
        && tail
            .iter()
            .all(|&byte| byte == b'_' || byte.is_ascii_alphanumeric())
}
1066
1067fn canonical_pending_filename(app_label: &str) -> String {
1068 format!("{}.json", djogi::migrate::app_dirname(app_label))
1069}
1070
1071fn validate_hidden_phase_zero_pending(
1072 path: PathBuf,
1073 database: &str,
1074) -> Result<DiscoveredPendingPlan, String> {
1075 let filename = path
1076 .file_name()
1077 .and_then(|f| f.to_str())
1078 .ok_or_else(|| format!("non-utf8 Phase 0 pending path {}", path.display()))?;
1079 let expected_filename = format!("{}.json", djogi::migrate::PHASE_ZERO_VERSION);
1080 if filename != expected_filename {
1081 return Err(format!(
1082 "hidden Phase 0 pending path {} must use canonical filename {}",
1083 path.display(),
1084 expected_filename
1085 ));
1086 }
1087 let plan = djogi::migrate::load_pending(&path)
1088 .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1089 if plan.bucket_database != database {
1090 return Err(format!(
1091 "pending JSON {} has bucket database {}, expected {} from path",
1092 path.display(),
1093 plan.bucket_database,
1094 database
1095 ));
1096 }
1097 if !plan.bucket_app.is_empty() {
1098 return Err(format!(
1099 "pending JSON {} must target the global bucket in hidden Phase 0 namespace",
1100 path.display()
1101 ));
1102 }
1103 if plan.version != djogi::migrate::PHASE_ZERO_VERSION {
1104 return Err(format!(
1105 "pending JSON {} must use Phase 0 version {}, found {}",
1106 path.display(),
1107 djogi::migrate::PHASE_ZERO_VERSION,
1108 plan.version
1109 ));
1110 }
1111 Ok(DiscoveredPendingPlan {
1112 path,
1113 bucket: BucketKey {
1114 database: database.to_string(),
1115 app: String::new(),
1116 },
1117 plan,
1118 is_phase_zero: true,
1119 })
1120}
1121
1122fn validate_normal_pending(
1123 path: PathBuf,
1124 database: &str,
1125 filename: &str,
1126) -> Result<DiscoveredPendingPlan, String> {
1127 let Some(stem) = filename.strip_suffix(".json") else {
1128 return Err(format!(
1129 "pending path {} must end with .json",
1130 path.display()
1131 ));
1132 };
1133 let app = if stem == "_global_" {
1134 String::new()
1135 } else {
1136 if !is_acceptable_pending_path_component(stem.as_bytes()) {
1137 return Err(format!(
1138 "pending path {} uses non-canonical app filename {}",
1139 path.display(),
1140 filename
1141 ));
1142 }
1143 stem.to_string()
1144 };
1145 let expected_filename = canonical_pending_filename(&app);
1146 if filename != expected_filename {
1147 return Err(format!(
1148 "pending path {} must use canonical filename {}",
1149 path.display(),
1150 expected_filename
1151 ));
1152 }
1153 let plan = djogi::migrate::load_pending(&path)
1154 .map_err(|e| format!("parse pending JSON {}: {e}", path.display()))?;
1155 if plan.bucket_database != database {
1156 return Err(format!(
1157 "pending JSON {} has bucket database {}, expected {} from path",
1158 path.display(),
1159 plan.bucket_database,
1160 database
1161 ));
1162 }
1163 if plan.bucket_app != app {
1164 let expected_app = if app.is_empty() {
1165 "_global_"
1166 } else {
1167 app.as_str()
1168 };
1169 let found_app = if plan.bucket_app.is_empty() {
1170 "_global_"
1171 } else {
1172 plan.bucket_app.as_str()
1173 };
1174 return Err(format!(
1175 "pending JSON {} has bucket app {}, expected {} from path",
1176 path.display(),
1177 found_app,
1178 expected_app
1179 ));
1180 }
1181 if plan.version == djogi::migrate::PHASE_ZERO_VERSION {
1182 return Err(format!(
1183 "pending JSON {} must use the hidden .phase_zero namespace for Phase 0",
1184 path.display()
1185 ));
1186 }
1187 Ok(DiscoveredPendingPlan {
1188 path,
1189 bucket: BucketKey {
1190 database: database.to_string(),
1191 app,
1192 },
1193 is_phase_zero: false,
1194 plan,
1195 })
1196}
1197
1198fn discover_pending_plans(workspace: &Path) -> Result<Vec<DiscoveredPendingPlan>, String> {
1203 let pending_root = djogi::migrate::pending_root(workspace);
1204 let mut out = Vec::new();
1205 let mut seen_identities = std::collections::BTreeSet::new();
1206
1207 let Ok(db_entries) = std::fs::read_dir(&pending_root) else {
1208 return Ok(out);
1209 };
1210
1211 for db_entry in db_entries.flatten() {
1212 let db_name = match db_entry.file_name().to_str().map(str::to_string) {
1213 Some(n) => n,
1214 None => continue,
1215 };
1216 if !is_acceptable_pending_path_component(db_name.as_bytes()) {
1217 continue;
1218 }
1219
1220 let db_dir = db_entry.path();
1221 if !db_dir.is_dir() {
1222 continue;
1223 }
1224
1225 let Ok(app_entries) = std::fs::read_dir(&db_dir) else {
1226 continue;
1227 };
1228
1229 for app_entry in app_entries.flatten() {
1230 let path = app_entry.path();
1231 let file_type = match app_entry.file_type() {
1232 Ok(file_type) => file_type,
1233 Err(_) => continue,
1234 };
1235 if file_type.is_dir() {
1236 if app_entry.file_name().to_str() == Some(".phase_zero") {
1237 let Ok(phase_zero_entries) = std::fs::read_dir(&path) else {
1238 continue;
1239 };
1240 for phase_zero_entry in phase_zero_entries.flatten() {
1241 let phase_zero_path = phase_zero_entry.path();
1242 if !phase_zero_path.is_file() {
1243 continue;
1244 }
1245 let discovered =
1246 validate_hidden_phase_zero_pending(phase_zero_path, &db_name)?;
1247 let identity = (
1248 discovered.bucket.database.clone(),
1249 discovered.bucket.app.clone(),
1250 discovered.plan.version.clone(),
1251 );
1252 if !seen_identities.insert(identity.clone()) {
1253 return Err(format!(
1254 "duplicate pending identity discovered for {}/{}/{}",
1255 identity.0,
1256 if identity.1.is_empty() {
1257 "_global_"
1258 } else {
1259 identity.1.as_str()
1260 },
1261 identity.2
1262 ));
1263 }
1264 out.push(discovered);
1265 }
1266 }
1267 continue;
1268 }
1269 if !file_type.is_file() {
1270 continue;
1271 }
1272 let filename = match path.file_name().and_then(|f| f.to_str()) {
1273 Some(f) => f.to_string(),
1274 None => continue,
1275 };
1276 if !filename.ends_with(".json") {
1277 continue;
1278 }
1279 let discovered = validate_normal_pending(path, &db_name, &filename)?;
1280 let identity = (
1281 discovered.bucket.database.clone(),
1282 discovered.bucket.app.clone(),
1283 discovered.plan.version.clone(),
1284 );
1285 if !seen_identities.insert(identity.clone()) {
1286 return Err(format!(
1287 "duplicate pending identity discovered for {}/{}/{}",
1288 identity.0,
1289 if identity.1.is_empty() {
1290 "_global_"
1291 } else {
1292 identity.1.as_str()
1293 },
1294 identity.2
1295 ));
1296 }
1297 out.push(discovered);
1298 }
1299 }
1300
1301 out.sort_by(|a, b| {
1302 a.plan
1303 .version
1304 .cmp(&b.plan.version)
1305 .then_with(|| b.is_phase_zero.cmp(&a.is_phase_zero))
1306 .then_with(|| a.path.cmp(&b.path))
1307 });
1308 Ok(out)
1309}
1310
1311fn load_verified_pending_for_apply(
1312 pending_file: &DiscoveredPendingPlan,
1313) -> Result<PendingPlan, String> {
1314 let pending_bytes =
1315 std::fs::read(&pending_file.path).map_err(|e| format!("read pending JSON: {e}"))?;
1316 let pending: PendingPlan =
1317 serde_json::from_slice(&pending_bytes).map_err(|e| format!("parse pending JSON: {e}"))?;
1318 if pending != pending_file.plan {
1319 return Err(format!(
1320 "pending JSON changed after discovery at {}; rerun the command",
1321 pending_file.path.display()
1322 ));
1323 }
1324 Ok(pending)
1325}
1326
1327fn resolve_apply_target_urls(
1328 pending_files: &[DiscoveredPendingPlan],
1329 db_config: &djogi::config::DatabaseConfig,
1330) -> Result<std::collections::BTreeMap<String, String>, String> {
1331 let mut urls = std::collections::BTreeMap::new();
1332 for pending_file in pending_files {
1333 let database = &pending_file.bucket.database;
1334 if urls.contains_key(database) {
1335 continue;
1336 }
1337 let Some(url) = resolve_bucket_url(db_config, database) else {
1338 return Err(format!("cannot derive a database URL for `{database}`"));
1339 };
1340 urls.insert(database.clone(), url);
1341 }
1342 Ok(urls)
1343}
1344
1345fn reconcile_pending_plans_after_lock(
1346 workspace: &Path,
1347 pre_lock_pending_files: &[DiscoveredPendingPlan],
1348) -> Result<Vec<DiscoveredPendingPlan>, String> {
1349 let locked_pending_files = discover_pending_plans(workspace)?;
1350 if locked_pending_files != pre_lock_pending_files {
1351 return Err(
1352 "pending migration set changed while waiting for the workspace lock; rerun the command"
1353 .to_string(),
1354 );
1355 }
1356 Ok(locked_pending_files)
1357}
1358
/// Applies (or fake-applies) one discovered pending plan through `ctx`.
///
/// The steps run strictly in this order:
/// 1. Re-read and verify the pending JSON (`load_verified_pending_for_apply`).
/// 2. Inspect the ledger for `pending.version` and either continue, skip,
///    clean up a blocking row, or refuse.
/// 3. Load the replay plan from disk (`load_replay_plan_from_disk`).
/// 4. Build a [`RunnerCtx`] and dispatch to `apply_plan` or `fake_apply_plan`
///    depending on `mode`.
///
/// Returns `ApplyResult::Ok` on success, `ApplyResult::Skipped` when the ledger
/// already records the version as applied, `ApplyResult::Refused` for any
/// pre-flight failure, and `ApplyResult::RunnerError` when the runner fails.
#[allow(clippy::too_many_arguments)]
#[djogi::deliberately_bypass_convention_with_raw_sql]
async fn apply_one_pending(
    ctx: &mut djogi::context::DjogiContext,
    workspace: &Path,
    pending_file: &DiscoveredPendingPlan,
    config: &djogi::config::DjogiConfig,
    guard: &djogi::migrate::WorkspaceGuard,
    audit_pool: Option<&deadpool_postgres::Pool>,
    mode: &FakeMode,
    runner_identity: Option<djogi::migrate::RunnerIdentity>,
) -> ApplyResult {
    // Refuse if the pending file can no longer be read/parsed or no longer
    // matches what discovery saw.
    let pending = match load_verified_pending_for_apply(pending_file) {
        Ok(pending) => pending,
        Err(e) => return ApplyResult::Refused(e),
    };

    let bucket = pending_file.bucket.clone();

    match check_ledger_state(ctx, &pending.version).await {
        // No ledger row for this version: proceed to apply.
        LedgerState::NotPresent => {}
        LedgerState::AlreadyApplied => {
            return ApplyResult::Skipped("already applied".to_string());
        }
        LedgerState::PendingOrPartial(existing_status) => {
            // Only Failed / RolledBack rows are cleaned up for a re-apply;
            // every other status carried by this variant is refused below.
            if existing_status == LedgerStatus::Failed
                || existing_status == LedgerStatus::RolledBack
            {
                // For the Phase 0 version, the cleanup classifier is consulted
                // first; a returned reason refuses the apply BEFORE the ledger
                // row is deleted.
                if pending.version == djogi::migrate::PHASE_ZERO_VERSION {
                    let cleanup_refusal = classify_phase_zero_for_cleanup(
                        workspace,
                        &bucket,
                        &pending.version,
                        &pending.checksum_up,
                        pending.checksum_down.as_deref(),
                    );
                    if let Some(reason) = cleanup_refusal {
                        return ApplyResult::Refused(format!(
                            "Phase 0 cleanup refused: {reason}; \
                             refusing before deleting {} row to prevent stale replay",
                            existing_status.as_db_str()
                        ));
                    }
                }

                // Remove the blocking row so the version can be applied again.
                if let Err(e) = delete_reapply_blocking_ledger_row(ctx, &pending.version).await {
                    return ApplyResult::Refused(format!(
                        "clean {} ledger row: {e}",
                        existing_status.as_db_str()
                    ));
                }
            } else {
                return ApplyResult::Refused(format!(
                    "version already in {} state — resolve before re-applying",
                    existing_status.as_db_str()
                ));
            }
        }
    }

    // The loader receives the checksums recorded in the pending JSON and hands
    // back the plan together with the checksums used for the runner context.
    let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
        workspace,
        &bucket,
        &pending.version,
        &pending.checksum_up,
        pending.checksum_down.as_deref(),
    ) {
        Ok(result) => result,
        Err(e) => {
            return ApplyResult::Refused(format!("load replay plan: {e}"));
        }
    };

    let runner_ctx = RunnerCtx {
        bucket: bucket.clone(),
        version: pending.version.clone(),
        description: pending.slug.clone(),
        checksum_up,
        checksum_down,
        snapshot: Some(pending.model_snapshot.clone()),
        snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
        // Copied field by field from the loaded workspace configuration.
        config: djogi::config::MigrateConfig {
            concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
            strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
            pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
            pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
        },
        out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
        audit_pool: audit_pool.cloned(),
        runner_identity,
    };

    // Real mode executes the plan; fake mode goes through `fake_apply_plan`
    // with the supplied reason.
    let runner_result = match mode {
        FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
        FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
    };
    match runner_result {
        Ok(_) => ApplyResult::Ok,
        Err(e) => ApplyResult::RunnerError(e),
    }
}
1500
/// Coarse classification of what the ledger says about one migration version,
/// as produced by `check_ledger_state`.
#[derive(Debug)]
enum LedgerState {
    /// No ledger row matched the version (also used when the ledger rows could
    /// not be selected at all).
    NotPresent,
    /// The row's status is `Applied`, `Baseline`, or `Faked`.
    AlreadyApplied,
    /// The row's status is `Pending`, `Failed`, or `RolledBack`; the exact
    /// status is carried along for the caller to act on.
    PendingOrPartial(LedgerStatus),
}
1511
1512async fn check_ledger_state(ctx: &mut djogi::context::DjogiContext, version: &str) -> LedgerState {
1514 let Ok(rows) = djogi::migrate::select_all_ledger_rows(ctx).await else {
1515 return LedgerState::NotPresent;
1518 };
1519
1520 let existing = rows.iter().find(|r| r.version == version);
1521 match existing {
1522 None => LedgerState::NotPresent,
1523 Some(row) => match row.status {
1524 LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
1525 LedgerState::AlreadyApplied
1526 }
1527 LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
1528 LedgerState::PendingOrPartial(row.status)
1529 }
1530 },
1531 }
1532}
1533
1534fn runner_error_exit_code(_error: &RunnerError) -> i32 {
1539 1
1540}
1541
/// Deletes the `djogi_schema_migrations` row(s) whose `version` column equals
/// `version`, using a parameterized raw statement.
///
/// # Errors
///
/// Propagates whatever error `raw_execute` reports, boxed.
#[djogi::deliberately_bypass_convention_with_raw_sql]
async fn delete_reapply_blocking_ledger_row(
    ctx: &mut djogi::context::DjogiContext,
    version: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // The version is bound as `$1`; it is never interpolated into the SQL text.
    ctx.raw_execute(
        "DELETE FROM djogi_schema_migrations WHERE version = $1",
        &[&version],
    )
    .await?;
    Ok(())
}
1559
1560fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
1562 let migrations_root = djogi::migrate::migrations_root(workspace);
1563 migrations_root
1564 .join(&bucket.database)
1565 .join(djogi::migrate::app_dirname(&bucket.app))
1566 .join("schema_snapshot.json")
1567}
1568
1569#[allow(clippy::too_many_arguments)]
1596pub fn attune_cmd(
1597 target: Option<&str>,
1598 apply: bool,
1599 record: bool,
1600 record_ledger: bool,
1601 record_reason: &str,
1602 squash: bool,
1603 from: Option<&str>,
1604 publish: bool,
1605 app: Option<&str>,
1606 workspace: Option<PathBuf>,
1607) -> ExitCode {
1608 let workspace = resolve_workspace(workspace);
1609 let mode = match (record_ledger, squash) {
1610 (false, false) => AttuneMode::DiffOnly,
1611 (true, false) => AttuneMode::Record {
1612 reason: record_reason.to_string(),
1613 },
1614 (false, true) => match from {
1615 Some(v) if !v.is_empty() => AttuneMode::Squash {
1616 from: v.to_string(),
1617 publish,
1618 app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
1619 },
1620 _ => {
1621 eprintln!(
1622 "djogi migrations attune --squash requires --from <version> (e.g. \
1623 `--from V20260101000000__init`)"
1624 );
1625 return ExitCode::from(2);
1626 }
1627 },
1628 (true, true) => {
1629 eprintln!(
1632 "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
1633 );
1634 return ExitCode::from(2);
1635 }
1636 };
1637
1638 let runtime = match tokio::runtime::Builder::new_current_thread()
1639 .enable_all()
1640 .build()
1641 {
1642 Ok(r) => r,
1643 Err(e) => {
1644 eprintln!("djogi migrations attune: tokio runtime: {e}");
1645 return ExitCode::from(1);
1646 }
1647 };
1648
1649 let target_owned = target.map(str::to_string);
1650 let exit =
1651 runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
1652 ExitCode::from(exit as u8)
1653}
1654
1655async fn run_attune(
1658 workspace: &Path,
1659 mode: AttuneMode,
1660 target: Option<String>,
1661 apply: bool,
1662 record: bool,
1663) -> i32 {
1664 use djogi::config::DjogiConfig;
1665
1666 let config = match DjogiConfig::load_from_workspace(workspace) {
1667 Ok(c) => c,
1668 Err(e) => {
1669 eprintln!("djogi migrations attune: config load: {e}");
1670 return 1;
1671 }
1672 };
1673
1674 let mut ctx = match connect_and_check(&config.database.url).await {
1675 ContextOutcome::Ready(ctx) => ctx,
1676 ContextOutcome::UnsupportedVersion(e) => {
1677 crate::print_support_boundary_error("migrations attune", &e);
1678 return 2;
1679 }
1680 ContextOutcome::RuntimeError(msg) => {
1681 eprintln!("djogi migrations attune: pool: {msg}");
1682 return 1;
1683 }
1684 };
1685
1686 let lock_path = workspace.join(LOCK_FILE_NAME);
1690 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
1691 Ok(g) => g,
1692 Err(e) => {
1693 eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
1694 return 1;
1695 }
1696 };
1697
1698 let req = AttuneRequest {
1699 workspace_root: workspace,
1700 database_url: &config.database.url,
1701 profile: &config.profile,
1702 dev_mode: config.database.dev_mode,
1706 target: target.as_deref(),
1711 apply,
1712 record,
1713 mode,
1714 _guard: &guard,
1715 };
1716 match attune(&mut ctx, req).await {
1717 Ok(report) => {
1718 if report.entries.is_empty() {
1719 println!("attune: no drift");
1720 } else {
1721 for entry in &report.entries {
1722 let app_display = if entry.bucket.app.is_empty() {
1723 "_global_"
1724 } else {
1725 entry.bucket.app.as_str()
1726 };
1727 println!(
1728 " {kind:<10} {database}/{app} {version}",
1729 kind = entry.kind.as_str(),
1730 database = entry.bucket.database,
1731 app = app_display,
1732 version = entry.version,
1733 );
1734 }
1735 }
1736 for diag in &report.diagnostics {
1740 println!(" diagnostic: {diag}");
1741 }
1742 if let Some(sha) = &report.resolved_target {
1743 println!("resolved target: {sha}");
1744 }
1745 if let Some(squashed) = &report.squashed_to {
1746 println!("squashed to: {squashed}");
1747 }
1748 if report.published {
1749 println!("published to remote");
1750 }
1751 if report.parent_pointer_updated {
1752 println!("parent submodule pointer updated");
1753 }
1754 0
1755 }
1756 Err(e) => {
1757 eprintln!("djogi migrations attune: {e}");
1758 attune_error_exit_code(&e)
1759 }
1760 }
1761}
1762
1763fn attune_error_exit_code(err: &AttuneError) -> i32 {
1778 match err {
1779 AttuneError::Refused(_) => 2,
1780 AttuneError::FilesystemScanFailed { .. }
1781 | AttuneError::LedgerQueryFailed { .. }
1782 | AttuneError::SqlReadFailed { .. }
1783 | AttuneError::SqlWriteFailed { .. }
1784 | AttuneError::SqlDeleteFailed { .. }
1785 | AttuneError::GitPublishFailed { .. }
1786 | AttuneError::GitTargetResolveFailed { .. }
1787 | AttuneError::GitFetchFailed { .. }
1788 | AttuneError::GitUpdateSubmodulePointerFailed { .. } => 1,
1789 }
1790}
1791
1792pub fn verify_cmd(
1800 provider: &dyn DescriptorProvider,
1801 workspace: Option<PathBuf>,
1802 strict: bool,
1803) -> ExitCode {
1804 let workspace = resolve_workspace(workspace);
1805
1806 let runtime = match tokio::runtime::Builder::new_current_thread()
1807 .enable_all()
1808 .build()
1809 {
1810 Ok(r) => r,
1811 Err(e) => {
1812 eprintln!("djogi migrations verify: tokio runtime: {e}");
1813 return ExitCode::from(1);
1814 }
1815 };
1816
1817 let exit = runtime.block_on(async { run_verify(provider, &workspace, strict).await });
1818 ExitCode::from(exit as u8)
1819}
1820
1821async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
1836 use djogi::config::DjogiConfig;
1837
1838 if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
1850 crate::print_zero_descriptor_diagnostic("migrations verify");
1851 return 2;
1852 }
1853
1854 let config = match DjogiConfig::load_from_workspace(workspace) {
1856 Ok(c) => c,
1857 Err(e) => {
1858 eprintln!("djogi migrations verify: config load: {e}");
1859 return 1;
1860 }
1861 };
1862
1863 let models = match project_from_provider(provider) {
1865 Ok(m) => m,
1866 Err(e) => {
1867 eprintln!("djogi migrations verify: projection error: {e}");
1868 return 1;
1869 }
1870 };
1871
1872 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
1878 models.keys().cloned().collect();
1879 for bucket in discover_snapshot_buckets_on_disk(workspace) {
1880 bucket_set.insert(bucket);
1881 }
1882 if bucket_set.is_empty() {
1889 crate::print_zero_descriptor_diagnostic("migrations verify");
1890 return 2;
1891 }
1892
1893 let policy = djogi::config::PolicyConfig {
1895 strict_out_of_order: strict,
1896 };
1897
1898 let database_has_models: std::collections::HashSet<String> = bucket_set
1905 .iter()
1906 .filter(|b| {
1907 models
1908 .get(*b)
1909 .map(|s| !s.models.is_empty())
1910 .unwrap_or(false)
1911 })
1912 .map(|b| b.database.clone())
1913 .collect();
1914
1915 let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
1921 std::collections::BTreeMap::new();
1922 let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
1923 let mut exit_code: i32 = 0;
1924
1925 for bucket in &bucket_set {
1927 let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
1929 let bd = if bucket.app.is_empty() {
1930 "_global_"
1931 } else {
1932 &bucket.app
1933 };
1934 eprintln!(
1935 "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
1936 check that config.database.url has a valid path component",
1937 bucket.database, bucket.database, bd
1938 );
1939 exit_code = 1;
1940 continue;
1941 };
1942
1943 if !contexts.contains_key(&bucket.database) {
1947 match connect_and_check(&url).await {
1948 ContextOutcome::Ready(ctx) => {
1949 contexts.insert(bucket.database.clone(), ctx);
1950 }
1951 ContextOutcome::UnsupportedVersion(e) => {
1952 crate::print_support_boundary_error("migrations verify", &e);
1953 return 2;
1954 }
1955 ContextOutcome::RuntimeError(msg) => {
1956 eprintln!(
1957 "djogi migrations verify: pool for '{}': {msg}",
1958 bucket.database
1959 );
1960 exit_code = 1;
1961 continue;
1962 }
1963 }
1964 }
1965
1966 let snap_path = snapshot_path(workspace, bucket);
1971 let snapshot = match load_snapshot(&snap_path) {
1972 Ok(s) => s,
1973 Err(SnapshotError::Io { source, .. })
1974 if source.kind() == std::io::ErrorKind::NotFound =>
1975 {
1976 let bd = if bucket.app.is_empty() {
1977 "_global_"
1978 } else {
1979 &bucket.app
1980 };
1981 let has_models = models
1982 .get(bucket)
1983 .map(|s| !s.models.is_empty())
1984 .unwrap_or(false);
1985 if has_models {
1986 eprintln!(
1987 "djogi migrations verify: {}/{} has registered models but no \
1988 snapshot; run `djogi migrations compose` then \
1989 `djogi migrations apply` to record a baseline",
1990 bucket.database, bd
1991 );
1992 exit_code = 1;
1993 } else {
1994 println!("No snapshot found for bucket {}/{}", bucket.database, bd);
1995 }
1996 continue;
1997 }
1998 Err(e) => {
1999 let bd = if bucket.app.is_empty() {
2000 "_global_"
2001 } else {
2002 &bucket.app
2003 };
2004 eprintln!(
2005 "djogi migrations verify: load snapshot for {}/{}: {e}",
2006 bucket.database, bd
2007 );
2008 exit_code = 1;
2009 continue;
2010 }
2011 };
2012
2013 let db_has_models = database_has_models.contains(&bucket.database);
2018 let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
2019
2020 let ctx = contexts
2022 .get_mut(&bucket.database)
2023 .expect("context inserted above");
2024 let report = match djogi::migrate::verify_bucket(
2025 ctx,
2026 bucket,
2027 &snapshot,
2028 &policy,
2029 emit_ledger,
2030 db_has_models,
2031 )
2032 .await
2033 {
2034 Ok(r) => r,
2035 Err(e) => {
2036 let bd = if bucket.app.is_empty() {
2037 "_global_"
2038 } else {
2039 &bucket.app
2040 };
2041 eprintln!(
2042 "djogi migrations verify: error for {}/{}: {e}",
2043 bucket.database, bd
2044 );
2045 exit_code = 1;
2046 continue;
2047 }
2048 };
2049
2050 for line in render_verify_report(&report, bucket) {
2052 println!("{line}");
2053 }
2054 if report.has_errors() {
2055 exit_code = 1;
2056 }
2057 }
2058
2059 exit_code
2060}
2061
2062fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
2070 let mut lines: Vec<String> = Vec::new();
2071
2072 let app_display = if bucket.app.is_empty() {
2073 "_global_"
2074 } else {
2075 &bucket.app
2076 };
2077 lines.push(format!(
2078 "djogi migrations verify — {}/{}",
2079 bucket.database, app_display
2080 ));
2081 lines.push("──────────────────────────────────────────".to_string());
2082
2083 match (
2084 &report.latest_applied_version,
2085 report.applied_count,
2086 report.unfinished_count,
2087 ) {
2088 (Some(version), applied, 0) => {
2089 lines.push(format!("Ledger: {applied} applied, latest {version}"));
2090 }
2091 (Some(version), applied, unfinished) => {
2092 lines.push(format!(
2093 "Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
2094 ));
2095 }
2096 (None, 0, 0) => {
2097 lines.push("Ledger: empty (no migrations applied yet)".to_string());
2098 }
2099 _ => {}
2100 }
2101 lines.push(String::new());
2102
2103 if report.diagnostics.is_empty() {
2104 lines.push("No drift detected. Schema is consistent.".to_string());
2105 } else {
2106 for d in &report.diagnostics {
2107 let severity = match d.severity {
2108 VerifySeverity::Info => "INFO",
2109 VerifySeverity::Warning => "WARN",
2110 VerifySeverity::Error => "ERROR",
2111 };
2112 let location = d.location.as_deref().unwrap_or("-");
2113 lines.push(format!(
2114 "[{severity}] {code} ({loc}): {msg}",
2115 severity = severity,
2116 code = d.code,
2117 loc = location,
2118 msg = d.message
2119 ));
2120 }
2121 }
2122
2123 let errors = report
2124 .diagnostics
2125 .iter()
2126 .filter(|d| d.severity == VerifySeverity::Error)
2127 .count();
2128 let warnings = report
2129 .diagnostics
2130 .iter()
2131 .filter(|d| d.severity == VerifySeverity::Warning)
2132 .count();
2133 let infos = report
2134 .diagnostics
2135 .iter()
2136 .filter(|d| d.severity == VerifySeverity::Info)
2137 .count();
2138
2139 if errors > 0 {
2140 lines.push(String::new());
2141 lines.push(format!(
2142 "Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
2143 ));
2144 } else if warnings > 0 {
2145 lines.push(String::new());
2146 lines.push(format!(
2147 "Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
2148 ));
2149 } else {
2150 lines.push(String::new());
2151 lines.push(format!("Result: PASSED ({infos} info(s))"));
2152 }
2153
2154 lines
2155}
2156
2157impl From<PartialApplyResolutionCli> for PartialApplyResolution {
2160 fn from(cli: PartialApplyResolutionCli) -> Self {
2161 match cli {
2162 PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
2163 PartialApplyResolutionCli::Faked => Self::MarkFaked,
2164 PartialApplyResolutionCli::Applied => Self::MarkApplied,
2165 }
2166 }
2167}
2168
2169pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
2174 match command {
2175 RepairSubcommand::ChecksumDrift {
2176 version,
2177 app,
2178 database,
2179 checksum_up,
2180 checksum_down,
2181 workspace,
2182 } => repair_checksum_drift_cmd(
2183 &version,
2184 app.as_deref(),
2185 database.as_deref(),
2186 checksum_up.as_deref(),
2187 checksum_down.as_deref(),
2188 workspace,
2189 ),
2190 RepairSubcommand::PartialApply {
2191 version,
2192 resolution,
2193 note,
2194 app,
2195 database,
2196 workspace,
2197 } => repair_partial_apply_cmd(
2198 &version,
2199 resolution.into(),
2200 ¬e,
2201 app.as_deref(),
2202 database.as_deref(),
2203 workspace,
2204 ),
2205 RepairSubcommand::ResumePartial {
2206 version,
2207 app,
2208 database,
2209 workspace,
2210 node_id,
2211 single_node_dev,
2212 } => repair_resume_partial_apply_cmd(
2213 &version,
2214 app.as_deref(),
2215 database.as_deref(),
2216 workspace,
2217 node_id,
2218 single_node_dev,
2219 ),
2220 RepairSubcommand::SnapshotRebuild {
2221 app,
2222 database,
2223 snapshot_path,
2224 workspace,
2225 } => repair_snapshot_rebuild_cmd(
2226 app.as_deref(),
2227 database.as_deref(),
2228 snapshot_path.as_deref(),
2229 workspace,
2230 ),
2231 }
2232}
2233
2234fn render_repair_report(report: &RepairReport) {
2238 for action in &report.actions_taken {
2239 println!(" {action}");
2240 }
2241 if !report.ledger_changes.is_empty() {
2242 println!("Ledger changes:");
2243 for lc in &report.ledger_changes {
2244 println!(
2245 " {} | {} | {} -> {}",
2246 lc.version, lc.column, lc.before, lc.after,
2247 );
2248 }
2249 }
2250 if !report.snapshot_changes.is_empty() {
2251 println!("Snapshot changes:");
2252 for sc in &report.snapshot_changes {
2253 println!(" {} | {}", sc.path.display(), sc.description);
2254 }
2255 }
2256}
2257
2258fn repair_error_exit_code(err: &RepairError) -> i32 {
2272 match err {
2273 RepairError::LedgerIo { .. } | RepairError::SnapshotIo { .. } | RepairError::AdvisoryLockFailed { .. } | RepairError::AdvisoryLockQueryFailed { .. } | RepairError::PinnedSessionCheckoutFailed { .. } | RepairError::ResumeStepFailed { .. } | RepairError::ResumeProgressAckFailed { .. } => 1,
2284
2285 RepairError::VersionNotFound { .. }
2289 | RepairError::InsufficientConfirmation
2290 | RepairError::InvalidChecksum { .. }
2291 | RepairError::InvalidResolution { .. }
2292 | RepairError::BucketAppMismatch { .. }
2293 | RepairError::PlanVersionMismatch { .. }
2294 | RepairError::PlanChecksumMismatch { .. }
2295 | RepairError::LeafIdentityMismatch { .. }
2296 | RepairError::NothingToResume { .. }
2297 | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
2298 | RepairError::SuppliedSnapshotDiverges { .. }
2299 | RepairError::AdvisoryUnlockReturnedFalse { .. } | RepairError::ResumePlanShapeMismatch { .. }
2301 | RepairError::ReplayPlanShapeMismatch { .. }
2302 | RepairError::PhaseZeroArtifactRefused { .. } | RepairError::MissingResumeIdentity { .. } => 2,
2305 }
2306}
2307
2308fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
2315 database.unwrap_or("main").to_string()
2316}
2317
2318fn compute_checksum_up_from_disk(
2332 workspace: &Path,
2333 bucket: &djogi::migrate::BucketKey,
2334 version: &str,
2335) -> std::io::Result<String> {
2336 let path =
2337 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::up_filename(version));
2338 let sql = std::fs::read_to_string(&path)?;
2339 Ok(djogi::migrate::compute_committed_sql_checksum(
2340 &sql,
2341 djogi::migrate::ResetSqlSide::Up,
2342 ))
2343}
2344
2345fn compute_checksum_down_from_disk(
2355 workspace: &Path,
2356 bucket: &djogi::migrate::BucketKey,
2357 version: &str,
2358) -> std::io::Result<Option<String>> {
2359 let path =
2360 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::down_filename(version));
2361 let sql = match std::fs::read_to_string(&path) {
2362 Ok(s) => s,
2363 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
2364 Err(e) => return Err(e),
2365 };
2366 Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql))
2367}
2368
2369pub fn repair_checksum_drift_cmd(
2376 version: &str,
2377 app: Option<&str>,
2378 database: Option<&str>,
2379 checksum_up: Option<&str>,
2380 checksum_down: Option<&str>,
2381 workspace: Option<PathBuf>,
2382) -> ExitCode {
2383 let workspace = resolve_workspace(workspace);
2384 let runtime = match tokio::runtime::Builder::new_current_thread()
2385 .enable_all()
2386 .build()
2387 {
2388 Ok(r) => r,
2389 Err(e) => {
2390 eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
2391 return ExitCode::from(1);
2392 }
2393 };
2394 let exit = runtime.block_on(async {
2395 run_repair_checksum_drift(
2396 &workspace,
2397 version,
2398 app,
2399 database,
2400 checksum_up,
2401 checksum_down,
2402 )
2403 .await
2404 });
2405 ExitCode::from(exit as u8)
2406}
2407
2408async fn run_repair_checksum_drift(
2410 workspace: &Path,
2411 version: &str,
2412 app: Option<&str>,
2413 database: Option<&str>,
2414 checksum_up: Option<&str>,
2415 checksum_down: Option<&str>,
2416) -> i32 {
2417 use djogi::config::DjogiConfig;
2418
2419 let config = match DjogiConfig::load_from_workspace(workspace) {
2420 Ok(c) => c,
2421 Err(e) => {
2422 eprintln!("djogi migrations repair checksum-drift: config load: {e}");
2423 return 1;
2424 }
2425 };
2426
2427 let db_name = resolve_database(database, &config);
2432 let url = match resolve_bucket_url(&config.database, &db_name) {
2433 Some(u) => u,
2434 None => {
2435 eprintln!(
2436 "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
2437 );
2438 return 2;
2439 }
2440 };
2441
2442 let mut ctx = match connect_and_check(&url).await {
2443 ContextOutcome::Ready(ctx) => ctx,
2444 ContextOutcome::UnsupportedVersion(e) => {
2445 crate::print_support_boundary_error("migrations repair checksum-drift", &e);
2446 return 2;
2447 }
2448 ContextOutcome::RuntimeError(msg) => {
2449 eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
2450 return 1;
2451 }
2452 };
2453
2454 let lock_path = workspace.join(LOCK_FILE_NAME);
2455 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2456 Ok(g) => g,
2457 Err(e) => {
2458 eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
2459 return 1;
2460 }
2461 };
2462
2463 let app_label = app.unwrap_or("");
2464 let bucket = BucketKey {
2465 database: db_name,
2466 app: app_label.to_string(),
2467 };
2468
2469 let new_checksum_up = match checksum_up {
2470 Some(c) => c.to_string(),
2471 None => {
2472 match compute_checksum_up_from_disk(workspace, &bucket, version) {
2478 Ok(cs) => cs,
2479 Err(e) => {
2480 eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
2481 return 1;
2482 }
2483 }
2484 }
2485 };
2486
2487 let resolved_checksum_down = match checksum_down {
2488 Some(c) => Some(c.to_string()),
2489 None => {
2490 match compute_checksum_down_from_disk(workspace, &bucket, version) {
2495 Ok(cs_opt) => cs_opt,
2496 Err(e) => {
2497 eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
2498 return 1;
2499 }
2500 }
2501 }
2502 };
2503
2504 match repair_checksum_drift(
2505 &mut ctx,
2506 &guard,
2507 &bucket,
2508 version,
2509 workspace,
2510 &new_checksum_up,
2511 resolved_checksum_down.as_deref(),
2512 RepairConfirmation::OperatorAcknowledged,
2513 )
2514 .await
2515 {
2516 Ok(report) => {
2517 render_repair_report(&report);
2518 0
2519 }
2520 Err(e) => {
2521 eprintln!("djogi migrations repair checksum-drift: {e}");
2522 repair_error_exit_code(&e)
2523 }
2524 }
2525}
2526
2527pub fn repair_partial_apply_cmd(
2532 version: &str,
2533 resolution: PartialApplyResolution,
2534 note: &str,
2535 app: Option<&str>,
2536 database: Option<&str>,
2537 workspace: Option<PathBuf>,
2538) -> ExitCode {
2539 let workspace = resolve_workspace(workspace);
2540 let runtime = match tokio::runtime::Builder::new_current_thread()
2541 .enable_all()
2542 .build()
2543 {
2544 Ok(r) => r,
2545 Err(e) => {
2546 eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
2547 return ExitCode::from(1);
2548 }
2549 };
2550 let exit = runtime.block_on(async {
2551 run_repair_partial_apply(&workspace, version, resolution, note, app, database).await
2552 });
2553 ExitCode::from(exit as u8)
2554}
2555
2556async fn run_repair_partial_apply(
2558 workspace: &Path,
2559 version: &str,
2560 resolution: PartialApplyResolution,
2561 note: &str,
2562 app: Option<&str>,
2563 database: Option<&str>,
2564) -> i32 {
2565 use djogi::config::DjogiConfig;
2566
2567 let config = match DjogiConfig::load_from_workspace(workspace) {
2568 Ok(c) => c,
2569 Err(e) => {
2570 eprintln!("djogi migrations repair partial-apply: config load: {e}");
2571 return 1;
2572 }
2573 };
2574
2575 let db_name = resolve_database(database, &config);
2580 let url = match resolve_bucket_url(&config.database, &db_name) {
2581 Some(u) => u,
2582 None => {
2583 eprintln!(
2584 "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
2585 );
2586 return 2;
2587 }
2588 };
2589
2590 let mut ctx = match connect_and_check(&url).await {
2591 ContextOutcome::Ready(ctx) => ctx,
2592 ContextOutcome::UnsupportedVersion(e) => {
2593 crate::print_support_boundary_error("migrations repair partial-apply", &e);
2594 return 2;
2595 }
2596 ContextOutcome::RuntimeError(msg) => {
2597 eprintln!("djogi migrations repair partial-apply: pool: {msg}");
2598 return 1;
2599 }
2600 };
2601
2602 let lock_path = workspace.join(LOCK_FILE_NAME);
2603 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2604 Ok(g) => g,
2605 Err(e) => {
2606 eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
2607 return 1;
2608 }
2609 };
2610
2611 let app_label = app.unwrap_or("");
2612 let bucket = BucketKey {
2613 database: db_name,
2614 app: app_label.to_string(),
2615 };
2616
2617 match repair_partial_apply(
2618 &mut ctx,
2619 &guard,
2620 &bucket,
2621 version,
2622 workspace,
2623 resolution,
2624 note,
2625 RepairConfirmation::OperatorAcknowledged,
2626 )
2627 .await
2628 {
2629 Ok(report) => {
2630 render_repair_report(&report);
2631 0
2632 }
2633 Err(e) => {
2634 eprintln!("djogi migrations repair partial-apply: {e}");
2635 repair_error_exit_code(&e)
2636 }
2637 }
2638}
2639
2640pub fn repair_resume_partial_apply_cmd(
2646 version: &str,
2647 app: Option<&str>,
2648 database: Option<&str>,
2649 workspace: Option<PathBuf>,
2650 node_id: Option<u32>,
2651 single_node_dev: bool,
2652) -> ExitCode {
2653 let workspace = resolve_workspace(workspace);
2654 let runtime = match tokio::runtime::Builder::new_current_thread()
2655 .enable_all()
2656 .build()
2657 {
2658 Ok(r) => r,
2659 Err(e) => {
2660 eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
2661 return ExitCode::from(1);
2662 }
2663 };
2664 let exit = runtime.block_on(async {
2665 run_repair_resume_partial(&workspace, version, app, database, node_id, single_node_dev)
2666 .await
2667 });
2668 ExitCode::from(exit as u8)
2669}
2670
2671async fn run_repair_resume_partial(
2673 workspace: &Path,
2674 version: &str,
2675 app: Option<&str>,
2676 database: Option<&str>,
2677 node_id: Option<u32>,
2678 single_node_dev: bool,
2679) -> i32 {
2680 use djogi::config::DjogiConfig;
2681
2682 let config = match DjogiConfig::load_from_workspace(workspace) {
2683 Ok(c) => c,
2684 Err(e) => {
2685 eprintln!("djogi migrations repair resume-partial: config load: {e}");
2686 return 1;
2687 }
2688 };
2689
2690 let runner_identity = match crate::identity::resolve_identity(
2692 node_id,
2693 single_node_dev,
2694 &config.profile,
2695 "repair resume-partial",
2696 ) {
2697 Ok(resolved) => Some(resolved.into_runner_identity()),
2698 Err(e) => {
2699 let _ = crate::identity::print_identity_error("repair resume-partial", &e);
2700 return 2;
2701 }
2702 };
2703
2704 let db_name = resolve_database(database, &config);
2709 let url = match resolve_bucket_url(&config.database, &db_name) {
2710 Some(u) => u,
2711 None => {
2712 eprintln!(
2713 "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
2714 );
2715 return 2;
2716 }
2717 };
2718
2719 let mut ctx = match connect_and_check(&url).await {
2720 ContextOutcome::Ready(ctx) => ctx,
2721 ContextOutcome::UnsupportedVersion(e) => {
2722 crate::print_support_boundary_error("migrations repair resume-partial", &e);
2723 return 2;
2724 }
2725 ContextOutcome::RuntimeError(msg) => {
2726 eprintln!("djogi migrations repair resume-partial: pool: {msg}");
2727 return 1;
2728 }
2729 };
2730
2731 let lock_path = workspace.join(LOCK_FILE_NAME);
2732 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2733 Ok(g) => g,
2734 Err(e) => {
2735 eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
2736 return 1;
2737 }
2738 };
2739
2740 let app_label = app.unwrap_or("");
2741 let bucket = BucketKey {
2742 database: db_name,
2743 app: app_label.to_string(),
2744 };
2745
2746 let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
2752 Ok(p) => p,
2753 Err(e) => {
2754 eprintln!("djogi migrations repair resume-partial: load plan: {e}");
2755 return 2;
2756 }
2757 };
2758
2759 match repair_resume_partial_apply(
2760 &mut ctx,
2761 &guard,
2762 workspace,
2763 version,
2764 &plan,
2765 runner_identity,
2766 RepairConfirmation::OperatorAcknowledged,
2767 )
2768 .await
2769 {
2770 Ok(report) => {
2771 render_repair_report(&report);
2772 0
2773 }
2774 Err(e) => {
2775 eprintln!("djogi migrations repair resume-partial: {e}");
2776 repair_error_exit_code(&e)
2777 }
2778 }
2779}
2780
2781fn load_committed_plan_for_resume(
2795 workspace: &Path,
2796 bucket: &djogi::migrate::BucketKey,
2797 version: &str,
2798) -> Result<djogi::migrate::MigrationPlan, String> {
2799 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
2800 let plan_path = bucket_dir.join(format!("{version}.plan.json"));
2801 let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
2802 let stored: CliReplayPlan = serde_json::from_slice(&bytes)
2803 .map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
2804 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
2805 return Err(format!(
2806 "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
2807 plan_path.display(),
2808 stored.format_version,
2809 ));
2810 }
2811 Ok(djogi::migrate::MigrationPlan {
2812 bucket: bucket.clone(),
2813 classification: stored.classification.into(),
2814 segments: stored
2815 .segments
2816 .into_iter()
2817 .map(|seg| djogi::migrate::Segment {
2818 kind: seg.kind.into(),
2819 statements: seg
2820 .statements
2821 .into_iter()
2822 .map(|stmt| djogi::migrate::OperationSql {
2823 label: stmt.label,
2824 up: stmt.up,
2825 down: String::new(),
2826 lossy: None,
2827 })
2828 .collect(),
2829 })
2830 .collect(),
2831 })
2832}
2833
2834pub fn repair_snapshot_rebuild_cmd(
2840 app: Option<&str>,
2841 database: Option<&str>,
2842 snapshot_path: Option<&Path>,
2843 workspace: Option<PathBuf>,
2844) -> ExitCode {
2845 let workspace = resolve_workspace(workspace);
2846 let runtime = match tokio::runtime::Builder::new_current_thread()
2847 .enable_all()
2848 .build()
2849 {
2850 Ok(r) => r,
2851 Err(e) => {
2852 eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
2853 return ExitCode::from(1);
2854 }
2855 };
2856 let exit = runtime.block_on(async {
2857 run_repair_snapshot_rebuild(&workspace, app, database, snapshot_path).await
2858 });
2859 ExitCode::from(exit as u8)
2860}
2861
2862async fn run_repair_snapshot_rebuild(
2864 workspace: &Path,
2865 app: Option<&str>,
2866 database: Option<&str>,
2867 snapshot_path: Option<&Path>,
2868) -> i32 {
2869 use djogi::config::DjogiConfig;
2870
2871 let config = match DjogiConfig::load_from_workspace(workspace) {
2872 Ok(c) => c,
2873 Err(e) => {
2874 eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
2875 return 1;
2876 }
2877 };
2878
2879 let db_name = resolve_database(database, &config);
2884 let url = match resolve_bucket_url(&config.database, &db_name) {
2885 Some(u) => u,
2886 None => {
2887 eprintln!(
2888 "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
2889 );
2890 return 2;
2891 }
2892 };
2893
2894 let mut ctx = match connect_and_check(&url).await {
2895 ContextOutcome::Ready(ctx) => ctx,
2896 ContextOutcome::UnsupportedVersion(e) => {
2897 crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
2898 return 2;
2899 }
2900 ContextOutcome::RuntimeError(msg) => {
2901 eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
2902 return 1;
2903 }
2904 };
2905
2906 let lock_path = workspace.join(LOCK_FILE_NAME);
2907 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2908 Ok(g) => g,
2909 Err(e) => {
2910 eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
2911 return 1;
2912 }
2913 };
2914
2915 let app_label = app.unwrap_or("");
2916 let bucket = BucketKey {
2917 database: db_name,
2918 app: app_label.to_string(),
2919 };
2920
2921 let snap_path = match snapshot_path {
2922 Some(p) => p.to_path_buf(),
2923 None => reconstruct_snapshot_path(workspace, &bucket),
2924 };
2925
2926 match repair_snapshot_rebuild(
2927 &mut ctx,
2928 &guard,
2929 &bucket,
2930 &snap_path,
2931 RepairConfirmation::OperatorAcknowledged,
2932 )
2933 .await
2934 {
2935 Ok(report) => {
2936 render_repair_report(&report);
2937 0
2938 }
2939 Err(e) => {
2940 eprintln!("djogi migrations repair snapshot-rebuild: {e}");
2941 repair_error_exit_code(&e)
2942 }
2943 }
2944}
2945
2946#[expect(
2964 clippy::too_many_arguments,
2965 reason = "CLI command entry point mirrors clap arguments explicitly"
2966)]
2967pub fn baseline_cmd(
2968 version: &str,
2969 description: &str,
2970 reason: &str,
2971 app: Option<&str>,
2972 database: Option<&str>,
2973 workspace: Option<PathBuf>,
2974 node_id: Option<u32>,
2975 single_node_dev: bool,
2976) -> ExitCode {
2977 if reason.trim().is_empty() {
2982 eprintln!(
2983 "djogi migrations baseline: --reason must not be empty; \
2984 supply a non-empty reason why this baseline is being established \
2985 (e.g. 'schema pre-exists from prior tooling'). \
2986 This is recorded in the ledger audit trail."
2987 );
2988 return ExitCode::from(2);
2989 }
2990
2991 let workspace = resolve_workspace(workspace);
2992 let runtime = match tokio::runtime::Builder::new_current_thread()
2993 .enable_all()
2994 .build()
2995 {
2996 Ok(r) => r,
2997 Err(e) => {
2998 eprintln!("djogi migrations baseline: tokio runtime: {e}");
2999 return ExitCode::from(1);
3000 }
3001 };
3002 let exit = runtime.block_on(async {
3003 run_baseline(
3004 &workspace,
3005 version,
3006 description,
3007 reason,
3008 app,
3009 database,
3010 node_id,
3011 single_node_dev,
3012 )
3013 .await
3014 });
3015 ExitCode::from(exit as u8)
3016}
3017
3018#[expect(
3029 clippy::too_many_arguments,
3030 reason = "baseline async body keeps CLI arguments explicit through validation and connection setup"
3031)]
3032async fn run_baseline(
3033 workspace: &Path,
3034 version: &str,
3035 description: &str,
3036 reason: &str,
3037 app: Option<&str>,
3038 database: Option<&str>,
3039 node_id: Option<u32>,
3040 single_node_dev: bool,
3041) -> i32 {
3042 use djogi::config::DjogiConfig;
3043
3044 let config = match DjogiConfig::load_from_workspace(workspace) {
3045 Ok(c) => c,
3046 Err(e) => {
3047 eprintln!("djogi migrations baseline: config load: {e}");
3048 return 1;
3049 }
3050 };
3051
3052 let runner_identity = match crate::identity::resolve_identity(
3054 node_id,
3055 single_node_dev,
3056 &config.profile,
3057 "baseline",
3058 ) {
3059 Ok(resolved) => Some(resolved.into_runner_identity()),
3060 Err(e) => {
3061 let _ = crate::identity::print_identity_error("baseline", &e);
3062 return 2;
3063 }
3064 };
3065
3066 let db_name = resolve_database(database, &config);
3071 let url = match resolve_bucket_url(&config.database, &db_name) {
3072 Some(u) => u,
3073 None => {
3074 eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
3075 return 2;
3076 }
3077 };
3078
3079 let mut ctx = match connect_and_check(&url).await {
3080 ContextOutcome::Ready(ctx) => ctx,
3081 ContextOutcome::UnsupportedVersion(e) => {
3082 crate::print_support_boundary_error("migrations baseline", &e);
3083 return 2;
3084 }
3085 ContextOutcome::RuntimeError(msg) => {
3086 eprintln!("djogi migrations baseline: pool: {msg}");
3087 return 1;
3088 }
3089 };
3090
3091 let lock_path = workspace.join(LOCK_FILE_NAME);
3092 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
3093 Ok(g) => g,
3094 Err(e) => {
3095 eprintln!("djogi migrations baseline: workspace lock: {e}");
3096 return 1;
3097 }
3098 };
3099
3100 let app_label = app.unwrap_or("");
3101 let bucket = BucketKey {
3102 database: db_name,
3103 app: app_label.to_string(),
3104 };
3105
3106 let runner_ctx = RunnerCtx {
3107 bucket: bucket.clone(),
3108 version: version.to_string(),
3109 description: description.to_string(),
3110 checksum_up: String::new(),
3113 checksum_down: None,
3114 snapshot: None,
3118 snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
3119 config: djogi::config::MigrateConfig {
3122 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
3123 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
3124 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
3125 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
3126 },
3127 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
3128 audit_pool: match djogi::migrate::resolve_audit_url(&config) {
3129 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
3130 Err(_) => None,
3131 },
3132 runner_identity,
3133 };
3134
3135 match baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await {
3136 Ok(report) => {
3137 println!(
3138 "djogi migrations baseline: established baseline `{}` \
3139 (ledger_id={}) in {:.1}s",
3140 version,
3141 report.ledger_id,
3142 report.execution_time_ms as f64 / 1000.0
3143 );
3144 0
3145 }
3146 Err(e) => {
3147 eprintln!("djogi migrations baseline: {e}");
3148 baseline_error_exit_code(&e)
3149 }
3150 }
3151}
3152
3153fn baseline_error_exit_code(err: &RunnerError) -> i32 {
3171 match err {
3172 RunnerError::VersionAlreadyApplied { .. }
3192 | RunnerError::VersionCollisionNonTerminal { .. }
3193 | RunnerError::BaselineSnapshotShouldNotBeProvided
3194 | RunnerError::AdvisoryUnlockReturnedFalse { .. }
3195 | RunnerError::SnapshotPersistFailed { .. }
3196 | RunnerError::OutOfOrderRejected { .. } => 2,
3197 _ => 1,
3202 }
3203}
3204
3205#[cfg(test)]
3206mod tests {
3207 use super::*;
3208 use std::fs;
3209 use std::sync::atomic::{AtomicUsize, Ordering};
3210
3211 struct DatabaseUrlEnvGuard {
3212 _lock: std::sync::MutexGuard<'static, ()>,
3213 prior: Option<String>,
3214 }
3215
3216 impl DatabaseUrlEnvGuard {
3217 fn new() -> Self {
3218 Self {
3219 _lock: crate::test_env_lock(),
3220 prior: std::env::var("DATABASE_URL").ok(),
3221 }
3222 }
3223
3224 fn set(&self, value: &str) {
3225 unsafe { std::env::set_var("DATABASE_URL", value) };
3226 }
3227
3228 fn remove(&self) {
3229 unsafe { std::env::remove_var("DATABASE_URL") };
3230 }
3231 }
3232
3233 impl Drop for DatabaseUrlEnvGuard {
3234 fn drop(&mut self) {
3235 match &self.prior {
3236 Some(value) => unsafe { std::env::set_var("DATABASE_URL", value) },
3237 None => unsafe { std::env::remove_var("DATABASE_URL") },
3238 }
3239 }
3240 }
3241
3242 fn temp_workspace(tag: &str) -> std::path::PathBuf {
3243 static COUNTER: AtomicUsize = AtomicUsize::new(0);
3244 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3245 let nanos = std::time::SystemTime::now()
3246 .duration_since(std::time::UNIX_EPOCH)
3247 .unwrap()
3248 .as_nanos();
3249 let p = std::env::temp_dir().join(format!("djogi-cli-{tag}-{nanos}-{n}"));
3250 fs::create_dir_all(&p).unwrap();
3251 p
3252 }
3253
3254 fn write_unreachable_config(work: &std::path::Path) {
3255 let toml = "[database]\nurl = \"postgres://localhost:1/djogi_unreachable\"\n\
3256 max_connections = 1\ndev_mode = false\n\
3257 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3258 fs::write(work.join("Djogi.toml"), toml).unwrap();
3259 }
3260
3261 fn without_database_url<T>(f: impl FnOnce() -> T) -> T {
3262 let env_guard = DatabaseUrlEnvGuard::new();
3263 env_guard.remove();
3264 f()
3265 }
3266
3267 #[test]
3268 fn database_url_env_guard_restores_prior_value() {
3269 let env_guard = DatabaseUrlEnvGuard::new();
3270 let expected = env_guard.prior.clone();
3271 let next = if expected.as_deref() == Some("postgres://from-env/test") {
3272 "postgres://temporary/test"
3273 } else {
3274 "postgres://from-env/test"
3275 };
3276 env_guard.set(next);
3277 drop(env_guard);
3278 assert_eq!(std::env::var("DATABASE_URL").ok(), expected);
3279 }
3280
3281 fn current_production_phase_zero_sql(tag: &str) -> String {
3282 let work = temp_workspace(tag);
3283 let lock_path = work.join(LOCK_FILE_NAME);
3284 let guard = acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT).expect("lock");
3285 let models: std::collections::BTreeMap<
3286 djogi::migrate::BucketKey,
3287 djogi::migrate::AppliedSchema,
3288 > = std::collections::BTreeMap::new();
3289 let apps = vec![AppLifecycle {
3290 label: "billing".to_string(),
3291 database: "main".to_string(),
3292 renamed_from: None,
3293 tombstone: false,
3294 }];
3295 let emitted = djogi::migrate::ensure_phase_zero_emitted(
3296 &work,
3297 &models,
3298 &apps,
3299 time::OffsetDateTime::now_utc(),
3300 &guard,
3301 )
3302 .expect("auto-emit Phase 0");
3303 let sql = fs::read_to_string(&emitted[0].up_sql_path).expect("read emitted Phase 0");
3304 drop(guard);
3305 let _ = fs::remove_dir_all(&work);
3306 sql
3307 }
3308
3309 fn markerless_seed_phase_zero_sql(tag: &str) -> String {
3310 let mut sql = current_production_phase_zero_sql(tag);
3311 sql.push_str("\nINSERT INTO heer.heer_nodes (id) VALUES (1);\n");
3312 sql
3313 }
3314
3315 fn phase_zero_with_seed_statement(tag: &str, statement: &str) -> String {
3316 let mut sql = current_production_phase_zero_sql(tag);
3317 sql.push('\n');
3318 sql.push_str(statement);
3319 sql.push('\n');
3320 sql
3321 }
3322
3323 fn extended_seed_statement_cases() -> [(&'static str, &'static str); 4] {
3324 [
3325 (
3326 "cte_insert",
3327 "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
3328 ),
3329 (
3330 "cte_delete",
3331 "WITH moved AS (DELETE FROM heer.heer_node_state RETURNING *) SELECT 1;",
3332 ),
3333 (
3334 "merge",
3335 "MERGE INTO heer.heer_nodes AS target USING incoming ON false WHEN NOT MATCHED THEN INSERT (id) VALUES (1);",
3336 ),
3337 (
3338 "copy_from",
3339 "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
3340 ),
3341 ]
3342 }
3343
3344 fn generated_stale_phase_zero_sql(tag: &str) -> String {
3345 let mut sql = current_production_phase_zero_sql(tag);
3346 sql.push_str(
3347 "\nALTER DATABASE \"mydb\" SET heer.node_id = '1';\n\
3348 ALTER DATABASE \"mydb\" SET heer.ranj_node_id = '1';\n\
3349 SET heer.node_id = '1';\n\
3350 SET heer.ranj_node_id = '1';\n",
3351 );
3352 sql
3353 }
3354
3355 fn seed_capable_phase_zero_sql() -> String {
3356 djogi::testing::phase_zero_sql_for_testing("main", true)
3357 .expect("compose seed-capable Phase 0")
3358 }
3359
3360 fn write_pending_json(path: &Path, database: &str, app: &str, version: &str) {
3361 let pending = PendingPlan {
3362 format_version: "1".to_string(),
3363 bucket_database: database.to_string(),
3364 bucket_app: app.to_string(),
3365 version: version.to_string(),
3366 slug: "test".to_string(),
3367 model_snapshot: djogi::migrate::AppliedSchema {
3368 djogi_version: "0.1.0".to_string(),
3369 enums: std::collections::BTreeMap::new(),
3370 format_version: djogi::migrate::SNAPSHOT_FORMAT_VERSION.to_string(),
3371 generated_at: "2026-06-06T00:00:00Z".to_string(),
3372 indexes: Vec::new(),
3373 models: std::collections::BTreeMap::new(),
3374 registered_apps: vec![app.to_string()],
3375 },
3376 checksum_up: "V1:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
3377 .to_string(),
3378 checksum_down: None,
3379 composed_at: "2026-06-06T00:00:00Z".to_string(),
3380 };
3381 if let Some(parent) = path.parent() {
3382 fs::create_dir_all(parent).unwrap();
3383 }
3384 fs::write(path, serde_json::to_vec_pretty(&pending).unwrap()).unwrap();
3385 }
3386
3387 #[test]
3391 fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
3392 let work = temp_workspace("b1_discover");
3393 let billing_dir = work.join("migrations/main/billing");
3398 fs::create_dir_all(&billing_dir).unwrap();
3399 fs::write(billing_dir.join("schema_snapshot.json"), "{}").unwrap();
3400 let global_dir = work.join("migrations/main/_global_");
3403 fs::create_dir_all(&global_dir).unwrap();
3404 fs::write(global_dir.join("schema_snapshot.json"), "{}").unwrap();
3405 let no_snap_dir = work.join("migrations/main/empty_app");
3409 fs::create_dir_all(&no_snap_dir).unwrap();
3410
3411 let buckets = discover_snapshot_buckets_on_disk(&work);
3412 let labels: std::collections::BTreeSet<&str> =
3413 buckets.iter().map(|b| b.app.as_str()).collect();
3414 assert!(
3415 labels.contains("billing"),
3416 "must include the renamed-from bucket: {labels:?}"
3417 );
3418 assert!(
3419 labels.contains(""),
3420 "must include the global bucket: {labels:?}"
3421 );
3422 assert!(
3423 !labels.contains("empty_app"),
3424 "must not include directories without a snapshot: {labels:?}"
3425 );
3426 let _ = fs::remove_dir_all(&work);
3427 }
3428
3429 #[test]
3434 fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
3435 let work = temp_workspace("a1_workspace_config");
3436 let toml = "[database]\nurl = \"postgres://discovered-by-workspace-flag/test\"\n\
3437 max_connections = 1\ndev_mode = false\n\
3438 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3439 fs::write(work.join("Djogi.toml"), toml).unwrap();
3440 let env_guard = DatabaseUrlEnvGuard::new();
3441 env_guard.remove();
3442 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
3443 assert_eq!(
3444 config.database.url,
3445 "postgres://discovered-by-workspace-flag/test"
3446 );
3447 assert_eq!(config.server.port, 1234);
3448 let _ = fs::remove_dir_all(&work);
3449 }
3450
3451 #[test]
3456 fn a1_round2_env_override_beats_workspace_toml() {
3457 let work = temp_workspace("a1r2_env_override");
3458 let toml = "[database]\nurl = \"postgres://from-toml/test\"\n\
3459 max_connections = 1\ndev_mode = false\n\
3460 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
3461 fs::write(work.join("Djogi.toml"), toml).unwrap();
3462 let env_guard = DatabaseUrlEnvGuard::new();
3463 env_guard.set("postgres://from-env/test");
3464 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
3465 assert_eq!(
3466 config.database.url, "postgres://from-env/test",
3467 "env DATABASE_URL must win over workspace Djogi.toml"
3468 );
3469 let _ = fs::remove_dir_all(&work);
3470 }
3471
3472 #[test]
3473 fn apply_no_pending_is_identity_free_and_skips_pool_connect() {
3474 let work = temp_workspace("apply_no_pending");
3475 write_unreachable_config(&work);
3476
3477 let exit = without_database_url(|| {
3478 let runtime = tokio::runtime::Builder::new_current_thread()
3479 .enable_all()
3480 .build()
3481 .expect("runtime");
3482 runtime.block_on(run_apply(&work, &FakeMode::Real, None, false))
3483 });
3484
3485 assert_eq!(
3486 exit, 0,
3487 "no-pending apply must return before identity resolution or pool checkout"
3488 );
3489 let _ = fs::remove_dir_all(&work);
3490 }
3491
3492 #[test]
3493 fn discover_pending_plans_orders_phase_zero_before_normal_global() {
3494 let work = temp_workspace("discover_pending_phase_zero_first");
3495 write_pending_json(
3496 &djogi::migrate::pending_json_path(
3497 &work,
3498 &BucketKey {
3499 database: "main".to_string(),
3500 app: String::new(),
3501 },
3502 ),
3503 "main",
3504 "",
3505 "V20260606010101__later_global",
3506 );
3507 write_pending_json(
3508 &djogi::migrate::phase_zero_pending_json_path(
3509 &work,
3510 "main",
3511 djogi::migrate::PHASE_ZERO_VERSION,
3512 ),
3513 "main",
3514 "",
3515 djogi::migrate::PHASE_ZERO_VERSION,
3516 );
3517
3518 let discovered = discover_pending_plans(&work).expect("discover");
3519 assert_eq!(discovered.len(), 2);
3520 assert_eq!(
3521 discovered[0].plan.version,
3522 djogi::migrate::PHASE_ZERO_VERSION
3523 );
3524 assert!(discovered[0].is_phase_zero);
3525 assert_eq!(discovered[1].plan.version, "V20260606010101__later_global");
3526 let _ = fs::remove_dir_all(&work);
3527 }
3528
3529 #[test]
3530 fn discover_pending_plans_refuses_malformed_pending_json() {
3531 let work = temp_workspace("discover_pending_malformed");
3532 let path = djogi::migrate::pending_json_path(
3533 &work,
3534 &BucketKey {
3535 database: "main".to_string(),
3536 app: String::new(),
3537 },
3538 );
3539 fs::create_dir_all(path.parent().unwrap()).unwrap();
3540 fs::write(&path, b"{ not json").unwrap();
3541
3542 let err = discover_pending_plans(&work).expect_err("malformed pending must refuse");
3543 assert!(err.contains("parse pending JSON"));
3544 let _ = fs::remove_dir_all(&work);
3545 }
3546
3547 #[test]
3548 fn discover_pending_plans_refuses_hidden_phase_zero_database_mismatch() {
3549 let work = temp_workspace("discover_pending_phase_zero_db_mismatch");
3550 write_pending_json(
3551 &djogi::migrate::phase_zero_pending_json_path(
3552 &work,
3553 "main",
3554 djogi::migrate::PHASE_ZERO_VERSION,
3555 ),
3556 "other_db",
3557 "",
3558 djogi::migrate::PHASE_ZERO_VERSION,
3559 );
3560
3561 let err = discover_pending_plans(&work).expect_err("hidden Phase 0 mismatch must refuse");
3562 assert!(
3563 err.contains("expected main from path"),
3564 "unexpected error: {err}"
3565 );
3566 let _ = fs::remove_dir_all(&work);
3567 }
3568
3569 #[test]
3570 fn discover_pending_plans_refuses_normal_global_phase_zero_pending() {
3571 let work = temp_workspace("discover_pending_normal_global_phase_zero");
3572 let path = djogi::migrate::pending_json_path(
3573 &work,
3574 &BucketKey {
3575 database: "main".to_string(),
3576 app: String::new(),
3577 },
3578 );
3579 write_pending_json(&path, "main", "", djogi::migrate::PHASE_ZERO_VERSION);
3580
3581 let err = discover_pending_plans(&work).expect_err("normal-global Phase 0 must refuse");
3582 assert!(
3583 err.contains("Phase 0") && err.contains(".phase_zero"),
3584 "unexpected error: {err}"
3585 );
3586 let _ = fs::remove_dir_all(&work);
3587 }
3588
3589 #[test]
3590 fn discover_pending_plans_refuses_normal_pending_app_mismatch() {
3591 let work = temp_workspace("discover_pending_normal_app_mismatch");
3592 let path = djogi::migrate::pending_json_path(
3593 &work,
3594 &BucketKey {
3595 database: "main".to_string(),
3596 app: "billing".to_string(),
3597 },
3598 );
3599 write_pending_json(&path, "main", "audit", "V20260606010101__mismatch");
3600
3601 let err = discover_pending_plans(&work).expect_err("normal app mismatch must refuse");
3602 assert!(
3603 err.contains("expected billing from path"),
3604 "unexpected error: {err}"
3605 );
3606 let _ = fs::remove_dir_all(&work);
3607 }
3608
3609 #[test]
3610 fn discover_pending_plans_refuses_noncanonical_normal_pending_filename() {
3611 let work = temp_workspace("discover_pending_noncanonical_filename");
3612 let path = work.join("target/djogi_pending/main/bad-name.json");
3613 write_pending_json(&path, "main", "bad-name", "V20260606010101__bad_name");
3614
3615 let err = discover_pending_plans(&work).expect_err("non-canonical filename must refuse");
3616 assert!(
3617 err.contains("non-canonical app filename"),
3618 "unexpected error: {err}"
3619 );
3620 let _ = fs::remove_dir_all(&work);
3621 }
3622
3623 #[test]
3624 fn load_verified_pending_for_apply_refuses_changed_artifact() {
3625 let work = temp_workspace("apply_pending_changed_after_discovery");
3626 let path = djogi::migrate::pending_json_path(
3627 &work,
3628 &BucketKey {
3629 database: "main".to_string(),
3630 app: String::new(),
3631 },
3632 );
3633 write_pending_json(&path, "main", "", "V20260606010101__stable");
3634 let discovered = discover_pending_plans(&work).expect("discover");
3635 fs::write(
3636 &path,
3637 serde_json::to_vec_pretty(&PendingPlan {
3638 version: "V20260606010102__changed".to_string(),
3639 ..discovered[0].plan.clone()
3640 })
3641 .unwrap(),
3642 )
3643 .unwrap();
3644
3645 let err = load_verified_pending_for_apply(&discovered[0])
3646 .expect_err("apply must refuse a changed pending artifact");
3647 assert!(
3648 err.contains("changed after discovery"),
3649 "unexpected error: {err}"
3650 );
3651 let _ = fs::remove_dir_all(&work);
3652 }
3653
3654 #[test]
3655 fn reconcile_pending_plans_after_lock_refuses_added_artifact() {
3656 let work = temp_workspace("apply_pending_added_before_lock");
3657 let path = djogi::migrate::pending_json_path(
3658 &work,
3659 &BucketKey {
3660 database: "main".to_string(),
3661 app: String::new(),
3662 },
3663 );
3664 write_pending_json(&path, "main", "", "V20260606010101__stable");
3665 let discovered = discover_pending_plans(&work).expect("discover");
3666 write_pending_json(
3667 &djogi::migrate::phase_zero_pending_json_path(
3668 &work,
3669 "main",
3670 djogi::migrate::PHASE_ZERO_VERSION,
3671 ),
3672 "main",
3673 "",
3674 djogi::migrate::PHASE_ZERO_VERSION,
3675 );
3676
3677 let err = reconcile_pending_plans_after_lock(&work, &discovered)
3678 .expect_err("locked reconciliation must refuse a changed pending set");
3679 assert!(
3680 err.contains("changed while waiting for the workspace lock"),
3681 "unexpected error: {err}"
3682 );
3683 let _ = fs::remove_dir_all(&work);
3684 }
3685
3686 #[test]
3687 fn reconcile_pending_plans_after_lock_accepts_unchanged_set() {
3688 let work = temp_workspace("apply_pending_stable_under_lock");
3689 let path = djogi::migrate::pending_json_path(
3690 &work,
3691 &BucketKey {
3692 database: "main".to_string(),
3693 app: String::new(),
3694 },
3695 );
3696 write_pending_json(&path, "main", "", "V20260606010101__stable");
3697 let discovered = discover_pending_plans(&work).expect("discover");
3698
3699 let locked = reconcile_pending_plans_after_lock(&work, &discovered)
3700 .expect("unchanged set must reconcile");
3701 assert_eq!(locked, discovered);
3702 let _ = fs::remove_dir_all(&work);
3703 }
3704
3705 #[test]
3706 fn repair_checksum_drift_is_identity_free() {
3707 let work = temp_workspace("repair_checksum_identity_free");
3708 write_unreachable_config(&work);
3709
3710 let exit = without_database_url(|| {
3711 repair_checksum_drift_cmd(
3712 "V20260601000000__repair_checksum",
3713 None,
3714 None,
3715 Some("V1:0000000000000000000000000000000000000000000000000000000000000000"),
3716 None,
3717 Some(work.clone()),
3718 )
3719 });
3720
3721 assert_eq!(
3722 exit,
3723 ExitCode::from(1),
3724 "checksum-drift should reach pool connection without shared identity validation"
3725 );
3726 let _ = fs::remove_dir_all(&work);
3727 }
3728
3729 #[test]
3730 fn repair_partial_apply_is_identity_free() {
3731 let work = temp_workspace("repair_partial_identity_free");
3732 write_unreachable_config(&work);
3733
3734 let exit = without_database_url(|| {
3735 repair_partial_apply_cmd(
3736 "V20260601000000__repair_partial",
3737 PartialApplyResolution::MarkRolledBack,
3738 "operator confirmed rollback",
3739 None,
3740 None,
3741 Some(work.clone()),
3742 )
3743 });
3744
3745 assert_eq!(
3746 exit,
3747 ExitCode::from(1),
3748 "partial-apply should reach pool connection without shared identity validation"
3749 );
3750 let _ = fs::remove_dir_all(&work);
3751 }
3752
3753 #[test]
3754 fn repair_snapshot_rebuild_is_identity_free() {
3755 let work = temp_workspace("repair_snapshot_identity_free");
3756 write_unreachable_config(&work);
3757
3758 let exit = without_database_url(|| {
3759 repair_snapshot_rebuild_cmd(None, None, None, Some(work.clone()))
3760 });
3761
3762 assert_eq!(
3763 exit,
3764 ExitCode::from(1),
3765 "snapshot-rebuild should reach pool connection without shared identity validation"
3766 );
3767 let _ = fs::remove_dir_all(&work);
3768 }
3769
3770 #[test]
3782 fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
3783 use djogi::migrate::projection::BucketKey;
3784 use djogi::migrate::schema::{
3785 ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
3786 };
3787 use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
3788 use std::collections::BTreeMap;
3789
3790 let work = temp_workspace("b1r2_compose_uses_discovery");
3791
3792 let billing_bucket = BucketKey {
3795 database: "main".into(),
3796 app: "billing".into(),
3797 };
3798 let mut billing_snap = AppliedSchema {
3799 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
3800 enums: BTreeMap::new(),
3801 format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
3802 generated_at: "2026-04-25T00:00:00Z".to_string(),
3803 indexes: Vec::new(),
3804 models: BTreeMap::new(),
3805 registered_apps: vec!["billing".to_string()],
3806 };
3807 billing_snap.models.insert(
3808 "widgets".to_string(),
3809 TableSchema {
3810 app: Some("billing".to_string()),
3811 columns: vec![ColumnSchema {
3812 check: None,
3813 comment: None,
3814 default_sql: Some("heerid_next_desc()".to_string()),
3815 foreign_key: None,
3816 generated: None,
3817 identity: None,
3818 index_type: None,
3819 indexed: false,
3820 max_length: None,
3821 name: "id".to_string(),
3822 nullable: false,
3823 on_delete: None,
3824 outbox_exclude: false,
3825 rationale: None,
3826 relation_kind: None,
3827 renamed_from: None,
3828 sequence_within: None,
3829 sql_type: "BIGINT".to_string(),
3830 unique: false,
3831 type_change_using: None,
3832 }],
3833 exclusion_constraints: Vec::new(),
3834 fts: None,
3835 is_through: false,
3836 moved_from_app: None,
3837 partition: None,
3838 primary_key: PrimaryKeySchema {
3839 columns: vec!["id".to_string()],
3840 kind: PkKindSchema::HeerIdRecencyBiased,
3841 },
3842 rationale: None,
3843 renamed_from: None,
3844 rls_enabled: false,
3845 table: "widgets".to_string(),
3846 table_comment: None,
3847 storage_params: None,
3848 tablespace: None,
3849 tenant_key: None,
3850 },
3851 );
3852 let snap_path = snapshot_path(&work, &billing_bucket);
3853 save_snapshot(&billing_snap, &snap_path).expect("write snapshot");
3854
3855 let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
3859 let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
3860
3861 let exit = compose_with_inputs(
3862 &work,
3863 "drop billing remnant",
3864 true, false, &empty_models,
3867 &[AppLifecycle {
3868 label: "billing".to_string(),
3869 database: "main".to_string(),
3870 renamed_from: None,
3871 tombstone: true, }],
3873 now,
3874 None, );
3876 assert_eq!(exit, ExitCode::from(0), "compose must succeed");
3877
3878 let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
3881 let mut up_path: Option<PathBuf> = None;
3882 for entry in fs::read_dir(&billing_dir).unwrap().flatten() {
3883 let n = entry.file_name().to_string_lossy().to_string();
3884 if n.starts_with('V') && n.ends_with(".sdjql") && !n.contains(".down.") {
3887 up_path = Some(entry.path());
3888 break;
3889 }
3890 }
3891 let up_path = up_path.expect("compose must have written an up SQL file");
3892 let up_sql = fs::read_to_string(&up_path).unwrap();
3893 assert!(
3894 up_sql.contains("DROP TABLE \"widgets\""),
3895 "compose must have seen the disk snapshot and emitted DROP TABLE — \
3896 this proves discover_snapshot_buckets_on_disk reached the differ. \
3897 SQL: {up_sql}"
3898 );
3899 let _ = fs::remove_dir_all(&work);
3900 }
3901
3902 #[test]
3910 fn a1_round2_status_cmd_threads_workspace_to_config() {
3911 let work = temp_workspace("a1r2_status_workspace");
3912 fs::write(work.join("Djogi.toml"), "this is = not = valid toml ===").unwrap();
3917 let exit = status_cmd(Some(work.clone()));
3918 assert_eq!(
3919 exit,
3920 ExitCode::from(1),
3921 "malformed workspace Djogi.toml must surface as config load error"
3922 );
3923 let _ = fs::remove_dir_all(&work);
3924 }
3925
/// Each `AttuneError::Refused` payload listed below must be mapped to exit
/// code 2 by `attune_error_exit_code`.
#[test]
fn u3_attune_refusal_variants_map_to_exit_code_two() {
    use djogi::migrate::AttuneRefusal;

    let refusals = [
        AttuneRefusal::SquashNotLocalhost {
            database_url: String::from("postgres://prod.example.com/main"),
        },
        AttuneRefusal::SquashNotDevProfile {
            profile: String::from("production"),
        },
        AttuneRefusal::SquashDevModeOff,
        AttuneRefusal::SquashEnvIsProduction {
            env_value: String::from("production"),
        },
        AttuneRefusal::SquashFromVersionNotFound {
            version: String::from("V20260101000000__missing"),
        },
        AttuneRefusal::SquashFromVersionAmbiguous {
            version: String::from("V20260101000000__shared"),
            buckets: vec![String::from("main/users"), String::from("main/billing")],
        },
    ];

    for refusal in refusals {
        let err = AttuneError::Refused(refusal);
        let code = attune_error_exit_code(&err);
        assert_eq!(code, 2, "refusal variant must map to exit 2: {err}");
    }
}
3965
/// Filesystem, SQL file, and git-publish failures must be mapped to exit
/// code 1 by `attune_error_exit_code`.
#[test]
fn u3_attune_runtime_variants_map_to_exit_code_one() {
    // Small factories so each variant below reads as a single line of intent.
    let io_failure = |detail: &'static str| std::io::Error::other(detail);
    let sql_path = || PathBuf::from("/tmp/x.sdjql");

    let runtime_errors = [
        AttuneError::FilesystemScanFailed {
            source: io_failure("disk full"),
        },
        AttuneError::SqlReadFailed {
            path: sql_path(),
            source: io_failure("permission denied"),
        },
        AttuneError::SqlWriteFailed {
            path: sql_path(),
            source: io_failure("read-only fs"),
        },
        AttuneError::SqlDeleteFailed {
            path: sql_path(),
            source: io_failure("not found"),
        },
        AttuneError::GitPublishFailed {
            stderr: String::from("fatal: refusing to push"),
            status_code: Some(128),
        },
    ];

    runtime_errors.iter().for_each(|err| {
        assert_eq!(
            attune_error_exit_code(err),
            1,
            "runtime variant must map to exit 1: {err}"
        );
    });
}
4001
/// `baseline_cmd` invoked with an empty reason string must return exit code 2.
#[test]
fn baseline_empty_reason_exits_code_2() {
    let workspace = std::path::PathBuf::from("/tmp/nonexistent_djogi_ws");
    let exit = baseline_cmd(
        "V00000000000000__baseline",
        "description",
        "",
        None,
        None,
        Some(workspace),
        None,
        false,
    );
    let refused = ExitCode::from(2);
    assert_eq!(
        exit, refused,
        "empty --reason must exit 2 before any DB work"
    );
}
4027
/// `baseline_cmd` invoked with a whitespace-only reason must return exit
/// code 2.
#[test]
fn baseline_whitespace_reason_exits_code_2() {
    let workspace = std::path::PathBuf::from("/tmp/nonexistent_djogi_ws");
    let exit = baseline_cmd(
        "V00000000000000__baseline",
        "description",
        " ",
        None,
        None,
        Some(workspace),
        None,
        false,
    );
    let refused = ExitCode::from(2);
    assert_eq!(
        exit, refused,
        "whitespace-only --reason must exit 2 before any DB work"
    );
}
4046
/// The `RunnerError` variants listed below must be mapped to exit code 2 by
/// `baseline_error_exit_code`.
#[test]
fn baseline_refusal_variants_map_to_exit_code_two() {
    let baseline_version = || String::from("V00000000000000__baseline");
    let global_main = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };

    let refusals = [
        RunnerError::VersionAlreadyApplied {
            version: baseline_version(),
            applied_at: None,
        },
        RunnerError::VersionCollisionNonTerminal {
            version: baseline_version(),
            status: LedgerStatus::Pending,
            run_id: 1,
        },
        RunnerError::BaselineSnapshotShouldNotBeProvided,
        RunnerError::AdvisoryUnlockReturnedFalse {
            bucket: global_main,
            key: 0x0102_0304_0506_0708,
        },
        RunnerError::OutOfOrderRejected {
            version: baseline_version(),
            conflicting_version: String::from("V20260101000000__later"),
            conflicting_applied_at: None,
        },
    ];

    refusals.iter().for_each(|err| {
        assert_eq!(
            baseline_error_exit_code(err),
            2,
            "baseline refusal variant must map to exit 2: {err}"
        );
    });
}
4086
/// The `RunnerError` variants listed below (ledger, session checkout, and
/// advisory-lock failures) must be mapped to exit code 1 by
/// `baseline_error_exit_code`.
#[test]
fn baseline_transient_variants_map_to_exit_code_one() {
    use djogi::error::{DbError, DjogiError};

    let db_failure = |detail: &'static str| DjogiError::Db(DbError::other(detail));
    let global_main = BucketKey {
        database: String::from("main"),
        app: String::new(),
    };

    let transient = [
        RunnerError::LedgerBootstrapFailed {
            source: db_failure("create table failed"),
        },
        RunnerError::LedgerWriteFailed {
            version: String::from("V00000000000000__baseline"),
            source: db_failure("insert failed"),
        },
        RunnerError::PinnedSessionCheckoutFailed {
            source: db_failure("pool exhausted"),
        },
        RunnerError::AdvisoryLockFailed {
            bucket: global_main,
            key: 0x0102_0304_0506_0708,
            attempts: 3,
        },
    ];

    transient.iter().for_each(|err| {
        assert_eq!(
            baseline_error_exit_code(err),
            1,
            "baseline transient variant must map to exit 1: {err}"
        );
    });
}
4123
/// `apply_cmd` with the fake flag set and no reason must return exit code 2.
#[test]
fn fake_without_reason_exits_code_2() {
    let workspace = Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws"));
    let exit = apply_cmd(workspace, true, None, None, false);
    let refused = ExitCode::from(2);
    assert_eq!(exit, refused, "--fake without --reason must exit 2");
}
4142
/// `apply_cmd` with the fake flag set and an empty reason must return exit
/// code 2.
#[test]
fn fake_with_empty_reason_exits_code_2() {
    let workspace = Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws"));
    let empty_reason = Some(String::new());
    let exit = apply_cmd(workspace, true, empty_reason, None, false);
    let refused = ExitCode::from(2);
    assert_eq!(exit, refused, "--fake with empty reason must exit 2");
}
4159
/// `apply_cmd` with the fake flag set and a whitespace-only reason must
/// return exit code 2.
#[test]
fn fake_with_whitespace_reason_exits_code_2() {
    let workspace = Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws"));
    let blank_reason = Some(String::from(" "));
    let exit = apply_cmd(workspace, true, blank_reason, None, false);
    let refused = ExitCode::from(2);
    assert_eq!(exit, refused, "--fake with whitespace reason must exit 2");
}
4176
/// Supplying a reason while the fake flag is off must not produce the
/// exit-code-2 refusal; any other exit code is accepted by this test.
#[test]
fn reason_without_fake_is_accepted() {
    let workspace = Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws"));
    let reason = Some(String::from("test reason"));
    let exit = apply_cmd(workspace, false, reason, None, true);
    let refused = ExitCode::from(2);
    assert_ne!(exit, refused, "--reason without --fake should not refuse");
}
4197
4198 fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
4205 djogi::migrate::BucketKey {
4206 database: database.to_string(),
4207 app: app.to_string(),
4208 }
4209 }
4210
4211 fn diag(
4213 code: &str,
4214 severity: djogi::migrate::VerifySeverity,
4215 message: &str,
4216 location: Option<&str>,
4217 ) -> djogi::migrate::VerifyDiagnostic {
4218 djogi::migrate::VerifyDiagnostic {
4219 code: code.to_string(),
4220 severity,
4221 message: message.to_string(),
4222 location: location.map(str::to_string),
4223 }
4224 }
4225
/// A report with no diagnostics must render the ledger summary, the
/// "no drift" line, and a PASSED result, and must never mention FAILED.
#[test]
fn render_verify_report_clean_output() {
    use djogi::migrate::VerifyReport;

    let clean = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 3,
        unfinished_count: 0,
    };
    let target = render_bucket("main", "");

    let lines = render_verify_report(&clean, &target);
    let has_line = |wanted: &str| lines.iter().any(|line| line.as_str() == wanted);
    let mentions = |needle: &str| lines.iter().any(|line| line.contains(needle));

    assert!(
        has_line("Ledger: 3 applied, latest 001_initial"),
        "missing ledger line; got {lines:?}"
    );
    assert!(
        has_line("No drift detected. Schema is consistent."),
        "missing clean line; got {lines:?}"
    );
    assert!(
        mentions("Result: PASSED"),
        "missing PASSED result; got {lines:?}"
    );
    assert!(
        !mentions("FAILED"),
        "clean report must not say FAILED; got {lines:?}"
    );
}
4257
/// A report containing one error and one warning must render both
/// diagnostics in `[SEVERITY] CODE (location): message` form and end with a
/// FAILED result.
#[test]
fn render_verify_report_with_errors() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let missing_table = diag(
        "D601",
        VerifySeverity::Error,
        "Snapshot table missing from live DB",
        Some("users"),
    );
    let extra_index = diag(
        "D611",
        VerifySeverity::Warning,
        "Live index not present in snapshot",
        Some("idx_posts_created"),
    );
    let failing = VerifyReport {
        diagnostics: vec![missing_table, extra_index],
        latest_applied_version: Some(String::from("V20260501000000__add_users")),
        applied_count: 2,
        unfinished_count: 0,
    };
    let target = render_bucket("main", "myapp");

    assert!(failing.has_errors());
    let lines = render_verify_report(&failing, &target);
    let has_line = |wanted: &str| lines.iter().any(|line| line.as_str() == wanted);

    assert!(
        has_line("[ERROR] D601 (users): Snapshot table missing from live DB"),
        "missing D601 line; got {lines:?}"
    );
    assert!(
        has_line("[WARN] D611 (idx_posts_created): Live index not present in snapshot"),
        "missing D611 line; got {lines:?}"
    );
    assert!(
        lines.iter().any(|line| line.contains("Result: FAILED")),
        "error report must say FAILED; got {lines:?}"
    );
}
4304
/// The first rendered line is the header: an empty app name is shown as
/// `_global_`, while a named app is shown verbatim after the database name.
#[test]
fn render_verify_report_header_shows_global_and_named_app() {
    use djogi::migrate::VerifyReport;

    let empty = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: None,
        applied_count: 0,
        unfinished_count: 0,
    };

    let global = render_verify_report(&empty, &render_bucket("main", ""));
    let global_header = global.first().map(|line| line.as_str());
    assert_eq!(
        global_header,
        Some("djogi migrations verify — main/_global_"),
        "global bucket header; got {global:?}"
    );

    let named = render_verify_report(&empty, &render_bucket("crud_log", "billing"));
    let named_header = named.first().map(|line| line.as_str());
    assert_eq!(
        named_header,
        Some("djogi migrations verify — crud_log/billing"),
        "named bucket header; got {named:?}"
    );
}
4332
/// A report whose only diagnostic is a warning must render
/// "PASSED with warnings" and must not mention FAILED.
#[test]
fn render_verify_report_warning_only_passes_with_warnings() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let advisory = diag(
        "D606",
        VerifySeverity::Warning,
        "type differs (advisory)",
        Some("users.age"),
    );
    let warned = VerifyReport {
        diagnostics: vec![advisory],
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 1,
        unfinished_count: 0,
    };
    let lines = render_verify_report(&warned, &render_bucket("main", ""));
    let mentions = |needle: &str| lines.iter().any(|line| line.contains(needle));

    assert!(
        mentions("Result: PASSED with warnings"),
        "warning-only must PASS with warnings; got {lines:?}"
    );
    assert!(
        !mentions("FAILED"),
        "warning-only must not say FAILED; got {lines:?}"
    );
}
4361
/// With zero applied migrations and no latest version, the rendered output
/// must contain the dedicated "empty ledger" line.
#[test]
fn render_verify_report_empty_ledger_line() {
    use djogi::migrate::VerifyReport;

    let untouched = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: None,
        applied_count: 0,
        unfinished_count: 0,
    };
    let lines = render_verify_report(&untouched, &render_bucket("main", ""));
    let expected = "Ledger: empty (no migrations applied yet)";

    assert!(
        lines.iter().any(|line| line.as_str() == expected),
        "empty ledger line; got {lines:?}"
    );
}
4379
/// When the report carries a non-zero unfinished count, the ledger line must
/// include it between the applied count and the latest version.
#[test]
fn render_verify_report_unfinished_ledger_line() {
    use djogi::migrate::VerifyReport;

    let in_flight = VerifyReport {
        diagnostics: Vec::new(),
        latest_applied_version: Some(String::from("V20260501000000__add_users")),
        applied_count: 2,
        unfinished_count: 1,
    };
    let lines = render_verify_report(&in_flight, &render_bucket("main", ""));
    let expected = "Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users";

    assert!(
        lines.iter().any(|line| line.as_str() == expected),
        "unfinished ledger line; got {lines:?}"
    );
}
4399
/// An info diagnostic without a location must render its location as `(-)`,
/// and an info-only report must summarise as `PASSED (1 info(s))`.
#[test]
fn render_verify_report_info_with_no_location_uses_dash() {
    use djogi::migrate::{VerifyReport, VerifySeverity};

    let unlocated = diag(
        "D692",
        VerifySeverity::Info,
        "enum type(s) declared; not yet checked",
        None,
    );
    let informational = VerifyReport {
        diagnostics: vec![unlocated],
        latest_applied_version: Some(String::from("001_initial")),
        applied_count: 1,
        unfinished_count: 0,
    };
    let lines = render_verify_report(&informational, &render_bucket("main", ""));

    let dash_rendered = lines.iter().any(|line| line.contains("(-)"));
    assert!(
        dash_rendered,
        "location: None must render as (-); got {lines:?}"
    );

    let summary_present = lines
        .iter()
        .any(|line| line.as_str() == "Result: PASSED (1 info(s))");
    assert!(summary_present, "all-info summary; got {lines:?}");
}
4428
4429 fn db_config(
4432 url: &str,
4433 crud_log_url: Option<&str>,
4434 event_log_url: Option<&str>,
4435 ) -> djogi::config::DatabaseConfig {
4436 djogi::config::DatabaseConfig {
4437 url: url.to_string(),
4438 crud_log_url: crud_log_url.map(str::to_string),
4439 event_log_url: event_log_url.map(str::to_string),
4440 max_connections: None,
4441 dev_mode: false,
4442 }
4443 }
4444
/// Resolving the `main` database must hand back the configured app URL
/// exactly as written.
#[test]
fn resolve_bucket_url_main_uses_app_url_verbatim() {
    let app_url = "postgres://user:pass@localhost:5432/myapp_prod";
    let cfg = db_config(app_url, None, None);

    let resolved = resolve_bucket_url(&cfg, "main");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://user:pass@localhost:5432/myapp_prod"),
        "main must return the app URL unchanged"
    );
}
4457
/// When `crud_log_url` is configured, resolving `crud_log` must return that
/// explicit URL.
#[test]
fn resolve_bucket_url_crud_log_prefers_explicit_url() {
    let explicit = Some("postgres://localhost/explicit_crud");
    let cfg = db_config("postgres://localhost/main", explicit, None);

    let resolved = resolve_bucket_url(&cfg, "crud_log");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost/explicit_crud"),
        "crud_log must prefer the explicit crud_log_url"
    );
}
4471
/// When `event_log_url` is configured, resolving `event_log` must return
/// that explicit URL.
#[test]
fn resolve_bucket_url_event_log_prefers_explicit_url() {
    let explicit = Some("postgres://localhost/explicit_event");
    let cfg = db_config("postgres://localhost/main", None, explicit);

    let resolved = resolve_bucket_url(&cfg, "event_log");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost/explicit_event"),
        "event_log must prefer the explicit event_log_url"
    );
}
4485
/// An empty explicit log URL is ignored in favour of the derived URL, while
/// a whitespace-only explicit URL is returned as-is.
#[test]
fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
    let cfg = db_config("postgres://localhost/main", Some(""), Some(" "));

    let crud = resolve_bucket_url(&cfg, "crud_log");
    assert_eq!(
        crud.as_deref(),
        Some("postgres://localhost/crud_log"),
        "empty crud_log_url must fall back to derived"
    );

    let event = resolve_bucket_url(&cfg, "event_log");
    assert_eq!(
        event.as_deref(),
        Some(" "),
        "non-empty (whitespace) event_log_url is used verbatim"
    );
}
4505
/// A database name with no dedicated config field is resolved by replacing
/// the database path segment of the app URL.
#[test]
fn resolve_bucket_url_other_database_derives_from_app_url() {
    let cfg = db_config("postgres://user:pass@localhost:5432/main", None, None);

    let resolved = resolve_bucket_url(&cfg, "analytics");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://user:pass@localhost:5432/analytics"),
        "an arbitrary database name derives by path splice"
    );
}
4515
/// With an app URL that has no path component, a derived database cannot be
/// resolved and the result must be `None`.
#[test]
fn resolve_bucket_url_pathless_url_returns_none() {
    let cfg = db_config("postgres://localhost", None, None);

    let resolved = resolve_bucket_url(&cfg, "crud_log");
    assert_eq!(
        resolved, None,
        "pathless URL must yield None for a derived database"
    );
}
4526
/// Even when the app URL has no path component, resolving `main` must return
/// that URL untouched.
#[test]
fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
    let cfg = db_config("postgres://localhost", None, None);

    let resolved = resolve_bucket_url(&cfg, "main");
    assert_eq!(
        resolved.as_deref(),
        Some("postgres://localhost"),
        "main returns the app URL verbatim regardless of path"
    );
}
4538
/// Two pending plans targeting different databases (`main` and `crud_log`)
/// must resolve to two distinct URLs: the app URL for `main` and the
/// explicit `crud_log_url` for `crud_log`.
#[test]
fn resolve_apply_target_urls_uses_pending_bucket_databases() {
    let workspace = temp_workspace("apply_target_urls");

    // (database, app, version) for each pending plan written to disk.
    let pending = [
        ("main", "", "V20260607010101__main_global"),
        ("crud_log", "audit", "V20260607010102__crud_log_audit"),
    ];
    for (database, app, version) in pending {
        let bucket = BucketKey {
            database: String::from(database),
            app: String::from(app),
        };
        let json_path = djogi::migrate::pending_json_path(&workspace, &bucket);
        write_pending_json(&json_path, database, app, version);
    }

    let discovered = discover_pending_plans(&workspace).expect("discover");
    let cfg = db_config(
        "postgres://user:pass@localhost:5432/myapp_prod",
        Some("postgres://user:pass@localhost:5432/myapp_crud"),
        None,
    );

    let urls = resolve_apply_target_urls(&discovered, &cfg).expect("resolve");
    let url_for = |database: &str| urls.get(database).map(String::as_str);

    assert_eq!(
        urls.len(),
        2,
        "apply must preserve distinct target databases"
    );
    assert_eq!(
        url_for("main"),
        Some("postgres://user:pass@localhost:5432/myapp_prod"),
        "main pending plans must keep the app database URL"
    );
    assert_eq!(
        url_for("crud_log"),
        Some("postgres://user:pass@localhost:5432/myapp_crud"),
        "crud_log pending plans must route through the crud_log database URL"
    );

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4592
/// A pending plan for a derived database combined with a pathless app URL
/// must make `resolve_apply_target_urls` fail with an error naming that
/// database.
#[test]
fn resolve_apply_target_urls_refuses_unresolvable_pending_database() {
    let workspace = temp_workspace("apply_target_urls_unresolvable");

    let analytics = BucketKey {
        database: String::from("analytics"),
        app: String::new(),
    };
    let json_path = djogi::migrate::pending_json_path(&workspace, &analytics);
    write_pending_json(
        &json_path,
        "analytics",
        "",
        "V20260607010103__analytics_global",
    );

    let discovered = discover_pending_plans(&workspace).expect("discover");
    let cfg = db_config("postgres://localhost", None, None);

    let outcome = resolve_apply_target_urls(&discovered, &cfg);
    let err = outcome.expect_err("pathless app URL must refuse a derived pending database");
    assert!(err.contains("analytics"), "unexpected error: {err}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4616
/// The SQL produced by `current_production_phase_zero_sql` must classify
/// without any refusal.
#[test]
fn classify_phase_zero_bytes_identity_free_production_is_ok() {
    let sql = current_production_phase_zero_sql("current_bytes");
    let refusal = classify_phase_zero_bytes(sql.as_bytes());
    assert!(
        refusal.is_none(),
        "production Phase 0 should be identity-free replay-current (no refusal)"
    );
}
4627
/// The SQL produced by `seed_capable_phase_zero_sql` must be refused, and
/// the refusal text must mention `seed-capable`.
#[test]
fn classify_phase_zero_bytes_seed_capable_is_refused() {
    let sql = seed_capable_phase_zero_sql();
    let msg = classify_phase_zero_bytes(sql.as_bytes())
        .expect("seed-capable Phase 0 should be refused by cleanup guard");
    // Include the actual refusal text so a wrong classification is
    // diagnosable straight from the test failure.
    assert!(msg.contains("seed-capable"), "refusal reason: {msg}");
}
4638
/// The SQL produced by `generated_stale_phase_zero_sql` must be refused, and
/// the refusal text must mention `generated-stale`.
#[test]
fn classify_phase_zero_bytes_generated_stale_is_refused() {
    let sql = generated_stale_phase_zero_sql("stale_bytes");
    let msg = classify_phase_zero_bytes(sql.as_bytes())
        .expect("generated-stale Phase 0 should be refused");
    // Include the actual refusal text so a wrong classification is
    // diagnosable straight from the test failure.
    assert!(msg.contains("generated-stale"), "refusal reason: {msg}");
}
4649
/// The SQL produced by `markerless_seed_phase_zero_sql` must be refused, and
/// the refusal text must mention `seed-dml`.
#[test]
fn classify_phase_zero_bytes_markerless_seed_is_refused() {
    let sql = markerless_seed_phase_zero_sql("markerless_seed_bytes");
    let msg = classify_phase_zero_bytes(sql.as_bytes())
        .expect("markerless seed Phase 0 should be refused by cleanup guard");
    // Include the actual refusal text so a wrong classification is
    // diagnosable straight from the test failure.
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
}
4660
/// Every statement returned by `extended_seed_statement_cases`, once embedded
/// in a Phase 0 script, must be refused with a reason mentioning `seed-dml`.
#[test]
fn classify_phase_zero_bytes_extended_seed_dml_forms_are_refused() {
    for (name, statement) in extended_seed_statement_cases() {
        let label = format!("extended_seed_bytes_{name}");
        let sql = phase_zero_with_seed_statement(&label, statement);

        let Some(msg) = classify_phase_zero_bytes(sql.as_bytes()) else {
            panic!("extended seed Phase 0 should be refused");
        };
        assert!(msg.contains("seed-dml"), "refusal reason: {msg}");
    }
}
4671
/// A Phase 0 script made of only a `CREATE SCHEMA` and an `ALTER DATABASE`
/// statement must be refused, and the refusal text must mention `ambiguous`.
#[test]
fn classify_phase_zero_bytes_ambiguous_is_refused() {
    let sql = "CREATE SCHEMA IF NOT EXISTS heer;\n\
               ALTER DATABASE \"mydb\" SET heer.node_id = '1';\n";
    let msg =
        classify_phase_zero_bytes(sql.as_bytes()).expect("ambiguous Phase 0 should be refused");
    // Include the actual refusal text so a wrong classification is
    // diagnosable straight from the test failure.
    assert!(msg.contains("ambiguous"), "refusal reason: {msg}");
}
4681
/// Whitespace-only input must be refused, and the refusal text must mention
/// `missing`.
#[test]
fn classify_phase_zero_bytes_missing_is_refused() {
    let msg = classify_phase_zero_bytes(b" \n\t ").expect("missing Phase 0 should be refused");
    // Include the actual refusal text so a wrong classification is
    // diagnosable straight from the test failure.
    assert!(msg.contains("missing"), "refusal reason: {msg}");
}
4688
/// A Phase 0 replay plan on disk whose statement is the generated-stale SQL
/// must be refused by `classify_phase_zero_for_cleanup` with a reason
/// mentioning `generated-stale`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_stale_replay_plan() {
    let workspace = temp_workspace("stale_cleanup");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: statement -> segment -> plan.
    let statement = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: generated_stale_phase_zero_sql("stale_replay"),
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![statement],
    };
    let plan = CliReplayPlan {
        format_version: String::from(CLI_REPLAY_PLAN_FORMAT_VERSION),
        classification: CliClassification::Additive,
        checksum_up: String::from("V1:aabbccdd"),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&plan).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let global_main = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &global_main,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:aabbccdd",
        None,
    );
    let Some(msg) = refusal else {
        panic!("stale Phase 0 replay plan should be refused by cleanup guard");
    };
    assert!(msg.contains("generated-stale"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4735
/// A Phase 0 replay plan on disk whose statement is the current production
/// SQL must pass `classify_phase_zero_for_cleanup` without a refusal.
#[test]
fn classify_phase_zero_for_cleanup_allows_current_replay_plan() {
    let workspace = temp_workspace("current_cleanup");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: statement -> segment -> plan.
    let statement = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: current_production_phase_zero_sql("current_replay"),
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![statement],
    };
    let plan = CliReplayPlan {
        format_version: String::from(CLI_REPLAY_PLAN_FORMAT_VERSION),
        classification: CliClassification::Additive,
        checksum_up: String::from("V1:eeff0011"),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&plan).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let global_main = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &global_main,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:eeff0011",
        None,
    );
    assert!(
        refusal.is_none(),
        "identity-free Phase 0 should be allowed by cleanup guard; got: {refusal:?}"
    );

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4780
/// A Phase 0 replay plan on disk whose statement is the seed-capable SQL
/// must be refused with a reason mentioning `seed-capable`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_seed_capable_replay_plan() {
    let workspace = temp_workspace("seed_cleanup_replay_plan");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: statement -> segment -> plan.
    let statement = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: seed_capable_phase_zero_sql(),
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![statement],
    };
    let plan = CliReplayPlan {
        format_version: String::from(CLI_REPLAY_PLAN_FORMAT_VERSION),
        classification: CliClassification::Additive,
        checksum_up: String::from("V1:11223344"),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&plan).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let global_main = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &global_main,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:11223344",
        None,
    );
    let Some(msg) = refusal else {
        panic!("seed-capable replay plan must refuse");
    };
    assert!(msg.contains("seed-capable"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4822
/// A Phase 0 replay plan on disk whose statement is the markerless-seed SQL
/// must be refused with a reason mentioning `seed-dml`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_markerless_seed_replay_plan() {
    let workspace = temp_workspace("markerless_seed_cleanup_replay_plan");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: statement -> segment -> plan.
    let statement = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: markerless_seed_phase_zero_sql("markerless_seed_replay"),
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![statement],
    };
    let plan = CliReplayPlan {
        format_version: String::from(CLI_REPLAY_PLAN_FORMAT_VERSION),
        classification: CliClassification::Additive,
        checksum_up: String::from("V1:55667788"),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&plan).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let global_main = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &global_main,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:55667788",
        None,
    );
    let Some(msg) = refusal else {
        panic!("markerless seed replay plan must refuse");
    };
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4864
/// A Phase 0 replay plan on disk containing a `WITH ... INSERT` statement
/// must be refused with a reason mentioning `seed-dml`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_cte_seed_dml_replay_plan() {
    let workspace = temp_workspace("cte_seed_cleanup_replay_plan");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    // Assemble the replay plan bottom-up: SQL -> statement -> segment -> plan.
    let seeded_sql = phase_zero_with_seed_statement(
        "cte_seed_cleanup_replay",
        "WITH rows AS (SELECT 1) INSERT INTO heer.heer_nodes (id) VALUES (1);",
    );
    let statement = CliReplayStatement {
        label: String::from("phase_zero_bootstrap"),
        up: seeded_sql,
    };
    let segment = CliReplaySegment {
        kind: CliSegmentKind::Transactional,
        statements: vec![statement],
    };
    let plan = CliReplayPlan {
        format_version: String::from(CLI_REPLAY_PLAN_FORMAT_VERSION),
        classification: CliClassification::Additive,
        checksum_up: String::from("V1:66778899"),
        checksum_down: None,
        segments: vec![segment],
    };
    let plan_path = bucket_dir.join("V00000000000000__phase_zero_bootstrap.plan.json");
    let plan_json = serde_json::to_string(&plan).unwrap();
    fs::write(plan_path, plan_json).unwrap();

    let global_main = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &global_main,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:66778899",
        None,
    );
    let Some(msg) = refusal else {
        panic!("CTE seed replay plan must refuse");
    };
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4909
/// With only an up-SQL file on disk (no replay plan written by this test)
/// containing the current production SQL, the cleanup classifier must not
/// refuse.
#[test]
fn classify_phase_zero_for_cleanup_fallback_sql_file() {
    let workspace = temp_workspace("fallback_cleanup");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let sql_body = current_production_phase_zero_sql("fallback_sql");
    let sql_name = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let sql_path = bucket_dir.join(&sql_name);
    fs::write(sql_path, sql_body).unwrap();

    let global_main = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &global_main,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    );
    assert!(
        refusal.is_none(),
        "identity-free Phase 0 fallback SQL should be allowed; got: {refusal:?}"
    );

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4938
/// With only an up-SQL file on disk containing the seed-capable SQL, the
/// cleanup classifier must refuse with a reason mentioning `seed-capable`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_seed_capable_fallback_sql_file() {
    let workspace = temp_workspace("seed_cleanup_fallback");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let sql_name = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let sql_path = bucket_dir.join(&sql_name);
    fs::write(sql_path, seed_capable_phase_zero_sql()).unwrap();

    let global_main = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &global_main,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    );
    let Some(msg) = refusal else {
        panic!("seed-capable fallback SQL must refuse");
    };
    assert!(msg.contains("seed-capable"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4964
/// With only an up-SQL file on disk containing the markerless-seed SQL, the
/// cleanup classifier must refuse with a reason mentioning `seed-dml`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_markerless_seed_fallback_sql_file() {
    let workspace = temp_workspace("markerless_seed_cleanup_fallback");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let sql_name = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let sql_path = bucket_dir.join(&sql_name);
    let sql_body = markerless_seed_phase_zero_sql("markerless_seed_fallback");
    fs::write(sql_path, sql_body).unwrap();

    let global_main = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &global_main,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    );
    let Some(msg) = refusal else {
        panic!("markerless seed fallback SQL must refuse");
    };
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
4994
/// With only an up-SQL file on disk containing a `COPY ... FROM STDIN`
/// statement, the cleanup classifier must refuse with a reason mentioning
/// `seed-dml`.
#[test]
fn classify_phase_zero_for_cleanup_refuses_copy_from_seed_fallback_sql_file() {
    let workspace = temp_workspace("copy_seed_cleanup_fallback");
    let bucket_dir = workspace.join("migrations/main/_global_");
    fs::create_dir_all(&bucket_dir).unwrap();

    let sql_name = djogi::migrate::up_filename(djogi::migrate::PHASE_ZERO_VERSION);
    let sql_path = bucket_dir.join(&sql_name);
    let sql_body = phase_zero_with_seed_statement(
        "copy_seed_cleanup_fallback",
        "COPY \"heer\".\"heer_ranj_node_state\" (\"node_id\") FROM STDIN;",
    );
    fs::write(sql_path, sql_body).unwrap();

    let global_main = djogi::migrate::BucketKey {
        database: String::from("main"),
        app: String::new(),
    };
    let refusal = classify_phase_zero_for_cleanup(
        &workspace,
        &global_main,
        djogi::migrate::PHASE_ZERO_VERSION,
        "V1:anychecksum",
        None,
    );
    let Some(msg) = refusal else {
        panic!("COPY FROM seed fallback SQL must refuse");
    };
    assert!(msg.contains("seed-dml"), "refusal reason: {msg}");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = fs::remove_dir_all(&workspace);
}
5027}