1use async_trait::async_trait;
17use rand::seq::IndexedRandom;
18use std::collections::HashMap;
19use std::hash::{Hash, Hasher};
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use tokio::sync::RwLock;
23use tracing::{debug, info, trace, warn};
24use xxhash_rust::xxh3::xxh3_64;
25
26use sentinel_common::errors::{SentinelError, SentinelResult};
27
28use super::{LoadBalancer, RequestContext, TargetSelection, UpstreamTarget};
29
30#[derive(Debug, Clone)]
32pub struct SubsetConfig {
33 pub subset_size: usize,
36 pub proxy_id: String,
39 pub inner_algorithm: SubsetInnerAlgorithm,
41}
42
43impl Default for SubsetConfig {
44 fn default() -> Self {
45 Self {
46 subset_size: 10,
47 proxy_id: format!("proxy-{}", rand::random::<u32>()),
49 inner_algorithm: SubsetInnerAlgorithm::RoundRobin,
50 }
51 }
52}
53
/// Strategy used to pick a single target from within the subset.
///
/// The enum is a plain, field-less value type, so it derives the common
/// comparison and hashing traits in addition to `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum SubsetInnerAlgorithm {
    /// Cycle through the healthy subset members in order (the default).
    #[default]
    RoundRobin,
    /// Pick a healthy subset member at random.
    Random,
    /// Pick the healthy subset member with the fewest tracked connections.
    LeastConnections,
}
65
66pub struct SubsetBalancer {
68 all_targets: Vec<UpstreamTarget>,
70 subset: Arc<RwLock<Vec<UpstreamTarget>>>,
72 current: AtomicUsize,
74 connections: Arc<RwLock<HashMap<String, usize>>>,
76 health_status: Arc<RwLock<HashMap<String, bool>>>,
78 config: SubsetConfig,
80}
81
82impl SubsetBalancer {
83 pub fn new(targets: Vec<UpstreamTarget>, config: SubsetConfig) -> Self {
85 let mut health_status = HashMap::new();
86 let mut connections = HashMap::new();
87
88 for target in &targets {
89 let addr = target.full_address();
90 health_status.insert(addr.clone(), true);
91 connections.insert(addr, 0);
92 }
93
94 let subset = Self::compute_subset(&targets, &config);
95
96 info!(
97 total_targets = targets.len(),
98 subset_size = subset.len(),
99 proxy_id = %config.proxy_id,
100 algorithm = "deterministic_subset",
101 "Created subset balancer"
102 );
103
104 for target in &subset {
105 debug!(
106 target = %target.full_address(),
107 proxy_id = %config.proxy_id,
108 "Target included in subset"
109 );
110 }
111
112 Self {
113 all_targets: targets,
114 subset: Arc::new(RwLock::new(subset)),
115 current: AtomicUsize::new(0),
116 connections: Arc::new(RwLock::new(connections)),
117 health_status: Arc::new(RwLock::new(health_status)),
118 config,
119 }
120 }
121
122 fn compute_subset(targets: &[UpstreamTarget], config: &SubsetConfig) -> Vec<UpstreamTarget> {
124 if targets.is_empty() {
125 return Vec::new();
126 }
127
128 let subset_size = config.subset_size.min(targets.len());
129
130 let mut scored_targets: Vec<_> = targets
132 .iter()
133 .map(|t| {
134 let score = Self::subset_score(&t.full_address(), &config.proxy_id);
135 (t.clone(), score)
136 })
137 .collect();
138
139 scored_targets.sort_by_key(|(_, score)| *score);
141 scored_targets
142 .into_iter()
143 .take(subset_size)
144 .map(|(t, _)| t)
145 .collect()
146 }
147
148 fn subset_score(target_addr: &str, proxy_id: &str) -> u64 {
151 let combined = format!("{}:{}", target_addr, proxy_id);
153 xxh3_64(combined.as_bytes())
154 }
155
156 async fn rebuild_subset_if_needed(&self) {
158 let health = self.health_status.read().await;
159 let current_subset = self.subset.read().await;
160
161 let healthy_in_subset = current_subset
163 .iter()
164 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
165 .count();
166
167 drop(current_subset);
168 drop(health);
169
170 if healthy_in_subset < self.config.subset_size / 2 {
172 self.rebuild_subset().await;
173 }
174 }
175
176 async fn rebuild_subset(&self) {
178 let health = self.health_status.read().await;
179
180 let healthy_targets: Vec<_> = self
182 .all_targets
183 .iter()
184 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
185 .cloned()
186 .collect();
187
188 drop(health);
189
190 if healthy_targets.is_empty() {
191 return;
193 }
194
195 let new_subset = Self::compute_subset(&healthy_targets, &self.config);
197
198 info!(
199 new_subset_size = new_subset.len(),
200 healthy_total = healthy_targets.len(),
201 proxy_id = %self.config.proxy_id,
202 algorithm = "deterministic_subset",
203 "Rebuilt subset from healthy targets"
204 );
205
206 let mut subset = self.subset.write().await;
207 *subset = new_subset;
208 }
209
210 async fn select_from_subset<'a>(
212 &self,
213 healthy: &[&'a UpstreamTarget],
214 ) -> Option<&'a UpstreamTarget> {
215 if healthy.is_empty() {
216 return None;
217 }
218
219 match self.config.inner_algorithm {
220 SubsetInnerAlgorithm::RoundRobin => {
221 let idx = self.current.fetch_add(1, Ordering::Relaxed) % healthy.len();
222 Some(healthy[idx])
223 }
224 SubsetInnerAlgorithm::Random => {
225 use rand::seq::SliceRandom;
226 let mut rng = rand::rng();
227 healthy.choose(&mut rng).copied()
228 }
229 SubsetInnerAlgorithm::LeastConnections => {
230 let conns = self.connections.read().await;
231 healthy
232 .iter()
233 .min_by_key(|t| conns.get(&t.full_address()).copied().unwrap_or(0))
234 .copied()
235 }
236 }
237 }
238}
239
240#[async_trait]
241impl LoadBalancer for SubsetBalancer {
242 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
243 trace!(
244 total_targets = self.all_targets.len(),
245 algorithm = "deterministic_subset",
246 "Selecting upstream target"
247 );
248
249 self.rebuild_subset_if_needed().await;
251
252 let health = self.health_status.read().await;
253 let subset = self.subset.read().await;
254
255 let healthy_subset: Vec<_> = subset
257 .iter()
258 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
259 .collect();
260
261 drop(health);
262
263 if healthy_subset.is_empty() {
264 warn!(
265 subset_size = subset.len(),
266 total_targets = self.all_targets.len(),
267 proxy_id = %self.config.proxy_id,
268 algorithm = "deterministic_subset",
269 "No healthy targets in subset"
270 );
271 drop(subset);
272 return Err(SentinelError::NoHealthyUpstream);
273 }
274
275 let target = self
276 .select_from_subset(&healthy_subset)
277 .await
278 .ok_or(SentinelError::NoHealthyUpstream)?;
279
280 if matches!(
282 self.config.inner_algorithm,
283 SubsetInnerAlgorithm::LeastConnections
284 ) {
285 let mut conns = self.connections.write().await;
286 *conns.entry(target.full_address()).or_insert(0) += 1;
287 }
288
289 trace!(
290 selected_target = %target.full_address(),
291 subset_size = subset.len(),
292 healthy_count = healthy_subset.len(),
293 proxy_id = %self.config.proxy_id,
294 algorithm = "deterministic_subset",
295 "Selected target from subset"
296 );
297
298 Ok(TargetSelection {
299 address: target.full_address(),
300 weight: target.weight,
301 metadata: HashMap::new(),
302 })
303 }
304
305 async fn release(&self, selection: &TargetSelection) {
306 if matches!(
307 self.config.inner_algorithm,
308 SubsetInnerAlgorithm::LeastConnections
309 ) {
310 let mut conns = self.connections.write().await;
311 if let Some(count) = conns.get_mut(&selection.address) {
312 *count = count.saturating_sub(1);
313 }
314 }
315 }
316
317 async fn report_health(&self, address: &str, healthy: bool) {
318 let prev_health = {
319 let health = self.health_status.read().await;
320 *health.get(address).unwrap_or(&true)
321 };
322
323 trace!(
324 target = %address,
325 healthy = healthy,
326 prev_healthy = prev_health,
327 algorithm = "deterministic_subset",
328 "Updating target health status"
329 );
330
331 self.health_status
332 .write()
333 .await
334 .insert(address.to_string(), healthy);
335
336 if prev_health != healthy {
338 self.rebuild_subset_if_needed().await;
339 }
340 }
341
342 async fn healthy_targets(&self) -> Vec<String> {
343 self.health_status
344 .read()
345 .await
346 .iter()
347 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
348 .collect()
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` targets named `backend-<i>` on port 8080 with weight 100.
    fn make_targets(count: usize) -> Vec<UpstreamTarget> {
        (0..count)
            .map(|i| UpstreamTarget::new(format!("backend-{}", i), 8080, 100))
            .collect()
    }

    /// Round-robin configuration with the given subset size and proxy id.
    fn round_robin_config(subset_size: usize, proxy_id: &str) -> SubsetConfig {
        SubsetConfig {
            subset_size,
            proxy_id: proxy_id.to_string(),
            inner_algorithm: SubsetInnerAlgorithm::RoundRobin,
        }
    }

    /// Addresses of the balancer's current subset, read synchronously.
    /// Must only be called outside an async runtime.
    fn subset_addresses(balancer: &SubsetBalancer) -> Vec<String> {
        balancer
            .subset
            .blocking_read()
            .iter()
            .map(|t| t.full_address())
            .collect()
    }

    /// The subset never exceeds the configured size.
    #[test]
    fn test_subset_size_limited() {
        let balancer = SubsetBalancer::new(make_targets(100), round_robin_config(10, "test-proxy"));
        let subset = balancer.subset.blocking_read();
        assert_eq!(subset.len(), 10);
    }

    /// Two balancers with the same proxy id and targets get the same subset.
    #[test]
    fn test_subset_deterministic() {
        let targets = make_targets(50);

        let balancer1 = SubsetBalancer::new(targets.clone(), round_robin_config(10, "proxy-a"));
        let balancer2 = SubsetBalancer::new(targets, round_robin_config(10, "proxy-a"));

        assert_eq!(subset_addresses(&balancer1), subset_addresses(&balancer2));
    }

    /// Different proxy ids produce different subsets over the same targets.
    #[test]
    fn test_different_proxies_get_different_subsets() {
        let targets = make_targets(50);

        let balancer1 = SubsetBalancer::new(targets.clone(), round_robin_config(10, "proxy-a"));
        let balancer2 = SubsetBalancer::new(targets, round_robin_config(10, "proxy-b"));

        assert_ne!(subset_addresses(&balancer1), subset_addresses(&balancer2));
    }

    /// Every selection comes from the balancer's subset.
    #[tokio::test]
    async fn test_selects_from_subset_only() {
        let balancer = SubsetBalancer::new(make_targets(50), round_robin_config(5, "test-proxy"));

        let subset_addrs: Vec<_> = balancer
            .subset
            .read()
            .await
            .iter()
            .map(|t| t.full_address())
            .collect();

        for _ in 0..20 {
            let selection = balancer.select(None).await.unwrap();
            assert!(
                subset_addrs.contains(&selection.address),
                "Selected {} which is not in subset {:?}",
                selection.address,
                subset_addrs
            );
        }
    }

    /// Across many proxies, every backend is used and load is roughly even.
    #[test]
    fn test_even_distribution_across_proxies() {
        let targets = make_targets(100);
        let num_proxies = 100;
        let subset_size = 10;

        let mut backend_counts: HashMap<String, usize> = HashMap::new();

        for i in 0..num_proxies {
            let config = round_robin_config(subset_size, &format!("proxy-{}", i));
            for target in SubsetBalancer::compute_subset(&targets, &config) {
                *backend_counts.entry(target.full_address()).or_insert(0) += 1;
            }
        }

        // Look counts up per target so that a backend no proxy ever picked
        // contributes an explicit 0; the map alone only holds backends that
        // were selected at least once.
        let counts: Vec<usize> = targets
            .iter()
            .map(|t| backend_counts.get(&t.full_address()).copied().unwrap_or(0))
            .collect();

        let expected = (num_proxies * subset_size) / targets.len();

        let min_count = counts.iter().copied().min().unwrap_or(0);
        let max_count = counts.iter().copied().max().unwrap_or(0);

        assert!(min_count > 0, "Some backends were never selected");

        assert!(
            max_count <= expected * 3,
            "Backend received too much traffic: {} (expected ~{})",
            max_count,
            expected
        );

        let mean = (num_proxies * subset_size) as f64 / targets.len() as f64;
        let variance: f64 = counts
            .iter()
            .map(|&c| (c as f64 - mean).powi(2))
            .sum::<f64>()
            / counts.len() as f64;
        let std_dev = variance.sqrt();

        assert!(
            std_dev < mean,
            "Distribution too uneven: std_dev={:.2}, mean={:.2}",
            std_dev,
            mean
        );
    }
}
538}