1use std::path::{Path, PathBuf};
10use std::process::ExitCode;
11
12use djogi::apps::AppRegistry;
13use djogi::migrate::{
14 AppLifecycle, AttuneError, AttuneMode, AttuneRequest, BucketKey, ComposeError, ComposeRequest,
15 DescriptorProvider, GUARD_DEFAULT_TIMEOUT, LOCK_FILE_NAME, PartialApplyResolution, PendingPlan,
16 RepairConfirmation, RepairError, RepairReport, RunnerCtx, RunnerError, SnapshotError,
17 VerifyReport, VerifySeverity, acquire_workspace_lock, apply_plan, attune, baseline_plan,
18 compose, fake_apply_plan, load_snapshot, project_from_provider, repair_checksum_drift,
19 repair_partial_apply, repair_resume_partial_apply, repair_snapshot_rebuild, snapshot_path,
20};
21
22use djogi::migrate::LedgerStatus;
24
25use crate::{PartialApplyResolutionCli, RepairSubcommand};
28
29#[derive(Debug, Clone, serde::Deserialize)]
37struct CliReplayPlan {
38 format_version: String,
39 checksum_up: String,
40 checksum_down: Option<String>,
41 classification: CliClassification,
42 segments: Vec<CliReplaySegment>,
43}
44
45#[derive(Debug, Clone, serde::Deserialize)]
46#[serde(tag = "kind", rename_all = "snake_case")]
47enum CliClassification {
48 NoOp,
49 Additive,
50 Reversible,
51 Destructive,
52 Lossy,
53 Unsupported {
54 reason: String,
55 },
56 PkTypeFlip {
57 co_destructive: bool,
58 co_lossy: bool,
59 },
60}
61
62#[derive(Debug, Clone, serde::Deserialize)]
63struct CliReplaySegment {
64 kind: CliSegmentKind,
65 statements: Vec<CliReplayStatement>,
66}
67
68#[derive(Debug, Clone, serde::Deserialize)]
69#[serde(rename_all = "snake_case")]
70enum CliSegmentKind {
71 Transactional,
72 NonTransactional,
73 MetadataOnly,
74}
75
76#[derive(Debug, Clone, serde::Deserialize)]
77struct CliReplayStatement {
78 label: String,
79 up: String,
80}
81
82const CLI_REPLAY_PLAN_FORMAT_VERSION: &str = "1";
84
85fn load_replay_plan_from_disk(
91 workspace: &Path,
92 bucket: &djogi::migrate::BucketKey,
93 version: &str,
94 pending_checksum_up: &str,
95 pending_checksum_down: Option<&str>,
96) -> Result<(djogi::migrate::MigrationPlan, String, Option<String>), ApplyReplayPlanError> {
97 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
99 let replay_plan_path = bucket_dir.join(format!("{version}.plan.json"));
100
101 if let Ok(bytes) = std::fs::read(&replay_plan_path) {
102 let stored: CliReplayPlan = match serde_json::from_slice(&bytes) {
103 Ok(s) => s,
104 Err(e) => {
105 return Err(ApplyReplayPlanError::Parse {
106 path: replay_plan_path.clone(),
107 source: e.to_string(),
108 });
109 }
110 };
111
112 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
113 return Err(ApplyReplayPlanError::FormatVersion {
114 found: stored.format_version,
115 path: replay_plan_path.clone(),
116 });
117 }
118
119 if stored.checksum_up != pending_checksum_up
121 || stored.checksum_down.as_deref() != pending_checksum_down
122 {
123 return Err(ApplyReplayPlanError::ChecksumMismatch);
124 }
125
126 let plan = djogi::migrate::MigrationPlan {
127 bucket: bucket.clone(),
128 classification: stored.classification.into(),
129 segments: stored
130 .segments
131 .into_iter()
132 .map(|seg| djogi::migrate::Segment {
133 kind: seg.kind.into(),
134 statements: seg
135 .statements
136 .into_iter()
137 .map(|stmt| djogi::migrate::OperationSql {
138 label: stmt.label,
139 up: stmt.up,
140 down: String::new(),
141 lossy: None,
142 })
143 .collect(),
144 })
145 .collect(),
146 };
147
148 return Ok((plan, stored.checksum_up, stored.checksum_down));
149 }
150
151 let up_filename = djogi::migrate::up_filename(version);
153 let down_filename = djogi::migrate::down_filename(version);
154 let up_path = bucket_dir.join(&up_filename);
155 let down_path = bucket_dir.join(&down_filename);
156
157 let up_sql = std::fs::read_to_string(&up_path).map_err(|e| ApplyReplayPlanError::SqlRead {
158 path: up_path.clone(),
159 source: e.to_string(),
160 })?;
161
162 let down_sql = match std::fs::read_to_string(&down_path) {
163 Ok(sql) => sql,
164 Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
165 Err(e) => {
166 return Err(ApplyReplayPlanError::SqlRead {
167 path: down_path.clone(),
168 source: e.to_string(),
169 });
170 }
171 };
172
173 let computed_checksum_up = djogi::migrate::compute_checksum([&up_sql]);
177
178 let plan = djogi::migrate::MigrationPlan {
182 bucket: bucket.clone(),
183 classification: djogi::migrate::Classification::Additive,
184 segments: vec![djogi::migrate::Segment {
185 kind: djogi::migrate::SegmentKind::Transactional,
186 statements: vec![djogi::migrate::OperationSql {
187 label: format!("replay {version}"),
188 up: up_sql,
189 down: down_sql,
190 lossy: None,
191 }],
192 }],
193 };
194
195 Ok((plan, computed_checksum_up, None))
196}
197
198#[derive(Debug)]
200enum ApplyReplayPlanError {
201 Parse { path: PathBuf, source: String },
202 FormatVersion { found: String, path: PathBuf },
203 ChecksumMismatch,
204 SqlRead { path: PathBuf, source: String },
205}
206
207impl std::fmt::Display for ApplyReplayPlanError {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 match self {
210 Self::Parse { path, source } => {
211 write!(f, "parse replay plan {}: {source}", path.display())
212 }
213 Self::FormatVersion { found, path } => write!(
214 f,
215 "replay plan format version mismatch in {}: expected {}, found {}",
216 path.display(),
217 CLI_REPLAY_PLAN_FORMAT_VERSION,
218 found
219 ),
220 Self::ChecksumMismatch => {
221 write!(f, "checksum mismatch between pending JSON and replay plan")
222 }
223 Self::SqlRead { path, source } => {
224 write!(f, "read SQL file {}: {source}", path.display())
225 }
226 }
227 }
228}
229
230impl std::error::Error for ApplyReplayPlanError {}
231
232impl From<CliSegmentKind> for djogi::migrate::SegmentKind {
235 fn from(kind: CliSegmentKind) -> Self {
236 match kind {
237 CliSegmentKind::Transactional => Self::Transactional,
238 CliSegmentKind::NonTransactional => Self::NonTransactional,
239 CliSegmentKind::MetadataOnly => Self::MetadataOnly,
240 }
241 }
242}
243
244impl From<CliClassification> for djogi::migrate::Classification {
245 fn from(classification: CliClassification) -> Self {
246 match classification {
247 CliClassification::NoOp => Self::NoOp,
248 CliClassification::Additive => Self::Additive,
249 CliClassification::Reversible => Self::Reversible,
250 CliClassification::Destructive => Self::Destructive,
251 CliClassification::Lossy => Self::Lossy,
252 CliClassification::Unsupported { reason } => Self::Unsupported { reason },
253 CliClassification::PkTypeFlip {
254 co_destructive,
255 co_lossy,
256 } => Self::PkTypeFlip {
257 co_destructive,
258 co_lossy,
259 },
260 }
261 }
262}
263
264fn resolve_workspace(workspace: Option<PathBuf>) -> PathBuf {
268 workspace.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")))
269}
270
271fn discover_snapshot_buckets_on_disk(
283 workspace: &Path,
284) -> Vec<djogi::migrate::projection::BucketKey> {
285 let mut out = Vec::new();
286 let migrations_root = djogi::migrate::migrations_root(workspace);
287 let Ok(db_entries) = std::fs::read_dir(&migrations_root) else {
288 return out;
289 };
290 for db_entry in db_entries.flatten() {
291 let Ok(ft) = db_entry.file_type() else {
292 continue;
293 };
294 if !ft.is_dir() {
295 continue;
296 }
297 let Some(database) = db_entry.file_name().to_str().map(str::to_string) else {
298 continue;
299 };
300 let Ok(app_entries) = std::fs::read_dir(db_entry.path()) else {
301 continue;
302 };
303 for app_entry in app_entries.flatten() {
304 let Ok(ft) = app_entry.file_type() else {
305 continue;
306 };
307 if !ft.is_dir() {
308 continue;
309 }
310 let Some(dirname) = app_entry.file_name().to_str().map(str::to_string) else {
311 continue;
312 };
313 let snap_path = app_entry.path().join("schema_snapshot.json");
314 if !snap_path.exists() {
315 continue;
316 }
317 let label = djogi::migrate::app_label_from_dirname(&dirname).to_string();
318 out.push(djogi::migrate::projection::BucketKey {
319 database: database.clone(),
320 app: label,
321 });
322 }
323 }
324 out
325}
326
327pub fn compose_cmd(
329 provider: &dyn DescriptorProvider,
330 name: &str,
331 allow_destructive: bool,
332 force_overwrite: bool,
333 workspace: Option<PathBuf>,
334) -> ExitCode {
335 let workspace = resolve_workspace(workspace);
336 let models = match project_from_provider(provider) {
337 Ok(m) => m,
338 Err(e) => {
339 eprintln!("djogi migrations compose: projection error: {e}");
340 return ExitCode::from(1);
341 }
342 };
343 let apps: Vec<AppLifecycle> = provider
344 .apps()
345 .iter()
346 .map(|d| AppLifecycle {
347 label: d.label.to_string(),
348 database: d.database.to_string(),
349 renamed_from: d.renamed_from.map(str::to_string),
350 tombstone: d.tombstone,
351 })
352 .collect();
353 let djogi_config = match djogi::config::DjogiConfig::load_from_workspace(&workspace) {
358 Ok(c) => c,
359 Err(e) => {
360 eprintln!("djogi migrations compose: config load: {e}");
361 return ExitCode::from(1);
362 }
363 };
364 let pk_flip_option = djogi::migrate::PkFlipJoinTableOption::from_config_char(
365 djogi_config.migrate.pk_flip_join_table_option,
366 );
367 compose_with_inputs(
368 &workspace,
369 name,
370 allow_destructive,
371 force_overwrite,
372 &models,
373 &apps,
374 time::OffsetDateTime::now_utc(),
375 Some(pk_flip_option),
376 )
377}
378
379#[allow(clippy::too_many_arguments)]
394fn compose_with_inputs(
395 workspace: &Path,
396 name: &str,
397 allow_destructive: bool,
398 force_overwrite: bool,
399 models: &std::collections::BTreeMap<
400 djogi::migrate::projection::BucketKey,
401 djogi::migrate::AppliedSchema,
402 >,
403 apps: &[AppLifecycle],
404 now: time::OffsetDateTime,
405 pk_flip_join_table_option: Option<djogi::migrate::PkFlipJoinTableOption>,
406) -> ExitCode {
407 let lock_path = workspace.join(LOCK_FILE_NAME);
408 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
409 Ok(g) => g,
410 Err(e) => {
411 eprintln!("djogi migrations compose: failed to acquire workspace lock: {e}");
412 return ExitCode::from(1);
413 }
414 };
415
416 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::projection::BucketKey> =
424 models.keys().cloned().collect();
425 for bucket in discover_snapshot_buckets_on_disk(workspace) {
426 bucket_set.insert(bucket);
427 }
428
429 let mut snapshots: std::collections::BTreeMap<_, _> = std::collections::BTreeMap::new();
430 for bucket in &bucket_set {
431 let path = djogi::migrate::snapshot_path(workspace, bucket);
432 match djogi::migrate::load_snapshot(&path) {
433 Ok(s) => {
434 snapshots.insert(bucket.clone(), s);
435 }
436 Err(djogi::migrate::SnapshotError::Io { source, .. })
437 if source.kind() == std::io::ErrorKind::NotFound =>
438 {
439 }
441 Err(e) => {
442 eprintln!(
443 "djogi migrations compose: snapshot load failed at {}: {e}",
444 path.display()
445 );
446 return ExitCode::from(1);
447 }
448 }
449 }
450
451 let req = ComposeRequest {
452 workspace_root: workspace,
453 models,
454 snapshots: &snapshots,
455 apps,
456 name,
457 allow_destructive,
458 force_overwrite,
459 now,
460 _guard: &guard,
461 pk_flip_join_table_option,
462 skip_phase_zero_auto_emit: false,
468 };
469 match compose(req) {
470 Ok(report) => {
471 for emit in &report.emitted_phase_zero {
475 let ext_summary = if emit.extensions.is_empty() {
476 "no extensions".to_string()
477 } else {
478 format!(
479 "extensions: {}",
480 emit.extensions
481 .iter()
482 .cloned()
483 .collect::<Vec<_>>()
484 .join(", ")
485 )
486 };
487 println!(
488 "auto-emitted bootstrap migration: {database}/_global_ ({ext_summary})",
489 database = emit.database,
490 );
491 }
492 for cb in &report.composed_buckets {
493 println!(
494 "composed {database}/{app}: {version} ({classification:?})",
495 database = cb.bucket.database,
496 app = if cb.bucket.app.is_empty() {
497 "_global_"
498 } else {
499 cb.bucket.app.as_str()
500 },
501 version = cb.version,
502 classification = cb.classification,
503 );
504 }
505 ExitCode::from(0)
506 }
507 Err(ComposeError::NothingToCompose) => {
508 println!("nothing to compose — model state matches snapshot for every bucket");
509 ExitCode::from(0)
513 }
514 Err(ComposeError::LinkageDropWithoutModels { ref text, .. }) => {
515 eprintln!("djogi migrations compose: {text}");
516 ExitCode::from(2)
518 }
519 Err(e) => {
520 eprintln!("djogi migrations compose: {e}");
521 ExitCode::from(1)
522 }
523 }
524}
525
526pub fn status_cmd(workspace: Option<PathBuf>) -> ExitCode {
531 let workspace = resolve_workspace(workspace);
532
533 let runtime = match tokio::runtime::Builder::new_current_thread()
535 .enable_all()
536 .build()
537 {
538 Ok(r) => r,
539 Err(e) => {
540 eprintln!("djogi migrations status: tokio runtime: {e}");
541 return ExitCode::from(1);
542 }
543 };
544
545 let exit = runtime.block_on(async { run_status(&workspace).await });
546 ExitCode::from(exit as u8)
547}
548
549async fn run_status(workspace: &Path) -> i32 {
558 use djogi::config::DjogiConfig;
559
560 let config = match DjogiConfig::load_from_workspace(workspace) {
561 Ok(c) => c,
562 Err(e) => {
563 eprintln!("djogi migrations status: config load: {e}");
564 return 1;
565 }
566 };
567
568 let mut ctx = match connect_and_check(&config.database.url).await {
569 ContextOutcome::Ready(ctx) => ctx,
570 ContextOutcome::UnsupportedVersion(e) => {
571 crate::print_support_boundary_error("migrations status", &e);
572 return 2;
573 }
574 ContextOutcome::RuntimeError(msg) => {
575 eprintln!("djogi migrations status: pool: {msg}");
576 return 1;
577 }
578 };
579
580 let rows = match djogi::migrate::select_all_ledger_rows(&mut ctx).await {
581 Ok(rows) => rows,
582 Err(e) => {
583 if e.to_string().contains("djogi_schema_migrations") {
586 println!("No migrations recorded.");
587 return 0;
588 }
589 eprintln!("djogi migrations status: ledger read: {e}");
590 return 1;
591 }
592 };
593
594 let registered: Vec<String> = AppRegistry::all()
595 .iter()
596 .map(|d| d.label.to_string())
597 .collect();
598 let report = djogi::migrate::render_status(&rows, ®istered);
599 for line in &report.lines {
600 println!("{line}");
601 }
602 report.exit_code
603}
604
605#[allow(clippy::large_enum_variant)]
626enum ContextOutcome {
627 Ready(djogi::context::DjogiContext),
629 UnsupportedVersion(djogi::error::DjogiError),
632 RuntimeError(String),
635}
636
637async fn connect_and_check(url: &str) -> ContextOutcome {
645 let pool = match djogi::pg::pool::DjogiPool::connect(url).await {
646 Ok(p) => p,
647 Err(e) => return ContextOutcome::RuntimeError(e.to_string()),
648 };
649 match djogi::pg::preflight::check_postgres_version(&pool).await {
650 Ok(_) => ContextOutcome::Ready(djogi::context::DjogiContext::from_pool(pool)),
651 Err(e @ djogi::error::DjogiError::UnsupportedPostgresVersion { .. }) => {
654 ContextOutcome::UnsupportedVersion(e)
655 }
656 Err(other) => ContextOutcome::RuntimeError(other.to_string()),
657 }
658}
659
660fn resolve_bucket_url(db_config: &djogi::config::DatabaseConfig, database: &str) -> Option<String> {
680 if database == djogi::apps::AppDescriptor::GLOBAL_DATABASE {
683 return Some(db_config.url.clone());
684 }
685 if database == "crud_log"
686 && let Some(u) = db_config.crud_log_url.as_deref()
687 && !u.is_empty()
688 {
689 return Some(u.to_string());
690 }
691 if database == "event_log"
692 && let Some(u) = db_config.event_log_url.as_deref()
693 && !u.is_empty()
694 {
695 return Some(u.to_string());
696 }
697 djogi::migrate::derive_per_database_url(&db_config.url, database)
698}
699
700pub fn apply_cmd(workspace: Option<PathBuf>, fake: bool, reason: Option<String>) -> ExitCode {
706 let workspace = resolve_workspace(workspace);
707
708 let mode = if fake {
710 match reason {
711 Some(r) if !r.trim().is_empty() => FakeMode::Fake { reason: r },
712 Some(_) => {
713 eprintln!(
714 "djogi migrations apply --fake: --reason must not be empty; \
715 supply a non-empty reason why these migrations are being \
716 faked (e.g. 'schema pre-exists from prior tooling')"
717 );
718 return ExitCode::from(2);
719 }
720 None => {
721 eprintln!(
722 "djogi migrations apply --fake: --reason is required; \
723 supply a reason why these migrations are being faked \
724 (e.g. 'schema pre-exists from prior tooling'). \
725 This is recorded in the ledger audit trail."
726 );
727 return ExitCode::from(2);
728 }
729 }
730 } else {
731 FakeMode::Real
732 };
733
734 let runtime = match tokio::runtime::Builder::new_current_thread()
735 .enable_all()
736 .build()
737 {
738 Ok(r) => r,
739 Err(e) => {
740 eprintln!("djogi migrations apply: tokio runtime: {e}");
741 return ExitCode::from(1);
742 }
743 };
744
745 let exit = runtime.block_on(async { run_apply(&workspace, &mode).await });
746 ExitCode::from(exit as u8)
747}
748
749#[derive(Debug, Clone)]
752enum FakeMode {
753 Real,
755 Fake { reason: String },
757}
758
759async fn run_apply(workspace: &Path, mode: &FakeMode) -> i32 {
761 use djogi::config::DjogiConfig;
762
763 let action_verb = match mode {
764 FakeMode::Real => "apply",
765 FakeMode::Fake { .. } => "fake-apply",
766 };
767 let progress_verb = match mode {
768 FakeMode::Real => "applying",
769 FakeMode::Fake { .. } => "faking",
770 };
771
772 let config = match DjogiConfig::load_from_workspace(workspace) {
774 Ok(c) => c,
775 Err(e) => {
776 eprintln!("djogi migrations {action_verb}: config load: {e}");
777 return 2;
778 }
779 };
780
781 let pool = match djogi::pg::pool::DjogiPool::connect(&config.database.url).await {
783 Ok(p) => p,
784 Err(e) => {
785 eprintln!("djogi migrations {action_verb}: pool connect: {e}");
786 return 1;
787 }
788 };
789 if let Err(e) = djogi::pg::preflight::check_postgres_version(&pool).await {
790 crate::print_support_boundary_error("migrations apply", &e);
791 return 2;
792 }
793
794 let pending_files = discover_pending_plans(workspace);
796 if pending_files.is_empty() {
797 println!("No pending migrations to {action_verb}.");
798 return 0;
799 }
800
801 let lock_path = workspace.join(LOCK_FILE_NAME);
803 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
804 Ok(g) => g,
805 Err(e) => {
806 eprintln!("djogi migrations {action_verb}: workspace lock: {e}");
807 return 1;
808 }
809 };
810
811 let audit_pool = match djogi::migrate::resolve_audit_url(&config) {
813 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
814 Err(_) => None,
815 };
816
817 let mut ctx = djogi::context::DjogiContext::from_pool(pool);
819
820 for (pending_path, bucket_database, app_label) in &pending_files {
822 println!(" {progress_verb} {bucket_database}/{app_label}...");
823 let result = apply_one_pending(
824 &mut ctx,
825 workspace,
826 pending_path,
827 bucket_database.clone(),
828 app_label.clone(),
829 &config,
830 &guard,
831 audit_pool.as_ref(),
832 mode,
833 )
834 .await;
835
836 match result {
837 ApplyResult::Ok => match mode {
838 FakeMode::Real => {
839 println!("Applied: {bucket_database}/{app_label}");
840 }
841 FakeMode::Fake { .. } => {
842 println!(
843 " faked {bucket_database}/{app_label}: \
844 recorded in ledger with status = 'faked' (no SQL executed)"
845 );
846 }
847 },
848 ApplyResult::Skipped(reason) => {
849 println!("Skipped {bucket_database}/{app_label}: {reason}");
850 }
851 ApplyResult::Refused(reason) => {
852 eprintln!(
853 "djogi migrations apply: refused {bucket_database}/{app_label}: {reason}"
854 );
855 return 2;
856 }
857 ApplyResult::RunnerError(e) => {
858 eprintln!(
859 "djogi migrations apply: runner error on {bucket_database}/{app_label}: {e}"
860 );
861 return runner_error_exit_code(&e);
862 }
863 }
864 }
865
866 let summary_verb = match mode {
867 FakeMode::Real => "applied",
868 FakeMode::Fake { .. } => "faked",
869 };
870 println!("{summary_verb} {} migration(s).", pending_files.len());
871 0
872}
873
874#[derive(Debug)]
876enum ApplyResult {
877 Ok,
879 Skipped(String),
881 Refused(String),
883 RunnerError(RunnerError),
885}
886
887fn discover_pending_plans(workspace: &Path) -> Vec<(PathBuf, String, String)> {
892 let pending_root = djogi::migrate::pending_root(workspace);
893 let mut out = Vec::new();
894
895 let Ok(db_entries) = std::fs::read_dir(&pending_root) else {
896 return out;
897 };
898
899 for db_entry in db_entries.flatten() {
900 let db_name = match db_entry.file_name().to_str().map(str::to_string) {
901 Some(n) => n,
902 None => continue,
903 };
904 if db_name.starts_with('.') {
905 continue;
906 }
907
908 let db_dir = db_entry.path();
909 if !db_dir.is_dir() {
910 continue;
911 }
912
913 let Ok(app_entries) = std::fs::read_dir(&db_dir) else {
914 continue;
915 };
916
917 for app_entry in app_entries.flatten() {
918 let path = app_entry.path();
919 if !path.is_file() {
920 continue;
921 }
922 let filename = match path.file_name().and_then(|f| f.to_str()) {
923 Some(f) => f,
924 None => continue,
925 };
926 if !filename.ends_with(".json") {
929 continue;
930 }
931 let app_label = if let Some(stripped) = filename.strip_suffix(".json") {
934 stripped.to_string()
935 } else {
936 continue;
937 };
938
939 out.push((path, db_name.clone(), app_label));
940 }
941 }
942
943 out.sort_by(|a, b| a.0.cmp(&b.0));
944 out
945}
946
947#[allow(clippy::too_many_arguments)]
960#[djogi::deliberately_bypass_convention_with_raw_sql]
961async fn apply_one_pending(
967 ctx: &mut djogi::context::DjogiContext,
968 workspace: &Path,
969 pending_path: &Path,
970 bucket_database: String,
971 app_label: String,
972 config: &djogi::config::DjogiConfig,
973 guard: &djogi::migrate::WorkspaceGuard,
974 audit_pool: Option<&deadpool_postgres::Pool>,
975 mode: &FakeMode,
976) -> ApplyResult {
977 let pending_bytes = match std::fs::read(pending_path) {
979 Ok(b) => b,
980 Err(e) => {
981 return ApplyResult::Refused(format!("read pending JSON: {e}"));
982 }
983 };
984 let pending: PendingPlan = match serde_json::from_slice(&pending_bytes) {
985 Ok(p) => p,
986 Err(e) => {
987 return ApplyResult::Refused(format!("parse pending JSON: {e}"));
988 }
989 };
990
991 let resolved_app = if app_label == "_global_" {
994 String::new()
995 } else {
996 app_label.clone()
997 };
998 let bucket = djogi::migrate::BucketKey {
999 database: bucket_database,
1000 app: resolved_app,
1001 };
1002
1003 match check_ledger_state(ctx, &pending.version).await {
1005 LedgerState::NotPresent => {} LedgerState::AlreadyApplied => {
1007 return ApplyResult::Skipped("already applied".to_string());
1008 }
1009 LedgerState::PendingOrPartial(existing_status) => {
1010 if existing_status == LedgerStatus::Failed
1015 || existing_status == LedgerStatus::RolledBack
1016 {
1017 if let Err(e) = delete_failed_ledger_row(ctx, &pending.version).await {
1023 return ApplyResult::Refused(format!(
1024 "clean {} ledger row: {e}",
1025 existing_status.as_db_str()
1026 ));
1027 }
1028 } else {
1029 return ApplyResult::Refused(format!(
1030 "version already in {} state — resolve before re-applying",
1031 existing_status.as_db_str()
1032 ));
1033 }
1034 }
1035 }
1036
1037 let (plan, checksum_up, checksum_down) = match load_replay_plan_from_disk(
1039 workspace,
1040 &bucket,
1041 &pending.version,
1042 &pending.checksum_up,
1043 pending.checksum_down.as_deref(),
1044 ) {
1045 Ok(result) => result,
1046 Err(e) => {
1047 return ApplyResult::Refused(format!("load replay plan: {e}"));
1048 }
1049 };
1050
1051 let runner_ctx = RunnerCtx {
1053 bucket: bucket.clone(),
1054 version: pending.version.clone(),
1055 description: pending.slug.clone(),
1056 checksum_up,
1057 checksum_down,
1058 snapshot: Some(pending.model_snapshot.clone()),
1059 snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
1060 config: djogi::config::MigrateConfig {
1062 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
1063 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
1064 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
1065 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
1066 },
1067 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(config),
1068 audit_pool: audit_pool.cloned(),
1069 };
1070
1071 let runner_result = match mode {
1073 FakeMode::Real => apply_plan(ctx, &plan, &runner_ctx, guard).await,
1074 FakeMode::Fake { reason } => fake_apply_plan(ctx, &plan, &runner_ctx, guard, reason).await,
1075 };
1076 match runner_result {
1077 Ok(_) => ApplyResult::Ok,
1078 Err(e) => ApplyResult::RunnerError(e),
1079 }
1080}
1081
1082#[derive(Debug)]
1084enum LedgerState {
1085 NotPresent,
1087 AlreadyApplied,
1089 PendingOrPartial(LedgerStatus),
1091}
1092
1093async fn check_ledger_state(ctx: &mut djogi::context::DjogiContext, version: &str) -> LedgerState {
1095 let Ok(rows) = djogi::migrate::select_all_ledger_rows(ctx).await else {
1096 return LedgerState::NotPresent;
1099 };
1100
1101 let existing = rows.iter().find(|r| r.version == version);
1102 match existing {
1103 None => LedgerState::NotPresent,
1104 Some(row) => match row.status {
1105 LedgerStatus::Applied | LedgerStatus::Baseline | LedgerStatus::Faked => {
1106 LedgerState::AlreadyApplied
1107 }
1108 LedgerStatus::Pending | LedgerStatus::Failed | LedgerStatus::RolledBack => {
1109 LedgerState::PendingOrPartial(row.status)
1110 }
1111 },
1112 }
1113}
1114
1115fn runner_error_exit_code(_error: &RunnerError) -> i32 {
1120 1
1121}
1122
1123#[djogi::deliberately_bypass_convention_with_raw_sql]
1124async fn delete_failed_ledger_row(
1129 ctx: &mut djogi::context::DjogiContext,
1130 version: &str,
1131) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1132 ctx.raw_execute(
1133 "DELETE FROM djogi_schema_migrations WHERE version = $1",
1134 &[&version],
1135 )
1136 .await?;
1137 Ok(())
1138}
1139
1140fn reconstruct_snapshot_path(workspace: &Path, bucket: &djogi::migrate::BucketKey) -> PathBuf {
1142 let migrations_root = djogi::migrate::migrations_root(workspace);
1143 migrations_root
1144 .join(&bucket.database)
1145 .join(djogi::migrate::app_dirname(&bucket.app))
1146 .join("schema_snapshot.json")
1147}
1148
1149#[allow(clippy::too_many_arguments)]
1176pub fn attune_cmd(
1177 target: Option<&str>,
1178 apply: bool,
1179 record: bool,
1180 record_ledger: bool,
1181 record_reason: &str,
1182 squash: bool,
1183 from: Option<&str>,
1184 publish: bool,
1185 app: Option<&str>,
1186 workspace: Option<PathBuf>,
1187) -> ExitCode {
1188 let workspace = resolve_workspace(workspace);
1189 let mode = match (record_ledger, squash) {
1190 (false, false) => AttuneMode::DiffOnly,
1191 (true, false) => AttuneMode::Record {
1192 reason: record_reason.to_string(),
1193 },
1194 (false, true) => match from {
1195 Some(v) if !v.is_empty() => AttuneMode::Squash {
1196 from: v.to_string(),
1197 publish,
1198 app: app.filter(|s| !s.is_empty()).map(|s| s.to_string()),
1199 },
1200 _ => {
1201 eprintln!(
1202 "djogi migrations attune --squash requires --from <version> (e.g. \
1203 `--from V20260101000000__init`)"
1204 );
1205 return ExitCode::from(2);
1206 }
1207 },
1208 (true, true) => {
1209 eprintln!(
1212 "djogi migrations attune: --record-ledger and --squash are mutually exclusive"
1213 );
1214 return ExitCode::from(2);
1215 }
1216 };
1217
1218 let runtime = match tokio::runtime::Builder::new_current_thread()
1219 .enable_all()
1220 .build()
1221 {
1222 Ok(r) => r,
1223 Err(e) => {
1224 eprintln!("djogi migrations attune: tokio runtime: {e}");
1225 return ExitCode::from(1);
1226 }
1227 };
1228
1229 let target_owned = target.map(str::to_string);
1230 let exit =
1231 runtime.block_on(async { run_attune(&workspace, mode, target_owned, apply, record).await });
1232 ExitCode::from(exit as u8)
1233}
1234
1235async fn run_attune(
1238 workspace: &Path,
1239 mode: AttuneMode,
1240 target: Option<String>,
1241 apply: bool,
1242 record: bool,
1243) -> i32 {
1244 use djogi::config::DjogiConfig;
1245
1246 let config = match DjogiConfig::load_from_workspace(workspace) {
1247 Ok(c) => c,
1248 Err(e) => {
1249 eprintln!("djogi migrations attune: config load: {e}");
1250 return 1;
1251 }
1252 };
1253
1254 let mut ctx = match connect_and_check(&config.database.url).await {
1255 ContextOutcome::Ready(ctx) => ctx,
1256 ContextOutcome::UnsupportedVersion(e) => {
1257 crate::print_support_boundary_error("migrations attune", &e);
1258 return 2;
1259 }
1260 ContextOutcome::RuntimeError(msg) => {
1261 eprintln!("djogi migrations attune: pool: {msg}");
1262 return 1;
1263 }
1264 };
1265
1266 let lock_path = workspace.join(LOCK_FILE_NAME);
1270 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
1271 Ok(g) => g,
1272 Err(e) => {
1273 eprintln!("djogi migrations attune: failed to acquire workspace lock: {e}");
1274 return 1;
1275 }
1276 };
1277
1278 let req = AttuneRequest {
1279 workspace_root: workspace,
1280 database_url: &config.database.url,
1281 profile: &config.profile,
1282 dev_mode: config.database.dev_mode,
1286 target: target.as_deref(),
1291 apply,
1292 record,
1293 mode,
1294 _guard: &guard,
1295 };
1296 match attune(&mut ctx, req).await {
1297 Ok(report) => {
1298 if report.entries.is_empty() {
1299 println!("attune: no drift");
1300 } else {
1301 for entry in &report.entries {
1302 let app_display = if entry.bucket.app.is_empty() {
1303 "_global_"
1304 } else {
1305 entry.bucket.app.as_str()
1306 };
1307 println!(
1308 " {kind:<10} {database}/{app} {version}",
1309 kind = entry.kind.as_str(),
1310 database = entry.bucket.database,
1311 app = app_display,
1312 version = entry.version,
1313 );
1314 }
1315 }
1316 for diag in &report.diagnostics {
1320 println!(" diagnostic: {diag}");
1321 }
1322 if let Some(sha) = &report.resolved_target {
1323 println!("resolved target: {sha}");
1324 }
1325 if let Some(squashed) = &report.squashed_to {
1326 println!("squashed to: {squashed}");
1327 }
1328 if report.published {
1329 println!("published to remote");
1330 }
1331 if report.parent_pointer_updated {
1332 println!("parent submodule pointer updated");
1333 }
1334 0
1335 }
1336 Err(e) => {
1337 eprintln!("djogi migrations attune: {e}");
1338 attune_error_exit_code(&e)
1339 }
1340 }
1341}
1342
1343fn attune_error_exit_code(err: &AttuneError) -> i32 {
1358 match err {
1359 AttuneError::Refused(_) => 2,
1360 AttuneError::FilesystemScanFailed { .. }
1361 | AttuneError::LedgerQueryFailed { .. }
1362 | AttuneError::SqlReadFailed { .. }
1363 | AttuneError::SqlWriteFailed { .. }
1364 | AttuneError::SqlDeleteFailed { .. }
1365 | AttuneError::GitPublishFailed { .. }
1366 | AttuneError::GitTargetResolveFailed { .. }
1367 | AttuneError::GitFetchFailed { .. }
1368 | AttuneError::GitUpdateSubmodulePointerFailed { .. } => 1,
1369 }
1370}
1371
1372pub fn verify_cmd(
1380 provider: &dyn DescriptorProvider,
1381 workspace: Option<PathBuf>,
1382 strict: bool,
1383) -> ExitCode {
1384 let workspace = resolve_workspace(workspace);
1385
1386 let runtime = match tokio::runtime::Builder::new_current_thread()
1387 .enable_all()
1388 .build()
1389 {
1390 Ok(r) => r,
1391 Err(e) => {
1392 eprintln!("djogi migrations verify: tokio runtime: {e}");
1393 return ExitCode::from(1);
1394 }
1395 };
1396
1397 let exit = runtime.block_on(async { run_verify(provider, &workspace, strict).await });
1398 ExitCode::from(exit as u8)
1399}
1400
1401async fn run_verify(provider: &dyn DescriptorProvider, workspace: &Path, strict: bool) -> i32 {
1416 use djogi::config::DjogiConfig;
1417
1418 if provider.models().is_empty() && discover_snapshot_buckets_on_disk(workspace).is_empty() {
1430 crate::print_zero_descriptor_diagnostic("migrations verify");
1431 return 2;
1432 }
1433
1434 let config = match DjogiConfig::load_from_workspace(workspace) {
1436 Ok(c) => c,
1437 Err(e) => {
1438 eprintln!("djogi migrations verify: config load: {e}");
1439 return 1;
1440 }
1441 };
1442
1443 let models = match project_from_provider(provider) {
1445 Ok(m) => m,
1446 Err(e) => {
1447 eprintln!("djogi migrations verify: projection error: {e}");
1448 return 1;
1449 }
1450 };
1451
1452 let mut bucket_set: std::collections::BTreeSet<djogi::migrate::BucketKey> =
1458 models.keys().cloned().collect();
1459 for bucket in discover_snapshot_buckets_on_disk(workspace) {
1460 bucket_set.insert(bucket);
1461 }
1462 if bucket_set.is_empty() {
1469 crate::print_zero_descriptor_diagnostic("migrations verify");
1470 return 2;
1471 }
1472
1473 let policy = djogi::config::PolicyConfig {
1475 strict_out_of_order: strict,
1476 };
1477
1478 let database_has_models: std::collections::HashSet<String> = bucket_set
1485 .iter()
1486 .filter(|b| {
1487 models
1488 .get(*b)
1489 .map(|s| !s.models.is_empty())
1490 .unwrap_or(false)
1491 })
1492 .map(|b| b.database.clone())
1493 .collect();
1494
1495 let mut contexts: std::collections::BTreeMap<String, djogi::context::DjogiContext> =
1501 std::collections::BTreeMap::new();
1502 let mut seen_ledger_databases = std::collections::HashSet::<String>::new();
1503 let mut exit_code: i32 = 0;
1504
1505 for bucket in &bucket_set {
1507 let Some(url) = resolve_bucket_url(&config.database, &bucket.database) else {
1509 let bd = if bucket.app.is_empty() {
1510 "_global_"
1511 } else {
1512 &bucket.app
1513 };
1514 eprintln!(
1515 "djogi migrations verify: cannot derive URL for database '{}' (bucket {}/{}); \
1516 check that config.database.url has a valid path component",
1517 bucket.database, bucket.database, bd
1518 );
1519 exit_code = 1;
1520 continue;
1521 };
1522
1523 if !contexts.contains_key(&bucket.database) {
1527 match connect_and_check(&url).await {
1528 ContextOutcome::Ready(ctx) => {
1529 contexts.insert(bucket.database.clone(), ctx);
1530 }
1531 ContextOutcome::UnsupportedVersion(e) => {
1532 crate::print_support_boundary_error("migrations verify", &e);
1533 return 2;
1534 }
1535 ContextOutcome::RuntimeError(msg) => {
1536 eprintln!(
1537 "djogi migrations verify: pool for '{}': {msg}",
1538 bucket.database
1539 );
1540 exit_code = 1;
1541 continue;
1542 }
1543 }
1544 }
1545
1546 let snap_path = snapshot_path(workspace, bucket);
1551 let snapshot = match load_snapshot(&snap_path) {
1552 Ok(s) => s,
1553 Err(SnapshotError::Io { source, .. })
1554 if source.kind() == std::io::ErrorKind::NotFound =>
1555 {
1556 let bd = if bucket.app.is_empty() {
1557 "_global_"
1558 } else {
1559 &bucket.app
1560 };
1561 let has_models = models
1562 .get(bucket)
1563 .map(|s| !s.models.is_empty())
1564 .unwrap_or(false);
1565 if has_models {
1566 eprintln!(
1567 "djogi migrations verify: {}/{} has registered models but no \
1568 snapshot; run `djogi migrations compose` then \
1569 `djogi migrations apply` to record a baseline",
1570 bucket.database, bd
1571 );
1572 exit_code = 1;
1573 } else {
1574 println!("No snapshot found for bucket {}/{}", bucket.database, bd);
1575 }
1576 continue;
1577 }
1578 Err(e) => {
1579 let bd = if bucket.app.is_empty() {
1580 "_global_"
1581 } else {
1582 &bucket.app
1583 };
1584 eprintln!(
1585 "djogi migrations verify: load snapshot for {}/{}: {e}",
1586 bucket.database, bd
1587 );
1588 exit_code = 1;
1589 continue;
1590 }
1591 };
1592
1593 let db_has_models = database_has_models.contains(&bucket.database);
1598 let emit_ledger = db_has_models && seen_ledger_databases.insert(bucket.database.clone());
1599
1600 let ctx = contexts
1602 .get_mut(&bucket.database)
1603 .expect("context inserted above");
1604 let report = match djogi::migrate::verify_bucket(
1605 ctx,
1606 bucket,
1607 &snapshot,
1608 &policy,
1609 emit_ledger,
1610 db_has_models,
1611 )
1612 .await
1613 {
1614 Ok(r) => r,
1615 Err(e) => {
1616 let bd = if bucket.app.is_empty() {
1617 "_global_"
1618 } else {
1619 &bucket.app
1620 };
1621 eprintln!(
1622 "djogi migrations verify: error for {}/{}: {e}",
1623 bucket.database, bd
1624 );
1625 exit_code = 1;
1626 continue;
1627 }
1628 };
1629
1630 for line in render_verify_report(&report, bucket) {
1632 println!("{line}");
1633 }
1634 if report.has_errors() {
1635 exit_code = 1;
1636 }
1637 }
1638
1639 exit_code
1640}
1641
1642fn render_verify_report(report: &VerifyReport, bucket: &BucketKey) -> Vec<String> {
1650 let mut lines: Vec<String> = Vec::new();
1651
1652 let app_display = if bucket.app.is_empty() {
1653 "_global_"
1654 } else {
1655 &bucket.app
1656 };
1657 lines.push(format!(
1658 "djogi migrations verify — {}/{}",
1659 bucket.database, app_display
1660 ));
1661 lines.push("──────────────────────────────────────────".to_string());
1662
1663 match (
1664 &report.latest_applied_version,
1665 report.applied_count,
1666 report.unfinished_count,
1667 ) {
1668 (Some(version), applied, 0) => {
1669 lines.push(format!("Ledger: {applied} applied, latest {version}"));
1670 }
1671 (Some(version), applied, unfinished) => {
1672 lines.push(format!(
1673 "Ledger: {applied} applied, {unfinished} unfinished, latest {version}"
1674 ));
1675 }
1676 (None, 0, 0) => {
1677 lines.push("Ledger: empty (no migrations applied yet)".to_string());
1678 }
1679 _ => {}
1680 }
1681 lines.push(String::new());
1682
1683 if report.diagnostics.is_empty() {
1684 lines.push("No drift detected. Schema is consistent.".to_string());
1685 } else {
1686 for d in &report.diagnostics {
1687 let severity = match d.severity {
1688 VerifySeverity::Info => "INFO",
1689 VerifySeverity::Warning => "WARN",
1690 VerifySeverity::Error => "ERROR",
1691 };
1692 let location = d.location.as_deref().unwrap_or("-");
1693 lines.push(format!(
1694 "[{severity}] {code} ({loc}): {msg}",
1695 severity = severity,
1696 code = d.code,
1697 loc = location,
1698 msg = d.message
1699 ));
1700 }
1701 }
1702
1703 let errors = report
1704 .diagnostics
1705 .iter()
1706 .filter(|d| d.severity == VerifySeverity::Error)
1707 .count();
1708 let warnings = report
1709 .diagnostics
1710 .iter()
1711 .filter(|d| d.severity == VerifySeverity::Warning)
1712 .count();
1713 let infos = report
1714 .diagnostics
1715 .iter()
1716 .filter(|d| d.severity == VerifySeverity::Info)
1717 .count();
1718
1719 if errors > 0 {
1720 lines.push(String::new());
1721 lines.push(format!(
1722 "Result: FAILED ({errors} error(s), {warnings} warning(s), {infos} info(s))"
1723 ));
1724 } else if warnings > 0 {
1725 lines.push(String::new());
1726 lines.push(format!(
1727 "Result: PASSED with warnings ({warnings} warning(s), {infos} info(s))"
1728 ));
1729 } else {
1730 lines.push(String::new());
1731 lines.push(format!("Result: PASSED ({infos} info(s))"));
1732 }
1733
1734 lines
1735}
1736
1737impl From<PartialApplyResolutionCli> for PartialApplyResolution {
1740 fn from(cli: PartialApplyResolutionCli) -> Self {
1741 match cli {
1742 PartialApplyResolutionCli::RolledBack => Self::MarkRolledBack,
1743 PartialApplyResolutionCli::Faked => Self::MarkFaked,
1744 PartialApplyResolutionCli::Applied => Self::MarkApplied,
1745 }
1746 }
1747}
1748
1749pub fn repair_cmd(command: RepairSubcommand) -> ExitCode {
1754 match command {
1755 RepairSubcommand::ChecksumDrift {
1756 version,
1757 app,
1758 database,
1759 checksum_up,
1760 checksum_down,
1761 workspace,
1762 } => repair_checksum_drift_cmd(
1763 &version,
1764 app.as_deref(),
1765 database.as_deref(),
1766 checksum_up.as_deref(),
1767 checksum_down.as_deref(),
1768 workspace,
1769 ),
1770 RepairSubcommand::PartialApply {
1771 version,
1772 resolution,
1773 note,
1774 app,
1775 database,
1776 workspace,
1777 } => repair_partial_apply_cmd(
1778 &version,
1779 resolution.into(),
1780 ¬e,
1781 app.as_deref(),
1782 database.as_deref(),
1783 workspace,
1784 ),
1785 RepairSubcommand::ResumePartial {
1786 version,
1787 app,
1788 database,
1789 workspace,
1790 } => repair_resume_partial_apply_cmd(
1791 &version,
1792 app.as_deref(),
1793 database.as_deref(),
1794 workspace,
1795 ),
1796 RepairSubcommand::SnapshotRebuild {
1797 app,
1798 database,
1799 snapshot_path,
1800 workspace,
1801 } => repair_snapshot_rebuild_cmd(
1802 app.as_deref(),
1803 database.as_deref(),
1804 snapshot_path.as_deref(),
1805 workspace,
1806 ),
1807 }
1808}
1809
1810fn render_repair_report(report: &RepairReport) {
1814 for action in &report.actions_taken {
1815 println!(" {action}");
1816 }
1817 if !report.ledger_changes.is_empty() {
1818 println!("Ledger changes:");
1819 for lc in &report.ledger_changes {
1820 println!(
1821 " {} | {} | {} -> {}",
1822 lc.version, lc.column, lc.before, lc.after,
1823 );
1824 }
1825 }
1826 if !report.snapshot_changes.is_empty() {
1827 println!("Snapshot changes:");
1828 for sc in &report.snapshot_changes {
1829 println!(" {} | {}", sc.path.display(), sc.description);
1830 }
1831 }
1832}
1833
1834fn repair_error_exit_code(err: &RepairError) -> i32 {
1848 match err {
1849 RepairError::LedgerIo { .. } | RepairError::SnapshotIo { .. } | RepairError::AdvisoryLockFailed { .. } | RepairError::AdvisoryLockQueryFailed { .. } | RepairError::PinnedSessionCheckoutFailed { .. } | RepairError::ResumeStepFailed { .. } | RepairError::ResumeProgressAckFailed { .. } => 1,
1860
1861 RepairError::VersionNotFound { .. }
1865 | RepairError::InsufficientConfirmation
1866 | RepairError::InvalidChecksum { .. }
1867 | RepairError::InvalidResolution { .. }
1868 | RepairError::BucketAppMismatch { .. }
1869 | RepairError::PlanVersionMismatch { .. }
1870 | RepairError::PlanChecksumMismatch { .. }
1871 | RepairError::LeafIdentityMismatch { .. }
1872 | RepairError::NothingToResume { .. }
1873 | RepairError::ResumeBlockedByNonTxProgressClaim { .. }
1874 | RepairError::SuppliedSnapshotDiverges { .. }
1875 | RepairError::AdvisoryUnlockReturnedFalse { .. } | RepairError::ResumePlanShapeMismatch { .. }
1877 | RepairError::ReplayPlanShapeMismatch { .. }
1878 => 2,
1879 }
1880}
1881
1882fn resolve_database(database: Option<&str>, _config: &djogi::config::DjogiConfig) -> String {
1889 database.unwrap_or("main").to_string()
1890}
1891
1892fn compute_checksum_up_from_disk(
1906 workspace: &Path,
1907 bucket: &djogi::migrate::BucketKey,
1908 version: &str,
1909) -> std::io::Result<String> {
1910 let path =
1911 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::up_filename(version));
1912 let sql = std::fs::read_to_string(&path)?;
1913 Ok(djogi::migrate::compute_committed_sql_checksum(
1914 &sql,
1915 djogi::migrate::ResetSqlSide::Up,
1916 ))
1917}
1918
1919fn compute_checksum_down_from_disk(
1929 workspace: &Path,
1930 bucket: &djogi::migrate::BucketKey,
1931 version: &str,
1932) -> std::io::Result<Option<String>> {
1933 let path =
1934 djogi::migrate::bucket_dir(workspace, bucket).join(djogi::migrate::down_filename(version));
1935 let sql = match std::fs::read_to_string(&path) {
1936 Ok(s) => s,
1937 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
1938 Err(e) => return Err(e),
1939 };
1940 Ok(djogi::migrate::compute_committed_down_sql_checksum(&sql))
1941}
1942
1943pub fn repair_checksum_drift_cmd(
1950 version: &str,
1951 app: Option<&str>,
1952 database: Option<&str>,
1953 checksum_up: Option<&str>,
1954 checksum_down: Option<&str>,
1955 workspace: Option<PathBuf>,
1956) -> ExitCode {
1957 let workspace = resolve_workspace(workspace);
1958 let runtime = match tokio::runtime::Builder::new_current_thread()
1959 .enable_all()
1960 .build()
1961 {
1962 Ok(r) => r,
1963 Err(e) => {
1964 eprintln!("djogi migrations repair checksum-drift: tokio runtime: {e}");
1965 return ExitCode::from(1);
1966 }
1967 };
1968 let exit = runtime.block_on(async {
1969 run_repair_checksum_drift(
1970 &workspace,
1971 version,
1972 app,
1973 database,
1974 checksum_up,
1975 checksum_down,
1976 )
1977 .await
1978 });
1979 ExitCode::from(exit as u8)
1980}
1981
1982async fn run_repair_checksum_drift(
1984 workspace: &Path,
1985 version: &str,
1986 app: Option<&str>,
1987 database: Option<&str>,
1988 checksum_up: Option<&str>,
1989 checksum_down: Option<&str>,
1990) -> i32 {
1991 use djogi::config::DjogiConfig;
1992
1993 let config = match DjogiConfig::load_from_workspace(workspace) {
1994 Ok(c) => c,
1995 Err(e) => {
1996 eprintln!("djogi migrations repair checksum-drift: config load: {e}");
1997 return 1;
1998 }
1999 };
2000
2001 let db_name = resolve_database(database, &config);
2006 let url = match resolve_bucket_url(&config.database, &db_name) {
2007 Some(u) => u,
2008 None => {
2009 eprintln!(
2010 "djogi migrations repair checksum-drift: cannot derive a database URL for `{db_name}`"
2011 );
2012 return 2;
2013 }
2014 };
2015
2016 let mut ctx = match connect_and_check(&url).await {
2017 ContextOutcome::Ready(ctx) => ctx,
2018 ContextOutcome::UnsupportedVersion(e) => {
2019 crate::print_support_boundary_error("migrations repair checksum-drift", &e);
2020 return 2;
2021 }
2022 ContextOutcome::RuntimeError(msg) => {
2023 eprintln!("djogi migrations repair checksum-drift: pool: {msg}");
2024 return 1;
2025 }
2026 };
2027
2028 let lock_path = workspace.join(LOCK_FILE_NAME);
2029 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2030 Ok(g) => g,
2031 Err(e) => {
2032 eprintln!("djogi migrations repair checksum-drift: workspace lock: {e}");
2033 return 1;
2034 }
2035 };
2036
2037 let app_label = app.unwrap_or("");
2038 let bucket = BucketKey {
2039 database: db_name,
2040 app: app_label.to_string(),
2041 };
2042
2043 let new_checksum_up = match checksum_up {
2044 Some(c) => c.to_string(),
2045 None => {
2046 match compute_checksum_up_from_disk(workspace, &bucket, version) {
2052 Ok(cs) => cs,
2053 Err(e) => {
2054 eprintln!("djogi migrations repair checksum-drift: compute checksum_up: {e}");
2055 return 1;
2056 }
2057 }
2058 }
2059 };
2060
2061 let resolved_checksum_down = match checksum_down {
2062 Some(c) => Some(c.to_string()),
2063 None => {
2064 match compute_checksum_down_from_disk(workspace, &bucket, version) {
2069 Ok(cs_opt) => cs_opt,
2070 Err(e) => {
2071 eprintln!("djogi migrations repair checksum-drift: read down SQL: {e}");
2072 return 1;
2073 }
2074 }
2075 }
2076 };
2077
2078 match repair_checksum_drift(
2079 &mut ctx,
2080 &guard,
2081 &bucket,
2082 version,
2083 &new_checksum_up,
2084 resolved_checksum_down.as_deref(),
2085 RepairConfirmation::OperatorAcknowledged,
2086 )
2087 .await
2088 {
2089 Ok(report) => {
2090 render_repair_report(&report);
2091 0
2092 }
2093 Err(e) => {
2094 eprintln!("djogi migrations repair checksum-drift: {e}");
2095 repair_error_exit_code(&e)
2096 }
2097 }
2098}
2099
2100pub fn repair_partial_apply_cmd(
2105 version: &str,
2106 resolution: PartialApplyResolution,
2107 note: &str,
2108 app: Option<&str>,
2109 database: Option<&str>,
2110 workspace: Option<PathBuf>,
2111) -> ExitCode {
2112 let workspace = resolve_workspace(workspace);
2113 let runtime = match tokio::runtime::Builder::new_current_thread()
2114 .enable_all()
2115 .build()
2116 {
2117 Ok(r) => r,
2118 Err(e) => {
2119 eprintln!("djogi migrations repair partial-apply: tokio runtime: {e}");
2120 return ExitCode::from(1);
2121 }
2122 };
2123 let exit = runtime.block_on(async {
2124 run_repair_partial_apply(&workspace, version, resolution, note, app, database).await
2125 });
2126 ExitCode::from(exit as u8)
2127}
2128
2129async fn run_repair_partial_apply(
2131 workspace: &Path,
2132 version: &str,
2133 resolution: PartialApplyResolution,
2134 note: &str,
2135 app: Option<&str>,
2136 database: Option<&str>,
2137) -> i32 {
2138 use djogi::config::DjogiConfig;
2139
2140 let config = match DjogiConfig::load_from_workspace(workspace) {
2141 Ok(c) => c,
2142 Err(e) => {
2143 eprintln!("djogi migrations repair partial-apply: config load: {e}");
2144 return 1;
2145 }
2146 };
2147
2148 let db_name = resolve_database(database, &config);
2153 let url = match resolve_bucket_url(&config.database, &db_name) {
2154 Some(u) => u,
2155 None => {
2156 eprintln!(
2157 "djogi migrations repair partial-apply: cannot derive a database URL for `{db_name}`"
2158 );
2159 return 2;
2160 }
2161 };
2162
2163 let mut ctx = match connect_and_check(&url).await {
2164 ContextOutcome::Ready(ctx) => ctx,
2165 ContextOutcome::UnsupportedVersion(e) => {
2166 crate::print_support_boundary_error("migrations repair partial-apply", &e);
2167 return 2;
2168 }
2169 ContextOutcome::RuntimeError(msg) => {
2170 eprintln!("djogi migrations repair partial-apply: pool: {msg}");
2171 return 1;
2172 }
2173 };
2174
2175 let lock_path = workspace.join(LOCK_FILE_NAME);
2176 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2177 Ok(g) => g,
2178 Err(e) => {
2179 eprintln!("djogi migrations repair partial-apply: workspace lock: {e}");
2180 return 1;
2181 }
2182 };
2183
2184 let app_label = app.unwrap_or("");
2185 let bucket = BucketKey {
2186 database: db_name,
2187 app: app_label.to_string(),
2188 };
2189
2190 match repair_partial_apply(
2191 &mut ctx,
2192 &guard,
2193 &bucket,
2194 version,
2195 resolution,
2196 note,
2197 RepairConfirmation::OperatorAcknowledged,
2198 )
2199 .await
2200 {
2201 Ok(report) => {
2202 render_repair_report(&report);
2203 0
2204 }
2205 Err(e) => {
2206 eprintln!("djogi migrations repair partial-apply: {e}");
2207 repair_error_exit_code(&e)
2208 }
2209 }
2210}
2211
2212pub fn repair_resume_partial_apply_cmd(
2218 version: &str,
2219 app: Option<&str>,
2220 database: Option<&str>,
2221 workspace: Option<PathBuf>,
2222) -> ExitCode {
2223 let workspace = resolve_workspace(workspace);
2224 let runtime = match tokio::runtime::Builder::new_current_thread()
2225 .enable_all()
2226 .build()
2227 {
2228 Ok(r) => r,
2229 Err(e) => {
2230 eprintln!("djogi migrations repair resume-partial: tokio runtime: {e}");
2231 return ExitCode::from(1);
2232 }
2233 };
2234 let exit = runtime
2235 .block_on(async { run_repair_resume_partial(&workspace, version, app, database).await });
2236 ExitCode::from(exit as u8)
2237}
2238
2239async fn run_repair_resume_partial(
2241 workspace: &Path,
2242 version: &str,
2243 app: Option<&str>,
2244 database: Option<&str>,
2245) -> i32 {
2246 use djogi::config::DjogiConfig;
2247
2248 let config = match DjogiConfig::load_from_workspace(workspace) {
2249 Ok(c) => c,
2250 Err(e) => {
2251 eprintln!("djogi migrations repair resume-partial: config load: {e}");
2252 return 1;
2253 }
2254 };
2255
2256 let db_name = resolve_database(database, &config);
2261 let url = match resolve_bucket_url(&config.database, &db_name) {
2262 Some(u) => u,
2263 None => {
2264 eprintln!(
2265 "djogi migrations repair resume-partial: cannot derive a database URL for `{db_name}`"
2266 );
2267 return 2;
2268 }
2269 };
2270
2271 let mut ctx = match connect_and_check(&url).await {
2272 ContextOutcome::Ready(ctx) => ctx,
2273 ContextOutcome::UnsupportedVersion(e) => {
2274 crate::print_support_boundary_error("migrations repair resume-partial", &e);
2275 return 2;
2276 }
2277 ContextOutcome::RuntimeError(msg) => {
2278 eprintln!("djogi migrations repair resume-partial: pool: {msg}");
2279 return 1;
2280 }
2281 };
2282
2283 let lock_path = workspace.join(LOCK_FILE_NAME);
2284 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2285 Ok(g) => g,
2286 Err(e) => {
2287 eprintln!("djogi migrations repair resume-partial: workspace lock: {e}");
2288 return 1;
2289 }
2290 };
2291
2292 let app_label = app.unwrap_or("");
2293 let bucket = BucketKey {
2294 database: db_name,
2295 app: app_label.to_string(),
2296 };
2297
2298 let plan = match load_committed_plan_for_resume(workspace, &bucket, version) {
2304 Ok(p) => p,
2305 Err(e) => {
2306 eprintln!("djogi migrations repair resume-partial: load plan: {e}");
2307 return 2;
2308 }
2309 };
2310
2311 match repair_resume_partial_apply(
2312 &mut ctx,
2313 &guard,
2314 version,
2315 &plan,
2316 RepairConfirmation::OperatorAcknowledged,
2317 )
2318 .await
2319 {
2320 Ok(report) => {
2321 render_repair_report(&report);
2322 0
2323 }
2324 Err(e) => {
2325 eprintln!("djogi migrations repair resume-partial: {e}");
2326 repair_error_exit_code(&e)
2327 }
2328 }
2329}
2330
2331fn load_committed_plan_for_resume(
2345 workspace: &Path,
2346 bucket: &djogi::migrate::BucketKey,
2347 version: &str,
2348) -> Result<djogi::migrate::MigrationPlan, String> {
2349 let bucket_dir = djogi::migrate::bucket_dir(workspace, bucket);
2350 let plan_path = bucket_dir.join(format!("{version}.plan.json"));
2351 let bytes = std::fs::read(&plan_path).map_err(|e| format!("{}: {e}", plan_path.display()))?;
2352 let stored: CliReplayPlan = serde_json::from_slice(&bytes)
2353 .map_err(|e| format!("{}: parse: {e}", plan_path.display()))?;
2354 if stored.format_version != CLI_REPLAY_PLAN_FORMAT_VERSION {
2355 return Err(format!(
2356 "{}: unsupported format version {} (expected {CLI_REPLAY_PLAN_FORMAT_VERSION})",
2357 plan_path.display(),
2358 stored.format_version,
2359 ));
2360 }
2361 Ok(djogi::migrate::MigrationPlan {
2362 bucket: bucket.clone(),
2363 classification: stored.classification.into(),
2364 segments: stored
2365 .segments
2366 .into_iter()
2367 .map(|seg| djogi::migrate::Segment {
2368 kind: seg.kind.into(),
2369 statements: seg
2370 .statements
2371 .into_iter()
2372 .map(|stmt| djogi::migrate::OperationSql {
2373 label: stmt.label,
2374 up: stmt.up,
2375 down: String::new(),
2376 lossy: None,
2377 })
2378 .collect(),
2379 })
2380 .collect(),
2381 })
2382}
2383
2384pub fn repair_snapshot_rebuild_cmd(
2390 app: Option<&str>,
2391 database: Option<&str>,
2392 snapshot_path: Option<&Path>,
2393 workspace: Option<PathBuf>,
2394) -> ExitCode {
2395 let workspace = resolve_workspace(workspace);
2396 let runtime = match tokio::runtime::Builder::new_current_thread()
2397 .enable_all()
2398 .build()
2399 {
2400 Ok(r) => r,
2401 Err(e) => {
2402 eprintln!("djogi migrations repair snapshot-rebuild: tokio runtime: {e}");
2403 return ExitCode::from(1);
2404 }
2405 };
2406 let exit = runtime.block_on(async {
2407 run_repair_snapshot_rebuild(&workspace, app, database, snapshot_path).await
2408 });
2409 ExitCode::from(exit as u8)
2410}
2411
2412async fn run_repair_snapshot_rebuild(
2414 workspace: &Path,
2415 app: Option<&str>,
2416 database: Option<&str>,
2417 snapshot_path: Option<&Path>,
2418) -> i32 {
2419 use djogi::config::DjogiConfig;
2420
2421 let config = match DjogiConfig::load_from_workspace(workspace) {
2422 Ok(c) => c,
2423 Err(e) => {
2424 eprintln!("djogi migrations repair snapshot-rebuild: config load: {e}");
2425 return 1;
2426 }
2427 };
2428
2429 let db_name = resolve_database(database, &config);
2434 let url = match resolve_bucket_url(&config.database, &db_name) {
2435 Some(u) => u,
2436 None => {
2437 eprintln!(
2438 "djogi migrations repair snapshot-rebuild: cannot derive a database URL for `{db_name}`"
2439 );
2440 return 2;
2441 }
2442 };
2443
2444 let mut ctx = match connect_and_check(&url).await {
2445 ContextOutcome::Ready(ctx) => ctx,
2446 ContextOutcome::UnsupportedVersion(e) => {
2447 crate::print_support_boundary_error("migrations repair snapshot-rebuild", &e);
2448 return 2;
2449 }
2450 ContextOutcome::RuntimeError(msg) => {
2451 eprintln!("djogi migrations repair snapshot-rebuild: pool: {msg}");
2452 return 1;
2453 }
2454 };
2455
2456 let lock_path = workspace.join(LOCK_FILE_NAME);
2457 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2458 Ok(g) => g,
2459 Err(e) => {
2460 eprintln!("djogi migrations repair snapshot-rebuild: workspace lock: {e}");
2461 return 1;
2462 }
2463 };
2464
2465 let app_label = app.unwrap_or("");
2466 let bucket = BucketKey {
2467 database: db_name,
2468 app: app_label.to_string(),
2469 };
2470
2471 let snap_path = match snapshot_path {
2472 Some(p) => p.to_path_buf(),
2473 None => reconstruct_snapshot_path(workspace, &bucket),
2474 };
2475
2476 match repair_snapshot_rebuild(
2477 &mut ctx,
2478 &guard,
2479 &bucket,
2480 &snap_path,
2481 RepairConfirmation::OperatorAcknowledged,
2482 )
2483 .await
2484 {
2485 Ok(report) => {
2486 render_repair_report(&report);
2487 0
2488 }
2489 Err(e) => {
2490 eprintln!("djogi migrations repair snapshot-rebuild: {e}");
2491 repair_error_exit_code(&e)
2492 }
2493 }
2494}
2495
2496pub fn baseline_cmd(
2514 version: &str,
2515 description: &str,
2516 reason: &str,
2517 app: Option<&str>,
2518 database: Option<&str>,
2519 workspace: Option<PathBuf>,
2520) -> ExitCode {
2521 if reason.trim().is_empty() {
2526 eprintln!(
2527 "djogi migrations baseline: --reason must not be empty; \
2528 supply a non-empty reason why this baseline is being established \
2529 (e.g. 'schema pre-exists from prior tooling'). \
2530 This is recorded in the ledger audit trail."
2531 );
2532 return ExitCode::from(2);
2533 }
2534
2535 let workspace = resolve_workspace(workspace);
2536 let runtime = match tokio::runtime::Builder::new_current_thread()
2537 .enable_all()
2538 .build()
2539 {
2540 Ok(r) => r,
2541 Err(e) => {
2542 eprintln!("djogi migrations baseline: tokio runtime: {e}");
2543 return ExitCode::from(1);
2544 }
2545 };
2546 let exit = runtime.block_on(async {
2547 run_baseline(&workspace, version, description, reason, app, database).await
2548 });
2549 ExitCode::from(exit as u8)
2550}
2551
2552async fn run_baseline(
2563 workspace: &Path,
2564 version: &str,
2565 description: &str,
2566 reason: &str,
2567 app: Option<&str>,
2568 database: Option<&str>,
2569) -> i32 {
2570 use djogi::config::DjogiConfig;
2571
2572 let config = match DjogiConfig::load_from_workspace(workspace) {
2573 Ok(c) => c,
2574 Err(e) => {
2575 eprintln!("djogi migrations baseline: config load: {e}");
2576 return 1;
2577 }
2578 };
2579
2580 let db_name = resolve_database(database, &config);
2585 let url = match resolve_bucket_url(&config.database, &db_name) {
2586 Some(u) => u,
2587 None => {
2588 eprintln!("djogi migrations baseline: cannot derive a database URL for `{db_name}`");
2589 return 2;
2590 }
2591 };
2592
2593 let mut ctx = match connect_and_check(&url).await {
2594 ContextOutcome::Ready(ctx) => ctx,
2595 ContextOutcome::UnsupportedVersion(e) => {
2596 crate::print_support_boundary_error("migrations baseline", &e);
2597 return 2;
2598 }
2599 ContextOutcome::RuntimeError(msg) => {
2600 eprintln!("djogi migrations baseline: pool: {msg}");
2601 return 1;
2602 }
2603 };
2604
2605 let lock_path = workspace.join(LOCK_FILE_NAME);
2606 let guard = match acquire_workspace_lock(&lock_path, GUARD_DEFAULT_TIMEOUT) {
2607 Ok(g) => g,
2608 Err(e) => {
2609 eprintln!("djogi migrations baseline: workspace lock: {e}");
2610 return 1;
2611 }
2612 };
2613
2614 let app_label = app.unwrap_or("");
2615 let bucket = BucketKey {
2616 database: db_name,
2617 app: app_label.to_string(),
2618 };
2619
2620 let runner_ctx = RunnerCtx {
2621 bucket: bucket.clone(),
2622 version: version.to_string(),
2623 description: description.to_string(),
2624 checksum_up: String::new(),
2627 checksum_down: None,
2628 snapshot: None,
2632 snapshot_path: Some(reconstruct_snapshot_path(workspace, &bucket)),
2633 config: djogi::config::MigrateConfig {
2636 concurrent_warn_relpages: config.migrate.concurrent_warn_relpages,
2637 strict_concurrent_warnings: config.migrate.strict_concurrent_warnings,
2638 pk_flip_long_tx_threshold_secs: config.migrate.pk_flip_long_tx_threshold_secs,
2639 pk_flip_join_table_option: config.migrate.pk_flip_join_table_option,
2640 },
2641 out_of_order_policy: djogi::migrate::OutOfOrderPolicy::default_for_config(&config),
2642 audit_pool: match djogi::migrate::resolve_audit_url(&config) {
2643 Ok(url) => djogi::migrate::build_audit_pool(&url).await.ok(),
2644 Err(_) => None,
2645 },
2646 };
2647
2648 match baseline_plan(&mut ctx, &bucket, &runner_ctx, &guard, reason).await {
2649 Ok(report) => {
2650 println!(
2651 "djogi migrations baseline: established baseline `{}` \
2652 (ledger_id={}) in {:.1}s",
2653 version,
2654 report.ledger_id,
2655 report.execution_time_ms as f64 / 1000.0
2656 );
2657 0
2658 }
2659 Err(e) => {
2660 eprintln!("djogi migrations baseline: {e}");
2661 baseline_error_exit_code(&e)
2662 }
2663 }
2664}
2665
2666fn baseline_error_exit_code(err: &RunnerError) -> i32 {
2684 match err {
2685 RunnerError::VersionAlreadyApplied { .. }
2705 | RunnerError::VersionCollisionNonTerminal { .. }
2706 | RunnerError::BaselineSnapshotShouldNotBeProvided
2707 | RunnerError::AdvisoryUnlockReturnedFalse { .. }
2708 | RunnerError::SnapshotPersistFailed { .. }
2709 | RunnerError::OutOfOrderRejected { .. } => 2,
2710 _ => 1,
2715 }
2716}
2717
2718#[cfg(test)]
2719mod tests {
2720 use super::*;
2721 use std::fs;
2722 use std::sync::atomic::{AtomicUsize, Ordering};
2723
2724 fn temp_workspace(tag: &str) -> std::path::PathBuf {
2725 static COUNTER: AtomicUsize = AtomicUsize::new(0);
2726 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2727 let nanos = std::time::SystemTime::now()
2728 .duration_since(std::time::UNIX_EPOCH)
2729 .unwrap()
2730 .as_nanos();
2731 let p = std::env::temp_dir().join(format!("djogi-cli-{tag}-{nanos}-{n}"));
2732 fs::create_dir_all(&p).unwrap();
2733 p
2734 }
2735
2736 #[test]
2740 fn b1_discover_snapshot_buckets_picks_up_renamed_from_app() {
2741 let work = temp_workspace("b1_discover");
2742 let billing_dir = work.join("migrations/main/billing");
2747 fs::create_dir_all(&billing_dir).unwrap();
2748 fs::write(billing_dir.join("schema_snapshot.json"), "{}").unwrap();
2749 let global_dir = work.join("migrations/main/_global_");
2752 fs::create_dir_all(&global_dir).unwrap();
2753 fs::write(global_dir.join("schema_snapshot.json"), "{}").unwrap();
2754 let no_snap_dir = work.join("migrations/main/empty_app");
2758 fs::create_dir_all(&no_snap_dir).unwrap();
2759
2760 let buckets = discover_snapshot_buckets_on_disk(&work);
2761 let labels: std::collections::BTreeSet<&str> =
2762 buckets.iter().map(|b| b.app.as_str()).collect();
2763 assert!(
2764 labels.contains("billing"),
2765 "must include the renamed-from bucket: {labels:?}"
2766 );
2767 assert!(
2768 labels.contains(""),
2769 "must include the global bucket: {labels:?}"
2770 );
2771 assert!(
2772 !labels.contains("empty_app"),
2773 "must not include directories without a snapshot: {labels:?}"
2774 );
2775 let _ = fs::remove_dir_all(&work);
2776 }
2777
2778 #[test]
2783 fn a1_load_from_workspace_reads_path_specific_djogi_toml() {
2784 let work = temp_workspace("a1_workspace_config");
2785 let toml = "[database]\nurl = \"postgres://discovered-by-workspace-flag/test\"\n\
2786 max_connections = 1\ndev_mode = false\n\
2787 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
2788 fs::write(work.join("Djogi.toml"), toml).unwrap();
2789 let prior = std::env::var("DATABASE_URL").ok();
2792 unsafe { std::env::remove_var("DATABASE_URL") };
2796 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
2797 assert_eq!(
2798 config.database.url,
2799 "postgres://discovered-by-workspace-flag/test"
2800 );
2801 assert_eq!(config.server.port, 1234);
2802 if let Some(v) = prior {
2803 unsafe { std::env::set_var("DATABASE_URL", v) };
2804 }
2805 let _ = fs::remove_dir_all(&work);
2806 }
2807
2808 #[test]
2813 fn a1_round2_env_override_beats_workspace_toml() {
2814 let work = temp_workspace("a1r2_env_override");
2815 let toml = "[database]\nurl = \"postgres://from-toml/test\"\n\
2816 max_connections = 1\ndev_mode = false\n\
2817 [server]\nhost = \"127.0.0.1\"\nport = 1234\n";
2818 fs::write(work.join("Djogi.toml"), toml).unwrap();
2819 let prior = std::env::var("DATABASE_URL").ok();
2820 unsafe { std::env::set_var("DATABASE_URL", "postgres://from-env/test") };
2822 let config = djogi::config::DjogiConfig::load_from_workspace(&work).expect("load");
2823 assert_eq!(
2824 config.database.url, "postgres://from-env/test",
2825 "env DATABASE_URL must win over workspace Djogi.toml"
2826 );
2827 match prior {
2828 Some(v) => unsafe { std::env::set_var("DATABASE_URL", v) },
2829 None => unsafe { std::env::remove_var("DATABASE_URL") },
2830 }
2831 let _ = fs::remove_dir_all(&work);
2832 }
2833
2834 #[test]
2846 fn b1_round2_compose_consumes_discovered_orphan_snapshot() {
2847 use djogi::migrate::projection::BucketKey;
2848 use djogi::migrate::schema::{
2849 ColumnSchema, PkKindSchema, PrimaryKeySchema, SNAPSHOT_FORMAT_VERSION, TableSchema,
2850 };
2851 use djogi::migrate::{AppliedSchema, save_snapshot, snapshot_path};
2852 use std::collections::BTreeMap;
2853
2854 let work = temp_workspace("b1r2_compose_uses_discovery");
2855
2856 let billing_bucket = BucketKey {
2859 database: "main".into(),
2860 app: "billing".into(),
2861 };
2862 let mut billing_snap = AppliedSchema {
2863 djogi_version: env!("CARGO_PKG_VERSION").to_string(),
2864 enums: BTreeMap::new(),
2865 format_version: SNAPSHOT_FORMAT_VERSION.to_string(),
2866 generated_at: "2026-04-25T00:00:00Z".to_string(),
2867 indexes: Vec::new(),
2868 models: BTreeMap::new(),
2869 registered_apps: vec!["billing".to_string()],
2870 };
2871 billing_snap.models.insert(
2872 "widgets".to_string(),
2873 TableSchema {
2874 app: Some("billing".to_string()),
2875 columns: vec![ColumnSchema {
2876 check: None,
2877 comment: None,
2878 default_sql: Some("heerid_next_desc()".to_string()),
2879 foreign_key: None,
2880 generated: None,
2881 identity: None,
2882 index_type: None,
2883 indexed: false,
2884 max_length: None,
2885 name: "id".to_string(),
2886 nullable: false,
2887 on_delete: None,
2888 outbox_exclude: false,
2889 rationale: None,
2890 relation_kind: None,
2891 renamed_from: None,
2892 sequence_within: None,
2893 sql_type: "BIGINT".to_string(),
2894 unique: false,
2895 type_change_using: None,
2896 }],
2897 exclusion_constraints: Vec::new(),
2898 fts: None,
2899 is_through: false,
2900 moved_from_app: None,
2901 partition: None,
2902 primary_key: PrimaryKeySchema {
2903 columns: vec!["id".to_string()],
2904 kind: PkKindSchema::HeerIdRecencyBiased,
2905 },
2906 rationale: None,
2907 renamed_from: None,
2908 rls_enabled: false,
2909 table: "widgets".to_string(),
2910 table_comment: None,
2911 storage_params: None,
2912 tablespace: None,
2913 tenant_key: None,
2914 },
2915 );
2916 let snap_path = snapshot_path(&work, &billing_bucket);
2917 save_snapshot(&billing_snap, &snap_path).expect("write snapshot");
2918
2919 let empty_models: BTreeMap<BucketKey, AppliedSchema> = BTreeMap::new();
2923 let now = time::OffsetDateTime::from_unix_timestamp(1_745_549_523).unwrap();
2924
2925 let exit = compose_with_inputs(
2926 &work,
2927 "drop billing remnant",
2928 true, false, &empty_models,
2931 &[AppLifecycle {
2932 label: "billing".to_string(),
2933 database: "main".to_string(),
2934 renamed_from: None,
2935 tombstone: true, }],
2937 now,
2938 None, );
2940 assert_eq!(exit, ExitCode::from(0), "compose must succeed");
2941
2942 let billing_dir = djogi::migrate::bucket_dir(&work, &billing_bucket);
2945 let mut up_path: Option<PathBuf> = None;
2946 for entry in fs::read_dir(&billing_dir).unwrap().flatten() {
2947 let n = entry.file_name().to_string_lossy().to_string();
2948 if n.starts_with('V') && n.ends_with(".sdjql") && !n.contains(".down.") {
2951 up_path = Some(entry.path());
2952 break;
2953 }
2954 }
2955 let up_path = up_path.expect("compose must have written an up SQL file");
2956 let up_sql = fs::read_to_string(&up_path).unwrap();
2957 assert!(
2958 up_sql.contains("DROP TABLE \"widgets\""),
2959 "compose must have seen the disk snapshot and emitted DROP TABLE — \
2960 this proves discover_snapshot_buckets_on_disk reached the differ. \
2961 SQL: {up_sql}"
2962 );
2963 let _ = fs::remove_dir_all(&work);
2964 }
2965
2966 #[test]
2974 fn a1_round2_status_cmd_threads_workspace_to_config() {
2975 let work = temp_workspace("a1r2_status_workspace");
2976 fs::write(work.join("Djogi.toml"), "this is = not = valid toml ===").unwrap();
2981 let exit = status_cmd(Some(work.clone()));
2982 assert_eq!(
2983 exit,
2984 ExitCode::from(1),
2985 "malformed workspace Djogi.toml must surface as config load error"
2986 );
2987 let _ = fs::remove_dir_all(&work);
2988 }
2989
2990 #[test]
2998 fn u3_attune_refusal_variants_map_to_exit_code_two() {
2999 use djogi::migrate::AttuneRefusal;
3000 let cases = [
3001 AttuneError::Refused(AttuneRefusal::SquashNotLocalhost {
3002 database_url: "postgres://prod.example.com/main".to_string(),
3003 }),
3004 AttuneError::Refused(AttuneRefusal::SquashNotDevProfile {
3005 profile: "production".to_string(),
3006 }),
3007 AttuneError::Refused(AttuneRefusal::SquashDevModeOff),
3010 AttuneError::Refused(AttuneRefusal::SquashEnvIsProduction {
3011 env_value: "production".to_string(),
3012 }),
3013 AttuneError::Refused(AttuneRefusal::SquashFromVersionNotFound {
3014 version: "V20260101000000__missing".to_string(),
3015 }),
3016 AttuneError::Refused(AttuneRefusal::SquashFromVersionAmbiguous {
3017 version: "V20260101000000__shared".to_string(),
3018 buckets: vec!["main/users".to_string(), "main/billing".to_string()],
3019 }),
3020 ];
3021 for err in &cases {
3022 assert_eq!(
3023 attune_error_exit_code(err),
3024 2,
3025 "refusal variant must map to exit 2: {err}"
3026 );
3027 }
3028 }
3029
3030 #[test]
3035 fn u3_attune_runtime_variants_map_to_exit_code_one() {
3036 let cases = [
3037 AttuneError::FilesystemScanFailed {
3038 source: std::io::Error::other("disk full"),
3039 },
3040 AttuneError::SqlReadFailed {
3041 path: PathBuf::from("/tmp/x.sdjql"),
3042 source: std::io::Error::other("permission denied"),
3043 },
3044 AttuneError::SqlWriteFailed {
3045 path: PathBuf::from("/tmp/x.sdjql"),
3046 source: std::io::Error::other("read-only fs"),
3047 },
3048 AttuneError::SqlDeleteFailed {
3049 path: PathBuf::from("/tmp/x.sdjql"),
3050 source: std::io::Error::other("not found"),
3051 },
3052 AttuneError::GitPublishFailed {
3053 stderr: "fatal: refusing to push".to_string(),
3054 status_code: Some(128),
3055 },
3056 ];
3057 for err in &cases {
3058 assert_eq!(
3059 attune_error_exit_code(err),
3060 1,
3061 "runtime variant must map to exit 1: {err}"
3062 );
3063 }
3064 }
3065
3066 #[test]
3074 fn baseline_empty_reason_exits_code_2() {
3075 let result = baseline_cmd(
3076 "V00000000000000__baseline",
3077 "description",
3078 "",
3079 None,
3080 None,
3081 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3082 );
3083 assert_eq!(
3084 result,
3085 ExitCode::from(2),
3086 "empty --reason must exit 2 before any DB work"
3087 );
3088 }
3089
3090 #[test]
3091 fn baseline_whitespace_reason_exits_code_2() {
3092 let result = baseline_cmd(
3093 "V00000000000000__baseline",
3094 "description",
3095 " ",
3096 None,
3097 None,
3098 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3099 );
3100 assert_eq!(
3101 result,
3102 ExitCode::from(2),
3103 "whitespace-only --reason must exit 2 before any DB work"
3104 );
3105 }
3106
3107 #[test]
3113 fn baseline_refusal_variants_map_to_exit_code_two() {
3114 let cases = [
3115 RunnerError::VersionAlreadyApplied {
3116 version: "V00000000000000__baseline".to_string(),
3117 applied_at: None,
3118 },
3119 RunnerError::VersionCollisionNonTerminal {
3120 version: "V00000000000000__baseline".to_string(),
3121 status: LedgerStatus::Pending,
3122 run_id: 1,
3123 },
3124 RunnerError::BaselineSnapshotShouldNotBeProvided,
3125 RunnerError::AdvisoryUnlockReturnedFalse {
3126 bucket: BucketKey {
3127 database: "main".to_string(),
3128 app: String::new(),
3129 },
3130 key: 0x0102_0304_0506_0708,
3131 },
3132 RunnerError::OutOfOrderRejected {
3133 version: "V00000000000000__baseline".to_string(),
3134 conflicting_version: "V20260101000000__later".to_string(),
3135 conflicting_applied_at: None,
3136 },
3137 ];
3138 for err in &cases {
3139 assert_eq!(
3140 baseline_error_exit_code(err),
3141 2,
3142 "baseline refusal variant must map to exit 2: {err}"
3143 );
3144 }
3145 }
3146
3147 #[test]
3153 fn baseline_transient_variants_map_to_exit_code_one() {
3154 use djogi::error::{DbError, DjogiError};
3155 let cases = [
3156 RunnerError::LedgerBootstrapFailed {
3157 source: DjogiError::Db(DbError::other("create table failed")),
3158 },
3159 RunnerError::LedgerWriteFailed {
3160 version: "V00000000000000__baseline".to_string(),
3161 source: DjogiError::Db(DbError::other("insert failed")),
3162 },
3163 RunnerError::PinnedSessionCheckoutFailed {
3164 source: DjogiError::Db(DbError::other("pool exhausted")),
3165 },
3166 RunnerError::AdvisoryLockFailed {
3167 bucket: BucketKey {
3168 database: "main".to_string(),
3169 app: String::new(),
3170 },
3171 key: 0x0102_0304_0506_0708,
3172 attempts: 3,
3173 },
3174 ];
3175 for err in &cases {
3176 assert_eq!(
3177 baseline_error_exit_code(err),
3178 1,
3179 "baseline transient variant must map to exit 1: {err}"
3180 );
3181 }
3182 }
3183
3184 #[test]
3188 fn fake_without_reason_exits_code_2() {
3189 let result = apply_cmd(
3190 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3191 true,
3192 None,
3193 );
3194 assert_eq!(
3195 result,
3196 ExitCode::from(2),
3197 "--fake without --reason must exit 2"
3198 );
3199 }
3200
3201 #[test]
3203 fn fake_with_empty_reason_exits_code_2() {
3204 let result = apply_cmd(
3205 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3206 true,
3207 Some(String::new()),
3208 );
3209 assert_eq!(
3210 result,
3211 ExitCode::from(2),
3212 "--fake with empty reason must exit 2"
3213 );
3214 }
3215
3216 #[test]
3218 fn fake_with_whitespace_reason_exits_code_2() {
3219 let result = apply_cmd(
3220 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3221 true,
3222 Some(" ".to_string()),
3223 );
3224 assert_eq!(
3225 result,
3226 ExitCode::from(2),
3227 "--fake with whitespace reason must exit 2"
3228 );
3229 }
3230
3231 #[test]
3233 fn reason_without_fake_is_accepted() {
3234 let result = apply_cmd(
3238 Some(std::path::PathBuf::from("/tmp/nonexistent_djogi_ws")),
3239 false, Some("test reason".to_string()),
3241 );
3242 assert_ne!(
3244 result,
3245 ExitCode::from(2),
3246 "--reason without --fake should not refuse"
3247 );
3248 }
3249
3250 fn render_bucket(database: &str, app: &str) -> djogi::migrate::BucketKey {
3257 djogi::migrate::BucketKey {
3258 database: database.to_string(),
3259 app: app.to_string(),
3260 }
3261 }
3262
3263 fn diag(
3265 code: &str,
3266 severity: djogi::migrate::VerifySeverity,
3267 message: &str,
3268 location: Option<&str>,
3269 ) -> djogi::migrate::VerifyDiagnostic {
3270 djogi::migrate::VerifyDiagnostic {
3271 code: code.to_string(),
3272 severity,
3273 message: message.to_string(),
3274 location: location.map(str::to_string),
3275 }
3276 }
3277
3278 #[test]
3279 fn render_verify_report_clean_output() {
3280 use djogi::migrate::VerifyReport;
3281
3282 let report = VerifyReport {
3283 diagnostics: vec![],
3284 latest_applied_version: Some("001_initial".to_string()),
3285 applied_count: 3,
3286 unfinished_count: 0,
3287 };
3288 let bucket = render_bucket("main", "");
3289
3290 let lines = render_verify_report(&report, &bucket);
3291
3292 assert!(
3293 lines.contains(&"Ledger: 3 applied, latest 001_initial".to_string()),
3294 "missing ledger line; got {lines:?}"
3295 );
3296 assert!(
3297 lines.contains(&"No drift detected. Schema is consistent.".to_string()),
3298 "missing clean line; got {lines:?}"
3299 );
3300 assert!(
3301 lines.iter().any(|l| l.contains("Result: PASSED")),
3302 "missing PASSED result; got {lines:?}"
3303 );
3304 assert!(
3305 !lines.iter().any(|l| l.contains("FAILED")),
3306 "clean report must not say FAILED; got {lines:?}"
3307 );
3308 }
3309
3310 #[test]
3311 fn render_verify_report_with_errors() {
3312 use djogi::migrate::{VerifyReport, VerifySeverity};
3313
3314 let report = VerifyReport {
3317 diagnostics: vec![
3318 diag(
3319 "D601",
3320 VerifySeverity::Error,
3321 "Snapshot table missing from live DB",
3322 Some("users"),
3323 ),
3324 diag(
3325 "D611",
3326 VerifySeverity::Warning,
3327 "Live index not present in snapshot",
3328 Some("idx_posts_created"),
3329 ),
3330 ],
3331 latest_applied_version: Some("V20260501000000__add_users".to_string()),
3332 applied_count: 2,
3333 unfinished_count: 0,
3334 };
3335 let bucket = render_bucket("main", "myapp");
3336
3337 assert!(report.has_errors());
3338 let lines = render_verify_report(&report, &bucket);
3339
3340 assert!(
3341 lines
3342 .contains(&"[ERROR] D601 (users): Snapshot table missing from live DB".to_string()),
3343 "missing D601 line; got {lines:?}"
3344 );
3345 assert!(
3346 lines.contains(
3347 &"[WARN] D611 (idx_posts_created): Live index not present in snapshot".to_string()
3348 ),
3349 "missing D611 line; got {lines:?}"
3350 );
3351 assert!(
3352 lines.iter().any(|l| l.contains("Result: FAILED")),
3353 "error report must say FAILED; got {lines:?}"
3354 );
3355 }
3356
3357 #[test]
3358 fn render_verify_report_header_shows_global_and_named_app() {
3359 use djogi::migrate::VerifyReport;
3360
3361 let report = VerifyReport {
3362 diagnostics: vec![],
3363 latest_applied_version: None,
3364 applied_count: 0,
3365 unfinished_count: 0,
3366 };
3367
3368 let global = render_verify_report(&report, &render_bucket("main", ""));
3370 assert_eq!(
3371 global.first().map(String::as_str),
3372 Some("djogi migrations verify — main/_global_"),
3373 "global bucket header; got {global:?}"
3374 );
3375
3376 let named = render_verify_report(&report, &render_bucket("crud_log", "billing"));
3378 assert_eq!(
3379 named.first().map(String::as_str),
3380 Some("djogi migrations verify — crud_log/billing"),
3381 "named bucket header; got {named:?}"
3382 );
3383 }
3384
3385 #[test]
3386 fn render_verify_report_warning_only_passes_with_warnings() {
3387 use djogi::migrate::{VerifyReport, VerifySeverity};
3388
3389 let report = VerifyReport {
3390 diagnostics: vec![diag(
3391 "D606",
3392 VerifySeverity::Warning,
3393 "type differs (advisory)",
3394 Some("users.age"),
3395 )],
3396 latest_applied_version: Some("001_initial".to_string()),
3397 applied_count: 1,
3398 unfinished_count: 0,
3399 };
3400 let lines = render_verify_report(&report, &render_bucket("main", ""));
3401
3402 assert!(
3403 lines
3404 .iter()
3405 .any(|l| l.contains("Result: PASSED with warnings")),
3406 "warning-only must PASS with warnings; got {lines:?}"
3407 );
3408 assert!(
3409 !lines.iter().any(|l| l.contains("FAILED")),
3410 "warning-only must not say FAILED; got {lines:?}"
3411 );
3412 }
3413
3414 #[test]
3415 fn render_verify_report_empty_ledger_line() {
3416 use djogi::migrate::VerifyReport;
3417
3418 let report = VerifyReport {
3419 diagnostics: vec![],
3420 latest_applied_version: None,
3421 applied_count: 0,
3422 unfinished_count: 0,
3423 };
3424 let lines = render_verify_report(&report, &render_bucket("main", ""));
3425
3426 assert!(
3427 lines.contains(&"Ledger: empty (no migrations applied yet)".to_string()),
3428 "empty ledger line; got {lines:?}"
3429 );
3430 }
3431
3432 #[test]
3433 fn render_verify_report_unfinished_ledger_line() {
3434 use djogi::migrate::VerifyReport;
3435
3436 let report = VerifyReport {
3437 diagnostics: vec![],
3438 latest_applied_version: Some("V20260501000000__add_users".to_string()),
3439 applied_count: 2,
3440 unfinished_count: 1,
3441 };
3442 let lines = render_verify_report(&report, &render_bucket("main", ""));
3443
3444 assert!(
3445 lines.contains(
3446 &"Ledger: 2 applied, 1 unfinished, latest V20260501000000__add_users".to_string()
3447 ),
3448 "unfinished ledger line; got {lines:?}"
3449 );
3450 }
3451
3452 #[test]
3453 fn render_verify_report_info_with_no_location_uses_dash() {
3454 use djogi::migrate::{VerifyReport, VerifySeverity};
3455
3456 let report = VerifyReport {
3459 diagnostics: vec![diag(
3460 "D692",
3461 VerifySeverity::Info,
3462 "enum type(s) declared; not yet checked",
3463 None,
3464 )],
3465 latest_applied_version: Some("001_initial".to_string()),
3466 applied_count: 1,
3467 unfinished_count: 0,
3468 };
3469 let lines = render_verify_report(&report, &render_bucket("main", ""));
3470
3471 assert!(
3472 lines.iter().any(|l| l.contains("(-)")),
3473 "location: None must render as (-); got {lines:?}"
3474 );
3475 assert!(
3476 lines.contains(&"Result: PASSED (1 info(s))".to_string()),
3477 "all-info summary; got {lines:?}"
3478 );
3479 }
3480
3481 fn db_config(
3484 url: &str,
3485 crud_log_url: Option<&str>,
3486 event_log_url: Option<&str>,
3487 ) -> djogi::config::DatabaseConfig {
3488 djogi::config::DatabaseConfig {
3489 url: url.to_string(),
3490 crud_log_url: crud_log_url.map(str::to_string),
3491 event_log_url: event_log_url.map(str::to_string),
3492 max_connections: None,
3493 dev_mode: false,
3494 }
3495 }
3496
3497 #[test]
3498 fn resolve_bucket_url_main_uses_app_url_verbatim() {
3499 let cfg = db_config("postgres://user:pass@localhost:5432/myapp_prod", None, None);
3503 assert_eq!(
3504 resolve_bucket_url(&cfg, "main").as_deref(),
3505 Some("postgres://user:pass@localhost:5432/myapp_prod"),
3506 "main must return the app URL unchanged"
3507 );
3508 }
3509
3510 #[test]
3511 fn resolve_bucket_url_crud_log_prefers_explicit_url() {
3512 let cfg = db_config(
3513 "postgres://localhost/main",
3514 Some("postgres://localhost/explicit_crud"),
3515 None,
3516 );
3517 assert_eq!(
3518 resolve_bucket_url(&cfg, "crud_log").as_deref(),
3519 Some("postgres://localhost/explicit_crud"),
3520 "crud_log must prefer the explicit crud_log_url"
3521 );
3522 }
3523
3524 #[test]
3525 fn resolve_bucket_url_event_log_prefers_explicit_url() {
3526 let cfg = db_config(
3527 "postgres://localhost/main",
3528 None,
3529 Some("postgres://localhost/explicit_event"),
3530 );
3531 assert_eq!(
3532 resolve_bucket_url(&cfg, "event_log").as_deref(),
3533 Some("postgres://localhost/explicit_event"),
3534 "event_log must prefer the explicit event_log_url"
3535 );
3536 }
3537
3538 #[test]
3539 fn resolve_bucket_url_empty_explicit_log_url_falls_back_to_derived() {
3540 let cfg = db_config("postgres://localhost/main", Some(""), Some(" "));
3543 assert_eq!(
3545 resolve_bucket_url(&cfg, "crud_log").as_deref(),
3546 Some("postgres://localhost/crud_log"),
3547 "empty crud_log_url must fall back to derived"
3548 );
3549 assert_eq!(
3552 resolve_bucket_url(&cfg, "event_log").as_deref(),
3553 Some(" "),
3554 "non-empty (whitespace) event_log_url is used verbatim"
3555 );
3556 }
3557
3558 #[test]
3559 fn resolve_bucket_url_other_database_derives_from_app_url() {
3560 let cfg = db_config("postgres://user:pass@localhost:5432/main", None, None);
3561 assert_eq!(
3562 resolve_bucket_url(&cfg, "analytics").as_deref(),
3563 Some("postgres://user:pass@localhost:5432/analytics"),
3564 "an arbitrary database name derives by path splice"
3565 );
3566 }
3567
3568 #[test]
3569 fn resolve_bucket_url_pathless_url_returns_none() {
3570 let cfg = db_config("postgres://localhost", None, None);
3572 assert_eq!(
3573 resolve_bucket_url(&cfg, "crud_log"),
3574 None,
3575 "pathless URL must yield None for a derived database"
3576 );
3577 }
3578
3579 #[test]
3580 fn resolve_bucket_url_pathless_url_still_returns_main_verbatim() {
3581 let cfg = db_config("postgres://localhost", None, None);
3584 assert_eq!(
3585 resolve_bucket_url(&cfg, "main").as_deref(),
3586 Some("postgres://localhost"),
3587 "main returns the app URL verbatim regardless of path"
3588 );
3589 }
3590}