Skip to main content

orca_control/scheduler/
algo.rs

1use std::collections::HashMap;
2
3use orca_core::types::RuntimeKind;
4
5use super::types::{NodeCapacity, ScheduleAction};
6
7/// A request describing the desired state for one service.
8#[derive(Debug, Clone)]
9pub struct ServiceRequest {
10    pub name: String,
11    pub replicas_desired: u32,
12    pub runtime: RuntimeKind,
13    pub cpu_required: f64,
14    pub memory_required: u64,
15    pub gpu_required: u32,
16    pub placement_labels: HashMap<String, String>,
17    pub requires_gpu: bool,
18}
19
/// One replica of a service that is currently placed on a node.
///
/// Derives `Eq` and `Hash` (all fields support them) so assignments can be
/// stored in sets or used as map keys, e.g. for diffing desired vs. current state.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Assignment {
    /// Name of the service this replica belongs to (compared with `ServiceRequest::name`).
    pub service: String,
    /// Index of this replica within its service.
    pub replica_idx: u32,
    /// Node the replica is placed on.
    pub node_id: u64,
}
27
28/// Pure scheduling function. Given the desired services, available nodes, and
29/// current assignments, returns the list of actions to converge toward the
30/// desired state.
31pub fn schedule(
32    services: &[ServiceRequest],
33    nodes: &[NodeCapacity],
34    assignments: &[Assignment],
35) -> Vec<ScheduleAction> {
36    let mut actions = Vec::new();
37
38    for svc in services {
39        let current: Vec<&Assignment> = assignments
40            .iter()
41            .filter(|a| a.service == svc.name)
42            .collect();
43        let current_count = current.len() as u32;
44        let desired = svc.replicas_desired;
45
46        if desired > current_count {
47            // Scale up: find next replica indices and assign them.
48            let used_indices: Vec<u32> = current.iter().map(|a| a.replica_idx).collect();
49            let assigned_nodes: Vec<u64> = current.iter().map(|a| a.node_id).collect();
50            let new_count = desired - current_count;
51
52            let mut next_idx = 0u32;
53            let mut placed = 0u32;
54            while placed < new_count {
55                // Find next unused replica index.
56                while used_indices.contains(&next_idx) {
57                    next_idx += 1;
58                }
59
60                if let Some(node_id) = pick_best_node(nodes, svc, &assigned_nodes, &actions) {
61                    actions.push(ScheduleAction::Assign {
62                        service: svc.name.clone(),
63                        replica_idx: next_idx,
64                        node_id,
65                    });
66                }
67                next_idx += 1;
68                placed += 1;
69            }
70        } else if desired < current_count {
71            // Scale down: remove replicas from the most-loaded nodes first.
72            let remove_count = current_count - desired;
73            let mut removable: Vec<&Assignment> = current.clone();
74            removable.sort_by(|a, b| {
75                let load_a = node_workload_count(nodes, a.node_id);
76                let load_b = node_workload_count(nodes, b.node_id);
77                load_b.cmp(&load_a) // most-loaded first
78            });
79
80            for assignment in removable.iter().take(remove_count as usize) {
81                actions.push(ScheduleAction::Unassign {
82                    service: assignment.service.clone(),
83                    replica_idx: assignment.replica_idx,
84                    node_id: assignment.node_id,
85                });
86            }
87        }
88    }
89
90    actions
91}
92
93/// Pick the best node for a service replica by scoring all eligible nodes.
94fn pick_best_node(
95    nodes: &[NodeCapacity],
96    svc: &ServiceRequest,
97    already_assigned: &[u64],
98    pending_actions: &[ScheduleAction],
99) -> Option<u64> {
100    let mut best_score = 0.0_f64;
101    let mut best_node = None;
102
103    for node in nodes {
104        let score = score_node(node, svc, already_assigned, pending_actions);
105        if score > best_score {
106            best_score = score;
107            best_node = Some(node.node_id);
108        }
109    }
110
111    best_node
112}
113
114/// Score a node for a service. Returns 0.0 if the node is ineligible.
115fn score_node(
116    node: &NodeCapacity,
117    svc: &ServiceRequest,
118    already_assigned: &[u64],
119    pending_actions: &[ScheduleAction],
120) -> f64 {
121    // Hard constraints: resources.
122    if node.cpu_available < svc.cpu_required {
123        return 0.0;
124    }
125    if node.memory_available < svc.memory_required {
126        return 0.0;
127    }
128    if svc.requires_gpu && node.gpu_count < svc.gpu_required.max(1) {
129        return 0.0;
130    }
131
132    // Hard constraint: placement labels must all match.
133    for (key, value) in &svc.placement_labels {
134        match node.labels.get(key) {
135            Some(v) if v == value => {}
136            _ => return 0.0,
137        }
138    }
139
140    // Base score: weighted sum of available resources (normalized loosely).
141    let mut score = node.cpu_available * 10.0 + node.memory_available as f64 / 1_000_000.0;
142
143    // Bonus for wasm-capable nodes when the service is wasm.
144    if svc.runtime == RuntimeKind::Wasm && node.has_wasm_runtime {
145        score += 50.0;
146    }
147
148    // Penalty if this node already runs a replica of the same service (spread).
149    let existing_count = already_assigned
150        .iter()
151        .filter(|&&id| id == node.node_id)
152        .count();
153    let pending_count = pending_actions
154        .iter()
155        .filter(|a| matches!(a, ScheduleAction::Assign { node_id, service, .. } if *node_id == node.node_id && *service == svc.name))
156        .count();
157    let total_same = existing_count + pending_count;
158    score -= total_same as f64 * 100.0;
159
160    // Penalty for overall node load.
161    score -= node.current_workload_count as f64 * 5.0;
162
163    score
164}
165
166/// Look up a node's current workload count.
167fn node_workload_count(nodes: &[NodeCapacity], node_id: u64) -> u32 {
168    nodes
169        .iter()
170        .find(|n| n.node_id == node_id)
171        .map_or(0, |n| n.current_workload_count)
172}
173
174#[cfg(test)]
175#[path = "algo_tests.rs"]
176mod tests;