Skip to main content

foxtive_worker/strategies/
mod.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3
4/// Load balancing strategy for distributing messages across workers.
5#[derive(Debug, Clone, Copy, Default)]
6pub enum LoadBalancingStrategy {
7    /// Distribute messages in round-robin fashion.
8    #[default]
9    RoundRobin,
10    
11    /// Select a random worker for each message.
12    Random,
13    
14    /// Select the worker with the least active tasks.
15    LeastLoaded,
16}
17
18/// Round-robin load balancer.
19///
20/// Distributes messages evenly across workers in a circular fashion.
21#[derive(Debug)]
22pub struct RoundRobinBalancer {
23    current: AtomicUsize,
24}
25
26impl RoundRobinBalancer {
27    /// Create a new round-robin balancer.
28    pub fn new() -> Self {
29        Self {
30            current: AtomicUsize::new(0),
31        }
32    }
33
34    /// Get the next worker index.
35    pub fn next(&self, worker_count: usize) -> usize {
36        if worker_count == 0 {
37            return 0;
38        }
39        self.current.fetch_add(1, Ordering::SeqCst) % worker_count
40    }
41}
42
43impl Default for RoundRobinBalancer {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49/// Random load balancer.
50///
51/// Selects a random worker for each message distribution.
52#[derive(Debug, Default)]
53pub struct RandomBalancer;
54
55impl RandomBalancer {
56    /// Create a new random balancer.
57    pub fn new() -> Self {
58        Self
59    }
60
61    /// Get a random worker index.
62    pub fn next(&self, worker_count: usize) -> usize {
63        if worker_count == 0 {
64            return 0;
65        }
66        (rand::random::<u64>() % worker_count as u64) as usize
67    }
68}
69
70/// Least-loaded balancer state.
71///
72/// Tracks the number of active tasks per worker to select the least busy one.
73/// 
74/// OPTIMIZATION: Uses Arc<Vec<AtomicUsize>> with atomic swap for lock-free updates
75/// when workers are added/removed, avoiding expensive recreation and copy operations.
76#[derive(Debug)]
77pub struct LeastLoadedBalancer {
78    worker_loads: std::sync::RwLock<Arc<Vec<AtomicUsize>>>,
79}
80
81impl LeastLoadedBalancer {
82    /// Create a new least-loaded balancer with the given number of workers.
83    pub fn new(worker_count: usize) -> Self {
84        let loads = (0..worker_count)
85            .map(|_| AtomicUsize::new(0))
86            .collect();
87        Self {
88            worker_loads: std::sync::RwLock::new(Arc::new(loads)),
89        }
90    }
91
92    /// Add a new worker slot to the balancer without recreating existing state.
93    /// 
94    /// This is an O(1) operation that atomically swaps in a new vector,
95    /// preserving all existing load counts.
96    pub fn add_worker(&self) {
97        let mut current = self.worker_loads.write().unwrap();
98        let mut new_loads = Vec::with_capacity(current.len() + 1);
99        
100        // Clone existing Arc<AtomicUsize> references (cheap - just increments ref count)
101        for load in current.iter() {
102            // We need to clone the AtomicUsize itself, not the reference
103            let current_value = load.load(Ordering::Relaxed);
104            new_loads.push(AtomicUsize::new(current_value));
105        }
106        
107        // Add new worker with zero load
108        new_loads.push(AtomicUsize::new(0));
109        
110        // Atomically swap in the new vector
111        *current = Arc::new(new_loads);
112    }
113
114    /// Get the index of the least loaded worker.
115    pub fn next(&self) -> usize {
116        let loads = self.worker_loads.read().unwrap();
117        
118        if loads.is_empty() {
119            return 0;
120        }
121
122        let mut min_load = usize::MAX;
123        let mut min_index = 0;
124
125        for (i, load) in loads.iter().enumerate() {
126            let current_load = load.load(Ordering::Relaxed);
127            if current_load < min_load {
128                min_load = current_load;
129                min_index = i;
130            }
131        }
132
133        min_index
134    }
135
136    /// Increment the load for a worker.
137    pub fn increment_load(&self, worker_index: usize) {
138        let loads = self.worker_loads.read().unwrap();
139        if let Some(load) = loads.get(worker_index) {
140            load.fetch_add(1, Ordering::SeqCst);
141        }
142    }
143
144    /// Decrement the load for a worker.
145    pub fn decrement_load(&self, worker_index: usize) {
146        let loads = self.worker_loads.read().unwrap();
147        if let Some(load) = loads.get(worker_index) {
148            load.fetch_sub(1, Ordering::SeqCst);
149        }
150    }
151
152    /// Get the current load for a worker.
153    pub fn get_load(&self, worker_index: usize) -> usize {
154        let loads = self.worker_loads.read().unwrap();
155        loads
156            .get(worker_index)
157            .map(|load| load.load(Ordering::Relaxed))
158            .unwrap_or(0)
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use super::*;
165
166    #[test]
167    fn test_round_robin_distribution() {
168        let balancer = RoundRobinBalancer::new();
169        
170        assert_eq!(balancer.next(3), 0);
171        assert_eq!(balancer.next(3), 1);
172        assert_eq!(balancer.next(3), 2);
173        assert_eq!(balancer.next(3), 0);
174        assert_eq!(balancer.next(3), 1);
175    }
176
177    #[test]
178    fn test_round_robin_single_worker() {
179        let balancer = RoundRobinBalancer::new();
180        
181        assert_eq!(balancer.next(1), 0);
182        assert_eq!(balancer.next(1), 0);
183        assert_eq!(balancer.next(1), 0);
184    }
185
186    #[test]
187    fn test_round_robin_zero_workers() {
188        let balancer = RoundRobinBalancer::new();
189        assert_eq!(balancer.next(0), 0);
190    }
191
192    #[test]
193    fn test_random_balancer() {
194        let balancer = RandomBalancer::new();
195        
196        // Should return valid indices
197        let idx = balancer.next(5);
198        assert!(idx < 5);
199        
200        let idx = balancer.next(1);
201        assert_eq!(idx, 0);
202    }
203
204    #[test]
205    fn test_least_loaded_initial() {
206        let balancer = LeastLoadedBalancer::new(3);
207        
208        // All workers start with 0 load, should return first
209        assert_eq!(balancer.next(), 0);
210    }
211
212    #[test]
213    fn test_least_loaded_after_increment() {
214        let balancer = LeastLoadedBalancer::new(3);
215        
216        // Increment load on worker 0
217        balancer.increment_load(0);
218        balancer.increment_load(0);
219        
220        // Worker 0 has load 2, workers 1 and 2 have load 0
221        // Should return worker 1 (first with minimum load)
222        assert_eq!(balancer.next(), 1);
223    }
224
225    #[test]
226    fn test_least_loaded_load_tracking() {
227        let balancer = LeastLoadedBalancer::new(3);
228        
229        balancer.increment_load(0);
230        balancer.increment_load(0);
231        balancer.increment_load(1);
232        
233        assert_eq!(balancer.get_load(0), 2);
234        assert_eq!(balancer.get_load(1), 1);
235        assert_eq!(balancer.get_load(2), 0);
236        
237        balancer.decrement_load(0);
238        assert_eq!(balancer.get_load(0), 1);
239    }
240
241    #[test]
242    fn test_least_loaded_empty() {
243        let balancer = LeastLoadedBalancer::new(0);
244        assert_eq!(balancer.next(), 0);
245    }
246}