forge_orchestration/scheduler/algorithms.rs

//! Node-scoring algorithms for workload placement: bin-packing, spreading,
//! GPU locality, and a lightweight online-learned scorer.

use super::{NodeResources, Workload};

/// A pluggable node-scoring strategy used by the scheduler.
pub trait SchedulingAlgorithm: Send + Sync {
    /// Scores how desirable it is to place `workload` on `node`; higher is better.
    fn score(&self, workload: &Workload, node: &NodeResources) -> f64;

    /// Short, human-readable name of the algorithm.
    fn name(&self) -> &str;
}

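// How a caller might use an algorithm (illustrative sketch only; the actual
// placement loop is assumed to live elsewhere in the scheduler): score every
// candidate node for the workload and keep the highest-scoring one.
//
//     fn pick_node<'a>(
//         alg: &dyn SchedulingAlgorithm,
//         workload: &Workload,
//         nodes: &'a [NodeResources],
//     ) -> Option<&'a NodeResources> {
//         nodes.iter().max_by(|a, b| {
//             alg.score(workload, a)
//                 .partial_cmp(&alg.score(workload, b))
//                 .unwrap_or(std::cmp::Ordering::Equal)
//         })
//     }
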
/// Packs workloads onto the most-utilized nodes first, weighting CPU, memory,
/// and GPU utilization separately (GPU utilization counts double by default).
#[derive(Debug, Clone)]
pub struct BinPackScheduler {
    cpu_weight: f64,
    memory_weight: f64,
    gpu_weight: f64,
}

impl BinPackScheduler {
    pub fn new() -> Self {
        Self {
            cpu_weight: 1.0,
            memory_weight: 1.0,
            gpu_weight: 2.0,
        }
    }

    /// Overrides the per-resource weights used in the utilization score.
    pub fn with_weights(mut self, cpu: f64, memory: f64, gpu: f64) -> Self {
        self.cpu_weight = cpu;
        self.memory_weight = memory;
        self.gpu_weight = gpu;
        self
    }
}

impl Default for BinPackScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl SchedulingAlgorithm for BinPackScheduler {
    fn score(&self, workload: &Workload, node: &NodeResources) -> f64 {
        let cpu_util = node.cpu_allocated as f64 / node.cpu_capacity as f64;
        let mem_util = node.memory_allocated as f64 / node.memory_capacity as f64;

        let gpu_util = if !node.gpus.is_empty() {
            node.gpus_allocated.len() as f64 / node.gpus.len() as f64
        } else {
            0.0
        };

        // Weighted-average utilization: already-busy nodes score higher, so
        // emptier nodes stay free to absorb large workloads later.
        let base_score = (cpu_util * self.cpu_weight
            + mem_util * self.memory_weight
            + gpu_util * self.gpu_weight)
            / (self.cpu_weight + self.memory_weight + self.gpu_weight);

        // Headroom remaining after this workload, as a fraction of capacity.
        let cpu_headroom = (node.cpu_available() as f64 - workload.resources.cpu_millis as f64)
            / node.cpu_capacity as f64;
        let mem_headroom = (node.memory_available() as f64 - workload.resources.memory_mb as f64)
            / node.memory_capacity as f64;

        // Penalize placements that would leave the node nearly full (< 5% headroom).
        let headroom_penalty = if cpu_headroom < 0.05 || mem_headroom < 0.05 {
            0.1
        } else {
            0.0
        };

        (base_score - headroom_penalty).max(0.0)
    }

    fn name(&self) -> &str {
        "bin-pack"
    }
}

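// Illustrative construction (these weight values are examples, not the
// defaults above): a larger `gpu_weight` makes GPU utilization dominate the
// packing score.
//
//     let bin_pack = BinPackScheduler::new().with_weights(1.0, 1.0, 4.0);
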
/// Spreads workloads by preferring the least-utilized node, optionally
/// boosting nodes that carry a given topology label.
#[derive(Debug, Clone)]
pub struct SpreadScheduler {
    topology_key: Option<String>,
}

impl SpreadScheduler {
    pub fn new() -> Self {
        Self { topology_key: None }
    }

    /// Sets the node label key whose presence earns a small scoring bonus.
    pub fn with_topology(mut self, key: impl Into<String>) -> Self {
        self.topology_key = Some(key.into());
        self
    }
}

impl Default for SpreadScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl SchedulingAlgorithm for SpreadScheduler {
    fn score(&self, _workload: &Workload, node: &NodeResources) -> f64 {
        let cpu_util = node.cpu_allocated as f64 / node.cpu_capacity as f64;
        let mem_util = node.memory_allocated as f64 / node.memory_capacity as f64;

        // Invert mean utilization so emptier nodes score higher.
        let spread_score = 1.0 - (cpu_util + mem_util) / 2.0;

        // Nodes labelled with the configured topology key get a small bonus.
        let topology_bonus = if let Some(key) = &self.topology_key {
            if node.labels.contains_key(key) { 0.1 } else { 0.0 }
        } else {
            0.0
        };

        spread_score + topology_bonus
    }

    fn name(&self) -> &str {
        "spread"
    }
}

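// Illustrative construction (the label key is hypothetical): nodes whose
// labels contain the key receive the +0.1 topology bonus on top of the
// inverse-utilization score.
//
//     let spread = SpreadScheduler::new().with_topology("zone");
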
/// Scores nodes by how well their free GPUs match a GPU workload: aggregate
/// free GPU memory, tensor cores, compute capability, and device locality.
#[derive(Debug, Clone)]
pub struct GpuLocalityScheduler {
    prefer_tensor_cores: bool,
    min_compute_capability: Option<f32>,
    prefer_nvlink: bool,
}

impl GpuLocalityScheduler {
    pub fn new() -> Self {
        Self {
            prefer_tensor_cores: true,
            min_compute_capability: None,
            prefer_nvlink: true,
        }
    }

    /// Requires every free GPU on a node to report at least this compute
    /// capability before the node earns the capability bonus.
    pub fn min_compute_capability(mut self, cc: f32) -> Self {
        self.min_compute_capability = Some(cc);
        self
    }

    pub fn prefer_tensor_cores(mut self, prefer: bool) -> Self {
        self.prefer_tensor_cores = prefer;
        self
    }
}

impl Default for GpuLocalityScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl SchedulingAlgorithm for GpuLocalityScheduler {
    fn score(&self, workload: &Workload, node: &NodeResources) -> f64 {
        // Non-GPU workloads: fall back to simple CPU spreading.
        if workload.resources.gpu_count == 0 {
            let cpu_util = node.cpu_allocated as f64 / node.cpu_capacity as f64;
            return 1.0 - cpu_util;
        }

        let available_gpus: Vec<_> = node.gpus.iter()
            .filter(|g| !node.gpus_allocated.contains(&g.device_id))
            .collect();

        if available_gpus.is_empty() {
            return 0.0;
        }

        // Start from a neutral base and add bonuses for each locality signal.
        let mut score = 0.5;

        // Enough free GPU memory across the node to hold the workload.
        let total_available_mem: u64 = available_gpus.iter()
            .map(|g| g.available_memory_mb())
            .sum();
        let required_mem = workload.resources.gpu_memory_mb * workload.resources.gpu_count as u64;
        if total_available_mem >= required_mem {
            score += 0.2;
        }

        // Reward nodes whose free GPUs have tensor cores.
        if self.prefer_tensor_cores {
            let tensor_core_count = available_gpus.iter()
                .filter(|g| g.tensor_cores)
                .count();
            score += 0.1 * (tensor_core_count as f64 / available_gpus.len() as f64);
        }

        // Compute-capability gate: bonus if every free GPU meets the minimum,
        // penalty otherwise (unknown capability counts as not meeting it).
        if let Some(min_cc) = self.min_compute_capability {
            let meets_cc = available_gpus.iter()
                .all(|g| g.compute_capability.map(|cc| cc >= min_cc).unwrap_or(false));
            if meets_cc {
                score += 0.1;
            } else {
                score -= 0.2;
            }
        }

        // Contiguous device IDs serve as a rough proxy for NVLink/PCIe
        // adjacency: bonus when the free GPUs form one contiguous block.
        if self.prefer_nvlink {
            let gpu_ids: Vec<_> = available_gpus.iter().map(|g| g.device_id).collect();
            if gpu_ids.len() >= workload.resources.gpu_count as usize {
                let contiguous = gpu_ids.windows(2)
                    .all(|w| w[1] == w[0] + 1);
                if contiguous {
                    score += 0.1;
                }
            }
        }

        score.min(1.0)
    }

    fn name(&self) -> &str {
        "gpu-locality"
    }
}

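// Illustrative construction (the capability threshold is an example value,
// not a default of this module): nodes whose free GPUs all report at least
// the given compute capability earn the +0.1 bonus; any shortfall costs 0.2.
//
//     let gpu_sched = GpuLocalityScheduler::new()
//         .min_compute_capability(8.0)
//         .prefer_tensor_cores(true);
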
/// A scorer that learns a linear model over node/workload features online,
/// updating its weights from observed placement performance.
#[derive(Debug)]
pub struct LearnedScheduler {
    weights: parking_lot::RwLock<Vec<f64>>,
    learning_rate: f64,
    num_features: usize,
    history: parking_lot::RwLock<Vec<SchedulingFeedback>>,
}

/// Outcome of a past placement: the feature vector that was scored and the
/// observed performance of the workload on that node.
#[derive(Debug, Clone)]
pub struct SchedulingFeedback {
    pub features: Vec<f64>,
    pub performance: f64,
}

impl LearnedScheduler {
    pub fn new() -> Self {
        let num_features = 8;
        Self {
            weights: parking_lot::RwLock::new(vec![0.5; num_features]),
            learning_rate: 0.01,
            num_features,
            history: parking_lot::RwLock::new(Vec::new()),
        }
    }

    /// Builds the 8-element feature vector for a (workload, node) pair.
    fn extract_features(&self, workload: &Workload, node: &NodeResources) -> Vec<f64> {
        vec![
            // Current utilization
            node.cpu_allocated as f64 / node.cpu_capacity as f64,
            node.memory_allocated as f64 / node.memory_capacity as f64,
            if node.gpus.is_empty() { 0.0 } else { node.gpus_allocated.len() as f64 / node.gpus.len() as f64 },
            // Headroom left after this workload
            (node.cpu_available() as f64 - workload.resources.cpu_millis as f64) / node.cpu_capacity as f64,
            (node.memory_available() as f64 - workload.resources.memory_mb as f64) / node.memory_capacity as f64,
            // Workload characteristics
            workload.priority as f64 / 100.0,
            if workload.resources.gpu_count > 0 { 1.0 } else { 0.0 },
            // Node state
            if node.schedulable { 1.0 } else { 0.0 },
        ]
    }

    /// Applies one gradient step toward the observed performance, clamping
    /// each weight to [-2.0, 2.0].
    pub fn record_feedback(&self, feedback: SchedulingFeedback) {
        let mut weights = self.weights.write();

        let prediction: f64 = feedback.features.iter()
            .zip(weights.iter())
            .map(|(f, w)| f * w)
            .sum();

        let error = feedback.performance - prediction;

        for (i, feature) in feedback.features.iter().enumerate() {
            weights[i] += self.learning_rate * error * feature;
            weights[i] = weights[i].clamp(-2.0, 2.0);
        }

        self.history.write().push(feedback);
    }

    /// Returns a snapshot of the current model weights.
    pub fn weights(&self) -> Vec<f64> {
        self.weights.read().clone()
    }
}

impl Default for LearnedScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl SchedulingAlgorithm for LearnedScheduler {
    fn score(&self, workload: &Workload, node: &NodeResources) -> f64 {
        let features = self.extract_features(workload, node);
        let weights = self.weights.read();

        let score: f64 = features.iter()
            .zip(weights.iter())
            .map(|(f, w)| f * w)
            .sum();

        // Squash the linear score through a sigmoid so the result lies in (0, 1).
        1.0 / (1.0 + (-score).exp())
    }

    fn name(&self) -> &str {
        "learned"
    }
}

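// Sketch of the intended feedback loop (caller-side wiring is assumed, not
// defined in this module): after a placement runs, report the observed
// performance together with the feature vector that was scored, so the
// weights adapt online.
//
//     let learned = LearnedScheduler::new();
//     // `features` must use the same 8-element layout produced by
//     // extract_features(); since that method is private, the caller is
//     // assumed to retain the vector from placement time.
//     learned.record_feedback(SchedulingFeedback {
//         features: vec![0.5, 0.5, 0.0, 0.25, 0.25, 0.2, 0.0, 1.0],
//         performance: 0.8, // observed performance, roughly normalized to [0, 1]
//     });
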
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::NodeId;

    fn test_node(cpu_alloc: u64, mem_alloc: u64) -> NodeResources {
        let mut node = NodeResources::new(NodeId::new(), 4000, 8192);
        node.cpu_allocated = cpu_alloc;
        node.memory_allocated = mem_alloc;
        node
    }

    #[test]
    fn test_bin_pack_prefers_utilized() {
        let scheduler = BinPackScheduler::new();
        let workload = Workload::new("w1", "test");

        let low_util = test_node(1000, 2048);
        let high_util = test_node(3000, 6144);

        let low_score = scheduler.score(&workload, &low_util);
        let high_score = scheduler.score(&workload, &high_util);

        assert!(high_score > low_score, "Bin-pack should prefer higher utilization");
    }

    #[test]
    fn test_spread_prefers_empty() {
        let scheduler = SpreadScheduler::new();
        let workload = Workload::new("w1", "test");

        let low_util = test_node(1000, 2048);
        let high_util = test_node(3000, 6144);

        let low_score = scheduler.score(&workload, &low_util);
        let high_score = scheduler.score(&workload, &high_util);

        assert!(low_score > high_score, "Spread should prefer lower utilization");
    }

    #[test]
    fn test_learned_scheduler() {
        let scheduler = LearnedScheduler::new();
        let workload = Workload::new("w1", "test");
        let node = test_node(2000, 4096);

        let score = scheduler.score(&workload, &node);
        assert!(score >= 0.0 && score <= 1.0, "Score should be bounded");

        let features = vec![0.5, 0.5, 0.0, 0.25, 0.25, 0.0, 0.0, 1.0];
        scheduler.record_feedback(SchedulingFeedback {
            features,
            performance: 0.9,
        });

        let weights = scheduler.weights();
        assert!(weights.iter().any(|w| *w != 0.5));
    }
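
    #[test]
    fn test_learned_weights_stay_clamped() {
        // Property check: record_feedback clamps each weight to [-2.0, 2.0],
        // so even repeated extreme feedback must not push any weight outside
        // that range.
        let scheduler = LearnedScheduler::new();
        for _ in 0..1_000 {
            scheduler.record_feedback(SchedulingFeedback {
                features: vec![1.0; 8],
                performance: 1.0,
            });
        }
        assert!(scheduler.weights().iter().all(|w| (-2.0..=2.0).contains(w)));
    }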
}