use super::*;
impl Scheduler {
/// Creates a new GPU reservation for `user` and returns its id.
///
/// The reservation is stored with [`ReservationStatus::Pending`] and the
/// reservation list is re-sorted by start time afterwards.
///
/// # Errors
///
/// Returns an error when:
/// - the spec requests zero GPUs or more GPUs than there are GPU slots,
/// - an explicitly requested GPU index is outside the slot range,
/// - `start_time` lies before the current system time,
/// - `start_time + duration` cannot be represented as a `SystemTime`,
/// - `conflict::check_reservation_conflict` rejects the request.
pub fn create_reservation(
    &mut self,
    user: CompactString,
    gpu_spec: crate::core::reservation::GpuSpec,
    start_time: std::time::SystemTime,
    duration: std::time::Duration,
) -> anyhow::Result<u32> {
    use crate::core::conflict;
    use crate::core::reservation::{GpuReservation, ReservationStatus};

    let total_gpus = self.gpu_slots_count() as u32;
    let gpu_count = gpu_spec.count();
    if gpu_count == 0 {
        anyhow::bail!("GPU count must be greater than 0");
    }
    if gpu_count > total_gpus {
        anyhow::bail!(
            "Requested {} GPUs but only {} GPUs available",
            gpu_count,
            total_gpus
        );
    }

    // `total_gpus` is at least 1 here (0 < gpu_count <= total_gpus), so the
    // `total_gpus - 1` in the message below cannot underflow.
    if let Some(indices) = gpu_spec.indices() {
        for &idx in indices {
            if idx >= total_gpus {
                anyhow::bail!(
                    "GPU index {} is out of range (available: 0-{})",
                    idx,
                    total_gpus - 1
                );
            }
        }
    }

    let now = std::time::SystemTime::now();
    if start_time < now {
        anyhow::bail!("Start time cannot be in the past");
    }

    // `SystemTime + Duration` panics when the result is not representable;
    // the duration is caller-supplied, so report an error instead.
    let end_time = start_time.checked_add(duration).ok_or_else(|| {
        anyhow::anyhow!("Reservation end time is out of range (start time + duration overflows)")
    })?;

    let state = conflict::collect_reservation_state(&self.reservations, start_time, end_time);
    conflict::check_reservation_conflict(&gpu_spec, &state, total_gpus)?;

    // Only consume an id once every validation step has passed.
    let id = self.next_reservation_id;
    self.next_reservation_id += 1;

    let reservation = GpuReservation {
        id,
        user,
        gpu_spec,
        start_time,
        duration,
        status: ReservationStatus::Pending,
        created_at: now,
        cancelled_at: None,
    };
    self.reservations.push(reservation);
    // Stable sort: the new entry lands after existing ones with the same start time.
    self.reservations.sort_by_key(|r| r.start_time);
    Ok(id)
}
/// Looks up a reservation by id.
///
/// Returns the first stored reservation whose `id` equals `id`, or `None`
/// when there is no such reservation.
pub fn get_reservation(&self, id: u32) -> Option<&GpuReservation> {
    for candidate in &self.reservations {
        if candidate.id == id {
            return Some(candidate);
        }
    }
    None
}
/// Looks up a reservation by id and hands out a mutable reference to it.
///
/// Returns the first stored reservation whose `id` equals `id`, or `None`
/// when there is no such reservation.
pub fn get_reservation_mut(&mut self, id: u32) -> Option<&mut GpuReservation> {
    for candidate in &mut self.reservations {
        if candidate.id == id {
            return Some(candidate);
        }
    }
    None
}
/// Marks the reservation with the given id as cancelled.
///
/// On success the status becomes `Cancelled` and `cancelled_at` is set to
/// the current system time.
///
/// # Errors
///
/// Returns an error when no reservation has this id, or when the
/// reservation is already `Completed` or `Cancelled`.
pub fn cancel_reservation(&mut self, id: u32) -> anyhow::Result<()> {
    use crate::core::reservation::ReservationStatus;

    let entry = match self.get_reservation_mut(id) {
        Some(entry) => entry,
        None => anyhow::bail!("Reservation {} not found", id),
    };

    // Reject terminal states first; only pending/active reservations fall through.
    match entry.status {
        ReservationStatus::Pending | ReservationStatus::Active => {}
        ReservationStatus::Cancelled => {
            anyhow::bail!("Reservation already cancelled");
        }
        ReservationStatus::Completed => {
            anyhow::bail!("Cannot cancel completed reservation");
        }
    }

    entry.cancelled_at = Some(std::time::SystemTime::now());
    entry.status = ReservationStatus::Cancelled;
    Ok(())
}
/// Returns the stored reservations that pass every supplied filter, in
/// storage order.
///
/// - `user_filter`: when set, keep only reservations whose `user` equals it.
/// - `status_filter`: when set, keep only reservations with that status.
/// - `active_only`: when `true`, keep only reservations for which
///   `is_active` returns `true` at the current system time.
pub fn list_reservations(
    &self,
    user_filter: Option<&str>,
    status_filter: Option<ReservationStatus>,
    active_only: bool,
) -> Vec<&GpuReservation> {
    let now = std::time::SystemTime::now();
    let mut selected = Vec::new();

    for reservation in &self.reservations {
        let user_ok = match user_filter {
            Some(wanted) => reservation.user == wanted,
            None => true,
        };
        let status_ok = match &status_filter {
            Some(wanted) => reservation.status == *wanted,
            None => true,
        };
        // `is_active` is only consulted when requested and the other filters passed.
        if user_ok && status_ok && (!active_only || reservation.is_active(now)) {
            selected.push(reservation);
        }
    }

    selected
}
/// Refreshes every reservation's status against the current system time
/// and then discards the ones that are no longer pending or active.
pub fn update_reservation_statuses(&mut self) {
    use crate::core::reservation::ReservationStatus;

    let now = std::time::SystemTime::now();

    // First pass: let each reservation recompute its own status.
    self.reservations.iter_mut().for_each(|reservation| {
        reservation.update_status(now);
    });

    // Second pass: completed and cancelled reservations are dropped from the list.
    self.reservations
        .retain(|reservation| match reservation.status {
            ReservationStatus::Pending | ReservationStatus::Active => true,
            ReservationStatus::Completed | ReservationStatus::Cancelled => false,
        });
}
/// Returns the reservations whose status is `Active` and for which
/// `is_active` also returns `true` at the current system time, in storage
/// order.
pub fn get_active_reservations(&self) -> Vec<&GpuReservation> {
    use crate::core::reservation::ReservationStatus;

    let now = std::time::SystemTime::now();
    let mut active = Vec::new();

    for reservation in &self.reservations {
        if reservation.status != ReservationStatus::Active {
            continue;
        }
        if reservation.is_active(now) {
            active.push(reservation);
        }
    }

    active
}
/// Decides whether a job asking for `job_gpu_count` GPUs may run without
/// violating the currently active reservations.
///
/// Rules, in order:
/// 1. With no active reservations the job is always allowed.
/// 2. If `job_user` holds active index-based reservations, the job fits
///    when it needs no more GPUs than the number of reserved indices.
///    Count-based reservations of the same user are not consulted in this
///    case.
/// 3. Otherwise, if `job_user` holds active count-based reservations, the
///    job fits when it needs no more GPUs than the reserved total.
/// 4. Otherwise the job must fit both into the GPUs left after subtracting
///    other users' reservations from the slot count, and into the entries
///    of `available_gpus` that are not index-reserved by another user.
pub(super) fn check_job_respects_reservations(
    &self,
    job_user: &str,
    job_gpu_count: u32,
    available_gpus: &[u32],
) -> bool {
    use crate::core::reservation::GpuSpec;
    use std::collections::HashSet;

    let active = self.get_active_reservations();
    if active.is_empty() {
        return true;
    }
    let total_gpus = self.gpu_slots_count() as u32;

    // Tallies for the job's own user ...
    let mut own_indices = Vec::new();
    let mut own_count = 0u32;
    // ... and for everybody else.
    let mut foreign_indices = HashSet::new();
    let mut foreign_count = 0u32;

    for reservation in &active {
        let owned_by_job_user = reservation.user == job_user;
        match (&reservation.gpu_spec, owned_by_job_user) {
            (GpuSpec::Indices(indices), true) => own_indices.extend(indices.iter().copied()),
            (GpuSpec::Count(count), true) => own_count += count,
            (GpuSpec::Indices(indices), false) => {
                foreign_indices.extend(indices.iter().copied())
            }
            (GpuSpec::Count(count), false) => foreign_count += count,
        }
    }

    if !own_indices.is_empty() {
        return job_gpu_count <= own_indices.len() as u32;
    }
    if own_count > 0 {
        return job_gpu_count <= own_count;
    }

    // No reservation of its own: the job competes for what others left over.
    let unreserved_capacity = total_gpus
        .saturating_sub(foreign_indices.len() as u32)
        .saturating_sub(foreign_count);
    let usable_now = available_gpus
        .iter()
        .filter(|gpu| !foreign_indices.contains(*gpu))
        .count();

    job_gpu_count <= unreserved_capacity && job_gpu_count <= usable_now as u32
}
/// Narrows `available_gpus` down to the GPUs `job_user` may use given the
/// active index-based reservations.
///
/// - With no active reservations, `available_gpus` is returned unchanged.
/// - If `job_user` holds active index-based reservations, the result is
///   those reserved indices that also appear in `available_gpus`, in
///   reservation order.
/// - Otherwise the result is `available_gpus` minus every index reserved by
///   another user.
///
/// Count-based reservations are not taken into account here.
pub(super) fn filter_usable_gpus(&self, job_user: &str, available_gpus: &[u32]) -> Vec<u32> {
    use crate::core::reservation::GpuSpec;
    use std::collections::HashSet;

    let active = self.get_active_reservations();
    if active.is_empty() {
        return available_gpus.to_vec();
    }

    let mut own_indices = Vec::new();
    let mut foreign_indices = HashSet::new();
    for reservation in &active {
        if let GpuSpec::Indices(indices) = &reservation.gpu_spec {
            if reservation.user == job_user {
                own_indices.extend(indices.iter().copied());
            } else {
                foreign_indices.extend(indices.iter().copied());
            }
        }
    }

    if own_indices.is_empty() {
        available_gpus
            .iter()
            .copied()
            .filter(|gpu| !foreign_indices.contains(gpu))
            .collect()
    } else {
        own_indices
            .into_iter()
            .filter(|gpu| available_gpus.contains(gpu))
            .collect()
    }
}
/// Reorders `usable_gpus` in place according to the scheduler's
/// `gpu_allocation_strategy`.
///
/// - `Sequential`: ascending numeric order.
/// - `Random`: ordered by a per-call hash of each GPU index. The hash seed
///   mixes the current time since the Unix epoch (0 if the clock is before
///   the epoch), `job_id`, and `self.next_job_id`.
pub(super) fn reorder_usable_gpus(&self, job_id: u32, usable_gpus: &mut [u32]) {
    match self.gpu_allocation_strategy {
        GpuAllocationStrategy::Random => {
            let elapsed = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
            let clock_bits = match elapsed {
                Ok(since_epoch) => {
                    since_epoch.as_secs() ^ (u64::from(since_epoch.subsec_nanos()) << 32)
                }
                Err(_) => 0,
            };
            let job_bits = u64::from(job_id) << 32;
            let counter_bits = self.next_job_id as u64;
            let seed = clock_bits ^ job_bits ^ counter_bits;

            // Sorting by a seeded hash of the index yields a pseudo-random order.
            usable_gpus.sort_unstable_by_key(|&gpu| {
                let spread = u64::from(gpu).wrapping_mul(0x9E37_79B9_7F4A_7C15);
                splitmix64(seed ^ spread)
            });
        }
        GpuAllocationStrategy::Sequential => usable_gpus.sort_unstable(),
    }
}
}
/// Mixes `x` into a well-scrambled 64-bit value using the SplitMix64 step:
/// add the golden-ratio increment, then apply two xor-shift/multiply rounds
/// and a final xor-shift. All arithmetic wraps on overflow.
fn splitmix64(x: u64) -> u64 {
    const INCREMENT: u64 = 0x9E37_79B9_7F4A_7C15;
    const FIRST_MULTIPLIER: u64 = 0xBF58_476D_1CE4_E5B9;
    const SECOND_MULTIPLIER: u64 = 0x94D0_49BB_1331_11EB;

    let z = x.wrapping_add(INCREMENT);
    let z = (z ^ (z >> 30)).wrapping_mul(FIRST_MULTIPLIER);
    let z = (z ^ (z >> 27)).wrapping_mul(SECOND_MULTIPLIER);
    z ^ (z >> 31)
}