// sentinel_proxy/upstream/p2c.rs

1use async_trait::async_trait;
2use rand::rngs::StdRng;
3use rand::{Rng, SeedableRng};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::RwLock;
9
10use tracing::{debug, info, trace, warn};
11
12use super::{LoadBalancer, RequestContext, TargetSelection, UpstreamTarget};
13use sentinel_common::errors::{SentinelError, SentinelResult};
14
/// Load signal that P2C compares between candidate targets.
///
/// All variants are field-less tags, so the type is `Copy` and supports
/// equality comparison and hashing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LoadMetric {
    /// Active connection count
    Connections,
    /// Average response latency
    Latency,
    /// Combined score: active connections plus average latency in
    /// microseconds divided by 100 (see `TargetMetrics::get_load`)
    Combined,
    /// CPU usage (requires external monitoring)
    CpuUsage,
    /// Request rate
    RequestRate,
}
29
30impl Default for LoadMetric {
31    fn default() -> Self {
32        LoadMetric::Connections
33    }
34}
35
36/// Configuration for P2C load balancer
37#[derive(Debug, Clone)]
38pub struct P2cConfig {
39    /// Load metric to use for selection
40    pub load_metric: LoadMetric,
41    /// Weight multiplier for secondary metric in combined mode
42    pub secondary_weight: f64,
43    /// Whether to use weighted random selection
44    pub use_weights: bool,
45    /// Latency window for averaging (in seconds)
46    pub latency_window_secs: u64,
47    /// Enable power of three choices for better distribution
48    pub power_of_three: bool,
49}
50
51impl Default for P2cConfig {
52    fn default() -> Self {
53        Self {
54            load_metric: LoadMetric::Connections,
55            secondary_weight: 0.5,
56            use_weights: true,
57            latency_window_secs: 10,
58            power_of_three: false,
59        }
60    }
61}
62
63/// Target metrics for load calculation
64#[derive(Debug, Clone)]
65struct TargetMetrics {
66    /// Active connections
67    connections: Arc<AtomicU64>,
68    /// Total requests
69    requests: Arc<AtomicU64>,
70    /// Total latency in microseconds
71    total_latency_us: Arc<AtomicU64>,
72    /// Request count for latency averaging
73    latency_count: Arc<AtomicU64>,
74    /// CPU usage percentage (0-100)
75    cpu_usage: Arc<AtomicU64>,
76    /// Last update time
77    last_update: Arc<RwLock<Instant>>,
78    /// Recent latency measurements (ring buffer)
79    recent_latencies: Arc<RwLock<Vec<Duration>>>,
80    /// Ring buffer position
81    latency_buffer_pos: Arc<AtomicUsize>,
82}
83
84impl TargetMetrics {
85    fn new(buffer_size: usize) -> Self {
86        Self {
87            connections: Arc::new(AtomicU64::new(0)),
88            requests: Arc::new(AtomicU64::new(0)),
89            total_latency_us: Arc::new(AtomicU64::new(0)),
90            latency_count: Arc::new(AtomicU64::new(0)),
91            cpu_usage: Arc::new(AtomicU64::new(0)),
92            last_update: Arc::new(RwLock::new(Instant::now())),
93            recent_latencies: Arc::new(RwLock::new(vec![Duration::ZERO; buffer_size])),
94            latency_buffer_pos: Arc::new(AtomicUsize::new(0)),
95        }
96    }
97
98    /// Calculate average latency over the window
99    async fn average_latency(&self) -> Duration {
100        let latencies = self.recent_latencies.read().await;
101        let count = self.latency_count.load(Ordering::Relaxed);
102
103        if count == 0 {
104            return Duration::ZERO;
105        }
106
107        let total: Duration = latencies.iter().sum();
108        let sample_count = count.min(latencies.len() as u64);
109
110        if sample_count > 0 {
111            total / sample_count as u32
112        } else {
113            Duration::ZERO
114        }
115    }
116
117    /// Record a latency measurement
118    async fn record_latency(&self, latency: Duration) {
119        let pos = self.latency_buffer_pos.fetch_add(1, Ordering::Relaxed);
120        let mut latencies = self.recent_latencies.write().await;
121        let buffer_size = latencies.len();
122        latencies[pos % buffer_size] = latency;
123
124        self.total_latency_us
125            .fetch_add(latency.as_micros() as u64, Ordering::Relaxed);
126        self.latency_count.fetch_add(1, Ordering::Relaxed);
127    }
128
129    /// Get current load based on metric type
130    async fn get_load(&self, metric: LoadMetric) -> f64 {
131        match metric {
132            LoadMetric::Connections => self.connections.load(Ordering::Relaxed) as f64,
133            LoadMetric::Latency => self.average_latency().await.as_micros() as f64,
134            LoadMetric::Combined => {
135                let connections = self.connections.load(Ordering::Relaxed) as f64;
136                let latency = self.average_latency().await.as_micros() as f64;
137                // Normalize latency to be on similar scale as connections
138                // (assuming avg latency ~10ms = 10000us, and avg connections ~100)
139                connections + (latency / 100.0)
140            }
141            LoadMetric::CpuUsage => self.cpu_usage.load(Ordering::Relaxed) as f64,
142            LoadMetric::RequestRate => {
143                // Calculate requests per second over the last update interval
144                let requests = self.requests.load(Ordering::Relaxed);
145                let last_update = *self.last_update.read().await;
146                let elapsed = last_update.elapsed().as_secs_f64();
147                if elapsed > 0.0 {
148                    requests as f64 / elapsed
149                } else {
150                    0.0
151                }
152            }
153        }
154    }
155}
156
157/// Power of Two Choices load balancer
158pub struct P2cBalancer {
159    /// Configuration
160    config: P2cConfig,
161    /// All upstream targets
162    targets: Vec<UpstreamTarget>,
163    /// Target health status
164    health_status: Arc<RwLock<HashMap<String, bool>>>,
165    /// Metrics per target
166    metrics: Vec<TargetMetrics>,
167    /// Random number generator (thread-safe)
168    rng: Arc<RwLock<StdRng>>,
169    /// Cumulative weights for weighted selection
170    cumulative_weights: Vec<u32>,
171}
172
173impl P2cBalancer {
174    pub fn new(targets: Vec<UpstreamTarget>, config: P2cConfig) -> Self {
175        trace!(
176            target_count = targets.len(),
177            load_metric = ?config.load_metric,
178            use_weights = config.use_weights,
179            power_of_three = config.power_of_three,
180            latency_window_secs = config.latency_window_secs,
181            "Creating P2C balancer"
182        );
183
184        let buffer_size = (config.latency_window_secs * 100) as usize; // 100 samples/sec
185        let metrics = targets
186            .iter()
187            .map(|_| TargetMetrics::new(buffer_size))
188            .collect();
189
190        // Calculate cumulative weights for weighted random selection
191        let mut cumulative_weights = Vec::with_capacity(targets.len());
192        let mut cumsum = 0u32;
193        for target in &targets {
194            cumsum += target.weight;
195            cumulative_weights.push(cumsum);
196        }
197
198        debug!(
199            target_count = targets.len(),
200            total_weight = cumsum,
201            buffer_size = buffer_size,
202            "P2C balancer initialized"
203        );
204
205        Self {
206            config,
207            targets,
208            health_status: Arc::new(RwLock::new(HashMap::new())),
209            metrics,
210            rng: Arc::new(RwLock::new(StdRng::from_entropy())),
211            cumulative_weights,
212        }
213    }
214
215    /// Select a random healthy target index
216    async fn random_healthy_target(&self) -> Option<usize> {
217        let health = self.health_status.read().await;
218        let healthy_indices: Vec<usize> = self
219            .targets
220            .iter()
221            .enumerate()
222            .filter_map(|(i, t)| {
223                let target_id = format!("{}:{}", t.address, t.port);
224                if health.get(&target_id).copied().unwrap_or(true) {
225                    Some(i)
226                } else {
227                    None
228                }
229            })
230            .collect();
231
232        trace!(
233            total_targets = self.targets.len(),
234            healthy_count = healthy_indices.len(),
235            use_weights = self.config.use_weights,
236            "Selecting random healthy target"
237        );
238
239        if healthy_indices.is_empty() {
240            warn!("No healthy targets available for P2C selection");
241            return None;
242        }
243
244        let mut rng = self.rng.write().await;
245
246        if self.config.use_weights && !self.cumulative_weights.is_empty() {
247            // Weighted random selection
248            let total_weight = self.cumulative_weights.last().copied().unwrap_or(0);
249            if total_weight > 0 {
250                let threshold = rng.gen_range(0..total_weight);
251                for &idx in &healthy_indices {
252                    if self.cumulative_weights[idx] > threshold {
253                        trace!(
254                            target_index = idx,
255                            threshold = threshold,
256                            "Selected target via weighted random"
257                        );
258                        return Some(idx);
259                    }
260                }
261            }
262        }
263
264        // Fallback to uniform random
265        let selected = healthy_indices[rng.gen_range(0..healthy_indices.len())];
266        trace!(
267            target_index = selected,
268            "Selected target via uniform random"
269        );
270        Some(selected)
271    }
272
273    /// Select the least loaded target from candidates
274    async fn select_least_loaded(&self, candidates: Vec<usize>) -> Option<usize> {
275        if candidates.is_empty() {
276            trace!("No candidates provided for least loaded selection");
277            return None;
278        }
279
280        trace!(
281            candidate_count = candidates.len(),
282            load_metric = ?self.config.load_metric,
283            "Evaluating candidates for least loaded"
284        );
285
286        let mut min_load = f64::MAX;
287        let mut best_target = candidates[0];
288
289        for &idx in &candidates {
290            let load = self.metrics[idx].get_load(self.config.load_metric).await;
291
292            trace!(
293                target_index = idx,
294                load = load,
295                "Candidate load"
296            );
297
298            if load < min_load {
299                min_load = load;
300                best_target = idx;
301            }
302        }
303
304        debug!(
305            target_index = best_target,
306            load = min_load,
307            candidate_count = candidates.len(),
308            "P2C selected least loaded target"
309        );
310
311        Some(best_target)
312    }
313
314    /// Track connection acquisition
315    pub fn acquire_connection(&self, target_index: usize) {
316        let connections = self.metrics[target_index]
317            .connections
318            .fetch_add(1, Ordering::Relaxed) + 1;
319        let requests = self.metrics[target_index]
320            .requests
321            .fetch_add(1, Ordering::Relaxed) + 1;
322
323        trace!(
324            target_index = target_index,
325            connections = connections,
326            total_requests = requests,
327            "P2C acquired connection"
328        );
329    }
330
331    /// Track connection release
332    pub fn release_connection(&self, target_index: usize) {
333        let connections = self.metrics[target_index]
334            .connections
335            .fetch_sub(1, Ordering::Relaxed) - 1;
336
337        trace!(
338            target_index = target_index,
339            connections = connections,
340            "P2C released connection"
341        );
342    }
343
344    /// Update target metrics
345    pub async fn update_metrics(
346        &self,
347        target_index: usize,
348        latency: Option<Duration>,
349        cpu_usage: Option<u8>,
350    ) {
351        trace!(
352            target_index = target_index,
353            latency_ms = latency.map(|l| l.as_millis() as u64),
354            cpu_usage = cpu_usage,
355            "Updating P2C target metrics"
356        );
357
358        if let Some(latency) = latency {
359            self.metrics[target_index].record_latency(latency).await;
360        }
361
362        if let Some(cpu) = cpu_usage {
363            self.metrics[target_index]
364                .cpu_usage
365                .store(cpu as u64, Ordering::Relaxed);
366        }
367
368        *self.metrics[target_index].last_update.write().await = Instant::now();
369    }
370}
371
372#[async_trait]
373impl LoadBalancer for P2cBalancer {
374    async fn select(
375        &self,
376        _context: Option<&RequestContext>,
377    ) -> SentinelResult<TargetSelection> {
378        // Select candidates
379        let num_choices = if self.config.power_of_three { 3 } else { 2 };
380
381        trace!(
382            num_choices = num_choices,
383            power_of_three = self.config.power_of_three,
384            "P2C select started"
385        );
386
387        let mut candidates = Vec::with_capacity(num_choices);
388
389        for i in 0..num_choices {
390            if let Some(idx) = self.random_healthy_target().await {
391                if !candidates.contains(&idx) {
392                    candidates.push(idx);
393                    trace!(
394                        choice = i,
395                        target_index = idx,
396                        "Added candidate"
397                    );
398                }
399            }
400        }
401
402        if candidates.is_empty() {
403            warn!("P2C: No healthy targets available");
404            return Err(SentinelError::NoHealthyUpstream);
405        }
406
407        // Select least loaded from candidates
408        let target_index = self
409            .select_least_loaded(candidates)
410            .await
411            .ok_or_else(|| {
412                warn!("P2C: Failed to select from candidates");
413                SentinelError::NoHealthyUpstream
414            })?;
415
416        let target = &self.targets[target_index];
417
418        // Track connection
419        self.acquire_connection(target_index);
420
421        // Get current metrics for metadata
422        let current_load = self.metrics[target_index]
423            .get_load(self.config.load_metric)
424            .await;
425        let connections = self.metrics[target_index]
426            .connections
427            .load(Ordering::Relaxed);
428        let avg_latency = self.metrics[target_index].average_latency().await;
429
430        debug!(
431            target = %format!("{}:{}", target.address, target.port),
432            target_index = target_index,
433            load = current_load,
434            connections = connections,
435            avg_latency_ms = avg_latency.as_millis() as u64,
436            "P2C selected target"
437        );
438
439        Ok(TargetSelection {
440            address: format!("{}:{}", target.address, target.port),
441            weight: target.weight,
442            metadata: {
443                let mut meta = HashMap::new();
444                meta.insert("algorithm".to_string(), "p2c".to_string());
445                meta.insert("target_index".to_string(), target_index.to_string());
446                meta.insert("current_load".to_string(), format!("{:.2}", current_load));
447                meta.insert("connections".to_string(), connections.to_string());
448                meta.insert(
449                    "avg_latency_ms".to_string(),
450                    format!("{:.2}", avg_latency.as_millis()),
451                );
452                meta.insert(
453                    "metric_type".to_string(),
454                    format!("{:?}", self.config.load_metric),
455                );
456                meta
457            },
458        })
459    }
460
461    async fn report_health(&self, address: &str, healthy: bool) {
462        trace!(
463            address = %address,
464            healthy = healthy,
465            "P2C reporting target health"
466        );
467
468        let mut health = self.health_status.write().await;
469        let previous = health.insert(address.to_string(), healthy);
470
471        if previous != Some(healthy) {
472            info!(
473                address = %address,
474                previous = ?previous,
475                healthy = healthy,
476                "P2C target health changed"
477            );
478        }
479    }
480
481    async fn healthy_targets(&self) -> Vec<String> {
482        let health = self.health_status.read().await;
483        let targets: Vec<String> = self.targets
484            .iter()
485            .filter_map(|t| {
486                let target_id = format!("{}:{}", t.address, t.port);
487                if health.get(&target_id).copied().unwrap_or(true) {
488                    Some(target_id)
489                } else {
490                    None
491                }
492            })
493            .collect();
494
495        trace!(
496            total = self.targets.len(),
497            healthy = targets.len(),
498            "P2C healthy targets"
499        );
500
501        targets
502    }
503
504    async fn release(&self, selection: &TargetSelection) {
505        if let Some(index_str) = selection.metadata.get("target_index") {
506            if let Ok(index) = index_str.parse::<usize>() {
507                trace!(
508                    target_index = index,
509                    address = %selection.address,
510                    "P2C releasing connection"
511                );
512                self.release_connection(index);
513            }
514        }
515    }
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` targets at `10.0.0.1..`, all on port 8080 with weight 100.
    fn create_test_targets(count: usize) -> Vec<UpstreamTarget> {
        let mut targets = Vec::with_capacity(count);
        for i in 0..count {
            targets.push(UpstreamTarget {
                address: format!("10.0.0.{}", i + 1),
                port: 8080,
                weight: 100,
            });
        }
        targets
    }

    /// Reads the `target_index` metadata entry back out of a selection.
    fn selected_index(selection: &TargetSelection) -> Option<usize> {
        selection
            .metadata
            .get("target_index")
            .and_then(|raw| raw.parse::<usize>().ok())
    }

    /// Less loaded targets should win more often, yet nobody is starved.
    #[tokio::test]
    async fn test_p2c_selection() {
        let balancer = P2cBalancer::new(create_test_targets(5), P2cConfig::default());

        // Simulate different loads
        let loads: [u64; 5] = [10, 5, 15, 3, 8];
        for (metrics, &load) in balancer.metrics.iter().zip(loads.iter()) {
            metrics.connections.store(load, Ordering::Relaxed);
        }

        // Run selections and verify distribution
        let mut selections = vec![0usize; 5];
        for _ in 0..1000 {
            let selection = match balancer.select(None).await {
                Ok(selection) => selection,
                Err(_) => continue,
            };
            if let Some(idx) = selected_index(&selection) {
                selections[idx] += 1;

                // Simulate connection release
                balancer.release(&selection).await;
            }
        }

        // Verify that lower loaded targets get more selections
        // Target 3 (load=3) should get more than target 2 (load=15)
        assert!(selections[3] > selections[2]);

        // All targets should get some traffic
        for count in selections {
            assert!(count > 0, "All targets should receive some traffic");
        }
    }

    /// Selection under the latency metric reports the average latency.
    #[tokio::test]
    async fn test_p2c_with_latency_metric() {
        let config = P2cConfig {
            load_metric: LoadMetric::Latency,
            ..Default::default()
        };
        let balancer = P2cBalancer::new(create_test_targets(3), config);

        // Set different latencies
        let latencies_ms: [u64; 3] = [100, 10, 50];
        for (index, &ms) in latencies_ms.iter().enumerate() {
            balancer
                .update_metrics(index, Some(Duration::from_millis(ms)), None)
                .await;
        }

        let selection = balancer.select(None).await.unwrap();

        // Should tend to select lower latency targets
        assert!(selection.metadata.contains_key("avg_latency_ms"));
    }

    /// Three choices should concentrate traffic on the lightest targets.
    #[tokio::test]
    async fn test_p2c_power_of_three() {
        let config = P2cConfig {
            power_of_three: true,
            ..Default::default()
        };
        let balancer = P2cBalancer::new(create_test_targets(10), config);

        // Set varied loads
        for (i, metrics) in balancer.metrics.iter().enumerate().take(10) {
            metrics.connections.store((i * 2) as u64, Ordering::Relaxed);
        }

        let mut low_load_selections = 0;
        for _ in 0..100 {
            let selection = match balancer.select(None).await {
                Ok(selection) => selection,
                Err(_) => continue,
            };
            if let Some(idx) = selected_index(&selection) {
                // Low load targets
                if idx < 3 {
                    low_load_selections += 1;
                }
                balancer.release(&selection).await;
            }
        }

        // Power of three should give even better selection of low-load targets
        assert!(
            low_load_selections > 60,
            "P3C should favor low-load targets more"
        );
    }

    /// A target with double the weight should be sampled about twice as often.
    #[tokio::test]
    async fn test_weighted_selection() {
        let mut targets = create_test_targets(3);
        for (target, weight) in targets.iter_mut().zip([100, 200, 100]) {
            // Index 1 carries double weight
            target.weight = weight;
        }

        let config = P2cConfig {
            use_weights: true,
            ..Default::default()
        };
        let balancer = P2cBalancer::new(targets, config);

        // Equal loads - weight should influence selection
        for metrics in balancer.metrics.iter().take(3) {
            metrics.connections.store(5, Ordering::Relaxed);
        }

        let mut selections = vec![0usize; 3];
        for _ in 0..1000 {
            if let Some(idx) = balancer.random_healthy_target().await {
                selections[idx] += 1;
            }
        }

        // Target 1 should get roughly twice the traffic due to weight
        let ratio = selections[1] as f64 / selections[0] as f64;
        assert!(
            ratio > 1.5 && ratio < 2.5,
            "Weighted selection not working properly"
        );
    }
}