pub mod algorithms;
pub mod placement;
pub mod preemption;
pub mod queue;

use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

use crate::types::{NodeId, GpuResources};

pub use algorithms::{SchedulingAlgorithm, BinPackScheduler, SpreadScheduler, GpuLocalityScheduler};
pub use placement::{PlacementConstraint, AffinityRule, Affinity};
pub use preemption::{PreemptionPolicy, PriorityClass};
pub use queue::{SchedulingQueue, QueuedWorkload};

/// Resource requests for a workload. Quantities use the same units as node
/// capacities: millicores for CPU, megabytes for memory and storage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceRequirements {
    /// CPU request in millicores (1000 = one full core).
    pub cpu_millis: u64,
    /// Memory request in megabytes.
    pub memory_mb: u64,
    /// Number of whole GPUs requested.
    pub gpu_count: u32,
    /// Minimum free memory required on each requested GPU, in megabytes.
    pub gpu_memory_mb: u64,
    /// Storage request in megabytes.
    pub storage_mb: u64,
    /// Network bandwidth request in megabits per second.
    pub network_mbps: u32,
}

impl Default for ResourceRequirements {
    fn default() -> Self {
        Self {
            cpu_millis: 100,
            memory_mb: 128,
            gpu_count: 0,
            gpu_memory_mb: 0,
            storage_mb: 0,
            network_mbps: 0,
        }
    }
}

impl ResourceRequirements {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn cpu(mut self, millis: u64) -> Self {
        self.cpu_millis = millis;
        self
    }

    pub fn memory(mut self, mb: u64) -> Self {
        self.memory_mb = mb;
        self
    }

    pub fn gpu(mut self, count: u32, memory_mb: u64) -> Self {
        self.gpu_count = count;
        self.gpu_memory_mb = memory_mb;
        self
    }
}
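
// A minimal, compiled-in sketch of the `ResourceRequirements` builder. The
// quantities below are illustrative assumptions, not recommended defaults.
#[cfg(test)]
mod requirements_builder_example {
    use super::*;

    #[test]
    fn builder_overrides_only_requested_fields() {
        // Start from the defaults (100 millicores, 128 MB) and override the
        // CPU, memory, and GPU requests through the chained builder methods.
        let req = ResourceRequirements::new()
            .cpu(2_000)
            .memory(4_096)
            .gpu(1, 16_384);

        assert_eq!(req.cpu_millis, 2_000);
        assert_eq!(req.memory_mb, 4_096);
        assert_eq!(req.gpu_count, 1);
        assert_eq!(req.gpu_memory_mb, 16_384);
        // Fields that were not set keep their defaults.
        assert_eq!(req.storage_mb, 0);
        assert_eq!(req.network_mbps, 0);
    }
}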

/// Point-in-time view of a node's capacity, current allocations, and
/// scheduling metadata (labels, taints, conditions).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeResources {
    pub node_id: NodeId,
    /// Total CPU capacity in millicores.
    pub cpu_capacity: u64,
    /// CPU currently allocated, in millicores.
    pub cpu_allocated: u64,
    /// Total memory capacity in megabytes.
    pub memory_capacity: u64,
    /// Memory currently allocated, in megabytes.
    pub memory_allocated: u64,
    /// GPUs physically present on the node.
    pub gpus: Vec<GpuResources>,
    /// Device ids of GPUs that are currently allocated.
    pub gpus_allocated: Vec<u32>,
    /// Arbitrary key/value labels used by placement constraints and affinity.
    pub labels: HashMap<String, String>,
    /// Taints that repel workloads without a matching toleration.
    pub taints: Vec<Taint>,
    /// Whether the node accepts new workloads at all.
    pub schedulable: bool,
    /// Observed conditions reported by the node.
    pub conditions: Vec<NodeCondition>,
}

impl NodeResources {
    pub fn new(node_id: NodeId, cpu_capacity: u64, memory_capacity: u64) -> Self {
        Self {
            node_id,
            cpu_capacity,
            cpu_allocated: 0,
            memory_capacity,
            memory_allocated: 0,
            gpus: Vec::new(),
            gpus_allocated: Vec::new(),
            labels: HashMap::new(),
            taints: Vec::new(),
            schedulable: true,
            conditions: Vec::new(),
        }
    }

    pub fn with_gpu(mut self, gpu: GpuResources) -> Self {
        self.gpus.push(gpu);
        self
    }

    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }

    pub fn with_taint(mut self, taint: Taint) -> Self {
        self.taints.push(taint);
        self
    }

    pub fn cpu_available(&self) -> u64 {
        self.cpu_capacity.saturating_sub(self.cpu_allocated)
    }

    pub fn memory_available(&self) -> u64 {
        self.memory_capacity.saturating_sub(self.memory_allocated)
    }

    pub fn gpus_available(&self) -> usize {
        self.gpus.len().saturating_sub(self.gpus_allocated.len())
    }

    /// Returns true if the node is schedulable and has enough free CPU,
    /// memory, and (if requested) GPUs with sufficient free GPU memory.
    /// Storage and network requests are not currently enforced.
    pub fn can_fit(&self, req: &ResourceRequirements) -> bool {
        if !self.schedulable {
            return false;
        }

        if self.cpu_available() < req.cpu_millis {
            return false;
        }

        if self.memory_available() < req.memory_mb {
            return false;
        }

        if req.gpu_count > 0 {
            let available_gpus: Vec<_> = self.gpus.iter()
                .filter(|g| !self.gpus_allocated.contains(&g.device_id))
                .filter(|g| g.available_memory_mb() >= req.gpu_memory_mb)
                .collect();

            if available_gpus.len() < req.gpu_count as usize {
                return false;
            }
        }

        true
    }

    /// Reserves the requested resources on this node. Returns false (and
    /// changes nothing) if the request does not fit.
    pub fn allocate(&mut self, req: &ResourceRequirements) -> bool {
        if !self.can_fit(req) {
            return false;
        }

        self.cpu_allocated += req.cpu_millis;
        self.memory_allocated += req.memory_mb;

        // `can_fit` has already verified that enough unallocated GPUs with
        // sufficient free memory exist, so each iteration finds a device.
        for _ in 0..req.gpu_count {
            if let Some(gpu) = self.gpus.iter()
                .find(|g| !self.gpus_allocated.contains(&g.device_id)
                    && g.available_memory_mb() >= req.gpu_memory_mb)
            {
                self.gpus_allocated.push(gpu.device_id);
            }
        }

        true
    }

    /// Returns previously allocated resources to the node. `gpu_ids` are the
    /// GPU device ids that were assigned to the workload being released.
    pub fn release(&mut self, req: &ResourceRequirements, gpu_ids: &[u32]) {
        self.cpu_allocated = self.cpu_allocated.saturating_sub(req.cpu_millis);
        self.memory_allocated = self.memory_allocated.saturating_sub(req.memory_mb);
        self.gpus_allocated.retain(|id| !gpu_ids.contains(id));
    }
}
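
// A small, compiled-in sketch of allocate/release accounting on
// `NodeResources`. The capacities used here (4 cores, 8 GiB expressed in MB)
// are arbitrary example values.
#[cfg(test)]
mod node_allocation_example {
    use super::*;

    #[test]
    fn allocate_then_release_restores_capacity() {
        let mut node = NodeResources::new(NodeId::new(), 4_000, 8_192);
        let req = ResourceRequirements::new().cpu(1_500).memory(2_048);

        // Allocation succeeds and reduces the available headroom.
        assert!(node.allocate(&req));
        assert_eq!(node.cpu_available(), 2_500);
        assert_eq!(node.memory_available(), 6_144);

        // Releasing with the same request (and no GPU ids, since none were
        // requested) restores the original capacity.
        node.release(&req, &[]);
        assert_eq!(node.cpu_available(), 4_000);
        assert_eq!(node.memory_available(), 8_192);
    }
}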

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Taint {
    pub key: String,
    pub value: String,
    pub effect: TaintEffect,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaintEffect {
    NoSchedule,
    PreferNoSchedule,
    NoExecute,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeCondition {
    pub condition_type: String,
    pub status: bool,
    pub last_transition: chrono::DateTime<chrono::Utc>,
    pub reason: Option<String>,
    pub message: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workload {
    pub id: String,
    pub name: String,
    pub namespace: String,
    pub resources: ResourceRequirements,
    pub priority: i32,
    pub priority_class: Option<String>,
    pub constraints: Vec<PlacementConstraint>,
    pub affinity: Option<Affinity>,
    pub tolerations: Vec<Toleration>,
    pub preemption_policy: PreemptionPolicy,
    pub created_at: chrono::DateTime<chrono::Utc>,
}

impl Workload {
    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            name: name.into(),
            namespace: "default".to_string(),
            resources: ResourceRequirements::default(),
            priority: 0,
            priority_class: None,
            constraints: Vec::new(),
            affinity: None,
            tolerations: Vec::new(),
            preemption_policy: PreemptionPolicy::PreemptLowerPriority,
            created_at: chrono::Utc::now(),
        }
    }

    pub fn with_resources(mut self, resources: ResourceRequirements) -> Self {
        self.resources = resources;
        self
    }

    pub fn with_priority(mut self, priority: i32) -> Self {
        self.priority = priority;
        self
    }

    pub fn with_constraint(mut self, constraint: PlacementConstraint) -> Self {
        self.constraints.push(constraint);
        self
    }

    pub fn with_affinity(mut self, affinity: Affinity) -> Self {
        self.affinity = Some(affinity);
        self
    }

    pub fn with_toleration(mut self, toleration: Toleration) -> Self {
        self.tolerations.push(toleration);
        self
    }
}
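
// A compiled-in sketch of building a `Workload`. The ids, names, and priority
// used here are example values; everything else comes from the builder
// defaults in `Workload::new`.
#[cfg(test)]
mod workload_builder_example {
    use super::*;

    #[test]
    fn new_workload_uses_builder_defaults() {
        let workload = Workload::new("job-42", "training-run")
            .with_resources(ResourceRequirements::new().cpu(500).memory(1_024))
            .with_priority(10);

        // `new` fills in the "default" namespace and leaves constraints,
        // tolerations, and affinity empty until the builder sets them.
        assert_eq!(workload.namespace, "default");
        assert_eq!(workload.priority, 10);
        assert_eq!(workload.resources.cpu_millis, 500);
        assert!(workload.constraints.is_empty());
        assert!(workload.tolerations.is_empty());
        assert!(workload.affinity.is_none());
    }
}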

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Toleration {
    pub key: Option<String>,
    pub operator: TolerationOperator,
    pub value: Option<String>,
    pub effect: Option<TaintEffect>,
    pub toleration_seconds: Option<u64>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TolerationOperator {
    Equal,
    Exists,
}

/// The outcome of a single scheduling attempt. `node_id` is `None` when no
/// suitable placement was found.
#[derive(Debug, Clone)]
pub struct SchedulingDecision {
    pub workload_id: String,
    pub node_id: Option<NodeId>,
    pub score: f64,
    pub reason: String,
    pub preempted: Vec<String>,
    pub latency_ms: u64,
}

/// Tracks registered nodes and placed workloads, and assigns queued workloads
/// to nodes using a pluggable [`SchedulingAlgorithm`] (bin-packing by default).
pub struct Scheduler {
    nodes: Arc<RwLock<HashMap<NodeId, NodeResources>>>,
    queue: Arc<SchedulingQueue>,
    algorithm: Arc<dyn SchedulingAlgorithm + Send + Sync>,
    assignments: Arc<RwLock<HashMap<String, NodeId>>>,
    priority_classes: Arc<RwLock<HashMap<String, PriorityClass>>>,
}

impl Scheduler {
    pub fn new() -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(SchedulingQueue::new()),
            algorithm: Arc::new(BinPackScheduler::new()),
            assignments: Arc::new(RwLock::new(HashMap::new())),
            priority_classes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Creates a scheduler that scores nodes with the given algorithm instead
    /// of the default bin-packing strategy.
    pub fn with_algorithm<A: SchedulingAlgorithm + Send + Sync + 'static>(algorithm: A) -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(SchedulingQueue::new()),
            algorithm: Arc::new(algorithm),
            assignments: Arc::new(RwLock::new(HashMap::new())),
            priority_classes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub fn register_node(&self, node: NodeResources) {
        info!(node_id = %node.node_id, "Registering node");
        self.nodes.write().insert(node.node_id, node);
    }

    pub fn unregister_node(&self, node_id: &NodeId) {
        info!(node_id = %node_id, "Unregistering node");
        self.nodes.write().remove(node_id);
    }

    pub fn update_node(&self, node: NodeResources) {
        self.nodes.write().insert(node.node_id, node);
    }

    pub fn node_count(&self) -> usize {
        self.nodes.read().len()
    }

    /// Adds a workload to the scheduling queue without placing it immediately.
    pub fn submit(&self, workload: Workload) {
        debug!(workload_id = %workload.id, "Submitting workload");
        self.queue.enqueue(workload);
    }

    /// Dequeues the next workload and attempts to place it. Returns `None`
    /// when the queue is empty.
    pub fn schedule_next(&self) -> Option<SchedulingDecision> {
        let workload = self.queue.dequeue()?;
        Some(self.schedule(&workload))
    }

    /// Attempts to place `workload` immediately, bypassing the queue. Filters
    /// nodes by capacity, constraints, and taints, scores the survivors with
    /// the configured algorithm, and allocates on the best-scoring node.
    pub fn schedule(&self, workload: &Workload) -> SchedulingDecision {
        let start = std::time::Instant::now();
        let nodes = self.nodes.read();

        let candidates: Vec<_> = nodes.values()
            .filter(|n| n.can_fit(&workload.resources))
            .filter(|n| self.check_constraints(workload, n))
            .filter(|n| self.check_taints(workload, n))
            .collect();

        if candidates.is_empty() {
            drop(nodes);

            if let Some(decision) = self.try_preemption(workload) {
                return decision;
            }

            return SchedulingDecision {
                workload_id: workload.id.clone(),
                node_id: None,
                score: 0.0,
                reason: "No suitable nodes available".to_string(),
                preempted: Vec::new(),
                latency_ms: start.elapsed().as_millis() as u64,
            };
        }

        let scored: Vec<_> = candidates.iter()
            .map(|n| (n, self.algorithm.score(workload, n)))
            .collect();

        let (best_node, score) = scored.iter()
            .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(n, s)| (*n, *s))
            .unwrap();

        let node_id = best_node.node_id;
        drop(nodes);

        // Re-check under the write lock: capacity may have changed between the
        // read-locked filtering above and this allocation.
        let allocated = self.nodes.write()
            .get_mut(&node_id)
            .map(|node| node.allocate(&workload.resources))
            .unwrap_or(false);

        if !allocated {
            warn!(
                workload_id = %workload.id,
                node_id = %node_id,
                "Selected node could no longer fit the workload"
            );
            return SchedulingDecision {
                workload_id: workload.id.clone(),
                node_id: None,
                score: 0.0,
                reason: "Selected node could no longer fit the workload".to_string(),
                preempted: Vec::new(),
                latency_ms: start.elapsed().as_millis() as u64,
            };
        }

        self.assignments.write().insert(workload.id.clone(), node_id);

        info!(
            workload_id = %workload.id,
            node_id = %node_id,
            score = score,
            "Workload scheduled"
        );

        SchedulingDecision {
            workload_id: workload.id.clone(),
            node_id: Some(node_id),
            score,
            reason: "Scheduled successfully".to_string(),
            preempted: Vec::new(),
            latency_ms: start.elapsed().as_millis() as u64,
        }
    }

    fn check_constraints(&self, workload: &Workload, node: &NodeResources) -> bool {
        for constraint in &workload.constraints {
            if !constraint.matches(node) {
                return false;
            }
        }

        if let Some(affinity) = &workload.affinity {
            if !affinity.matches(node) {
                return false;
            }
        }

        true
    }

    /// Returns true if every `NoSchedule` taint on the node is tolerated by
    /// the workload. `PreferNoSchedule` and `NoExecute` taints do not block
    /// placement here.
    fn check_taints(&self, workload: &Workload, node: &NodeResources) -> bool {
        for taint in &node.taints {
            let tolerated = workload.tolerations.iter().any(|t| {
                // A missing key or effect on the toleration matches any taint
                // key or effect; `Exists` ignores the value entirely.
                let key_matches = t.key.as_ref().map(|k| k == &taint.key).unwrap_or(true);

                let value_matches = match t.operator {
                    TolerationOperator::Exists => true,
                    TolerationOperator::Equal => {
                        t.value.as_ref().map(|v| v == &taint.value).unwrap_or(false)
                    }
                };

                let effect_matches = t.effect.map(|e| e == taint.effect).unwrap_or(true);

                key_matches && value_matches && effect_matches
            });

            if !tolerated && taint.effect == TaintEffect::NoSchedule {
                return false;
            }
        }

        true
    }

    /// Preemption is not implemented yet: this only logs nodes where evicting
    /// existing assignments might make room, and always returns `None` so the
    /// caller reports the workload as unschedulable.
    fn try_preemption(&self, workload: &Workload) -> Option<SchedulingDecision> {
        if workload.preemption_policy == PreemptionPolicy::Never {
            return None;
        }

        let assignments = self.assignments.read();
        let nodes = self.nodes.read();

        for (node_id, node) in nodes.iter() {
            let preemptable: Vec<_> = assignments.iter()
                .filter(|(_, n)| *n == node_id)
                .map(|(w, _)| w.clone())
                .collect();

            if !node.can_fit(&workload.resources) && !preemptable.is_empty() {
                warn!(
                    workload_id = %workload.id,
                    node_id = %node_id,
                    "Preemption would be required"
                );
            }
        }

        None
    }

    pub fn get_assignment(&self, workload_id: &str) -> Option<NodeId> {
        self.assignments.read().get(workload_id).copied()
    }

    /// Removes a workload's assignment and returns its resources (including
    /// the given GPU device ids) to the node that was hosting it.
    pub fn release(&self, workload_id: &str, resources: &ResourceRequirements, gpu_ids: &[u32]) {
        if let Some(node_id) = self.assignments.write().remove(workload_id) {
            if let Some(node) = self.nodes.write().get_mut(&node_id) {
                node.release(resources, gpu_ids);
            }
        }
    }

    pub fn queue_len(&self) -> usize {
        self.queue.len()
    }

    pub fn register_priority_class(&self, class: PriorityClass) {
        self.priority_classes.write().insert(class.name.clone(), class);
    }
}

impl Default for Scheduler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_node_resources() {
        let mut node = NodeResources::new(NodeId::new(), 4000, 8192);

        let req = ResourceRequirements::new().cpu(1000).memory(2048);
        assert!(node.can_fit(&req));

        assert!(node.allocate(&req));
        assert_eq!(node.cpu_available(), 3000);
        assert_eq!(node.memory_available(), 6144);
    }

    #[test]
    fn test_scheduler_basic() {
        let scheduler = Scheduler::new();

        let node = NodeResources::new(NodeId::new(), 4000, 8192);
        scheduler.register_node(node);

        let workload = Workload::new("w1", "test")
            .with_resources(ResourceRequirements::new().cpu(1000).memory(1024));

        let decision = scheduler.schedule(&workload);
        assert!(decision.node_id.is_some());
    }

    #[test]
    fn test_scheduler_no_capacity() {
        let scheduler = Scheduler::new();

        let node = NodeResources::new(NodeId::new(), 1000, 1024);
        scheduler.register_node(node);

        let workload = Workload::new("w1", "test")
            .with_resources(ResourceRequirements::new().cpu(2000).memory(2048));

        let decision = scheduler.schedule(&workload);
        assert!(decision.node_id.is_none());
    }
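
    // Two additional compiled-in sketches: taint/toleration matching as seen
    // through `schedule`, and the queue-driven `submit` / `schedule_next`
    // path. Taint keys and workload names are illustrative, and the second
    // test assumes `SchedulingQueue::len` reflects pending submissions.
    #[test]
    fn test_taint_requires_toleration() {
        let scheduler = Scheduler::new();

        let taint = Taint {
            key: "dedicated".to_string(),
            value: "gpu".to_string(),
            effect: TaintEffect::NoSchedule,
        };
        let node = NodeResources::new(NodeId::new(), 4000, 8192).with_taint(taint);
        scheduler.register_node(node);

        // Without a toleration the NoSchedule taint excludes the only node.
        let plain = Workload::new("w1", "plain");
        assert!(scheduler.schedule(&plain).node_id.is_none());

        // An `Exists` toleration for the taint key allows placement.
        let tolerant = Workload::new("w2", "tolerant").with_toleration(Toleration {
            key: Some("dedicated".to_string()),
            operator: TolerationOperator::Exists,
            value: None,
            effect: Some(TaintEffect::NoSchedule),
            toleration_seconds: None,
        });
        assert!(scheduler.schedule(&tolerant).node_id.is_some());
    }

    #[test]
    fn test_submit_and_schedule_next() {
        let scheduler = Scheduler::new();
        scheduler.register_node(NodeResources::new(NodeId::new(), 4000, 8192));

        scheduler.submit(Workload::new("w1", "queued"));
        assert_eq!(scheduler.queue_len(), 1);

        let decision = scheduler.schedule_next().expect("one workload was queued");
        assert!(decision.node_id.is_some());
        assert_eq!(scheduler.queue_len(), 0);
    }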
}