1use std::path::{Path, PathBuf};
12use std::process::ExitCode;
13
14use djogi::apps::AppRegistry;
15use djogi::migrate::{
16 AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
17 DescriptorProvider, GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan,
18 RepairConfirmation, RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError,
19 VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
20 compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
21 repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
22};
23
24use djogi::migrate::LedgerStatus;
26
27use crate::{PartialApplyResolutionCli, RepairSubcommand};
30
31#[derive(Debug, Clone, serde::Deserialize)]
40struct CliReplayPlan {
41 format_version: String,
42 checksum_up: String,
43 checksum_down: Option<String>,
44 classification: CliClassification,
45 segments: Vec<CliReplaySegment>,
46}
47
48#[derive(Debug, Clone, serde::Deserialize)]
49#[serde(tag = "kind", rename_all = "snake_case")]
50enum CliClassification {
51 NoOp,
52 Additive,
53 Reversible,
54 Destructive,
55 Lossy,
56 Unsupported {
57 reason: String,
58 },
59 PkTypeFlip {
60 co_destructive: bool,
61 co_lossy: bool,
62 },
63}
64
65#[derive(Debug, Clone, serde::Deserialize)]
66struct CliReplaySegment {
67 kind: CliSegmentKind,
68 statements: Vec<CliReplayStatement>,
69}
70
71#[derive(Debug, Clone, serde::Deserialize)]
72#[serde(rename_all = "snake_case")]
73enum CliSegmentKind {
74 Transactional,
75 NonTransactional,
76 MetadataOnly,
77}
78
79#[derive(Debug, Clone, serde::Deserialize)]
80struct CliReplayStatement {
81 label: String,
82 up: String,
83}
84
/// The only `format_version` value accepted when reading a `.plan.json`
/// replay plan; any other value is rejected as a format-version mismatch.
const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
87
88fn load_replay_plan_from_disk(
95 workspace: &Path,
96 bucket: &djogi::migrate::BucketKey,
97 version: &str,
98 pending_checksum_up: &str,
99 pending_checksum_down: Option<&str>,
100) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
101 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
103 let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
104
105 if let Ok(bytes) = std::fs::read(&replay_plan_path) {
106 let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
107 Ok(s) => s,
108 Err(e) => {
109 return Err(ApplyReplayPlanError::Parse {
110 path: replay_plan_path.clone(),
111 source: e.to_string(),
112 });
113 }
114 };
115
116 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
117 return Err(ApplyReplayPlanError::FormatVersion {
118 found: stored.format_version,
119 path: replay_plan_path.clone(),
120 });
121 }
122
123 if stored.checksum_up != pending_checksum_up
125 || stored.checksum_down.as_deref() != pending_checksum_down
126 {
127 return Err(ApplyReplayPlanError::ChecksumMismatch);
128 }
129
130 let plan = djogi::migrate::MigrationPlan {
131 bucket: bucket.clone(),
132 classification: stored.classification.into(),
133 segments: stored
134 .segments
135 .into_iter()
136 .map(|seg| djogi::migrate::Segment {
137 kind: seg.kind.into(),
138 statements: seg
139 .statements
140 .into_iter()
141 .map(|stmt| djogi::migrate::OperationSql {
142 label: stmt.label,
143 up: stmt.up,
144 down: String::new(),
145 lossy: None,
146 })
147 .collect(),
148 })
149 .collect(),
150 };
151
152 return Ok((plan, stored.checksum_up, stored.checksum_down));
153 }
154
155 let up_filename = djogi::migrate::up_filename(version);
157 let down_filename = djogi::migrate::down_filename(version);
158 let up_path = bucket_dir.join(&up_filename);
159 let down_path = bucket_dir.join(&down_filename);
160
161 let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
162 path: up_path.clone(),
163 source: e.to_string(),
164 })?;
165
166 let down_sql = match std::fs::read_to_string(&down_path) {
167 Ok(sql) => sql,
168 Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
169 Err(e) => {
170 return Err(ApplyReplayPlanError::SqlRead {
171 path: down_path.clone(),
172 source: e.to_string(),
173 });
174 }
175 };
176
177 let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
181
182 let plan = djogi::migrate::MigrationPlan {
186 bucket: bucket.clone(),
187 classification: djogi::migrate::Classification::Additive,
188 segments: vec![djogi::migrate::Segment {
189 kind: djogi::migrate::SegmentKind::Transactional,
190 statements: vec![djogi::migrate::OperationSql {
191 label: format!("replay {version}"),
192 up: up_sql,
193 down: down_sql,
194 lossy: None,
195 }],
196 }],
197 };
198
199 Ok((plan, computed_checksum_up, None))
200}
201
/// Failures that can occur while rebuilding a replay plan from disk.
#[derive(Debug)]
enum ApplyReplayPlanError {
    /// The `.plan.json` file was read but could not be deserialized.
    Parse { path: PathBuf, source: String },
    /// The stored plan declares a format version other than the expected one.
    FormatVersion { found: String, path: PathBuf },
    /// The stored plan's checksums differ from those in the pending JSON.
    ChecksumMismatch,
    /// An up/down SQL file could not be read.
    SqlRead { path: PathBuf, source: String },
}
210
211impl std::fmt::Display for ApplyReplayPlanError {
212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 match self {
214 Self::Parse { path, source } => {
215 write!(f, "parse replay plan {}: {source}", path.display())
216 }
217 Self::FormatVersion { found, path } => write!(
218 f,
219 "replay plan format version mismatch in {}: expected {}, found {}",
220 path.display(),
221 CLI_REPLAY_PLAN_FORMAT_VERSION,
222 found
223 ),
224 Self::ChecksumMismatch => {
225 write!(f, "checksum mismatch between pending JSON and replay plan")
226 }
227 Self::SqlRead { path, source } => {
228 write!(f, "read SQL file {}: {source}", path.display())
229 }
230 }
231 }
232}
233
234impl std::error::Error for ApplyReplayPlanError {}
235
236impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
239 fn from(kind: CliSegmentKind) -> Self {
240 match kind {
241 CliSegmentKind::Transactional => Self::Transactional,
242 CliSegmentKind::NonTransactional => Self::NonTransactional,
243 CliSegmentKind::MetadataOnly => Self::MetadataOnly,
244 }
245 }
246}
247
248impl From<CliClassification> for djogi::migrate::Classification {
249 fn from(classification: CliClassification) -> Self {
250 match classification {
251 CliClassification::NoOp => Self::NoOp,
252 CliClassification::Additive => Self::Additive,
253 CliClassification::Reversible => Self::Reversible,
254 CliClassification::Destructive => Self::Destructive,
255 CliClassification::Lossy => Self::Lossy,
256 CliClassification::Unsupported { reason } => Self::Unsupported { reason },
257 CliClassification::PkTypeFlip {
258 co_destructive,
259 co_lossy,
260 } => Self::PkTypeFlip {
261 co_destructive,
262 co_lossy,
263 },
264 }
265 }
266}
267
/// Picks the workspace root: the explicit path when given, otherwise the
/// process's current directory, and `"."` if that cannot be determined.
fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
    match workspace {
        Some(explicit) => explicit,
        None => match std::env::current_dir() {
            Ok(cwd) => cwd,
            Err(_) => PathBuf::from("."),
        },
    }
}
274
275fn discover_snapshot_buckets_on_disk(
289 workspace: &Path,
290) -> Vec<djogi::migrate::projection::BucketKey> {
291 let mut out = Vec::new();
292 let migrations_root = djogi::migrate::migrations_root(workspace);
293 let Ok(db_entries) = std::fs::read_dir(&migrations_root) else {
294 return out;
295 };
296 for db_entry in db_entries.flatten() {
297 let Ok(ft) = db_entry.file_type() else {
298 continue;
299 };
300 if !ft.is_dir() {
301 continue;
302 }
303 let Some(database) = db_entry.file_name().to_str().map(str::to_string) else {
304 continue;
305 };
306 let Ok(app_entries) = std::fs::read_dir(db_entry.path()) else {
307 continue;
308 };
309 for app_entry in app_entries.flatten() {
310 let Ok(ft) = app_entry.file_type() else {
311 continue;
312 };
313 if !ft.is_dir() {
314 continue;
315 }
316 let Some(dirname) = app_entry.file_name().to_str().map(str::to_string) else {
317 continue;
318 };
319 let snap_path = app_entry.path().join("schema_snapshot.json");
320 if !snap_path.exists() {
321 continue;
322 }
323 let label = djogi::migrate::app_label_from_dirname(&dirname).to_string();
324 out.push(djogi::migrate::projection::BucketKey {
325 database: database.clone(),
326 app: label,
327 });
328 }
329 }
330 out
331}
332
333pub fn compose_cmd(
335 provider: &dyn DescriptorProvider,
336 name: &str,
337 allow_destructive: bool,
338 force_overwrite: bool,
339 workspace: Option<PathBuf>,
340) -> ExitCode {
341 let workspace = resolve_workspace(workspace);
342 let models = match project_from_provider(provider) {
343 Ok(m) => m,
344 Err(e) => {
345 eprintln!("djogi migrations compose: projection error: {e}");
346 return ExitCode::from(1);
347 }
348 };
349 let apps: Vec<AppLifecycle> = provider
350 .apps()
351 .iter()
352 .map(|d| AppLifecycle {
353 label: d.label.to_string(),
354 database: d.database.to_string(),
355 renamed_from: d.renamed_from.map(str::to_string),
356 tombstone: d.tombstone,
357 })
358 .collect();
359 let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
365 Ok(c) => c,
366 Err(e) => {
367 eprintln!("djogi migrations compose: config load: {e}");
368 return ExitCode::from(1);
369 }
370 };
371 let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
372 djogi_config.migrate.pk_flip_join_table_option,
373 );
374 compose_with_inputs(
375 &workspace,
376 name,
377 allow_destructive,
378 force_overwrite,
379 &models,
380 &apps,
381 time::OffsetDateTime::now_utc(),
382 Some(pk_flip_option),
383 )
384}
385
386#[allow(clippy::too_many_arguments)]
402fn compose_with_inputs(
403 workspace: &Path,
404 name: &str,
405 allow_destructive: bool,
406 force_overwrite: bool,
407 models: &std::collections::BTreeMap<
408 djogi::migrate::projection::BucketKey,
409 djogi::migrate::AppliedSchema,
410 >,
411 apps: &[AppLifecycle],
412 now: time::OffsetDateTime,
413 pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
414) -> ExitCode {
415 let lock_path = workspace.join(LOCK_FILE_NAME);
416 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
417 Ok(g) => g,
418 Err(e) => {
419 eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
420 return ExitCode::from(1);
421 }
422 };
423
424 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::projection::BucketKey> =
432 models.keys().cloned().collect();
433 for bucket in discover_snapshot_buckets_on_disk(workspace) {
434 bucket_set.insert(bucket);
435 }
436
437 let mut snapshots: std::collections::BTreeMap<_, _> = std::collections::BTreeMap::new();
438 for bucket in &bucket_set {
439 let path = djogi::migrate::snapshot_path(workspace, bucket);
440 match djogi::migrate::load_snapshot(&path) {
441 Ok(s) => {
442 snapshots.insert(bucket.clone(), s);
443 }
444 Err(djogi::migrate::SnapshotError::Io { source, .. })
445 if source.kind() == std::io::ErrorKind::NotFound =>
446 {
447 }
449 Err(e) => {
450 eprintln!(
451 "djogi migrations compose: snapshot load failed at {}: {e}",
452 path.display()
453 );
454 return ExitCode::from(1);
455 }
456 }
457 }
458
459 let req = ComposeRequest {
460 workspace_root: workspace,
461 models,
462 snapshots: &snapshots,
463 apps,
464 name,
465 allow_destructive,
466 force_overwrite,
467 now,
468 _guard: &guard,
469 pk_flip_join_table_option,
470 skip_phase_zero_auto_emit: false,
476 };
477 match compose(req) {
478 Ok(report) => {
479 for emit in &report.emitted_phase_zero {
483 let ext_summary = if emit.extensions.is_empty() {
484 "no extensions".to_string()
485 } else {
486 format!(
487 "extensions: {}",
488 emit.extensions
489 .iter()
490 .cloned()
491 .collect::<Vec<_>>()
492 .join(", ")
493 )
494 };
495 println!(
496 "auto-emitted Phase 0 bootstrap: {database}/_global_ ({ext_summary})",
497 database = emit.database,
498 );
499 }
500 for cb in &report.composed_buckets {
501 println!(
502 "composed {database}/{app}: {version} ({classification:?})",
503 database = cb.bucket.database,
504 app = if cb.bucket.app.is_empty() {
505 "_global_"
506 } else {
507 cb.bucket.app.as_str()
508 },
509 version = cb.version,
510 classification = cb.classification,
511 );
512 }
513 ExitCode::from(0)
514 }
515 Err(ComposeError::NothingToCompose) => {
516 println!("nothing to compose — model state matches snapshot for every bucket");
517 ExitCode::from(0)
521 }
522 Err(ComposeError::LinkageDropWithoutModels { ref text, .. }) => {
523 eprintln!("djogi migrations compose: {text}");
524 ExitCode::from(2)
526 }
527 Err(e) => {
528 eprintln!("djogi migrations compose: {e}");
529 ExitCode::from(1)
530 }
531 }
532}
533
534pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
540 let workspace = resolve_workspace(workspace);
541
542 let runtime = match tokio::runtime::Builder::new_current_thread()
544 .enable_all()
545 .build()
546 {
547 Ok(r) => r,
548 Err(e) => {
549 eprintln!("djogi migrations status: tokio runtime: {e}");
550 return ExitCode::from(1);
551 }
552 };
553
554 let exit = runtime.block_on(async { run_status(&workspace).await });
555 ExitCode::from(exit as u8)
556}
557
558async fn run_status(workspace: &Path) -> i32 {
568 use djogi::config::DjogiConfig;
569
570 let config = match DjogiConfig::load_from_workspace(workspace) {
571 Ok(c) => c,
572 Err(e) => {
573 eprintln!("djogi migrations status: config load: {e}");
574 return 1;
575 }
576 };
577
578 let mut ctx = match connect_and_check(&config.database.url).await {
579 ContextOutcome::Ready(ctx) => ctx,
580 ContextOutcome::UnsupportedVersion(e) => {
581 crate::print_support_boundary_error("migrations status", &e);
582 return 2;
583 }
584 ContextOutcome::RuntimeError(msg) => {
585 eprintln!("djogi migrations status: pool: {msg}");
586 return 1;
587 }
588 };
589
590 let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
591 Ok(rows) => rows,
592 Err(e) => {
593 if e.to_string().contains("djogi_schema_migrations") {
596 println!("No migrations recorded.");
597 return 0;
598 }
599 eprintln!("djogi migrations status: ledger read: {e}");
600 return 1;
601 }
602 };
603
604 let registered: Vec<String> = AppRegistry::all()
605 .iter()
606 .map(|d| d.label.to_string())
607 .collect();
608 let report = djogi::migrate::render_status(&rows, ®istered);
609 for line in &report.lines {
610 println!("{line}");
611 }
612 report.exit_code
613}
614
615#[allow(clippy::large_enum_variant)]
638enum ContextOutcome {
639 Ready(djogi::context::DjogiContext),
641 UnsupportedVersion(djogi::error::DjogiError),
644 RuntimeError(String),
647}
648
649async fn connect_and_check(url: &str) -> ContextOutcome {
658 let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
659 Ok(p) => p,
660 Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
661 };
662 match djogi::pg::preflight::check_postgres_version(&pool).await {
663 Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
664 Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
667 ContextOutcome::UnsupportedVersion(e)
668 }
669 Err(other) => ContextOutcome::RuntimeError(other.to_string()),
670 }
671}
672
673fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
696 if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
699 return Some(db_config.url.clone());
700 }
701 if database == "crud_log"
702 && let Some(u) = db_config.crud_log_url.as_deref()
703 && !u.is_empty()
704 {
705 return Some(u.to_string());
706 }
707 if database == "event_log"
708 && let Some(u) = db_config.event_log_url.as_deref()
709 && !u.is_empty()
710 {
711 return Some(u.to_string());
712 }
713 djogi::migrate::derive_per_database_url(&db_config.url, database)
714}
715
716pub fn apply_cmd(workspace: Option<PathBuf>, fake: bool, reason: Option<String>) -> ExitCode {
723 let workspace = resolve_workspace(workspace);
724
725 let mode = if fake {
727 match reason {
728 Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
729 Some(_) => {
730 eprintln!(
731 "djogi migrations apply --fake: --reason must not be empty; \
732 supply a non-empty reason why these migrations are being \
733 faked (e.g. 'schema pre-exists from prior tooling')"
734 );
735 return ExitCode::from(2);
736 }
737 None => {
738 eprintln!(
739 "djogi migrations apply --fake: --reason is required; \
740 supply a reason why these migrations are being faked \
741 (e.g. 'schema pre-exists from prior tooling'). \
742 This is recorded in the ledger audit trail."
743 );
744 return ExitCode::from(2);
745 }
746 }
747 } else {
748 FakeMode::Real
749 };
750
751 let runtime = match tokio::runtime::Builder::new_current_thread()
752 .enable_all()
753 .build()
754 {
755 Ok(r) => r,
756 Err(e) => {
757 eprintln!("djogi migrations apply: tokio runtime: {e}");
758 return ExitCode::from(1);
759 }
760 };
761
762 let exit = runtime.block_on(async { run_apply(&workspace, &mode).await });
763 ExitCode::from(exit as u8)
764}
765
/// Whether `apply` executes plans for real or only records them as faked.
#[derive(Clone, Debug)]
enum FakeMode {
    /// Run each plan through `apply_plan`.
    Real,
    /// Run each plan through `fake_apply_plan`, passing along this reason.
    Fake { reason: String },
}
775
776async fn run_apply(workspace: &Path, mode: &FakeMode) -> i32 {
778 use djogi::config::DjogiConfig;
779
780 let action_verb = match mode {
781 FakeMode::Real => "apply",
782 FakeMode::Fake { .. } => "fake-apply",
783 };
784 let progress_verb = match mode {
785 FakeMode::Real => "applying",
786 FakeMode::Fake { .. } => "faking",
787 };
788
789 let config = match DjogiConfig::load_from_workspace(workspace) {
791 Ok(c) => c,
792 Err(e) => {
793 eprintln!("djogi migrations {action_verb}: config load: {e}");
794 return 2;
795 }
796 };
797
798 let pool = match djogi::pg::pool::DjogiPool::connect(&config.database.url).await {
800 Ok(p) => p,
801 Err(e) => {
802 eprintln!("djogi migrations {action_verb}: pool connect: {e}");
803 return 1;
804 }
805 };
806 if let Err(e) = djogi::pg::preflight::check_postgres_version(&pool).await {
807 crate::print_support_boundary_error("migrations apply", &e);
808 return 2;
809 }
810
811 let pending_files = discover_pending_plans(workspace);
813 if pending_files.is_empty() {
814 println!("No pending migrations to {action_verb}.");
815 return 0;
816 }
817
818 let lock_path = workspace.join(LOCK_FILE_NAME);
820 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
821 Ok(g) => g,
822 Err(e) => {
823 eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
824 return 1;
825 }
826 };
827
828 let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
830 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
831 Err(_) => None,
832 };
833
834 let mut ctx = djogi::context::DjogiContext::from_pool(pool);
836
837 for (pending_path, bucket_database, app_label) in &pending_files {
839 println!(" {progress_verb} {bucket_database}/{app_label}...");
840 let result = apply_one_pending(
841 &mut ctx,
842 workspace,
843 pending_path,
844 bucket_database.clone(),
845 app_label.clone(),
846 &config,
847 &guard,
848 audit_pool.as_ref(),
849 mode,
850 )
851 .await;
852
853 match result {
854 ApplyResult::Ok => match mode {
855 FakeMode::Real => {
856 println!("Applied: {bucket_database}/{app_label}");
857 }
858 FakeMode::Fake { .. } => {
859 println!(
860 " faked {bucket_database}/{app_label}: \
861 recorded in ledger with status = 'faked' (no SQL executed)"
862 );
863 }
864 },
865 ApplyResult::Skipped(reason) => {
866 println!("Skipped {bucket_database}/{app_label}: {reason}");
867 }
868 ApplyResult::Refused(reason) => {
869 eprintln!(
870 "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
871 );
872 return 2;
873 }
874 ApplyResult::RunnerError(e) => {
875 eprintln!(
876 "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
877 );
878 return runner_error_exit_code(&e);
879 }
880 }
881 }
882
883 let summary_verb = match mode {
884 FakeMode::Real => "applied",
885 FakeMode::Fake { .. } => "faked",
886 };
887 println!("{summary_verb} {} migration(s).", pending_files.len());
888 0
889}
890
891#[derive(Debug)]
893enum ApplyResult {
894 Ok,
896 Skipped(String),
898 Refused(String),
900 RunnerError(RunnerError),
902}
903
904fn discover_pending_plans(workspace: &Path) -> Vec<(PathBuf, String, String)> {
910 let pending_root = djogi::migrate::pending_root(workspace);
911 let mut out = Vec::new();
912
913 let Ok(db_entries) = std::fs::read_dir(&pending_root) else {
914 return out;
915 };
916
917 for db_entry in db_entries.flatten() {
918 let db_name = match db_entry.file_name().to_str().map(str::to_string) {
919 Some(n) => n,
920 None => continue,
921 };
922 if db_name.starts_with('.') {
923 continue;
924 }
925
926 let db_dir = db_entry.path();
927 if !db_dir.is_dir() {
928 continue;
929 }
930
931 let Ok(app_entries) = std::fs::read_dir(&db_dir) else {
932 continue;
933 };
934
935 for app_entry in app_entries.flatten() {
936 let path = app_entry.path();
937 if !path.is_file() {
938 continue;
939 }
940 let filename = match path.file_name().and_then(|f| f.to_str()) {
941 Some(f) => f,
942 None => continue,
943 };
944 if !filename.ends_with(".json") {
947 continue;
948 }
949 let app_label = if let Some(stripped) = filename.strip_suffix(".json") {
952 stripped.to_string()
953 } else {
954 continue;
955 };
956
957 out.push((path, db_name.clone(), app_label));
958 }
959 }
960
961 out.sort_by(|a, b| a.0.cmp(&b.0));
962 out
963}
964
965#[allow(clippy::too_many_arguments)]
980#[djogi::deliberately_bypass_convention_with_raw_sql]
981async fn apply_one_pending(
987 ctx: &mut djogi::context::DjogiContext,
988 workspace: &Path,
989 pending_path: &Path,
990 bucket_database: String,
991 app_label: String,
992 config: &djogi::config::DjogiConfig,
993 guard: &djogi::migrate::WorkspaceGuard,
994 audit_pool: Option<&deadpool_postgres::Pool>,
995 mode: &FakeMode,
996) -> ApplyResult {
997 let pending_bytes = match std::fs::read(pending_path) {
999 Ok(b) => b,
1000 Err(e) => {
1001 return ApplyResult::Refused(format!("read pending JSON: {e}"));
1002 }
1003 };
1004 let pending: PendingPlan = match serde_json::from_slice(&pending_bytes) {
1005 Ok(p) => p,
1006 Err(e) => {
1007 return ApplyResult::Refused(format!("parse pending JSON: {e}"));
1008 }
1009 };
1010
1011 let resolved_app = if app_label == "_global_" {
1014 String::new()
1015 } else {
1016 app_label.clone()
1017 };
1018 let bucket = djogi::migrate::BucketKey {
1019 database: bucket_database,
1020 app: resolved_app,
1021 };
1022
1023 match check_ledger_state(ctx, &pending.version).await {
1025 LedgerState::NotPresent => {} LedgerState::AlreadyApplied => {
1027 return ApplyResult::Skipped("already applied".to_string());
1028 }
1029 LedgerState::PendingOrPartial(existing_status) => {
1030 if existing_status == LedgerStatus::Failed
1035 || existing_status == LedgerStatus::RolledBack
1036 {
1037 if let Err(e) = delete_failed_ledger_row(ctx, &pending.version).await {
1043 return ApplyResult::Refused(format!(
1044 "clean {} ledger row: {e}",
1045 existing_status.as_db_str()
1046 ));
1047 }
1048 } else {
1049 return ApplyResult::Refused(format!(
1050 "version already in {} state — resolve before re-applying",
1051 existing_status.as_db_str()
1052 ));
1053 }
1054 }
1055 }
1056
1057 let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
1059 workspace,
1060 &bucket,
1061 &pending.version,
1062 &pending.checksum_up,
1063 pending.checksum_down.as_deref(),
1064 ) {
1065 Ok(result) => result,
1066 Err(e) => {
1067 return ApplyResult::Refused(format!("load replay plan: {e}"));
1068 }
1069 };
1070
1071 let runner_ctx = RunnerCtx {
1073 bucket: bucket.clone(),
1074 version: pending.version.clone(),
1075 description: pending.slug.clone(),
1076 checksum_up,
1077 checksum_down,
1078 snapshot: Some(pending.model_snapshot.clone()),
1079 snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
1080 config: djogi::config::MigrateConfig {
1082 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
1083 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
1084 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
1085 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
1086 },
1087 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
1088 audit_pool: audit_pool.cloned(),
1089 };
1090
1091 let runner_result = match mode {
1093 FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
1094 FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
1095 };
1096 match runner_result {
1097 Ok(_) => ApplyResult::Ok,
1098 Err(e) => ApplyResult::RunnerError(e),
1099 }
1100}
1101
1102#[derive(Debug)]
1104enum LedgerState {
1105 NotPresent,
1107 AlreadyApplied,
1109 PendingOrPartial(LedgerStatus),
1111}
1112
1113async fn check_ledger_state(ctx: &mut djogi::context::DjogiContext, version: &str) -> LedgerState {
1115 let Ok(rows) = djogi::migrate::select_all_ledger_rows(ctx).await else {
1116 return LedgerState::NotPresent;
1119 };
1120
1121 let existing = rows.iter().find(|r| r.version == version);
1122 match existing {
1123 None => LedgerState::NotPresent,
1124 Some(row) => match row.status {
1125 LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
1126 LedgerState::AlreadyApplied
1127 }
1128 LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
1129 LedgerState::PendingOrPartial(row.status)
1130 }
1131 },
1132 }
1133}
1134
1135fn runner_error_exit_code(_error: &RunnerError) -> i32 {
1141 1
1142}
1143
1144#[djogi::deliberately_bypass_convention_with_raw_sql]
1145async fn delete_failed_ledger_row(
1150 ctx: &mut djogi::context::DjogiContext,
1151 version: &str,
1152) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1153 ctx.raw_execute(
1154 "DELETE FROM djogi_schema_migrations WHERE version = $1",
1155 &[&version],
1156 )
1157 .await?;
1158 Ok(())
1159}
1160
1161fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
1163 let migrations_root = djogi::migrate::migrations_root(workspace);
1164 migrations_root
1165 .join(&bucket.database)
1166 .join(djogi::migrate::app_dirname(&bucket.app))
1167 .join("schema_snapshot.json")
1168}
1169
1170#[allow(clippy::too_many_arguments)]
1201pub fn attune_cmd(
1202 target: Option<&str>,
1203 apply: bool,
1204 record: bool,
1205 record_ledger: bool,
1206 record_reason: &str,
1207 squash: bool,
1208 from: Option<&str>,
1209 publish: bool,
1210 app: Option<&str>,
1211 workspace: Option<PathBuf>,
1212) -> ExitCode {
1213 let workspace = resolve_workspace(workspace);
1214 let mode = match (record_ledger, squash) {
1215 (false, false) => AttuneMode::DiffOnly,
1216 (true, false) => AttuneMode::Record {
1217 reason: record_reason.to_string(),
1218 },
1219 (false, true) => match from {
1220 Some(v) if !v.is_empty() => AttuneMode::Squash {
1221 from: v.to_string(),
1222 publish,
1223 app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
1224 },
1225 _ => {
1226 eprintln!(
1227 "djogi migrations attune --squash requires --from <version> (e.g. \
1228 `--from V20260101000000__init`)"
1229 );
1230 return ExitCode::from(2);
1231 }
1232 },
1233 (true, true) => {
1234 eprintln!(
1237 "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
1238 );
1239 return ExitCode::from(2);
1240 }
1241 };
1242
1243 let runtime = match tokio::runtime::Builder::new_current_thread()
1244 .enable_all()
1245 .build()
1246 {
1247 Ok(r) => r,
1248 Err(e) => {
1249 eprintln!("djogi migrations attune: tokio runtime: {e}");
1250 return ExitCode::from(1);
1251 }
1252 };
1253
1254 let target_owned = target.map(str::to_string);
1255 let exit =
1256 runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
1257 ExitCode::from(exit as u8)
1258}
1259
1260async fn run_attune(
1263 workspace: &Path,
1264 mode: AttuneMode,
1265 target: Option<String>,
1266 apply: bool,
1267 record: bool,
1268) -> i32 {
1269 use djogi::config::DjogiConfig;
1270
1271 let config = match DjogiConfig::load_from_workspace(workspace) {
1272 Ok(c) => c,
1273 Err(e) => {
1274 eprintln!("djogi migrations attune: config load: {e}");
1275 return 1;
1276 }
1277 };
1278
1279 let mut ctx = match connect_and_check(&config.database.url).await {
1280 ContextOutcome::Ready(ctx) => ctx,
1281 ContextOutcome::UnsupportedVersion(e) => {
1282 crate::print_support_boundary_error("migrations attune", &e);
1283 return 2;
1284 }
1285 ContextOutcome::RuntimeError(msg) => {
1286 eprintln!("djogi migrations attune: pool: {msg}");
1287 return 1;
1288 }
1289 };
1290
1291 let lock_path = workspace.join(LOCK_FILE_NAME);
1295 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
1296 Ok(g) => g,
1297 Err(e) => {
1298 eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
1299 return 1;
1300 }
1301 };
1302
1303 let req = AttuneRequest {
1304 workspace_root: workspace,
1305 database_url: &config.database.url,
1306 profile: &config.profile,
1307 dev_mode: config.database.dev_mode,
1311 target: target.as_deref(),
1316 apply,
1317 record,
1318 mode,
1319 _guard: &guard,
1320 };
1321 match attune(&mut ctx, req).await {
1322 Ok(report) => {
1323 if report.entries.is_empty() {
1324 println!("attune: no drift");
1325 } else {
1326 for entry in &report.entries {
1327 let app_display = if entry.bucket.app.is_empty() {
1328 "_global_"
1329 } else {
1330 entry.bucket.app.as_str()
1331 };
1332 println!(
1333 " {kind:<10} {database}/{app} {version}",
1334 kind = entry.kind.as_str(),
1335 database = entry.bucket.database,
1336 app = app_display,
1337 version = entry.version,
1338 );
1339 }
1340 }
1341 for diag in &report.diagnostics {
1345 println!(" diagnostic: {diag}");
1346 }
1347 if let Some(sha) = &report.resolved_target {
1348 println!("resolved target: {sha}");
1349 }
1350 if let Some(squashed) = &report.squashed_to {
1351 println!("squashed to: {squashed}");
1352 }
1353 if report.published {
1354 println!("published to remote");
1355 }
1356 if report.parent_pointer_updated {
1357 println!("parent submodule pointer updated");
1358 }
1359 0
1360 }
1361 Err(e) => {
1362 eprintln!("djogi migrations attune: {e}");
1363 attune_error_exit_code(&e)
1364 }
1365 }
1366}
1367
1368fn attune_error_exit_code(err: &AttuneError) -> i32 {
1385 match err {
1386 AttuneError::Refused(_) => 2,
1387 AttuneError::FilesystemScanFailed { .. }
1388 | AttuneError::LedgerQueryFailed { .. }
1389 | AttuneError::SqlReadFailed { .. }
1390 | AttuneError::SqlWriteFailed { .. }
1391 | AttuneError::SqlDeleteFailed { .. }
1392 | AttuneError::GitPublishFailed { .. }
1393 | AttuneError::GitTargetResolveFailed { .. }
1394 | AttuneError::GitFetchFailed { .. }
1395 | AttuneError::GitUpdateSubmodulePointerFailed { .. } => 1,
1396 }
1397}
1398
1399pub fn verify_cmd(
1409 provider: &dyn DescriptorProvider,
1410 workspace: Option<PathBuf>,
1411 strict: bool,
1412) -> ExitCode {
1413 let workspace = resolve_workspace(workspace);
1414
1415 let runtime = match tokio::runtime::Builder::new_current_thread()
1416 .enable_all()
1417 .build()
1418 {
1419 Ok(r) => r,
1420 Err(e) => {
1421 eprintln!("djogi migrations verify: tokio runtime: {e}");
1422 return ExitCode::from(1);
1423 }
1424 };
1425
1426 let exit = runtime.block_on(async { run_verify(provider, &workspace, strict).await });
1427 ExitCode::from(exit as u8)
1428}
1429
1430async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
1447 use djogi::config::DjogiConfig;
1448
1449 if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
1462 crate::print_zero_descriptor_diagnostic("migrations verify");
1463 return 2;
1464 }
1465
1466 let config = match DjogiConfig::load_from_workspace(workspace) {
1468 Ok(c) => c,
1469 Err(e) => {
1470 eprintln!("djogi migrations verify: config load: {e}");
1471 return 1;
1472 }
1473 };
1474
1475 let models = match project_from_provider(provider) {
1477 Ok(m) => m,
1478 Err(e) => {
1479 eprintln!("djogi migrations verify: projection error: {e}");
1480 return 1;
1481 }
1482 };
1483
1484 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
1490 models.keys().cloned().collect();
1491 for bucket in discover_snapshot_buckets_on_disk(workspace) {
1492 bucket_set.insert(bucket);
1493 }
1494 if bucket_set.is_empty() {
1501 crate::print_zero_descriptor_diagnostic("migrations verify");
1502 return 2;
1503 }
1504
1505 let policy = djogi::config::PolicyConfig {
1507 strict_out_of_order: strict,
1508 };
1509
1510 let database_has_models: std::collections::HashSet<String> = bucket_set
1517 .iter()
1518 .filter(|b| {
1519 models
1520 .get(*b)
1521 .map(|s| !s.models.is_empty())
1522 .unwrap_or(false)
1523 })
1524 .map(|b| b.database.clone())
1525 .collect();
1526
1527 let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
1533 std::collections::BTreeMap::new();
1534 let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
1535 let mut exit_code: i32 = 0;
1536
1537 for bucket in &bucket_set {
1539 let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
1541 let bd = if bucket.app.is_empty() {
1542 "_global_"
1543 } else {
1544 &bucket.app
1545 };
1546 eprintln!(
1547 "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
1548 check that config.database.url has a valid path component",
1549 bucket.database, bucket.database, bd
1550 );
1551 exit_code = 1;
1552 continue;
1553 };
1554
1555 if !contexts.contains_key(&bucket.database) {
1559 match connect_and_check(&url).await {
1560 ContextOutcome::Ready(ctx) => {
1561 contexts.insert(bucket.database.clone(), ctx);
1562 }
1563 ContextOutcome::UnsupportedVersion(e) => {
1564 crate::print_support_boundary_error("migrations verify", &e);
1565 return 2;
1566 }
1567 ContextOutcome::RuntimeError(msg) => {
1568 eprintln!(
1569 "djogi migrations verify: pool for '{}': {msg}",
1570 bucket.database
1571 );
1572 exit_code = 1;
1573 continue;
1574 }
1575 }
1576 }
1577
1578 let snap_path = snapshot_path(workspace, bucket);
1583 let snapshot = match load_snapshot(&snap_path) {
1584 Ok(s) => s,
1585 Err(SnapshotError::Io { source, .. })
1586 if source.kind() == std::io::ErrorKind::NotFound =>
1587 {
1588 let bd = if bucket.app.is_empty() {
1589 "_global_"
1590 } else {
1591 &bucket.app
1592 };
1593 let has_models = models
1594 .get(bucket)
1595 .map(|s| !s.models.is_empty())
1596 .unwrap_or(false);
1597 if has_models {
1598 eprintln!(
1599 "djogi migrations verify: {}/{} has registered models but no \
1600 snapshot; run `djogi migrations compose` then \
1601 `djogi migrations apply` to record a baseline",
1602 bucket.database, bd
1603 );
1604 exit_code = 1;
1605 } else {
1606 println!("No snapshot found for bucket {}/{}", bucket.database, bd);
1607 }
1608 continue;
1609 }
1610 Err(e) => {
1611 let bd = if bucket.app.is_empty() {
1612 "_global_"
1613 } else {
1614 &bucket.app
1615 };
1616 eprintln!(
1617 "djogi migrations verify: load snapshot for {}/{}: {e}",
1618 bucket.database, bd
1619 );
1620 exit_code = 1;
1621 continue;
1622 }
1623 };
1624
1625 let db_has_models = database_has_models.contains(&bucket.database);
1630 let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
1631
1632 let ctx = contexts
1634 .get_mut(&bucket.database)
1635 .expect("context inserted above");
1636 let report = match djogi::migrate::verify_bucket(
1637 ctx,
1638 bucket,
1639 &snapshot,
1640 &policy,
1641 emit_ledger,
1642 db_has_models,
1643 )
1644 .await
1645 {
1646 Ok(r) => r,
1647 Err(e) => {
1648 let bd = if bucket.app.is_empty() {
1649 "_global_"
1650 } else {
1651 &bucket.app
1652 };
1653 eprintln!(
1654 "djogi migrations verify: error for {}/{}: {e}",
1655 bucket.database, bd
1656 );
1657 exit_code = 1;
1658 continue;
1659 }
1660 };
1661
1662 for line in render_verify_report(&report, bucket) {
1664 println!("{line}");
1665 }
1666 if report.has_errors() {
1667 exit_code = 1;
1668 }
1669 }
1670
1671 exit_code
1672}
1673
1674fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
1684 let mut lines: Vec<String> = Vec::new();
1685
1686 let app_display = if bucket.app.is_empty() {
1687 "_global_"
1688 } else {
1689 &bucket.app
1690 };
1691 lines.push(format!(
1692 "djogi migrations verify — {}/{}",
1693 bucket.database, app_display
1694 ));
1695 lines.push("──────────────────────────────────────────".to_string());
1696
1697 match (
1698 &report.latest_applied_version,
1699 report.applied_count,
1700 report.unfinished_count,
1701 ) {
1702 (Some(version), applied, 0) => {
1703 lines.push(format!("Ledger: {applied} applied, latest {version}"));
1704 }
1705 (Some(version), applied, unfinished) => {
1706 lines.push(format!(
1707 "Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
1708 ));
1709 }
1710 (None, 0, 0) => {
1711 lines.push("Ledger: empty (no migrations applied yet)".to_string());
1712 }
1713 _ => {}
1714 }
1715 lines.push(String::new());
1716
1717 if report.diagnostics.is_empty() {
1718 lines.push("No drift detected. Schema is consistent.".to_string());
1719 } else {
1720 for d in &report.diagnostics {
1721 let severity = match d.severity {
1722 VerifySeverity::Info => "INFO",
1723 VerifySeverity::Warning => "WARN",
1724 VerifySeverity::Error => "ERROR",
1725 };
1726 let location = d.location.as_deref().unwrap_or("-");
1727 lines.push(format!(
1728 "[{severity}] {code} ({loc}): {msg}",
1729 severity = severity,
1730 code = d.code,
1731 loc = location,
1732 msg = d.message
1733 ));
1734 }
1735 }
1736
1737 let errors = report
1738 .diagnostics
1739 .iter()
1740 .filter(|d| d.severity == VerifySeverity::Error)
1741 .count();
1742 let warnings = report
1743 .diagnostics
1744 .iter()
1745 .filter(|d| d.severity == VerifySeverity::Warning)
1746 .count();
1747 let infos = report
1748 .diagnostics
1749 .iter()
1750 .filter(|d| d.severity == VerifySeverity::Info)
1751 .count();
1752
1753 if errors > 0 {
1754 lines.push(String::new());
1755 lines.push(format!(
1756 "Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
1757 ));
1758 } else if warnings > 0 {
1759 lines.push(String::new());
1760 lines.push(format!(
1761 "Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
1762 ));
1763 } else {
1764 lines.push(String::new());
1765 lines.push(format!("Result: PASSED ({infos} info(s))"));
1766 }
1767
1768 lines
1769}
1770
1771impl From<PartialApplyResolutionCli> for PartialApplyResolution {
1774 fn from(cli: PartialApplyResolutionCli) -> Self {
1775 match cli {
1776 PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
1777 PartialApplyResolutionCli::Faked => Self::MarkFaked,
1778 PartialApplyResolutionCli::Applied => Self::MarkApplied,
1779 }
1780 }
1781}
1782
1783pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
1789 match command {
1790 RepairSubcommand::ChecksumDrift {
1791 version,
1792 app,
1793 database,
1794 checksum_up,
1795 checksum_down,
1796 workspace,
1797 } => repair_checksum_drift_cmd(
1798 &version,
1799 app.as_deref(),
1800 database.as_deref(),
1801 checksum_up.as_deref(),
1802 checksum_down.as_deref(),
1803 workspace,
1804 ),
1805 RepairSubcommand::PartialApply {
1806 version,
1807 resolution,
1808 note,
1809 app,
1810 database,
1811 workspace,
1812 } => repair_partial_apply_cmd(
1813 &version,
1814 resolution.into(),
1815 ¬e,
1816 app.as_deref(),
1817 database.as_deref(),
1818 workspace,
1819 ),
1820 RepairSubcommand::ResumePartial {
1821 version,
1822 app,
1823 database,
1824 workspace,
1825 } => repair_resume_partial_apply_cmd(
1826 &version,
1827 app.as_deref(),
1828 database.as_deref(),
1829 workspace,
1830 ),
1831 RepairSubcommand::SnapshotRebuild {
1832 app,
1833 database,
1834 snapshot_path,
1835 workspace,
1836 } => repair_snapshot_rebuild_cmd(
1837 app.as_deref(),
1838 database.as_deref(),
1839 snapshot_path.as_deref(),
1840 workspace,
1841 ),
1842 }
1843}
1844
1845fn render_repair_report(report: &RepairReport) {
1849 for action in &report.actions_taken {
1850 println!(" {action}");
1851 }
1852 if !report.ledger_changes.is_empty() {
1853 println!("Ledger changes:");
1854 for lc in &report.ledger_changes {
1855 println!(
1856 " {} | {} | {} -> {}",
1857 lc.version, lc.column, lc.before, lc.after,
1858 );
1859 }
1860 }
1861 if !report.snapshot_changes.is_empty() {
1862 println!("Snapshot changes:");
1863 for sc in &report.snapshot_changes {
1864 println!(" {} | {}", sc.path.display(), sc.description);
1865 }
1866 }
1867}
1868
1869fn repair_error_exit_code(err: &RepairError) -> i32 {
1885 match err {
1886 RepairError::LedgerIo { .. } | RepairError::SnapshotIo { .. } | RepairError::AdvisoryLockFailed { .. } | RepairError::AdvisoryLockQueryFailed { .. } | RepairError::PinnedSessionCheckoutFailed { .. } | RepairError::ResumeStepFailed { .. } | RepairError::ResumeProgressAckFailed { .. } => 1,
1897
1898 RepairError::VersionNotFound { .. }
1902 | RepairError::InsufficientConfirmation
1903 | RepairError::InvalidChecksum { .. }
1904 | RepairError::InvalidResolution { .. }
1905 | RepairError::BucketAppMismatch { .. }
1906 | RepairError::PlanVersionMismatch { .. }
1907 | RepairError::PlanChecksumMismatch { .. }
1908 | RepairError::LeafIdentityMismatch { .. }
1909 | RepairError::NothingToResume { .. }
1910 | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
1911 | RepairError::SuppliedSnapshotDiverges { .. }
1912 | RepairError::AdvisoryUnlockReturnedFalse { .. } | RepairError::ResumePlanShapeMismatch { .. }
1914 | RepairError::ReplayPlanShapeMismatch { .. }
1915 => 2,
1916 }
1917}
1918
1919fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
1927 database.unwrap_or("main").to_string()
1928}
1929
1930fn compute_checksum_up_from_disk(
1946 workspace: &Path,
1947 bucket: &djogi::migrate::BucketKey,
1948 version: &str,
1949) -> std::io::Result<String> {
1950 let path =
1951 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::up_filename(version));
1952 let sql = std::fs::read_to_string(&path)?;
1953 Ok(djogi::migrate::compute_committed_sql_checksum(
1954 &sql,
1955 djogi::migrate::ResetSqlSide::Up,
1956 ))
1957}
1958
1959fn compute_checksum_down_from_disk(
1970 workspace: &Path,
1971 bucket: &djogi::migrate::BucketKey,
1972 version: &str,
1973) -> std::io::Result<Option<String>> {
1974 let path =
1975 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::down_filename(version));
1976 let sql = match std::fs::read_to_string(&path) {
1977 Ok(s) => s,
1978 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
1979 Err(e) => return Err(e),
1980 };
1981 Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql))
1982}
1983
1984pub fn repair_checksum_drift_cmd(
1992 version: &str,
1993 app: Option<&str>,
1994 database: Option<&str>,
1995 checksum_up: Option<&str>,
1996 checksum_down: Option<&str>,
1997 workspace: Option<PathBuf>,
1998) -> ExitCode {
1999 let workspace = resolve_workspace(workspace);
2000 let runtime = match tokio::runtime::Builder::new_current_thread()
2001 .enable_all()
2002 .build()
2003 {
2004 Ok(r) => r,
2005 Err(e) => {
2006 eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
2007 return ExitCode::from(1);
2008 }
2009 };
2010 let exit = runtime.block_on(async {
2011 run_repair_checksum_drift(
2012 &workspace,
2013 version,
2014 app,
2015 database,
2016 checksum_up,
2017 checksum_down,
2018 )
2019 .await
2020 });
2021 ExitCode::from(exit as u8)
2022}
2023
2024async fn run_repair_checksum_drift(
2026 workspace: &Path,
2027 version: &str,
2028 app: Option<&str>,
2029 database: Option<&str>,
2030 checksum_up: Option<&str>,
2031 checksum_down: Option<&str>,
2032) -> i32 {
2033 use djogi::config::DjogiConfig;
2034
2035 let config = match DjogiConfig::load_from_workspace(workspace) {
2036 Ok(c) => c,
2037 Err(e) => {
2038 eprintln!("djogi migrations repair checksum-drift: config load: {e}");
2039 return 1;
2040 }
2041 };
2042
2043 let db_name = resolve_database(database, &config);
2048 let url = match resolve_bucket_url(&config.database, &db_name) {
2049 Some(u) => u,
2050 None => {
2051 eprintln!(
2052 "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
2053 );
2054 return 2;
2055 }
2056 };
2057
2058 let mut ctx = match connect_and_check(&url).await {
2059 ContextOutcome::Ready(ctx) => ctx,
2060 ContextOutcome::UnsupportedVersion(e) => {
2061 crate::print_support_boundary_error("migrations repair checksum-drift", &e);
2062 return 2;
2063 }
2064 ContextOutcome::RuntimeError(msg) => {
2065 eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
2066 return 1;
2067 }
2068 };
2069
2070 let lock_path = workspace.join(LOCK_FILE_NAME);
2071 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2072 Ok(g) => g,
2073 Err(e) => {
2074 eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
2075 return 1;
2076 }
2077 };
2078
2079 let app_label = app.unwrap_or("");
2080 let bucket = BucketKey {
2081 database: db_name,
2082 app: app_label.to_string(),
2083 };
2084
2085 let new_checksum_up = match checksum_up {
2086 Some(c) => c.to_string(),
2087 None => {
2088 match compute_checksum_up_from_disk(workspace, &bucket, version) {
2094 Ok(cs) => cs,
2095 Err(e) => {
2096 eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
2097 return 1;
2098 }
2099 }
2100 }
2101 };
2102
2103 let resolved_checksum_down = match checksum_down {
2104 Some(c) => Some(c.to_string()),
2105 None => {
2106 match compute_checksum_down_from_disk(workspace, &bucket, version) {
2111 Ok(cs_opt) => cs_opt,
2112 Err(e) => {
2113 eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
2114 return 1;
2115 }
2116 }
2117 }
2118 };
2119
2120 match repair_checksum_drift(
2121 &mut ctx,
2122 &guard,
2123 &bucket,
2124 version,
2125 &new_checksum_up,
2126 resolved_checksum_down.as_deref(),
2127 RepairConfirmation::OperatorAcknowledged,
2128 )
2129 .await
2130 {
2131 Ok(report) => {
2132 render_repair_report(&report);
2133 0
2134 }
2135 Err(e) => {
2136 eprintln!("djogi migrations repair checksum-drift: {e}");
2137 repair_error_exit_code(&e)
2138 }
2139 }
2140}
2141
2142pub fn repair_partial_apply_cmd(
2148 version: &str,
2149 resolution: PartialApplyResolution,
2150 note: &str,
2151 app: Option<&str>,
2152 database: Option<&str>,
2153 workspace: Option<PathBuf>,
2154) -> ExitCode {
2155 let workspace = resolve_workspace(workspace);
2156 let runtime = match tokio::runtime::Builder::new_current_thread()
2157 .enable_all()
2158 .build()
2159 {
2160 Ok(r) => r,
2161 Err(e) => {
2162 eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
2163 return ExitCode::from(1);
2164 }
2165 };
2166 let exit = runtime.block_on(async {
2167 run_repair_partial_apply(&workspace, version, resolution, note, app, database).await
2168 });
2169 ExitCode::from(exit as u8)
2170}
2171
2172async fn run_repair_partial_apply(
2174 workspace: &Path,
2175 version: &str,
2176 resolution: PartialApplyResolution,
2177 note: &str,
2178 app: Option<&str>,
2179 database: Option<&str>,
2180) -> i32 {
2181 use djogi::config::DjogiConfig;
2182
2183 let config = match DjogiConfig::load_from_workspace(workspace) {
2184 Ok(c) => c,
2185 Err(e) => {
2186 eprintln!("djogi migrations repair partial-apply: config load: {e}");
2187 return 1;
2188 }
2189 };
2190
2191 let db_name = resolve_database(database, &config);
2196 let url = match resolve_bucket_url(&config.database, &db_name) {
2197 Some(u) => u,
2198 None => {
2199 eprintln!(
2200 "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
2201 );
2202 return 2;
2203 }
2204 };
2205
2206 let mut ctx = match connect_and_check(&url).await {
2207 ContextOutcome::Ready(ctx) => ctx,
2208 ContextOutcome::UnsupportedVersion(e) => {
2209 crate::print_support_boundary_error("migrations repair partial-apply", &e);
2210 return 2;
2211 }
2212 ContextOutcome::RuntimeError(msg) => {
2213 eprintln!("djogi migrations repair partial-apply: pool: {msg}");
2214 return 1;
2215 }
2216 };
2217
2218 let lock_path = workspace.join(LOCK_FILE_NAME);
2219 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2220 Ok(g) => g,
2221 Err(e) => {
2222 eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
2223 return 1;
2224 }
2225 };
2226
2227 let app_label = app.unwrap_or("");
2228 let bucket = BucketKey {
2229 database: db_name,
2230 app: app_label.to_string(),
2231 };
2232
2233 match repair_partial_apply(
2234 &mut ctx,
2235 &guard,
2236 &bucket,
2237 version,
2238 resolution,
2239 note,
2240 RepairConfirmation::OperatorAcknowledged,
2241 )
2242 .await
2243 {
2244 Ok(report) => {
2245 render_repair_report(&report);
2246 0
2247 }
2248 Err(e) => {
2249 eprintln!("djogi migrations repair partial-apply: {e}");
2250 repair_error_exit_code(&e)
2251 }
2252 }
2253}
2254
2255pub fn repair_resume_partial_apply_cmd(
2262 version: &str,
2263 app: Option<&str>,
2264 database: Option<&str>,
2265 workspace: Option<PathBuf>,
2266) -> ExitCode {
2267 let workspace = resolve_workspace(workspace);
2268 let runtime = match tokio::runtime::Builder::new_current_thread()
2269 .enable_all()
2270 .build()
2271 {
2272 Ok(r) => r,
2273 Err(e) => {
2274 eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
2275 return ExitCode::from(1);
2276 }
2277 };
2278 let exit = runtime
2279 .block_on(async { run_repair_resume_partial(&workspace, version, app, database).await });
2280 ExitCode::from(exit as u8)
2281}
2282
2283async fn run_repair_resume_partial(
2285 workspace: &Path,
2286 version: &str,
2287 app: Option<&str>,
2288 database: Option<&str>,
2289) -> i32 {
2290 use djogi::config::DjogiConfig;
2291
2292 let config = match DjogiConfig::load_from_workspace(workspace) {
2293 Ok(c) => c,
2294 Err(e) => {
2295 eprintln!("djogi migrations repair resume-partial: config load: {e}");
2296 return 1;
2297 }
2298 };
2299
2300 let db_name = resolve_database(database, &config);
2305 let url = match resolve_bucket_url(&config.database, &db_name) {
2306 Some(u) => u,
2307 None => {
2308 eprintln!(
2309 "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
2310 );
2311 return 2;
2312 }
2313 };
2314
2315 let mut ctx = match connect_and_check(&url).await {
2316 ContextOutcome::Ready(ctx) => ctx,
2317 ContextOutcome::UnsupportedVersion(e) => {
2318 crate::print_support_boundary_error("migrations repair resume-partial", &e);
2319 return 2;
2320 }
2321 ContextOutcome::RuntimeError(msg) => {
2322 eprintln!("djogi migrations repair resume-partial: pool: {msg}");
2323 return 1;
2324 }
2325 };
2326
2327 let lock_path = workspace.join(LOCK_FILE_NAME);
2328 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2329 Ok(g) => g,
2330 Err(e) => {
2331 eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
2332 return 1;
2333 }
2334 };
2335
2336 let app_label = app.unwrap_or("");
2337 let bucket = BucketKey {
2338 database: db_name,
2339 app: app_label.to_string(),
2340 };
2341
2342 let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
2348 Ok(p) => p,
2349 Err(e) => {
2350 eprintln!("djogi migrations repair resume-partial: load plan: {e}");
2351 return 2;
2352 }
2353 };
2354
2355 match repair_resume_partial_apply(
2356 &mut ctx,
2357 &guard,
2358 version,
2359 &plan,
2360 RepairConfirmation::OperatorAcknowledged,
2361 )
2362 .await
2363 {
2364 Ok(report) => {
2365 render_repair_report(&report);
2366 0
2367 }
2368 Err(e) => {
2369 eprintln!("djogi migrations repair resume-partial: {e}");
2370 repair_error_exit_code(&e)
2371 }
2372 }
2373}
2374
2375fn load_committed_plan_for_resume(
2391 workspace: &Path,
2392 bucket: &djogi::migrate::BucketKey,
2393 version: &str,
2394) -> Result<djogi::migrate::MigrationPlan, String> {
2395 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
2396 let plan_path = bucket_dir.join(format!("{version}.plan.json"));
2397 let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
2398 let stored: CliReplayPlan = serde_json::from_slice(&bytes)
2399 .map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
2400 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
2401 return Err(format!(
2402 "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
2403 plan_path.display(),
2404 stored.format_version,
2405 ));
2406 }
2407 Ok(djogi::migrate::MigrationPlan {
2408 bucket: bucket.clone(),
2409 classification: stored.classification.into(),
2410 segments: stored
2411 .segments
2412 .into_iter()
2413 .map(|seg| djogi::migrate::Segment {
2414 kind: seg.kind.into(),
2415 statements: seg
2416 .statements
2417 .into_iter()
2418 .map(|stmt| djogi::migrate::OperationSql {
2419 label: stmt.label,
2420 up: stmt.up,
2421 down: String::new(),
2422 lossy: None,
2423 })
2424 .collect(),
2425 })
2426 .collect(),
2427 })
2428}
2429
2430pub fn repair_snapshot_rebuild_cmd(
2437 app: Option<&str>,
2438 database: Option<&str>,
2439 snapshot_path: Option<&Path>,
2440 workspace: Option<PathBuf>,
2441) -> ExitCode {
2442 let workspace = resolve_workspace(workspace);
2443 let runtime = match tokio::runtime::Builder::new_current_thread()
2444 .enable_all()
2445 .build()
2446 {
2447 Ok(r) => r,
2448 Err(e) => {
2449 eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
2450 return ExitCode::from(1);
2451 }
2452 };
2453 let exit = runtime.block_on(async {
2454 run_repair_snapshot_rebuild(&workspace, app, database, snapshot_path).await
2455 });
2456 ExitCode::from(exit as u8)
2457}
2458
2459async fn run_repair_snapshot_rebuild(
2461 workspace: &Path,
2462 app: Option<&str>,
2463 database: Option<&str>,
2464 snapshot_path: Option<&Path>,
2465) -> i32 {
2466 use djogi::config::DjogiConfig;
2467
2468 let config = match DjogiConfig::load_from_workspace(workspace) {
2469 Ok(c) => c,
2470 Err(e) => {
2471 eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
2472 return 1;
2473 }
2474 };
2475
2476 let db_name = resolve_database(database, &config);
2481 let url = match resolve_bucket_url(&config.database, &db_name) {
2482 Some(u) => u,
2483 None => {
2484 eprintln!(
2485 "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
2486 );
2487 return 2;
2488 }
2489 };
2490
2491 let mut ctx = match connect_and_check(&url).await {
2492 ContextOutcome::Ready(ctx) => ctx,
2493 ContextOutcome::UnsupportedVersion(e) => {
2494 crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
2495 return 2;
2496 }
2497 ContextOutcome::RuntimeError(msg) => {
2498 eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
2499 return 1;
2500 }
2501 };
2502
2503 let lock_path = workspace.join(LOCK_FILE_NAME);
2504 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2505 Ok(g) => g,
2506 Err(e) => {
2507 eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
2508 return 1;
2509 }
2510 };
2511
2512 let app_label = app.unwrap_or("");
2513 let bucket = BucketKey {
2514 database: db_name,
2515 app: app_label.to_string(),
2516 };
2517
2518 let snap_path = match snapshot_path {
2519 Some(p) => p.to_path_buf(),
2520 None => reconstruct_snapshot_path(workspace, &bucket),
2521 };
2522
2523 match repair_snapshot_rebuild(
2524 &mut ctx,
2525 &guard,
2526 &bucket,
2527 &snap_path,
2528 RepairConfirmation::OperatorAcknowledged,
2529 )
2530 .await
2531 {
2532 Ok(report) => {
2533 render_repair_report(&report);
2534 0
2535 }
2536 Err(e) => {
2537 eprintln!("djogi migrations repair snapshot-rebuild: {e}");
2538 repair_error_exit_code(&e)
2539 }
2540 }
2541}
2542
2543pub fn baseline_cmd(
2564 version: &str,
2565 description: &str,
2566 reason: &str,
2567 app: Option<&str>,
2568 database: Option<&str>,
2569 workspace: Option<PathBuf>,
2570) -> ExitCode {
2571 if reason.trim().is_empty() {
2576 eprintln!(
2577 "djogi migrations baseline: --reason must not be empty; \
2578 supply a non-empty reason why this baseline is being established \
2579 (e.g. 'schema pre-exists from prior tooling'). \
2580 This is recorded in the ledger audit trail."
2581 );
2582 return ExitCode::from(2);
2583 }
2584
2585 let workspace = resolve_workspace(workspace);
2586 let runtime = match tokio::runtime::Builder::new_current_thread()
2587 .enable_all()
2588 .build()
2589 {
2590 Ok(r) => r,
2591 Err(e) => {
2592 eprintln!("djogi migrations baseline: tokio runtime: {e}");
2593 return ExitCode::from(1);
2594 }
2595 };
2596 let exit = runtime.block_on(async {
2597 run_baseline(&workspace, version, description, reason, app, database).await
2598 });
2599 ExitCode::from(exit as u8)
2600}
2601
2602async fn run_baseline(
2614 workspace: &Path,
2615 version: &str,
2616 description: &str,
2617 reason: &str,
2618 app: Option<&str>,
2619 database: Option<&str>,
2620) -> i32 {
2621 use djogi::config::DjogiConfig;
2622
2623 let config = match DjogiConfig::load_from_workspace(workspace) {
2624 Ok(c) => c,
2625 Err(e) => {
2626 eprintln!("djogi migrations baseline: config load: {e}");
2627 return 1;
2628 }
2629 };
2630
2631 let db_name = resolve_database(database, &config);
2636 let url = match resolve_bucket_url(&config.database, &db_name) {
2637 Some(u) => u,
2638 None => {
2639 eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
2640 return 2;
2641 }
2642 };
2643
2644 let mut ctx = match connect_and_check(&url).await {
2645 ContextOutcome::Ready(ctx) => ctx,
2646 ContextOutcome::UnsupportedVersion(e) => {
2647 crate::print_support_boundary_error("migrations baseline", &e);
2648 return 2;
2649 }
2650 ContextOutcome::RuntimeError(msg) => {
2651 eprintln!("djogi migrations baseline: pool: {msg}");
2652 return 1;
2653 }
2654 };
2655
2656 let lock_path = workspace.join(LOCK_FILE_NAME);
2657 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2658 Ok(g) => g,
2659 Err(e) => {
2660 eprintln!("djogi migrations baseline: workspace lock: {e}");
2661 return 1;
2662 }
2663 };
2664
2665 let app_label = app.unwrap_or("");
2666 let bucket = BucketKey {
2667 database: db_name,
2668 app: app_label.to_string(),
2669 };
2670
2671 let runner_ctx = RunnerCtx {
2672 bucket: bucket.clone(),
2673 version: version.to_string(),
2674 description: description.to_string(),
2675 checksum_up: String::new(),
2678 checksum_down: None,
2679 snapshot: None,
2683 snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
2684 config: djogi::config::MigrateConfig {
2687 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
2688 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
2689 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
2690 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
2691 },
2692 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
2693 audit_pool: match djogi::migrate::resolve_audit_url(&config) {
2694 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
2695 Err(_) => None,
2696 },
2697 };
2698
2699 match baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await {
2700 Ok(report) => {
2701 println!(
2702 "djogi migrations baseline: established baseline `{}` \
2703 (ledger_id={}) in {:.1}s",
2704 version,
2705 report.ledger_id,
2706 report.execution_time_ms as f64 / 1000.0
2707 );
2708 0
2709 }
2710 Err(e) => {
2711 eprintln!("djogi migrations baseline: {e}");
2712 baseline_error_exit_code(&e)
2713 }
2714 }
2715}
2716
2717fn baseline_error_exit_code(err: &RunnerError) -> i32 {
2737 match err {
2738 RunnerError::VersionAlreadyApplied { .. }
2759 | RunnerError::VersionCollisionNonTerminal { .. }
2760 | RunnerError::BaselineSnapshotShouldNotBeProvided
2761 | RunnerError::AdvisoryUnlockReturnedFalse { .. }
2762 | RunnerError::SnapshotPersistFailed { .. }
2763 | RunnerError::OutOfOrderRejected { .. } => 2,
2764 _ => 1,
2769 }
2770}
2771
2772#[cfg(test)]
2773mod tests {
2774 use super::*;
2775 use std::fs;
2776 use std::sync::atomic::{AtomicUsize, Ordering};
2777
2778 fn temp_workspace(tag: &str) -> std::path::PathBuf {
2779 static COUNTER: AtomicUsize = AtomicUsize::new(0);
2780 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2781 let nanos = std::time::SystemTime::now()
2782 .duration_since(std::time::UNIX_EPOCH)
2783 .unwrap()
2784 .as_nanos();
2785 let p = std::env::temp_dir().join(format!("djogi-cli-{tag}-{nanos}-{n}"));
2786 fs::create_dir_all(&p).unwrap();
2787 p
2788 }
2789
2790 #[test]
2794 fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
2795 let work = temp_workspace("b1_discover");
2796 let billing_dir = work.join("migrations/main/billing");
2801 fs::create_dir_all(&billing_dir).unwrap();
2802 fs::write(billing_dir.join("schema_snapshot.json"), "{}").unwrap();
2803 let global_dir = work.join("migrations/main/_global_");
2806 fs::create_dir_all(&global_dir).unwrap();
2807 fs::write(global_dir.join("schema_snapshot.json"), "{}").unwrap();
2808 let no_snap_dir = work.join("migrations/main/empty_app");
2812 fs::create_dir_all(&no_snap_dir).unwrap();
2813
2814 let buckets = discover_snapshot_buckets_on_disk(&work);
2815 let labels: std::collections::BTreeSet<&str> =
2816 buckets.iter().map(|b| b.app.as_str()).collect();
2817 assert!(
2818 labels.contains("billing"),
2819 "must include the renamed-from bucket: {labels:?}"
2820 );
2821 assert!(
2822 labels.contains(""),
2823 "must include the global bucket: {labels:?}"
2824 );
2825 assert!(
2826 !labels.contains("empty_app"),
2827 "must not include directories without a snapshot: {labels:?}"
2828 );
2829 let _ = fs::remove_dir_all(&work);
2830 }
2831
2832 #[test]
2837 fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
2838 let work = temp_workspace("a1_workspace_config");
2839 let toml = "[database]\nurl = \"postgres://discovered-by-workspace-flag/test\"\n\
2840 max_connections = 1\ndev_mode = false\n\
2841 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
2842 fs::write(work.join("Djogi.toml"), toml).unwrap();
2843 let prior = std::env::var("DATABASE_URL").ok();
2846 unsafe { std::env::remove_var("DATABASE_URL") };
2850 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
2851 assert_eq!(
2852 config.database.url,
2853 "postgres://discovered-by-workspace-flag/test"
2854 );
2855 assert_eq!(config.server.port, 1234);
2856 if let Some(v) = prior {
2857 unsafe { std::env::set_var("DATABASE_URL", v) };
2858 }
2859 let _ = fs::remove_dir_all(&work);
2860 }
2861
2862 #[test]
2867 fn a1_round2_env_override_beats_workspace_toml() {
2868 let work = temp_workspace("a1r2_env_override");
2869 let toml = "[database]\nurl = \"postgres://from-toml/test\"\n\
2870 max_connections = 1\ndev_mode = false\n\
2871 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
2872 fs::write(work.join("Djogi.toml"), toml).unwrap();
2873 let prior = std::env::var("DATABASE_URL").ok();
2874 unsafe { std::env::set_var("DATABASE_URL", "postgres://from-env/test") };
2876 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
2877 assert_eq!(
2878 config.database.url, "postgres://from-env/test",
2879 "env DATABASE_URL must win over workspace Djogi.toml"
2880 );
2881 match prior {
2882 Some(v) => unsafe { std::env::set_var("DATABASE_URL", v) },
2883 None => unsafe { std::env::remove_var("DATABASE_URL") },
2884 }
2885 let _ = fs::remove_dir_all(&work);
2886 }
2887
2888 #[test]
2901 fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
2902 use djogi::migrate::projection::BucketKey;
2903 use djogi::migrate::schema::{
2904 ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
2905 };
2906 use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
2907 use std::collections::BTreeMap;
2908
2909 let work = temp_workspace("b1r2_compose_uses_discovery");
2910
2911 let billing_bucket = BucketKey {
2914 database: "main".into(),
2915 app: "billing".into(),
2916 };
2917 let mut billing_snap = AppliedSchema {
2918 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
2919 enums: BTreeMap::new(),
2920 format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
2921 generated_at: "2026-04-25T00:00:00Z".to_string(),
2922 indexes: Vec::new(),
2923 models: BTreeMap::new(),
2924 registered_apps: vec!["billing".to_string()],
2925 };
2926 billing_snap.models.insert(
2927 "widgets".to_string(),
2928 TableSchema {
2929 app: Some("billing".to_string()),
2930 columns: vec![ColumnSchema {
2931 check: None,
2932 comment: None,
2933 default_sql: Some("heerid_next_desc()".to_string()),
2934 foreign_key: None,
2935 generated: None,
2936 identity: None,
2937 index_type: None,
2938 indexed: false,
2939 max_length: None,
2940 name: "id".to_string(),
2941 nullable: false,
2942 on_delete: None,
2943 outbox_exclude: false,
2944 rationale: None,
2945 relation_kind: None,
2946 renamed_from: None,
2947 sequence_within: None,
2948 sql_type: "BIGINT".to_string(),
2949 unique: false,
2950 type_change_using: None,
2951 }],
2952 exclusion_constraints: Vec::new(),
2953 fts: None,
2954 is_through: false,
2955 moved_from_app: None,
2956 partition: None,
2957 primary_key: PrimaryKeySchema {
2958 columns: vec!["id".to_string()],
2959 kind: PkKindSchema::HeerIdRecencyBiased,
2960 },
2961 rationale: None,
2962 renamed_from: None,
2963 rls_enabled: false,
2964 table: "widgets".to_string(),
2965 table_comment: None,
2966 storage_params: None,
2967 tablespace: None,
2968 tenant_key: None,
2969 },
2970 );
2971 let snap_path = snapshot_path(&work, &billing_bucket);
2972 save_snapshot(&billing_snap, &snap_path).expect("write snapshot");
2973
2974 let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
2978 let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
2979
2980 let exit = compose_with_inputs(
2981 &work,
2982 "drop billing remnant",
2983 true, false, &empty_models,
2986 &[AppLifecycle {
2987 label: "billing".to_string(),
2988 database: "main".to_string(),
2989 renamed_from: None,
2990 tombstone: true, }],
2992 now,
2993 None, );
2995 assert_eq!(exit, ExitCode::from(0), "compose must succeed");
2996
2997 let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
3000 let mut up_path: Option<PathBuf> = None;
3001 for entry in fs::read_dir(&billing_dir).unwrap().flatten() {
3002 let n = entry.file_name().to_string_lossy().to_string();
3003 if n.starts_with('V') && n.ends_with(".sdjql") && !n.contains(".down.") {
3006 up_path = Some(entry.path());
3007 break;
3008 }
3009 }
3010 let up_path = up_path.expect("compose must have written an up SQL file");
3011 let up_sql = fs::read_to_string(&up_path).unwrap();
3012 assert!(
3013 up_sql.contains("DROP TABLE \"widgets\""),
3014 "compose must have seen the disk snapshot and emitted DROP TABLE — \
3015 this proves discover_snapshot_buckets_on_disk reached the differ. \
3016 SQL: {up_sql}"
3017 );
3018 let _ = fs::remove_dir_all(&work);
3019 }
3020
3021 #[test]
3029 fn a1_round2_status_cmd_threads_workspace_to_config() {
3030 let work = temp_workspace("a1r2_status_workspace");
3031 fs::write(work.join("Djogi.toml"), "this is = not = valid toml ===").unwrap();
3036 let exit = status_cmd(Some(work.clone()));
3037 assert_eq!(
3038 exit,
3039 ExitCode::from(1),
3040 "malformed workspace Djogi.toml must surface as config load error"
3041 );
3042 let _ = fs::remove_dir_all(&work);
3043 }
3044
3045 #[test]
3054 fn u3_attune_refusal_variants_map_to_exit_code_two() {
3055 use djogi::migrate::AttuneRefusal;
3056 let cases = [
3057 AttuneError::Refused(AttuneRefusal::SquashNotLocalhost {
3058 database_url: "postgres://prod.example.com/main".to_string(),
3059 }),
3060 AttuneError::Refused(AttuneRefusal::SquashNotDevProfile {
3061 profile: "production".to_string(),
3062 }),
3063 AttuneError::Refused(AttuneRefusal::SquashDevModeOff),
3067 AttuneError::Refused(AttuneRefusal::SquashEnvIsProduction {
3068 env_value: "production".to_string(),
3069 }),
3070 AttuneError::Refused(AttuneRefusal::SquashFromVersionNotFound {
3071 version: "V20260101000000__missing".to_string(),
3072 }),
3073 AttuneError::Refused(AttuneRefusal::SquashFromVersionAmbiguous {
3074 version: "V20260101000000__shared".to_string(),
3075 buckets: vec!["main/users".to_string(), "main/billing".to_string()],
3076 }),
3077 ];
3078 for err in &cases {
3079 assert_eq!(
3080 attune_error_exit_code(err),
3081 2,
3082 "refusal variant must map to exit 2: {err}"
3083 );
3084 }
3085 }
3086
3087 #[test]
3092 fn u3_attune_runtime_variants_map_to_exit_code_one() {
3093 let cases = [
3094 AttuneError::FilesystemScanFailed {
3095 source: std::io::Error::other("disk full"),
3096 },
3097 AttuneError::SqlReadFailed {
3098 path: PathBuf::from("/tmp/x.sdjql"),
3099 source: std::io::Error::other("permission denied"),
3100 },
3101 AttuneError::SqlWriteFailed {
3102 path: PathBuf::from("/tmp/x.sdjql"),
3103 source: std::io::Error::other("read-only fs"),
3104 },
3105 AttuneError::SqlDeleteFailed {
3106 path: PathBuf::from("/tmp/x.sdjql"),
3107 source: std::io::Error::other("not found"),
3108 },
3109 AttuneError::GitPublishFailed {
3110 stderr: "fatal: refusing to push".to_string(),
3111 status_code: Some(128),
3112 },
3113 ];
3114 for err in &cases {
3115 assert_eq!(
3116 attune_error_exit_code(err),
3117 1,
3118 "runtime variant must map to exit 1: {err}"
3119 );
3120 }
3121 }
3122
3123 #[test]
3131 fn baseline_empty_reason_exits_code_2() {
3132 let result = baseline_cmd(
3133 "V00000000000000__baseline",
3134 "description",
3135 "",
3136 None,
3137 None,
3138 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3139 );
3140 assert_eq!(
3141 result,
3142 ExitCode::from(2),
3143 "empty --reason must exit 2 before any DB work"
3144 );
3145 }
3146
3147 #[test]
3148 fn baseline_whitespace_reason_exits_code_2() {
3149 let result = baseline_cmd(
3150 "V00000000000000__baseline",
3151 "description",
3152 " ",
3153 None,
3154 None,
3155 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3156 );
3157 assert_eq!(
3158 result,
3159 ExitCode::from(2),
3160 "whitespace-only --reason must exit 2 before any DB work"
3161 );
3162 }
3163
3164 #[test]
3170 fn baseline_refusal_variants_map_to_exit_code_two() {
3171 let cases = [
3172 RunnerError::VersionAlreadyApplied {
3173 version: "V00000000000000__baseline".to_string(),
3174 applied_at: None,
3175 },
3176 RunnerError::VersionCollisionNonTerminal {
3177 version: "V00000000000000__baseline".to_string(),
3178 status: LedgerStatus::Pending,
3179 run_id: 1,
3180 },
3181 RunnerError::BaselineSnapshotShouldNotBeProvided,
3182 RunnerError::AdvisoryUnlockReturnedFalse {
3183 bucket: BucketKey {
3184 database: "main".to_string(),
3185 app: String::new(),
3186 },
3187 key: 0x0102_0304_0506_0708,
3188 },
3189 RunnerError::OutOfOrderRejected {
3190 version: "V00000000000000__baseline".to_string(),
3191 conflicting_version: "V20260101000000__later".to_string(),
3192 conflicting_applied_at: None,
3193 },
3194 ];
3195 for err in &cases {
3196 assert_eq!(
3197 baseline_error_exit_code(err),
3198 2,
3199 "baseline refusal variant must map to exit 2: {err}"
3200 );
3201 }
3202 }
3203
3204 #[test]
3210 fn baseline_transient_variants_map_to_exit_code_one() {
3211 use djogi::error::{DbError, DjogiError};
3212 let cases = [
3213 RunnerError::LedgerBootstrapFailed {
3214 source: DjogiError::Db(DbError::other("create table failed")),
3215 },
3216 RunnerError::LedgerWriteFailed {
3217 version: "V00000000000000__baseline".to_string(),
3218 source: DjogiError::Db(DbError::other("insert failed")),
3219 },
3220 RunnerError::PinnedSessionCheckoutFailed {
3221 source: DjogiError::Db(DbError::other("pool exhausted")),
3222 },
3223 RunnerError::AdvisoryLockFailed {
3224 bucket: BucketKey {
3225 database: "main".to_string(),
3226 app: String::new(),
3227 },
3228 key: 0x0102_0304_0506_0708,
3229 attempts: 3,
3230 },
3231 ];
3232 for err in &cases {
3233 assert_eq!(
3234 baseline_error_exit_code(err),
3235 1,
3236 "baseline transient variant must map to exit 1: {err}"
3237 );
3238 }
3239 }
3240
3241 #[test]
3245 fn fake_without_reason_exits_code_2() {
3246 let result = apply_cmd(
3247 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3248 true,
3249 None,
3250 );
3251 assert_eq!(
3252 result,
3253 ExitCode::from(2),
3254 "--fake without --reason must exit 2"
3255 );
3256 }
3257
3258 #[test]
3260 fn fake_with_empty_reason_exits_code_2() {
3261 let result = apply_cmd(
3262 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3263 true,
3264 Some(String::new()),
3265 );
3266 assert_eq!(
3267 result,
3268 ExitCode::from(2),
3269 "--fake with empty reason must exit 2"
3270 );
3271 }
3272
3273 #[test]
3275 fn fake_with_whitespace_reason_exits_code_2() {
3276 let result = apply_cmd(
3277 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3278 true,
3279 Some(" ".to_string()),
3280 );
3281 assert_eq!(
3282 result,
3283 ExitCode::from(2),
3284 "--fake with whitespace reason must exit 2"
3285 );
3286 }
3287
3288 #[test]
3290 fn reason_without_fake_is_accepted() {
3291 let result = apply_cmd(
3295 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3296 false, Some("test reason".to_string()),
3298 );
3299 assert_ne!(
3301 result,
3302 ExitCode::from(2),
3303 "--reason without --fake should not refuse"
3304 );
3305 }
3306
3307 fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
3315 djogi::migrate::BucketKey {
3316 database: database.to_string(),
3317 app: app.to_string(),
3318 }
3319 }
3320
3321 fn diag(
3323 code: &str,
3324 severity: djogi::migrate::VerifySeverity,
3325 message: &str,
3326 location: Option<&str>,
3327 ) -> djogi::migrate::VerifyDiagnostic {
3328 djogi::migrate::VerifyDiagnostic {
3329 code: code.to_string(),
3330 severity,
3331 message: message.to_string(),
3332 location: location.map(str::to_string),
3333 }
3334 }
3335
3336 #[test]
3337 fn render_verify_report_clean_output() {
3338 use djogi::migrate::VerifyReport;
3339
3340 let report = VerifyReport {
3341 diagnostics: vec![],
3342 latest_applied_version: Some("001_initial".to_string()),
3343 applied_count: 3,
3344 unfinished_count: 0,
3345 };
3346 let bucket = render_bucket("main", "");
3347
3348 let lines = render_verify_report(&report, &bucket);
3349
3350 assert!(
3351 lines.contains(&"Ledger: 3 applied, latest 001_initial".to_string()),
3352 "missing ledger line; got {lines:?}"
3353 );
3354 assert!(
3355 lines.contains(&"No drift detected. Schema is consistent.".to_string()),
3356 "missing clean line; got {lines:?}"
3357 );
3358 assert!(
3359 lines.iter().any(|l| l.contains("Result: PASSED")),
3360 "missing PASSED result; got {lines:?}"
3361 );
3362 assert!(
3363 !lines.iter().any(|l| l.contains("FAILED")),
3364 "clean report must not say FAILED; got {lines:?}"
3365 );
3366 }
3367
3368 #[test]
3369 fn render_verify_report_with_errors() {
3370 use djogi::migrate::{VerifyReport, VerifySeverity};
3371
3372 let report = VerifyReport {
3375 diagnostics: vec![
3376 diag(
3377 "D601",
3378 VerifySeverity::Error,
3379 "Snapshot table missing from live DB",
3380 Some("users"),
3381 ),
3382 diag(
3383 "D611",
3384 VerifySeverity::Warning,
3385 "Live index not present in snapshot",
3386 Some("idx_posts_created"),
3387 ),
3388 ],
3389 latest_applied_version: Some("V20260501000000__add_users".to_string()),
3390 applied_count: 2,
3391 unfinished_count: 0,
3392 };
3393 let bucket = render_bucket("main", "myapp");
3394
3395 assert!(report.has_errors());
3396 let lines = render_verify_report(&report, &bucket);
3397
3398 assert!(
3399 lines
3400 .contains(&"[ERROR] D601 (users): Snapshot table missing from live DB".to_string()),
3401 "missing D601 line; got {lines:?}"
3402 );
3403 assert!(
3404 lines.contains(
3405 &"[WARN] D611 (idx_posts_created): Live index not present in snapshot".to_string()
3406 ),
3407 "missing D611 line; got {lines:?}"
3408 );
3409 assert!(
3410 lines.iter().any(|l| l.contains("Result: FAILED")),
3411 "error report must say FAILED; got {lines:?}"
3412 );
3413 }
3414
3415 #[test]
3416 fn render_verify_report_header_shows_global_and_named_app() {
3417 use djogi::migrate::VerifyReport;
3418
3419 let report = VerifyReport {
3420 diagnostics: vec![],
3421 latest_applied_version: None,
3422 applied_count: 0,
3423 unfinished_count: 0,
3424 };
3425
3426 let global = render_verify_report(&report, &render_bucket("main", ""));
3428 assert_eq!(
3429 global.first().map(String::as_str),
3430 Some("djogi migrations verify — main/_global_"),
3431 "global bucket header; got {global:?}"
3432 );
3433
3434 let named = render_verify_report(&report, &render_bucket("crud_log", "billing"));
3436 assert_eq!(
3437 named.first().map(String::as_str),
3438 Some("djogi migrations verify — crud_log/billing"),
3439 "named bucket header; got {named:?}"
3440 );
3441 }
3442
3443 #[test]
3444 fn render_verify_report_warning_only_passes_with_warnings() {
3445 use djogi::migrate::{VerifyReport, VerifySeverity};
3446
3447 let report = VerifyReport {
3448 diagnostics: vec![diag(
3449 "D606",
3450 VerifySeverity::Warning,
3451 "type differs (advisory)",
3452 Some("users.age"),
3453 )],
3454 latest_applied_version: Some("001_initial".to_string()),
3455 applied_count: 1,
3456 unfinished_count: 0,
3457 };
3458 let lines = render_verify_report(&report, &render_bucket("main", ""));
3459
3460 assert!(
3461 lines
3462 .iter()
3463 .any(|l| l.contains("Result: PASSED with warnings")),
3464 "warning-only must PASS with warnings; got {lines:?}"
3465 );
3466 assert!(
3467 !lines.iter().any(|l| l.contains("FAILED")),
3468 "warning-only must not say FAILED; got {lines:?}"
3469 );
3470 }
3471
3472 #[test]
3473 fn render_verify_report_empty_ledger_line() {
3474 use djogi::migrate::VerifyReport;
3475
3476 let report = VerifyReport {
3477 diagnostics: vec![],
3478 latest_applied_version: None,
3479 applied_count: 0,
3480 unfinished_count: 0,
3481 };
3482 let lines = render_verify_report(&report, &render_bucket("main", ""));
3483
3484 assert!(
3485 lines.contains(&"Ledger: empty (no migrations applied yet)".to_string()),
3486 "empty ledger line; got {lines:?}"
3487 );
3488 }
3489
3490 #[test]
3491 fn render_verify_report_unfinished_ledger_line() {
3492 use djogi::migrate::VerifyReport;
3493
3494 let report = VerifyReport {
3495 diagnostics: vec![],
3496 latest_applied_version: Some("V20260501000000__add_users".to_string()),
3497 applied_count: 2,
3498 unfinished_count: 1,
3499 };
3500 let lines = render_verify_report(&report, &render_bucket("main", ""));
3501
3502 assert!(
3503 lines.contains(
3504 &"Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users".to_string()
3505 ),
3506 "unfinished ledger line; got {lines:?}"
3507 );
3508 }
3509
3510 #[test]
3511 fn render_verify_report_info_with_no_location_uses_dash() {
3512 use djogi::migrate::{VerifyReport, VerifySeverity};
3513
3514 let report = VerifyReport {
3517 diagnostics: vec![diag(
3518 "D692",
3519 VerifySeverity::Info,
3520 "enum type(s) declared; not yet checked",
3521 None,
3522 )],
3523 latest_applied_version: Some("001_initial".to_string()),
3524 applied_count: 1,
3525 unfinished_count: 0,
3526 };
3527 let lines = render_verify_report(&report, &render_bucket("main", ""));
3528
3529 assert!(
3530 lines.iter().any(|l| l.contains("(-)")),
3531 "location: None must render as (-); got {lines:?}"
3532 );
3533 assert!(
3534 lines.contains(&"Result: PASSED (1 info(s))".to_string()),
3535 "all-info summary; got {lines:?}"
3536 );
3537 }
3538
3539 fn db_config(
3542 url: &str,
3543 crud_log_url: Option<&str>,
3544 event_log_url: Option<&str>,
3545 ) -> djogi::config::DatabaseConfig {
3546 djogi::config::DatabaseConfig {
3547 url: url.to_string(),
3548 crud_log_url: crud_log_url.map(str::to_string),
3549 event_log_url: event_log_url.map(str::to_string),
3550 max_connections: None,
3551 dev_mode: false,
3552 }
3553 }
3554
3555 #[test]
3556 fn resolve_bucket_url_main_uses_app_url_verbatim() {
3557 let cfg = db_config("postgres://user:pass@localhost:5432/myapp_prod", None, None);
3561 assert_eq!(
3562 resolve_bucket_url(&cfg, "main").as_deref(),
3563 Some("postgres://user:pass@localhost:5432/myapp_prod"),
3564 "main must return the app URL unchanged"
3565 );
3566 }
3567
3568 #[test]
3569 fn resolve_bucket_url_crud_log_prefers_explicit_url() {
3570 let cfg = db_config(
3571 "postgres://localhost/main",
3572 Some("postgres://localhost/explicit_crud"),
3573 None,
3574 );
3575 assert_eq!(
3576 resolve_bucket_url(&cfg, "crud_log").as_deref(),
3577 Some("postgres://localhost/explicit_crud"),
3578 "crud_log must prefer the explicit crud_log_url"
3579 );
3580 }
3581
3582 #[test]
3583 fn resolve_bucket_url_event_log_prefers_explicit_url() {
3584 let cfg = db_config(
3585 "postgres://localhost/main",
3586 None,
3587 Some("postgres://localhost/explicit_event"),
3588 );
3589 assert_eq!(
3590 resolve_bucket_url(&cfg, "event_log").as_deref(),
3591 Some("postgres://localhost/explicit_event"),
3592 "event_log must prefer the explicit event_log_url"
3593 );
3594 }
3595
3596 #[test]
3597 fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
3598 let cfg = db_config("postgres://localhost/main", Some(""), Some(" "));
3601 assert_eq!(
3603 resolve_bucket_url(&cfg, "crud_log").as_deref(),
3604 Some("postgres://localhost/crud_log"),
3605 "empty crud_log_url must fall back to derived"
3606 );
3607 assert_eq!(
3610 resolve_bucket_url(&cfg, "event_log").as_deref(),
3611 Some(" "),
3612 "non-empty (whitespace) event_log_url is used verbatim"
3613 );
3614 }
3615
3616 #[test]
3617 fn resolve_bucket_url_other_database_derives_from_app_url() {
3618 let cfg = db_config("postgres://user:pass@localhost:5432/main", None, None);
3619 assert_eq!(
3620 resolve_bucket_url(&cfg, "analytics").as_deref(),
3621 Some("postgres://user:pass@localhost:5432/analytics"),
3622 "an arbitrary database name derives by path splice"
3623 );
3624 }
3625
3626 #[test]
3627 fn resolve_bucket_url_pathless_url_returns_none() {
3628 let cfg = db_config("postgres://localhost", None, None);
3630 assert_eq!(
3631 resolve_bucket_url(&cfg, "crud_log"),
3632 None,
3633 "pathless URL must yield None for a derived database"
3634 );
3635 }
3636
3637 #[test]
3638 fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
3639 let cfg = db_config("postgres://localhost", None, None);
3642 assert_eq!(
3643 resolve_bucket_url(&cfg, "main").as_deref(),
3644 Some("postgres://localhost"),
3645 "main returns the app URL verbatim regardless of path"
3646 );
3647 }
3648}