use crate::core::executor::Executor;
use crate::core::gpu::{GPUSlot, GpuUuid};
use crate::core::gpu_allocation::GpuAllocationStrategy;
use crate::core::info::{GpuInfo, SchedulerInfo};
use crate::core::job::{
DependencyMode, GpuIds, GpuSharingMode, Job, JobRuntime, JobSpec, JobState, JobStateReason,
JobView,
};
use crate::core::reservation::{GpuReservation, ReservationStatus};
use compact_str::{format_compact, CompactString};
use serde::{Deserialize, Deserializer, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::time::Duration;
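// The implementation is split across focused submodules kept under the
// scheduler/ directory via explicit #[path] attributes.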
#[path = "scheduler/access.rs"]
mod access;
#[path = "scheduler/builder.rs"]
mod builder;
#[path = "scheduler/persistence.rs"]
mod persistence;
#[path = "scheduler/reservations.rs"]
mod reservations;
#[path = "scheduler/scheduling.rs"]
mod scheduling;
#[path = "scheduler/transitions.rs"]
mod transitions;
pub use builder::SchedulerBuilder;
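/// Central scheduler state: persisted job specs, runtimes, and reservations,
/// plus runtime-only GPU slots, memory accounting, and derived lookup indexes.
///
/// Only `Serialize` is derived; deserialization is hand-written (see the
/// `persistence` submodule) so the legacy on-disk layouts exercised by the
/// tests below can still be loaded and migrated.
///
/// A minimal usage sketch following the API the tests below exercise
/// (`ignore`d because `executor` and `job` stand in for real values):
///
/// ```ignore
/// let mut scheduler = SchedulerBuilder::new()
///     .with_executor(executor)
///     .with_state_path(PathBuf::from("/tmp/state.json"))
///     .with_total_memory_mb(16 * 1024)
///     .build();
/// let (job_id, run_name) = scheduler.submit_job(job);
/// let started = scheduler.prepare_jobs_for_execution();
/// ```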
#[derive(Serialize)]
#[serde(default)]
pub struct Scheduler {
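/// Schema version of the persisted state, used by the migration code.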
#[serde(default)]
pub version: u32,
#[serde(default)]
pub(crate) job_specs: Vec<JobSpec>,
#[serde(default)]
pub(crate) job_runtimes: Vec<JobRuntime>,
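/// Pluggable executor used to launch jobs; injected at runtime, never persisted.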
#[serde(skip)]
pub(crate) executor: Option<Box<dyn Executor>>,
#[serde(skip)]
pub(crate) gpu_slots: HashMap<GpuUuid, GPUSlot>,
#[serde(skip)]
pub(crate) total_memory_mb: u64,
#[serde(skip)]
pub(crate) available_memory_mb: u64,
pub(crate) state_path: PathBuf,
pub(crate) next_job_id: u32,
pub(crate) allowed_gpu_indices: Option<Vec<u32>>,
#[serde(skip)]
pub(crate) gpu_allocation_strategy: GpuAllocationStrategy,
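// Derived lookup indexes; rebuilt after loading state, never serialized.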
#[serde(skip)]
pub(crate) user_jobs_index: HashMap<CompactString, Vec<u32>>,
#[serde(skip)]
pub(crate) state_jobs_index: HashMap<JobState, Vec<u32>>,
#[serde(skip)]
pub(crate) project_jobs_index: HashMap<CompactString, Vec<u32>>,
#[serde(skip)]
pub(crate) dependency_graph: HashMap<u32, Vec<u32>>,
#[serde(skip)]
pub(crate) group_running_count: HashMap<uuid::Uuid, usize>,
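/// GPU reservations, persisted alongside job state.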
pub reservations: Vec<GpuReservation>,
pub next_reservation_id: u32,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::job::JobBuilder;
use serde::Serialize;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use uuid::Uuid;
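/// Test executor that records every executed job and can be told to fail.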
struct MockExecutor {
executions: Arc<Mutex<Vec<Job>>>,
should_fail: bool,
}
impl Executor for MockExecutor {
fn execute(&self, job: &Job) -> anyhow::Result<()> {
if self.should_fail {
anyhow::bail!("Mock execution failed")
} else {
self.executions.lock().unwrap().push(job.clone());
Ok(())
}
}
}
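/// Builds a scheduler backed by a non-failing MockExecutor and 16 GiB of memory.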
fn create_test_scheduler() -> Scheduler {
let executor = Box::new(MockExecutor {
executions: Arc::new(Mutex::new(Vec::new())),
should_fail: false,
});
SchedulerBuilder::new()
.with_executor(executor)
.with_state_path(PathBuf::from("/tmp/test.json"))
.with_total_memory_mb(16 * 1024)
.build()
}
#[test]
fn test_deserialize_legacy_jobs_map_msgpack_int_keys() {
#[derive(Serialize)]
struct LegacySchedulerState {
version: u32,
jobs: HashMap<u32, Job>,
state_path: PathBuf,
next_job_id: u32,
allowed_gpu_indices: Option<Vec<u32>>,
reservations: Vec<GpuReservation>,
next_reservation_id: u32,
}
let mut jobs = HashMap::new();
let mut job = JobBuilder::new().command("echo hi").gpus(1).build();
job.id = 1;
jobs.insert(1, job);
let legacy = LegacySchedulerState {
version: crate::core::migrations::CURRENT_VERSION,
jobs,
state_path: PathBuf::from("state.json"),
next_job_id: 2,
allowed_gpu_indices: None,
reservations: Vec::new(),
next_reservation_id: 1,
};
let bytes = rmp_serde::to_vec_named(&legacy).unwrap();
let scheduler: Scheduler = rmp_serde::from_slice(&bytes).unwrap();
assert_eq!(scheduler.job_specs.len(), 1);
assert_eq!(scheduler.job_runtimes.len(), 1);
let cmd = scheduler
.get_job_spec(1)
.unwrap()
.command
.as_ref()
.map(|s| s.as_str());
assert_eq!(cmd, Some("echo hi"));
}
#[test]
fn test_deserialize_legacy_scheduler_seq_msgpack_v2() {
let mut job = JobBuilder::new().command("echo hi").gpus(1).build();
job.id = 1;
let jobs = vec![job];
let legacy = (
2u32,
jobs,
PathBuf::from("/home/happy/.local/share/gflow/state.json"),
2u32,
Option::<Vec<u32>>::None,
);
let bytes = rmp_serde::to_vec(&legacy).unwrap();
let scheduler: Scheduler = rmp_serde::from_slice(&bytes).unwrap();
assert_eq!(scheduler.job_specs.len(), 1);
assert_eq!(scheduler.job_runtimes.len(), 1);
let cmd = scheduler
.get_job_spec(1)
.unwrap()
.command
.as_ref()
.map(|s| s.as_str());
assert_eq!(cmd, Some("echo hi"));
}
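/// Builds a minimal job submitted by `username`, with /tmp as its run dir.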
fn create_test_job(username: &str) -> Job {
JobBuilder::new()
.submitted_by(username.to_string())
.run_dir("/tmp")
.build()
}
#[test]
fn test_submit_job() {
let mut scheduler = create_test_scheduler();
let job = create_test_job("test");
let (job_id, run_name) = scheduler.submit_job(job);
assert_eq!(job_id, 1);
assert_eq!(run_name, "gjob-1");
assert!(scheduler.job_exists(1));
assert_eq!(scheduler.get_job(1).unwrap().state, JobState::Queued);
}
#[test]
fn test_gpu_allocation_strategy_sequential_uses_lowest_indices_first() {
let mut scheduler = create_test_scheduler();
scheduler.set_gpu_allocation_strategy(GpuAllocationStrategy::Sequential);
for i in 0..4 {
scheduler.gpu_slots.insert(
format!("GPU-{}", i),
GPUSlot {
index: i,
available: true,
total_memory_mb: None,
reason: None,
},
);
}
let job = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(2)
.build();
let (job_id, _) = scheduler.submit_job(job);
let prepared = scheduler.prepare_jobs_for_execution();
assert_eq!(prepared.len(), 1);
assert_eq!(prepared[0].id, job_id);
assert_eq!(
scheduler.get_job(job_id).and_then(|j| j.gpu_ids),
Some(GpuIds::from_iter([0, 1]))
);
}
#[test]
fn test_shared_jobs_can_share_same_gpu() {
let mut scheduler = create_test_scheduler();
scheduler.gpu_slots.insert(
"GPU-0".to_string(),
GPUSlot {
index: 0,
available: true,
total_memory_mb: None,
reason: None,
},
);
let job_a = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(1)
.shared(true)
.build();
let (job_a_id, _) = scheduler.submit_job(job_a);
let prepared_a = scheduler.prepare_jobs_for_execution();
assert_eq!(prepared_a.len(), 1);
assert_eq!(prepared_a[0].id, job_a_id);
assert_eq!(
scheduler.get_job(job_a_id).and_then(|j| j.gpu_ids),
Some(GpuIds::from_iter([0]))
);
let job_b = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(1)
.shared(true)
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
let prepared_b = scheduler.prepare_jobs_for_execution();
assert_eq!(prepared_b.len(), 1);
assert_eq!(prepared_b[0].id, job_b_id);
assert_eq!(
scheduler.get_job(job_b_id).and_then(|j| j.gpu_ids),
Some(GpuIds::from_iter([0]))
);
}
#[test]
fn test_exclusive_job_waits_when_shared_job_is_running() {
let mut scheduler = create_test_scheduler();
scheduler.gpu_slots.insert(
"GPU-0".to_string(),
GPUSlot {
index: 0,
available: true,
total_memory_mb: None,
reason: None,
},
);
let shared_job = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(1)
.shared(true)
.build();
let (shared_job_id, _) = scheduler.submit_job(shared_job);
scheduler.prepare_jobs_for_execution();
let exclusive_job = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(1)
.build();
let (exclusive_job_id, _) = scheduler.submit_job(exclusive_job);
let prepared = scheduler.prepare_jobs_for_execution();
assert!(prepared.is_empty());
assert_eq!(
scheduler.get_job(exclusive_job_id).map(|j| j.state),
Some(JobState::Queued)
);
scheduler.finish_job(shared_job_id).unwrap();
let prepared_after_finish = scheduler.prepare_jobs_for_execution();
assert_eq!(prepared_after_finish.len(), 1);
assert_eq!(prepared_after_finish[0].id, exclusive_job_id);
}
#[test]
fn test_shared_job_can_still_schedule_after_one_shared_job_finishes() {
let mut scheduler = create_test_scheduler();
scheduler.gpu_slots.insert(
"GPU-0".to_string(),
GPUSlot {
index: 0,
available: true,
total_memory_mb: None,
reason: None,
},
);
let job_a = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(1)
.shared(true)
.build();
let (job_a_id, _) = scheduler.submit_job(job_a);
let job_b = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(1)
.shared(true)
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
let prepared = scheduler.prepare_jobs_for_execution();
assert_eq!(prepared.len(), 2);
assert_eq!(
scheduler.get_job(job_b_id).map(|j| j.state),
Some(JobState::Running)
);
scheduler.finish_job(job_a_id).unwrap();
let job_c = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(1)
.shared(true)
.build();
let (job_c_id, _) = scheduler.submit_job(job_c);
let prepared_c = scheduler.prepare_jobs_for_execution();
assert_eq!(prepared_c.len(), 1);
assert_eq!(prepared_c[0].id, job_c_id);
assert_eq!(
scheduler.get_job(job_c_id).and_then(|j| j.gpu_ids),
Some(GpuIds::from_iter([0]))
);
}
#[test]
fn test_shared_jobs_respect_per_gpu_memory_limits() {
let mut scheduler = create_test_scheduler();
scheduler.gpu_slots.insert(
"GPU-0".to_string(),
GPUSlot {
index: 0,
available: true,
total_memory_mb: Some(10_000),
reason: None,
},
);
let job_a = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(1)
.shared(true)
.gpu_memory_limit_mb(Some(8_000))
.build();
scheduler.submit_job(job_a);
let first = scheduler.prepare_jobs_for_execution();
assert_eq!(first.len(), 1);
let job_b = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.gpus(1)
.shared(true)
.gpu_memory_limit_mb(Some(3_000))
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
let second = scheduler.prepare_jobs_for_execution();
assert!(second.is_empty());
assert_eq!(
scheduler.get_job(job_b_id).map(|j| j.state),
Some(JobState::Queued)
);
}
#[test]
fn test_scheduler_info_includes_gpu_allocation_strategy() {
let mut scheduler = create_test_scheduler();
scheduler.set_gpu_allocation_strategy(GpuAllocationStrategy::Random);
let info = scheduler.info();
assert_eq!(info.gpu_allocation_strategy, GpuAllocationStrategy::Random);
}
#[test]
fn test_group_running_count_updates_on_run_and_finish() {
let mut scheduler = create_test_scheduler();
let group_id = Uuid::new_v4();
let job = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.group_id_uuid(Some(group_id))
.max_concurrent(Some(10))
.build();
let (job_id, _) = scheduler.submit_job(job);
let jobs = scheduler.prepare_jobs_for_execution();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].id, job_id);
assert_eq!(scheduler.group_running_count.get(&group_id), Some(&1));
assert!(scheduler
.state_jobs_index
.get(&JobState::Running)
.is_some_and(|v| v.contains(&job_id)));
scheduler.finish_job(job_id).unwrap();
assert!(!scheduler.group_running_count.contains_key(&group_id));
assert!(scheduler
.state_jobs_index
.get(&JobState::Finished)
.is_some_and(|v| v.contains(&job_id)));
}
#[test]
fn test_resolve_dependency_most_recent() {
let mut scheduler = create_test_scheduler();
for _ in 0..3 {
let job = JobBuilder::new()
.submitted_by("alice")
.run_dir("/tmp")
.build();
scheduler.submit_job(job);
}
assert_eq!(scheduler.resolve_dependency("alice", "@"), Some(3));
}
#[test]
fn test_resolve_dependency_offset() {
let mut scheduler = create_test_scheduler();
for _ in 0..5 {
let job = JobBuilder::new()
.submitted_by("bob")
.run_dir("/tmp")
.build();
scheduler.submit_job(job);
}
assert_eq!(scheduler.resolve_dependency("bob", "@~1"), Some(5));
assert_eq!(scheduler.resolve_dependency("bob", "@~2"), Some(4));
assert_eq!(scheduler.resolve_dependency("bob", "@~5"), Some(1));
assert_eq!(scheduler.resolve_dependency("bob", "@~6"), None); }
#[test]
fn test_calculate_time_bonus() {
assert_eq!(Scheduler::calculate_time_bonus(&None), 100);
assert_eq!(
Scheduler::calculate_time_bonus(&Some(Duration::from_secs(60))),
299
);
assert_eq!(
Scheduler::calculate_time_bonus(&Some(Duration::from_secs(24 * 3600))),
200
);
}
#[test]
fn test_refresh_available_memory() {
let mut scheduler = create_test_scheduler();
let total = scheduler.total_memory_mb;
let job = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.memory_limit_mb(Some(1024))
.build();
let (job_id, _) = scheduler.submit_job(job);
assert!(scheduler
.transition_job_state(job_id, JobState::Running, None)
.unwrap());
scheduler.refresh_available_memory();
assert_eq!(scheduler.available_memory_mb, total - 1024);
}
#[test]
fn test_state_jobs_index_updates_on_transitions() {
let mut scheduler = create_test_scheduler();
let job = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.build();
let (job_id, _) = scheduler.submit_job(job);
assert_eq!(
scheduler.state_jobs_index.get(&JobState::Queued).unwrap(),
&vec![job_id]
);
assert!(scheduler
.transition_job_state(job_id, JobState::Running, None)
.unwrap());
assert!(scheduler
.state_jobs_index
.get(&JobState::Queued)
.is_none_or(|v| !v.contains(&job_id)));
assert!(scheduler
.state_jobs_index
.get(&JobState::Running)
.is_some_and(|v| v.contains(&job_id)));
scheduler.finish_job(job_id).unwrap();
assert!(scheduler
.state_jobs_index
.get(&JobState::Running)
.is_none_or(|v| !v.contains(&job_id)));
assert!(scheduler
.state_jobs_index
.get(&JobState::Finished)
.is_some_and(|v| v.contains(&job_id)));
scheduler.rebuild_user_jobs_index();
assert!(scheduler
.state_jobs_index
.get(&JobState::Finished)
.is_some_and(|v| v.contains(&job_id)));
}
#[test]
#[allow(deprecated)]
fn test_schedule_jobs_without_executor_does_not_mutate_state() {
let mut scheduler = SchedulerBuilder::new()
.with_state_path(PathBuf::from("/tmp/test.json"))
.with_total_memory_mb(16 * 1024)
.build();
let job = create_test_job("test");
let (job_id, _) = scheduler.submit_job(job);
assert_eq!(scheduler.get_job(job_id).unwrap().state, JobState::Queued);
let initial_available_memory = scheduler.available_memory_mb;
let results = scheduler.schedule_jobs();
assert_eq!(results.len(), 0);
assert_eq!(
scheduler.get_job(job_id).unwrap().state,
JobState::Queued,
"Job should remain Queued when no executor is present"
);
assert_eq!(
scheduler.available_memory_mb, initial_available_memory,
"Memory should not be allocated when no executor is present"
);
assert_eq!(
scheduler.get_job(job_id).unwrap().gpu_ids,
None,
"GPU IDs should not be assigned when no executor is present"
);
assert_eq!(
scheduler.get_job(job_id).unwrap().started_at,
None,
"started_at should not be set when no executor is present"
);
}
#[test]
fn test_auto_cancel_direct_dependent() {
let mut scheduler = create_test_scheduler();
let job_a = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.build();
let (job_a_id, _) = scheduler.submit_job(job_a);
let job_b = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_a_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
scheduler.fail_job(job_a_id);
let cancelled = scheduler.auto_cancel_dependent_jobs(job_a_id);
assert_eq!(cancelled.len(), 1);
assert!(cancelled.contains(&job_b_id));
assert_eq!(
scheduler.get_job(job_b_id).unwrap().state,
JobState::Cancelled
);
assert_eq!(
scheduler.get_job(job_b_id).unwrap().reason,
Some(Box::new(JobStateReason::DependencyFailed(job_a_id)))
);
}
#[test]
fn test_auto_cancel_transitive_dependencies() {
let mut scheduler = create_test_scheduler();
let job_a = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.build();
let (job_a_id, _) = scheduler.submit_job(job_a);
let job_b = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_a_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
let job_c = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_b_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job_c_id, _) = scheduler.submit_job(job_c);
scheduler.fail_job(job_a_id);
let cancelled = scheduler.auto_cancel_dependent_jobs(job_a_id);
assert_eq!(cancelled.len(), 2);
assert!(cancelled.contains(&job_b_id));
assert!(cancelled.contains(&job_c_id));
assert_eq!(
scheduler.get_job(job_b_id).unwrap().state,
JobState::Cancelled
);
assert_eq!(
scheduler.get_job(job_c_id).unwrap().state,
JobState::Cancelled
);
assert_eq!(
scheduler.get_job(job_b_id).unwrap().reason,
Some(Box::new(JobStateReason::DependencyFailed(job_a_id)))
);
assert_eq!(
scheduler.get_job(job_c_id).unwrap().reason,
Some(Box::new(JobStateReason::DependencyFailed(job_b_id)))
);
}
#[test]
fn test_auto_cancel_deep_chain() {
let mut scheduler = create_test_scheduler();
let job_a = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.build();
let (job_a_id, _) = scheduler.submit_job(job_a);
let job_b = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_a_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
let job_c = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_b_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job_c_id, _) = scheduler.submit_job(job_c);
let job_d = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_c_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job_d_id, _) = scheduler.submit_job(job_d);
let job_e = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_d_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job_e_id, _) = scheduler.submit_job(job_e);
scheduler.fail_job(job_a_id);
let cancelled = scheduler.auto_cancel_dependent_jobs(job_a_id);
assert_eq!(cancelled.len(), 4);
assert!(cancelled.contains(&job_b_id));
assert!(cancelled.contains(&job_c_id));
assert!(cancelled.contains(&job_d_id));
assert!(cancelled.contains(&job_e_id));
assert_eq!(
scheduler.get_job(job_b_id).unwrap().state,
JobState::Cancelled
);
assert_eq!(
scheduler.get_job(job_c_id).unwrap().state,
JobState::Cancelled
);
assert_eq!(
scheduler.get_job(job_d_id).unwrap().state,
JobState::Cancelled
);
assert_eq!(
scheduler.get_job(job_e_id).unwrap().state,
JobState::Cancelled
);
}
#[test]
fn test_auto_cancel_respects_flag() {
let mut scheduler = create_test_scheduler();
let job_a = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.build();
let (job_a_id, _) = scheduler.submit_job(job_a);
let job_b = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_a_id])
.auto_cancel_on_dependency_failure(false)
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
scheduler.fail_job(job_a_id);
let cancelled = scheduler.auto_cancel_dependent_jobs(job_a_id);
assert_eq!(cancelled.len(), 0);
assert_eq!(scheduler.get_job(job_b_id).unwrap().state, JobState::Queued);
}
#[test]
fn test_auto_cancel_mixed_flags() {
let mut scheduler = create_test_scheduler();
let job_a = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.build();
let (job_a_id, _) = scheduler.submit_job(job_a);
let job_b = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_a_id])
.auto_cancel_on_dependency_failure(true)
.build();
let (job_b_id, _) = scheduler.submit_job(job_b);
let job_c = JobBuilder::new()
.submitted_by("test")
.run_dir("/tmp")
.depends_on_ids(vec![job_b_id])
.auto_cancel_on_dependency_failure(false)
.build();
let (job_c_id, _) = scheduler.submit_job(job_c);
scheduler.fail_job(job_a_id);
let cancelled = scheduler.auto_cancel_dependent_jobs(job_a_id);
assert_eq!(cancelled.len(), 1);
assert!(cancelled.contains(&job_b_id));
assert_eq!(
scheduler.get_job(job_b_id).unwrap().state,
JobState::Cancelled
);
assert_eq!(scheduler.get_job(job_c_id).unwrap().state, JobState::Queued);
}
#[test]
fn test_create_reservation_with_indices() {
use crate::core::reservation::GpuSpec;
let mut scheduler = create_test_scheduler();
for i in 0..4 {
scheduler.gpu_slots.insert(
format!("GPU-{}", i),
GPUSlot {
index: i,
available: true,
total_memory_mb: None,
reason: None,
},
);
}
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
let result = scheduler.create_reservation(
"alice".into(),
GpuSpec::Indices(vec![0, 2]),
start_time,
duration,
);
assert!(result.is_ok());
let reservation_id = result.unwrap();
assert_eq!(reservation_id, 1);
let reservation = scheduler.get_reservation(reservation_id).unwrap();
assert_eq!(reservation.user, "alice");
assert_eq!(reservation.gpu_spec, GpuSpec::Indices(vec![0, 2]));
assert_eq!(reservation.gpu_spec.count(), 2);
}
#[test]
fn test_reservation_conflict_indices_vs_indices() {
use crate::core::reservation::GpuSpec;
let mut scheduler = create_test_scheduler();
for i in 0..4 {
scheduler.gpu_slots.insert(
format!("GPU-{}", i),
GPUSlot {
index: i,
available: true,
total_memory_mb: None,
reason: None,
},
);
}
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
scheduler
.create_reservation(
"alice".into(),
GpuSpec::Indices(vec![0, 1]),
start_time,
duration,
)
.unwrap();
let result = scheduler.create_reservation(
"bob".into(),
GpuSpec::Indices(vec![1, 2]),
start_time,
duration,
);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("GPU index 1 is already reserved"));
let result = scheduler.create_reservation(
"bob".into(),
GpuSpec::Indices(vec![2, 3]),
start_time,
duration,
);
assert!(result.is_ok());
}
#[test]
fn test_reservation_conflict_count_vs_indices() {
use crate::core::reservation::GpuSpec;
let mut scheduler = create_test_scheduler();
for i in 0..4 {
scheduler.gpu_slots.insert(
format!("GPU-{}", i),
GPUSlot {
index: i,
available: true,
total_memory_mb: None,
reason: None,
},
);
}
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
scheduler
.create_reservation(
"alice".into(),
GpuSpec::Indices(vec![0, 1]),
start_time,
duration,
)
.unwrap();
let result =
scheduler.create_reservation("bob".into(), GpuSpec::Count(3), start_time, duration);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("conflicts"));
let result =
scheduler.create_reservation("bob".into(), GpuSpec::Count(2), start_time, duration);
assert!(result.is_ok());
}
#[test]
fn test_reservation_out_of_range_index() {
use crate::core::reservation::GpuSpec;
let mut scheduler = create_test_scheduler();
for i in 0..2 {
scheduler.gpu_slots.insert(
format!("GPU-{}", i),
GPUSlot {
index: i,
available: true,
total_memory_mb: None,
reason: None,
},
);
}
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
let result = scheduler.create_reservation(
"alice".into(),
GpuSpec::Indices(vec![0, 3]),
start_time,
duration,
);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("GPU index 3 is out of range"));
}
mod proptests {
use super::*;
use crate::core::reservation::GpuSpec;
use proptest::prelude::*;
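/// Builds a test scheduler populated with `n` available GPU slots.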
fn scheduler_with_gpus(n: u32) -> Scheduler {
let mut scheduler = create_test_scheduler();
for i in 0..n {
scheduler.gpu_slots.insert(
format!("GPU-{}", i),
GPUSlot {
index: i,
available: true,
total_memory_mb: None,
reason: None,
},
);
}
scheduler
}
proptest! {
#[test]
fn prop_no_gpu_overallocation(
total_gpus in 2u32..8,
reservation_count in 1usize..5,
) {
let mut scheduler = scheduler_with_gpus(total_gpus);
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
let mut successful_reservations = Vec::new();
for i in 0..reservation_count {
let gpu_count = (i as u32 % total_gpus) + 1;
let result = scheduler.create_reservation(
format!("user{}", i).into(),
GpuSpec::Count(gpu_count),
start_time,
duration,
);
if result.is_ok() {
successful_reservations.push(gpu_count);
}
}
let total_allocated: u32 = successful_reservations.iter().sum();
prop_assert!(total_allocated <= total_gpus);
}
#[test]
fn prop_no_index_overlap(
total_gpus in 4u32..8,
) {
let mut scheduler = scheduler_with_gpus(total_gpus);
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
let res1 = scheduler.create_reservation(
"alice".into(),
GpuSpec::Indices(vec![0, 1]),
start_time,
duration,
);
prop_assert!(res1.is_ok());
let res2 = scheduler.create_reservation(
"bob".into(),
GpuSpec::Indices(vec![1, 2]),
start_time,
duration,
);
prop_assert!(res2.is_err());
let res3 = scheduler.create_reservation(
"charlie".into(),
GpuSpec::Indices(vec![2, 3]),
start_time,
duration,
);
prop_assert!(res3.is_ok());
}
#[test]
fn prop_count_respects_indices(
total_gpus in 4u32..8,
reserved_indices_count in 1u32..3,
) {
let mut scheduler = scheduler_with_gpus(total_gpus);
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
let indices: Vec<u32> = (0..reserved_indices_count).collect();
scheduler.create_reservation(
"alice".into(),
GpuSpec::Indices(indices),
start_time,
duration,
).unwrap();
let available_for_count = total_gpus - reserved_indices_count;
let res1 = scheduler.create_reservation(
"bob".into(),
GpuSpec::Count(available_for_count),
start_time,
duration,
);
prop_assert!(res1.is_ok());
let res2 = scheduler.create_reservation(
"charlie".into(),
GpuSpec::Count(1),
start_time,
duration,
);
prop_assert!(res2.is_err());
}
#[test]
fn prop_no_conflict_different_times(
total_gpus in 2u32..8,
gpu_count in 1u32..4,
time_gap in 1u64..1000,
) {
let mut scheduler = scheduler_with_gpus(total_gpus);
let gpu_count = std::cmp::min(gpu_count, total_gpus);
let start1 = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration1 = std::time::Duration::from_secs(7200);
let res1 = scheduler.create_reservation(
"alice".into(),
GpuSpec::Count(gpu_count),
start1,
duration1,
);
prop_assert!(res1.is_ok());
let start2 = start1 + duration1 + std::time::Duration::from_secs(time_gap);
let duration2 = std::time::Duration::from_secs(3600);
let res2 = scheduler.create_reservation(
"bob".into(),
GpuSpec::Count(gpu_count),
start2,
duration2,
);
prop_assert!(res2.is_ok());
}
#[test]
fn prop_cancel_frees_resources(
total_gpus in 2u32..8,
) {
let mut scheduler = scheduler_with_gpus(total_gpus);
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
let res1_id = scheduler.create_reservation(
"alice".into(),
GpuSpec::Count(total_gpus),
start_time,
duration,
).unwrap();
let res2 = scheduler.create_reservation(
"bob".into(),
GpuSpec::Count(1),
start_time,
duration,
);
prop_assert!(res2.is_err());
scheduler.cancel_reservation(res1_id).unwrap();
let res3 = scheduler.create_reservation(
"charlie".into(),
GpuSpec::Count(total_gpus),
start_time,
duration,
);
prop_assert!(res3.is_ok());
}
#[test]
fn prop_reject_invalid_indices(
total_gpus in 2u32..8,
invalid_index in 8u32..100,
) {
let mut scheduler = scheduler_with_gpus(total_gpus);
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
let result = scheduler.create_reservation(
"alice".into(),
GpuSpec::Indices(vec![0, invalid_index]),
start_time,
duration,
);
prop_assert!(result.is_err());
prop_assert!(result.unwrap_err().to_string().contains("out of range"));
}
#[test]
fn prop_reject_zero_gpus(
total_gpus in 2u32..8,
) {
let mut scheduler = scheduler_with_gpus(total_gpus);
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
let result = scheduler.create_reservation(
"alice".into(),
GpuSpec::Count(0),
start_time,
duration,
);
prop_assert!(result.is_err());
prop_assert!(result.unwrap_err().to_string().contains("must be greater than 0"));
}
#[test]
fn prop_reject_excessive_gpus(
total_gpus in 2u32..8,
extra in 1u32..10,
) {
let mut scheduler = scheduler_with_gpus(total_gpus);
let start_time = std::time::SystemTime::now() + std::time::Duration::from_secs(3600);
let duration = std::time::Duration::from_secs(7200);
let excessive_count = total_gpus + extra;
let result = scheduler.create_reservation(
"alice".into(),
GpuSpec::Count(excessive_count),
start_time,
duration,
);
prop_assert!(result.is_err());
}
}
}
}