forge_orchestration/scheduler/algorithms.rs

//! Scheduling algorithms for workload placement
//!
//! Implements multiple scheduling strategies including:
//! - Bin-packing (maximize utilization)
//! - Spread (maximize availability)
//! - GPU locality (minimize data movement)
//! - Learned routing (ML-based adaptive scheduling)

use super::{NodeResources, Workload};

/// Trait for scheduling algorithms
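///
/// Callers typically score every candidate node for a workload and place it on
/// the highest-scoring one. A minimal ranking sketch (the `pick_best` helper is
/// illustrative, not part of this module; imports omitted):
///
/// ```ignore
/// fn pick_best<'a>(
///     algo: &dyn SchedulingAlgorithm,
///     workload: &Workload,
///     nodes: &'a [NodeResources],
/// ) -> Option<&'a NodeResources> {
///     nodes.iter().max_by(|a, b| {
///         algo.score(workload, a)
///             .partial_cmp(&algo.score(workload, b))
///             .unwrap_or(std::cmp::Ordering::Equal)
///     })
/// }
/// ```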
pub trait SchedulingAlgorithm: Send + Sync {
    /// Score a node for a workload (higher = better)
    fn score(&self, workload: &Workload, node: &NodeResources) -> f64;

    /// Algorithm name
    fn name(&self) -> &str;
}

/// Bin-packing scheduler - maximizes node utilization
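///
/// Higher scores go to nodes that are already more utilized, so workloads
/// consolidate onto fewer nodes. A usage sketch, assuming a `workload` and a
/// candidate `node` are in scope:
///
/// ```ignore
/// // Weight GPU utilization even more heavily than the default of 2.0.
/// let scheduler = BinPackScheduler::new().with_weights(1.0, 1.0, 4.0);
/// let score = scheduler.score(&workload, &node);
/// assert!(score >= 0.0);
/// ```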
#[derive(Debug, Clone)]
pub struct BinPackScheduler {
    /// Weight for CPU utilization
    cpu_weight: f64,
    /// Weight for memory utilization
    memory_weight: f64,
    /// Weight for GPU utilization
    gpu_weight: f64,
}

impl BinPackScheduler {
    /// Create new bin-pack scheduler
    pub fn new() -> Self {
        Self {
            cpu_weight: 1.0,
            memory_weight: 1.0,
            gpu_weight: 2.0, // GPUs are expensive, pack them tighter
        }
    }

    /// Set weights
    pub fn with_weights(mut self, cpu: f64, memory: f64, gpu: f64) -> Self {
        self.cpu_weight = cpu;
        self.memory_weight = memory;
        self.gpu_weight = gpu;
        self
    }
}

impl Default for BinPackScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl SchedulingAlgorithm for BinPackScheduler {
    fn score(&self, workload: &Workload, node: &NodeResources) -> f64 {
        // Prefer nodes that are already more utilized (bin-packing)
        let cpu_util = node.cpu_allocated as f64 / node.cpu_capacity as f64;
        let mem_util = node.memory_allocated as f64 / node.memory_capacity as f64;

        let gpu_util = if !node.gpus.is_empty() {
            node.gpus_allocated.len() as f64 / node.gpus.len() as f64
        } else {
            0.0
        };

        // Score based on current utilization (higher util = higher score for bin-packing)
        let base_score = (cpu_util * self.cpu_weight + mem_util * self.memory_weight + gpu_util * self.gpu_weight)
            / (self.cpu_weight + self.memory_weight + self.gpu_weight);

        // Penalize if workload barely fits
        let cpu_headroom = (node.cpu_available() as f64 - workload.resources.cpu_millis as f64)
            / node.cpu_capacity as f64;
        let mem_headroom = (node.memory_available() as f64 - workload.resources.memory_mb as f64)
            / node.memory_capacity as f64;

        let headroom_penalty = if cpu_headroom < 0.05 || mem_headroom < 0.05 {
            0.1
        } else {
            0.0
        };

        (base_score - headroom_penalty).max(0.0)
    }

    fn name(&self) -> &str {
        "bin-pack"
    }
}

/// Spread scheduler - maximizes availability by spreading workloads
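///
/// Higher scores go to the least-utilized nodes, so replicas land on different
/// machines. A usage sketch, assuming a `workload` and a candidate `node` are in
/// scope:
///
/// ```ignore
/// // Give a small bonus to nodes that carry a "zone" label.
/// let scheduler = SpreadScheduler::new().with_topology("zone");
/// let score = scheduler.score(&workload, &node);
/// ```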
#[derive(Debug, Clone)]
pub struct SpreadScheduler {
    /// Topology key for spreading (e.g., "zone", "rack")
    topology_key: Option<String>,
}

impl SpreadScheduler {
    /// Create new spread scheduler
    pub fn new() -> Self {
        Self { topology_key: None }
    }

    /// Set topology key for spreading
    pub fn with_topology(mut self, key: impl Into<String>) -> Self {
        self.topology_key = Some(key.into());
        self
    }
}

impl Default for SpreadScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl SchedulingAlgorithm for SpreadScheduler {
    fn score(&self, _workload: &Workload, node: &NodeResources) -> f64 {
        // Prefer nodes with lower utilization (spreading)
        let cpu_util = node.cpu_allocated as f64 / node.cpu_capacity as f64;
        let mem_util = node.memory_allocated as f64 / node.memory_capacity as f64;

        // Invert utilization for spread scoring
        let spread_score = 1.0 - (cpu_util + mem_util) / 2.0;

        // Bonus for matching topology key
        let topology_bonus = if let Some(key) = &self.topology_key {
            if node.labels.contains_key(key) { 0.1 } else { 0.0 }
        } else {
            0.0
        };

        spread_score + topology_bonus
    }

    fn name(&self) -> &str {
        "spread"
    }
}

/// GPU locality scheduler - optimizes for GPU workloads
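///
/// GPU workloads are scored on available GPU memory, tensor-core support,
/// compute capability, and an approximation of device locality; non-GPU
/// workloads fall back to a spread-style score. A usage sketch, assuming a
/// `workload` and a candidate `node` are in scope:
///
/// ```ignore
/// // Require at least compute capability 8.0 (A100-class or newer).
/// let scheduler = GpuLocalityScheduler::new()
///     .min_compute_capability(8.0)
///     .prefer_tensor_cores(true);
/// let score = scheduler.score(&workload, &node);
/// ```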
#[derive(Debug, Clone)]
pub struct GpuLocalityScheduler {
    /// Prefer nodes with tensor cores
    prefer_tensor_cores: bool,
    /// Minimum compute capability
    min_compute_capability: Option<f32>,
    /// Prefer NVLink interconnect
    prefer_nvlink: bool,
}

impl GpuLocalityScheduler {
    /// Create new GPU locality scheduler
    pub fn new() -> Self {
        Self {
            prefer_tensor_cores: true,
            min_compute_capability: None,
            prefer_nvlink: true,
        }
    }

    /// Set minimum compute capability
    pub fn min_compute_capability(mut self, cc: f32) -> Self {
        self.min_compute_capability = Some(cc);
        self
    }

    /// Set tensor core preference
    pub fn prefer_tensor_cores(mut self, prefer: bool) -> Self {
        self.prefer_tensor_cores = prefer;
        self
    }
}

impl Default for GpuLocalityScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl SchedulingAlgorithm for GpuLocalityScheduler {
    fn score(&self, workload: &Workload, node: &NodeResources) -> f64 {
        if workload.resources.gpu_count == 0 {
            // Fall back to spread for non-GPU workloads
            let cpu_util = node.cpu_allocated as f64 / node.cpu_capacity as f64;
            return 1.0 - cpu_util;
        }

        let available_gpus: Vec<_> = node.gpus.iter()
            .filter(|g| !node.gpus_allocated.contains(&g.device_id))
            .collect();

        if available_gpus.is_empty() {
            return 0.0;
        }

        let mut score = 0.5; // Base score

        // Score based on GPU memory availability
        let total_available_mem: u64 = available_gpus.iter()
            .map(|g| g.available_memory_mb())
            .sum();
        let required_mem = workload.resources.gpu_memory_mb * workload.resources.gpu_count as u64;

        if total_available_mem >= required_mem {
            score += 0.2;
        }

        // Bonus for tensor cores
        if self.prefer_tensor_cores {
            let tensor_core_count = available_gpus.iter()
                .filter(|g| g.tensor_cores)
                .count();
            score += 0.1 * (tensor_core_count as f64 / available_gpus.len() as f64);
        }

        // Check compute capability
        if let Some(min_cc) = self.min_compute_capability {
            let meets_cc = available_gpus.iter()
                .all(|g| g.compute_capability.map(|cc| cc >= min_cc).unwrap_or(false));
            if meets_cc {
                score += 0.1;
            } else {
                score -= 0.2;
            }
        }

        // Prefer nodes where GPUs are on the same NUMA node (locality).
        // Approximated by preferring nodes whose available GPU device IDs are contiguous.
        let mut gpu_ids: Vec<_> = available_gpus.iter().map(|g| g.device_id).collect();
        gpu_ids.sort_unstable();
        if gpu_ids.len() >= workload.resources.gpu_count as usize {
            let contiguous = gpu_ids.windows(2)
                .all(|w| w[1] == w[0] + 1);
            if contiguous {
                score += 0.1;
            }
        }

        score.min(1.0)
    }

    fn name(&self) -> &str {
        "gpu-locality"
    }
}

/// Adaptive learned scheduler using online learning
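///
/// Scores nodes with a linear model over utilization, headroom, workload, and
/// node features, squashed through a sigmoid. Callers close the loop by
/// reporting how well each placement actually performed. A usage sketch,
/// assuming a `workload` and a candidate `node` are in scope:
///
/// ```ignore
/// let scheduler = LearnedScheduler::new();
/// let score = scheduler.score(&workload, &node); // always in (0.0, 1.0)
///
/// // Later, after observing how the placement behaved:
/// scheduler.record_feedback(SchedulingFeedback {
///     features: vec![0.5, 0.5, 0.0, 0.25, 0.25, 0.0, 0.0, 1.0],
///     performance: 0.9, // 1.0 = good placement, 0.0 = bad
/// });
/// ```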
#[derive(Debug)]
pub struct LearnedScheduler {
    /// Feature weights learned from feedback
    weights: parking_lot::RwLock<Vec<f64>>,
    /// Learning rate
    learning_rate: f64,
    /// Number of features
    num_features: usize,
    /// Historical performance data
    history: parking_lot::RwLock<Vec<SchedulingFeedback>>,
}

/// Feedback for learning
#[derive(Debug, Clone)]
pub struct SchedulingFeedback {
    /// Features used for decision
    pub features: Vec<f64>,
    /// Actual performance (0.0 = bad, 1.0 = good)
    pub performance: f64,
}

impl LearnedScheduler {
    /// Create new learned scheduler
    pub fn new() -> Self {
        let num_features = 8; // CPU, mem, GPU util, headroom, etc.
        Self {
            weights: parking_lot::RwLock::new(vec![0.5; num_features]),
            learning_rate: 0.01,
            num_features,
            history: parking_lot::RwLock::new(Vec::new()),
        }
    }

    /// Extract features from workload and node
    fn extract_features(&self, workload: &Workload, node: &NodeResources) -> Vec<f64> {
        vec![
            // Utilization features
            node.cpu_allocated as f64 / node.cpu_capacity as f64,
            node.memory_allocated as f64 / node.memory_capacity as f64,
            if node.gpus.is_empty() { 0.0 } else { node.gpus_allocated.len() as f64 / node.gpus.len() as f64 },

            // Headroom features
            (node.cpu_available() as f64 - workload.resources.cpu_millis as f64) / node.cpu_capacity as f64,
            (node.memory_available() as f64 - workload.resources.memory_mb as f64) / node.memory_capacity as f64,

            // Workload features
            workload.priority as f64 / 100.0,
            if workload.resources.gpu_count > 0 { 1.0 } else { 0.0 },

            // Node features
            if node.schedulable { 1.0 } else { 0.0 },
        ]
    }

    /// Record feedback for learning
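    ///
    /// Performs one online gradient-descent step: with prediction `p = w · x`,
    /// each weight is updated as
    /// `w_i += learning_rate * (performance - p) * x_i` and clamped to
    /// `[-2.0, 2.0]`.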
    pub fn record_feedback(&self, feedback: SchedulingFeedback) {
        // Online gradient descent update
        let mut weights = self.weights.write();

        // Compute prediction
        let prediction: f64 = feedback.features.iter()
            .zip(weights.iter())
            .map(|(f, w)| f * w)
            .sum();

        // Compute error
        let error = feedback.performance - prediction;

        // Update weights
        // Iterate in lockstep so a feature vector of the wrong length cannot index out of bounds
        for (weight, feature) in weights.iter_mut().zip(feedback.features.iter()) {
            *weight += self.learning_rate * error * feature;
            // Clamp weights to a reasonable range
            *weight = weight.clamp(-2.0, 2.0);
        }

        // Store in history for batch updates
        self.history.write().push(feedback);
    }

    /// Get current weights
    pub fn weights(&self) -> Vec<f64> {
        self.weights.read().clone()
    }
}

impl Default for LearnedScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl SchedulingAlgorithm for LearnedScheduler {
    fn score(&self, workload: &Workload, node: &NodeResources) -> f64 {
        let features = self.extract_features(workload, node);
        let weights = self.weights.read();

        // Linear combination of features and weights
        let score: f64 = features.iter()
            .zip(weights.iter())
            .map(|(f, w)| f * w)
            .sum();

        // Sigmoid to bound output
        1.0 / (1.0 + (-score).exp())
    }

    fn name(&self) -> &str {
        "learned"
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::NodeId;

    fn test_node(cpu_alloc: u64, mem_alloc: u64) -> NodeResources {
        let mut node = NodeResources::new(NodeId::new(), 4000, 8192);
        node.cpu_allocated = cpu_alloc;
        node.memory_allocated = mem_alloc;
        node
    }

    #[test]
    fn test_bin_pack_prefers_utilized() {
        let scheduler = BinPackScheduler::new();
        let workload = Workload::new("w1", "test");

        let low_util = test_node(1000, 2048);
        let high_util = test_node(3000, 6144);

        let low_score = scheduler.score(&workload, &low_util);
        let high_score = scheduler.score(&workload, &high_util);

        assert!(high_score > low_score, "Bin-pack should prefer higher utilization");
    }

    #[test]
    fn test_spread_prefers_empty() {
        let scheduler = SpreadScheduler::new();
        let workload = Workload::new("w1", "test");

        let low_util = test_node(1000, 2048);
        let high_util = test_node(3000, 6144);

        let low_score = scheduler.score(&workload, &low_util);
        let high_score = scheduler.score(&workload, &high_util);

        assert!(low_score > high_score, "Spread should prefer lower utilization");
    }

    #[test]
    fn test_learned_scheduler() {
        let scheduler = LearnedScheduler::new();
        let workload = Workload::new("w1", "test");
        let node = test_node(2000, 4096);

        let score = scheduler.score(&workload, &node);
        assert!((0.0..=1.0).contains(&score), "Score should be bounded");

        // Test learning
        let features = vec![0.5, 0.5, 0.0, 0.25, 0.25, 0.0, 0.0, 1.0];
        scheduler.record_feedback(SchedulingFeedback {
            features,
            performance: 0.9,
        });

        // Weights should have changed
        let weights = scheduler.weights();
        assert!(weights.iter().any(|w| *w != 0.5));
    }
}