#![allow(dead_code)]
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use crate::error::{BatchError, Result};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SchemaVersion {
pub major: u32,
pub minor: u32,
pub patch: u32,
}
impl SchemaVersion {
#[must_use]
pub const fn new(major: u32, minor: u32, patch: u32) -> Self {
Self {
major,
minor,
patch,
}
}
#[must_use]
pub fn is_older_than(&self, other: &Self) -> bool {
self.to_tuple() < other.to_tuple()
}
#[must_use]
pub fn is_at_least(&self, other: &Self) -> bool {
self.to_tuple() >= other.to_tuple()
}
fn to_tuple(&self) -> (u32, u32, u32) {
(self.major, self.minor, self.patch)
}
}
impl std::fmt::Display for SchemaVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
}
}
impl PartialOrd for SchemaVersion {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SchemaVersion {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.to_tuple().cmp(&other.to_tuple())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MigrationAction {
AddField {
field: String,
default_value: String,
},
RemoveField {
field: String,
},
RenameField {
from: String,
to: String,
},
TransformField {
field: String,
description: String,
},
SetValue {
field: String,
value: String,
},
Custom {
description: String,
},
}
impl std::fmt::Display for MigrationAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AddField { field, .. } => write!(f, "add_field({field})"),
Self::RemoveField { field } => write!(f, "remove_field({field})"),
Self::RenameField { from, to } => write!(f, "rename_field({from}->{to})"),
Self::TransformField { field, .. } => write!(f, "transform_field({field})"),
Self::SetValue { field, .. } => write!(f, "set_value({field})"),
Self::Custom { description } => write!(f, "custom({description})"),
}
}
}
#[derive(Debug, Clone)]
pub struct MigrationStep {
pub from_version: SchemaVersion,
pub to_version: SchemaVersion,
pub description: String,
pub actions: Vec<MigrationAction>,
pub reversible: bool,
}
impl MigrationStep {
#[must_use]
pub fn new(from: SchemaVersion, to: SchemaVersion, description: impl Into<String>) -> Self {
Self {
from_version: from,
to_version: to,
description: description.into(),
actions: Vec::new(),
reversible: false,
}
}
#[must_use]
pub fn with_action(mut self, action: MigrationAction) -> Self {
self.actions.push(action);
self
}
#[must_use]
pub fn reversible(mut self) -> Self {
self.reversible = true;
self
}
}
#[derive(Debug, Clone)]
pub struct MigrationChain {
steps: Vec<MigrationStep>,
}
impl MigrationChain {
#[must_use]
pub fn new() -> Self {
Self { steps: Vec::new() }
}
pub fn add_step(&mut self, step: MigrationStep) {
self.steps.push(step);
self.steps
.sort_by(|a, b| a.from_version.cmp(&b.from_version));
}
#[must_use]
pub fn len(&self) -> usize {
self.steps.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.steps.is_empty()
}
#[must_use]
pub fn latest_version(&self) -> Option<SchemaVersion> {
self.steps.last().map(|s| s.to_version.clone())
}
pub fn find_path(
&self,
from: &SchemaVersion,
to: &SchemaVersion,
) -> Result<Vec<&MigrationStep>> {
if from == to {
return Ok(Vec::new());
}
if from > to {
return Err(BatchError::InvalidJobConfig(format!(
"Cannot downgrade from {from} to {to}: downgrade not supported"
)));
}
let mut path = Vec::new();
let mut current = from.clone();
while current < *to {
let step = self
.steps
.iter()
.find(|s| s.from_version == current)
.ok_or_else(|| {
BatchError::InvalidJobConfig(format!(
"No migration path from {current} to {to}"
))
})?;
path.push(step);
current = step.to_version.clone();
if path.len() > 1000 {
return Err(BatchError::InvalidJobConfig(
"Migration path too long (possible loop)".to_string(),
));
}
}
Ok(path)
}
#[must_use]
pub fn steps(&self) -> &[MigrationStep] {
&self.steps
}
}
impl Default for MigrationChain {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobData {
pub schema_version: SchemaVersion,
pub fields: HashMap<String, serde_json::Value>,
}
impl JobData {
#[must_use]
pub fn new(version: SchemaVersion) -> Self {
Self {
schema_version: version,
fields: HashMap::new(),
}
}
pub fn set(&mut self, key: impl Into<String>, value: serde_json::Value) {
self.fields.insert(key.into(), value);
}
#[must_use]
pub fn get(&self, key: &str) -> Option<&serde_json::Value> {
self.fields.get(key)
}
pub fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
self.fields.remove(key)
}
pub fn rename(&mut self, from: &str, to: &str) -> bool {
if let Some(val) = self.fields.remove(from) {
self.fields.insert(to.to_string(), val);
true
} else {
false
}
}
#[must_use]
pub fn has_field(&self, key: &str) -> bool {
self.fields.contains_key(key)
}
#[must_use]
pub fn field_count(&self) -> usize {
self.fields.len()
}
}
#[derive(Debug)]
pub struct JobMigrator {
chain: MigrationChain,
target_version: SchemaVersion,
}
impl JobMigrator {
#[must_use]
pub fn new(chain: MigrationChain) -> Self {
let target = chain
.latest_version()
.unwrap_or(SchemaVersion::new(1, 0, 0));
Self {
chain,
target_version: target,
}
}
#[must_use]
pub fn with_target(mut self, version: SchemaVersion) -> Self {
self.target_version = version;
self
}
pub fn migrate(&self, job: &mut JobData) -> Result<MigrationReport> {
let path = self
.chain
.find_path(&job.schema_version, &self.target_version)?;
if path.is_empty() {
return Ok(MigrationReport {
from_version: job.schema_version.clone(),
to_version: job.schema_version.clone(),
steps_applied: 0,
actions_applied: Vec::new(),
});
}
let from = job.schema_version.clone();
let mut actions_applied = Vec::new();
for step in &path {
for action in &step.actions {
self.apply_action(job, action)?;
actions_applied.push(action.to_string());
}
job.schema_version = step.to_version.clone();
}
Ok(MigrationReport {
from_version: from,
to_version: job.schema_version.clone(),
steps_applied: path.len(),
actions_applied,
})
}
#[must_use]
pub fn needs_migration(&self, job: &JobData) -> bool {
job.schema_version < self.target_version
}
#[must_use]
pub fn target_version(&self) -> &SchemaVersion {
&self.target_version
}
#[must_use]
pub fn chain(&self) -> &MigrationChain {
&self.chain
}
fn apply_action(&self, job: &mut JobData, action: &MigrationAction) -> Result<()> {
match action {
MigrationAction::AddField {
field,
default_value,
} => {
if !job.has_field(field) {
let value: serde_json::Value =
serde_json::from_str(default_value).map_err(|e| {
BatchError::InvalidJobConfig(format!(
"Invalid default value for field '{field}': {e}"
))
})?;
job.set(field.clone(), value);
}
}
MigrationAction::RemoveField { field } => {
job.remove(field);
}
MigrationAction::RenameField { from, to } => {
job.rename(from, to);
}
MigrationAction::SetValue { field, value } => {
let parsed: serde_json::Value = serde_json::from_str(value).map_err(|e| {
BatchError::InvalidJobConfig(format!("Invalid value for field '{field}': {e}"))
})?;
job.set(field.clone(), parsed);
}
MigrationAction::TransformField { .. } | MigrationAction::Custom { .. } => {
}
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct MigrationReport {
pub from_version: SchemaVersion,
pub to_version: SchemaVersion,
pub steps_applied: usize,
pub actions_applied: Vec<String>,
}
impl MigrationReport {
#[must_use]
pub fn was_migrated(&self) -> bool {
self.steps_applied > 0
}
}
#[cfg(test)]
mod tests {
use super::*;
fn v(major: u32, minor: u32, patch: u32) -> SchemaVersion {
SchemaVersion::new(major, minor, patch)
}
fn sample_chain() -> MigrationChain {
let mut chain = MigrationChain::new();
chain.add_step(
MigrationStep::new(v(1, 0, 0), v(1, 1, 0), "Add priority field").with_action(
MigrationAction::AddField {
field: "priority".to_string(),
default_value: "\"normal\"".to_string(),
},
),
);
chain.add_step(
MigrationStep::new(v(1, 1, 0), v(1, 2, 0), "Rename output field").with_action(
MigrationAction::RenameField {
from: "output_path".to_string(),
to: "output_dir".to_string(),
},
),
);
chain.add_step(
MigrationStep::new(v(1, 2, 0), v(2, 0, 0), "Restructure retry config")
.with_action(MigrationAction::AddField {
field: "retry_config".to_string(),
default_value: r#"{"max_attempts": 3, "delay_ms": 1000}"#.to_string(),
})
.with_action(MigrationAction::RemoveField {
field: "legacy_retries".to_string(),
}),
);
chain
}
#[test]
fn test_schema_version_display() {
assert_eq!(v(1, 2, 3).to_string(), "1.2.3");
}
#[test]
fn test_schema_version_comparison() {
assert!(v(1, 0, 0).is_older_than(&v(1, 1, 0)));
assert!(v(1, 1, 0).is_older_than(&v(2, 0, 0)));
assert!(!v(2, 0, 0).is_older_than(&v(1, 0, 0)));
assert!(v(1, 0, 0).is_at_least(&v(1, 0, 0)));
assert!(v(2, 0, 0).is_at_least(&v(1, 0, 0)));
}
#[test]
fn test_schema_version_ordering() {
let mut versions = vec![v(2, 0, 0), v(1, 0, 0), v(1, 2, 0), v(1, 1, 0)];
versions.sort();
assert_eq!(versions[0], v(1, 0, 0));
assert_eq!(versions[3], v(2, 0, 0));
}
#[test]
fn test_migration_action_display() {
assert_eq!(
MigrationAction::AddField {
field: "priority".into(),
default_value: "\"normal\"".into()
}
.to_string(),
"add_field(priority)"
);
assert_eq!(
MigrationAction::RenameField {
from: "old".into(),
to: "new".into()
}
.to_string(),
"rename_field(old->new)"
);
assert_eq!(
MigrationAction::RemoveField {
field: "legacy".into()
}
.to_string(),
"remove_field(legacy)"
);
}
#[test]
fn test_chain_latest_version() {
let chain = sample_chain();
assert_eq!(chain.latest_version(), Some(v(2, 0, 0)));
}
#[test]
fn test_chain_find_path_same_version() {
let chain = sample_chain();
let path = chain
.find_path(&v(2, 0, 0), &v(2, 0, 0))
.expect("should work");
assert!(path.is_empty());
}
#[test]
fn test_chain_find_path_one_step() {
let chain = sample_chain();
let path = chain
.find_path(&v(1, 0, 0), &v(1, 1, 0))
.expect("should work");
assert_eq!(path.len(), 1);
assert_eq!(path[0].to_version, v(1, 1, 0));
}
#[test]
fn test_chain_find_path_multi_step() {
let chain = sample_chain();
let path = chain
.find_path(&v(1, 0, 0), &v(2, 0, 0))
.expect("should work");
assert_eq!(path.len(), 3);
}
#[test]
fn test_chain_find_path_no_path() {
let chain = sample_chain();
let result = chain.find_path(&v(0, 9, 0), &v(2, 0, 0));
assert!(result.is_err());
}
#[test]
fn test_chain_downgrade_not_supported() {
let chain = sample_chain();
let result = chain.find_path(&v(2, 0, 0), &v(1, 0, 0));
assert!(result.is_err());
}
#[test]
fn test_job_data_field_operations() {
let mut data = JobData::new(v(1, 0, 0));
data.set("name", serde_json::Value::String("test".into()));
assert!(data.has_field("name"));
assert_eq!(data.field_count(), 1);
assert_eq!(
data.get("name"),
Some(&serde_json::Value::String("test".into()))
);
data.rename("name", "job_name");
assert!(!data.has_field("name"));
assert!(data.has_field("job_name"));
data.remove("job_name");
assert_eq!(data.field_count(), 0);
}
#[test]
fn test_migrator_no_migration_needed() {
let chain = sample_chain();
let migrator = JobMigrator::new(chain);
let mut job = JobData::new(v(2, 0, 0));
let report = migrator.migrate(&mut job).expect("should work");
assert!(!report.was_migrated());
assert_eq!(report.steps_applied, 0);
}
#[test]
fn test_migrator_single_step() {
let chain = sample_chain();
let migrator = JobMigrator::new(chain);
let mut job = JobData::new(v(1, 0, 0));
let migrator = migrator.with_target(v(1, 1, 0));
let report = migrator.migrate(&mut job).expect("should work");
assert!(report.was_migrated());
assert_eq!(report.steps_applied, 1);
assert_eq!(job.schema_version, v(1, 1, 0));
assert!(job.has_field("priority"));
assert_eq!(
job.get("priority"),
Some(&serde_json::Value::String("normal".into()))
);
}
#[test]
fn test_migrator_full_migration() {
let chain = sample_chain();
let migrator = JobMigrator::new(chain);
let mut job = JobData::new(v(1, 0, 0));
job.set(
"output_path",
serde_json::Value::String(
std::env::temp_dir()
.join("oximedia-batch-migration-output")
.to_string_lossy()
.into_owned(),
),
);
job.set("legacy_retries", serde_json::Value::Number(3.into()));
let report = migrator.migrate(&mut job).expect("should work");
assert!(report.was_migrated());
assert_eq!(report.steps_applied, 3);
assert_eq!(job.schema_version, v(2, 0, 0));
assert!(job.has_field("priority"));
assert!(!job.has_field("output_path"));
assert!(job.has_field("output_dir"));
assert!(job.has_field("retry_config"));
assert!(!job.has_field("legacy_retries"));
}
#[test]
fn test_migrator_add_field_does_not_overwrite_existing() {
let chain = sample_chain();
let migrator = JobMigrator::new(chain).with_target(v(1, 1, 0));
let mut job = JobData::new(v(1, 0, 0));
job.set("priority", serde_json::Value::String("high".into()));
let _report = migrator.migrate(&mut job).expect("should work");
assert_eq!(
job.get("priority"),
Some(&serde_json::Value::String("high".into()))
);
}
#[test]
fn test_migrator_needs_migration() {
let chain = sample_chain();
let migrator = JobMigrator::new(chain);
let old_job = JobData::new(v(1, 0, 0));
let current_job = JobData::new(v(2, 0, 0));
assert!(migrator.needs_migration(&old_job));
assert!(!migrator.needs_migration(¤t_job));
}
#[test]
fn test_migrator_set_value_action() {
let mut chain = MigrationChain::new();
chain.add_step(
MigrationStep::new(v(1, 0, 0), v(1, 1, 0), "Set format version").with_action(
MigrationAction::SetValue {
field: "format_version".to_string(),
value: "2".to_string(),
},
),
);
let migrator = JobMigrator::new(chain);
let mut job = JobData::new(v(1, 0, 0));
job.set("format_version", serde_json::Value::Number(1.into()));
migrator.migrate(&mut job).expect("should work");
assert_eq!(
job.get("format_version"),
Some(&serde_json::Value::Number(2.into()))
);
}
#[test]
fn test_migration_report_actions() {
let chain = sample_chain();
let migrator = JobMigrator::new(chain).with_target(v(1, 1, 0));
let mut job = JobData::new(v(1, 0, 0));
let report = migrator.migrate(&mut job).expect("should work");
assert_eq!(report.actions_applied.len(), 1);
assert!(report.actions_applied[0].contains("priority"));
}
#[test]
fn test_migration_step_builder() {
let step = MigrationStep::new(v(1, 0, 0), v(1, 1, 0), "Test")
.with_action(MigrationAction::AddField {
field: "x".into(),
default_value: "0".into(),
})
.reversible();
assert!(step.reversible);
assert_eq!(step.actions.len(), 1);
}
#[test]
fn test_empty_chain() {
let chain = MigrationChain::new();
assert!(chain.is_empty());
assert_eq!(chain.len(), 0);
assert!(chain.latest_version().is_none());
}
#[test]
fn test_invalid_json_value_returns_error() {
let mut chain = MigrationChain::new();
chain.add_step(
MigrationStep::new(v(1, 0, 0), v(1, 1, 0), "Bad value").with_action(
MigrationAction::AddField {
field: "bad".into(),
default_value: "not valid json {{{".into(),
},
),
);
let migrator = JobMigrator::new(chain);
let mut job = JobData::new(v(1, 0, 0));
let result = migrator.migrate(&mut job);
assert!(result.is_err());
}
#[test]
fn test_custom_action_is_noop() {
let mut chain = MigrationChain::new();
chain.add_step(
MigrationStep::new(v(1, 0, 0), v(1, 1, 0), "Custom migration").with_action(
MigrationAction::Custom {
description: "manual review required".into(),
},
),
);
let migrator = JobMigrator::new(chain);
let mut job = JobData::new(v(1, 0, 0));
let report = migrator.migrate(&mut job).expect("should work");
assert!(report.was_migrated());
}
}