// forge_orchestration/scheduler/optimized.rs

//! Optimized workload scheduling: per-node score caching, single and batch
//! placement, and a first-fit-decreasing (FFD) bin packer.

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

use parking_lot::RwLock;
use rayon::prelude::*;

use super::{NodeResources, Workload, ResourceRequirements};
use crate::types::NodeId;

/// Cached per-node availability snapshot with scores precomputed on a 0-1000
/// scale, so the scheduling hot path never touches the full `NodeResources`.
#[derive(Debug)]
struct NodeScoreCache {
    node_id: NodeId,
    cpu_score: u32,
    memory_score: u32,
    gpu_score: u32,
    combined_score: u32,
    cpu_available: u64,
    memory_available: u64,
    gpu_available: u32,
    schedulable: bool,
}

impl NodeScoreCache {
    fn from_node(node: &NodeResources) -> Self {
        let cpu_available = node.cpu_available();
        let memory_available = node.memory_available();
        let gpu_available = node.gpus_available() as u32;

        // Each score is the free fraction of capacity, scaled to 0-1000.
        let cpu_score = ((cpu_available as f64 / node.cpu_capacity.max(1) as f64) * 1000.0) as u32;
        let memory_score = ((memory_available as f64 / node.memory_capacity.max(1) as f64) * 1000.0) as u32;
        let gpu_score = if node.gpus.is_empty() {
            // Nodes without GPUs get a neutral score so they are neither favored nor penalized.
            500
        } else {
            ((gpu_available as f64 / node.gpus.len() as f64) * 1000.0) as u32
        };

        let combined_score = (cpu_score + memory_score + gpu_score) / 3;

        Self {
            node_id: node.node_id,
            cpu_score,
            memory_score,
            gpu_score,
            combined_score,
            cpu_available,
            memory_available,
            gpu_available,
            schedulable: node.schedulable,
        }
    }

    #[inline(always)]
    fn can_fit(&self, req: &ResourceRequirements) -> bool {
        self.schedulable
            && self.cpu_available >= req.cpu_millis
            && self.memory_available >= req.memory_mb
            && self.gpu_available >= req.gpu_count
    }

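    // Illustrative arithmetic (example values, not from the original source): for a
    // node with cpu_available = 8000 millis and memory_available = 32768 MB, a
    // request of 2000 millis / 8192 MB leaves 6000 / 24576 free, giving
    // cpu_fit = 1000 - 750 = 250 and mem_fit = 1000 - 750 = 250. A request that
    // fills the node more tightly scores closer to 1000, so `score_for_workload`
    // acts as a best-fit heuristic.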
    #[inline(always)]
    fn score_for_workload(&self, req: &ResourceRequirements) -> u32 {
        if !self.can_fit(req) {
            return 0;
        }

        let cpu_fit = 1000 - ((self.cpu_available - req.cpu_millis) * 1000 / self.cpu_available.max(1)) as u32;
        let mem_fit = 1000 - ((self.memory_available - req.memory_mb) * 1000 / self.memory_available.max(1)) as u32;

        // Weighted blend: CPU and memory fit dominate; GPU availability acts as a tiebreaker.
        (cpu_fit * 4 + mem_fit * 4 + self.gpu_score * 2) / 10
    }
}

/// A set of workloads scheduled together, with placement results aligned by index.
pub struct WorkloadBatch {
    workloads: Vec<Workload>,
    results: Vec<Option<NodeId>>,
}

impl WorkloadBatch {
    pub fn new(workloads: Vec<Workload>) -> Self {
        let len = workloads.len();
        Self {
            workloads,
            results: vec![None; len],
        }
    }

    /// Placement results, index-aligned with `workloads()`; `None` means unplaced.
    pub fn results(&self) -> &[Option<NodeId>] {
        &self.results
    }

    pub fn workloads(&self) -> &[Workload] {
        &self.workloads
    }
}

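// Minimal usage sketch (mirrors the tests at the bottom of this file; the
// 8000-milli CPU / 32768 MB node shape comes from `create_nodes`):
//
//     let scheduler = OptimizedScheduler::new();
//     scheduler.register_node(NodeResources::new(NodeId::new(), 8000, 32768));
//     let placement: Option<NodeId> = scheduler.schedule_fast(&workload);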
/// Scheduler built around a precomputed node-score cache, with atomic counters tracking throughput.
pub struct OptimizedScheduler {
    /// Precomputed per-node scores; rebuilt by `refresh_cache`.
    node_cache: RwLock<Vec<NodeScoreCache>>,
    /// Authoritative node state backing the cache.
    nodes: RwLock<Vec<NodeResources>>,
    /// Total number of scheduling decisions made.
    scheduled_count: AtomicU64,
    /// Cumulative time spent making decisions, in nanoseconds.
    total_time_ns: AtomicU64,
    /// Bumped whenever a node is registered or the cache is rebuilt.
    cache_generation: AtomicUsize,
}

impl OptimizedScheduler {
    pub fn new() -> Self {
        Self {
            node_cache: RwLock::new(Vec::new()),
            nodes: RwLock::new(Vec::new()),
            scheduled_count: AtomicU64::new(0),
            total_time_ns: AtomicU64::new(0),
            cache_generation: AtomicUsize::new(0),
        }
    }

    pub fn register_node(&self, node: NodeResources) {
        let cache = NodeScoreCache::from_node(&node);
        self.nodes.write().push(node);
        self.node_cache.write().push(cache);
        self.cache_generation.fetch_add(1, Ordering::Relaxed);
    }

    /// Rebuild the score cache from current node state.
    pub fn refresh_cache(&self) {
        let nodes = self.nodes.read();
        let mut cache = self.node_cache.write();
        cache.clear();
        cache.extend(nodes.iter().map(NodeScoreCache::from_node));
        self.cache_generation.fetch_add(1, Ordering::Relaxed);
    }

    /// Place a single workload against the cached node scores. Small clusters are
    /// scanned sequentially; larger ones are scored in parallel with rayon.
    #[inline]
    pub fn schedule_fast(&self, workload: &Workload) -> Option<NodeId> {
        let start = std::time::Instant::now();
        let cache = self.node_cache.read();

        if cache.is_empty() {
            return None;
        }

        let req = &workload.resources;

        // Parallel scoring only pays off once the node list is large enough.
        let best = if cache.len() > 16 {
            cache.par_iter()
                .filter(|n| n.can_fit(req))
                .max_by_key(|n| n.score_for_workload(req))
                .map(|n| n.node_id)
        } else {
            cache.iter()
                .filter(|n| n.can_fit(req))
                .max_by_key(|n| n.score_for_workload(req))
                .map(|n| n.node_id)
        };

        self.scheduled_count.fetch_add(1, Ordering::Relaxed);
        self.total_time_ns.fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);

        best
    }

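    // Batch placement is a greedy pass: workloads are visited in descending
    // priority order and fitted against a running per-node capacity snapshot, so
    // higher-priority workloads claim capacity first. For example (hypothetical
    // priorities), a batch with priorities [1, 5, 3] is visited as indices
    // [1, 2, 0].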
    pub fn schedule_batch(&self, batch: &mut WorkloadBatch) {
        let start = std::time::Instant::now();
        let cache = self.node_cache.read();

        if cache.is_empty() {
            return;
        }

        // Visit workloads in descending priority order.
        let mut indices: Vec<usize> = (0..batch.workloads.len()).collect();
        indices.sort_by(|&a, &b| {
            batch.workloads[b].priority.cmp(&batch.workloads[a].priority)
        });

        // Running snapshot of (cpu, memory, gpu) still available on each node.
        let mut node_allocated: Vec<(u64, u64, u32)> = cache.iter()
            .map(|n| (n.cpu_available, n.memory_available, n.gpu_available))
            .collect();

        for idx in indices {
            let workload = &batch.workloads[idx];
            let req = &workload.resources;

            let mut best_node: Option<usize> = None;
            let mut best_score: u32 = 0;

            for (i, (n, alloc)) in cache.iter().zip(node_allocated.iter()).enumerate() {
                if !n.schedulable {
                    continue;
                }

                if alloc.0 < req.cpu_millis || alloc.1 < req.memory_mb || alloc.2 < req.gpu_count {
                    continue;
                }

                let remaining_cpu = alloc.0 - req.cpu_millis;
                let remaining_mem = alloc.1 - req.memory_mb;

                // Best-fit against the remaining capacity, scaled 0-2000.
                let score = 2000 - (remaining_cpu * 1000 / n.cpu_available.max(1)) as u32
                    - (remaining_mem * 1000 / n.memory_available.max(1)) as u32;

                // `best_node.is_none()` keeps a fitting node even when its score is 0
                // (e.g. a workload requesting no CPU or memory).
                if best_node.is_none() || score > best_score {
                    best_score = score;
                    best_node = Some(i);
                }
            }

            if let Some(node_idx) = best_node {
                batch.results[idx] = Some(cache[node_idx].node_id);

                node_allocated[node_idx].0 -= req.cpu_millis;
                node_allocated[node_idx].1 -= req.memory_mb;
                node_allocated[node_idx].2 -= req.gpu_count;
            }
        }

        let count = batch.workloads.len() as u64;
        self.scheduled_count.fetch_add(count, Ordering::Relaxed);
        self.total_time_ns.fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
    }

    pub fn stats(&self) -> SchedulerStats {
        let count = self.scheduled_count.load(Ordering::Relaxed);
        let time_ns = self.total_time_ns.load(Ordering::Relaxed);

        SchedulerStats {
            total_scheduled: count,
            total_time_ns: time_ns,
            avg_time_ns: if count > 0 { time_ns / count } else { 0 },
            decisions_per_sec: if time_ns > 0 {
                (count as f64 * 1_000_000_000.0 / time_ns as f64) as u64
            } else {
                0
            },
            node_count: self.node_cache.read().len(),
        }
    }

    pub fn reset_stats(&self) {
        self.scheduled_count.store(0, Ordering::Relaxed);
        self.total_time_ns.store(0, Ordering::Relaxed);
    }

    pub fn node_count(&self) -> usize {
        self.node_cache.read().len()
    }

    /// Aggregate CPU, memory, and GPU utilization across all registered nodes.
    pub fn utilization(&self) -> ClusterUtilization {
        let nodes = self.nodes.read();

        let mut total_cpu: u64 = 0;
        let mut used_cpu: u64 = 0;
        let mut total_mem: u64 = 0;
        let mut used_mem: u64 = 0;
        let mut total_gpu: u32 = 0;
        let mut used_gpu: u32 = 0;

        for node in nodes.iter() {
            total_cpu += node.cpu_capacity;
            used_cpu += node.cpu_allocated;
            total_mem += node.memory_capacity;
            used_mem += node.memory_allocated;
            total_gpu += node.gpus.len() as u32;
            used_gpu += node.gpus_allocated.len() as u32;
        }

        ClusterUtilization {
            cpu_percent: if total_cpu > 0 { (used_cpu as f64 / total_cpu as f64) * 100.0 } else { 0.0 },
            memory_percent: if total_mem > 0 { (used_mem as f64 / total_mem as f64) * 100.0 } else { 0.0 },
            gpu_percent: if total_gpu > 0 { (used_gpu as f64 / total_gpu as f64) * 100.0 } else { 0.0 },
            total_cpu,
            used_cpu,
            total_memory: total_mem,
            used_memory: used_mem,
            total_gpus: total_gpu,
            used_gpus: used_gpu,
        }
    }
}

impl Default for OptimizedScheduler {
    fn default() -> Self {
        Self::new()
    }
}

/// Aggregate counters reported by `OptimizedScheduler::stats`.
#[derive(Debug, Clone)]
pub struct SchedulerStats {
    pub total_scheduled: u64,
    pub total_time_ns: u64,
    pub avg_time_ns: u64,
    pub decisions_per_sec: u64,
    pub node_count: usize,
}

/// Cluster-wide resource usage reported by `OptimizedScheduler::utilization`.
#[derive(Debug, Clone)]
pub struct ClusterUtilization {
    pub cpu_percent: f64,
    pub memory_percent: f64,
    pub gpu_percent: f64,
    pub total_cpu: u64,
    pub used_cpu: u64,
    pub total_memory: u64,
    pub used_memory: u64,
    pub total_gpus: u32,
    pub used_gpus: u32,
}

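// Illustrative FFD run (example numbers, memory requests assumed zero for brevity):
// with two 8000-milli nodes and workloads needing 5000, 3000, and 4000 millis, the
// descending sort yields [5000, 4000, 3000]; 5000 lands on the first node, 4000 on
// the second, and 3000 fits back on the first, for 12000/16000 = 75% CPU utilization.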
/// First-Fit Decreasing (FFD) bin packer: nodes are ordered by capacity and
/// workloads by demand, both descending, and each workload takes the first node
/// it fits on.
pub struct FFDBinPacker {
    nodes: Vec<NodeResources>,
}

impl FFDBinPacker {
    pub fn new(mut nodes: Vec<NodeResources>) -> Self {
        // Largest nodes first (by combined CPU + memory capacity).
        nodes.sort_by(|a, b| {
            let cap_a = a.cpu_capacity + a.memory_capacity;
            let cap_b = b.cpu_capacity + b.memory_capacity;
            cap_b.cmp(&cap_a)
        });
        Self { nodes }
    }

    /// Assign workloads to nodes first-fit after sorting by demand (descending).
    /// Returns the `(workload id, node id)` assignments and the resulting CPU
    /// utilization as a percentage.
    pub fn pack(&mut self, mut workloads: Vec<Workload>) -> (Vec<(String, NodeId)>, f64) {
        // Largest workloads first (by combined CPU + memory request).
        workloads.sort_by(|a, b| {
            let req_a = a.resources.cpu_millis + a.resources.memory_mb;
            let req_b = b.resources.cpu_millis + b.resources.memory_mb;
            req_b.cmp(&req_a)
        });

        let mut assignments = Vec::new();
        let mut node_usage: Vec<(u64, u64)> = vec![(0u64, 0u64); self.nodes.len()];

        for workload in &workloads {
            let req = &workload.resources;

            // First fit: take the first node with enough headroom.
            for (i, node) in self.nodes.iter().enumerate() {
                let (used_cpu, used_mem) = node_usage[i];
                let avail_cpu = node.cpu_capacity.saturating_sub(used_cpu);
                let avail_mem = node.memory_capacity.saturating_sub(used_mem);

                if avail_cpu >= req.cpu_millis && avail_mem >= req.memory_mb {
                    assignments.push((workload.id.clone(), node.node_id));
                    node_usage[i].0 += req.cpu_millis;
                    node_usage[i].1 += req.memory_mb;
                    break;
                }
            }
        }

        let total_cpu: u64 = self.nodes.iter().map(|n| n.cpu_capacity).sum();
        let used_cpu: u64 = node_usage.iter().map(|(c, _)| c).sum();
        let utilization = if total_cpu > 0 {
            (used_cpu as f64 / total_cpu as f64) * 100.0
        } else {
            0.0
        };

        (assignments, utilization)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn create_nodes(count: usize) -> Vec<NodeResources> {
        (0..count).map(|_| {
            NodeResources::new(NodeId::new(), 8000, 32768)
        }).collect()
    }

    fn create_workloads(count: usize) -> Vec<Workload> {
        (0..count).map(|i| {
            Workload::new(format!("w-{}", i), "test")
                .with_resources(ResourceRequirements::new()
                    .cpu(100 + (i as u64 % 10) * 100)
                    .memory(256 + (i as u64 % 8) * 256))
        }).collect()
    }

    #[test]
    fn test_optimized_scheduler_fast() {
        let scheduler = OptimizedScheduler::new();

        for node in create_nodes(100) {
            scheduler.register_node(node);
        }

        let workloads = create_workloads(1000);
        let mut scheduled = 0;

        for workload in &workloads {
            if scheduler.schedule_fast(workload).is_some() {
                scheduled += 1;
            }
        }

        assert!(scheduled > 0);

        let stats = scheduler.stats();
        println!("Scheduled: {}, Rate: {} decisions/sec", scheduled, stats.decisions_per_sec);
        assert!(stats.decisions_per_sec > 10_000, "Expected >10K/sec, got {}", stats.decisions_per_sec);
    }

    #[test]
    fn test_batch_scheduling() {
        let scheduler = OptimizedScheduler::new();

        for node in create_nodes(50) {
            scheduler.register_node(node);
        }

        let workloads = create_workloads(100);
        let mut batch = WorkloadBatch::new(workloads);

        scheduler.schedule_batch(&mut batch);

        let scheduled: usize = batch.results().iter().filter(|r| r.is_some()).count();
        assert!(scheduled > 0);
        println!("Batch scheduled: {}/100", scheduled);
    }

    #[test]
    fn test_ffd_bin_packing() {
        let nodes = create_nodes(10);
        let workloads = create_workloads(50);

        let mut packer = FFDBinPacker::new(nodes);
        let (assignments, utilization) = packer.pack(workloads);

        println!("FFD packed {} workloads, utilization: {:.1}%", assignments.len(), utilization);
        assert!(!assignments.is_empty());
    }
}