Skip to main content

sentinel_proxy/upstream/
subset.rs

1//! Deterministic Subsetting load balancer
2//!
3//! For very large clusters (1000+ backends), maintaining connections to all
4//! backends is expensive. Deterministic subsetting limits each proxy instance
5//! to a subset of backends while ensuring:
6//!
7//! 1. Each backend gets roughly equal traffic across all proxies
8//! 2. The subset is stable (same proxy always uses same subset)
9//! 3. Subset membership is deterministic (based on proxy ID)
10//!
//! The algorithm uses rendezvous hashing: every backend is scored by hashing
//! its address together with the proxy ID, and the lowest-scoring backends form
//! the subset. This keeps disruption minimal when backends are added or removed.
13//!
14//! Reference: https://sre.google/sre-book/load-balancing-datacenter/
15
16use async_trait::async_trait;
17use rand::seq::IndexedRandom;
18use std::collections::HashMap;
19use std::hash::{Hash, Hasher};
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use tokio::sync::RwLock;
23use tracing::{debug, info, trace, warn};
24use xxhash_rust::xxh3::xxh3_64;
25
26use sentinel_common::errors::{SentinelError, SentinelResult};
27
28use super::{LoadBalancer, RequestContext, TargetSelection, UpstreamTarget};
29
30/// Configuration for Deterministic Subsetting
31#[derive(Debug, Clone)]
32pub struct SubsetConfig {
33    /// Number of backends in each subset (default: 10)
34    /// Smaller subsets reduce connection overhead but may impact load distribution
35    pub subset_size: usize,
36    /// Unique identifier for this proxy instance
37    /// Used to deterministically select which subset this proxy uses
38    pub proxy_id: String,
39    /// Inner load balancing algorithm for selecting within the subset
40    pub inner_algorithm: SubsetInnerAlgorithm,
41}
42
43impl Default for SubsetConfig {
44    fn default() -> Self {
45        Self {
46            subset_size: 10,
47            // Default to a random proxy ID (each instance gets unique subset)
48            proxy_id: format!("proxy-{}", rand::random::<u32>()),
49            inner_algorithm: SubsetInnerAlgorithm::RoundRobin,
50        }
51    }
52}
53
/// Strategy used to pick a target among the healthy members of the subset.
///
/// Derives `PartialEq`/`Eq`/`Hash` so callers can compare or key on the
/// algorithm directly instead of going through `matches!`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum SubsetInnerAlgorithm {
    /// Cycle through the healthy subset members in order (the default).
    #[default]
    RoundRobin,
    /// Pick a healthy subset member at random.
    Random,
    /// Pick the healthy subset member with the fewest tracked connections.
    LeastConnections,
}
65
66/// Deterministic Subsetting load balancer
67pub struct SubsetBalancer {
68    /// Full list of all targets (for subset calculation)
69    all_targets: Vec<UpstreamTarget>,
70    /// Current subset of targets for this proxy
71    subset: Arc<RwLock<Vec<UpstreamTarget>>>,
72    /// Round-robin counter for inner algorithm
73    current: AtomicUsize,
74    /// Connection counts per target (for least connections)
75    connections: Arc<RwLock<HashMap<String, usize>>>,
76    /// Health status per target
77    health_status: Arc<RwLock<HashMap<String, bool>>>,
78    /// Configuration
79    config: SubsetConfig,
80}
81
82impl SubsetBalancer {
83    /// Create a new Subset balancer
84    pub fn new(targets: Vec<UpstreamTarget>, config: SubsetConfig) -> Self {
85        let mut health_status = HashMap::new();
86        let mut connections = HashMap::new();
87
88        for target in &targets {
89            let addr = target.full_address();
90            health_status.insert(addr.clone(), true);
91            connections.insert(addr, 0);
92        }
93
94        let subset = Self::compute_subset(&targets, &config);
95
96        info!(
97            total_targets = targets.len(),
98            subset_size = subset.len(),
99            proxy_id = %config.proxy_id,
100            algorithm = "deterministic_subset",
101            "Created subset balancer"
102        );
103
104        for target in &subset {
105            debug!(
106                target = %target.full_address(),
107                proxy_id = %config.proxy_id,
108                "Target included in subset"
109            );
110        }
111
112        Self {
113            all_targets: targets,
114            subset: Arc::new(RwLock::new(subset)),
115            current: AtomicUsize::new(0),
116            connections: Arc::new(RwLock::new(connections)),
117            health_status: Arc::new(RwLock::new(health_status)),
118            config,
119        }
120    }
121
122    /// Compute the subset of targets for this proxy instance
123    fn compute_subset(targets: &[UpstreamTarget], config: &SubsetConfig) -> Vec<UpstreamTarget> {
124        if targets.is_empty() {
125            return Vec::new();
126        }
127
128        let subset_size = config.subset_size.min(targets.len());
129
130        // Hash each target to get a score relative to this proxy
131        let mut scored_targets: Vec<_> = targets
132            .iter()
133            .map(|t| {
134                let score = Self::subset_score(&t.full_address(), &config.proxy_id);
135                (t.clone(), score)
136            })
137            .collect();
138
139        // Sort by score and take the top N
140        scored_targets.sort_by_key(|(_, score)| *score);
141        scored_targets
142            .into_iter()
143            .take(subset_size)
144            .map(|(t, _)| t)
145            .collect()
146    }
147
148    /// Calculate a deterministic score for a target-proxy pair
149    /// Lower scores mean the target is "closer" to this proxy
150    fn subset_score(target_addr: &str, proxy_id: &str) -> u64 {
151        // Combine target and proxy identifiers
152        let combined = format!("{}:{}", target_addr, proxy_id);
153        xxh3_64(combined.as_bytes())
154    }
155
156    /// Rebuild subset when health changes significantly
157    async fn rebuild_subset_if_needed(&self) {
158        let health = self.health_status.read().await;
159        let current_subset = self.subset.read().await;
160
161        // Count healthy targets in current subset
162        let healthy_in_subset = current_subset
163            .iter()
164            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
165            .count();
166
167        drop(current_subset);
168        drop(health);
169
170        // If less than half the subset is healthy, try to rebuild
171        if healthy_in_subset < self.config.subset_size / 2 {
172            self.rebuild_subset().await;
173        }
174    }
175
176    /// Rebuild the subset considering health status
177    async fn rebuild_subset(&self) {
178        let health = self.health_status.read().await;
179
180        // Get all healthy targets
181        let healthy_targets: Vec<_> = self
182            .all_targets
183            .iter()
184            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
185            .cloned()
186            .collect();
187
188        drop(health);
189
190        if healthy_targets.is_empty() {
191            // Keep current subset as fallback
192            return;
193        }
194
195        // Recompute subset from healthy targets
196        let new_subset = Self::compute_subset(&healthy_targets, &self.config);
197
198        info!(
199            new_subset_size = new_subset.len(),
200            healthy_total = healthy_targets.len(),
201            proxy_id = %self.config.proxy_id,
202            algorithm = "deterministic_subset",
203            "Rebuilt subset from healthy targets"
204        );
205
206        let mut subset = self.subset.write().await;
207        *subset = new_subset;
208    }
209
210    /// Select using inner algorithm
211    async fn select_from_subset<'a>(
212        &self,
213        healthy: &[&'a UpstreamTarget],
214    ) -> Option<&'a UpstreamTarget> {
215        if healthy.is_empty() {
216            return None;
217        }
218
219        match self.config.inner_algorithm {
220            SubsetInnerAlgorithm::RoundRobin => {
221                let idx = self.current.fetch_add(1, Ordering::Relaxed) % healthy.len();
222                Some(healthy[idx])
223            }
224            SubsetInnerAlgorithm::Random => {
225                use rand::seq::SliceRandom;
226                let mut rng = rand::rng();
227                healthy.choose(&mut rng).copied()
228            }
229            SubsetInnerAlgorithm::LeastConnections => {
230                let conns = self.connections.read().await;
231                healthy
232                    .iter()
233                    .min_by_key(|t| conns.get(&t.full_address()).copied().unwrap_or(0))
234                    .copied()
235            }
236        }
237    }
238}
239
240#[async_trait]
241impl LoadBalancer for SubsetBalancer {
242    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
243        trace!(
244            total_targets = self.all_targets.len(),
245            algorithm = "deterministic_subset",
246            "Selecting upstream target"
247        );
248
249        // Check if we need to rebuild subset
250        self.rebuild_subset_if_needed().await;
251
252        let health = self.health_status.read().await;
253        let subset = self.subset.read().await;
254
255        // Get healthy targets from our subset
256        let healthy_subset: Vec<_> = subset
257            .iter()
258            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
259            .collect();
260
261        drop(health);
262
263        if healthy_subset.is_empty() {
264            warn!(
265                subset_size = subset.len(),
266                total_targets = self.all_targets.len(),
267                proxy_id = %self.config.proxy_id,
268                algorithm = "deterministic_subset",
269                "No healthy targets in subset"
270            );
271            drop(subset);
272            return Err(SentinelError::NoHealthyUpstream);
273        }
274
275        let target = self
276            .select_from_subset(&healthy_subset)
277            .await
278            .ok_or(SentinelError::NoHealthyUpstream)?;
279
280        // Track connections if using least connections
281        if matches!(
282            self.config.inner_algorithm,
283            SubsetInnerAlgorithm::LeastConnections
284        ) {
285            let mut conns = self.connections.write().await;
286            *conns.entry(target.full_address()).or_insert(0) += 1;
287        }
288
289        trace!(
290            selected_target = %target.full_address(),
291            subset_size = subset.len(),
292            healthy_count = healthy_subset.len(),
293            proxy_id = %self.config.proxy_id,
294            algorithm = "deterministic_subset",
295            "Selected target from subset"
296        );
297
298        Ok(TargetSelection {
299            address: target.full_address(),
300            weight: target.weight,
301            metadata: HashMap::new(),
302        })
303    }
304
305    async fn release(&self, selection: &TargetSelection) {
306        if matches!(
307            self.config.inner_algorithm,
308            SubsetInnerAlgorithm::LeastConnections
309        ) {
310            let mut conns = self.connections.write().await;
311            if let Some(count) = conns.get_mut(&selection.address) {
312                *count = count.saturating_sub(1);
313            }
314        }
315    }
316
317    async fn report_health(&self, address: &str, healthy: bool) {
318        let prev_health = {
319            let health = self.health_status.read().await;
320            *health.get(address).unwrap_or(&true)
321        };
322
323        trace!(
324            target = %address,
325            healthy = healthy,
326            prev_healthy = prev_health,
327            algorithm = "deterministic_subset",
328            "Updating target health status"
329        );
330
331        self.health_status
332            .write()
333            .await
334            .insert(address.to_string(), healthy);
335
336        // If health changed, consider rebuilding subset
337        if prev_health != healthy {
338            self.rebuild_subset_if_needed().await;
339        }
340    }
341
342    async fn healthy_targets(&self) -> Vec<String> {
343        self.health_status
344            .read()
345            .await
346            .iter()
347            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
348            .collect()
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;

    /// Build `count` distinct targets from `backend-<i>`, port 8080, weight 100.
    fn make_targets(count: usize) -> Vec<UpstreamTarget> {
        (0..count)
            .map(|i| UpstreamTarget::new(format!("backend-{}", i), 8080, 100))
            .collect()
    }

    /// Config with the given subset size, proxy ID and inner algorithm.
    fn make_config(
        subset_size: usize,
        proxy_id: &str,
        inner_algorithm: SubsetInnerAlgorithm,
    ) -> SubsetConfig {
        SubsetConfig {
            subset_size,
            proxy_id: proxy_id.to_string(),
            inner_algorithm,
        }
    }

    /// Addresses of the balancer's current subset (sync tests only: uses `blocking_read`).
    fn subset_addresses(balancer: &SubsetBalancer) -> Vec<String> {
        balancer
            .subset
            .blocking_read()
            .iter()
            .map(|t| t.full_address())
            .collect()
    }

    #[test]
    fn test_subset_size_limited() {
        let balancer = SubsetBalancer::new(
            make_targets(100),
            make_config(10, "test-proxy", SubsetInnerAlgorithm::RoundRobin),
        );
        assert_eq!(balancer.subset.blocking_read().len(), 10);
    }

    #[test]
    fn test_subset_capped_at_target_count() {
        // Fewer targets than `subset_size`: every target is used.
        let targets = make_targets(4);
        let config = make_config(10, "test-proxy", SubsetInnerAlgorithm::RoundRobin);
        assert_eq!(SubsetBalancer::compute_subset(&targets, &config).len(), 4);
    }

    #[test]
    fn test_subset_of_no_targets_is_empty() {
        let config = make_config(10, "test-proxy", SubsetInnerAlgorithm::RoundRobin);
        assert!(SubsetBalancer::compute_subset(&[], &config).is_empty());
    }

    #[test]
    fn test_subset_deterministic() {
        let targets = make_targets(50);
        let balancer1 = SubsetBalancer::new(
            targets.clone(),
            make_config(10, "proxy-a", SubsetInnerAlgorithm::RoundRobin),
        );
        let balancer2 = SubsetBalancer::new(
            targets,
            make_config(10, "proxy-a", SubsetInnerAlgorithm::RoundRobin),
        );

        // Same proxy ID should get same subset
        assert_eq!(subset_addresses(&balancer1), subset_addresses(&balancer2));
    }

    #[test]
    fn test_different_proxies_get_different_subsets() {
        let targets = make_targets(50);
        let balancer1 = SubsetBalancer::new(
            targets.clone(),
            make_config(10, "proxy-a", SubsetInnerAlgorithm::RoundRobin),
        );
        let balancer2 = SubsetBalancer::new(
            targets,
            make_config(10, "proxy-b", SubsetInnerAlgorithm::RoundRobin),
        );

        // Different proxy IDs should (very likely) get different subsets
        assert_ne!(subset_addresses(&balancer1), subset_addresses(&balancer2));
    }

    #[tokio::test]
    async fn test_selects_from_subset_only() {
        let balancer = SubsetBalancer::new(
            make_targets(50),
            make_config(5, "test-proxy", SubsetInnerAlgorithm::RoundRobin),
        );

        let subset_addrs: Vec<String> = balancer
            .subset
            .read()
            .await
            .iter()
            .map(|t| t.full_address())
            .collect();

        for _ in 0..20 {
            let selection = balancer.select(None).await.unwrap();
            assert!(
                subset_addrs.contains(&selection.address),
                "Selected {} which is not in subset {:?}",
                selection.address,
                subset_addrs
            );
        }
    }

    #[tokio::test]
    async fn test_inner_algorithms_select_from_subset_only() {
        for algorithm in [
            SubsetInnerAlgorithm::Random,
            SubsetInnerAlgorithm::LeastConnections,
        ] {
            let balancer =
                SubsetBalancer::new(make_targets(50), make_config(5, "test-proxy", algorithm));

            let subset_addrs: Vec<String> = balancer
                .subset
                .read()
                .await
                .iter()
                .map(|t| t.full_address())
                .collect();

            for _ in 0..20 {
                let selection = balancer.select(None).await.unwrap();
                assert!(
                    subset_addrs.contains(&selection.address),
                    "{:?} selected {} which is not in subset {:?}",
                    algorithm,
                    selection.address,
                    subset_addrs
                );
                balancer.release(&selection).await;
            }
        }
    }

    #[test]
    fn test_even_distribution_across_proxies() {
        // With many proxies, each backend should be selected by roughly
        // (num_proxies * subset_size / num_backends) proxies
        let targets = make_targets(100);
        let num_proxies = 100;
        let subset_size = 10;

        let mut backend_counts: HashMap<String, usize> = HashMap::new();
        for proxy in 0..num_proxies {
            let config = make_config(
                subset_size,
                &format!("proxy-{}", proxy),
                SubsetInnerAlgorithm::RoundRobin,
            );
            // Use compute_subset directly to avoid async/blocking issues
            for target in SubsetBalancer::compute_subset(&targets, &config) {
                *backend_counts.entry(target.full_address()).or_default() += 1;
            }
        }

        // Count per target, including 0 for backends no proxy picked. Reading
        // only the map's values would hide starved backends entirely.
        let counts: Vec<usize> = targets
            .iter()
            .map(|t| backend_counts.get(&t.full_address()).copied().unwrap_or(0))
            .collect();

        // Each backend should be selected by roughly 10 proxies (100 * 10 / 100)
        let expected = (num_proxies * subset_size) / targets.len();

        let min_count = counts.iter().copied().min().unwrap_or(0);
        let max_count = counts.iter().copied().max().unwrap_or(0);

        // All backends should be selected at least once
        assert!(min_count > 0, "Some backends were never selected");

        // No backend should receive more than 3x the expected traffic
        assert!(
            max_count <= expected * 3,
            "Backend received too much traffic: {} (expected ~{})",
            max_count,
            expected
        );

        // Standard deviation over all backends should be less than the mean
        // (coefficient of variation < 1)
        let mean = (num_proxies * subset_size) as f64 / targets.len() as f64;
        let variance = counts
            .iter()
            .map(|&c| (c as f64 - mean).powi(2))
            .sum::<f64>()
            / targets.len() as f64;
        let std_dev = variance.sqrt();

        assert!(
            std_dev < mean,
            "Distribution too uneven: std_dev={:.2}, mean={:.2}",
            std_dev,
            mean
        );
    }
}