forge_orchestration/scheduler/
mod.rs

//! Distributed Scheduler for Forge Orchestration
//!
//! A Kubernetes-style scheduler providing:
//! - Bin-packing and spread scheduling algorithms
//! - Affinity/anti-affinity rules
//! - Resource-aware placement (CPU, memory, GPU)
//! - Preemption and priority-based scheduling
//! - Topology-aware scheduling for NUMA/GPU locality

pub mod algorithms;
pub mod optimized;
pub mod placement;
pub mod preemption;
pub mod queue;

use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

use crate::types::{NodeId, GpuResources};

pub use algorithms::{SchedulingAlgorithm, BinPackScheduler, SpreadScheduler, GpuLocalityScheduler};
pub use optimized::{OptimizedScheduler, WorkloadBatch, SchedulerStats, ClusterUtilization, FFDBinPacker};
pub use placement::{PlacementConstraint, AffinityRule, Affinity};
pub use preemption::{PreemptionPolicy, PriorityClass};
pub use queue::{SchedulingQueue, QueuedWorkload};

/// Resource requirements for a workload
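///
/// # Example
///
/// A minimal sketch of the builder below; the crate path is assumed from
/// this module's location, so the example is marked `ignore`:
///
/// ```ignore
/// use forge_orchestration::scheduler::ResourceRequirements;
///
/// // Half a core, 1 GiB of RAM, one GPU with 16 GiB of VRAM.
/// let req = ResourceRequirements::new()
///     .cpu(500)
///     .memory(1024)
///     .gpu(1, 16_384);
/// assert_eq!(req.gpu_count, 1);
/// ```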
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceRequirements {
    /// CPU cores requested (millicores, 1000 = 1 core)
    pub cpu_millis: u64,
    /// Memory requested in MB
    pub memory_mb: u64,
    /// GPU count requested
    pub gpu_count: u32,
    /// GPU memory required per GPU in MB
    pub gpu_memory_mb: u64,
    /// Ephemeral storage in MB
    pub storage_mb: u64,
    /// Network bandwidth in Mbps
    pub network_mbps: u32,
}

impl Default for ResourceRequirements {
    fn default() -> Self {
        Self {
            cpu_millis: 100,
            memory_mb: 128,
            gpu_count: 0,
            gpu_memory_mb: 0,
            storage_mb: 0,
            network_mbps: 0,
        }
    }
}

impl ResourceRequirements {
    /// Create new resource requirements
    pub fn new() -> Self {
        Self::default()
    }

    /// Set CPU requirement
    pub fn cpu(mut self, millis: u64) -> Self {
        self.cpu_millis = millis;
        self
    }

    /// Set memory requirement
    pub fn memory(mut self, mb: u64) -> Self {
        self.memory_mb = mb;
        self
    }

    /// Set GPU requirements
    pub fn gpu(mut self, count: u32, memory_mb: u64) -> Self {
        self.gpu_count = count;
        self.gpu_memory_mb = memory_mb;
        self
    }
}

/// Node resources and capacity
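///
/// # Example
///
/// An allocate/release round trip mirroring the tests below; crate paths
/// are assumed, so the example is marked `ignore`:
///
/// ```ignore
/// use forge_orchestration::scheduler::{NodeResources, ResourceRequirements};
/// use forge_orchestration::types::NodeId;
///
/// // 4 cores (4000 millicores) and 8 GiB of memory.
/// let mut node = NodeResources::new(NodeId::new(), 4000, 8192);
/// let req = ResourceRequirements::new().cpu(1000).memory(2048);
///
/// assert!(node.can_fit(&req));
/// assert!(node.allocate(&req));
/// assert_eq!(node.cpu_available(), 3000);
///
/// // Release restores capacity (no GPU IDs were held here).
/// node.release(&req, &[]);
/// assert_eq!(node.cpu_available(), 4000);
/// ```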
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeResources {
    /// Node identifier
    pub node_id: NodeId,
    /// Total CPU capacity (millicores)
    pub cpu_capacity: u64,
    /// Allocated CPU (millicores)
    pub cpu_allocated: u64,
    /// Total memory capacity (MB)
    pub memory_capacity: u64,
    /// Allocated memory (MB)
    pub memory_allocated: u64,
    /// GPU resources
    pub gpus: Vec<GpuResources>,
    /// GPUs allocated (by device ID)
    pub gpus_allocated: Vec<u32>,
    /// Node labels for affinity matching
    pub labels: HashMap<String, String>,
    /// Node taints
    pub taints: Vec<Taint>,
    /// Is node schedulable
    pub schedulable: bool,
    /// Node conditions
    pub conditions: Vec<NodeCondition>,
}

impl NodeResources {
    /// Create new node resources
    pub fn new(node_id: NodeId, cpu_capacity: u64, memory_capacity: u64) -> Self {
        Self {
            node_id,
            cpu_capacity,
            cpu_allocated: 0,
            memory_capacity,
            memory_allocated: 0,
            gpus: Vec::new(),
            gpus_allocated: Vec::new(),
            labels: HashMap::new(),
            taints: Vec::new(),
            schedulable: true,
            conditions: Vec::new(),
        }
    }

    /// Add GPU to node
    pub fn with_gpu(mut self, gpu: GpuResources) -> Self {
        self.gpus.push(gpu);
        self
    }

    /// Add label
    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }

    /// Add taint
    pub fn with_taint(mut self, taint: Taint) -> Self {
        self.taints.push(taint);
        self
    }

    /// Available CPU
    pub fn cpu_available(&self) -> u64 {
        self.cpu_capacity.saturating_sub(self.cpu_allocated)
    }

    /// Available memory
    pub fn memory_available(&self) -> u64 {
        self.memory_capacity.saturating_sub(self.memory_allocated)
    }

    /// Available (unallocated) GPUs
    pub fn gpus_available(&self) -> usize {
        self.gpus.len().saturating_sub(self.gpus_allocated.len())
    }

    /// Check if node can fit workload
    pub fn can_fit(&self, req: &ResourceRequirements) -> bool {
        if !self.schedulable {
            return false;
        }

        if self.cpu_available() < req.cpu_millis {
            return false;
        }

        if self.memory_available() < req.memory_mb {
            return false;
        }

        if req.gpu_count > 0 {
            // Count unallocated GPUs with enough free memory
            let available_gpus = self.gpus.iter()
                .filter(|g| !self.gpus_allocated.contains(&g.device_id))
                .filter(|g| g.available_memory_mb() >= req.gpu_memory_mb)
                .count();

            if available_gpus < req.gpu_count as usize {
                return false;
            }
        }

        true
    }

    /// Allocate resources for workload; returns false if it does not fit
    pub fn allocate(&mut self, req: &ResourceRequirements) -> bool {
        if !self.can_fit(req) {
            return false;
        }

        self.cpu_allocated += req.cpu_millis;
        self.memory_allocated += req.memory_mb;

        // Allocate GPUs. `can_fit` has already verified that enough
        // unallocated GPUs with sufficient memory exist, so `find`
        // succeeds on every iteration.
        for _ in 0..req.gpu_count {
            if let Some(gpu) = self.gpus.iter()
                .find(|g| !self.gpus_allocated.contains(&g.device_id)
                    && g.available_memory_mb() >= req.gpu_memory_mb)
            {
                self.gpus_allocated.push(gpu.device_id);
            }
        }

        true
    }

    /// Release resources
    pub fn release(&mut self, req: &ResourceRequirements, gpu_ids: &[u32]) {
        self.cpu_allocated = self.cpu_allocated.saturating_sub(req.cpu_millis);
        self.memory_allocated = self.memory_allocated.saturating_sub(req.memory_mb);
        self.gpus_allocated.retain(|id| !gpu_ids.contains(id));
    }
}

/// Node taint for scheduling constraints
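///
/// # Example
///
/// A `NoSchedule` taint attached via `NodeResources::with_taint`; key and
/// value are illustrative, and crate paths are assumed, so the example is
/// marked `ignore`:
///
/// ```ignore
/// use forge_orchestration::scheduler::{NodeResources, Taint, TaintEffect};
/// use forge_orchestration::types::NodeId;
///
/// let node = NodeResources::new(NodeId::new(), 4000, 8192)
///     .with_taint(Taint {
///         key: "dedicated".to_string(),
///         value: "gpu-jobs".to_string(),
///         effect: TaintEffect::NoSchedule,
///     });
/// assert_eq!(node.taints.len(), 1);
/// ```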
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Taint {
    /// Taint key
    pub key: String,
    /// Taint value
    pub value: String,
    /// Taint effect
    pub effect: TaintEffect,
}

/// Taint effect
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaintEffect {
    /// Do not schedule new workloads
    NoSchedule,
    /// Prefer not to schedule
    PreferNoSchedule,
    /// Evict existing workloads
    NoExecute,
}

/// Node condition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeCondition {
    /// Condition type
    pub condition_type: String,
    /// Condition status
    pub status: bool,
    /// Last transition time
    pub last_transition: chrono::DateTime<chrono::Utc>,
    /// Reason for condition
    pub reason: Option<String>,
    /// Human-readable message
    pub message: Option<String>,
}

/// Workload to be scheduled
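///
/// # Example
///
/// A builder chain mirroring the tests below (priority value illustrative;
/// crate paths assumed, so the example is marked `ignore`):
///
/// ```ignore
/// use forge_orchestration::scheduler::{ResourceRequirements, Workload};
///
/// let workload = Workload::new("w1", "test")
///     .with_resources(ResourceRequirements::new().cpu(2000).memory(4096))
///     .with_priority(100);
/// assert_eq!(workload.namespace, "default");
/// ```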
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workload {
    /// Unique workload ID
    pub id: String,
    /// Workload name
    pub name: String,
    /// Namespace
    pub namespace: String,
    /// Resource requirements
    pub resources: ResourceRequirements,
    /// Priority (higher = more important)
    pub priority: i32,
    /// Priority class name
    pub priority_class: Option<String>,
    /// Placement constraints
    pub constraints: Vec<PlacementConstraint>,
    /// Node affinity rules
    pub affinity: Option<Affinity>,
    /// Tolerations for taints
    pub tolerations: Vec<Toleration>,
    /// Preemption policy
    pub preemption_policy: PreemptionPolicy,
    /// Creation timestamp
    pub created_at: chrono::DateTime<chrono::Utc>,
}

impl Workload {
    /// Create a new workload
    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            name: name.into(),
            namespace: "default".to_string(),
            resources: ResourceRequirements::default(),
            priority: 0,
            priority_class: None,
            constraints: Vec::new(),
            affinity: None,
            tolerations: Vec::new(),
            preemption_policy: PreemptionPolicy::PreemptLowerPriority,
            created_at: chrono::Utc::now(),
        }
    }

    /// Set resource requirements
    pub fn with_resources(mut self, resources: ResourceRequirements) -> Self {
        self.resources = resources;
        self
    }

    /// Set priority
    pub fn with_priority(mut self, priority: i32) -> Self {
        self.priority = priority;
        self
    }

    /// Add constraint
    pub fn with_constraint(mut self, constraint: PlacementConstraint) -> Self {
        self.constraints.push(constraint);
        self
    }

    /// Set affinity
    pub fn with_affinity(mut self, affinity: Affinity) -> Self {
        self.affinity = Some(affinity);
        self
    }

    /// Add toleration
    pub fn with_toleration(mut self, toleration: Toleration) -> Self {
        self.tolerations.push(toleration);
        self
    }
}

/// Toleration for node taints
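///
/// # Example
///
/// A toleration that matches any taint keyed `"gpu"`, whatever its value or
/// effect (key is illustrative; crate paths assumed, so marked `ignore`):
///
/// ```ignore
/// use forge_orchestration::scheduler::{Toleration, TolerationOperator};
///
/// let tol = Toleration {
///     key: Some("gpu".to_string()),
///     operator: TolerationOperator::Exists,
///     value: None,
///     effect: None,
///     toleration_seconds: None,
/// };
/// ```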
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Toleration {
    /// Key to match
    pub key: Option<String>,
    /// Operator for matching
    pub operator: TolerationOperator,
    /// Value to match
    pub value: Option<String>,
    /// Effect to tolerate
    pub effect: Option<TaintEffect>,
    /// Toleration seconds for NoExecute
    pub toleration_seconds: Option<u64>,
}

/// Toleration operator
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TolerationOperator {
    /// Key must equal value
    Equal,
    /// Key must exist
    Exists,
}

/// Scheduling decision
#[derive(Debug, Clone)]
pub struct SchedulingDecision {
    /// Workload ID
    pub workload_id: String,
    /// Selected node
    pub node_id: Option<NodeId>,
    /// Score for the selected node
    pub score: f64,
    /// Reason for decision
    pub reason: String,
    /// Preempted workloads (if any)
    pub preempted: Vec<String>,
    /// Scheduling latency
    pub latency_ms: u64,
}

/// The main scheduler
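///
/// # Example
///
/// End-to-end sketch mirroring `tests::test_scheduler_basic`; crate paths
/// are assumed, so the example is marked `ignore`:
///
/// ```ignore
/// use forge_orchestration::scheduler::{NodeResources, ResourceRequirements, Scheduler, Workload};
/// use forge_orchestration::types::NodeId;
///
/// let scheduler = Scheduler::new();
/// scheduler.register_node(NodeResources::new(NodeId::new(), 4000, 8192));
///
/// let workload = Workload::new("w1", "test")
///     .with_resources(ResourceRequirements::new().cpu(1000).memory(1024));
///
/// let decision = scheduler.schedule(&workload);
/// assert!(decision.node_id.is_some());
/// ```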
pub struct Scheduler {
    /// Registered nodes
    nodes: Arc<RwLock<HashMap<NodeId, NodeResources>>>,
    /// Scheduling queue
    queue: Arc<SchedulingQueue>,
    /// Scheduling algorithm
    algorithm: Arc<dyn SchedulingAlgorithm + Send + Sync>,
    /// Scheduled workloads (workload_id -> node_id)
    assignments: Arc<RwLock<HashMap<String, NodeId>>>,
    /// Priority classes
    priority_classes: Arc<RwLock<HashMap<String, PriorityClass>>>,
}

impl Scheduler {
    /// Create a new scheduler with default algorithm
    pub fn new() -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(SchedulingQueue::new()),
            algorithm: Arc::new(BinPackScheduler::new()),
            assignments: Arc::new(RwLock::new(HashMap::new())),
            priority_classes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Create with specific algorithm
    pub fn with_algorithm<A: SchedulingAlgorithm + Send + Sync + 'static>(algorithm: A) -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(SchedulingQueue::new()),
            algorithm: Arc::new(algorithm),
            assignments: Arc::new(RwLock::new(HashMap::new())),
            priority_classes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Register a node
    pub fn register_node(&self, node: NodeResources) {
        info!(node_id = %node.node_id, "Registering node");
        self.nodes.write().insert(node.node_id, node);
    }

    /// Unregister a node
    pub fn unregister_node(&self, node_id: &NodeId) {
        info!(node_id = %node_id, "Unregistering node");
        self.nodes.write().remove(node_id);
    }

    /// Update node resources
    pub fn update_node(&self, node: NodeResources) {
        self.nodes.write().insert(node.node_id, node);
    }

    /// Get node count
    pub fn node_count(&self) -> usize {
        self.nodes.read().len()
    }

    /// Submit workload for scheduling
    pub fn submit(&self, workload: Workload) {
        debug!(workload_id = %workload.id, "Submitting workload");
        self.queue.enqueue(workload);
    }

    /// Schedule next workload from queue
    pub fn schedule_next(&self) -> Option<SchedulingDecision> {
        let workload = self.queue.dequeue()?;
        Some(self.schedule(&workload))
    }

    /// Schedule a specific workload
    pub fn schedule(&self, workload: &Workload) -> SchedulingDecision {
        let start = std::time::Instant::now();
        let nodes = self.nodes.read();

        // Filter nodes that can fit the workload and satisfy its
        // constraints and taint tolerations
        let candidates: Vec<_> = nodes.values()
            .filter(|n| n.can_fit(&workload.resources))
            .filter(|n| self.check_constraints(workload, n))
            .filter(|n| self.check_taints(workload, n))
            .collect();

        if candidates.is_empty() {
            // No feasible node; try preemption before reporting failure
            drop(nodes);
            if let Some(decision) = self.try_preemption(workload) {
                return decision;
            }

            return SchedulingDecision {
                workload_id: workload.id.clone(),
                node_id: None,
                score: 0.0,
                reason: "No suitable nodes available".to_string(),
                preempted: Vec::new(),
                latency_ms: start.elapsed().as_millis() as u64,
            };
        }

        // Score candidates and select the best node; `candidates` is
        // non-empty here, so `max_by` always yields a value
        let scored: Vec<_> = candidates.iter()
            .map(|n| (n, self.algorithm.score(workload, n)))
            .collect();

        let (best_node, score) = scored.iter()
            .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(n, s)| (*n, *s))
            .unwrap();

        let node_id = best_node.node_id;
        // Release the read lock before re-acquiring the write lock below
        drop(nodes);

        // Allocate resources; the node may have been unregistered while
        // the lock was released, hence the `if let`
        if let Some(node) = self.nodes.write().get_mut(&node_id) {
            node.allocate(&workload.resources);
        }

        self.assignments.write().insert(workload.id.clone(), node_id);

        info!(
            workload_id = %workload.id,
            node_id = %node_id,
            score = score,
            "Workload scheduled"
        );

        SchedulingDecision {
            workload_id: workload.id.clone(),
            node_id: Some(node_id),
            score,
            reason: "Scheduled successfully".to_string(),
            preempted: Vec::new(),
            latency_ms: start.elapsed().as_millis() as u64,
        }
    }

    /// Check placement constraints
    fn check_constraints(&self, workload: &Workload, node: &NodeResources) -> bool {
        for constraint in &workload.constraints {
            if !constraint.matches(node) {
                return false;
            }
        }

        // Check affinity
        if let Some(affinity) = &workload.affinity {
            if !affinity.matches(node) {
                return false;
            }
        }

        true
    }

    /// Check if workload tolerates node taints
    fn check_taints(&self, workload: &Workload, node: &NodeResources) -> bool {
        for taint in &node.taints {
            let tolerated = workload.tolerations.iter().any(|t| {
                // A toleration without a key matches any taint key
                let key_matches = t.key.as_ref().map(|k| k == &taint.key).unwrap_or(true);

                // Check operator: Exists ignores the value, Equal requires
                // an exact value match
                let value_matches = match t.operator {
                    TolerationOperator::Exists => true,
                    TolerationOperator::Equal => {
                        t.value.as_ref().map(|v| v == &taint.value).unwrap_or(false)
                    }
                };

                // A toleration without an effect matches any taint effect
                let effect_matches = t.effect.map(|e| e == taint.effect).unwrap_or(true);

                key_matches && value_matches && effect_matches
            });

            // Only NoSchedule is a hard filter at scheduling time;
            // PreferNoSchedule is a soft preference, and NoExecute
            // concerns eviction of already-running workloads
            if !tolerated && taint.effect == TaintEffect::NoSchedule {
                return false;
            }
        }

        true
    }

    /// Try to preempt lower-priority workloads to make room
    ///
    /// Currently a stub: per-workload resources are not tracked per node,
    /// so this only logs where preemption would be required and never
    /// actually evicts anything.
    fn try_preemption(&self, workload: &Workload) -> Option<SchedulingDecision> {
        if workload.preemption_policy == PreemptionPolicy::Never {
            return None;
        }

        let assignments = self.assignments.read();
        let nodes = self.nodes.read();

        for (node_id, node) in nodes.iter() {
            // Workloads currently assigned to this node; a full
            // implementation would keep only those with lower priority
            let preemptable: Vec<_> = assignments.iter()
                .filter(|(_, n)| *n == *node_id)
                .map(|(w, _)| w.clone())
                .collect();

            if node.can_fit(&workload.resources) || !preemptable.is_empty() {
                warn!(
                    workload_id = %workload.id,
                    node_id = %node_id,
                    "Preemption would be required"
                );
            }
        }

        None
    }

    /// Get workload assignment
    pub fn get_assignment(&self, workload_id: &str) -> Option<NodeId> {
        self.assignments.read().get(workload_id).copied()
    }

    /// Release workload resources
    pub fn release(&self, workload_id: &str, resources: &ResourceRequirements, gpu_ids: &[u32]) {
        if let Some(node_id) = self.assignments.write().remove(workload_id) {
            if let Some(node) = self.nodes.write().get_mut(&node_id) {
                node.release(resources, gpu_ids);
            }
        }
    }

    /// Get queue length
    pub fn queue_len(&self) -> usize {
        self.queue.len()
    }

    /// Register priority class
    pub fn register_priority_class(&self, class: PriorityClass) {
        self.priority_classes.write().insert(class.name.clone(), class);
    }
}

impl Default for Scheduler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_node_resources() {
        let mut node = NodeResources::new(NodeId::new(), 4000, 8192);

        let req = ResourceRequirements::new().cpu(1000).memory(2048);
        assert!(node.can_fit(&req));

        assert!(node.allocate(&req));
        assert_eq!(node.cpu_available(), 3000);
        assert_eq!(node.memory_available(), 6144);
    }

    #[test]
    fn test_scheduler_basic() {
        let scheduler = Scheduler::new();

        let node = NodeResources::new(NodeId::new(), 4000, 8192);
        scheduler.register_node(node);

        let workload = Workload::new("w1", "test")
            .with_resources(ResourceRequirements::new().cpu(1000).memory(1024));

        let decision = scheduler.schedule(&workload);
        assert!(decision.node_id.is_some());
    }

    #[test]
    fn test_scheduler_no_capacity() {
        let scheduler = Scheduler::new();

        let node = NodeResources::new(NodeId::new(), 1000, 1024);
        scheduler.register_node(node);

        let workload = Workload::new("w1", "test")
            .with_resources(ResourceRequirements::new().cpu(2000).memory(2048));

        let decision = scheduler.schedule(&workload);
        assert!(decision.node_id.is_none());
    }
}