pub mod algorithms;
pub mod optimized;
pub mod placement;
pub mod preemption;
pub mod queue;

use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

use crate::types::{NodeId, GpuResources};

pub use algorithms::{SchedulingAlgorithm, BinPackScheduler, SpreadScheduler, GpuLocalityScheduler};
pub use optimized::{OptimizedScheduler, WorkloadBatch, SchedulerStats, ClusterUtilization, FFDBinPacker};
pub use placement::{PlacementConstraint, AffinityRule, Affinity};
pub use preemption::{PreemptionPolicy, PriorityClass};
pub use queue::{SchedulingQueue, QueuedWorkload};

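/// Resource requirements for a single workload.
///
/// A minimal builder sketch (field names and defaults taken from this module;
/// the example is illustrative and not compiled as a doctest):
///
/// ```ignore
/// let req = ResourceRequirements::new()
///     .cpu(2000)          // two cores, expressed in millicores
///     .memory(4096)       // 4 GiB
///     .gpu(1, 16_384);    // one GPU with at least 16 GiB free
/// ```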
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceRequirements {
    /// Requested CPU in millicores (1000 = one core).
    pub cpu_millis: u64,
    /// Requested memory in megabytes.
    pub memory_mb: u64,
    /// Number of GPUs requested.
    pub gpu_count: u32,
    /// Required free memory per GPU, in megabytes.
    pub gpu_memory_mb: u64,
    /// Requested storage in megabytes.
    pub storage_mb: u64,
    /// Requested network bandwidth in Mbps.
    pub network_mbps: u32,
}

impl Default for ResourceRequirements {
    fn default() -> Self {
        Self {
            cpu_millis: 100,
            memory_mb: 128,
            gpu_count: 0,
            gpu_memory_mb: 0,
            storage_mb: 0,
            network_mbps: 0,
        }
    }
}

impl ResourceRequirements {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn cpu(mut self, millis: u64) -> Self {
        self.cpu_millis = millis;
        self
    }

    pub fn memory(mut self, mb: u64) -> Self {
        self.memory_mb = mb;
        self
    }

    pub fn gpu(mut self, count: u32, memory_mb: u64) -> Self {
        self.gpu_count = count;
        self.gpu_memory_mb = memory_mb;
        self
    }
}

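/// Tracked capacity, allocations, and scheduling metadata for a single node.
///
/// A construction sketch using only builders defined below (the label key/value
/// and taint are arbitrary illustrations; not compiled as a doctest):
///
/// ```ignore
/// let node = NodeResources::new(NodeId::new(), 8000, 32_768)
///     .with_label("zone", "us-east-1a")
///     .with_taint(Taint {
///         key: "dedicated".to_string(),
///         value: "ml".to_string(),
///         effect: TaintEffect::NoSchedule,
///     });
/// ```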
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeResources {
    pub node_id: NodeId,
    /// Total CPU capacity in millicores.
    pub cpu_capacity: u64,
    /// CPU currently allocated, in millicores.
    pub cpu_allocated: u64,
    /// Total memory capacity in megabytes.
    pub memory_capacity: u64,
    /// Memory currently allocated, in megabytes.
    pub memory_allocated: u64,
    /// GPUs attached to this node.
    pub gpus: Vec<GpuResources>,
    /// Device IDs of GPUs that are already allocated.
    pub gpus_allocated: Vec<u32>,
    /// Key/value labels used by placement constraints and affinity rules.
    pub labels: HashMap<String, String>,
    /// Taints that repel workloads without a matching toleration.
    pub taints: Vec<Taint>,
    /// Whether the node currently accepts new workloads.
    pub schedulable: bool,
    /// Observed node conditions.
    pub conditions: Vec<NodeCondition>,
}

impl NodeResources {
    pub fn new(node_id: NodeId, cpu_capacity: u64, memory_capacity: u64) -> Self {
        Self {
            node_id,
            cpu_capacity,
            cpu_allocated: 0,
            memory_capacity,
            memory_allocated: 0,
            gpus: Vec::new(),
            gpus_allocated: Vec::new(),
            labels: HashMap::new(),
            taints: Vec::new(),
            schedulable: true,
            conditions: Vec::new(),
        }
    }

    pub fn with_gpu(mut self, gpu: GpuResources) -> Self {
        self.gpus.push(gpu);
        self
    }

    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }

    pub fn with_taint(mut self, taint: Taint) -> Self {
        self.taints.push(taint);
        self
    }

    pub fn cpu_available(&self) -> u64 {
        self.cpu_capacity.saturating_sub(self.cpu_allocated)
    }

    pub fn memory_available(&self) -> u64 {
        self.memory_capacity.saturating_sub(self.memory_allocated)
    }

    pub fn gpus_available(&self) -> usize {
        self.gpus.len().saturating_sub(self.gpus_allocated.len())
    }

    /// Returns true if this node can satisfy `req` with its remaining capacity.
    pub fn can_fit(&self, req: &ResourceRequirements) -> bool {
        if !self.schedulable {
            return false;
        }

        if self.cpu_available() < req.cpu_millis {
            return false;
        }

        if self.memory_available() < req.memory_mb {
            return false;
        }

        if req.gpu_count > 0 {
            // Count unallocated GPUs with enough free memory for the request.
            let available_gpus = self.gpus.iter()
                .filter(|g| !self.gpus_allocated.contains(&g.device_id))
                .filter(|g| g.available_memory_mb() >= req.gpu_memory_mb)
                .count();

            if available_gpus < req.gpu_count as usize {
                return false;
            }
        }

        true
    }

    /// Reserves the requested resources on this node. Returns false if the
    /// request does not fit.
    pub fn allocate(&mut self, req: &ResourceRequirements) -> bool {
        if !self.can_fit(req) {
            return false;
        }

        self.cpu_allocated += req.cpu_millis;
        self.memory_allocated += req.memory_mb;

        // Pick one suitable GPU per requested device; can_fit has already
        // verified that enough are available.
        for _ in 0..req.gpu_count {
            if let Some(gpu) = self.gpus.iter()
                .find(|g| !self.gpus_allocated.contains(&g.device_id)
                    && g.available_memory_mb() >= req.gpu_memory_mb)
            {
                self.gpus_allocated.push(gpu.device_id);
            }
        }

        true
    }

    /// Returns previously allocated resources and GPU devices to the pool.
    pub fn release(&mut self, req: &ResourceRequirements, gpu_ids: &[u32]) {
        self.cpu_allocated = self.cpu_allocated.saturating_sub(req.cpu_millis);
        self.memory_allocated = self.memory_allocated.saturating_sub(req.memory_mb);
        self.gpus_allocated.retain(|id| !gpu_ids.contains(id));
    }
}

/// A taint repels workloads that do not declare a matching toleration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Taint {
    pub key: String,
    pub value: String,
    pub effect: TaintEffect,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaintEffect {
    /// Workloads without a matching toleration must not be scheduled here.
    NoSchedule,
    /// Scheduling is discouraged but not forbidden.
    PreferNoSchedule,
    /// Workloads without a matching toleration should also be evicted if already running.
    NoExecute,
}

/// An observed condition reported for a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeCondition {
    pub condition_type: String,
    pub status: bool,
    pub last_transition: chrono::DateTime<chrono::Utc>,
    pub reason: Option<String>,
    pub message: Option<String>,
}

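/// A schedulable unit of work and its placement requirements.
///
/// A builder sketch using only constructors from this module (identifiers are
/// illustrative; not compiled as a doctest):
///
/// ```ignore
/// let workload = Workload::new("job-42", "training-run")
///     .with_priority(10)
///     .with_resources(ResourceRequirements::new().cpu(4000).memory(16_384));
/// ```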
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workload {
    pub id: String,
    pub name: String,
    pub namespace: String,
    /// Resources the workload requests from a node.
    pub resources: ResourceRequirements,
    /// Numeric scheduling priority.
    pub priority: i32,
    /// Optional named priority class registered with the scheduler.
    pub priority_class: Option<String>,
    /// Hard placement constraints that candidate nodes must satisfy.
    pub constraints: Vec<PlacementConstraint>,
    /// Optional affinity rules evaluated against candidate nodes.
    pub affinity: Option<Affinity>,
    /// Taints this workload tolerates.
    pub tolerations: Vec<Toleration>,
    /// How this workload participates in preemption.
    pub preemption_policy: PreemptionPolicy,
    pub created_at: chrono::DateTime<chrono::Utc>,
}

impl Workload {
    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            name: name.into(),
            namespace: "default".to_string(),
            resources: ResourceRequirements::default(),
            priority: 0,
            priority_class: None,
            constraints: Vec::new(),
            affinity: None,
            tolerations: Vec::new(),
            preemption_policy: PreemptionPolicy::PreemptLowerPriority,
            created_at: chrono::Utc::now(),
        }
    }

    pub fn with_resources(mut self, resources: ResourceRequirements) -> Self {
        self.resources = resources;
        self
    }

    pub fn with_priority(mut self, priority: i32) -> Self {
        self.priority = priority;
        self
    }

    pub fn with_constraint(mut self, constraint: PlacementConstraint) -> Self {
        self.constraints.push(constraint);
        self
    }

    pub fn with_affinity(mut self, affinity: Affinity) -> Self {
        self.affinity = Some(affinity);
        self
    }

    pub fn with_toleration(mut self, toleration: Toleration) -> Self {
        self.tolerations.push(toleration);
        self
    }
}

/// Declares that a workload tolerates a matching [`Taint`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Toleration {
    /// Taint key to match; `None` matches any key.
    pub key: Option<String>,
    pub operator: TolerationOperator,
    /// Taint value to compare when the operator is `Equal`.
    pub value: Option<String>,
    /// Taint effect to match; `None` matches any effect.
    pub effect: Option<TaintEffect>,
    /// Optional time limit, in seconds, on how long the taint is tolerated.
    pub toleration_seconds: Option<u64>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TolerationOperator {
    /// The toleration value must equal the taint value.
    Equal,
    /// The taint only needs to exist; its value is ignored.
    Exists,
}

/// The outcome of a single scheduling attempt.
#[derive(Debug, Clone)]
pub struct SchedulingDecision {
    pub workload_id: String,
    /// The chosen node, or `None` if the workload could not be placed.
    pub node_id: Option<NodeId>,
    /// Score assigned to the chosen node by the scheduling algorithm.
    pub score: f64,
    pub reason: String,
    /// Workloads that were (or would need to be) preempted to make room.
    pub preempted: Vec<String>,
    /// Time spent making this decision, in milliseconds.
    pub latency_ms: u64,
}

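/// Tracks cluster nodes, queues workloads, and places them with a pluggable
/// [`SchedulingAlgorithm`].
///
/// A minimal end-to-end sketch (mirrors the unit tests at the bottom of this
/// module; not compiled as a doctest):
///
/// ```ignore
/// let scheduler = Scheduler::new(); // defaults to BinPackScheduler
/// scheduler.register_node(NodeResources::new(NodeId::new(), 4000, 8192));
///
/// scheduler.submit(Workload::new("w1", "api-server"));
/// if let Some(decision) = scheduler.schedule_next() {
///     println!("placed on {:?}", decision.node_id);
/// }
/// ```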
pub struct Scheduler {
    /// Known nodes, keyed by node ID.
    nodes: Arc<RwLock<HashMap<NodeId, NodeResources>>>,
    /// Pending workloads awaiting placement.
    queue: Arc<SchedulingQueue>,
    /// Node scoring strategy.
    algorithm: Arc<dyn SchedulingAlgorithm + Send + Sync>,
    /// Workload ID to node ID for placed workloads.
    assignments: Arc<RwLock<HashMap<String, NodeId>>>,
    /// Registered priority classes, keyed by name.
    priority_classes: Arc<RwLock<HashMap<String, PriorityClass>>>,
}

impl Scheduler {
    /// Creates a scheduler using the default bin-packing algorithm.
    pub fn new() -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(SchedulingQueue::new()),
            algorithm: Arc::new(BinPackScheduler::new()),
            assignments: Arc::new(RwLock::new(HashMap::new())),
            priority_classes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Creates a scheduler with a custom scheduling algorithm.
    pub fn with_algorithm<A: SchedulingAlgorithm + Send + Sync + 'static>(algorithm: A) -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(SchedulingQueue::new()),
            algorithm: Arc::new(algorithm),
            assignments: Arc::new(RwLock::new(HashMap::new())),
            priority_classes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub fn register_node(&self, node: NodeResources) {
        info!(node_id = %node.node_id, "Registering node");
        self.nodes.write().insert(node.node_id, node);
    }

    pub fn unregister_node(&self, node_id: &NodeId) {
        info!(node_id = %node_id, "Unregistering node");
        self.nodes.write().remove(node_id);
    }

    pub fn update_node(&self, node: NodeResources) {
        self.nodes.write().insert(node.node_id, node);
    }

    pub fn node_count(&self) -> usize {
        self.nodes.read().len()
    }

    /// Enqueues a workload for later placement via `schedule_next`.
    pub fn submit(&self, workload: Workload) {
        debug!(workload_id = %workload.id, "Submitting workload");
        self.queue.enqueue(workload);
    }

    /// Dequeues the next workload and attempts to place it.
    pub fn schedule_next(&self) -> Option<SchedulingDecision> {
        let workload = self.queue.dequeue()?;
        Some(self.schedule(&workload))
    }

    /// Attempts to place a workload on the best-scoring feasible node.
    pub fn schedule(&self, workload: &Workload) -> SchedulingDecision {
        let start = std::time::Instant::now();
        let nodes = self.nodes.read();

        // Feasibility: capacity, placement constraints, and taint tolerations.
        let candidates: Vec<_> = nodes.values()
            .filter(|n| n.can_fit(&workload.resources))
            .filter(|n| self.check_constraints(workload, n))
            .filter(|n| self.check_taints(workload, n))
            .collect();

        if candidates.is_empty() {
            drop(nodes);

            if let Some(decision) = self.try_preemption(workload) {
                return decision;
            }

            return SchedulingDecision {
                workload_id: workload.id.clone(),
                node_id: None,
                score: 0.0,
                reason: "No suitable nodes available".to_string(),
                preempted: Vec::new(),
                latency_ms: start.elapsed().as_millis() as u64,
            };
        }

        // Score candidates with the configured algorithm and pick the best.
        let scored: Vec<_> = candidates.iter()
            .map(|n| (n, self.algorithm.score(workload, n)))
            .collect();

        let (best_node, score) = scored.iter()
            .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(n, s)| (*n, *s))
            .unwrap();

        let node_id = best_node.node_id;
        drop(nodes);

        if let Some(node) = self.nodes.write().get_mut(&node_id) {
            // Node state may have changed between scoring and allocation.
            if !node.allocate(&workload.resources) {
                warn!(node_id = %node_id, "Allocation failed after node was selected");
            }
        }

        self.assignments.write().insert(workload.id.clone(), node_id);

        info!(
            workload_id = %workload.id,
            node_id = %node_id,
            score = score,
            "Workload scheduled"
        );

        SchedulingDecision {
            workload_id: workload.id.clone(),
            node_id: Some(node_id),
            score,
            reason: "Scheduled successfully".to_string(),
            preempted: Vec::new(),
            latency_ms: start.elapsed().as_millis() as u64,
        }
    }

    /// Returns true if the node satisfies all placement constraints and affinity rules.
    fn check_constraints(&self, workload: &Workload, node: &NodeResources) -> bool {
        for constraint in &workload.constraints {
            if !constraint.matches(node) {
                return false;
            }
        }

        if let Some(affinity) = &workload.affinity {
            if !affinity.matches(node) {
                return false;
            }
        }

        true
    }

    /// Returns true if every `NoSchedule` taint on the node is tolerated by the workload.
    fn check_taints(&self, workload: &Workload, node: &NodeResources) -> bool {
        for taint in &node.taints {
            let tolerated = workload.tolerations.iter().any(|t| {
                // A missing key or effect on the toleration acts as a wildcard.
                let key_matches = t.key.as_ref().map(|k| k == &taint.key).unwrap_or(true);

                let value_matches = match t.operator {
                    TolerationOperator::Exists => true,
                    TolerationOperator::Equal => {
                        t.value.as_ref().map(|v| v == &taint.value).unwrap_or(false)
                    }
                };

                let effect_matches = t.effect.map(|e| e == taint.effect).unwrap_or(true);

                key_matches && value_matches && effect_matches
            });

            if !tolerated && taint.effect == TaintEffect::NoSchedule {
                return false;
            }
        }

        true
    }

    /// Preemption stub: identifies nodes where evicting existing workloads could
    /// make room, but does not currently evict anything.
    fn try_preemption(&self, workload: &Workload) -> Option<SchedulingDecision> {
        if workload.preemption_policy == PreemptionPolicy::Never {
            return None;
        }

        let assignments = self.assignments.read();
        let mut nodes = self.nodes.write();

        for (node_id, node) in nodes.iter_mut() {
            let preemptable: Vec<_> = assignments.iter()
                .filter(|(_, n)| *n == node_id)
                .map(|(w, _)| w.clone())
                .collect();

            // Preemption is only relevant when the workload does not fit as-is
            // and the node has assigned workloads that could be evicted.
            if !node.can_fit(&workload.resources) && !preemptable.is_empty() {
                warn!(
                    workload_id = %workload.id,
                    node_id = %node_id,
                    "Preemption would be required"
                );
            }
        }

        None
    }

    /// Returns the node a workload was assigned to, if it has been scheduled.
    pub fn get_assignment(&self, workload_id: &str) -> Option<NodeId> {
        self.assignments.read().get(workload_id).copied()
    }

    /// Removes a workload's assignment and returns its resources to the node.
    pub fn release(&self, workload_id: &str, resources: &ResourceRequirements, gpu_ids: &[u32]) {
        if let Some(node_id) = self.assignments.write().remove(workload_id) {
            if let Some(node) = self.nodes.write().get_mut(&node_id) {
                node.release(resources, gpu_ids);
            }
        }
    }

    pub fn queue_len(&self) -> usize {
        self.queue.len()
    }

    /// Registers a named priority class for later lookup.
    pub fn register_priority_class(&self, class: PriorityClass) {
        self.priority_classes.write().insert(class.name.clone(), class);
    }
}

impl Default for Scheduler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_node_resources() {
        let mut node = NodeResources::new(NodeId::new(), 4000, 8192);

        let req = ResourceRequirements::new().cpu(1000).memory(2048);
        assert!(node.can_fit(&req));

        assert!(node.allocate(&req));
        assert_eq!(node.cpu_available(), 3000);
        assert_eq!(node.memory_available(), 6144);
    }
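
    // Added example: releasing resources restores node availability. A minimal
    // sketch built only on the NodeResources API above; the empty GPU id slice
    // reflects a CPU/memory-only workload.
    #[test]
    fn test_node_release_restores_capacity() {
        let mut node = NodeResources::new(NodeId::new(), 4000, 8192);
        let req = ResourceRequirements::new().cpu(1000).memory(2048);

        assert!(node.allocate(&req));
        node.release(&req, &[]);

        assert_eq!(node.cpu_available(), 4000);
        assert_eq!(node.memory_available(), 8192);
    }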

    #[test]
    fn test_scheduler_basic() {
        let scheduler = Scheduler::new();

        let node = NodeResources::new(NodeId::new(), 4000, 8192);
        scheduler.register_node(node);

        let workload = Workload::new("w1", "test")
            .with_resources(ResourceRequirements::new().cpu(1000).memory(1024));

        let decision = scheduler.schedule(&workload);
        assert!(decision.node_id.is_some());
    }
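
    // Added example: a NoSchedule taint filters out the only node unless the
    // workload carries a matching toleration. A sketch using only types defined
    // in this module; the "dedicated=gpu" key/value is illustrative.
    #[test]
    fn test_scheduler_taint_toleration() {
        let scheduler = Scheduler::new();

        let taint = Taint {
            key: "dedicated".to_string(),
            value: "gpu".to_string(),
            effect: TaintEffect::NoSchedule,
        };
        let node = NodeResources::new(NodeId::new(), 4000, 8192).with_taint(taint);
        scheduler.register_node(node);

        let untolerant = Workload::new("w1", "plain");
        assert!(scheduler.schedule(&untolerant).node_id.is_none());

        let tolerant = Workload::new("w2", "gpu-job").with_toleration(Toleration {
            key: Some("dedicated".to_string()),
            operator: TolerationOperator::Exists,
            value: None,
            effect: Some(TaintEffect::NoSchedule),
            toleration_seconds: None,
        });
        assert!(scheduler.schedule(&tolerant).node_id.is_some());
    }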

    #[test]
    fn test_scheduler_no_capacity() {
        let scheduler = Scheduler::new();

        let node = NodeResources::new(NodeId::new(), 1000, 1024);
        scheduler.register_node(node);

        let workload = Workload::new("w1", "test")
            .with_resources(ResourceRequirements::new().cpu(2000).memory(2048));

        let decision = scheduler.schedule(&workload);
        assert!(decision.node_id.is_none());
    }
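
    // Added example: releasing a placed workload frees its capacity for the next
    // one. A sketch over the public Scheduler API only; no GPUs are involved, so
    // an empty GPU id slice is passed to release.
    #[test]
    fn test_scheduler_release_frees_capacity() {
        let scheduler = Scheduler::new();
        scheduler.register_node(NodeResources::new(NodeId::new(), 1000, 1024));

        let req = ResourceRequirements::new().cpu(1000).memory(1024);
        let first = Workload::new("w1", "first").with_resources(req.clone());
        let second = Workload::new("w2", "second").with_resources(req.clone());

        assert!(scheduler.schedule(&first).node_id.is_some());
        // The node is now full, so the second workload cannot be placed.
        assert!(scheduler.schedule(&second).node_id.is_none());

        scheduler.release("w1", &req, &[]);
        assert!(scheduler.get_assignment("w1").is_none());
        assert!(scheduler.schedule(&second).node_id.is_some());
    }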
}