Skip to main content

foxtive_worker/strategies/
mod.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3
/// Load balancing strategy for distributing messages across workers.
///
/// The default strategy is [`LoadBalancingStrategy::RoundRobin`].
///
/// The type is a plain, fieldless enum, so it is cheap to copy and can be
/// compared with `==` or used as a hash-map key.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum LoadBalancingStrategy {
    /// Distribute messages in round-robin fashion.
    #[default]
    RoundRobin,

    /// Select a random worker for each message.
    Random,

    /// Select the worker with the least active tasks.
    LeastLoaded,
}
17
/// Round-robin load balancer.
///
/// Hands out worker indices in a repeating `0, 1, ..., worker_count - 1` cycle,
/// driven by a shared atomic counter so it can be used through `&self`.
#[derive(Debug, Default)]
pub struct RoundRobinBalancer {
    /// Number of selections made so far (calls with zero workers are not counted).
    current: AtomicUsize,
}

impl RoundRobinBalancer {
    /// Create a new round-robin balancer whose cycle starts at index 0.
    pub fn new() -> Self {
        Self::default()
    }

    /// Get the next worker index for a pool of `worker_count` workers.
    ///
    /// Returns `0` without advancing the cycle when `worker_count` is zero;
    /// otherwise returns the pre-increment counter value modulo `worker_count`.
    pub fn next(&self, worker_count: usize) -> usize {
        match worker_count {
            // No workers: nothing to rotate over, and a modulo by zero must be avoided.
            0 => 0,
            pool_size => {
                let ticket = self.current.fetch_add(1, Ordering::SeqCst);
                ticket % pool_size
            }
        }
    }
}
48
49/// Random load balancer.
50///
51/// Selects a random worker for each message distribution.
52#[derive(Debug, Default)]
53pub struct RandomBalancer;
54
55impl RandomBalancer {
56    /// Create a new random balancer.
57    pub fn new() -> Self {
58        Self
59    }
60
61    /// Get a random worker index.
62    pub fn next(&self, worker_count: usize) -> usize {
63        if worker_count == 0 {
64            return 0;
65        }
66        (rand::random::<u64>() % worker_count as u64) as usize
67    }
68}
69
/// Least-loaded balancer state.
///
/// Tracks the number of active tasks per worker to select the least busy one.
///
/// The per-worker counters are atomics, so reading and adjusting loads only
/// needs the shared (read) side of the lock. The exclusive (write) side is
/// taken solely by [`add_worker`](Self::add_worker), which swaps in a new,
/// longer counter vector.
#[derive(Debug)]
pub struct LeastLoadedBalancer {
    worker_loads: std::sync::RwLock<Arc<Vec<AtomicUsize>>>,
}

impl LeastLoadedBalancer {
    /// Create a new least-loaded balancer with `worker_count` workers, each
    /// starting at zero load.
    pub fn new(worker_count: usize) -> Self {
        let loads: Vec<AtomicUsize> = (0..worker_count).map(|_| AtomicUsize::new(0)).collect();
        Self {
            worker_loads: std::sync::RwLock::new(Arc::new(loads)),
        }
    }

    /// Add a new worker slot (with zero load) at the end of the balancer.
    ///
    /// Existing load counts are preserved. The counter vector is rebuilt under
    /// the write lock, so the cost is linear in the current number of workers.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn add_worker(&self) {
        let mut current = self.worker_loads.write().unwrap();

        // `AtomicUsize` is not `Clone`, so every counter is re-created from its
        // current value. Load updates need the read lock, which cannot be held
        // while we hold the write lock, so no update is lost during the copy.
        let new_loads: Vec<AtomicUsize> = current
            .iter()
            .map(|load| AtomicUsize::new(load.load(Ordering::Relaxed)))
            // The new worker starts with zero load.
            .chain(std::iter::once(AtomicUsize::new(0)))
            .collect();

        *current = Arc::new(new_loads);
    }

    /// Get the index of the least loaded worker.
    ///
    /// When several workers share the minimum load, the lowest index wins.
    /// Returns `0` when the balancer has no workers.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn next(&self) -> usize {
        let loads = self.worker_loads.read().unwrap();

        loads
            .iter()
            .enumerate()
            // `min_by_key` keeps the first of several equal minima.
            .min_by_key(|(_, load)| load.load(Ordering::Relaxed))
            .map_or(0, |(index, _)| index)
    }

    /// Increment the load for a worker.
    ///
    /// Does nothing when `worker_index` is out of range.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn increment_load(&self, worker_index: usize) {
        let loads = self.worker_loads.read().unwrap();
        if let Some(load) = loads.get(worker_index) {
            load.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Decrement the load for a worker, stopping at zero.
    ///
    /// Does nothing when `worker_index` is out of range or the worker's load
    /// is already zero.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn decrement_load(&self, worker_index: usize) {
        let loads = self.worker_loads.read().unwrap();
        if let Some(load) = loads.get(worker_index) {
            // A plain `fetch_sub` would wrap 0 around to `usize::MAX`, making
            // the worker look permanently overloaded after a stray decrement.
            // `checked_sub` yields `None` at zero, which leaves the value as is.
            let _ = load.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
                value.checked_sub(1)
            });
        }
    }

    /// Get the current load for a worker.
    ///
    /// Returns `0` when `worker_index` is out of range.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn get_load(&self, worker_index: usize) -> usize {
        let loads = self.worker_loads.read().unwrap();
        loads
            .get(worker_index)
            .map_or(0, |load| load.load(Ordering::Relaxed))
    }
}
159
#[cfg(test)]
mod tests {
    use super::*;

    /// With three workers the indices cycle 0, 1, 2 and then wrap around.
    #[test]
    fn test_round_robin_distribution() {
        let balancer = RoundRobinBalancer::new();

        let observed: Vec<usize> = (0..5).map(|_| balancer.next(3)).collect();
        assert_eq!(observed, [0, 1, 2, 0, 1]);
    }

    /// A single worker is selected every time.
    #[test]
    fn test_round_robin_single_worker() {
        let balancer = RoundRobinBalancer::new();

        for _ in 0..3 {
            assert_eq!(balancer.next(1), 0);
        }
    }

    /// Zero workers falls back to index 0.
    #[test]
    fn test_round_robin_zero_workers() {
        let balancer = RoundRobinBalancer::new();
        assert_eq!(balancer.next(0), 0);
    }

    /// Random selection always stays within the worker range.
    #[test]
    fn test_random_balancer() {
        let balancer = RandomBalancer::new();

        // Should return valid indices
        assert!(balancer.next(5) < 5);
        assert_eq!(balancer.next(1), 0);
    }

    /// With every worker idle, the first worker is chosen.
    #[test]
    fn test_least_loaded_initial() {
        let balancer = LeastLoadedBalancer::new(3);

        // All workers start with 0 load, should return first
        assert_eq!(balancer.next(), 0);
    }

    /// Loading worker 0 moves the selection to the first idle worker.
    #[test]
    fn test_least_loaded_after_increment() {
        let balancer = LeastLoadedBalancer::new(3);

        // Put a load of 2 on worker 0; workers 1 and 2 stay at 0.
        for _ in 0..2 {
            balancer.increment_load(0);
        }

        // Should return worker 1 (first with minimum load)
        assert_eq!(balancer.next(), 1);
    }

    /// Increments and decrements are reflected by `get_load`.
    #[test]
    fn test_least_loaded_load_tracking() {
        let balancer = LeastLoadedBalancer::new(3);

        for worker in [0, 0, 1] {
            balancer.increment_load(worker);
        }

        let snapshot: Vec<usize> = (0..3).map(|worker| balancer.get_load(worker)).collect();
        assert_eq!(snapshot, [2, 1, 0]);

        balancer.decrement_load(0);
        assert_eq!(balancer.get_load(0), 1);
    }

    /// A balancer without workers falls back to index 0.
    #[test]
    fn test_least_loaded_empty() {
        let balancer = LeastLoadedBalancer::new(0);
        assert_eq!(balancer.next(), 0);
    }
}
244}