sentinel_proxy/upstream/
p2c.rs

1use async_trait::async_trait;
2use rand::rngs::StdRng;
3use rand::{Rng, SeedableRng};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::RwLock;
9
10use tracing::{debug, info, trace, warn};
11
12use super::{LoadBalancer, RequestContext, TargetSelection, UpstreamTarget};
13use sentinel_common::errors::{SentinelError, SentinelResult};
14
/// Load metric compared between P2C candidates; the candidate with the lowest
/// score is chosen.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum LoadMetric {
    /// Number of connections currently held against the target.
    #[default]
    Connections,
    /// Average response latency over the recent sample window (microseconds).
    Latency,
    /// Connections plus scaled latency: `connections + avg_latency_us / 100`.
    Combined,
    /// Last reported CPU usage percentage (requires external monitoring to
    /// feed it in).
    CpuUsage,
    /// Request rate in requests per second.
    RequestRate,
}
30
31/// Configuration for P2C load balancer
32#[derive(Debug, Clone)]
33pub struct P2cConfig {
34    /// Load metric to use for selection
35    pub load_metric: LoadMetric,
36    /// Weight multiplier for secondary metric in combined mode
37    pub secondary_weight: f64,
38    /// Whether to use weighted random selection
39    pub use_weights: bool,
40    /// Latency window for averaging (in seconds)
41    pub latency_window_secs: u64,
42    /// Enable power of three choices for better distribution
43    pub power_of_three: bool,
44}
45
46impl Default for P2cConfig {
47    fn default() -> Self {
48        Self {
49            load_metric: LoadMetric::Connections,
50            secondary_weight: 0.5,
51            use_weights: true,
52            latency_window_secs: 10,
53            power_of_three: false,
54        }
55    }
56}
57
58/// Target metrics for load calculation
59#[derive(Debug, Clone)]
60struct TargetMetrics {
61    /// Active connections
62    connections: Arc<AtomicU64>,
63    /// Total requests
64    requests: Arc<AtomicU64>,
65    /// Total latency in microseconds
66    total_latency_us: Arc<AtomicU64>,
67    /// Request count for latency averaging
68    latency_count: Arc<AtomicU64>,
69    /// CPU usage percentage (0-100)
70    cpu_usage: Arc<AtomicU64>,
71    /// Last update time
72    last_update: Arc<RwLock<Instant>>,
73    /// Recent latency measurements (ring buffer)
74    recent_latencies: Arc<RwLock<Vec<Duration>>>,
75    /// Ring buffer position
76    latency_buffer_pos: Arc<AtomicUsize>,
77}
78
79impl TargetMetrics {
80    fn new(buffer_size: usize) -> Self {
81        Self {
82            connections: Arc::new(AtomicU64::new(0)),
83            requests: Arc::new(AtomicU64::new(0)),
84            total_latency_us: Arc::new(AtomicU64::new(0)),
85            latency_count: Arc::new(AtomicU64::new(0)),
86            cpu_usage: Arc::new(AtomicU64::new(0)),
87            last_update: Arc::new(RwLock::new(Instant::now())),
88            recent_latencies: Arc::new(RwLock::new(vec![Duration::ZERO; buffer_size])),
89            latency_buffer_pos: Arc::new(AtomicUsize::new(0)),
90        }
91    }
92
93    /// Calculate average latency over the window
94    async fn average_latency(&self) -> Duration {
95        let latencies = self.recent_latencies.read().await;
96        let count = self.latency_count.load(Ordering::Relaxed);
97
98        if count == 0 {
99            return Duration::ZERO;
100        }
101
102        let total: Duration = latencies.iter().sum();
103        let sample_count = count.min(latencies.len() as u64);
104
105        if sample_count > 0 {
106            total / sample_count as u32
107        } else {
108            Duration::ZERO
109        }
110    }
111
112    /// Record a latency measurement
113    async fn record_latency(&self, latency: Duration) {
114        let pos = self.latency_buffer_pos.fetch_add(1, Ordering::Relaxed);
115        let mut latencies = self.recent_latencies.write().await;
116        let buffer_size = latencies.len();
117        latencies[pos % buffer_size] = latency;
118
119        self.total_latency_us
120            .fetch_add(latency.as_micros() as u64, Ordering::Relaxed);
121        self.latency_count.fetch_add(1, Ordering::Relaxed);
122    }
123
124    /// Get current load based on metric type
125    async fn get_load(&self, metric: LoadMetric) -> f64 {
126        match metric {
127            LoadMetric::Connections => self.connections.load(Ordering::Relaxed) as f64,
128            LoadMetric::Latency => self.average_latency().await.as_micros() as f64,
129            LoadMetric::Combined => {
130                let connections = self.connections.load(Ordering::Relaxed) as f64;
131                let latency = self.average_latency().await.as_micros() as f64;
132                // Normalize latency to be on similar scale as connections
133                // (assuming avg latency ~10ms = 10000us, and avg connections ~100)
134                connections + (latency / 100.0)
135            }
136            LoadMetric::CpuUsage => self.cpu_usage.load(Ordering::Relaxed) as f64,
137            LoadMetric::RequestRate => {
138                // Calculate requests per second over the last update interval
139                let requests = self.requests.load(Ordering::Relaxed);
140                let last_update = *self.last_update.read().await;
141                let elapsed = last_update.elapsed().as_secs_f64();
142                if elapsed > 0.0 {
143                    requests as f64 / elapsed
144                } else {
145                    0.0
146                }
147            }
148        }
149    }
150}
151
152/// Power of Two Choices load balancer
153pub struct P2cBalancer {
154    /// Configuration
155    config: P2cConfig,
156    /// All upstream targets
157    targets: Vec<UpstreamTarget>,
158    /// Target health status
159    health_status: Arc<RwLock<HashMap<String, bool>>>,
160    /// Metrics per target
161    metrics: Vec<TargetMetrics>,
162    /// Random number generator (thread-safe)
163    rng: Arc<RwLock<StdRng>>,
164    /// Cumulative weights for weighted selection
165    cumulative_weights: Vec<u32>,
166}
167
168impl P2cBalancer {
169    pub fn new(targets: Vec<UpstreamTarget>, config: P2cConfig) -> Self {
170        trace!(
171            target_count = targets.len(),
172            load_metric = ?config.load_metric,
173            use_weights = config.use_weights,
174            power_of_three = config.power_of_three,
175            latency_window_secs = config.latency_window_secs,
176            "Creating P2C balancer"
177        );
178
179        let buffer_size = (config.latency_window_secs * 100) as usize; // 100 samples/sec
180        let metrics = targets
181            .iter()
182            .map(|_| TargetMetrics::new(buffer_size))
183            .collect();
184
185        // Calculate cumulative weights for weighted random selection
186        let mut cumulative_weights = Vec::with_capacity(targets.len());
187        let mut cumsum = 0u32;
188        for target in &targets {
189            cumsum += target.weight;
190            cumulative_weights.push(cumsum);
191        }
192
193        debug!(
194            target_count = targets.len(),
195            total_weight = cumsum,
196            buffer_size = buffer_size,
197            "P2C balancer initialized"
198        );
199
200        Self {
201            config,
202            targets,
203            health_status: Arc::new(RwLock::new(HashMap::new())),
204            metrics,
205            rng: Arc::new(RwLock::new(StdRng::from_entropy())),
206            cumulative_weights,
207        }
208    }
209
210    /// Select a random healthy target index
211    async fn random_healthy_target(&self) -> Option<usize> {
212        let health = self.health_status.read().await;
213        let healthy_indices: Vec<usize> = self
214            .targets
215            .iter()
216            .enumerate()
217            .filter_map(|(i, t)| {
218                let target_id = format!("{}:{}", t.address, t.port);
219                if health.get(&target_id).copied().unwrap_or(true) {
220                    Some(i)
221                } else {
222                    None
223                }
224            })
225            .collect();
226
227        trace!(
228            total_targets = self.targets.len(),
229            healthy_count = healthy_indices.len(),
230            use_weights = self.config.use_weights,
231            "Selecting random healthy target"
232        );
233
234        if healthy_indices.is_empty() {
235            warn!("No healthy targets available for P2C selection");
236            return None;
237        }
238
239        let mut rng = self.rng.write().await;
240
241        if self.config.use_weights && !self.cumulative_weights.is_empty() {
242            // Weighted random selection
243            let total_weight = self.cumulative_weights.last().copied().unwrap_or(0);
244            if total_weight > 0 {
245                let threshold = rng.gen_range(0..total_weight);
246                for &idx in &healthy_indices {
247                    if self.cumulative_weights[idx] > threshold {
248                        trace!(
249                            target_index = idx,
250                            threshold = threshold,
251                            "Selected target via weighted random"
252                        );
253                        return Some(idx);
254                    }
255                }
256            }
257        }
258
259        // Fallback to uniform random
260        let selected = healthy_indices[rng.gen_range(0..healthy_indices.len())];
261        trace!(
262            target_index = selected,
263            "Selected target via uniform random"
264        );
265        Some(selected)
266    }
267
268    /// Select the least loaded target from candidates
269    async fn select_least_loaded(&self, candidates: Vec<usize>) -> Option<usize> {
270        if candidates.is_empty() {
271            trace!("No candidates provided for least loaded selection");
272            return None;
273        }
274
275        trace!(
276            candidate_count = candidates.len(),
277            load_metric = ?self.config.load_metric,
278            "Evaluating candidates for least loaded"
279        );
280
281        let mut min_load = f64::MAX;
282        let mut best_target = candidates[0];
283
284        for &idx in &candidates {
285            let load = self.metrics[idx].get_load(self.config.load_metric).await;
286
287            trace!(target_index = idx, load = load, "Candidate load");
288
289            if load < min_load {
290                min_load = load;
291                best_target = idx;
292            }
293        }
294
295        debug!(
296            target_index = best_target,
297            load = min_load,
298            candidate_count = candidates.len(),
299            "P2C selected least loaded target"
300        );
301
302        Some(best_target)
303    }
304
305    /// Track connection acquisition
306    pub fn acquire_connection(&self, target_index: usize) {
307        let connections = self.metrics[target_index]
308            .connections
309            .fetch_add(1, Ordering::Relaxed)
310            + 1;
311        let requests = self.metrics[target_index]
312            .requests
313            .fetch_add(1, Ordering::Relaxed)
314            + 1;
315
316        trace!(
317            target_index = target_index,
318            connections = connections,
319            total_requests = requests,
320            "P2C acquired connection"
321        );
322    }
323
324    /// Track connection release
325    pub fn release_connection(&self, target_index: usize) {
326        let connections = self.metrics[target_index]
327            .connections
328            .fetch_sub(1, Ordering::Relaxed)
329            - 1;
330
331        trace!(
332            target_index = target_index,
333            connections = connections,
334            "P2C released connection"
335        );
336    }
337
338    /// Update target metrics
339    pub async fn update_metrics(
340        &self,
341        target_index: usize,
342        latency: Option<Duration>,
343        cpu_usage: Option<u8>,
344    ) {
345        trace!(
346            target_index = target_index,
347            latency_ms = latency.map(|l| l.as_millis() as u64),
348            cpu_usage = cpu_usage,
349            "Updating P2C target metrics"
350        );
351
352        if let Some(latency) = latency {
353            self.metrics[target_index].record_latency(latency).await;
354        }
355
356        if let Some(cpu) = cpu_usage {
357            self.metrics[target_index]
358                .cpu_usage
359                .store(cpu as u64, Ordering::Relaxed);
360        }
361
362        *self.metrics[target_index].last_update.write().await = Instant::now();
363    }
364}
365
366#[async_trait]
367impl LoadBalancer for P2cBalancer {
368    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
369        // Select candidates
370        let num_choices = if self.config.power_of_three { 3 } else { 2 };
371
372        trace!(
373            num_choices = num_choices,
374            power_of_three = self.config.power_of_three,
375            "P2C select started"
376        );
377
378        let mut candidates = Vec::with_capacity(num_choices);
379
380        for i in 0..num_choices {
381            if let Some(idx) = self.random_healthy_target().await {
382                if !candidates.contains(&idx) {
383                    candidates.push(idx);
384                    trace!(choice = i, target_index = idx, "Added candidate");
385                }
386            }
387        }
388
389        if candidates.is_empty() {
390            warn!("P2C: No healthy targets available");
391            return Err(SentinelError::NoHealthyUpstream);
392        }
393
394        // Select least loaded from candidates
395        let target_index = self.select_least_loaded(candidates).await.ok_or_else(|| {
396            warn!("P2C: Failed to select from candidates");
397            SentinelError::NoHealthyUpstream
398        })?;
399
400        let target = &self.targets[target_index];
401
402        // Track connection
403        self.acquire_connection(target_index);
404
405        // Get current metrics for metadata
406        let current_load = self.metrics[target_index]
407            .get_load(self.config.load_metric)
408            .await;
409        let connections = self.metrics[target_index]
410            .connections
411            .load(Ordering::Relaxed);
412        let avg_latency = self.metrics[target_index].average_latency().await;
413
414        debug!(
415            target = %format!("{}:{}", target.address, target.port),
416            target_index = target_index,
417            load = current_load,
418            connections = connections,
419            avg_latency_ms = avg_latency.as_millis() as u64,
420            "P2C selected target"
421        );
422
423        Ok(TargetSelection {
424            address: format!("{}:{}", target.address, target.port),
425            weight: target.weight,
426            metadata: {
427                let mut meta = HashMap::new();
428                meta.insert("algorithm".to_string(), "p2c".to_string());
429                meta.insert("target_index".to_string(), target_index.to_string());
430                meta.insert("current_load".to_string(), format!("{:.2}", current_load));
431                meta.insert("connections".to_string(), connections.to_string());
432                meta.insert(
433                    "avg_latency_ms".to_string(),
434                    format!("{:.2}", avg_latency.as_millis()),
435                );
436                meta.insert(
437                    "metric_type".to_string(),
438                    format!("{:?}", self.config.load_metric),
439                );
440                meta
441            },
442        })
443    }
444
445    async fn report_health(&self, address: &str, healthy: bool) {
446        trace!(
447            address = %address,
448            healthy = healthy,
449            "P2C reporting target health"
450        );
451
452        let mut health = self.health_status.write().await;
453        let previous = health.insert(address.to_string(), healthy);
454
455        if previous != Some(healthy) {
456            info!(
457                address = %address,
458                previous = ?previous,
459                healthy = healthy,
460                "P2C target health changed"
461            );
462        }
463    }
464
465    async fn healthy_targets(&self) -> Vec<String> {
466        let health = self.health_status.read().await;
467        let targets: Vec<String> = self
468            .targets
469            .iter()
470            .filter_map(|t| {
471                let target_id = format!("{}:{}", t.address, t.port);
472                if health.get(&target_id).copied().unwrap_or(true) {
473                    Some(target_id)
474                } else {
475                    None
476                }
477            })
478            .collect();
479
480        trace!(
481            total = self.targets.len(),
482            healthy = targets.len(),
483            "P2C healthy targets"
484        );
485
486        targets
487    }
488
489    async fn release(&self, selection: &TargetSelection) {
490        if let Some(index_str) = selection.metadata.get("target_index") {
491            if let Ok(index) = index_str.parse::<usize>() {
492                trace!(
493                    target_index = index,
494                    address = %selection.address,
495                    "P2C releasing connection"
496                );
497                self.release_connection(index);
498            }
499        }
500    }
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// `count` targets at 10.0.0.1.. on port 8080, all with weight 100.
    fn create_test_targets(count: usize) -> Vec<UpstreamTarget> {
        (0..count)
            .map(|i| UpstreamTarget {
                address: format!("10.0.0.{}", i + 1),
                port: 8080,
                weight: 100,
            })
            .collect()
    }

    #[tokio::test]
    async fn test_p2c_selection() {
        let targets = create_test_targets(5);
        let config = P2cConfig::default();
        let balancer = P2cBalancer::new(targets.clone(), config);

        // Simulate different loads
        balancer.metrics[0].connections.store(10, Ordering::Relaxed);
        balancer.metrics[1].connections.store(5, Ordering::Relaxed);
        balancer.metrics[2].connections.store(15, Ordering::Relaxed);
        balancer.metrics[3].connections.store(3, Ordering::Relaxed);
        balancer.metrics[4].connections.store(8, Ordering::Relaxed);

        // Run selections and verify distribution
        let mut selections = vec![0usize; 5];
        for _ in 0..1000 {
            if let Ok(selection) = balancer.select(None).await {
                if let Some(idx_str) = selection.metadata.get("target_index") {
                    if let Ok(idx) = idx_str.parse::<usize>() {
                        selections[idx] += 1;

                        // Simulate connection release
                        balancer.release(&selection).await;
                    }
                }
            }
        }

        // Target 3 (load=3) wins whenever drawn (~36%); target 2 (load=15) only
        // when it is the sole candidate (~4%).
        assert!(selections[3] > selections[2]);

        // All targets should get some traffic
        for count in selections {
            assert!(count > 0, "All targets should receive some traffic");
        }
    }

    #[tokio::test]
    async fn test_p2c_with_latency_metric() {
        let targets = create_test_targets(3);
        let config = P2cConfig {
            load_metric: LoadMetric::Latency,
            ..Default::default()
        };
        let balancer = P2cBalancer::new(targets.clone(), config);

        // Set different latencies
        balancer
            .update_metrics(0, Some(Duration::from_millis(100)), None)
            .await;
        balancer
            .update_metrics(1, Some(Duration::from_millis(10)), None)
            .await;
        balancer
            .update_metrics(2, Some(Duration::from_millis(50)), None)
            .await;

        let mut selections = [0usize; 3];
        for _ in 0..300 {
            let selection = balancer.select(None).await.unwrap();
            assert!(selection.metadata.contains_key("avg_latency_ms"));

            let idx: usize = selection.metadata["target_index"].parse().unwrap();
            selections[idx] += 1;
            balancer.release(&selection).await;
        }

        // Target 1 (10ms) wins whenever drawn (~5/9); target 0 (100ms) only when
        // it is the sole candidate (~1/9).
        assert!(
            selections[1] > selections[0],
            "Latency metric should favor the lowest-latency target"
        );
    }

    #[tokio::test]
    async fn test_p2c_power_of_three() {
        let targets = create_test_targets(10);
        let config = P2cConfig {
            power_of_three: true,
            ..Default::default()
        };
        let balancer = P2cBalancer::new(targets.clone(), config);

        // Set varied loads: target i has 2*i connections.
        for i in 0..10 {
            balancer.metrics[i]
                .connections
                .store((i * 2) as u64, Ordering::Relaxed);
        }

        let mut low_load_selections = 0;
        for _ in 0..1000 {
            if let Ok(selection) = balancer.select(None).await {
                if let Some(idx_str) = selection.metadata.get("target_index") {
                    if let Ok(idx) = idx_str.parse::<usize>() {
                        if idx < 3 {
                            // Low load targets
                            low_load_selections += 1;
                        }
                        balancer.release(&selection).await;
                    }
                }
            }
        }

        // A low-load target wins whenever any of the three draws lands on one:
        // P = 1 - 0.7^3 ~= 0.66 (plain P2C: 1 - 0.7^2 = 0.51). With 1000 trials
        // the threshold sits ~5 standard deviations below the P3C mean.
        assert!(
            low_load_selections > 580,
            "P3C should favor low-load targets more"
        );
    }

    #[tokio::test]
    async fn test_weighted_selection() {
        let mut targets = create_test_targets(3);
        targets[0].weight = 100;
        targets[1].weight = 200; // Double weight
        targets[2].weight = 100;

        let config = P2cConfig {
            use_weights: true,
            ..Default::default()
        };
        let balancer = P2cBalancer::new(targets.clone(), config);

        // Equal loads - weight should influence selection
        for i in 0..3 {
            balancer.metrics[i].connections.store(5, Ordering::Relaxed);
        }

        // Enough draws that the ratio's standard deviation (~0.07) is far
        // inside the accepted band.
        let mut selections = [0usize; 3];
        for _ in 0..4000 {
            if let Some(idx) = balancer.random_healthy_target().await {
                selections[idx] += 1;
            }
        }

        // Target 1 should get roughly twice the traffic due to weight
        let ratio = selections[1] as f64 / selections[0] as f64;
        assert!(
            ratio > 1.5 && ratio < 2.5,
            "Weighted selection not working properly"
        );
    }
}