#![allow(deprecated)]
use gflow::core::executor::Executor;
use gflow::core::job::{DependencyIds, Job, JobBuilder, JobState};
use gflow::core::scheduler::{Scheduler, SchedulerBuilder};
use gflow::core::GPUSlot;
use smallvec::smallvec;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use uuid::Uuid;
/// Test double for the scheduler's executor: records every job it is asked
/// to run instead of actually executing anything.
#[derive(Clone)]
struct MockExecutor {
// Shared log of executed jobs; `Arc` so clones observe the same log.
executions: Arc<Mutex<Vec<Job>>>,
// When true, `execute` returns an error instead of recording the job.
should_fail: bool,
}
impl MockExecutor {
    /// Creates a mock executor that always succeeds.
    fn new() -> Self {
        Self::with_failure(false)
    }

    /// Creates a mock executor whose `execute` outcome is controlled by
    /// `should_fail`.
    #[allow(dead_code)]
    fn with_failure(should_fail: bool) -> Self {
        Self {
            executions: Arc::new(Mutex::new(Vec::new())),
            should_fail,
        }
    }

    /// Returns a snapshot of every job executed so far.
    fn get_executions(&self) -> Vec<Job> {
        let log = self.executions.lock().unwrap();
        log.clone()
    }

    /// Number of jobs executed so far.
    fn execution_count(&self) -> usize {
        let log = self.executions.lock().unwrap();
        log.len()
    }

    /// Forgets all recorded executions.
    fn clear(&self) {
        self.executions.lock().unwrap().clear();
    }
}
impl Executor for MockExecutor {
    /// Records the job in the shared log, or fails when configured to.
    fn execute(&self, job: &Job) -> anyhow::Result<()> {
        if self.should_fail {
            anyhow::bail!("Mock execution failed");
        }
        self.executions.lock().unwrap().push(job.clone());
        Ok(())
    }
}
/// Builds a scheduler backed by a [`MockExecutor`], with two available GPUs
/// (`GPU-0`, `GPU-1`) and 8192 MB of schedulable memory.
///
/// Returns the scheduler plus a clone of the executor so tests can inspect
/// what was dispatched.
fn create_test_scheduler() -> (Scheduler, MockExecutor) {
    let executor = MockExecutor::new();
    let handle = executor.clone();

    // Two identical, available GPU slots keyed "GPU-0" and "GPU-1".
    let gpu_slots: HashMap<_, _> = (0..2)
        .map(|index| {
            (
                format!("GPU-{index}"),
                GPUSlot {
                    index,
                    available: true,
                    total_memory_mb: None,
                    reason: None,
                },
            )
        })
        .collect();

    let scheduler = SchedulerBuilder::new()
        .with_executor(Box::new(executor))
        .with_state_path(PathBuf::from("/tmp/test_scheduler.json"))
        .with_total_memory_mb(8192)
        .with_gpu_slots(gpu_slots)
        .build();

    (scheduler, handle)
}
/// Builds a minimal queued job (no GPUs, no memory limit) submitted by
/// `username`.
fn create_test_job(username: &str) -> Job {
    let builder = JobBuilder::new()
        .submitted_by(username.to_string())
        .run_dir("/tmp")
        .command("echo test");
    builder.build()
}
#[test]
fn test_job_submission_and_queuing() {
    let (mut scheduler, _) = create_test_scheduler();

    // Ids and run names must be handed out sequentially, and every freshly
    // submitted job starts out in the queue.
    for (expected_id, user) in [(1, "alice"), (2, "bob")] {
        let (job_id, run_name) = scheduler.submit_job(create_test_job(user));
        assert_eq!(job_id, expected_id);
        assert_eq!(run_name, format!("gflow-job-{expected_id}"));
        assert!(scheduler.job_exists(expected_id));
        assert_eq!(
            scheduler.get_job(expected_id).unwrap().state,
            JobState::Queued
        );
    }

    assert_eq!(scheduler.jobs_len(), 2);
}
#[test]
fn test_job_execution_from_queue() {
    let (mut scheduler, executor) = create_test_scheduler();
    let (job_id, _) = scheduler.submit_job(create_test_job("alice"));

    // A single queued job should be picked up and dispatched successfully.
    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 1);
    assert!(results[0].1.is_ok());
    assert_eq!(scheduler.get_job(job_id).unwrap().state, JobState::Running);

    // The mock executor must have seen exactly that job.
    let executed = executor.get_executions();
    assert_eq!(executed.len(), 1);
    assert_eq!(executed[0].id, job_id);
}
#[test]
fn test_multiple_jobs_execution() {
    let (mut scheduler, executor) = create_test_scheduler();

    // Queue three CPU-only jobs.
    for i in 0..3 {
        scheduler.submit_job(
            JobBuilder::new()
                .submitted_by("alice")
                .run_dir("/tmp")
                .command(format!("echo job{i}"))
                .build(),
        );
    }

    // All three fit at once, so a single pass schedules everything.
    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 3);
    assert_eq!(executor.execution_count(), 3);
    for job_id in 1..=3 {
        assert_eq!(scheduler.get_job(job_id).unwrap().state, JobState::Running);
    }
}
#[test]
// A dependent job must stay queued while its parent runs, and only be
// dispatched on the scheduling pass after the parent finishes.
fn test_dependency_resolution_basic() {
let (mut scheduler, executor) = create_test_scheduler();
// Parent job with no dependencies.
let job_a = create_test_job("alice");
let (job_a_id, _) = scheduler.submit_job(job_a);
// Child job that depends on the parent.
let job_b = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.command("echo job_b")
.depends_on(Some(job_a_id))
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
// First pass: only the parent is runnable.
let results = scheduler.schedule_jobs();
assert_eq!(results.len(), 1);
assert_eq!(results[0].0, job_a_id);
assert_eq!(
scheduler.get_job(job_a_id).unwrap().state,
JobState::Running
);
assert_eq!(scheduler.get_job(job_b_id).unwrap().state, JobState::Queued);
executor.clear();
// Second pass while the parent is still running: nothing is dispatched.
let results = scheduler.schedule_jobs();
assert_eq!(results.len(), 0);
assert_eq!(scheduler.get_job(job_b_id).unwrap().state, JobState::Queued);
// Finish the parent; the child becomes eligible on the next pass.
scheduler.finish_job(job_a_id);
assert_eq!(
scheduler.get_job(job_a_id).unwrap().state,
JobState::Finished
);
let results = scheduler.schedule_jobs();
assert_eq!(results.len(), 1);
assert_eq!(results[0].0, job_b_id);
assert_eq!(
scheduler.get_job(job_b_id).unwrap().state,
JobState::Running
);
}
#[test]
// A three-job chain A <- B <- C is unlocked one link at a time: each job
// becomes runnable only after its direct parent finishes.
fn test_dependency_chain() {
let (mut scheduler, _executor) = create_test_scheduler();
let job_a = create_test_job("alice");
let (job_a_id, _) = scheduler.submit_job(job_a);
// B depends on A.
let job_b = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.depends_on(Some(job_a_id))
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
// C depends on B.
let job_c = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.depends_on(Some(job_b_id))
.build();
let (job_c_id, _) = scheduler.submit_job(job_c);
// Pass 1: only A runs; B and C remain blocked.
scheduler.schedule_jobs();
assert_eq!(
scheduler.get_job(job_a_id).unwrap().state,
JobState::Running
);
assert_eq!(scheduler.get_job(job_b_id).unwrap().state, JobState::Queued);
assert_eq!(scheduler.get_job(job_c_id).unwrap().state, JobState::Queued);
// A finishes -> pass 2 starts B; C is still blocked on B.
scheduler.finish_job(job_a_id);
scheduler.schedule_jobs();
assert_eq!(
scheduler.get_job(job_a_id).unwrap().state,
JobState::Finished
);
assert_eq!(
scheduler.get_job(job_b_id).unwrap().state,
JobState::Running
);
assert_eq!(scheduler.get_job(job_c_id).unwrap().state, JobState::Queued);
// B finishes -> pass 3 finally starts C.
scheduler.finish_job(job_b_id);
scheduler.schedule_jobs();
assert_eq!(
scheduler.get_job(job_b_id).unwrap().state,
JobState::Finished
);
assert_eq!(
scheduler.get_job(job_c_id).unwrap().state,
JobState::Running
);
}
#[test]
fn test_dependency_not_started_if_parent_failed() {
    let (mut scheduler, _executor) = create_test_scheduler();

    let (parent_id, _) = scheduler.submit_job(create_test_job("alice"));
    let child = JobBuilder::new()
        .submitted_by("alice")
        .run_dir("/tmp")
        .depends_on(Some(parent_id))
        .build();
    let (child_id, _) = scheduler.submit_job(child);

    scheduler.schedule_jobs();
    assert_eq!(
        scheduler.get_job(parent_id).unwrap().state,
        JobState::Running
    );

    // Fail the parent: the dependent job must not be picked up afterwards.
    scheduler.fail_job(parent_id);
    assert_eq!(scheduler.get_job(parent_id).unwrap().state, JobState::Failed);

    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 0);
    assert_eq!(scheduler.get_job(child_id).unwrap().state, JobState::Queued);
}
#[test]
fn test_priority_scheduling_high_priority_first() {
    let (mut scheduler, executor) = create_test_scheduler();

    // Both jobs want one GPU each; only their priorities differ.
    let make_job = |priority| {
        JobBuilder::new()
            .submitted_by("alice")
            .run_dir("/tmp")
            .priority(priority)
            .gpus(1)
            .build()
    };
    let (low_id, _) = scheduler.submit_job(make_job(5));
    let (high_id, _) = scheduler.submit_job(make_job(20));

    // Both fit (two GPUs), but the high-priority job must be dispatched first.
    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 2);

    let executions = executor.get_executions();
    assert_eq!(executions[0].id, high_id);
    assert_eq!(executions[0].priority, 20);
    assert_eq!(executions[1].id, low_id);
    assert_eq!(executions[1].priority, 5);
}
#[test]
fn test_priority_with_time_bonus() {
    let (mut scheduler, executor) = create_test_scheduler();

    // Same base priority; only the requested time limit differs.
    let day = Duration::from_secs(3600 * 24);
    let minute = Duration::from_secs(60);

    scheduler.submit_job(
        JobBuilder::new()
            .submitted_by("alice")
            .run_dir("/tmp")
            .priority(10)
            .time_limit(Some(day))
            .build(),
    );
    let (short_id, _) = scheduler.submit_job(
        JobBuilder::new()
            .submitted_by("alice")
            .run_dir("/tmp")
            .priority(10)
            .time_limit(Some(minute))
            .build(),
    );

    scheduler.schedule_jobs();

    // The shorter job is dispatched first despite being submitted second.
    assert_eq!(executor.get_executions()[0].id, short_id);
}
#[test]
fn test_priority_tiebreaker_with_job_id() {
    let (mut scheduler, executor) = create_test_scheduler();

    // Three jobs with identical priority.
    for _ in 0..3 {
        scheduler.submit_job(
            JobBuilder::new()
                .submitted_by("alice")
                .run_dir("/tmp")
                .priority(10)
                .build(),
        );
    }

    scheduler.schedule_jobs();

    // Equal priorities fall back to submission order (ascending job id).
    let ids: Vec<_> = executor.get_executions().iter().map(|j| j.id).collect();
    assert_eq!(ids, vec![1, 2, 3]);
}
#[test]
fn test_gpu_constraints() {
    let (mut scheduler, executor) = create_test_scheduler();

    // Three single-GPU jobs against a two-GPU machine.
    for _ in 0..3 {
        scheduler.submit_job(
            JobBuilder::new()
                .submitted_by("alice")
                .run_dir("/tmp")
                .gpus(1)
                .build(),
        );
    }

    // Only two jobs can start; the third waits for a free GPU.
    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 2);
    assert_eq!(executor.execution_count(), 2);
    assert_eq!(scheduler.get_job(1).unwrap().state, JobState::Running);
    assert_eq!(scheduler.get_job(2).unwrap().state, JobState::Running);
    assert_eq!(scheduler.get_job(3).unwrap().state, JobState::Queued);
    assert_eq!(scheduler.get_job(1).unwrap().gpu_ids, Some(smallvec![0]));
    assert_eq!(scheduler.get_job(2).unwrap().gpu_ids, Some(smallvec![1]));

    // Take both GPUs offline: nothing further can be scheduled.
    for slot in ["GPU-0", "GPU-1"] {
        scheduler.gpu_slots_mut().get_mut(slot).unwrap().available = false;
    }
    executor.clear();
    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 0);
    assert_eq!(scheduler.get_job(3).unwrap().state, JobState::Queued);
}
#[test]
fn test_multi_gpu_job() {
    let (mut scheduler, executor) = create_test_scheduler();

    // One job that needs both GPUs at once.
    let (job_id, _) = scheduler.submit_job(
        JobBuilder::new()
            .submitted_by("alice")
            .run_dir("/tmp")
            .gpus(2)
            .build(),
    );

    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 1);
    assert_eq!(scheduler.get_job(job_id).unwrap().state, JobState::Running);

    // Both GPU indices must be assigned to the single job.
    assert_eq!(
        scheduler.get_job(job_id).unwrap().gpu_ids,
        Some(smallvec![0, 1])
    );
    assert_eq!(executor.execution_count(), 1);
}
#[test]
fn test_insufficient_gpus() {
    let (mut scheduler, executor) = create_test_scheduler();

    // Requesting three GPUs on a two-GPU machine can never be satisfied.
    let (job_id, _) = scheduler.submit_job(
        JobBuilder::new()
            .submitted_by("alice")
            .run_dir("/tmp")
            .gpus(3)
            .build(),
    );

    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 0);
    assert_eq!(scheduler.get_job(job_id).unwrap().state, JobState::Queued);
    assert_eq!(executor.execution_count(), 0);
}
#[test]
fn test_memory_constraints() {
    let (mut scheduler, _executor) = create_test_scheduler();

    // 4096 + 3072 fits the 8192 MB budget; adding 2048 would exceed it.
    let mut ids = Vec::new();
    for mb in [4096, 3072, 2048] {
        let (id, _) = scheduler.submit_job(
            JobBuilder::new()
                .submitted_by("alice")
                .run_dir("/tmp")
                .memory_limit_mb(Some(mb))
                .build(),
        );
        ids.push(id);
    }

    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 2);
    assert_eq!(scheduler.get_job(ids[0]).unwrap().state, JobState::Running);
    assert_eq!(scheduler.get_job(ids[1]).unwrap().state, JobState::Running);
    assert_eq!(scheduler.get_job(ids[2]).unwrap().state, JobState::Queued);

    // Finishing the 4096 MB job frees enough memory for the third job.
    scheduler.refresh_available_memory();
    scheduler.finish_job(ids[0]);
    scheduler.refresh_available_memory();
    let second_pass = scheduler.schedule_jobs();
    assert_eq!(second_pass.len(), 1);
    assert_eq!(scheduler.get_job(ids[2]).unwrap().state, JobState::Running);
}
#[test]
fn test_job_exceeds_total_memory() {
    let (mut scheduler, executor) = create_test_scheduler();

    // 10240 MB requested > 8192 MB total: the job can never start.
    let (job_id, _) = scheduler.submit_job(
        JobBuilder::new()
            .submitted_by("alice")
            .run_dir("/tmp")
            .memory_limit_mb(Some(10240))
            .build(),
    );

    let results = scheduler.schedule_jobs();
    assert_eq!(results.len(), 0);
    assert_eq!(scheduler.get_job(job_id).unwrap().state, JobState::Queued);
    assert_eq!(executor.execution_count(), 0);
}
#[test]
fn test_priority_with_resource_constraints() {
    let (mut scheduler, executor) = create_test_scheduler();

    // Low-priority job wants one GPU; high-priority job wants both.
    scheduler.submit_job(
        JobBuilder::new()
            .submitted_by("alice")
            .run_dir("/tmp")
            .priority(5)
            .gpus(1)
            .build(),
    );
    let (high_id, _) = scheduler.submit_job(
        JobBuilder::new()
            .submitted_by("alice")
            .run_dir("/tmp")
            .priority(20)
            .gpus(2)
            .build(),
    );

    let _results = scheduler.schedule_jobs();

    // The high-priority job is placed first and claims both GPUs.
    let executions = executor.get_executions();
    assert_eq!(executions[0].id, high_id);
    assert_eq!(executions[0].gpu_ids, Some(smallvec![0, 1]));
}
#[test]
fn test_group_concurrency_limit() {
    let (mut scheduler, _executor) = create_test_scheduler();

    // Three jobs in the same group, at most two running concurrently.
    let group_id = Uuid::new_v4();
    for _ in 0..3 {
        scheduler.submit_job(
            JobBuilder::new()
                .submitted_by("alice")
                .run_dir("/tmp")
                .group_id_uuid(Some(group_id))
                .max_concurrent(Some(2))
                .build(),
        );
    }

    scheduler.schedule_jobs();
    assert_eq!(scheduler.get_job(1).unwrap().state, JobState::Running);
    assert_eq!(scheduler.get_job(2).unwrap().state, JobState::Running);
    assert_eq!(scheduler.get_job(3).unwrap().state, JobState::Queued);

    // Finishing one group member frees a concurrency slot for the third.
    scheduler.finish_job(1);
    scheduler.schedule_jobs();
    assert_eq!(scheduler.get_job(3).unwrap().state, JobState::Running);
}
#[test]
// End-to-end cascade scenario: a failed parent auto-cancels its dependents,
// then "redo" resubmissions of the whole chain run to completion with their
// dependencies remapped onto the new job ids.
fn test_cascade_redo_dependency_chain() {
use gflow::core::job::JobStateReason;
let (mut scheduler, _executor) = create_test_scheduler();
// Chain job1 <- job2 <- job3, where job2/job3 auto-cancel when a
// dependency fails.
let job1 = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.command("echo job1")
.build();
let (job1_id, _) = scheduler.submit_job(job1);
let job2 = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.command("echo job2")
.depends_on_ids(vec![job1_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job2_id, _) = scheduler.submit_job(job2);
let job3 = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.command("echo job3")
.depends_on_ids(vec![job2_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job3_id, _) = scheduler.submit_job(job3);
// Start the chain head, then fail it.
scheduler.schedule_jobs();
assert_eq!(scheduler.get_job(job1_id).unwrap().state, JobState::Running);
scheduler.fail_job(job1_id);
assert_eq!(scheduler.get_job(job1_id).unwrap().state, JobState::Failed);
// The cancellation must cascade through the whole chain, and each
// cancelled job must record which dependency failed.
scheduler.auto_cancel_dependent_jobs(job1_id);
assert_eq!(
scheduler.get_job(job2_id).unwrap().state,
JobState::Cancelled
);
assert_eq!(
scheduler.get_job(job2_id).unwrap().reason,
Some(Box::new(JobStateReason::DependencyFailed(job1_id)))
);
assert_eq!(
scheduler.get_job(job3_id).unwrap().state,
JobState::Cancelled
);
assert_eq!(
scheduler.get_job(job3_id).unwrap().reason,
Some(Box::new(JobStateReason::DependencyFailed(job2_id)))
);
// Resubmit the chain as redo jobs (job4/5/6 redo job1/2/3). The new
// jobs depend on the new ids, not the failed/cancelled originals.
let job4 = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.command("echo job1")
.redone_from(Some(job1_id))
.build();
let (job4_id, _) = scheduler.submit_job(job4);
let job5 = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.command("echo job2")
.depends_on_ids(vec![job4_id]) .auto_cancel_on_dependency_failure(true)
.redone_from(Some(job2_id))
.build();
let (job5_id, _) = scheduler.submit_job(job5);
let job6 = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.command("echo job3")
.depends_on_ids(vec![job5_id]) .auto_cancel_on_dependency_failure(true)
.redone_from(Some(job3_id))
.build();
let (job6_id, _) = scheduler.submit_job(job6);
assert_eq!(scheduler.get_job(job4_id).unwrap().state, JobState::Queued);
assert_eq!(scheduler.get_job(job5_id).unwrap().state, JobState::Queued);
assert_eq!(scheduler.get_job(job6_id).unwrap().state, JobState::Queued);
// Run the redo chain link by link, exactly like a fresh dependency chain.
scheduler.schedule_jobs();
assert_eq!(scheduler.get_job(job4_id).unwrap().state, JobState::Running);
assert_eq!(scheduler.get_job(job5_id).unwrap().state, JobState::Queued);
scheduler.finish_job(job4_id);
scheduler.schedule_jobs();
assert_eq!(
scheduler.get_job(job4_id).unwrap().state,
JobState::Finished
);
assert_eq!(scheduler.get_job(job5_id).unwrap().state, JobState::Running);
assert_eq!(scheduler.get_job(job6_id).unwrap().state, JobState::Queued);
scheduler.finish_job(job5_id);
scheduler.schedule_jobs();
assert_eq!(
scheduler.get_job(job5_id).unwrap().state,
JobState::Finished
);
assert_eq!(scheduler.get_job(job6_id).unwrap().state, JobState::Running);
scheduler.finish_job(job6_id);
assert_eq!(
scheduler.get_job(job6_id).unwrap().state,
JobState::Finished
);
// The redo jobs' dependency lists must point at the new chain's ids.
assert_eq!(
scheduler.get_job(job5_id).unwrap().depends_on_ids,
DependencyIds::from_slice(&[job4_id])
);
assert_eq!(
scheduler.get_job(job6_id).unwrap().depends_on_ids,
DependencyIds::from_slice(&[job5_id])
);
}