forge_orchestration/scheduler/mod.rs

//! Distributed Scheduler for Forge Orchestration
//!
//! A Kubernetes-style scheduler with:
//! - Bin-packing and spread scheduling algorithms
//! - Affinity/anti-affinity rules
//! - Resource-aware placement (CPU, memory, GPU)
//! - Preemption and priority-based scheduling
//! - Topology-aware scheduling for NUMA/GPU locality
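//!
//! A minimal usage sketch (the crate/module paths and the `NodeId::new()`
//! constructor are assumed from this file's location and its tests):
//!
//! ```ignore
//! use forge_orchestration::scheduler::{
//!     NodeResources, ResourceRequirements, Scheduler, Workload,
//! };
//! use forge_orchestration::types::NodeId;
//!
//! let scheduler = Scheduler::new();
//! scheduler.register_node(NodeResources::new(NodeId::new(), 8_000, 16_384));
//!
//! scheduler.submit(
//!     Workload::new("w1", "api-server")
//!         .with_resources(ResourceRequirements::new().cpu(500).memory(512)),
//! );
//!
//! if let Some(decision) = scheduler.schedule_next() {
//!     println!("placed on {:?}: {}", decision.node_id, decision.reason);
//! }
//! ```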

pub mod algorithms;
pub mod placement;
pub mod preemption;
pub mod queue;

use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

use crate::types::{NodeId, GpuResources};

pub use algorithms::{SchedulingAlgorithm, BinPackScheduler, SpreadScheduler, GpuLocalityScheduler};
pub use placement::{PlacementConstraint, AffinityRule, Affinity};
pub use preemption::{PreemptionPolicy, PriorityClass};
pub use queue::{SchedulingQueue, QueuedWorkload};

/// Resource requirements for a workload
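///
/// A small builder sketch with made-up numbers (2 cores, 4 GiB RAM, one GPU
/// with 16 GiB of GPU memory):
///
/// ```ignore
/// let req = ResourceRequirements::new()
///     .cpu(2_000)        // millicores
///     .memory(4_096)     // MB
///     .gpu(1, 16_384);   // count, GPU memory per device in MB
/// ```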
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceRequirements {
    /// CPU cores requested (millicores, 1000 = 1 core)
    pub cpu_millis: u64,
    /// Memory requested in MB
    pub memory_mb: u64,
    /// GPU count requested
    pub gpu_count: u32,
    /// GPU memory required per GPU in MB
    pub gpu_memory_mb: u64,
    /// Ephemeral storage in MB
    pub storage_mb: u64,
    /// Network bandwidth in Mbps
    pub network_mbps: u32,
}

impl Default for ResourceRequirements {
    fn default() -> Self {
        Self {
            cpu_millis: 100,
            memory_mb: 128,
            gpu_count: 0,
            gpu_memory_mb: 0,
            storage_mb: 0,
            network_mbps: 0,
        }
    }
}

impl ResourceRequirements {
    /// Create new resource requirements
    pub fn new() -> Self {
        Self::default()
    }

    /// Set CPU requirement
    pub fn cpu(mut self, millis: u64) -> Self {
        self.cpu_millis = millis;
        self
    }

    /// Set memory requirement
    pub fn memory(mut self, mb: u64) -> Self {
        self.memory_mb = mb;
        self
    }

    /// Set GPU requirements
    pub fn gpu(mut self, count: u32, memory_mb: u64) -> Self {
        self.gpu_count = count;
        self.gpu_memory_mb = memory_mb;
        self
    }
}

/// Node resources and capacity
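///
/// A construction sketch with illustrative capacities (4 cores, 8 GiB) plus a
/// label and a taint; GPU registration via `with_gpu` is omitted here because
/// the `GpuResources` constructor lives in `crate::types`:
///
/// ```ignore
/// let node = NodeResources::new(NodeId::new(), 4_000, 8_192)
///     .with_label("zone", "us-east-1a")
///     .with_taint(Taint {
///         key: "dedicated".to_string(),
///         value: "gpu".to_string(),
///         effect: TaintEffect::NoSchedule,
///     });
/// assert!(node.can_fit(&ResourceRequirements::new().cpu(1_000).memory(2_048)));
/// ```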
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeResources {
    /// Node identifier
    pub node_id: NodeId,
    /// Total CPU capacity (millicores)
    pub cpu_capacity: u64,
    /// Allocated CPU (millicores)
    pub cpu_allocated: u64,
    /// Total memory capacity (MB)
    pub memory_capacity: u64,
    /// Allocated memory (MB)
    pub memory_allocated: u64,
    /// GPU resources
    pub gpus: Vec<GpuResources>,
    /// GPUs allocated (by device ID)
    pub gpus_allocated: Vec<u32>,
    /// Node labels for affinity matching
    pub labels: HashMap<String, String>,
    /// Node taints
    pub taints: Vec<Taint>,
    /// Is node schedulable
    pub schedulable: bool,
    /// Node conditions
    pub conditions: Vec<NodeCondition>,
}

impl NodeResources {
    /// Create new node resources
    pub fn new(node_id: NodeId, cpu_capacity: u64, memory_capacity: u64) -> Self {
        Self {
            node_id,
            cpu_capacity,
            cpu_allocated: 0,
            memory_capacity,
            memory_allocated: 0,
            gpus: Vec::new(),
            gpus_allocated: Vec::new(),
            labels: HashMap::new(),
            taints: Vec::new(),
            schedulable: true,
            conditions: Vec::new(),
        }
    }

    /// Add GPU to node
    pub fn with_gpu(mut self, gpu: GpuResources) -> Self {
        self.gpus.push(gpu);
        self
    }

    /// Add label
    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }

    /// Add taint
    pub fn with_taint(mut self, taint: Taint) -> Self {
        self.taints.push(taint);
        self
    }

    /// Available CPU
    pub fn cpu_available(&self) -> u64 {
        self.cpu_capacity.saturating_sub(self.cpu_allocated)
    }

    /// Available memory
    pub fn memory_available(&self) -> u64 {
        self.memory_capacity.saturating_sub(self.memory_allocated)
    }

    /// Available (unallocated) GPUs
    pub fn gpus_available(&self) -> usize {
        self.gpus.len().saturating_sub(self.gpus_allocated.len())
    }

    /// Check if node can fit workload
    pub fn can_fit(&self, req: &ResourceRequirements) -> bool {
        if !self.schedulable {
            return false;
        }

        if self.cpu_available() < req.cpu_millis {
            return false;
        }

        if self.memory_available() < req.memory_mb {
            return false;
        }

        if req.gpu_count > 0 {
            let available_gpus: Vec<_> = self.gpus.iter()
                .filter(|g| !self.gpus_allocated.contains(&g.device_id))
                .filter(|g| g.available_memory_mb() >= req.gpu_memory_mb)
                .collect();

            if available_gpus.len() < req.gpu_count as usize {
                return false;
            }
        }

        true
    }

    /// Allocate resources for workload
    pub fn allocate(&mut self, req: &ResourceRequirements) -> bool {
        if !self.can_fit(req) {
            return false;
        }

        self.cpu_allocated += req.cpu_millis;
        self.memory_allocated += req.memory_mb;

        // Allocate GPUs
        for _ in 0..req.gpu_count {
            if let Some(gpu) = self.gpus.iter()
                .find(|g| !self.gpus_allocated.contains(&g.device_id)
                    && g.available_memory_mb() >= req.gpu_memory_mb)
            {
                self.gpus_allocated.push(gpu.device_id);
            }
        }

        true
    }

    /// Release resources
    pub fn release(&mut self, req: &ResourceRequirements, gpu_ids: &[u32]) {
        self.cpu_allocated = self.cpu_allocated.saturating_sub(req.cpu_millis);
        self.memory_allocated = self.memory_allocated.saturating_sub(req.memory_mb);
        self.gpus_allocated.retain(|id| !gpu_ids.contains(id));
    }
}

/// Node taint for scheduling constraints
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Taint {
    /// Taint key
    pub key: String,
    /// Taint value
    pub value: String,
    /// Taint effect
    pub effect: TaintEffect,
}

/// Taint effect
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaintEffect {
    /// Do not schedule new workloads
    NoSchedule,
    /// Prefer not to schedule
    PreferNoSchedule,
    /// Evict existing workloads
    NoExecute,
}

/// Node condition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeCondition {
    /// Condition type
    pub condition_type: String,
    /// Condition status
    pub status: bool,
    /// Last transition time
    pub last_transition: chrono::DateTime<chrono::Utc>,
    /// Reason for condition
    pub reason: Option<String>,
    /// Human-readable message
    pub message: Option<String>,
}

/// Workload to be scheduled
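///
/// A builder sketch for a high-priority training job (the priority value and
/// GPU figures are illustrative):
///
/// ```ignore
/// let job = Workload::new("job-42", "trainer")
///     .with_resources(ResourceRequirements::new().cpu(8_000).memory(32_768).gpu(4, 40_960))
///     .with_priority(1_000)
///     .with_toleration(Toleration {
///         key: Some("dedicated".to_string()),
///         operator: TolerationOperator::Equal,
///         value: Some("gpu".to_string()),
///         effect: Some(TaintEffect::NoSchedule),
///         toleration_seconds: None,
///     });
/// ```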
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workload {
    /// Unique workload ID
    pub id: String,
    /// Workload name
    pub name: String,
    /// Namespace
    pub namespace: String,
    /// Resource requirements
    pub resources: ResourceRequirements,
    /// Priority (higher = more important)
    pub priority: i32,
    /// Priority class name
    pub priority_class: Option<String>,
    /// Placement constraints
    pub constraints: Vec<PlacementConstraint>,
    /// Node affinity rules
    pub affinity: Option<Affinity>,
    /// Tolerations for taints
    pub tolerations: Vec<Toleration>,
    /// Preemption policy
    pub preemption_policy: PreemptionPolicy,
    /// Creation timestamp
    pub created_at: chrono::DateTime<chrono::Utc>,
}

impl Workload {
    /// Create a new workload
    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            name: name.into(),
            namespace: "default".to_string(),
            resources: ResourceRequirements::default(),
            priority: 0,
            priority_class: None,
            constraints: Vec::new(),
            affinity: None,
            tolerations: Vec::new(),
            preemption_policy: PreemptionPolicy::PreemptLowerPriority,
            created_at: chrono::Utc::now(),
        }
    }

    /// Set resource requirements
    pub fn with_resources(mut self, resources: ResourceRequirements) -> Self {
        self.resources = resources;
        self
    }

    /// Set priority
    pub fn with_priority(mut self, priority: i32) -> Self {
        self.priority = priority;
        self
    }

    /// Add constraint
    pub fn with_constraint(mut self, constraint: PlacementConstraint) -> Self {
        self.constraints.push(constraint);
        self
    }

    /// Set affinity
    pub fn with_affinity(mut self, affinity: Affinity) -> Self {
        self.affinity = Some(affinity);
        self
    }

    /// Add toleration
    pub fn with_toleration(mut self, toleration: Toleration) -> Self {
        self.tolerations.push(toleration);
        self
    }
}

/// Toleration for node taints
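///
/// Matching follows the Kubernetes model as implemented in `check_taints`: a
/// toleration matches a taint when the key, value (for `Equal`), and effect
/// all agree, and a `None` key or effect acts as a wildcard. A sketch that
/// tolerates every effect of a hypothetical `maintenance` taint key, whatever
/// its value:
///
/// ```ignore
/// let tolerate_maintenance = Toleration {
///     key: Some("maintenance".to_string()),
///     operator: TolerationOperator::Exists,
///     value: None,
///     effect: None,
///     toleration_seconds: None,
/// };
/// ```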
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Toleration {
    /// Key to match
    pub key: Option<String>,
    /// Operator for matching
    pub operator: TolerationOperator,
    /// Value to match
    pub value: Option<String>,
    /// Effect to tolerate
    pub effect: Option<TaintEffect>,
    /// Toleration seconds for NoExecute
    pub toleration_seconds: Option<u64>,
}

/// Toleration operator
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TolerationOperator {
    /// Key must equal value
    Equal,
    /// Key must exist
    Exists,
}

/// Scheduling decision
#[derive(Debug, Clone)]
pub struct SchedulingDecision {
    /// Workload ID
    pub workload_id: String,
    /// Selected node
    pub node_id: Option<NodeId>,
    /// Score for the selected node
    pub score: f64,
    /// Reason for decision
    pub reason: String,
    /// Preempted workloads (if any)
    pub preempted: Vec<String>,
    /// Scheduling latency in milliseconds
    pub latency_ms: u64,
}

/// The main scheduler
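///
/// All shared state lives behind `parking_lot::RwLock`s. In the current code
/// paths that hold two guards at once (`release`, `try_preemption`), the
/// `assignments` lock is taken before the `nodes` lock; new code should keep
/// that ordering to avoid deadlocks.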
pub struct Scheduler {
    /// Registered nodes
    nodes: Arc<RwLock<HashMap<NodeId, NodeResources>>>,
    /// Scheduling queue
    queue: Arc<SchedulingQueue>,
    /// Scheduling algorithm
    algorithm: Arc<dyn SchedulingAlgorithm + Send + Sync>,
    /// Scheduled workloads (workload_id -> node_id)
    assignments: Arc<RwLock<HashMap<String, NodeId>>>,
    /// Priority classes
    priority_classes: Arc<RwLock<HashMap<String, PriorityClass>>>,
}

impl Scheduler {
    /// Create a new scheduler with the default bin-packing algorithm
    pub fn new() -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(SchedulingQueue::new()),
            algorithm: Arc::new(BinPackScheduler::new()),
            assignments: Arc::new(RwLock::new(HashMap::new())),
            priority_classes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Create a scheduler with a specific scheduling algorithm
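    ///
    /// For example, to spread workloads across nodes instead of bin-packing
    /// (assuming `SpreadScheduler` exposes a `new()` constructor like
    /// `BinPackScheduler` does):
    ///
    /// ```ignore
    /// let scheduler = Scheduler::with_algorithm(SpreadScheduler::new());
    /// ```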
    pub fn with_algorithm<A: SchedulingAlgorithm + Send + Sync + 'static>(algorithm: A) -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(SchedulingQueue::new()),
            algorithm: Arc::new(algorithm),
            assignments: Arc::new(RwLock::new(HashMap::new())),
            priority_classes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Register a node
    pub fn register_node(&self, node: NodeResources) {
        info!(node_id = %node.node_id, "Registering node");
        self.nodes.write().insert(node.node_id, node);
    }

    /// Unregister a node
    pub fn unregister_node(&self, node_id: &NodeId) {
        info!(node_id = %node_id, "Unregistering node");
        self.nodes.write().remove(node_id);
    }

    /// Update node resources
    pub fn update_node(&self, node: NodeResources) {
        self.nodes.write().insert(node.node_id, node);
    }

    /// Get node count
    pub fn node_count(&self) -> usize {
        self.nodes.read().len()
    }

    /// Submit workload for scheduling
    pub fn submit(&self, workload: Workload) {
        debug!(workload_id = %workload.id, "Submitting workload");
        self.queue.enqueue(workload);
    }

    /// Schedule next workload from queue
    pub fn schedule_next(&self) -> Option<SchedulingDecision> {
        let workload = self.queue.dequeue()?;
        Some(self.schedule(&workload))
    }

    /// Schedule a specific workload
    pub fn schedule(&self, workload: &Workload) -> SchedulingDecision {
        let start = std::time::Instant::now();
        let nodes = self.nodes.read();

        // Filter nodes that can fit the workload
        let candidates: Vec<_> = nodes.values()
            .filter(|n| n.can_fit(&workload.resources))
            .filter(|n| self.check_constraints(workload, n))
            .filter(|n| self.check_taints(workload, n))
            .collect();

        if candidates.is_empty() {
            // Try preemption
            drop(nodes);
            if let Some(decision) = self.try_preemption(workload) {
                return decision;
            }

            return SchedulingDecision {
                workload_id: workload.id.clone(),
                node_id: None,
                score: 0.0,
                reason: "No suitable nodes available".to_string(),
                preempted: Vec::new(),
                latency_ms: start.elapsed().as_millis() as u64,
            };
        }

        // Score and select best node
        let scored: Vec<_> = candidates.iter()
            .map(|n| (n, self.algorithm.score(workload, n)))
            .collect();

        let (best_node, score) = scored.iter()
            .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(n, s)| (*n, *s))
            .unwrap();

        let node_id = best_node.node_id;
        drop(nodes);

        // Allocate resources
        if let Some(node) = self.nodes.write().get_mut(&node_id) {
            node.allocate(&workload.resources);
        }

        self.assignments.write().insert(workload.id.clone(), node_id);

        info!(
            workload_id = %workload.id,
            node_id = %node_id,
            score = score,
            "Workload scheduled"
        );

        SchedulingDecision {
            workload_id: workload.id.clone(),
            node_id: Some(node_id),
            score,
            reason: "Scheduled successfully".to_string(),
            preempted: Vec::new(),
            latency_ms: start.elapsed().as_millis() as u64,
        }
    }

    /// Check placement constraints
    fn check_constraints(&self, workload: &Workload, node: &NodeResources) -> bool {
        for constraint in &workload.constraints {
            if !constraint.matches(node) {
                return false;
            }
        }

        // Check affinity
        if let Some(affinity) = &workload.affinity {
            if !affinity.matches(node) {
                return false;
            }
        }

        true
    }

    /// Check if workload tolerates node taints
    fn check_taints(&self, workload: &Workload, node: &NodeResources) -> bool {
        for taint in &node.taints {
            let tolerated = workload.tolerations.iter().any(|t| {
                // Check key match
                let key_matches = t.key.as_ref().map(|k| k == &taint.key).unwrap_or(true);

                // Check operator
                let value_matches = match t.operator {
                    TolerationOperator::Exists => true,
                    TolerationOperator::Equal => {
                        t.value.as_ref().map(|v| v == &taint.value).unwrap_or(false)
                    }
                };

                // Check effect
                let effect_matches = t.effect.map(|e| e == taint.effect).unwrap_or(true);

                key_matches && value_matches && effect_matches
            });

            // NoSchedule blocks new placements outright, and an untolerated
            // NoExecute taint would evict the workload immediately, so treat it
            // as blocking too. PreferNoSchedule remains a soft preference.
            if !tolerated
                && matches!(taint.effect, TaintEffect::NoSchedule | TaintEffect::NoExecute)
            {
                return false;
            }
        }

        true
    }

    /// Try to preempt lower priority workloads
    fn try_preemption(&self, workload: &Workload) -> Option<SchedulingDecision> {
        if workload.preemption_policy == PreemptionPolicy::Never {
            return None;
        }

        // Find workloads that can be preempted
        let assignments = self.assignments.read();
        let mut nodes = self.nodes.write();

        for (node_id, node) in nodes.iter_mut() {
            // Workloads currently assigned to this node. Per-workload priority
            // and resource usage are not tracked per node yet, so we cannot
            // tell which assignments are actually lower priority or how much
            // capacity evicting them would free.
            let preemptable: Vec<_> = assignments.iter()
                .filter(|(_, n)| *n == node_id)
                .map(|(w, _)| w.clone())
                .collect();

            // Stub: only log where preemption would be required instead of
            // performing it.
            if node.can_fit(&workload.resources) || !preemptable.is_empty() {
                warn!(
                    workload_id = %workload.id,
                    node_id = %node_id,
                    "Preemption would be required"
                );
            }
        }

        None
    }

    /// Get workload assignment
    pub fn get_assignment(&self, workload_id: &str) -> Option<NodeId> {
        self.assignments.read().get(workload_id).copied()
    }

    /// Release workload resources
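    ///
    /// The caller is responsible for passing the same `ResourceRequirements`
    /// the workload was scheduled with, plus the GPU device IDs it held (an
    /// empty slice for CPU-only workloads). A sketch:
    ///
    /// ```ignore
    /// scheduler.release("w1", &ResourceRequirements::new().cpu(1_000).memory(1_024), &[]);
    /// ```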
    pub fn release(&self, workload_id: &str, resources: &ResourceRequirements, gpu_ids: &[u32]) {
        if let Some(node_id) = self.assignments.write().remove(workload_id) {
            if let Some(node) = self.nodes.write().get_mut(&node_id) {
                node.release(resources, gpu_ids);
            }
        }
    }

    /// Get queue length
    pub fn queue_len(&self) -> usize {
        self.queue.len()
    }

    /// Register priority class
    pub fn register_priority_class(&self, class: PriorityClass) {
        self.priority_classes.write().insert(class.name.clone(), class);
    }
}

impl Default for Scheduler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_node_resources() {
        let mut node = NodeResources::new(NodeId::new(), 4000, 8192);

        let req = ResourceRequirements::new().cpu(1000).memory(2048);
        assert!(node.can_fit(&req));

        assert!(node.allocate(&req));
        assert_eq!(node.cpu_available(), 3000);
        assert_eq!(node.memory_available(), 6144);
    }

    #[test]
    fn test_scheduler_basic() {
        let scheduler = Scheduler::new();

        let node = NodeResources::new(NodeId::new(), 4000, 8192);
        scheduler.register_node(node);

        let workload = Workload::new("w1", "test")
            .with_resources(ResourceRequirements::new().cpu(1000).memory(1024));

        let decision = scheduler.schedule(&workload);
        assert!(decision.node_id.is_some());
    }

    #[test]
    fn test_scheduler_no_capacity() {
        let scheduler = Scheduler::new();

        let node = NodeResources::new(NodeId::new(), 1000, 1024);
        scheduler.register_node(node);

        let workload = Workload::new("w1", "test")
            .with_resources(ResourceRequirements::new().cpu(2000).memory(2048));

        let decision = scheduler.schedule(&workload);
        assert!(decision.node_id.is_none());
    }
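
    // A sketch of the taint/toleration path: an untolerated NoSchedule taint
    // should block placement, and an `Exists` toleration on the same key
    // should allow it again. Key/value names here are illustrative.
    #[test]
    fn test_taint_and_toleration() {
        let scheduler = Scheduler::new();

        let node = NodeResources::new(NodeId::new(), 4000, 8192).with_taint(Taint {
            key: "dedicated".to_string(),
            value: "gpu".to_string(),
            effect: TaintEffect::NoSchedule,
        });
        scheduler.register_node(node);

        // Without a toleration the workload must not land on the tainted node.
        let plain = Workload::new("w1", "test");
        assert!(scheduler.schedule(&plain).node_id.is_none());

        // Tolerating the taint key makes the node eligible again.
        let tolerant = Workload::new("w2", "test").with_toleration(Toleration {
            key: Some("dedicated".to_string()),
            operator: TolerationOperator::Exists,
            value: None,
            effect: None,
            toleration_seconds: None,
        });
        assert!(scheduler.schedule(&tolerant).node_id.is_some());
    }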
}