1#![allow(clippy::disallowed_methods)]
18
19use forge_core::error::{ForgeError, Result};
20use sqlx::{PgPool, Postgres};
21use std::collections::HashMap;
22use std::path::Path;
23use std::time::{Duration, Instant};
24use tracing::{debug, info, warn};
25
26use super::builtin::extract_version;
27
/// Key passed to `pg_try_advisory_lock` / `pg_advisory_unlock` so that only
/// one runner applies migrations at a time. The hex digits are the ASCII
/// bytes of "FORGE".
const MIGRATION_LOCK_ID: i64 = 0x464F524745;

/// SQL embedded at compile time and executed statement-by-statement by
/// `bootstrap_tracking_table` before the tracking table is read.
const BOOTSTRAP_SQL: &str = include_str!("../../../migrations/system/v000_bootstrap.sql");
34
/// Tunables controlling how the runner waits for the migration lock.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Total time to keep retrying before lock acquisition fails.
    pub lock_acquire_timeout: Duration,
    /// Pause between two consecutive lock attempts.
    pub lock_poll_interval: Duration,
    /// Minimum spacing between "still waiting" warnings.
    pub lock_warn_interval: Duration,
}

impl Default for MigrationConfig {
    /// Five-minute timeout, two-second polling, one warning every thirty seconds.
    fn default() -> Self {
        let lock_acquire_timeout = Duration::from_secs(300);
        let lock_poll_interval = Duration::from_secs(2);
        let lock_warn_interval = Duration::from_secs(30);
        MigrationConfig {
            lock_acquire_timeout,
            lock_poll_interval,
            lock_warn_interval,
        }
    }
}
57
/// A single migration: an identifying version plus the SQL to run.
#[derive(Debug, Clone)]
pub struct Migration {
    /// Unique identifier recorded in the tracking table.
    pub version: String,
    /// SQL executed when the migration is applied; also the checksum input.
    pub up_sql: String,
    /// Whether the statements run inside a single transaction.
    pub transactional: bool,
}

impl Migration {
    /// Creates a transactional migration from already-prepared SQL.
    pub fn new(version: impl Into<String>, sql: impl Into<String>) -> Self {
        Migration {
            version: version.into(),
            up_sql: sql.into(),
            transactional: true,
        }
    }

    /// Builds a migration from raw file content.
    ///
    /// Only the leading comment block (within the first 20 lines, blank lines
    /// skipped) is scanned for a `-- @transactional <value>` directive; the
    /// values `false`, `no` and `0` (case-insensitive, optional `=`) turn
    /// transactional mode off, and the last directive wins. Every `@up`
    /// marker comment is removed from the SQL and the result is trimmed.
    pub fn parse(version: impl Into<String>, content: &str) -> Self {
        // Header = comment lines that precede the first line of real SQL.
        let header_comments = content
            .lines()
            .take(20)
            .map(str::trim)
            .filter(|line| !line.is_empty())
            .take_while(|line| line.starts_with("--"));

        let transactional = header_comments
            .filter_map(|line| {
                line.trim_start_matches("--")
                    .trim()
                    .strip_prefix('@')?
                    .strip_prefix("transactional")
            })
            .map(|raw| raw.trim().trim_start_matches('=').trim().to_ascii_lowercase())
            .map(|value| !matches!(value.as_str(), "false" | "no" | "0"))
            .last()
            .unwrap_or(true);

        // Markers are stripped in this exact order; the stored checksum
        // depends on the resulting text.
        let without_markers = ["-- @up", "--@up", "-- @UP", "--@UP"]
            .iter()
            .fold(content.to_string(), |sql, marker| sql.replace(marker, ""));

        Migration {
            version: version.into(),
            up_sql: without_markers.trim().to_string(),
            transactional,
        }
    }
}
129
/// Applies system and user migrations against a Postgres pool.
pub struct MigrationRunner {
    /// Pool used for the lock connection, migrations and bookkeeping queries.
    pool: PgPool,
    /// Lock-wait tunables (timeout, poll interval, warning interval).
    config: MigrationConfig,
}
135
136impl MigrationRunner {
137 pub fn new(pool: PgPool) -> Self {
138 Self::with_config(pool, MigrationConfig::default())
139 }
140
141 pub fn with_config(pool: PgPool, config: MigrationConfig) -> Self {
142 Self { pool, config }
143 }
144
145 pub async fn run(&self, user_migrations: Vec<Migration>) -> Result<()> {
147 let mut lock_conn = self.acquire_lock_connection().await?;
148
149 let result = self.run_migrations_inner(user_migrations).await;
150
151 if let Err(e) = self.release_lock_connection(&mut lock_conn).await {
152 warn!("Failed to release migration lock: {}", e);
153 }
154
155 result
156 }
157
    /// Verifies the database is not ahead of this binary, then applies every
    /// system and user migration that has not been recorded yet.
    ///
    /// Already-applied migrations are checksum-verified instead of re-run. A
    /// schema-change notification is sent only when something new was applied.
    ///
    /// # Errors
    ///
    /// Fails when the database records a newer system version or unknown user
    /// migrations, when a recorded checksum no longer matches, or when
    /// applying a migration fails.
    async fn run_migrations_inner(&self, user_migrations: Vec<Migration>) -> Result<()> {
        self.bootstrap_tracking_table().await?;

        let applied = self.applied_versions().await?;
        debug!(
            "Already applied migrations: {:?}",
            applied.keys().collect::<Vec<_>>()
        );

        let max_applied_version = self.get_max_system_version(&applied);
        debug!("Max applied system version: {:?}", max_applied_version);

        let system_migrations = super::builtin::get_system_migrations();

        // Guard 1: refuse to run when the database has a system migration
        // newer than anything bundled in this binary.
        let max_known_version = system_migrations.iter().map(|m| m.version).max();
        if let (Some(applied_max), Some(known_max)) = (max_applied_version, max_known_version)
            && applied_max > known_max
        {
            return Err(ForgeError::internal(format!(
                "Database is at system migration v{applied_max} but this binary only knows up to v{known_max}. \
                 Refusing to start — running an older binary on a newer schema risks data loss. \
                 Upgrade the binary or restore the database to a compatible version."
            )));
        }

        // Guard 2: every recorded non-system version must be among the user
        // migrations handed to this run.
        let known_user_versions: std::collections::HashSet<&str> =
            user_migrations.iter().map(|m| m.version.as_str()).collect();
        let mut unknown_applied: Vec<&str> = applied
            .keys()
            .filter(|v| {
                !super::builtin::is_system_migration(v) && !known_user_versions.contains(v.as_str())
            })
            .map(|v| v.as_str())
            .collect();
        if !unknown_applied.is_empty() {
            // Sorted so the error message is stable across runs.
            unknown_applied.sort_unstable();
            return Err(ForgeError::internal(format!(
                "Database has {} user migration(s) this binary does not know about: [{}]. \
                 Refusing to start — the database schema is ahead of this binary. \
                 Deploy the latest binary version.",
                unknown_applied.len(),
                unknown_applied.join(", "),
            )));
        }

        let mut new_migrations_applied = false;

        // System migrations are processed before user migrations.
        for sys_migration in system_migrations {
            let migration = sys_migration.to_migration();
            if let Some(recorded) = applied.get(&migration.version) {
                verify_checksum(&migration, recorded)?;
                debug!(
                    "Skipping system migration {} (already applied, checksum verified)",
                    migration.version
                );
                continue;
            }
            info!(
                "Applying system migration: {} ({})",
                migration.version, sys_migration.description
            );
            self.apply_migration(&migration).await?;
            new_migrations_applied = true;
        }

        // User migrations run in the order the caller supplied them.
        for migration in user_migrations {
            if let Some(recorded) = applied.get(&migration.version) {
                verify_checksum(&migration, recorded)?;
                debug!(
                    "Skipping user migration {} (already applied, checksum verified)",
                    migration.version
                );
                continue;
            }
            self.apply_migration(&migration).await?;
            new_migrations_applied = true;
        }

        if new_migrations_applied {
            self.notify_schema_changed().await;
        }

        Ok(())
    }
242
243 async fn notify_schema_changed(&self) {
251 match sqlx::query("SELECT pg_notify('forge_schema_changed', 'migrations_applied')")
252 .execute(&self.pool)
253 .await
254 {
255 Ok(_) => debug!("Schema change notification sent"),
256 Err(e) => warn!(error = %e, "Failed to send schema change notification (non-fatal)"),
257 }
258 }
259
260 fn get_max_system_version(&self, applied: &HashMap<String, String>) -> Option<u32> {
261 applied.keys().filter_map(|v| extract_version(v)).max()
262 }
263
    /// Checks out a pooled connection and polls `pg_try_advisory_lock` on it
    /// until the migration lock is obtained or the configured timeout elapses.
    ///
    /// The returned connection is the one holding the lock; the caller must
    /// release the lock on that same connection.
    ///
    /// # Errors
    ///
    /// Fails when no connection can be acquired, when a lock attempt query
    /// fails, or when the timeout passes without obtaining the lock.
    async fn acquire_lock_connection(&self) -> Result<sqlx::pool::PoolConnection<Postgres>> {
        debug!(
            timeout_secs = self.config.lock_acquire_timeout.as_secs(),
            "Acquiring migration lock..."
        );
        let mut conn = self
            .pool
            .acquire()
            .await
            .map_err(|e| ForgeError::internal_with("Failed to acquire lock connection", e))?;

        // High and low 32-bit halves of the lock key; the holder lookup
        // filters `pg_locks` by these two values.
        let classid = (MIGRATION_LOCK_ID >> 32) as i32;
        let objid = (MIGRATION_LOCK_ID & 0xFFFF_FFFF) as i32;

        let deadline = Instant::now() + self.config.lock_acquire_timeout;
        // Backdate the last-warning timestamp by one interval so the first
        // failed attempt already logs a warning (when the subtraction is
        // representable; otherwise the first warning waits a full interval).
        let mut last_warn = Instant::now()
            .checked_sub(self.config.lock_warn_interval)
            .unwrap_or_else(Instant::now);

        loop {
            let acquired = sqlx::query_scalar!(
                r#"SELECT pg_try_advisory_lock($1) AS "acquired!""#,
                MIGRATION_LOCK_ID
            )
            .fetch_one(&mut *conn)
            .await
            .map_err(|e| ForgeError::internal_with("Failed to attempt migration lock", e))?;

            if acquired {
                debug!("Migration lock acquired");
                return Ok(conn);
            }

            // The deadline is checked only after a failed attempt, so at
            // least one attempt is always made.
            let now = Instant::now();
            if now >= deadline {
                let holder = lookup_lock_holder(&mut conn, classid, objid).await;
                return Err(ForgeError::internal(format!(
                    "Timed out after {:?} waiting for migration lock (holder pid: {:?}). \
                     Another node is likely running migrations or stalled holding the lock.",
                    self.config.lock_acquire_timeout, holder
                )));
            }

            // Rate-limited progress warning that names the current holder.
            if now.duration_since(last_warn) >= self.config.lock_warn_interval {
                let holder = lookup_lock_holder(&mut conn, classid, objid).await;
                warn!(
                    holder_pid = ?holder,
                    "Still waiting for migration lock — another node is holding it"
                );
                last_warn = now;
            }

            tokio::time::sleep(self.config.lock_poll_interval).await;
        }
    }
320
321 async fn release_lock_connection(
322 &self,
323 conn: &mut sqlx::pool::PoolConnection<Postgres>,
324 ) -> Result<()> {
325 sqlx::query_scalar!("SELECT pg_advisory_unlock($1)", MIGRATION_LOCK_ID)
326 .fetch_one(&mut **conn)
327 .await
328 .map_err(|e| ForgeError::internal_with("Failed to release migration lock", e))?;
329 debug!("Migration lock released");
330 Ok(())
331 }
332
333 async fn bootstrap_tracking_table(&self) -> Result<()> {
334 let mut conn =
335 self.pool.acquire().await.map_err(|e| {
336 ForgeError::internal_with("Failed to acquire bootstrap connection", e)
337 })?;
338 for statement in split_sql_statements(BOOTSTRAP_SQL) {
339 let stmt = statement.trim();
340 if is_empty_or_comment_only(stmt) {
341 continue;
342 }
343 sqlx::query(stmt)
344 .execute(&mut *conn)
345 .await
346 .map_err(|e| ForgeError::internal_with("Bootstrap failed", e))?;
347 }
348 Ok(())
349 }
350
351 async fn applied_versions(&self) -> Result<HashMap<String, String>> {
352 let rows = sqlx::query!("SELECT version, checksum FROM forge_system_migrations")
353 .fetch_all(&self.pool)
354 .await
355 .map_err(|e| ForgeError::internal_with("Failed to get applied migrations", e))?;
356
357 Ok(rows
358 .into_iter()
359 .map(|row| (row.version, row.checksum))
360 .collect())
361 }
362
363 async fn apply_migration(&self, migration: &Migration) -> Result<()> {
364 if migration.transactional {
365 self.apply_transactional(migration).await
366 } else {
367 self.apply_non_transactional(migration).await
368 }
369 }
370
371 async fn apply_transactional(&self, migration: &Migration) -> Result<()> {
372 info!("Applying migration: {}", migration.version);
373 let start = Instant::now();
374
375 let mut tx = self.pool.begin().await.map_err(|e| {
376 ForgeError::internal_with(
377 format!(
378 "Failed to begin migration transaction for '{}'",
379 migration.version
380 ),
381 e,
382 )
383 })?;
384
385 sqlx::query("SET LOCAL lock_timeout = '5s'")
387 .execute(&mut *tx)
388 .await
389 .map_err(|e| ForgeError::internal_with("Failed to set lock_timeout", e))?;
390 sqlx::query("SET LOCAL statement_timeout = '5min'")
391 .execute(&mut *tx)
392 .await
393 .map_err(|e| ForgeError::internal_with("Failed to set statement_timeout", e))?;
394
395 for statement in split_sql_statements(&migration.up_sql) {
396 let statement = statement.trim();
397 if is_empty_or_comment_only(statement) {
398 continue;
399 }
400
401 sqlx::query(statement)
402 .execute(&mut *tx)
403 .await
404 .map_err(|e| {
405 ForgeError::internal_with(
406 format!("Failed to apply migration '{}'", migration.version),
407 e,
408 )
409 })?;
410 }
411
412 let checksum = crate::stable_hash::sha256_hex(migration.up_sql.as_bytes());
413 sqlx::query!(
414 "INSERT INTO forge_system_migrations (version, checksum) VALUES ($1, $2)",
415 migration.version,
416 checksum,
417 )
418 .execute(&mut *tx)
419 .await
420 .map_err(|e| {
421 ForgeError::internal_with(
422 format!("Failed to record migration '{}'", migration.version),
423 e,
424 )
425 })?;
426
427 tx.commit().await.map_err(|e| {
428 ForgeError::internal_with(
429 format!("Failed to commit migration '{}'", migration.version),
430 e,
431 )
432 })?;
433
434 info!(
435 "Migration applied: {} ({:?})",
436 migration.version,
437 start.elapsed()
438 );
439 Ok(())
440 }
441
442 async fn apply_non_transactional(&self, migration: &Migration) -> Result<()> {
457 info!(
458 "Applying non-transactional migration: {}",
459 migration.version
460 );
461 let start = Instant::now();
462
463 let mut conn = self.pool.acquire().await.map_err(|e| {
464 ForgeError::internal_with(
465 format!(
466 "Failed to acquire connection for migration '{}'",
467 migration.version
468 ),
469 e,
470 )
471 })?;
472
473 sqlx::query("SET lock_timeout = '5s'")
477 .execute(&mut *conn)
478 .await
479 .map_err(|e| ForgeError::internal_with("Failed to set lock_timeout", e))?;
480 sqlx::query("SET statement_timeout = '30min'")
481 .execute(&mut *conn)
482 .await
483 .map_err(|e| ForgeError::internal_with("Failed to set statement_timeout", e))?;
484
485 let exec_result: Result<()> = async {
486 for statement in split_sql_statements(&migration.up_sql) {
487 let statement = statement.trim();
488 if is_empty_or_comment_only(statement) {
489 continue;
490 }
491 sqlx::query(statement)
492 .execute(&mut *conn)
493 .await
494 .map_err(|e| {
495 ForgeError::internal_with(
496 format!("Failed to apply migration '{}'", migration.version),
497 e,
498 )
499 })?;
500 }
501 Ok(())
502 }
503 .await;
504
505 if let Err(e) = sqlx::query("RESET lock_timeout").execute(&mut *conn).await {
508 warn!(error = %e, "Failed to RESET lock_timeout after non-tx migration");
509 }
510 if let Err(e) = sqlx::query("RESET statement_timeout")
511 .execute(&mut *conn)
512 .await
513 {
514 warn!(error = %e, "Failed to RESET statement_timeout after non-tx migration");
515 }
516 drop(conn);
517
518 exec_result?;
519
520 let checksum = crate::stable_hash::sha256_hex(migration.up_sql.as_bytes());
521 sqlx::query!(
522 "INSERT INTO forge_system_migrations (version, checksum) VALUES ($1, $2)",
523 migration.version,
524 checksum,
525 )
526 .execute(&self.pool)
527 .await
528 .map_err(|e| {
529 ForgeError::internal_with(
530 format!("Failed to record migration '{}'", migration.version),
531 e,
532 )
533 })?;
534
535 info!(
536 "Non-transactional migration applied: {} ({:?})",
537 migration.version,
538 start.elapsed()
539 );
540 Ok(())
541 }
542
543 pub async fn status(&self, available: &[Migration]) -> Result<MigrationStatus> {
552 self.bootstrap_tracking_table().await?;
553
554 let applied = self.applied_versions().await?;
555
556 let available_by_version: HashMap<&str, &Migration> =
557 available.iter().map(|m| (m.version.as_str(), m)).collect();
558
559 let applied_list: Vec<AppliedMigration> = sqlx::query!(
560 "SELECT version, applied_at, checksum FROM forge_system_migrations ORDER BY applied_at ASC"
561 )
562 .fetch_all(&self.pool)
563 .await
564 .map_err(|e| ForgeError::internal_with("Failed to get migrations", e))?
565 .into_iter()
566 .map(|row| {
567 let drift = match available_by_version.get(row.version.as_str()) {
568 None => DriftStatus::SourceMissing,
569 Some(m) => {
570 let computed = crate::stable_hash::sha256_hex(m.up_sql.as_bytes());
571 if computed == row.checksum {
572 DriftStatus::Unchanged
573 } else {
574 DriftStatus::Drifted {
575 current_checksum: computed,
576 }
577 }
578 }
579 };
580 AppliedMigration {
581 version: row.version,
582 applied_at: row.applied_at,
583 checksum: row.checksum,
584 drift,
585 }
586 })
587 .collect();
588
589 let pending: Vec<String> = available
590 .iter()
591 .filter(|m| !applied.contains_key(&m.version))
592 .map(|m| m.version.clone())
593 .collect();
594
595 Ok(MigrationStatus {
596 applied: applied_list,
597 pending,
598 })
599 }
600}
601
/// One row of the migration tracking table, annotated with drift information.
#[derive(Debug, Clone)]
pub struct AppliedMigration {
    /// Version string recorded when the migration was applied.
    pub version: String,
    /// Timestamp stored in the tracking table's `applied_at` column.
    pub applied_at: chrono::DateTime<chrono::Utc>,
    /// Checksum recorded when the migration was applied.
    pub checksum: String,
    /// How the recorded checksum compares with the currently available source.
    pub drift: DriftStatus,
}
613
/// Result of comparing an applied migration with its available source.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DriftStatus {
    /// The available SQL hashes to the recorded checksum.
    Unchanged,
    /// The available SQL hashes to something other than the recorded checksum.
    Drifted {
        /// Checksum of the SQL as it is available now.
        current_checksum: String,
    },
    /// No available migration carries this version.
    SourceMissing,
}
633
/// Snapshot returned by `MigrationRunner::status`.
#[derive(Debug, Clone)]
pub struct MigrationStatus {
    /// Rows of the tracking table, ordered by `applied_at` ascending.
    pub applied: Vec<AppliedMigration>,
    /// Versions of available migrations that have no tracking row.
    pub pending: Vec<String>,
}
640
641fn verify_checksum(migration: &Migration, recorded: &str) -> Result<()> {
647 let computed = crate::stable_hash::sha256_hex(migration.up_sql.as_bytes());
648 if computed != recorded {
649 return Err(ForgeError::internal(format!(
650 "Migration '{}' has changed since it was applied. \
651 Recorded checksum: {recorded}, but current file checksum: {computed}. \
652 Migrations are immutable once applied — revert the file or create a new migration.",
653 migration.version
654 )));
655 }
656 Ok(())
657}
658
659async fn lookup_lock_holder(
664 conn: &mut sqlx::pool::PoolConnection<Postgres>,
665 classid: i32,
666 objid: i32,
667) -> Option<i32> {
668 sqlx::query_scalar!(
669 r#"SELECT pid AS "pid!" FROM pg_locks
670 WHERE locktype = 'advisory'
671 AND classid::int = $1
672 AND objid::int = $2
673 AND granted
674 LIMIT 1"#,
675 classid,
676 objid
677 )
678 .fetch_optional(&mut **conn)
679 .await
680 .ok()
681 .flatten()
682}
683
/// Returns `true` when `stmt` contains nothing to execute: every line is
/// blank or a `--` line comment (an empty string counts as well).
fn is_empty_or_comment_only(stmt: &str) -> bool {
    let has_sql = stmt
        .lines()
        .map(str::trim)
        .any(|line| !line.is_empty() && !line.starts_with("--"));
    !has_sql
}
695
/// Splits a SQL script into individual statements on top-level semicolons.
///
/// Semicolons are ignored inside `--` line comments, `/* */` block comments
/// (which may nest), single-quoted strings, double-quoted identifiers and
/// dollar-quoted bodies (`$$ ... $$` or `$tag$ ... $tag$`). A dollar-quoted
/// body ends at the first occurrence of its exact opening delimiter, so text
/// such as `$1` directly in front of the closing `$$` does not hide it.
///
/// Each returned statement is trimmed and has its trailing semicolon removed;
/// fragments that are empty after trimming are dropped. Comments stay part of
/// the statement they appear in.
fn split_sql_statements(sql: &str) -> Vec<String> {
    /// Lexical context the scanner is currently in.
    enum Scan<'a> {
        Code,
        LineComment,
        /// Nesting depth of the open block comment (always >= 1).
        BlockComment(usize),
        SingleQuoted,
        DoubleQuoted,
        /// The exact delimiter (including both `$`) that closes the body.
        DollarQuoted(&'a str),
    }

    /// Moves the first `len` bytes of `rest` onto `current`.
    fn consume<'a>(rest: &mut &'a str, current: &mut String, len: usize) {
        let text: &'a str = *rest;
        let (head, tail) = match (text.get(..len), text.get(len..)) {
            (Some(head), Some(tail)) => (head, tail),
            // Not reachable with the lengths used below; consume everything
            // rather than panic.
            _ => (text, ""),
        };
        current.push_str(head);
        *rest = tail;
    }

    /// Byte length of the dollar-quote delimiter `text` starts with, if any.
    /// A tag consists of letters, digits and underscores and must not start
    /// with a digit, so parameter references like `$1` never open a quote.
    fn dollar_delimiter_len(text: &str) -> Option<usize> {
        let mut chars = text.char_indices();
        if !matches!(chars.next(), Some((_, '$'))) {
            return None;
        }
        for (idx, ch) in chars {
            if ch == '$' {
                return Some(idx + 1);
            }
            let is_tag_char = ch == '_' || ch.is_alphanumeric();
            let leading_digit = idx == 1 && ch.is_ascii_digit();
            if !is_tag_char || leading_digit {
                return None;
            }
        }
        None
    }

    /// Pushes the accumulated statement (if non-empty) and resets the buffer.
    fn flush(current: &mut String, statements: &mut Vec<String>) {
        let stmt = current.trim().trim_end_matches(';').trim();
        if !stmt.is_empty() {
            statements.push(stmt.to_string());
        }
        current.clear();
    }

    let mut statements = Vec::new();
    let mut current = String::new();
    let mut rest = sql;
    let mut state = Scan::Code;

    while let Some(c) = rest.chars().next() {
        let width = c.len_utf8();
        match state {
            Scan::LineComment => {
                if c == '\n' {
                    state = Scan::Code;
                }
                consume(&mut rest, &mut current, width);
            }
            Scan::BlockComment(depth) => {
                if rest.starts_with("*/") {
                    consume(&mut rest, &mut current, 2);
                    state = if depth <= 1 {
                        Scan::Code
                    } else {
                        Scan::BlockComment(depth - 1)
                    };
                } else if rest.starts_with("/*") {
                    consume(&mut rest, &mut current, 2);
                    state = Scan::BlockComment(depth + 1);
                } else {
                    consume(&mut rest, &mut current, width);
                }
            }
            Scan::SingleQuoted => {
                if rest.starts_with("''") {
                    // Doubled quote is an escaped quote, not the terminator.
                    consume(&mut rest, &mut current, 2);
                } else {
                    if c == '\'' {
                        state = Scan::Code;
                    }
                    consume(&mut rest, &mut current, width);
                }
            }
            Scan::DoubleQuoted => {
                if rest.starts_with("\"\"") {
                    consume(&mut rest, &mut current, 2);
                } else {
                    if c == '"' {
                        state = Scan::Code;
                    }
                    consume(&mut rest, &mut current, width);
                }
            }
            Scan::DollarQuoted(delimiter) => {
                // Advance one character at a time so a `$` that ends a
                // non-matching candidate can still start the real delimiter.
                if rest.starts_with(delimiter) {
                    consume(&mut rest, &mut current, delimiter.len());
                    state = Scan::Code;
                } else {
                    consume(&mut rest, &mut current, width);
                }
            }
            Scan::Code => {
                if rest.starts_with("--") {
                    consume(&mut rest, &mut current, 2);
                    state = Scan::LineComment;
                } else if rest.starts_with("/*") {
                    consume(&mut rest, &mut current, 2);
                    state = Scan::BlockComment(1);
                } else if c == '\'' {
                    consume(&mut rest, &mut current, width);
                    state = Scan::SingleQuoted;
                } else if c == '"' {
                    consume(&mut rest, &mut current, width);
                    state = Scan::DoubleQuoted;
                } else if c == '$' {
                    match dollar_delimiter_len(rest).and_then(|len| rest.get(..len)) {
                        Some(delimiter) => {
                            consume(&mut rest, &mut current, delimiter.len());
                            state = Scan::DollarQuoted(delimiter);
                        }
                        None => consume(&mut rest, &mut current, width),
                    }
                } else if c == ';' {
                    consume(&mut rest, &mut current, width);
                    flush(&mut current, &mut statements);
                } else {
                    consume(&mut rest, &mut current, width);
                }
            }
        }
    }

    // Trailing statement without a terminating semicolon.
    flush(&mut current, &mut statements);

    statements
}
819
820pub fn load_migrations_from_dir(dir: &Path) -> Result<Vec<Migration>> {
827 if !dir.exists() {
828 debug!("Migrations directory does not exist: {:?}", dir);
829 return Ok(Vec::new());
830 }
831
832 let mut migrations = Vec::new();
833 let mut prefix_width: Option<usize> = None;
834 let mut seen_versions: std::collections::HashSet<u64> = std::collections::HashSet::new();
835
836 let entries = std::fs::read_dir(dir).map_err(ForgeError::Io)?;
837
838 for entry in entries {
839 let entry = entry.map_err(ForgeError::Io)?;
840 let path = entry.path();
841
842 if path.extension().map(|e| e == "sql").unwrap_or(false) {
843 let name = path
844 .file_stem()
845 .and_then(|s| s.to_str())
846 .ok_or_else(|| ForgeError::config("Invalid migration filename"))?
847 .to_string();
848
849 let (digits, version) = parse_migration_prefix(&name)?;
850
851 match prefix_width {
852 Some(w) if w != digits.len() => {
853 return Err(ForgeError::config(format!(
854 "Inconsistent migration prefix width: {} uses {} digits but earlier migrations use {}. \
855 Pad all migration filenames to the same width (e.g. 0001_*.sql).",
856 name,
857 digits.len(),
858 w,
859 )));
860 }
861 None => prefix_width = Some(digits.len()),
862 _ => {}
863 }
864
865 if !seen_versions.insert(version) {
866 return Err(ForgeError::config(format!(
867 "Duplicate migration version {} for {}",
868 version, name
869 )));
870 }
871
872 let content = std::fs::read_to_string(&path).map_err(ForgeError::Io)?;
873
874 migrations.push((version, Migration::parse(name, &content)));
875 }
876 }
877
878 migrations.sort_by_key(|(v, _)| *v);
879
880 debug!("Loaded {} user migrations", migrations.len());
881 Ok(migrations.into_iter().map(|(_, m)| m).collect())
882}
883
884fn parse_migration_prefix(name: &str) -> Result<(&str, u64)> {
888 let digits_end = name
889 .find(|c: char| !c.is_ascii_digit())
890 .unwrap_or(name.len());
891 if digits_end == 0 {
892 return Err(ForgeError::config(format!(
893 "Migration {} is missing a numeric prefix (expected NNNN_name.sql)",
894 name
895 )));
896 }
897 let digits = name.get(..digits_end).unwrap_or("");
898 let version: u64 = digits.parse().map_err(|_| {
899 ForgeError::config(format!(
900 "Migration {} has an unparseable numeric prefix",
901 name
902 ))
903 })?;
904 Ok((digits, version))
905}
906
/// Unit tests that need no database: directory loading, directive parsing,
/// checksum verification and SQL statement splitting.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::fs;
    use tempfile::TempDir;

    // ---- load_migrations_from_dir -------------------------------------

    #[test]
    fn test_load_migrations_from_empty_dir() {
        let dir = TempDir::new().unwrap();
        let migrations = load_migrations_from_dir(dir.path()).unwrap();
        assert!(migrations.is_empty());
    }

    #[test]
    fn test_load_migrations_from_nonexistent_dir() {
        let migrations = load_migrations_from_dir(Path::new("/nonexistent/path")).unwrap();
        assert!(migrations.is_empty());
    }

    #[test]
    fn test_load_migrations_sorted() {
        let dir = TempDir::new().unwrap();

        // Written out of order on purpose; the loader must sort by prefix.
        fs::write(dir.path().join("0002_second.sql"), "SELECT 2;").unwrap();
        fs::write(dir.path().join("0001_first.sql"), "SELECT 1;").unwrap();
        fs::write(dir.path().join("0003_third.sql"), "SELECT 3;").unwrap();

        let migrations = load_migrations_from_dir(dir.path()).unwrap();
        assert_eq!(migrations.len(), 3);
        assert_eq!(migrations[0].version, "0001_first");
        assert_eq!(migrations[1].version, "0002_second");
        assert_eq!(migrations[2].version, "0003_third");
    }

    #[test]
    fn test_load_migrations_rejects_mixed_prefix_widths() {
        let dir = TempDir::new().unwrap();
        fs::write(dir.path().join("0001_first.sql"), "SELECT 1;").unwrap();
        fs::write(dir.path().join("2_second.sql"), "SELECT 2;").unwrap();

        let err = load_migrations_from_dir(dir.path()).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("Inconsistent migration prefix width") || msg.contains("digits"),
            "unexpected error: {msg}"
        );
    }

    #[test]
    fn test_load_migrations_rejects_missing_prefix() {
        let dir = TempDir::new().unwrap();
        fs::write(dir.path().join("create_users.sql"), "SELECT 1;").unwrap();

        let err = load_migrations_from_dir(dir.path()).unwrap_err();
        assert!(err.to_string().contains("missing a numeric prefix"));
    }

    #[test]
    fn test_load_migrations_rejects_duplicate_versions() {
        let dir = TempDir::new().unwrap();
        fs::write(dir.path().join("0001_a.sql"), "SELECT 1;").unwrap();
        fs::write(dir.path().join("0001_b.sql"), "SELECT 2;").unwrap();

        let err = load_migrations_from_dir(dir.path()).unwrap_err();
        assert!(err.to_string().contains("Duplicate migration version"));
    }

    #[test]
    fn test_load_migrations_ignores_non_sql() {
        let dir = TempDir::new().unwrap();

        fs::write(dir.path().join("0001_migration.sql"), "SELECT 1;").unwrap();
        fs::write(dir.path().join("readme.txt"), "Not a migration").unwrap();
        fs::write(dir.path().join("backup.sql.bak"), "Backup").unwrap();

        let migrations = load_migrations_from_dir(dir.path()).unwrap();
        assert_eq!(migrations.len(), 1);
        assert_eq!(migrations[0].version, "0001_migration");
    }

    // ---- Migration::new / Migration::parse ----------------------------

    #[test]
    fn test_migration_new() {
        let m = Migration::new("test", "SELECT 1");
        assert_eq!(m.version, "test");
        assert_eq!(m.up_sql, "SELECT 1");
    }

    #[test]
    fn test_migration_parse_strips_up_marker() {
        let content = "-- @up\nCREATE TABLE users (id INT);";
        let m = Migration::parse("0001_test", content);
        assert_eq!(m.version, "0001_test");
        assert_eq!(m.up_sql, "CREATE TABLE users (id INT);");
        assert!(m.transactional, "default should be transactional=true");
    }

    #[test]
    fn test_migration_parse_no_directive_defaults_transactional() {
        let m = Migration::parse("0001_test", "CREATE TABLE x (id INT);");
        assert!(m.transactional);
    }

    #[test]
    fn test_migration_parse_transactional_false_directive() {
        let content = "-- @transactional false\nCREATE INDEX CONCURRENTLY idx_x ON t(c);";
        let m = Migration::parse("0001_idx", content);
        assert!(!m.transactional);
        assert!(m.up_sql.contains("CREATE INDEX CONCURRENTLY"));
    }

    #[test]
    fn test_migration_parse_transactional_no_space_directive() {
        let content = "--@transactional false\nCREATE INDEX CONCURRENTLY idx_x ON t(c);";
        let m = Migration::parse("0001_idx", content);
        assert!(!m.transactional);
    }

    #[test]
    fn test_migration_parse_transactional_equals_form() {
        let content = "-- @transactional = false\nVACUUM ANALYZE;";
        let m = Migration::parse("0001_vac", content);
        assert!(!m.transactional);
    }

    #[test]
    fn test_migration_parse_transactional_uppercase_value() {
        let content = "-- @transactional FALSE\nVACUUM;";
        let m = Migration::parse("0001_vac", content);
        assert!(!m.transactional);
    }

    #[test]
    fn test_migration_parse_transactional_true_explicit() {
        let content = "-- @transactional true\nCREATE TABLE t (id INT);";
        let m = Migration::parse("0001_t", content);
        assert!(m.transactional);
    }

    #[test]
    fn test_migration_parse_directive_only_in_leading_block() {
        // A directive that appears after the first SQL line is ignored.
        let content = "CREATE TABLE t (id INT);\n-- @transactional false\nCREATE INDEX i ON t(id);";
        let m = Migration::parse("0001_t", content);
        assert!(m.transactional);
    }

    #[test]
    fn test_migration_parse_requires_at_prefix() {
        // Without the leading `@` the comment is plain prose.
        let content = "-- transactional false (this is just prose)\nCREATE TABLE t (id INT);";
        let m = Migration::parse("0001_t", content);
        assert!(m.transactional);
    }

    #[test]
    fn test_migration_parse_prose_only_no_directive() {
        let content = "-- This migration creates a transactional ledger\nCREATE TABLE t (id INT);";
        let m = Migration::parse("0001_t", content);
        assert!(m.transactional);
    }

    #[test]
    fn test_migration_parse_directive_after_blank_lines_in_header() {
        let content =
            "\n\n-- file header\n-- @transactional false\nCREATE INDEX CONCURRENTLY i ON t(id);";
        let m = Migration::parse("0001_t", content);
        assert!(!m.transactional);
    }

    #[test]
    fn test_migration_new_defaults_transactional() {
        let m = Migration::new("v", "SELECT 1");
        assert!(m.transactional);
    }

    // ---- helpers ------------------------------------------------------

    #[test]
    fn test_is_empty_or_comment_only() {
        assert!(is_empty_or_comment_only(""));
        assert!(is_empty_or_comment_only("-- just a comment"));
        assert!(is_empty_or_comment_only("-- one\n-- two\n  "));
        assert!(!is_empty_or_comment_only("SELECT 1"));
        assert!(!is_empty_or_comment_only("-- header\nSELECT 1"));
    }

    #[tokio::test]
    async fn test_get_max_system_version_prefers_highest_applied_version() {
        // A lazy pool never connects; the runner is only needed as a receiver.
        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("lazy pool must build");
        let runner = MigrationRunner::new(pool);

        let applied = HashMap::from([
            ("__forge_v003".to_string(), "checksum-3".to_string()),
            ("__forge_v001".to_string(), "checksum-1".to_string()),
            ("0001_user_schema".to_string(), "checksum-u".to_string()),
        ]);

        assert_eq!(runner.get_max_system_version(&applied), Some(3));
    }

    // ---- verify_checksum ----------------------------------------------

    #[test]
    fn test_verify_checksum_matches() {
        let m = Migration::new("0001_test", "CREATE TABLE t (id INT);");
        let computed = crate::stable_hash::sha256_hex(m.up_sql.as_bytes());
        verify_checksum(&m, &computed).expect("matching checksum should pass");
    }

    #[test]
    fn test_verify_checksum_catches_system_migration_drift() {
        let migrations = super::super::builtin::get_system_migrations();
        let sys = migrations
            .first()
            .expect("at least one system migration is bundled");
        let migration = sys.to_migration();
        let real_checksum = crate::stable_hash::sha256_hex(migration.up_sql.as_bytes());
        verify_checksum(&migration, &real_checksum)
            .expect("matching checksum must pass for system migrations");

        let err = verify_checksum(&migration, "stale-checksum").unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains(&migration.version),
            "drift error must name the system migration: {msg}"
        );
    }

    #[test]
    fn test_verify_checksum_mismatch_reports_versions() {
        let m = Migration::new("0001_test", "CREATE TABLE t (id INT);");
        let err = verify_checksum(&m, "deadbeef-old-checksum").unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("0001_test"),
            "error should name the migration: {msg}"
        );
        assert!(
            msg.contains("deadbeef-old-checksum"),
            "error should include recorded checksum: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("changed") || msg.to_lowercase().contains("immutable"),
            "error should explain the drift policy: {msg}"
        );
    }

    // ---- split_sql_statements -----------------------------------------

    #[test]
    fn test_split_simple_statements() {
        let sql = "SELECT 1; SELECT 2; SELECT 3;";
        let stmts = super::split_sql_statements(sql);
        assert_eq!(stmts.len(), 3);
        assert_eq!(stmts[0], "SELECT 1");
        assert_eq!(stmts[1], "SELECT 2");
        assert_eq!(stmts[2], "SELECT 3");
    }

    #[test]
    fn test_split_with_dollar_quoted_function() {
        // Semicolons inside the `$$` body must not split the function.
        let sql = r#"
CREATE FUNCTION test() RETURNS void AS $$
BEGIN
    SELECT 1;
    SELECT 2;
END;
$$ LANGUAGE plpgsql;

SELECT 3;
"#;
        let stmts = super::split_sql_statements(sql);
        assert_eq!(stmts.len(), 2);
        assert!(stmts[0].contains("CREATE FUNCTION"));
        assert!(stmts[0].contains("$$ LANGUAGE plpgsql"));
        assert!(stmts[1].contains("SELECT 3"));
    }

    #[test]
    fn test_split_preserves_dollar_quote_content() {
        let sql = r#"
CREATE FUNCTION notify() RETURNS trigger AS $$
DECLARE
    row_id TEXT;
BEGIN
    row_id := NEW.id::TEXT;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"#;
        let stmts = super::split_sql_statements(sql);
        assert_eq!(stmts.len(), 1);
        assert!(stmts[0].contains("row_id := NEW.id::TEXT"));
    }
}
1206
1207#[cfg(all(test, feature = "testcontainers"))]
1208#[allow(
1209 clippy::unwrap_used,
1210 clippy::indexing_slicing,
1211 clippy::panic,
1212 clippy::disallowed_methods
1213)]
1214mod integration_tests {
1215 use super::*;
1216 use forge_core::testing::{IsolatedTestDb, TestDatabase};
1217
1218 async fn setup_db(test_name: &str) -> IsolatedTestDb {
1219 let base = TestDatabase::from_env()
1220 .await
1221 .expect("Failed to create test database");
1222 base.isolated(test_name)
1223 .await
1224 .expect("Failed to create isolated db")
1225 }
1226
1227 #[tokio::test]
1230 async fn non_transactional_migration_runs_create_index_concurrently() {
1231 let db = setup_db("mig_non_tx_create_index").await;
1232 let runner = MigrationRunner::new(db.pool().clone());
1233
1234 let setup = Migration::new(
1235 "0001_setup",
1236 "CREATE TABLE items (id INT PRIMARY KEY, name TEXT);",
1237 );
1238 let concurrent = Migration::parse(
1239 "0002_index",
1240 "-- @transactional false\nCREATE INDEX CONCURRENTLY items_name_idx ON items(name);",
1241 );
1242 assert!(!concurrent.transactional);
1243
1244 runner
1245 .run(vec![setup, concurrent])
1246 .await
1247 .expect("migrations apply cleanly");
1248
1249 let exists = sqlx::query_scalar!(
1250 r#"SELECT EXISTS(
1251 SELECT 1 FROM pg_indexes
1252 WHERE schemaname='public' AND tablename='items' AND indexname='items_name_idx'
1253 ) AS "exists!""#
1254 )
1255 .fetch_one(db.pool())
1256 .await
1257 .unwrap();
1258 assert!(exists, "index should be created");
1259
1260 let recorded = sqlx::query_scalar!(
1261 r#"SELECT COUNT(*) AS "n!" FROM forge_system_migrations WHERE version='0002_index'"#
1262 )
1263 .fetch_one(db.pool())
1264 .await
1265 .unwrap();
1266 assert_eq!(recorded, 1, "non-tx migration must record bookkeeping row");
1267 }
1268
1269 #[tokio::test]
1273 async fn transactional_migration_rejects_create_index_concurrently() {
1274 let db = setup_db("mig_tx_rejects_concurrent").await;
1275 let runner = MigrationRunner::new(db.pool().clone());
1276
1277 let setup = Migration::new(
1278 "0001_setup",
1279 "CREATE TABLE items (id INT PRIMARY KEY, name TEXT);",
1280 );
1281 let concurrent = Migration::new(
1283 "0002_index",
1284 "CREATE INDEX CONCURRENTLY items_name_idx ON items(name);",
1285 );
1286 assert!(concurrent.transactional);
1287
1288 let err = runner.run(vec![setup, concurrent]).await.unwrap_err();
1289 let msg = err.to_string();
1290 assert!(
1291 msg.contains("CONCURRENTLY") || msg.to_lowercase().contains("transaction"),
1292 "expected PG to reject concurrent index in tx, got: {msg}"
1293 );
1294 }
1295
1296 #[tokio::test]
1301 async fn rerun_with_modified_sql_errors_with_checksum_drift() {
1302 let db = setup_db("mig_checksum_drift").await;
1303 let runner = MigrationRunner::new(db.pool().clone());
1304
1305 let original = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
1306 runner
1307 .run(vec![original])
1308 .await
1309 .expect("first run applies cleanly");
1310
1311 let tampered = Migration::new(
1314 "0001_users",
1315 "CREATE TABLE users (id INT PRIMARY KEY, name TEXT);",
1316 );
1317 let err = runner.run(vec![tampered]).await.unwrap_err();
1318 let msg = err.to_string();
1319 assert!(
1320 msg.contains("0001_users") && msg.to_lowercase().contains("changed"),
1321 "expected drift error mentioning the migration, got: {msg}"
1322 );
1323 }
1324
1325 #[tokio::test]
1330 async fn status_surfaces_checksum_drift_on_modified_migration() {
1331 let db = setup_db("mig_status_drift").await;
1332 let runner = MigrationRunner::new(db.pool().clone());
1333
1334 let original = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
1335 runner
1336 .run(vec![original])
1337 .await
1338 .expect("first run applies cleanly");
1339
1340 let tampered = Migration::new(
1341 "0001_users",
1342 "CREATE TABLE users (id INT PRIMARY KEY, email TEXT);",
1343 );
1344 let status = runner
1345 .status(std::slice::from_ref(&tampered))
1346 .await
1347 .expect("status must succeed even with drift");
1348
1349 let row = status
1350 .applied
1351 .iter()
1352 .find(|a| a.version == "0001_users")
1353 .expect("applied row must exist");
1354 let expected = crate::stable_hash::sha256_hex(tampered.up_sql.as_bytes());
1355 match &row.drift {
1356 DriftStatus::Drifted { current_checksum } => {
1357 assert_eq!(
1358 current_checksum, &expected,
1359 "current_checksum must equal the *new* on-disk checksum",
1360 );
1361 assert_ne!(
1362 current_checksum, &row.checksum,
1363 "current_checksum must differ from the recorded checksum",
1364 );
1365 }
1366 other => panic!("expected DriftStatus::Drifted, got {other:?}"),
1367 }
1368 }
1369
1370 #[tokio::test]
1375 async fn status_reports_source_missing_when_file_gone() {
1376 let db = setup_db("mig_status_missing").await;
1377 let runner = MigrationRunner::new(db.pool().clone());
1378
1379 let m = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
1380 runner
1381 .run(vec![m])
1382 .await
1383 .expect("first run applies cleanly");
1384
1385 let status = runner
1387 .status(&[])
1388 .await
1389 .expect("status must succeed with missing source");
1390
1391 let row = status
1392 .applied
1393 .iter()
1394 .find(|a| a.version == "0001_users")
1395 .expect("applied row must exist");
1396 assert_eq!(row.drift, DriftStatus::SourceMissing);
1397 }
1398
1399 #[tokio::test]
1402 async fn status_reports_unchanged_when_source_matches() {
1403 let db = setup_db("mig_status_unchanged").await;
1404 let runner = MigrationRunner::new(db.pool().clone());
1405
1406 let m = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
1407 runner
1408 .run(vec![m.clone()])
1409 .await
1410 .expect("first run applies cleanly");
1411
1412 let status = runner
1413 .status(std::slice::from_ref(&m))
1414 .await
1415 .expect("status must succeed for clean source");
1416
1417 let row = status
1418 .applied
1419 .iter()
1420 .find(|a| a.version == "0001_users")
1421 .expect("applied row must exist");
1422 assert_eq!(row.drift, DriftStatus::Unchanged);
1423 }
1424
1425 #[tokio::test]
1430 async fn lock_acquire_times_out_when_another_holder_present() {
1431 let db = setup_db("mig_lock_timeout").await;
1432
1433 let mut blocker = db.pool().acquire().await.unwrap();
1435 let acquired = sqlx::query_scalar!(
1436 r#"SELECT pg_try_advisory_lock($1) AS "ok!""#,
1437 MIGRATION_LOCK_ID
1438 )
1439 .fetch_one(&mut *blocker)
1440 .await
1441 .unwrap();
1442 assert!(acquired, "blocker must acquire the lock first");
1443
1444 let config = MigrationConfig {
1445 lock_acquire_timeout: Duration::from_millis(500),
1446 lock_poll_interval: Duration::from_millis(50),
1447 lock_warn_interval: Duration::from_secs(60),
1448 };
1449 let runner = MigrationRunner::with_config(db.pool().clone(), config);
1450
1451 let err = runner.run(vec![]).await.unwrap_err();
1452 let msg = err.to_string();
1453 assert!(
1454 msg.contains("Timed out") && msg.contains("migration lock"),
1455 "expected timeout error, got: {msg}"
1456 );
1457 assert!(
1458 msg.contains("holder pid"),
1459 "expected holder pid in error: {msg}"
1460 );
1461
1462 sqlx::query_scalar!("SELECT pg_advisory_unlock($1)", MIGRATION_LOCK_ID)
1464 .fetch_one(&mut *blocker)
1465 .await
1466 .unwrap();
1467 }
1468
1469 async fn db_with_system_schema(name: &str) -> IsolatedTestDb {
1472 let db = setup_db(name).await;
1473 MigrationRunner::new(db.pool().clone())
1474 .run(vec![])
1475 .await
1476 .expect("system migrations apply cleanly");
1477 db
1478 }
1479
1480 fn pg_message(err: &sqlx::Error) -> String {
1483 match err {
1484 sqlx::Error::Database(db_err) => db_err.message().to_string(),
1485 other => other.to_string(),
1486 }
1487 }
1488
1489 #[tokio::test]
1490 async fn validate_identifier_rejects_empty() {
1491 let db = db_with_system_schema("vid_empty").await;
1492 let err = sqlx::query("SELECT forge_validate_identifier('')")
1493 .execute(db.pool())
1494 .await
1495 .unwrap_err();
1496 let msg = pg_message(&err);
1497 assert!(
1498 msg.contains("empty"),
1499 "expected empty-name error, got: {msg}"
1500 );
1501 }
1502
1503 #[tokio::test]
1504 async fn validate_identifier_rejects_overlong_name() {
1505 let db = db_with_system_schema("vid_overlong").await;
1506 let name = "a".repeat(64);
1508 let err = sqlx::query(&format!("SELECT forge_validate_identifier('{name}')"))
1509 .execute(db.pool())
1510 .await
1511 .unwrap_err();
1512 let msg = pg_message(&err);
1513 assert!(
1514 msg.contains("63 bytes"),
1515 "expected 63-byte limit in error, got: {msg}",
1516 );
1517 }
1518
1519 #[tokio::test]
1520 async fn validate_identifier_rejects_pg_prefix() {
1521 let db = db_with_system_schema("vid_pgprefix").await;
1522 let err = sqlx::query("SELECT forge_validate_identifier('pg_my_table')")
1523 .execute(db.pool())
1524 .await
1525 .unwrap_err();
1526 let msg = pg_message(&err);
1527 assert!(
1528 msg.contains("pg_") || msg.to_lowercase().contains("reserved"),
1529 "expected pg_ reservation error, got: {msg}",
1530 );
1531 }
1532
1533 #[tokio::test]
1534 async fn validate_identifier_accepts_valid_name() {
1535 let db = db_with_system_schema("vid_ok").await;
1536 sqlx::query("SELECT forge_validate_identifier('orders_2026')")
1537 .execute(db.pool())
1538 .await
1539 .expect("normal identifier must be accepted");
1540 }
1541
1542 #[tokio::test]
1547 async fn startup_rejects_schema_ahead_of_binary() {
1548 let db = setup_db("mig_schema_ahead").await;
1549 let runner = MigrationRunner::new(db.pool().clone());
1550
1551 let m1 = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
1553 let m2 = Migration::new("0002_extra", "CREATE TABLE extra (id INT PRIMARY KEY);");
1554 runner
1555 .run(vec![m1.clone(), m2])
1556 .await
1557 .expect("newer binary applies both cleanly");
1558
1559 let err = runner
1561 .run(vec![m1])
1562 .await
1563 .expect_err("older binary must refuse to start against a newer schema");
1564 let msg = err.to_string();
1565 assert!(
1566 msg.contains("0002_extra"),
1567 "error must name the unknown migration: {msg}",
1568 );
1569 assert!(
1570 msg.to_lowercase().contains("ahead") || msg.to_lowercase().contains("does not know"),
1571 "error must explain the schema-ahead condition: {msg}",
1572 );
1573 }
1574
1575 #[tokio::test]
1579 async fn enable_reactivity_rejects_derived_trigger_overflow() {
1580 let db = db_with_system_schema("enrx_overflow").await;
1581 let name = "a".repeat(51);
1582 let err = sqlx::query(&format!("SELECT forge_enable_reactivity('{name}')"))
1583 .execute(db.pool())
1584 .await
1585 .unwrap_err();
1586 let msg = pg_message(&err);
1587 assert!(
1588 msg.contains("63 bytes"),
1589 "expected derived-name overflow error, got: {msg}",
1590 );
1591 }
1592}