use super::*;
impl Scheduler {
/// Update the cached state->job_ids index.
///
/// This maintains sorted job IDs per state so API handlers can iterate in stable ID order
/// without scanning all jobs.
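    ///
    /// Illustrative sketch only (the `Scheduler` setup and the job IDs/states below are
    /// hypothetical; not a runnable doc test):
    ///
    /// ```ignore
    /// // Suppose jobs 3 and 7 are currently indexed under JobState::Queued.
    /// scheduler.update_state_jobs_index(3, JobState::Queued, JobState::Running);
    /// // The index now maps Queued -> [7] and Running -> [3]; per-state vectors stay
    /// // sorted by job ID, and vectors that become empty are removed from the map.
    /// ```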
pub(super) fn update_state_jobs_index(
&mut self,
job_id: u32,
old_state: JobState,
new_state: JobState,
) {
if old_state == new_state {
return;
}
if let Some(v) = self.state_jobs_index.get_mut(&old_state) {
if let Ok(pos) = v.binary_search(&job_id) {
v.remove(pos);
if v.is_empty() {
self.state_jobs_index.remove(&old_state);
}
}
}
let entry = self.state_jobs_index.entry(new_state).or_default();
match entry.binary_search(&job_id) {
Ok(_) => {} // already present
Err(pos) => entry.insert(pos, job_id),
}
}
/// Update the cached project->job_ids index.
///
/// This maintains sorted job IDs per project so API handlers can iterate in stable ID order
/// without scanning all jobs. Uses binary search to maintain sort order.
///
/// Note: Projects are immutable after job submission. This method is designed for initial
/// indexing and rebuild operations, not for updating existing job projects.
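    ///
    /// Illustrative sketch only (hypothetical job ID and project name; not a runnable doc test):
    ///
    /// ```ignore
    /// // Index a newly submitted job 12 under project "train-llm".
    /// let project = CompactString::from("train-llm");
    /// scheduler.update_project_jobs_index(12, None, Some(&project));
    /// // project_jobs_index now lists 12 under "train-llm", kept sorted by job ID.
    /// ```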
pub(super) fn update_project_jobs_index(
&mut self,
job_id: u32,
old_project: Option<&CompactString>,
new_project: Option<&CompactString>,
) {
if old_project == new_project {
return;
}
// Remove from old project
if let Some(old_proj) = old_project {
if let Some(v) = self.project_jobs_index.get_mut(old_proj) {
if let Ok(pos) = v.binary_search(&job_id) {
v.remove(pos);
if v.is_empty() {
self.project_jobs_index.remove(old_proj);
}
}
}
}
// Add to new project
if let Some(new_proj) = new_project {
let entry = self.project_jobs_index.entry(new_proj.clone()).or_default();
match entry.binary_search(&job_id) {
Ok(_) => {} // already present
Err(pos) => entry.insert(pos, job_id),
}
}
}
/// Get job IDs by project for fast filtering.
///
/// Returns a sorted list of job IDs (ascending order) for the given project,
/// or None if no jobs exist for that project.
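    ///
    /// Illustrative sketch only (hypothetical project name; not a runnable doc test):
    ///
    /// ```ignore
    /// if let Some(ids) = scheduler.job_ids_by_project("train-llm") {
    ///     // IDs are ascending, so the first entry is the lowest (oldest) job ID.
    ///     println!("lowest job ID in project: {}", ids[0]);
    /// }
    /// ```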
pub fn job_ids_by_project(&self, project: &str) -> Option<&Vec<u32>> {
self.project_jobs_index.get(project)
}
/// Get a JobSpec by ID (job IDs start at 1, so we subtract 1 for the index)
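    ///
    /// Illustrative mapping (hypothetical IDs): `get_job_spec(1)` reads `job_specs[0]`,
    /// while `get_job_spec(0)` always returns `None`.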
#[inline]
pub fn get_job_spec(&self, job_id: u32) -> Option<&JobSpec> {
if job_id == 0 {
return None;
}
self.job_specs.get((job_id - 1) as usize)
}
/// Get a JobRuntime by ID
#[inline]
pub fn get_job_runtime(&self, job_id: u32) -> Option<&JobRuntime> {
if job_id == 0 {
return None;
}
self.job_runtimes.get((job_id - 1) as usize)
}
/// Get a mutable JobRuntime by ID
#[inline]
pub fn get_job_runtime_mut(&mut self, job_id: u32) -> Option<&mut JobRuntime> {
if job_id == 0 {
return None;
}
self.job_runtimes.get_mut((job_id - 1) as usize)
}
/// Get a JobView combining spec and runtime
pub fn get_job_view(&self, job_id: u32) -> Option<JobView> {
let spec = self.get_job_spec(job_id)?;
let runtime = self.get_job_runtime(job_id)?;
Some(JobView::from_refs(spec, runtime))
}
/// Borrow `JobSpec + JobRuntime` for a job without allocating.
pub fn get_job_parts(&self, job_id: u32) -> Option<(&JobSpec, &JobRuntime)> {
let idx = job_id.checked_sub(1)? as usize;
let spec = self.job_specs.get(idx)?;
let rt = self.job_runtimes.get(idx)?;
Some((spec, rt))
}
/// Mutably borrow `JobSpec + JobRuntime` for a job without allocating.
pub fn get_job_parts_mut(&mut self, job_id: u32) -> Option<(&mut JobSpec, &mut JobRuntime)> {
let idx = job_id.checked_sub(1)? as usize;
let spec = self.job_specs.get_mut(idx)?;
let rt = self.job_runtimes.get_mut(idx)?;
Some((spec, rt))
}
    /// Check structural invariants (debug builds only): `job_specs` and `job_runtimes` must
    /// have the same length, and `dependency_runtimes` must be empty or aligned with them.
#[inline]
pub(super) fn check_invariant(&self) {
debug_assert_eq!(
self.job_specs.len(),
self.job_runtimes.len(),
"job_specs and job_runtimes must have same length"
);
debug_assert!(
self.dependency_runtimes.is_empty()
|| self.dependency_runtimes.len() == self.job_runtimes.len(),
"dependency_runtimes must be empty or aligned with job_runtimes"
);
}
/// Total jobs stored in the scheduler.
#[inline]
pub fn jobs_len(&self) -> usize {
self.job_runtimes.len()
}
/// Read-only access to all job specs (cold data).
pub fn job_specs(&self) -> &[JobSpec] {
&self.job_specs
}
/// Read-only access to all job runtimes (hot data).
pub fn job_runtimes(&self) -> &[JobRuntime] {
&self.job_runtimes
}
/// Materialize a legacy `Job` by composing `JobSpec + JobRuntime`.
///
/// This is intentionally **not** the primary storage representation (to keep the hot
/// contiguous working set small). Prefer using `get_job_spec*` / `get_job_runtime*` for
/// internal logic.
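    ///
    /// Illustrative sketch only (hypothetical job ID and handler; not a runnable doc test):
    ///
    /// ```ignore
    /// // Clones both halves into a legacy `Job`, e.g. for an API response.
    /// if let Some(job) = scheduler.get_job(42) {
    ///     respond_with(job); // `respond_with` is a hypothetical handler
    /// }
    /// // For internal logic, prefer the borrow-only accessors:
    /// let (spec, rt) = scheduler.get_job_parts(42).expect("job 42 exists");
    /// ```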
#[inline]
pub fn get_job(&self, job_id: u32) -> Option<Job> {
let spec = self.get_job_spec(job_id)?;
let runtime = self.get_job_runtime(job_id)?;
Some(Job::from_parts(spec.clone(), runtime.clone()))
}
/// Materialize all jobs as legacy `Job` structs (allocates/clones).
pub fn jobs_as_vec(&self) -> Vec<Job> {
self.check_invariant();
self.job_specs
.iter()
.zip(self.job_runtimes.iter())
.map(|(spec, runtime)| Job::from_parts(spec.clone(), runtime.clone()))
.collect()
}
/// Check if a job exists
#[inline]
pub fn job_exists(&self, job_id: u32) -> bool {
job_id != 0 && (job_id as usize) <= self.job_runtimes.len()
}
/// Get available GPU slots respecting restrictions
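    ///
    /// Illustrative sketch only (hypothetical GPU layout; not a runnable doc test):
    ///
    /// ```ignore
    /// // With GPUs 0..=3 free and allowed_gpu_indices = Some(vec![1, 3]),
    /// // only the allowed indices are returned, sorted ascending:
    /// assert_eq!(scheduler.get_available_gpu_slots(), vec![1, 3]);
    /// ```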
pub fn get_available_gpu_slots(&self) -> Vec<u32> {
let mut slots: Vec<u32> = self
.gpu_slots
.values()
.filter(|slot| slot.available)
.map(|slot| slot.index)
.filter(|&index| {
// Apply GPU restriction filter
match &self.allowed_gpu_indices {
None => true, // No restriction, all GPUs allowed
Some(allowed) => allowed.contains(&index),
}
})
.collect();
slots.sort_unstable();
slots
}
/// Get scheduler info (GPU status and restrictions)
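    ///
    /// Illustrative sketch only (not a runnable doc test):
    ///
    /// ```ignore
    /// let info = scheduler.info();
    /// // `info.gpus` is sorted by scheduler-visible index for stable output.
    /// for gpu in &info.gpus {
    ///     println!("GPU {} ({}): available={}", gpu.index, gpu.uuid, gpu.available);
    /// }
    /// ```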
pub fn info(&self) -> SchedulerInfo {
let mut gpus: Vec<GpuInfo> = self
.gpu_slots
.iter()
.map(|(uuid, slot)| GpuInfo {
uuid: uuid.clone(),
index: slot.index,
available: slot.available,
reason: slot.reason.clone(),
})
.collect();
// Sort by index for stable output
gpus.sort_by_key(|g| g.index);
SchedulerInfo {
gpus,
allowed_gpu_indices: self.allowed_gpu_indices.clone(),
gpu_allocation_strategy: self.gpu_allocation_strategy,
}
}
/// Get total number of GPU slots
pub fn gpu_slots_count(&self) -> usize {
self.gpu_slots.len()
}
/// Check whether a GPU slot with the given scheduler-visible index exists.
pub fn has_gpu_index(&self, index: u32) -> bool {
self.gpu_slots.values().any(|slot| slot.index == index)
}
    /// Set GPU restrictions (`None` allows all GPUs)
pub fn set_allowed_gpu_indices(&mut self, indices: Option<Vec<u32>>) {
self.allowed_gpu_indices = indices;
}
/// Get GPU restrictions
pub fn allowed_gpu_indices(&self) -> Option<&Vec<u32>> {
self.allowed_gpu_indices.as_ref()
}
/// Set GPU allocation strategy.
pub fn set_gpu_allocation_strategy(&mut self, strategy: GpuAllocationStrategy) {
self.gpu_allocation_strategy = strategy;
}
/// Get current GPU allocation strategy.
pub fn gpu_allocation_strategy(&self) -> GpuAllocationStrategy {
self.gpu_allocation_strategy
}
}