1use async_trait::async_trait;
17use std::collections::HashMap;
18use std::hash::{Hash, Hasher};
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use tracing::{debug, info, trace, warn};
23use xxhash_rust::xxh3::xxh3_64;
24
25use sentinel_common::errors::{SentinelError, SentinelResult};
26
27use super::{LoadBalancer, RequestContext, TargetSelection, UpstreamTarget};
28
29#[derive(Debug, Clone)]
31pub struct SubsetConfig {
32 pub subset_size: usize,
35 pub proxy_id: String,
38 pub inner_algorithm: SubsetInnerAlgorithm,
40}
41
42impl Default for SubsetConfig {
43 fn default() -> Self {
44 Self {
45 subset_size: 10,
46 proxy_id: format!("proxy-{}", rand::random::<u32>()),
48 inner_algorithm: SubsetInnerAlgorithm::RoundRobin,
49 }
50 }
51}
52
/// Strategy used to pick a single target from the healthy members of a subset.
///
/// The type is a plain, field-less enum, so it also derives the comparison and
/// hashing traits; this lets callers compare configured algorithms with `==`
/// or use them as map keys.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum SubsetInnerAlgorithm {
    /// Cycle through the healthy subset members in order (the default).
    #[default]
    RoundRobin,
    /// Pick a healthy subset member at random.
    Random,
    /// Pick the healthy subset member with the fewest tracked connections.
    LeastConnections,
}
64
65pub struct SubsetBalancer {
67 all_targets: Vec<UpstreamTarget>,
69 subset: Arc<RwLock<Vec<UpstreamTarget>>>,
71 current: AtomicUsize,
73 connections: Arc<RwLock<HashMap<String, usize>>>,
75 health_status: Arc<RwLock<HashMap<String, bool>>>,
77 config: SubsetConfig,
79}
80
81impl SubsetBalancer {
82 pub fn new(targets: Vec<UpstreamTarget>, config: SubsetConfig) -> Self {
84 let mut health_status = HashMap::new();
85 let mut connections = HashMap::new();
86
87 for target in &targets {
88 let addr = target.full_address();
89 health_status.insert(addr.clone(), true);
90 connections.insert(addr, 0);
91 }
92
93 let subset = Self::compute_subset(&targets, &config);
94
95 info!(
96 total_targets = targets.len(),
97 subset_size = subset.len(),
98 proxy_id = %config.proxy_id,
99 algorithm = "deterministic_subset",
100 "Created subset balancer"
101 );
102
103 for target in &subset {
104 debug!(
105 target = %target.full_address(),
106 proxy_id = %config.proxy_id,
107 "Target included in subset"
108 );
109 }
110
111 Self {
112 all_targets: targets,
113 subset: Arc::new(RwLock::new(subset)),
114 current: AtomicUsize::new(0),
115 connections: Arc::new(RwLock::new(connections)),
116 health_status: Arc::new(RwLock::new(health_status)),
117 config,
118 }
119 }
120
121 fn compute_subset(targets: &[UpstreamTarget], config: &SubsetConfig) -> Vec<UpstreamTarget> {
123 if targets.is_empty() {
124 return Vec::new();
125 }
126
127 let subset_size = config.subset_size.min(targets.len());
128
129 let mut scored_targets: Vec<_> = targets
131 .iter()
132 .map(|t| {
133 let score = Self::subset_score(&t.full_address(), &config.proxy_id);
134 (t.clone(), score)
135 })
136 .collect();
137
138 scored_targets.sort_by_key(|(_, score)| *score);
140 scored_targets
141 .into_iter()
142 .take(subset_size)
143 .map(|(t, _)| t)
144 .collect()
145 }
146
147 fn subset_score(target_addr: &str, proxy_id: &str) -> u64 {
150 let combined = format!("{}:{}", target_addr, proxy_id);
152 xxh3_64(combined.as_bytes())
153 }
154
155 async fn rebuild_subset_if_needed(&self) {
157 let health = self.health_status.read().await;
158 let current_subset = self.subset.read().await;
159
160 let healthy_in_subset = current_subset
162 .iter()
163 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
164 .count();
165
166 drop(current_subset);
167 drop(health);
168
169 if healthy_in_subset < self.config.subset_size / 2 {
171 self.rebuild_subset().await;
172 }
173 }
174
175 async fn rebuild_subset(&self) {
177 let health = self.health_status.read().await;
178
179 let healthy_targets: Vec<_> = self
181 .all_targets
182 .iter()
183 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
184 .cloned()
185 .collect();
186
187 drop(health);
188
189 if healthy_targets.is_empty() {
190 return;
192 }
193
194 let new_subset = Self::compute_subset(&healthy_targets, &self.config);
196
197 info!(
198 new_subset_size = new_subset.len(),
199 healthy_total = healthy_targets.len(),
200 proxy_id = %self.config.proxy_id,
201 algorithm = "deterministic_subset",
202 "Rebuilt subset from healthy targets"
203 );
204
205 let mut subset = self.subset.write().await;
206 *subset = new_subset;
207 }
208
209 async fn select_from_subset<'a>(
211 &self,
212 healthy: &[&'a UpstreamTarget],
213 ) -> Option<&'a UpstreamTarget> {
214 if healthy.is_empty() {
215 return None;
216 }
217
218 match self.config.inner_algorithm {
219 SubsetInnerAlgorithm::RoundRobin => {
220 let idx = self.current.fetch_add(1, Ordering::Relaxed) % healthy.len();
221 Some(healthy[idx])
222 }
223 SubsetInnerAlgorithm::Random => {
224 use rand::seq::SliceRandom;
225 let mut rng = rand::thread_rng();
226 healthy.choose(&mut rng).copied()
227 }
228 SubsetInnerAlgorithm::LeastConnections => {
229 let conns = self.connections.read().await;
230 healthy
231 .iter()
232 .min_by_key(|t| conns.get(&t.full_address()).copied().unwrap_or(0))
233 .copied()
234 }
235 }
236 }
237}
238
239#[async_trait]
240impl LoadBalancer for SubsetBalancer {
241 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
242 trace!(
243 total_targets = self.all_targets.len(),
244 algorithm = "deterministic_subset",
245 "Selecting upstream target"
246 );
247
248 self.rebuild_subset_if_needed().await;
250
251 let health = self.health_status.read().await;
252 let subset = self.subset.read().await;
253
254 let healthy_subset: Vec<_> = subset
256 .iter()
257 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
258 .collect();
259
260 drop(health);
261
262 if healthy_subset.is_empty() {
263 warn!(
264 subset_size = subset.len(),
265 total_targets = self.all_targets.len(),
266 proxy_id = %self.config.proxy_id,
267 algorithm = "deterministic_subset",
268 "No healthy targets in subset"
269 );
270 drop(subset);
271 return Err(SentinelError::NoHealthyUpstream);
272 }
273
274 let target = self
275 .select_from_subset(&healthy_subset)
276 .await
277 .ok_or(SentinelError::NoHealthyUpstream)?;
278
279 if matches!(
281 self.config.inner_algorithm,
282 SubsetInnerAlgorithm::LeastConnections
283 ) {
284 let mut conns = self.connections.write().await;
285 *conns.entry(target.full_address()).or_insert(0) += 1;
286 }
287
288 trace!(
289 selected_target = %target.full_address(),
290 subset_size = subset.len(),
291 healthy_count = healthy_subset.len(),
292 proxy_id = %self.config.proxy_id,
293 algorithm = "deterministic_subset",
294 "Selected target from subset"
295 );
296
297 Ok(TargetSelection {
298 address: target.full_address(),
299 weight: target.weight,
300 metadata: HashMap::new(),
301 })
302 }
303
304 async fn release(&self, selection: &TargetSelection) {
305 if matches!(
306 self.config.inner_algorithm,
307 SubsetInnerAlgorithm::LeastConnections
308 ) {
309 let mut conns = self.connections.write().await;
310 if let Some(count) = conns.get_mut(&selection.address) {
311 *count = count.saturating_sub(1);
312 }
313 }
314 }
315
316 async fn report_health(&self, address: &str, healthy: bool) {
317 let prev_health = {
318 let health = self.health_status.read().await;
319 *health.get(address).unwrap_or(&true)
320 };
321
322 trace!(
323 target = %address,
324 healthy = healthy,
325 prev_healthy = prev_health,
326 algorithm = "deterministic_subset",
327 "Updating target health status"
328 );
329
330 self.health_status
331 .write()
332 .await
333 .insert(address.to_string(), healthy);
334
335 if prev_health != healthy {
337 self.rebuild_subset_if_needed().await;
338 }
339 }
340
341 async fn healthy_targets(&self) -> Vec<String> {
342 self.health_status
343 .read()
344 .await
345 .iter()
346 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
347 .collect()
348 }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` targets named `backend-<i>` on port 8080 with weight 100.
    fn make_targets(count: usize) -> Vec<UpstreamTarget> {
        let mut targets = Vec::with_capacity(count);
        for i in 0..count {
            targets.push(UpstreamTarget::new(format!("backend-{}", i), 8080, 100));
        }
        targets
    }

    /// Round-robin configuration for the given proxy id and subset size.
    fn round_robin_config(proxy_id: &str, subset_size: usize) -> SubsetConfig {
        SubsetConfig {
            subset_size,
            proxy_id: proxy_id.to_string(),
            inner_algorithm: SubsetInnerAlgorithm::RoundRobin,
        }
    }

    /// Addresses of the balancer's current subset (synchronous contexts only).
    fn subset_addresses(balancer: &SubsetBalancer) -> Vec<String> {
        let guard = balancer.subset.blocking_read();
        guard.iter().map(|t| t.full_address()).collect()
    }

    /// The subset never exceeds the configured size.
    #[test]
    fn test_subset_size_limited() {
        let balancer = SubsetBalancer::new(make_targets(100), round_robin_config("test-proxy", 10));

        let subset = balancer.subset.blocking_read();
        assert_eq!(subset.len(), 10);
    }

    /// Identical proxy ids over identical targets yield identical subsets.
    #[test]
    fn test_subset_deterministic() {
        let targets = make_targets(50);

        let balancer1 = SubsetBalancer::new(targets.clone(), round_robin_config("proxy-a", 10));
        let balancer2 = SubsetBalancer::new(targets, round_robin_config("proxy-a", 10));

        let subset1 = subset_addresses(&balancer1);
        let subset2 = subset_addresses(&balancer2);

        assert_eq!(subset1, subset2);
    }

    /// Distinct proxy ids yield distinct subsets over the same targets.
    #[test]
    fn test_different_proxies_get_different_subsets() {
        let targets = make_targets(50);

        let balancer1 = SubsetBalancer::new(targets.clone(), round_robin_config("proxy-a", 10));
        let balancer2 = SubsetBalancer::new(targets, round_robin_config("proxy-b", 10));

        let subset1 = subset_addresses(&balancer1);
        let subset2 = subset_addresses(&balancer2);

        assert_ne!(subset1, subset2);
    }

    /// Every selection comes from the subset computed at construction.
    #[tokio::test]
    async fn test_selects_from_subset_only() {
        let balancer = SubsetBalancer::new(make_targets(50), round_robin_config("test-proxy", 5));

        // `blocking_read` must not be used inside the async runtime.
        let subset_addrs: Vec<String> = {
            let guard = balancer.subset.read().await;
            guard.iter().map(|t| t.full_address()).collect()
        };

        let mut remaining = 20;
        while remaining > 0 {
            let selection = balancer.select(None).await.unwrap();
            assert!(
                subset_addrs.contains(&selection.address),
                "Selected {} which is not in subset {:?}",
                selection.address,
                subset_addrs
            );
            remaining -= 1;
        }
    }

    /// Across many proxies, subset membership is spread over all backends.
    #[test]
    fn test_even_distribution_across_proxies() {
        let targets = make_targets(100);
        let num_proxies = 100;
        let subset_size = 10;

        // How many proxies include each backend in their subset.
        let mut backend_counts: HashMap<String, usize> = HashMap::new();
        for i in 0..num_proxies {
            let config = round_robin_config(&format!("proxy-{}", i), subset_size);
            for target in SubsetBalancer::compute_subset(&targets, &config) {
                *backend_counts.entry(target.full_address()).or_insert(0) += 1;
            }
        }

        let total_slots = num_proxies * subset_size;
        let expected = total_slots / targets.len();

        let min_count = backend_counts.values().min().copied().unwrap_or(0);
        let max_count = backend_counts.values().max().copied().unwrap_or(0);

        assert!(min_count > 0, "Some backends were never selected");

        assert!(
            max_count <= expected * 3,
            "Backend received too much traffic: {} (expected ~{})",
            max_count,
            expected
        );

        let mean = total_slots as f64 / targets.len() as f64;
        let squared_deviation: f64 = backend_counts
            .values()
            .map(|&c| (c as f64 - mean).powi(2))
            .sum();
        let variance = squared_deviation / targets.len() as f64;
        let std_dev = variance.sqrt();

        assert!(
            std_dev < mean,
            "Distribution too uneven: std_dev={:.2}, mean={:.2}",
            std_dev,
            mean
        );
    }
}