use std::collections::BTreeSet;
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use crate::config::{MigrationConfig, MigrationMode};
use crate::error::{MigrationError, Result};
/// Version stamp written into every resume token; `ResumeToken::load` rejects
/// any other value. Bump this when the on-disk layout changes incompatibly.
pub const RESUME_SCHEMA_VERSION: u32 = 1;

/// A migration pipeline stage that has run to completion.
///
/// `Ord` lets stages live in a `BTreeSet`, which keeps the serialised JSON
/// deterministic; serde writes the variants as snake_case strings
/// ("prepare_snapshot", "dump", "restore").
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CompletedStage {
    PrepareSnapshot,
    Dump,
    Restore,
}
/// Persistent progress record for a migration run, serialised as pretty JSON
/// (by default next to the dump — see `default_resume_path`) so an
/// interrupted run can skip stages that already completed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumeToken {
    /// Format version of this file; checked against `RESUME_SCHEMA_VERSION` on load.
    pub schema_version: u32,
    /// Fingerprint of the originating config (see `config_hash`); verified by
    /// `check_compatible` before resuming.
    pub config_hash: String,
    /// "offline" or "online", mirroring `MigrationMode` at creation time.
    pub mode: String,
    /// Stages that finished successfully; BTreeSet keeps JSON output ordered.
    pub completed: BTreeSet<CompletedStage>,
    /// Path where the dump was / will be written.
    pub dump_path: PathBuf,
    /// Replication slot name; populated only in online mode.
    pub slot_name: Option<String>,
    /// Subscription name; populated only in online mode.
    pub subscription_name: Option<String>,
    /// Publication name; populated only in online mode.
    pub publication: Option<String>,
    /// Exported snapshot identifier, once recorded (starts as `None`).
    pub snapshot_name: Option<String>,
    /// Last applied LSN as a raw u64 — presumably a Postgres WAL position;
    /// starts as `None`. TODO confirm the encoding against the writer.
    pub last_applied_lsn: Option<u64>,
    /// RFC 3339 UTC timestamp of the last update (see `now_rfc3339`).
    pub updated_at: String,
}
impl ResumeToken {
/// Builds a fresh token for `cfg` with no completed stages.
///
/// The three replication-object names are carried only when the run is in
/// online mode; offline runs store `None` for all of them.
pub fn new(cfg: &MigrationConfig, dump_path: PathBuf) -> Self {
    // Online-only fields are all sourced from `cfg.online`; gate them once.
    let online = if cfg.mode == MigrationMode::Online {
        Some(&cfg.online)
    } else {
        None
    };
    let mode = match cfg.mode {
        MigrationMode::Offline => "offline",
        MigrationMode::Online => "online",
    };
    Self {
        schema_version: RESUME_SCHEMA_VERSION,
        config_hash: config_hash(cfg),
        mode: mode.to_string(),
        completed: BTreeSet::new(),
        dump_path,
        slot_name: online.map(|o| o.slot_name.clone()),
        subscription_name: online.map(|o| o.subscription_name.clone()),
        publication: online.map(|o| o.publication.clone()),
        snapshot_name: None,
        last_applied_lsn: None,
        updated_at: now_rfc3339(),
    }
}
/// Records `stage` as completed and refreshes `updated_at`, so the
/// timestamp always reflects the latest progress.
pub fn mark(&mut self, stage: CompletedStage) {
    self.updated_at = now_rfc3339();
    self.completed.insert(stage);
}

/// Returns whether `stage` has already been marked complete.
pub fn has(&self, stage: CompletedStage) -> bool {
    self.completed.get(&stage).is_some()
}
pub async fn load(path: &Path) -> Result<Option<Self>> {
let bytes = match tokio::fs::read(path).await {
Ok(b) => b,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(MigrationError::Io(e)),
};
let token: Self = serde_json::from_slice(&bytes).map_err(|e| {
MigrationError::config(format!(
"resume token at {} is not valid JSON: {e}",
path.display()
))
})?;
if token.schema_version != RESUME_SCHEMA_VERSION {
return Err(MigrationError::config(format!(
"resume token at {} has schema version {} (expected {}); \
retry with --force-clean to start fresh",
path.display(),
token.schema_version,
RESUME_SCHEMA_VERSION,
)));
}
Ok(Some(token))
}
/// Writes the token to `path` atomically: serialise, write to a sibling
/// temp file, then rename over the target so readers never observe a
/// half-written token.
///
/// # Errors
/// Fails on serialisation or filesystem errors. On a failed write/rename
/// the temp file is removed best-effort so it is not left behind (the
/// original error is reported, not the cleanup's).
pub async fn save(&self, path: &Path) -> Result<()> {
    if let Some(parent) = path.parent() {
        // A bare relative filename yields an empty parent, which
        // `create_dir_all` would reject — skip it.
        if !parent.as_os_str().is_empty() {
            tokio::fs::create_dir_all(parent).await?;
        }
    }
    let bytes = serde_json::to_vec_pretty(self).map_err(|e| {
        MigrationError::config(format!("failed to serialise resume token: {e}"))
    })?;
    let tmp = path.with_extension("json.tmp");
    let written = async {
        tokio::fs::write(&tmp, &bytes).await?;
        tokio::fs::rename(&tmp, path).await
    }
    .await;
    if let Err(e) = written {
        // Best-effort: don't leave a stale temp file around, but surface
        // the original failure rather than any error from the cleanup.
        let _ = tokio::fs::remove_file(&tmp).await;
        return Err(MigrationError::Io(e));
    }
    Ok(())
}
/// Verifies that this token was produced by a config equivalent to `cfg`.
///
/// Both the config fingerprint and the migration mode must match; either
/// mismatch is reported as a config error with recovery advice.
pub fn check_compatible(&self, cfg: &MigrationConfig) -> Result<()> {
    let current_hash = config_hash(cfg);
    if self.config_hash != current_hash {
        return Err(MigrationError::config(format!(
            "resume token's config_hash {} does not match current config {} — \
             either restore the original CLI flags or retry with --force-clean",
            self.config_hash, current_hash,
        )));
    }
    let current_mode = match cfg.mode {
        MigrationMode::Offline => "offline",
        MigrationMode::Online => "online",
    };
    if self.mode == current_mode {
        Ok(())
    } else {
        Err(MigrationError::config(format!(
            "resume token was written in `{}` mode; current run is `{}`",
            self.mode, current_mode,
        )))
    }
}
}
/// Derives the default resume-token location from the dump path by
/// appending `.resume.json` to the full path (an append, not an extension
/// replacement, so the dump's own extension is preserved).
pub fn default_resume_path(dump_path: &Path) -> PathBuf {
    let mut name = dump_path.as_os_str().to_os_string();
    name.push(".resume.json");
    name.into()
}
/// Stable fingerprint of the parts of the config that identify a migration:
/// mode, source/target endpoints (host, port, database — never credentials),
/// the schema/table selection (order-insensitive), and, in online mode, the
/// replication object names.
///
/// Uses a hand-rolled 64-bit FNV-1a hasher instead of `DefaultHasher`: the
/// std hasher's output is explicitly not guaranteed to be stable across Rust
/// releases, and since this hash is persisted in the resume token and
/// compared on resume, an unstable hasher would spuriously invalidate every
/// saved token after a toolchain upgrade.
pub fn config_hash(cfg: &MigrationConfig) -> String {
    // Minimal FNV-1a `Hasher`: deterministic across builds and platforms.
    struct Fnv1a(u64);
    impl Hasher for Fnv1a {
        fn finish(&self) -> u64 {
            self.0
        }
        fn write(&mut self, bytes: &[u8]) {
            for &byte in bytes {
                self.0 ^= u64::from(byte);
                // FNV-1a 64-bit prime.
                self.0 = self.0.wrapping_mul(0x100_0000_01b3);
            }
        }
    }
    // FNV-1a 64-bit offset basis.
    let mut h = Fnv1a(0xcbf2_9ce4_8422_2325);
    match cfg.mode {
        MigrationMode::Offline => 0u8.hash(&mut h),
        MigrationMode::Online => 1u8.hash(&mut h),
    }
    cfg.source.host.hash(&mut h);
    cfg.source.port.hash(&mut h);
    cfg.source.database.hash(&mut h);
    cfg.target.host.hash(&mut h);
    cfg.target.port.hash(&mut h);
    cfg.target.database.hash(&mut h);
    // Sort copies so that selection order does not affect the fingerprint.
    let mut schemas = cfg.schemas.clone();
    schemas.sort();
    schemas.hash(&mut h);
    let mut tables = cfg.tables.clone();
    tables.sort();
    tables.hash(&mut h);
    if cfg.mode == MigrationMode::Online {
        cfg.online.slot_name.hash(&mut h);
        cfg.online.publication.hash(&mut h);
        cfg.online.subscription_name.hash(&mut h);
    }
    format!("{:016x}", h.finish())
}
/// Current UTC wall-clock time as an RFC 3339 string, used for `updated_at`.
fn now_rfc3339() -> String {
    let utc_now = chrono::Utc::now();
    utc_now.to_rfc3339()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{EndpointConfig, OnlineOptions};

    /// Baseline online-mode config shared by every test below.
    fn cfg() -> MigrationConfig {
        MigrationConfig {
            mode: MigrationMode::Online,
            source: EndpointConfig::parse("postgresql://u:p@src:5432/db").unwrap(),
            target: EndpointConfig::parse("postgresql://u:p@dst:5432/db").unwrap(),
            online: OnlineOptions {
                slot_name: "slot_a".into(),
                publication: "pub_a".into(),
                subscription_name: "sub_a".into(),
                ..OnlineOptions::default()
            },
            ..MigrationConfig::default()
        }
    }

    #[test]
    fn config_hash_is_stable_for_identical_inputs() {
        assert_eq!(config_hash(&cfg()), config_hash(&cfg()));
    }

    #[test]
    fn config_hash_changes_when_slot_name_changes() {
        let mut left = cfg();
        let mut right = cfg();
        left.online.slot_name = "slot_a".into();
        right.online.slot_name = "slot_b".into();
        assert_ne!(config_hash(&left), config_hash(&right));
    }

    #[test]
    fn config_hash_ignores_password() {
        // Credentials are deliberately excluded from the fingerprint.
        let mut left = cfg();
        let mut right = cfg();
        left.source.password = "one".into();
        right.source.password = "two".into();
        assert_eq!(config_hash(&left), config_hash(&right));
    }

    #[test]
    fn config_hash_ignores_schema_table_order() {
        let mut left = cfg();
        let mut right = cfg();
        left.schemas = vec!["public".into(), "app".into()];
        right.schemas = vec!["app".into(), "public".into()];
        assert_eq!(config_hash(&left), config_hash(&right));
    }

    #[test]
    fn mark_and_has_round_trip() {
        let mut token = ResumeToken::new(&cfg(), PathBuf::from("/tmp/dump"));
        assert!(!token.has(CompletedStage::Dump));
        token.mark(CompletedStage::Dump);
        assert!(token.has(CompletedStage::Dump));
        assert!(!token.has(CompletedStage::Restore));
    }

    #[tokio::test]
    async fn load_returns_none_when_file_absent() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nope.json");
        assert!(ResumeToken::load(&path).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn save_then_load_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("resume.json");
        let mut token = ResumeToken::new(&cfg(), PathBuf::from("/tmp/dump"));
        token.mark(CompletedStage::PrepareSnapshot);
        token.mark(CompletedStage::Dump);
        token.snapshot_name = Some("00000003-deadbeef-1".into());
        token.last_applied_lsn = Some(0x1234_5678);
        token.save(&path).await.unwrap();
        let restored = ResumeToken::load(&path).await.unwrap().unwrap();
        assert_eq!(restored.config_hash, token.config_hash);
        assert!(restored.has(CompletedStage::PrepareSnapshot));
        assert!(restored.has(CompletedStage::Dump));
        assert!(!restored.has(CompletedStage::Restore));
        assert_eq!(restored.snapshot_name.as_deref(), Some("00000003-deadbeef-1"));
        assert_eq!(restored.last_applied_lsn, Some(0x1234_5678));
    }

    #[tokio::test]
    async fn check_compatible_rejects_mismatched_config() {
        let token = ResumeToken::new(&cfg(), PathBuf::from("/tmp/dump"));
        let mut other = cfg();
        other.online.slot_name = "different".into();
        let err = token.check_compatible(&other).unwrap_err();
        assert!(matches!(err, MigrationError::Config(_)));
    }

    #[tokio::test]
    async fn check_compatible_rejects_mode_change() {
        let token = ResumeToken::new(&cfg(), PathBuf::from("/tmp/dump"));
        let mut other = cfg();
        other.mode = MigrationMode::Offline;
        let err = token.check_compatible(&other).unwrap_err();
        assert!(matches!(err, MigrationError::Config(_)));
    }

    #[tokio::test]
    async fn load_rejects_unknown_schema_version() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("resume.json");
        let mut token = ResumeToken::new(&cfg(), PathBuf::from("/tmp/dump"));
        token.schema_version = RESUME_SCHEMA_VERSION + 1;
        token.save(&path).await.unwrap();
        let err = ResumeToken::load(&path).await.unwrap_err();
        assert!(matches!(err, MigrationError::Config(_)));
    }

    #[test]
    fn default_resume_path_appends_suffix() {
        let derived = default_resume_path(Path::new("/tmp/dump_online-12345"));
        assert_eq!(derived, PathBuf::from("/tmp/dump_online-12345.resume.json"));
    }
}