1use async_trait::async_trait;
2use rand::rngs::StdRng;
3use rand::{Rng, SeedableRng};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::RwLock;
9
10use tracing::{debug, info, trace, warn};
11
12use super::{LoadBalancer, RequestContext, TargetSelection, UpstreamTarget};
13use sentinel_common::errors::{SentinelError, SentinelResult};
14
15#[derive(Debug, Clone, Copy, Default)]
17pub enum LoadMetric {
18 #[default]
20 Connections,
21 Latency,
23 Combined,
25 CpuUsage,
27 RequestRate,
29}
30
31#[derive(Debug, Clone)]
33pub struct P2cConfig {
34 pub load_metric: LoadMetric,
36 pub secondary_weight: f64,
38 pub use_weights: bool,
40 pub latency_window_secs: u64,
42 pub power_of_three: bool,
44}
45
46impl Default for P2cConfig {
47 fn default() -> Self {
48 Self {
49 load_metric: LoadMetric::Connections,
50 secondary_weight: 0.5,
51 use_weights: true,
52 latency_window_secs: 10,
53 power_of_three: false,
54 }
55 }
56}
57
58#[derive(Debug, Clone)]
60struct TargetMetrics {
61 connections: Arc<AtomicU64>,
63 requests: Arc<AtomicU64>,
65 total_latency_us: Arc<AtomicU64>,
67 latency_count: Arc<AtomicU64>,
69 cpu_usage: Arc<AtomicU64>,
71 last_update: Arc<RwLock<Instant>>,
73 recent_latencies: Arc<RwLock<Vec<Duration>>>,
75 latency_buffer_pos: Arc<AtomicUsize>,
77}
78
79impl TargetMetrics {
80 fn new(buffer_size: usize) -> Self {
81 Self {
82 connections: Arc::new(AtomicU64::new(0)),
83 requests: Arc::new(AtomicU64::new(0)),
84 total_latency_us: Arc::new(AtomicU64::new(0)),
85 latency_count: Arc::new(AtomicU64::new(0)),
86 cpu_usage: Arc::new(AtomicU64::new(0)),
87 last_update: Arc::new(RwLock::new(Instant::now())),
88 recent_latencies: Arc::new(RwLock::new(vec![Duration::ZERO; buffer_size])),
89 latency_buffer_pos: Arc::new(AtomicUsize::new(0)),
90 }
91 }
92
93 async fn average_latency(&self) -> Duration {
95 let latencies = self.recent_latencies.read().await;
96 let count = self.latency_count.load(Ordering::Relaxed);
97
98 if count == 0 {
99 return Duration::ZERO;
100 }
101
102 let total: Duration = latencies.iter().sum();
103 let sample_count = count.min(latencies.len() as u64);
104
105 if sample_count > 0 {
106 total / sample_count as u32
107 } else {
108 Duration::ZERO
109 }
110 }
111
112 async fn record_latency(&self, latency: Duration) {
114 let pos = self.latency_buffer_pos.fetch_add(1, Ordering::Relaxed);
115 let mut latencies = self.recent_latencies.write().await;
116 let buffer_size = latencies.len();
117 latencies[pos % buffer_size] = latency;
118
119 self.total_latency_us
120 .fetch_add(latency.as_micros() as u64, Ordering::Relaxed);
121 self.latency_count.fetch_add(1, Ordering::Relaxed);
122 }
123
124 async fn get_load(&self, metric: LoadMetric) -> f64 {
126 match metric {
127 LoadMetric::Connections => self.connections.load(Ordering::Relaxed) as f64,
128 LoadMetric::Latency => self.average_latency().await.as_micros() as f64,
129 LoadMetric::Combined => {
130 let connections = self.connections.load(Ordering::Relaxed) as f64;
131 let latency = self.average_latency().await.as_micros() as f64;
132 connections + (latency / 100.0)
135 }
136 LoadMetric::CpuUsage => self.cpu_usage.load(Ordering::Relaxed) as f64,
137 LoadMetric::RequestRate => {
138 let requests = self.requests.load(Ordering::Relaxed);
140 let last_update = *self.last_update.read().await;
141 let elapsed = last_update.elapsed().as_secs_f64();
142 if elapsed > 0.0 {
143 requests as f64 / elapsed
144 } else {
145 0.0
146 }
147 }
148 }
149 }
150}
151
152pub struct P2cBalancer {
154 config: P2cConfig,
156 targets: Vec<UpstreamTarget>,
158 health_status: Arc<RwLock<HashMap<String, bool>>>,
160 metrics: Vec<TargetMetrics>,
162 rng: Arc<RwLock<StdRng>>,
164 cumulative_weights: Vec<u32>,
166}
167
168impl P2cBalancer {
169 pub fn new(targets: Vec<UpstreamTarget>, config: P2cConfig) -> Self {
170 trace!(
171 target_count = targets.len(),
172 load_metric = ?config.load_metric,
173 use_weights = config.use_weights,
174 power_of_three = config.power_of_three,
175 latency_window_secs = config.latency_window_secs,
176 "Creating P2C balancer"
177 );
178
179 let buffer_size = (config.latency_window_secs * 100) as usize; let metrics = targets
181 .iter()
182 .map(|_| TargetMetrics::new(buffer_size))
183 .collect();
184
185 let mut cumulative_weights = Vec::with_capacity(targets.len());
187 let mut cumsum = 0u32;
188 for target in &targets {
189 cumsum += target.weight;
190 cumulative_weights.push(cumsum);
191 }
192
193 debug!(
194 target_count = targets.len(),
195 total_weight = cumsum,
196 buffer_size = buffer_size,
197 "P2C balancer initialized"
198 );
199
200 Self {
201 config,
202 targets,
203 health_status: Arc::new(RwLock::new(HashMap::new())),
204 metrics,
205 rng: Arc::new(RwLock::new(StdRng::from_entropy())),
206 cumulative_weights,
207 }
208 }
209
210 async fn random_healthy_target(&self) -> Option<usize> {
212 let health = self.health_status.read().await;
213 let healthy_indices: Vec<usize> = self
214 .targets
215 .iter()
216 .enumerate()
217 .filter_map(|(i, t)| {
218 let target_id = format!("{}:{}", t.address, t.port);
219 if health.get(&target_id).copied().unwrap_or(true) {
220 Some(i)
221 } else {
222 None
223 }
224 })
225 .collect();
226
227 trace!(
228 total_targets = self.targets.len(),
229 healthy_count = healthy_indices.len(),
230 use_weights = self.config.use_weights,
231 "Selecting random healthy target"
232 );
233
234 if healthy_indices.is_empty() {
235 warn!("No healthy targets available for P2C selection");
236 return None;
237 }
238
239 let mut rng = self.rng.write().await;
240
241 if self.config.use_weights && !self.cumulative_weights.is_empty() {
242 let total_weight = self.cumulative_weights.last().copied().unwrap_or(0);
244 if total_weight > 0 {
245 let threshold = rng.gen_range(0..total_weight);
246 for &idx in &healthy_indices {
247 if self.cumulative_weights[idx] > threshold {
248 trace!(
249 target_index = idx,
250 threshold = threshold,
251 "Selected target via weighted random"
252 );
253 return Some(idx);
254 }
255 }
256 }
257 }
258
259 let selected = healthy_indices[rng.gen_range(0..healthy_indices.len())];
261 trace!(
262 target_index = selected,
263 "Selected target via uniform random"
264 );
265 Some(selected)
266 }
267
268 async fn select_least_loaded(&self, candidates: Vec<usize>) -> Option<usize> {
270 if candidates.is_empty() {
271 trace!("No candidates provided for least loaded selection");
272 return None;
273 }
274
275 trace!(
276 candidate_count = candidates.len(),
277 load_metric = ?self.config.load_metric,
278 "Evaluating candidates for least loaded"
279 );
280
281 let mut min_load = f64::MAX;
282 let mut best_target = candidates[0];
283
284 for &idx in &candidates {
285 let load = self.metrics[idx].get_load(self.config.load_metric).await;
286
287 trace!(target_index = idx, load = load, "Candidate load");
288
289 if load < min_load {
290 min_load = load;
291 best_target = idx;
292 }
293 }
294
295 debug!(
296 target_index = best_target,
297 load = min_load,
298 candidate_count = candidates.len(),
299 "P2C selected least loaded target"
300 );
301
302 Some(best_target)
303 }
304
305 pub fn acquire_connection(&self, target_index: usize) {
307 let connections = self.metrics[target_index]
308 .connections
309 .fetch_add(1, Ordering::Relaxed)
310 + 1;
311 let requests = self.metrics[target_index]
312 .requests
313 .fetch_add(1, Ordering::Relaxed)
314 + 1;
315
316 trace!(
317 target_index = target_index,
318 connections = connections,
319 total_requests = requests,
320 "P2C acquired connection"
321 );
322 }
323
324 pub fn release_connection(&self, target_index: usize) {
326 let connections = self.metrics[target_index]
327 .connections
328 .fetch_sub(1, Ordering::Relaxed)
329 - 1;
330
331 trace!(
332 target_index = target_index,
333 connections = connections,
334 "P2C released connection"
335 );
336 }
337
338 pub async fn update_metrics(
340 &self,
341 target_index: usize,
342 latency: Option<Duration>,
343 cpu_usage: Option<u8>,
344 ) {
345 trace!(
346 target_index = target_index,
347 latency_ms = latency.map(|l| l.as_millis() as u64),
348 cpu_usage = cpu_usage,
349 "Updating P2C target metrics"
350 );
351
352 if let Some(latency) = latency {
353 self.metrics[target_index].record_latency(latency).await;
354 }
355
356 if let Some(cpu) = cpu_usage {
357 self.metrics[target_index]
358 .cpu_usage
359 .store(cpu as u64, Ordering::Relaxed);
360 }
361
362 *self.metrics[target_index].last_update.write().await = Instant::now();
363 }
364}
365
366#[async_trait]
367impl LoadBalancer for P2cBalancer {
368 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
369 let num_choices = if self.config.power_of_three { 3 } else { 2 };
371
372 trace!(
373 num_choices = num_choices,
374 power_of_three = self.config.power_of_three,
375 "P2C select started"
376 );
377
378 let mut candidates = Vec::with_capacity(num_choices);
379
380 for i in 0..num_choices {
381 if let Some(idx) = self.random_healthy_target().await {
382 if !candidates.contains(&idx) {
383 candidates.push(idx);
384 trace!(choice = i, target_index = idx, "Added candidate");
385 }
386 }
387 }
388
389 if candidates.is_empty() {
390 warn!("P2C: No healthy targets available");
391 return Err(SentinelError::NoHealthyUpstream);
392 }
393
394 let target_index = self.select_least_loaded(candidates).await.ok_or_else(|| {
396 warn!("P2C: Failed to select from candidates");
397 SentinelError::NoHealthyUpstream
398 })?;
399
400 let target = &self.targets[target_index];
401
402 self.acquire_connection(target_index);
404
405 let current_load = self.metrics[target_index]
407 .get_load(self.config.load_metric)
408 .await;
409 let connections = self.metrics[target_index]
410 .connections
411 .load(Ordering::Relaxed);
412 let avg_latency = self.metrics[target_index].average_latency().await;
413
414 debug!(
415 target = %format!("{}:{}", target.address, target.port),
416 target_index = target_index,
417 load = current_load,
418 connections = connections,
419 avg_latency_ms = avg_latency.as_millis() as u64,
420 "P2C selected target"
421 );
422
423 Ok(TargetSelection {
424 address: format!("{}:{}", target.address, target.port),
425 weight: target.weight,
426 metadata: {
427 let mut meta = HashMap::new();
428 meta.insert("algorithm".to_string(), "p2c".to_string());
429 meta.insert("target_index".to_string(), target_index.to_string());
430 meta.insert("current_load".to_string(), format!("{:.2}", current_load));
431 meta.insert("connections".to_string(), connections.to_string());
432 meta.insert(
433 "avg_latency_ms".to_string(),
434 format!("{:.2}", avg_latency.as_millis()),
435 );
436 meta.insert(
437 "metric_type".to_string(),
438 format!("{:?}", self.config.load_metric),
439 );
440 meta
441 },
442 })
443 }
444
445 async fn report_health(&self, address: &str, healthy: bool) {
446 trace!(
447 address = %address,
448 healthy = healthy,
449 "P2C reporting target health"
450 );
451
452 let mut health = self.health_status.write().await;
453 let previous = health.insert(address.to_string(), healthy);
454
455 if previous != Some(healthy) {
456 info!(
457 address = %address,
458 previous = ?previous,
459 healthy = healthy,
460 "P2C target health changed"
461 );
462 }
463 }
464
465 async fn healthy_targets(&self) -> Vec<String> {
466 let health = self.health_status.read().await;
467 let targets: Vec<String> = self
468 .targets
469 .iter()
470 .filter_map(|t| {
471 let target_id = format!("{}:{}", t.address, t.port);
472 if health.get(&target_id).copied().unwrap_or(true) {
473 Some(target_id)
474 } else {
475 None
476 }
477 })
478 .collect();
479
480 trace!(
481 total = self.targets.len(),
482 healthy = targets.len(),
483 "P2C healthy targets"
484 );
485
486 targets
487 }
488
489 async fn release(&self, selection: &TargetSelection) {
490 if let Some(index_str) = selection.metadata.get("target_index") {
491 if let Ok(index) = index_str.parse::<usize>() {
492 trace!(
493 target_index = index,
494 address = %selection.address,
495 "P2C releasing connection"
496 );
497 self.release_connection(index);
498 }
499 }
500 }
501}
502
503#[cfg(test)]
504mod tests {
505 use super::*;
506
507 fn create_test_targets(count: usize) -> Vec<UpstreamTarget> {
508 (0..count)
509 .map(|i| UpstreamTarget {
510 address: format!("10.0.0.{}", i + 1),
511 port: 8080,
512 weight: 100,
513 })
514 .collect()
515 }
516
517 #[tokio::test]
518 async fn test_p2c_selection() {
519 let targets = create_test_targets(5);
520 let config = P2cConfig::default();
521 let balancer = P2cBalancer::new(targets.clone(), config);
522
523 balancer.metrics[0].connections.store(10, Ordering::Relaxed);
525 balancer.metrics[1].connections.store(5, Ordering::Relaxed);
526 balancer.metrics[2].connections.store(15, Ordering::Relaxed);
527 balancer.metrics[3].connections.store(3, Ordering::Relaxed);
528 balancer.metrics[4].connections.store(8, Ordering::Relaxed);
529
530 let mut selections = vec![0usize; 5];
532 for _ in 0..1000 {
533 if let Ok(selection) = balancer.select(None).await {
534 if let Some(idx_str) = selection.metadata.get("target_index") {
535 if let Ok(idx) = idx_str.parse::<usize>() {
536 selections[idx] += 1;
537
538 balancer.release(&selection).await;
540 }
541 }
542 }
543 }
544
545 assert!(selections[3] > selections[2]);
548
549 for count in selections {
551 assert!(count > 0, "All targets should receive some traffic");
552 }
553 }
554
555 #[tokio::test]
556 async fn test_p2c_with_latency_metric() {
557 let targets = create_test_targets(3);
558 let config = P2cConfig {
559 load_metric: LoadMetric::Latency,
560 ..Default::default()
561 };
562 let balancer = P2cBalancer::new(targets.clone(), config);
563
564 balancer
566 .update_metrics(0, Some(Duration::from_millis(100)), None)
567 .await;
568 balancer
569 .update_metrics(1, Some(Duration::from_millis(10)), None)
570 .await;
571 balancer
572 .update_metrics(2, Some(Duration::from_millis(50)), None)
573 .await;
574
575 let selection = balancer.select(None).await.unwrap();
576 let metadata = &selection.metadata;
577
578 assert!(metadata.contains_key("avg_latency_ms"));
580 }
581
582 #[tokio::test]
583 async fn test_p2c_power_of_three() {
584 let targets = create_test_targets(10);
585 let config = P2cConfig {
586 power_of_three: true,
587 ..Default::default()
588 };
589 let balancer = P2cBalancer::new(targets.clone(), config);
590
591 for i in 0..10 {
593 balancer.metrics[i]
594 .connections
595 .store((i * 2) as u64, Ordering::Relaxed);
596 }
597
598 let mut low_load_selections = 0;
599 for _ in 0..100 {
600 if let Ok(selection) = balancer.select(None).await {
601 if let Some(idx_str) = selection.metadata.get("target_index") {
602 if let Ok(idx) = idx_str.parse::<usize>() {
603 if idx < 3 {
604 low_load_selections += 1;
606 }
607 balancer.release(&selection).await;
608 }
609 }
610 }
611 }
612
613 assert!(
615 low_load_selections > 60,
616 "P3C should favor low-load targets more"
617 );
618 }
619
620 #[tokio::test]
621 async fn test_weighted_selection() {
622 let mut targets = create_test_targets(3);
623 targets[0].weight = 100;
624 targets[1].weight = 200; targets[2].weight = 100;
626
627 let config = P2cConfig {
628 use_weights: true,
629 ..Default::default()
630 };
631 let balancer = P2cBalancer::new(targets.clone(), config);
632
633 for i in 0..3 {
635 balancer.metrics[i].connections.store(5, Ordering::Relaxed);
636 }
637
638 let mut selections = [0usize; 3];
639 for _ in 0..1000 {
640 if let Some(idx) = balancer.random_healthy_target().await {
641 selections[idx] += 1;
642 }
643 }
644
645 let ratio = selections[1] as f64 / selections[0] as f64;
647 assert!(
648 ratio > 1.5 && ratio < 2.5,
649 "Weighted selection not working properly"
650 );
651 }
652}