use super::scheduler::Scheduler;
use anyhow::{anyhow, Result};
pub const CURRENT_VERSION: u32 = 4;
pub fn migrate_state(mut scheduler: Scheduler) -> Result<Scheduler> {
let from_version = scheduler.version;
if from_version > CURRENT_VERSION {
return Err(anyhow!(
"State file version {} is newer than supported version {}. Please upgrade gflowd.",
from_version,
CURRENT_VERSION
));
}
if from_version == CURRENT_VERSION {
if scheduler.job_specs.len() != scheduler.job_runtimes.len() {
return Err(anyhow!(
"Invalid state: job_specs({}) and job_runtimes({}) length mismatch",
scheduler.job_specs.len(),
scheduler.job_runtimes.len()
));
}
return Ok(scheduler);
}
tracing::info!(
"Migrating state from version {} to {}",
from_version,
CURRENT_VERSION
);
tracing::debug!(
"State pre-migration summary: version={}, jobs_len={}, job_specs={}, job_runtimes={}, next_job_id={}, reservations={}, next_reservation_id={}",
scheduler.version,
scheduler.jobs_len(),
scheduler.job_specs.len(),
scheduler.job_runtimes.len(),
scheduler.next_job_id(),
scheduler.reservations.len(),
scheduler.next_reservation_id
);
if from_version < 1 {
scheduler = migrate_v0_to_v1(scheduler)?;
}
if from_version < 2 {
scheduler = migrate_v1_to_v2(scheduler)?;
}
if from_version < 3 {
scheduler = migrate_v2_to_v3(scheduler)?;
}
if from_version < 4 {
scheduler = migrate_v3_to_v4(scheduler)?;
}
scheduler.version = CURRENT_VERSION;
tracing::debug!(
"State post-migration summary: version={}, jobs_len={}, job_specs={}, job_runtimes={}, next_job_id={}, reservations={}, next_reservation_id={}",
scheduler.version,
scheduler.jobs_len(),
scheduler.job_specs.len(),
scheduler.job_runtimes.len(),
scheduler.next_job_id(),
scheduler.reservations.len(),
scheduler.next_reservation_id
);
Ok(scheduler)
}
fn migrate_v0_to_v1(mut scheduler: Scheduler) -> Result<Scheduler> {
tracing::info!("Migrating from v0 to v1: adding version field");
scheduler.version = 1;
Ok(scheduler)
}
fn migrate_v1_to_v2(mut scheduler: Scheduler) -> Result<Scheduler> {
tracing::info!("Migrating from v1 to v2: converting jobs HashMap to Vec");
scheduler.version = 2;
Ok(scheduler)
}
fn migrate_v2_to_v3(mut scheduler: Scheduler) -> Result<Scheduler> {
tracing::info!("Migrating from v2 to v3: adding GPU reservations");
scheduler.reservations = Vec::new();
scheduler.next_reservation_id = 1;
scheduler.version = 3;
Ok(scheduler)
}
fn migrate_v3_to_v4(mut scheduler: Scheduler) -> Result<Scheduler> {
tracing::info!("Migrating from v3 to v4: splitting Job into JobSpec and JobRuntime");
if scheduler.job_specs.len() != scheduler.job_runtimes.len() {
return Err(anyhow!(
"Invalid state during v3→v4 migration: job_specs({}) and job_runtimes({}) length mismatch",
scheduler.job_specs.len(),
scheduler.job_runtimes.len()
));
}
scheduler.version = 4;
tracing::info!(
"Migration v3→v4 complete: split {} jobs into specs and runtimes",
scheduler.job_specs.len()
);
Ok(scheduler)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::scheduler::Scheduler;
#[test]
fn test_current_version_no_migration() {
let scheduler = Scheduler {
version: CURRENT_VERSION,
..Default::default()
};
let next_id = scheduler.next_job_id();
let result = migrate_state(scheduler).unwrap();
assert_eq!(result.version, CURRENT_VERSION);
assert_eq!(result.next_job_id(), next_id);
}
#[test]
fn test_future_version_fails() {
let scheduler = Scheduler {
version: 999,
..Default::default()
};
let result = migrate_state(scheduler);
assert!(result.is_err());
if let Err(e) = result {
assert!(e.to_string().contains("newer than supported"));
}
}
#[test]
fn test_v0_to_v1_migration() {
let scheduler = Scheduler {
version: 0,
..Default::default()
};
let original_next_id = scheduler.next_job_id();
let result = migrate_state(scheduler).unwrap();
assert_eq!(result.version, CURRENT_VERSION); assert_eq!(result.next_job_id(), original_next_id); }
#[test]
fn test_data_preservation_through_migration() {
use crate::core::job::{Job, JobState};
let old_json = serde_json::json!({
"version": 0,
"next_job_id": 42,
"jobs": [Job { id: 1, state: JobState::Finished, ..Default::default() }],
})
.to_string();
let scheduler: Scheduler = serde_json::from_str(&old_json).unwrap();
let result = migrate_state(scheduler).unwrap();
assert_eq!(result.version, CURRENT_VERSION); assert_eq!(result.next_job_id(), 42);
assert_eq!(result.jobs_len(), 1);
assert_eq!(result.get_job(1).unwrap().state, JobState::Finished);
}
#[test]
fn test_deserialize_old_hashmap_format() {
use crate::core::job::JobState;
let old_format_json = r#"{
"version": 1,
"jobs": {
"1": {
"id": 1,
"state": "Finished",
"script": null,
"command": null,
"gpus": 0,
"conda_env": null,
"run_dir": ".",
"priority": 0,
"depends_on": null,
"depends_on_ids": [],
"dependency_mode": null,
"auto_cancel_on_dependency_failure": true,
"task_id": null,
"time_limit": null,
"memory_limit_mb": null,
"submitted_by": "",
"redone_from": null,
"auto_close_tmux": false,
"parameters": {},
"group_id": null,
"max_concurrent": null,
"run_name": null,
"gpu_ids": null,
"submitted_at": null,
"started_at": null,
"finished_at": null
},
"2": {
"id": 2,
"state": "Queued",
"script": null,
"command": null,
"gpus": 0,
"conda_env": null,
"run_dir": ".",
"priority": 0,
"depends_on": null,
"depends_on_ids": [],
"dependency_mode": null,
"auto_cancel_on_dependency_failure": true,
"task_id": null,
"time_limit": null,
"memory_limit_mb": null,
"submitted_by": "",
"redone_from": null,
"auto_close_tmux": false,
"parameters": {},
"group_id": null,
"max_concurrent": null,
"run_name": null,
"gpu_ids": null,
"submitted_at": null,
"started_at": null,
"finished_at": null
}
},
"state_path": "state.json",
"next_job_id": 3,
"allowed_gpu_indices": null
}"#;
let scheduler: Scheduler = serde_json::from_str(old_format_json).unwrap();
assert_eq!(scheduler.version, 1);
assert_eq!(scheduler.jobs_len(), 2);
assert_eq!(scheduler.get_job(1).unwrap().state, JobState::Finished);
assert_eq!(scheduler.get_job(2).unwrap().state, JobState::Queued);
assert_eq!(scheduler.next_job_id(), 3);
}
#[test]
fn test_deserialize_new_vec_format() {
use crate::core::job::JobState;
let new_format_json = r#"{
"version": 2,
"jobs": [
{
"id": 1,
"state": "Finished",
"script": null,
"command": null,
"gpus": 0,
"conda_env": null,
"run_dir": ".",
"priority": 0,
"depends_on": null,
"depends_on_ids": [],
"dependency_mode": null,
"auto_cancel_on_dependency_failure": true,
"task_id": null,
"time_limit": null,
"memory_limit_mb": null,
"submitted_by": "",
"redone_from": null,
"auto_close_tmux": false,
"parameters": {},
"group_id": null,
"max_concurrent": null,
"run_name": null,
"gpu_ids": null,
"submitted_at": null,
"started_at": null,
"finished_at": null
},
{
"id": 2,
"state": "Queued",
"script": null,
"command": null,
"gpus": 0,
"conda_env": null,
"run_dir": ".",
"priority": 0,
"depends_on": null,
"depends_on_ids": [],
"dependency_mode": null,
"auto_cancel_on_dependency_failure": true,
"task_id": null,
"time_limit": null,
"memory_limit_mb": null,
"submitted_by": "",
"redone_from": null,
"auto_close_tmux": false,
"parameters": {},
"group_id": null,
"max_concurrent": null,
"run_name": null,
"gpu_ids": null,
"submitted_at": null,
"started_at": null,
"finished_at": null
}
],
"state_path": "state.json",
"next_job_id": 3,
"allowed_gpu_indices": null
}"#;
let scheduler: Scheduler = serde_json::from_str(new_format_json).unwrap();
assert_eq!(scheduler.version, 2);
assert_eq!(scheduler.jobs_len(), 2);
assert_eq!(scheduler.get_job(1).unwrap().state, JobState::Finished);
assert_eq!(scheduler.get_job(2).unwrap().state, JobState::Queued);
assert_eq!(scheduler.next_job_id(), 3);
}
}