// sentinel_proxy/upstream/subset.rs

//! Deterministic Subsetting load balancer
//!
//! For very large clusters (1000+ backends), maintaining connections to all
//! backends is expensive. Deterministic subsetting limits each proxy instance
//! to a subset of backends while aiming for:
//!
//! 1. Roughly equal traffic to each backend across all proxies
//! 2. A stable subset (the same proxy always uses the same subset)
//! 3. Deterministic subset membership (derived from the proxy ID)
//!
//! The algorithm uses rendezvous (highest-random-weight) hashing: every
//! backend is scored by hashing its address together with the proxy ID, and
//! the proxy keeps the lowest-scoring backends. Adding or removing a backend
//! only changes the subsets of proxies for which that backend ranks among the
//! lowest scores, which keeps disruption minimal.
//!
//! Background reading (the chapter describes a related, round-based variant of
//! deterministic subsetting): https://sre.google/sre-book/load-balancing-datacenter/
15
16use async_trait::async_trait;
17use std::collections::HashMap;
18use std::hash::{Hash, Hasher};
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use tracing::{debug, info, trace, warn};
23use xxhash_rust::xxh3::xxh3_64;
24
25use sentinel_common::errors::{SentinelError, SentinelResult};
26
27use super::{LoadBalancer, RequestContext, TargetSelection, UpstreamTarget};
28
29/// Configuration for Deterministic Subsetting
30#[derive(Debug, Clone)]
31pub struct SubsetConfig {
32    /// Number of backends in each subset (default: 10)
33    /// Smaller subsets reduce connection overhead but may impact load distribution
34    pub subset_size: usize,
35    /// Unique identifier for this proxy instance
36    /// Used to deterministically select which subset this proxy uses
37    pub proxy_id: String,
38    /// Inner load balancing algorithm for selecting within the subset
39    pub inner_algorithm: SubsetInnerAlgorithm,
40}
41
42impl Default for SubsetConfig {
43    fn default() -> Self {
44        Self {
45            subset_size: 10,
46            // Default to a random proxy ID (each instance gets unique subset)
47            proxy_id: format!("proxy-{}", rand::random::<u32>()),
48            inner_algorithm: SubsetInnerAlgorithm::RoundRobin,
49        }
50    }
51}
52
/// Algorithm used to pick a target from among the healthy members of the subset.
///
/// Derives `PartialEq`/`Eq`/`Hash` so callers can compare configured
/// algorithms or use them as map keys.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum SubsetInnerAlgorithm {
    /// Cycle through the healthy subset members in order (default).
    #[default]
    RoundRobin,
    /// Pick a healthy subset member at random.
    Random,
    /// Pick the healthy subset member with the fewest tracked connections.
    LeastConnections,
}
64
65/// Deterministic Subsetting load balancer
66pub struct SubsetBalancer {
67    /// Full list of all targets (for subset calculation)
68    all_targets: Vec<UpstreamTarget>,
69    /// Current subset of targets for this proxy
70    subset: Arc<RwLock<Vec<UpstreamTarget>>>,
71    /// Round-robin counter for inner algorithm
72    current: AtomicUsize,
73    /// Connection counts per target (for least connections)
74    connections: Arc<RwLock<HashMap<String, usize>>>,
75    /// Health status per target
76    health_status: Arc<RwLock<HashMap<String, bool>>>,
77    /// Configuration
78    config: SubsetConfig,
79}
80
81impl SubsetBalancer {
82    /// Create a new Subset balancer
83    pub fn new(targets: Vec<UpstreamTarget>, config: SubsetConfig) -> Self {
84        let mut health_status = HashMap::new();
85        let mut connections = HashMap::new();
86
87        for target in &targets {
88            let addr = target.full_address();
89            health_status.insert(addr.clone(), true);
90            connections.insert(addr, 0);
91        }
92
93        let subset = Self::compute_subset(&targets, &config);
94
95        info!(
96            total_targets = targets.len(),
97            subset_size = subset.len(),
98            proxy_id = %config.proxy_id,
99            algorithm = "deterministic_subset",
100            "Created subset balancer"
101        );
102
103        for target in &subset {
104            debug!(
105                target = %target.full_address(),
106                proxy_id = %config.proxy_id,
107                "Target included in subset"
108            );
109        }
110
111        Self {
112            all_targets: targets,
113            subset: Arc::new(RwLock::new(subset)),
114            current: AtomicUsize::new(0),
115            connections: Arc::new(RwLock::new(connections)),
116            health_status: Arc::new(RwLock::new(health_status)),
117            config,
118        }
119    }
120
121    /// Compute the subset of targets for this proxy instance
122    fn compute_subset(targets: &[UpstreamTarget], config: &SubsetConfig) -> Vec<UpstreamTarget> {
123        if targets.is_empty() {
124            return Vec::new();
125        }
126
127        let subset_size = config.subset_size.min(targets.len());
128
129        // Hash each target to get a score relative to this proxy
130        let mut scored_targets: Vec<_> = targets
131            .iter()
132            .map(|t| {
133                let score = Self::subset_score(&t.full_address(), &config.proxy_id);
134                (t.clone(), score)
135            })
136            .collect();
137
138        // Sort by score and take the top N
139        scored_targets.sort_by_key(|(_, score)| *score);
140        scored_targets
141            .into_iter()
142            .take(subset_size)
143            .map(|(t, _)| t)
144            .collect()
145    }
146
147    /// Calculate a deterministic score for a target-proxy pair
148    /// Lower scores mean the target is "closer" to this proxy
149    fn subset_score(target_addr: &str, proxy_id: &str) -> u64 {
150        // Combine target and proxy identifiers
151        let combined = format!("{}:{}", target_addr, proxy_id);
152        xxh3_64(combined.as_bytes())
153    }
154
155    /// Rebuild subset when health changes significantly
156    async fn rebuild_subset_if_needed(&self) {
157        let health = self.health_status.read().await;
158        let current_subset = self.subset.read().await;
159
160        // Count healthy targets in current subset
161        let healthy_in_subset = current_subset
162            .iter()
163            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
164            .count();
165
166        drop(current_subset);
167        drop(health);
168
169        // If less than half the subset is healthy, try to rebuild
170        if healthy_in_subset < self.config.subset_size / 2 {
171            self.rebuild_subset().await;
172        }
173    }
174
175    /// Rebuild the subset considering health status
176    async fn rebuild_subset(&self) {
177        let health = self.health_status.read().await;
178
179        // Get all healthy targets
180        let healthy_targets: Vec<_> = self
181            .all_targets
182            .iter()
183            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
184            .cloned()
185            .collect();
186
187        drop(health);
188
189        if healthy_targets.is_empty() {
190            // Keep current subset as fallback
191            return;
192        }
193
194        // Recompute subset from healthy targets
195        let new_subset = Self::compute_subset(&healthy_targets, &self.config);
196
197        info!(
198            new_subset_size = new_subset.len(),
199            healthy_total = healthy_targets.len(),
200            proxy_id = %self.config.proxy_id,
201            algorithm = "deterministic_subset",
202            "Rebuilt subset from healthy targets"
203        );
204
205        let mut subset = self.subset.write().await;
206        *subset = new_subset;
207    }
208
209    /// Select using inner algorithm
210    async fn select_from_subset<'a>(
211        &self,
212        healthy: &[&'a UpstreamTarget],
213    ) -> Option<&'a UpstreamTarget> {
214        if healthy.is_empty() {
215            return None;
216        }
217
218        match self.config.inner_algorithm {
219            SubsetInnerAlgorithm::RoundRobin => {
220                let idx = self.current.fetch_add(1, Ordering::Relaxed) % healthy.len();
221                Some(healthy[idx])
222            }
223            SubsetInnerAlgorithm::Random => {
224                use rand::seq::SliceRandom;
225                let mut rng = rand::thread_rng();
226                healthy.choose(&mut rng).copied()
227            }
228            SubsetInnerAlgorithm::LeastConnections => {
229                let conns = self.connections.read().await;
230                healthy
231                    .iter()
232                    .min_by_key(|t| conns.get(&t.full_address()).copied().unwrap_or(0))
233                    .copied()
234            }
235        }
236    }
237}
238
239#[async_trait]
240impl LoadBalancer for SubsetBalancer {
241    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
242        trace!(
243            total_targets = self.all_targets.len(),
244            algorithm = "deterministic_subset",
245            "Selecting upstream target"
246        );
247
248        // Check if we need to rebuild subset
249        self.rebuild_subset_if_needed().await;
250
251        let health = self.health_status.read().await;
252        let subset = self.subset.read().await;
253
254        // Get healthy targets from our subset
255        let healthy_subset: Vec<_> = subset
256            .iter()
257            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
258            .collect();
259
260        drop(health);
261
262        if healthy_subset.is_empty() {
263            warn!(
264                subset_size = subset.len(),
265                total_targets = self.all_targets.len(),
266                proxy_id = %self.config.proxy_id,
267                algorithm = "deterministic_subset",
268                "No healthy targets in subset"
269            );
270            drop(subset);
271            return Err(SentinelError::NoHealthyUpstream);
272        }
273
274        let target = self
275            .select_from_subset(&healthy_subset)
276            .await
277            .ok_or(SentinelError::NoHealthyUpstream)?;
278
279        // Track connections if using least connections
280        if matches!(
281            self.config.inner_algorithm,
282            SubsetInnerAlgorithm::LeastConnections
283        ) {
284            let mut conns = self.connections.write().await;
285            *conns.entry(target.full_address()).or_insert(0) += 1;
286        }
287
288        trace!(
289            selected_target = %target.full_address(),
290            subset_size = subset.len(),
291            healthy_count = healthy_subset.len(),
292            proxy_id = %self.config.proxy_id,
293            algorithm = "deterministic_subset",
294            "Selected target from subset"
295        );
296
297        Ok(TargetSelection {
298            address: target.full_address(),
299            weight: target.weight,
300            metadata: HashMap::new(),
301        })
302    }
303
304    async fn release(&self, selection: &TargetSelection) {
305        if matches!(
306            self.config.inner_algorithm,
307            SubsetInnerAlgorithm::LeastConnections
308        ) {
309            let mut conns = self.connections.write().await;
310            if let Some(count) = conns.get_mut(&selection.address) {
311                *count = count.saturating_sub(1);
312            }
313        }
314    }
315
316    async fn report_health(&self, address: &str, healthy: bool) {
317        let prev_health = {
318            let health = self.health_status.read().await;
319            *health.get(address).unwrap_or(&true)
320        };
321
322        trace!(
323            target = %address,
324            healthy = healthy,
325            prev_healthy = prev_health,
326            algorithm = "deterministic_subset",
327            "Updating target health status"
328        );
329
330        self.health_status
331            .write()
332            .await
333            .insert(address.to_string(), healthy);
334
335        // If health changed, consider rebuilding subset
336        if prev_health != healthy {
337            self.rebuild_subset_if_needed().await;
338        }
339    }
340
341    async fn healthy_targets(&self) -> Vec<String> {
342        self.health_status
343            .read()
344            .await
345            .iter()
346            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
347            .collect()
348    }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// Build `count` targets named `backend-<i>` on port 8080 with weight 100.
    fn make_targets(count: usize) -> Vec<UpstreamTarget> {
        (0..count)
            .map(|i| UpstreamTarget::new(format!("backend-{}", i), 8080, 100))
            .collect()
    }

    /// Round-robin configuration for `proxy_id` with the given subset size.
    fn round_robin_config(proxy_id: &str, subset_size: usize) -> SubsetConfig {
        SubsetConfig {
            subset_size,
            proxy_id: proxy_id.to_string(),
            inner_algorithm: SubsetInnerAlgorithm::RoundRobin,
        }
    }

    /// Addresses of the balancer's current subset, in subset order.
    ///
    /// Uses `blocking_read`, so only call it from synchronous (non-async) tests.
    fn subset_addresses(balancer: &SubsetBalancer) -> Vec<String> {
        balancer
            .subset
            .blocking_read()
            .iter()
            .map(|t| t.full_address())
            .collect()
    }

    #[test]
    fn test_subset_size_limited() {
        let balancer =
            SubsetBalancer::new(make_targets(100), round_robin_config("test-proxy", 10));
        assert_eq!(subset_addresses(&balancer).len(), 10);
    }

    #[test]
    fn test_subset_capped_by_target_count() {
        let subset =
            SubsetBalancer::compute_subset(&make_targets(3), &round_robin_config("test-proxy", 10));
        assert_eq!(subset.len(), 3);
    }

    #[test]
    fn test_subset_deterministic() {
        let targets = make_targets(50);
        let first = SubsetBalancer::new(targets.clone(), round_robin_config("proxy-a", 10));
        let second = SubsetBalancer::new(targets, round_robin_config("proxy-a", 10));

        // Same proxy ID should get same subset
        assert_eq!(subset_addresses(&first), subset_addresses(&second));
    }

    #[test]
    fn test_different_proxies_get_different_subsets() {
        let targets = make_targets(50);
        let proxy_a = SubsetBalancer::new(targets.clone(), round_robin_config("proxy-a", 10));
        let proxy_b = SubsetBalancer::new(targets, round_robin_config("proxy-b", 10));

        // Different proxy IDs should (very likely) get different subsets
        assert_ne!(subset_addresses(&proxy_a), subset_addresses(&proxy_b));
    }

    #[tokio::test]
    async fn test_selects_from_subset_only() {
        let balancer = SubsetBalancer::new(make_targets(50), round_robin_config("test-proxy", 5));

        let subset_addrs: Vec<String> = balancer
            .subset
            .read()
            .await
            .iter()
            .map(|t| t.full_address())
            .collect();

        // All selections should be from the subset
        for _ in 0..20 {
            let selection = balancer.select(None).await.unwrap();
            assert!(
                subset_addrs.contains(&selection.address),
                "Selected {} which is not in subset {:?}",
                selection.address,
                subset_addrs
            );
        }
    }

    #[tokio::test]
    async fn test_unhealthy_target_not_selected() {
        let balancer = SubsetBalancer::new(make_targets(50), round_robin_config("test-proxy", 5));

        let down = balancer.subset.read().await[0].full_address();
        balancer.report_health(&down, false).await;

        for _ in 0..20 {
            let selection = balancer.select(None).await.unwrap();
            assert_ne!(selection.address, down, "Selected a target reported unhealthy");
        }
    }

    #[test]
    fn test_even_distribution_across_proxies() {
        // With many proxies, each backend should be selected by roughly
        // (num_proxies * subset_size / num_backends) proxies
        let targets = make_targets(100);
        let num_proxies = 100;
        let subset_size = 10;

        // Seed every backend with 0 so backends that no proxy picked are counted;
        // otherwise the "never selected" check below could never fail.
        let mut backend_counts: HashMap<String, usize> =
            targets.iter().map(|t| (t.full_address(), 0)).collect();

        for i in 0..num_proxies {
            let config = round_robin_config(&format!("proxy-{}", i), subset_size);
            // Use compute_subset directly to avoid async/blocking issues
            for target in SubsetBalancer::compute_subset(&targets, &config) {
                *backend_counts.entry(target.full_address()).or_insert(0) += 1;
            }
        }
        assert_eq!(backend_counts.len(), targets.len());

        // Each backend should be selected by roughly 10 proxies (100 * 10 / 100)
        let expected = (num_proxies * subset_size) / targets.len();

        let min_count = backend_counts.values().copied().min().unwrap_or(0);
        let max_count = backend_counts.values().copied().max().unwrap_or(0);

        // All backends should be selected at least once
        assert!(min_count > 0, "Some backends were never selected");

        // No backend should receive more than 3x the expected traffic
        assert!(
            max_count <= expected * 3,
            "Backend received too much traffic: {} (expected ~{})",
            max_count,
            expected
        );

        // Coefficient of variation should stay below 1 (std_dev < mean)
        let mean = (num_proxies * subset_size) as f64 / targets.len() as f64;
        let variance = backend_counts
            .values()
            .map(|&c| (c as f64 - mean).powi(2))
            .sum::<f64>()
            / targets.len() as f64;
        let std_dev = variance.sqrt();

        assert!(
            std_dev < mean,
            "Distribution too uneven: std_dev={:.2}, mean={:.2}",
            std_dev,
            mean
        );
    }
}