forge_orchestration/scheduler/
placement.rs1use super::NodeResources;
9use serde::{Deserialize, Serialize};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub enum PlacementConstraint {
14 NodeSelector { key: String, value: String },
16 Zone(String),
18 Region(String),
20 RequiresGpu,
22 GpuModel(String),
24 MinGpuMemory(u64),
26 RequiresTensorCores,
28 Architecture(String),
30 Expression(ConstraintExpression),
32}
33
34impl PlacementConstraint {
35 pub fn matches(&self, node: &NodeResources) -> bool {
37 match self {
38 PlacementConstraint::NodeSelector { key, value } => {
39 node.labels.get(key).map(|v| v == value).unwrap_or(false)
40 }
41 PlacementConstraint::Zone(zone) => {
42 node.labels.get("topology.kubernetes.io/zone")
43 .or_else(|| node.labels.get("zone"))
44 .map(|z| z == zone)
45 .unwrap_or(false)
46 }
47 PlacementConstraint::Region(region) => {
48 node.labels.get("topology.kubernetes.io/region")
49 .or_else(|| node.labels.get("region"))
50 .map(|r| r == region)
51 .unwrap_or(false)
52 }
53 PlacementConstraint::RequiresGpu => {
54 !node.gpus.is_empty() && node.gpus_available() > 0
55 }
56 PlacementConstraint::GpuModel(model) => {
57 node.gpus.iter().any(|g| g.model.contains(model))
58 }
59 PlacementConstraint::MinGpuMemory(min_mb) => {
60 node.gpus.iter().any(|g| g.memory_mb >= *min_mb)
61 }
62 PlacementConstraint::RequiresTensorCores => {
63 node.gpus.iter().any(|g| g.tensor_cores)
64 }
65 PlacementConstraint::Architecture(arch) => {
66 node.labels.get("kubernetes.io/arch")
67 .or_else(|| node.labels.get("arch"))
68 .map(|a| a == arch)
69 .unwrap_or(false)
70 }
71 PlacementConstraint::Expression(expr) => {
72 expr.evaluate(node)
73 }
74 }
75 }
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct ConstraintExpression {
81 pub key: String,
83 pub operator: ExpressionOperator,
85 pub values: Vec<String>,
87}
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
91pub enum ExpressionOperator {
92 In,
94 NotIn,
96 Exists,
98 DoesNotExist,
100 Gt,
102 Lt,
104}
105
106impl ConstraintExpression {
107 pub fn evaluate(&self, node: &NodeResources) -> bool {
109 let value = node.labels.get(&self.key);
110
111 match self.operator {
112 ExpressionOperator::In => {
113 value.map(|v| self.values.contains(v)).unwrap_or(false)
114 }
115 ExpressionOperator::NotIn => {
116 value.map(|v| !self.values.contains(v)).unwrap_or(true)
117 }
118 ExpressionOperator::Exists => value.is_some(),
119 ExpressionOperator::DoesNotExist => value.is_none(),
120 ExpressionOperator::Gt => {
121 if let (Some(v), Some(threshold)) = (value, self.values.first()) {
122 v.parse::<i64>().ok()
123 .zip(threshold.parse::<i64>().ok())
124 .map(|(v, t)| v > t)
125 .unwrap_or(false)
126 } else {
127 false
128 }
129 }
130 ExpressionOperator::Lt => {
131 if let (Some(v), Some(threshold)) = (value, self.values.first()) {
132 v.parse::<i64>().ok()
133 .zip(threshold.parse::<i64>().ok())
134 .map(|(v, t)| v < t)
135 .unwrap_or(false)
136 } else {
137 false
138 }
139 }
140 }
141 }
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct Affinity {
147 pub node_affinity: Option<NodeAffinity>,
149 pub workload_affinity: Option<WorkloadAffinity>,
151 pub workload_anti_affinity: Option<WorkloadAffinity>,
153}
154
155impl Affinity {
156 pub fn new() -> Self {
158 Self {
159 node_affinity: None,
160 workload_affinity: None,
161 workload_anti_affinity: None,
162 }
163 }
164
165 pub fn with_node_affinity(mut self, affinity: NodeAffinity) -> Self {
167 self.node_affinity = Some(affinity);
168 self
169 }
170
171 pub fn with_workload_affinity(mut self, affinity: WorkloadAffinity) -> Self {
173 self.workload_affinity = Some(affinity);
174 self
175 }
176
177 pub fn with_workload_anti_affinity(mut self, affinity: WorkloadAffinity) -> Self {
179 self.workload_anti_affinity = Some(affinity);
180 self
181 }
182
183 pub fn matches(&self, node: &NodeResources) -> bool {
185 if let Some(node_affinity) = &self.node_affinity {
187 for rule in &node_affinity.required {
189 if !rule.matches(node) {
190 return false;
191 }
192 }
193 }
194
195 true
196 }
197}
198
199impl Default for Affinity {
200 fn default() -> Self {
201 Self::new()
202 }
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct NodeAffinity {
208 pub required: Vec<AffinityRule>,
210 pub preferred: Vec<WeightedAffinityRule>,
212}
213
214impl NodeAffinity {
215 pub fn new() -> Self {
217 Self {
218 required: Vec::new(),
219 preferred: Vec::new(),
220 }
221 }
222
223 pub fn require(mut self, rule: AffinityRule) -> Self {
225 self.required.push(rule);
226 self
227 }
228
229 pub fn prefer(mut self, weight: i32, rule: AffinityRule) -> Self {
231 self.preferred.push(WeightedAffinityRule { weight, rule });
232 self
233 }
234}
235
236impl Default for NodeAffinity {
237 fn default() -> Self {
238 Self::new()
239 }
240}
241
242#[derive(Debug, Clone, Serialize, Deserialize)]
244pub struct WorkloadAffinity {
245 pub required: Vec<WorkloadAffinityTerm>,
247 pub preferred: Vec<WeightedWorkloadAffinityTerm>,
249}
250
251impl WorkloadAffinity {
252 pub fn new() -> Self {
254 Self {
255 required: Vec::new(),
256 preferred: Vec::new(),
257 }
258 }
259}
260
261impl Default for WorkloadAffinity {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
267#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct WorkloadAffinityTerm {
270 pub label_selector: LabelSelector,
272 pub topology_key: String,
274 pub namespaces: Option<Vec<String>>,
276}
277
278#[derive(Debug, Clone, Serialize, Deserialize)]
280pub struct WeightedWorkloadAffinityTerm {
281 pub weight: i32,
283 pub term: WorkloadAffinityTerm,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize)]
289pub struct LabelSelector {
290 pub match_labels: std::collections::HashMap<String, String>,
292 pub match_expressions: Vec<ConstraintExpression>,
294}
295
296impl LabelSelector {
297 pub fn new() -> Self {
299 Self {
300 match_labels: std::collections::HashMap::new(),
301 match_expressions: Vec::new(),
302 }
303 }
304
305 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
307 self.match_labels.insert(key.into(), value.into());
308 self
309 }
310
311 pub fn with_expression(mut self, expr: ConstraintExpression) -> Self {
313 self.match_expressions.push(expr);
314 self
315 }
316}
317
318impl Default for LabelSelector {
319 fn default() -> Self {
320 Self::new()
321 }
322}
323
324#[derive(Debug, Clone, Serialize, Deserialize)]
326pub struct AffinityRule {
327 pub match_expressions: Vec<ConstraintExpression>,
329}
330
331impl AffinityRule {
332 pub fn new() -> Self {
334 Self {
335 match_expressions: Vec::new(),
336 }
337 }
338
339 pub fn with_expression(mut self, expr: ConstraintExpression) -> Self {
341 self.match_expressions.push(expr);
342 self
343 }
344
345 pub fn matches(&self, node: &NodeResources) -> bool {
347 self.match_expressions.iter().all(|expr| expr.evaluate(node))
348 }
349}
350
351impl Default for AffinityRule {
352 fn default() -> Self {
353 Self::new()
354 }
355}
356
357#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct WeightedAffinityRule {
360 pub weight: i32,
362 pub rule: AffinityRule,
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{NodeId, GpuResources};

    /// Builds a node with the CPU/memory figures used throughout these tests
    /// and applies the given labels to it.
    fn labelled_node(labels: &[(&str, &str)]) -> NodeResources {
        let mut node = NodeResources::new(NodeId::new(), 4000, 8192);
        for (key, value) in labels {
            node.labels.insert(key.to_string(), value.to_string());
        }
        node
    }

    /// Shorthand for constructing a `ConstraintExpression` from string slices.
    fn expr(key: &str, operator: ExpressionOperator, values: &[&str]) -> ConstraintExpression {
        ConstraintExpression {
            key: key.to_string(),
            operator,
            values: values.iter().map(|v| v.to_string()).collect(),
        }
    }

    #[test]
    fn test_node_selector() {
        let node = labelled_node(&[("env", "production")]);

        let constraint = PlacementConstraint::NodeSelector {
            key: "env".to_string(),
            value: "production".to_string(),
        };
        assert!(constraint.matches(&node));

        let wrong_constraint = PlacementConstraint::NodeSelector {
            key: "env".to_string(),
            value: "staging".to_string(),
        };
        assert!(!wrong_constraint.matches(&node));
    }

    #[test]
    fn test_well_known_label_takes_precedence_over_alias() {
        // When both labels are present only the well-known one is compared.
        let node = labelled_node(&[
            ("topology.kubernetes.io/zone", "zone-a"),
            ("zone", "zone-b"),
        ]);

        assert!(PlacementConstraint::Zone("zone-a".to_string()).matches(&node));
        assert!(!PlacementConstraint::Zone("zone-b".to_string()).matches(&node));
    }

    #[test]
    fn test_gpu_constraints() {
        let gpu = GpuResources::new(0, "NVIDIA A100", 40960)
            .with_tensor_cores(true)
            .with_compute_capability(8.0);

        let node = NodeResources::new(NodeId::new(), 4000, 8192)
            .with_gpu(gpu);

        assert!(PlacementConstraint::RequiresGpu.matches(&node));
        assert!(PlacementConstraint::RequiresTensorCores.matches(&node));
        assert!(PlacementConstraint::GpuModel("A100".to_string()).matches(&node));
        assert!(PlacementConstraint::MinGpuMemory(40000).matches(&node));
        assert!(!PlacementConstraint::MinGpuMemory(50000).matches(&node));
    }

    #[test]
    fn test_expression_operators() {
        let node = labelled_node(&[("tier", "frontend"), ("priority", "10")]);

        let in_expr = expr("tier", ExpressionOperator::In, &["frontend", "backend"]);
        assert!(in_expr.evaluate(&node));

        let gt_expr = expr("priority", ExpressionOperator::Gt, &["5"]);
        assert!(gt_expr.evaluate(&node));

        let exists_expr = expr("tier", ExpressionOperator::Exists, &[]);
        assert!(exists_expr.evaluate(&node));
    }

    #[test]
    fn test_expression_set_operators_with_missing_label() {
        let node = labelled_node(&[("tier", "frontend")]);

        assert!(!expr("tier", ExpressionOperator::In, &["backend"]).evaluate(&node));
        assert!(expr("tier", ExpressionOperator::NotIn, &["backend"]).evaluate(&node));
        assert!(!expr("tier", ExpressionOperator::NotIn, &["frontend"]).evaluate(&node));

        // A label that is not on the node fails `In` but passes `NotIn`.
        assert!(!expr("no-such-label", ExpressionOperator::In, &["x"]).evaluate(&node));
        assert!(expr("no-such-label", ExpressionOperator::NotIn, &["x"]).evaluate(&node));

        assert!(!expr("tier", ExpressionOperator::DoesNotExist, &[]).evaluate(&node));
        assert!(expr("no-such-label", ExpressionOperator::DoesNotExist, &[]).evaluate(&node));
        assert!(!expr("no-such-label", ExpressionOperator::Exists, &[]).evaluate(&node));
    }

    #[test]
    fn test_expression_numeric_operators() {
        let node = labelled_node(&[("tier", "frontend"), ("priority", "10")]);

        assert!(expr("priority", ExpressionOperator::Lt, &["20"]).evaluate(&node));
        assert!(!expr("priority", ExpressionOperator::Lt, &["5"]).evaluate(&node));

        // Both comparisons are strict.
        assert!(!expr("priority", ExpressionOperator::Gt, &["10"]).evaluate(&node));
        assert!(!expr("priority", ExpressionOperator::Lt, &["10"]).evaluate(&node));

        // Non-numeric label, non-numeric threshold, or no threshold at all.
        assert!(!expr("tier", ExpressionOperator::Gt, &["1"]).evaluate(&node));
        assert!(!expr("priority", ExpressionOperator::Gt, &["low"]).evaluate(&node));
        assert!(!expr("priority", ExpressionOperator::Gt, &[]).evaluate(&node));
        assert!(!expr("priority", ExpressionOperator::Lt, &[]).evaluate(&node));
    }

    #[test]
    fn test_affinity_required_rules() {
        let node = labelled_node(&[("tier", "frontend")]);

        // Nothing configured: every node matches.
        assert!(Affinity::new().matches(&node));

        let satisfied = Affinity::new().with_node_affinity(
            NodeAffinity::new().require(
                AffinityRule::new()
                    .with_expression(expr("tier", ExpressionOperator::In, &["frontend"])),
            ),
        );
        assert!(satisfied.matches(&node));

        // One failing required rule is enough to reject the node.
        let violated = Affinity::new().with_node_affinity(
            NodeAffinity::new()
                .require(
                    AffinityRule::new()
                        .with_expression(expr("tier", ExpressionOperator::In, &["frontend"])),
                )
                .require(
                    AffinityRule::new()
                        .with_expression(expr("tier", ExpressionOperator::In, &["backend"])),
                ),
        );
        assert!(!violated.matches(&node));

        // Preferred rules do not affect `matches`.
        let only_preferred = Affinity::new().with_node_affinity(
            NodeAffinity::new().prefer(
                10,
                AffinityRule::new()
                    .with_expression(expr("tier", ExpressionOperator::In, &["backend"])),
            ),
        );
        assert!(only_preferred.matches(&node));
    }
}