orca_control/scheduler/
algo.rs1use std::collections::HashMap;
2
3use orca_core::types::RuntimeKind;
4
5use super::types::{NodeCapacity, ScheduleAction};
6
7#[derive(Debug, Clone)]
9pub struct ServiceRequest {
10 pub name: String,
11 pub replicas_desired: u32,
12 pub runtime: RuntimeKind,
13 pub cpu_required: f64,
14 pub memory_required: u64,
15 pub gpu_required: u32,
16 pub placement_labels: HashMap<String, String>,
17 pub requires_gpu: bool,
18}
19
/// Records that one replica of a service is placed on a particular node.
///
/// Every field is a plain string or integer, so the type is `Eq` and `Hash` in
/// addition to `PartialEq` and can be used as a set or map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Assignment {
    /// Name of the service the replica belongs to.
    pub service: String,
    /// Index of the replica within its service.
    pub replica_idx: u32,
    /// Identifier of the node the replica is placed on.
    pub node_id: u64,
}
27
28pub fn schedule(
32 services: &[ServiceRequest],
33 nodes: &[NodeCapacity],
34 assignments: &[Assignment],
35) -> Vec<ScheduleAction> {
36 let mut actions = Vec::new();
37
38 for svc in services {
39 let current: Vec<&Assignment> = assignments
40 .iter()
41 .filter(|a| a.service == svc.name)
42 .collect();
43 let current_count = current.len() as u32;
44 let desired = svc.replicas_desired;
45
46 if desired > current_count {
47 let used_indices: Vec<u32> = current.iter().map(|a| a.replica_idx).collect();
49 let assigned_nodes: Vec<u64> = current.iter().map(|a| a.node_id).collect();
50 let new_count = desired - current_count;
51
52 let mut next_idx = 0u32;
53 let mut placed = 0u32;
54 while placed < new_count {
55 while used_indices.contains(&next_idx) {
57 next_idx += 1;
58 }
59
60 if let Some(node_id) = pick_best_node(nodes, svc, &assigned_nodes, &actions) {
61 actions.push(ScheduleAction::Assign {
62 service: svc.name.clone(),
63 replica_idx: next_idx,
64 node_id,
65 });
66 }
67 next_idx += 1;
68 placed += 1;
69 }
70 } else if desired < current_count {
71 let remove_count = current_count - desired;
73 let mut removable: Vec<&Assignment> = current.clone();
74 removable.sort_by(|a, b| {
75 let load_a = node_workload_count(nodes, a.node_id);
76 let load_b = node_workload_count(nodes, b.node_id);
77 load_b.cmp(&load_a) });
79
80 for assignment in removable.iter().take(remove_count as usize) {
81 actions.push(ScheduleAction::Unassign {
82 service: assignment.service.clone(),
83 replica_idx: assignment.replica_idx,
84 node_id: assignment.node_id,
85 });
86 }
87 }
88 }
89
90 actions
91}
92
93fn pick_best_node(
95 nodes: &[NodeCapacity],
96 svc: &ServiceRequest,
97 already_assigned: &[u64],
98 pending_actions: &[ScheduleAction],
99) -> Option<u64> {
100 let mut best_score = 0.0_f64;
101 let mut best_node = None;
102
103 for node in nodes {
104 let score = score_node(node, svc, already_assigned, pending_actions);
105 if score > best_score {
106 best_score = score;
107 best_node = Some(node.node_id);
108 }
109 }
110
111 best_node
112}
113
114fn score_node(
116 node: &NodeCapacity,
117 svc: &ServiceRequest,
118 already_assigned: &[u64],
119 pending_actions: &[ScheduleAction],
120) -> f64 {
121 if node.cpu_available < svc.cpu_required {
123 return 0.0;
124 }
125 if node.memory_available < svc.memory_required {
126 return 0.0;
127 }
128 if svc.requires_gpu && node.gpu_count < svc.gpu_required.max(1) {
129 return 0.0;
130 }
131
132 for (key, value) in &svc.placement_labels {
134 match node.labels.get(key) {
135 Some(v) if v == value => {}
136 _ => return 0.0,
137 }
138 }
139
140 let mut score = node.cpu_available * 10.0 + node.memory_available as f64 / 1_000_000.0;
142
143 if svc.runtime == RuntimeKind::Wasm && node.has_wasm_runtime {
145 score += 50.0;
146 }
147
148 let existing_count = already_assigned
150 .iter()
151 .filter(|&&id| id == node.node_id)
152 .count();
153 let pending_count = pending_actions
154 .iter()
155 .filter(|a| matches!(a, ScheduleAction::Assign { node_id, service, .. } if *node_id == node.node_id && *service == svc.name))
156 .count();
157 let total_same = existing_count + pending_count;
158 score -= total_same as f64 * 100.0;
159
160 score -= node.current_workload_count as f64 * 5.0;
162
163 score
164}
165
166fn node_workload_count(nodes: &[NodeCapacity], node_id: u64) -> u32 {
168 nodes
169 .iter()
170 .find(|n| n.node_id == node_id)
171 .map_or(0, |n| n.current_workload_count)
172}
173
174#[cfg(test)]
175#[path = "algo_tests.rs"]
176mod tests;