1use async_trait::async_trait;
2use rand::rngs::StdRng;
3use rand::{Rng, SeedableRng};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::RwLock;
9
10use tracing::{debug, info, trace, warn};
11
12use super::{LoadBalancer, RequestContext, TargetSelection, UpstreamTarget};
13use sentinel_common::errors::{SentinelError, SentinelResult};
14
/// Load signal used to compare candidate targets; the candidate with the lowest
/// value under the configured metric is selected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoadMetric {
    /// Connections acquired for the target and not yet released.
    Connections,
    /// Average of the recently recorded request latencies, in microseconds.
    Latency,
    /// In-flight connections plus average latency (in microseconds) divided by 100.
    Combined,
    /// Most recently reported CPU usage value for the target.
    CpuUsage,
    /// Requests counted for the target divided by the seconds elapsed since its
    /// metrics were last updated.
    RequestRate,
}
29
30impl Default for LoadMetric {
31 fn default() -> Self {
32 LoadMetric::Connections
33 }
34}
35
36#[derive(Debug, Clone)]
38pub struct P2cConfig {
39 pub load_metric: LoadMetric,
41 pub secondary_weight: f64,
43 pub use_weights: bool,
45 pub latency_window_secs: u64,
47 pub power_of_three: bool,
49}
50
51impl Default for P2cConfig {
52 fn default() -> Self {
53 Self {
54 load_metric: LoadMetric::Connections,
55 secondary_weight: 0.5,
56 use_weights: true,
57 latency_window_secs: 10,
58 power_of_three: false,
59 }
60 }
61}
62
63#[derive(Debug, Clone)]
65struct TargetMetrics {
66 connections: Arc<AtomicU64>,
68 requests: Arc<AtomicU64>,
70 total_latency_us: Arc<AtomicU64>,
72 latency_count: Arc<AtomicU64>,
74 cpu_usage: Arc<AtomicU64>,
76 last_update: Arc<RwLock<Instant>>,
78 recent_latencies: Arc<RwLock<Vec<Duration>>>,
80 latency_buffer_pos: Arc<AtomicUsize>,
82}
83
84impl TargetMetrics {
85 fn new(buffer_size: usize) -> Self {
86 Self {
87 connections: Arc::new(AtomicU64::new(0)),
88 requests: Arc::new(AtomicU64::new(0)),
89 total_latency_us: Arc::new(AtomicU64::new(0)),
90 latency_count: Arc::new(AtomicU64::new(0)),
91 cpu_usage: Arc::new(AtomicU64::new(0)),
92 last_update: Arc::new(RwLock::new(Instant::now())),
93 recent_latencies: Arc::new(RwLock::new(vec![Duration::ZERO; buffer_size])),
94 latency_buffer_pos: Arc::new(AtomicUsize::new(0)),
95 }
96 }
97
98 async fn average_latency(&self) -> Duration {
100 let latencies = self.recent_latencies.read().await;
101 let count = self.latency_count.load(Ordering::Relaxed);
102
103 if count == 0 {
104 return Duration::ZERO;
105 }
106
107 let total: Duration = latencies.iter().sum();
108 let sample_count = count.min(latencies.len() as u64);
109
110 if sample_count > 0 {
111 total / sample_count as u32
112 } else {
113 Duration::ZERO
114 }
115 }
116
117 async fn record_latency(&self, latency: Duration) {
119 let pos = self.latency_buffer_pos.fetch_add(1, Ordering::Relaxed);
120 let mut latencies = self.recent_latencies.write().await;
121 let buffer_size = latencies.len();
122 latencies[pos % buffer_size] = latency;
123
124 self.total_latency_us
125 .fetch_add(latency.as_micros() as u64, Ordering::Relaxed);
126 self.latency_count.fetch_add(1, Ordering::Relaxed);
127 }
128
129 async fn get_load(&self, metric: LoadMetric) -> f64 {
131 match metric {
132 LoadMetric::Connections => self.connections.load(Ordering::Relaxed) as f64,
133 LoadMetric::Latency => self.average_latency().await.as_micros() as f64,
134 LoadMetric::Combined => {
135 let connections = self.connections.load(Ordering::Relaxed) as f64;
136 let latency = self.average_latency().await.as_micros() as f64;
137 connections + (latency / 100.0)
140 }
141 LoadMetric::CpuUsage => self.cpu_usage.load(Ordering::Relaxed) as f64,
142 LoadMetric::RequestRate => {
143 let requests = self.requests.load(Ordering::Relaxed);
145 let last_update = *self.last_update.read().await;
146 let elapsed = last_update.elapsed().as_secs_f64();
147 if elapsed > 0.0 {
148 requests as f64 / elapsed
149 } else {
150 0.0
151 }
152 }
153 }
154 }
155}
156
157pub struct P2cBalancer {
159 config: P2cConfig,
161 targets: Vec<UpstreamTarget>,
163 health_status: Arc<RwLock<HashMap<String, bool>>>,
165 metrics: Vec<TargetMetrics>,
167 rng: Arc<RwLock<StdRng>>,
169 cumulative_weights: Vec<u32>,
171}
172
173impl P2cBalancer {
174 pub fn new(targets: Vec<UpstreamTarget>, config: P2cConfig) -> Self {
175 trace!(
176 target_count = targets.len(),
177 load_metric = ?config.load_metric,
178 use_weights = config.use_weights,
179 power_of_three = config.power_of_three,
180 latency_window_secs = config.latency_window_secs,
181 "Creating P2C balancer"
182 );
183
184 let buffer_size = (config.latency_window_secs * 100) as usize; let metrics = targets
186 .iter()
187 .map(|_| TargetMetrics::new(buffer_size))
188 .collect();
189
190 let mut cumulative_weights = Vec::with_capacity(targets.len());
192 let mut cumsum = 0u32;
193 for target in &targets {
194 cumsum += target.weight;
195 cumulative_weights.push(cumsum);
196 }
197
198 debug!(
199 target_count = targets.len(),
200 total_weight = cumsum,
201 buffer_size = buffer_size,
202 "P2C balancer initialized"
203 );
204
205 Self {
206 config,
207 targets,
208 health_status: Arc::new(RwLock::new(HashMap::new())),
209 metrics,
210 rng: Arc::new(RwLock::new(StdRng::from_entropy())),
211 cumulative_weights,
212 }
213 }
214
215 async fn random_healthy_target(&self) -> Option<usize> {
217 let health = self.health_status.read().await;
218 let healthy_indices: Vec<usize> = self
219 .targets
220 .iter()
221 .enumerate()
222 .filter_map(|(i, t)| {
223 let target_id = format!("{}:{}", t.address, t.port);
224 if health.get(&target_id).copied().unwrap_or(true) {
225 Some(i)
226 } else {
227 None
228 }
229 })
230 .collect();
231
232 trace!(
233 total_targets = self.targets.len(),
234 healthy_count = healthy_indices.len(),
235 use_weights = self.config.use_weights,
236 "Selecting random healthy target"
237 );
238
239 if healthy_indices.is_empty() {
240 warn!("No healthy targets available for P2C selection");
241 return None;
242 }
243
244 let mut rng = self.rng.write().await;
245
246 if self.config.use_weights && !self.cumulative_weights.is_empty() {
247 let total_weight = self.cumulative_weights.last().copied().unwrap_or(0);
249 if total_weight > 0 {
250 let threshold = rng.gen_range(0..total_weight);
251 for &idx in &healthy_indices {
252 if self.cumulative_weights[idx] > threshold {
253 trace!(
254 target_index = idx,
255 threshold = threshold,
256 "Selected target via weighted random"
257 );
258 return Some(idx);
259 }
260 }
261 }
262 }
263
264 let selected = healthy_indices[rng.gen_range(0..healthy_indices.len())];
266 trace!(
267 target_index = selected,
268 "Selected target via uniform random"
269 );
270 Some(selected)
271 }
272
273 async fn select_least_loaded(&self, candidates: Vec<usize>) -> Option<usize> {
275 if candidates.is_empty() {
276 trace!("No candidates provided for least loaded selection");
277 return None;
278 }
279
280 trace!(
281 candidate_count = candidates.len(),
282 load_metric = ?self.config.load_metric,
283 "Evaluating candidates for least loaded"
284 );
285
286 let mut min_load = f64::MAX;
287 let mut best_target = candidates[0];
288
289 for &idx in &candidates {
290 let load = self.metrics[idx].get_load(self.config.load_metric).await;
291
292 trace!(
293 target_index = idx,
294 load = load,
295 "Candidate load"
296 );
297
298 if load < min_load {
299 min_load = load;
300 best_target = idx;
301 }
302 }
303
304 debug!(
305 target_index = best_target,
306 load = min_load,
307 candidate_count = candidates.len(),
308 "P2C selected least loaded target"
309 );
310
311 Some(best_target)
312 }
313
314 pub fn acquire_connection(&self, target_index: usize) {
316 let connections = self.metrics[target_index]
317 .connections
318 .fetch_add(1, Ordering::Relaxed) + 1;
319 let requests = self.metrics[target_index]
320 .requests
321 .fetch_add(1, Ordering::Relaxed) + 1;
322
323 trace!(
324 target_index = target_index,
325 connections = connections,
326 total_requests = requests,
327 "P2C acquired connection"
328 );
329 }
330
331 pub fn release_connection(&self, target_index: usize) {
333 let connections = self.metrics[target_index]
334 .connections
335 .fetch_sub(1, Ordering::Relaxed) - 1;
336
337 trace!(
338 target_index = target_index,
339 connections = connections,
340 "P2C released connection"
341 );
342 }
343
344 pub async fn update_metrics(
346 &self,
347 target_index: usize,
348 latency: Option<Duration>,
349 cpu_usage: Option<u8>,
350 ) {
351 trace!(
352 target_index = target_index,
353 latency_ms = latency.map(|l| l.as_millis() as u64),
354 cpu_usage = cpu_usage,
355 "Updating P2C target metrics"
356 );
357
358 if let Some(latency) = latency {
359 self.metrics[target_index].record_latency(latency).await;
360 }
361
362 if let Some(cpu) = cpu_usage {
363 self.metrics[target_index]
364 .cpu_usage
365 .store(cpu as u64, Ordering::Relaxed);
366 }
367
368 *self.metrics[target_index].last_update.write().await = Instant::now();
369 }
370}
371
372#[async_trait]
373impl LoadBalancer for P2cBalancer {
374 async fn select(
375 &self,
376 _context: Option<&RequestContext>,
377 ) -> SentinelResult<TargetSelection> {
378 let num_choices = if self.config.power_of_three { 3 } else { 2 };
380
381 trace!(
382 num_choices = num_choices,
383 power_of_three = self.config.power_of_three,
384 "P2C select started"
385 );
386
387 let mut candidates = Vec::with_capacity(num_choices);
388
389 for i in 0..num_choices {
390 if let Some(idx) = self.random_healthy_target().await {
391 if !candidates.contains(&idx) {
392 candidates.push(idx);
393 trace!(
394 choice = i,
395 target_index = idx,
396 "Added candidate"
397 );
398 }
399 }
400 }
401
402 if candidates.is_empty() {
403 warn!("P2C: No healthy targets available");
404 return Err(SentinelError::NoHealthyUpstream);
405 }
406
407 let target_index = self
409 .select_least_loaded(candidates)
410 .await
411 .ok_or_else(|| {
412 warn!("P2C: Failed to select from candidates");
413 SentinelError::NoHealthyUpstream
414 })?;
415
416 let target = &self.targets[target_index];
417
418 self.acquire_connection(target_index);
420
421 let current_load = self.metrics[target_index]
423 .get_load(self.config.load_metric)
424 .await;
425 let connections = self.metrics[target_index]
426 .connections
427 .load(Ordering::Relaxed);
428 let avg_latency = self.metrics[target_index].average_latency().await;
429
430 debug!(
431 target = %format!("{}:{}", target.address, target.port),
432 target_index = target_index,
433 load = current_load,
434 connections = connections,
435 avg_latency_ms = avg_latency.as_millis() as u64,
436 "P2C selected target"
437 );
438
439 Ok(TargetSelection {
440 address: format!("{}:{}", target.address, target.port),
441 weight: target.weight,
442 metadata: {
443 let mut meta = HashMap::new();
444 meta.insert("algorithm".to_string(), "p2c".to_string());
445 meta.insert("target_index".to_string(), target_index.to_string());
446 meta.insert("current_load".to_string(), format!("{:.2}", current_load));
447 meta.insert("connections".to_string(), connections.to_string());
448 meta.insert(
449 "avg_latency_ms".to_string(),
450 format!("{:.2}", avg_latency.as_millis()),
451 );
452 meta.insert(
453 "metric_type".to_string(),
454 format!("{:?}", self.config.load_metric),
455 );
456 meta
457 },
458 })
459 }
460
461 async fn report_health(&self, address: &str, healthy: bool) {
462 trace!(
463 address = %address,
464 healthy = healthy,
465 "P2C reporting target health"
466 );
467
468 let mut health = self.health_status.write().await;
469 let previous = health.insert(address.to_string(), healthy);
470
471 if previous != Some(healthy) {
472 info!(
473 address = %address,
474 previous = ?previous,
475 healthy = healthy,
476 "P2C target health changed"
477 );
478 }
479 }
480
481 async fn healthy_targets(&self) -> Vec<String> {
482 let health = self.health_status.read().await;
483 let targets: Vec<String> = self.targets
484 .iter()
485 .filter_map(|t| {
486 let target_id = format!("{}:{}", t.address, t.port);
487 if health.get(&target_id).copied().unwrap_or(true) {
488 Some(target_id)
489 } else {
490 None
491 }
492 })
493 .collect();
494
495 trace!(
496 total = self.targets.len(),
497 healthy = targets.len(),
498 "P2C healthy targets"
499 );
500
501 targets
502 }
503
504 async fn release(&self, selection: &TargetSelection) {
505 if let Some(index_str) = selection.metadata.get("target_index") {
506 if let Ok(index) = index_str.parse::<usize>() {
507 trace!(
508 target_index = index,
509 address = %selection.address,
510 "P2C releasing connection"
511 );
512 self.release_connection(index);
513 }
514 }
515 }
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// `count` targets at 10.0.0.1.. on port 8080, each with weight 100.
    fn create_test_targets(count: usize) -> Vec<UpstreamTarget> {
        (0..count)
            .map(|i| UpstreamTarget {
                address: format!("10.0.0.{}", i + 1),
                port: 8080,
                weight: 100,
            })
            .collect()
    }

    #[tokio::test]
    async fn test_p2c_selection() {
        let targets = create_test_targets(5);
        let config = P2cConfig::default();
        let balancer = P2cBalancer::new(targets.clone(), config);

        balancer.metrics[0].connections.store(10, Ordering::Relaxed);
        balancer.metrics[1].connections.store(5, Ordering::Relaxed);
        balancer.metrics[2].connections.store(15, Ordering::Relaxed);
        balancer.metrics[3].connections.store(3, Ordering::Relaxed);
        balancer.metrics[4].connections.store(8, Ordering::Relaxed);

        let mut selections = vec![0usize; 5];
        for _ in 0..1000 {
            if let Ok(selection) = balancer.select(None).await {
                if let Some(idx_str) = selection.metadata.get("target_index") {
                    if let Ok(idx) = idx_str.parse::<usize>() {
                        selections[idx] += 1;
                        // Release immediately so the seeded loads stay fixed.
                        balancer.release(&selection).await;
                    }
                }
            }
        }

        assert!(selections[3] > selections[2]);

        // The busiest target still wins whenever both draws land on it (~1/25).
        for count in selections {
            assert!(count > 0, "All targets should receive some traffic");
        }
    }

    #[tokio::test]
    async fn test_p2c_with_latency_metric() {
        let targets = create_test_targets(3);
        let config = P2cConfig {
            load_metric: LoadMetric::Latency,
            ..Default::default()
        };
        let balancer = P2cBalancer::new(targets.clone(), config);

        balancer
            .update_metrics(0, Some(Duration::from_millis(100)), None)
            .await;
        balancer
            .update_metrics(1, Some(Duration::from_millis(10)), None)
            .await;
        balancer
            .update_metrics(2, Some(Duration::from_millis(50)), None)
            .await;

        let selection = balancer.select(None).await.unwrap();
        let metadata = &selection.metadata;

        assert!(metadata.contains_key("avg_latency_ms"));
    }

    #[tokio::test]
    async fn test_p2c_power_of_three() {
        let targets = create_test_targets(10);
        let config = P2cConfig {
            power_of_three: true,
            ..Default::default()
        };
        let balancer = P2cBalancer::new(targets.clone(), config);

        // Load grows with the index, so the least-loaded candidate is the lowest index.
        for i in 0..10 {
            balancer.metrics[i]
                .connections
                .store((i * 2) as u64, Ordering::Relaxed);
        }

        // Three uniform draws over ten targets give P(min index < 3) = 1 - 0.7^3 ≈ 0.657.
        // 1000 trials put the mean near 657 (sd ≈ 15), so a floor of 600 is ~3.8 sd
        // away; the earlier 100 trials with a floor of 60 failed in roughly 13% of runs.
        let mut low_load_selections = 0;
        for _ in 0..1000 {
            if let Ok(selection) = balancer.select(None).await {
                if let Some(idx_str) = selection.metadata.get("target_index") {
                    if let Ok(idx) = idx_str.parse::<usize>() {
                        if idx < 3 {
                            low_load_selections += 1;
                        }
                        balancer.release(&selection).await;
                    }
                }
            }
        }

        assert!(
            low_load_selections > 600,
            "P3C should favor low-load targets more"
        );
    }

    #[tokio::test]
    async fn test_weighted_selection() {
        let mut targets = create_test_targets(3);
        targets[0].weight = 100;
        targets[1].weight = 200;
        targets[2].weight = 100;

        let config = P2cConfig {
            use_weights: true,
            ..Default::default()
        };
        let balancer = P2cBalancer::new(targets.clone(), config);

        for i in 0..3 {
            balancer.metrics[i].connections.store(5, Ordering::Relaxed);
        }

        // Expected ratio is 2.0; 4000 draws keep the 1.5..2.5 band beyond 6 sd.
        let mut selections = vec![0usize; 3];
        for _ in 0..4000 {
            if let Some(idx) = balancer.random_healthy_target().await {
                selections[idx] += 1;
            }
        }

        let ratio = selections[1] as f64 / selections[0] as f64;
        assert!(
            ratio > 1.5 && ratio < 2.5,
            "Weighted selection not working properly"
        );
    }
}