sentinel_proxy/upstream/mod.rs

//! Upstream pool management module for Sentinel proxy
//!
//! This module handles upstream server pools, load balancing, health checking,
//! connection pooling, and retry logic with circuit breakers.

use async_trait::async_trait;
use pingora::upstreams::peer::HttpPeer;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn};

use sentinel_common::{
    errors::{SentinelError, SentinelResult},
    types::{CircuitBreakerConfig, LoadBalancingAlgorithm},
    CircuitBreaker, UpstreamId,
};
use sentinel_config::UpstreamConfig;

// ============================================================================
// Internal Upstream Target Type
// ============================================================================

/// Internal upstream target representation for load balancers
///
/// This is a simplified representation used internally by load balancers,
/// separate from the user-facing config UpstreamTarget.
#[derive(Debug, Clone)]
pub struct UpstreamTarget {
    /// Target IP address or hostname
    pub address: String,
    /// Target port
    pub port: u16,
    /// Weight for weighted load balancing
    pub weight: u32,
}

impl UpstreamTarget {
    /// Create a new upstream target
    pub fn new(address: impl Into<String>, port: u16, weight: u32) -> Self {
        Self {
            address: address.into(),
            port,
            weight,
        }
    }

    /// Create from a "host:port" string with default weight
    pub fn from_address(addr: &str) -> Option<Self> {
        let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
        if parts.len() == 2 {
            let port = parts[0].parse().ok()?;
            let address = parts[1].to_string();
            Some(Self {
                address,
                port,
                weight: 100,
            })
        } else {
            None
        }
    }

    /// Convert from config UpstreamTarget
    pub fn from_config(config: &sentinel_config::UpstreamTarget) -> Option<Self> {
        Self::from_address(&config.address).map(|mut t| {
            t.weight = config.weight;
            t
        })
    }

    /// Get the full address string
    pub fn full_address(&self) -> String {
        format!("{}:{}", self.address, self.port)
    }
}
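
// Illustrative sketch, not part of the original module: minimal unit tests for
// the parsing helpers above, assuming the crate's standard `cargo test` setup.
// The addresses used here are placeholder test data.
#[cfg(test)]
mod upstream_target_tests {
    use super::UpstreamTarget;

    #[test]
    fn parses_host_port_with_default_weight() {
        let target = UpstreamTarget::from_address("10.0.0.1:8080").expect("valid address");
        assert_eq!(target.address, "10.0.0.1");
        assert_eq!(target.port, 8080);
        assert_eq!(target.weight, 100);
        assert_eq!(target.full_address(), "10.0.0.1:8080");
    }

    #[test]
    fn rejects_address_without_port() {
        assert!(UpstreamTarget::from_address("10.0.0.1").is_none());
    }
}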

// ============================================================================
// Load Balancing
// ============================================================================

// Load balancing algorithm implementations
pub mod adaptive;
pub mod consistent_hash;
pub mod health;
pub mod p2c;

// Re-export commonly used types from sub-modules
pub use adaptive::{AdaptiveBalancer, AdaptiveConfig};
pub use consistent_hash::{ConsistentHashBalancer, ConsistentHashConfig};
pub use health::{ActiveHealthChecker, HealthCheckRunner};
pub use p2c::{P2cBalancer, P2cConfig};

/// Request context for load balancer decisions
#[derive(Debug, Clone)]
pub struct RequestContext {
    pub client_ip: Option<std::net::SocketAddr>,
    pub headers: HashMap<String, String>,
    pub path: String,
    pub method: String,
}

/// Load balancer trait for different algorithms
#[async_trait]
pub trait LoadBalancer: Send + Sync {
    /// Select next upstream target
    async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection>;

    /// Report target health status
    async fn report_health(&self, address: &str, healthy: bool);

    /// Get all healthy targets
    async fn healthy_targets(&self) -> Vec<String>;

    /// Release connection (for connection tracking)
    async fn release(&self, _selection: &TargetSelection) {
        // Default implementation - no-op
    }

    /// Report request result (for adaptive algorithms)
    async fn report_result(
        &self,
        _selection: &TargetSelection,
        _success: bool,
        _latency: Option<Duration>,
    ) {
        // Default implementation - no-op
    }

    /// Report request result by address with latency (for adaptive algorithms)
    ///
    /// This method allows reporting results without needing the full TargetSelection,
    /// which is useful when the selection is not available (e.g., in a logging callback).
    /// The default implementation just calls report_health; adaptive balancers override
    /// this to update their metrics.
    async fn report_result_with_latency(
        &self,
        address: &str,
        success: bool,
        _latency: Option<Duration>,
    ) {
        // Default implementation - just report health
        self.report_health(address, success).await;
    }
}

/// Selected upstream target
#[derive(Debug, Clone)]
pub struct TargetSelection {
    /// Target address
    pub address: String,
    /// Target weight
    pub weight: u32,
    /// Target metadata
    pub metadata: HashMap<String, String>,
}
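
// Illustrative sketch (not wired into the proxy): the call flow a caller is
// expected to drive against a `LoadBalancer` - select a target, proxy the
// request to it, report the outcome so adaptive balancers can adjust their
// weights, and release the slot for balancers that track in-flight
// connections. The `success` value is a placeholder; a real caller would use
// the proxied request's result.
#[allow(dead_code)]
async fn drive_balancer_example(lb: &dyn LoadBalancer) -> SentinelResult<()> {
    let selection = lb.select(None).await?;
    let started = std::time::Instant::now();

    // ... proxy the request to `selection.address` here ...
    let success = true; // placeholder for the real outcome

    lb.report_result(&selection, success, Some(started.elapsed())).await;
    lb.release(&selection).await;
    Ok(())
}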

/// Upstream pool managing multiple backend servers
pub struct UpstreamPool {
    /// Pool identifier
    id: UpstreamId,
    /// Configured targets
    targets: Vec<UpstreamTarget>,
    /// Load balancer implementation
    load_balancer: Arc<dyn LoadBalancer>,
    /// Connection pool configuration (Pingora handles actual pooling)
    pool_config: ConnectionPoolConfig,
    /// HTTP version configuration
    http_version: HttpVersionOptions,
    /// Whether TLS is enabled for this upstream
    tls_enabled: bool,
    /// SNI for TLS connections
    tls_sni: Option<String>,
    /// TLS configuration for upstream mTLS (client certificates)
    tls_config: Option<sentinel_config::UpstreamTlsConfig>,
    /// Circuit breakers per target
    circuit_breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
    /// Pool statistics
    stats: Arc<PoolStats>,
}

// Note: Active health checking is handled by the PassiveHealthChecker in health.rs
// and via load balancer health reporting. A future enhancement could add active
// HTTP/TCP health probes here.

/// Connection pool configuration for Pingora's built-in pooling
///
/// Note: Actual connection pooling is handled by Pingora internally.
/// This struct holds configuration that is applied to peer options.
pub struct ConnectionPoolConfig {
    /// Maximum connections per target (informational - Pingora manages actual pooling)
    pub max_connections: usize,
    /// Maximum idle connections (informational - Pingora manages actual pooling)
    pub max_idle: usize,
    /// Maximum idle timeout for pooled connections
    pub idle_timeout: Duration,
    /// Maximum connection lifetime (None = unlimited)
    pub max_lifetime: Option<Duration>,
    /// Connection timeout
    pub connection_timeout: Duration,
    /// Read timeout
    pub read_timeout: Duration,
    /// Write timeout
    pub write_timeout: Duration,
}

/// HTTP version configuration for upstream connections
pub struct HttpVersionOptions {
    /// Minimum HTTP version (1 or 2)
    pub min_version: u8,
    /// Maximum HTTP version (1 or 2)
    pub max_version: u8,
    /// H2 ping interval (0 to disable)
    pub h2_ping_interval: Duration,
    /// Maximum concurrent H2 streams per connection
    pub max_h2_streams: usize,
}

impl ConnectionPoolConfig {
    /// Create from upstream config
    pub fn from_config(
        pool_config: &sentinel_config::ConnectionPoolConfig,
        timeouts: &sentinel_config::UpstreamTimeouts,
    ) -> Self {
        Self {
            max_connections: pool_config.max_connections,
            max_idle: pool_config.max_idle,
            idle_timeout: Duration::from_secs(pool_config.idle_timeout_secs),
            max_lifetime: pool_config.max_lifetime_secs.map(Duration::from_secs),
            connection_timeout: Duration::from_secs(timeouts.connect_secs),
            read_timeout: Duration::from_secs(timeouts.read_secs),
            write_timeout: Duration::from_secs(timeouts.write_secs),
        }
    }
}

// CircuitBreaker is imported from sentinel_common

/// Pool statistics
#[derive(Default)]
pub struct PoolStats {
    /// Total requests
    pub requests: AtomicU64,
    /// Successful requests
    pub successes: AtomicU64,
    /// Failed requests
    pub failures: AtomicU64,
    /// Retried requests
    pub retries: AtomicU64,
    /// Circuit breaker trips
    pub circuit_breaker_trips: AtomicU64,
}

/// Snapshot of pool configuration for metrics/debugging
#[derive(Debug, Clone)]
pub struct PoolConfigSnapshot {
    /// Maximum connections per target
    pub max_connections: usize,
    /// Maximum idle connections
    pub max_idle: usize,
    /// Idle timeout in seconds
    pub idle_timeout_secs: u64,
    /// Maximum connection lifetime in seconds (None = unlimited)
    pub max_lifetime_secs: Option<u64>,
    /// Connection timeout in seconds
    pub connection_timeout_secs: u64,
    /// Read timeout in seconds
    pub read_timeout_secs: u64,
    /// Write timeout in seconds
    pub write_timeout_secs: u64,
}

/// Round-robin load balancer
struct RoundRobinBalancer {
    targets: Vec<UpstreamTarget>,
    current: AtomicUsize,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

impl RoundRobinBalancer {
    fn new(targets: Vec<UpstreamTarget>) -> Self {
        let mut health_status = HashMap::new();
        for target in &targets {
            health_status.insert(target.full_address(), true);
        }

        Self {
            targets,
            current: AtomicUsize::new(0),
            health_status: Arc::new(RwLock::new(health_status)),
        }
    }
}

#[async_trait]
impl LoadBalancer for RoundRobinBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "round_robin",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_targets: Vec<_> = self
            .targets
            .iter()
            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
            .collect();

        if healthy_targets.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "round_robin",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        let index = self.current.fetch_add(1, Ordering::Relaxed) % healthy_targets.len();
        let target = healthy_targets[index];

        trace!(
            selected_target = %target.full_address(),
            healthy_count = healthy_targets.len(),
            index = index,
            algorithm = "round_robin",
            "Selected target via round robin"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight: target.weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "round_robin",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
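
// Illustrative sketch, not part of the original module: exercises the
// round-robin balancer above. Assumes tokio's test macro is available to this
// crate as a dev-dependency; the target addresses are placeholder test data.
#[cfg(test)]
mod round_robin_tests {
    use super::*;

    #[tokio::test]
    async fn skips_targets_reported_unhealthy() {
        let targets = vec![
            UpstreamTarget::new("10.0.0.1", 8080, 100),
            UpstreamTarget::new("10.0.0.2", 8080, 100),
        ];
        let balancer = RoundRobinBalancer::new(targets);

        // Mark the first target unhealthy; every selection should land on the second.
        balancer.report_health("10.0.0.1:8080", false).await;
        for _ in 0..4 {
            let selection = balancer.select(None).await.expect("one healthy target remains");
            assert_eq!(selection.address, "10.0.0.2:8080");
        }
        assert_eq!(
            balancer.healthy_targets().await,
            vec!["10.0.0.2:8080".to_string()]
        );
    }
}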

/// Least connections load balancer
///
/// Tracks an in-flight connection count per target: `select` picks the healthy
/// target with the lowest count (and increments it), and `release` returns the
/// slot when the connection is done.
struct LeastConnectionsBalancer {
    targets: Vec<UpstreamTarget>,
    connections: Arc<RwLock<HashMap<String, usize>>>,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

impl LeastConnectionsBalancer {
    fn new(targets: Vec<UpstreamTarget>) -> Self {
        let mut health_status = HashMap::new();
        let mut connections = HashMap::new();

        for target in &targets {
            let addr = target.full_address();
            health_status.insert(addr.clone(), true);
            connections.insert(addr, 0);
        }

        Self {
            targets,
            connections: Arc::new(RwLock::new(connections)),
            health_status: Arc::new(RwLock::new(health_status)),
        }
    }
}

#[async_trait]
impl LoadBalancer for LeastConnectionsBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "least_connections",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let conns = self.connections.read().await;

        let mut best_target = None;
        let mut min_connections = usize::MAX;

        for target in &self.targets {
            let addr = target.full_address();
            if !*health.get(&addr).unwrap_or(&true) {
                trace!(
                    target = %addr,
                    algorithm = "least_connections",
                    "Skipping unhealthy target"
                );
                continue;
            }

            let conn_count = *conns.get(&addr).unwrap_or(&0);
            trace!(
                target = %addr,
                connections = conn_count,
                "Evaluating target connection count"
            );
            if conn_count < min_connections {
                min_connections = conn_count;
                best_target = Some(target);
            }
        }

        // Release the read guards before updating the selected target's count below.
        drop(conns);
        drop(health);

        match best_target {
            Some(target) => {
                let address = target.full_address();
                trace!(
                    selected_target = %address,
                    connections = min_connections,
                    algorithm = "least_connections",
                    "Selected target with fewest connections"
                );

                // Count the selection as an in-flight connection so subsequent
                // picks see the updated load; `release` decrements it again.
                *self
                    .connections
                    .write()
                    .await
                    .entry(address.clone())
                    .or_insert(0) += 1;

                Ok(TargetSelection {
                    address,
                    weight: target.weight,
                    metadata: HashMap::new(),
                })
            }
            None => {
                warn!(
                    total_targets = self.targets.len(),
                    algorithm = "least_connections",
                    "No healthy upstream targets available"
                );
                Err(SentinelError::NoHealthyUpstream)
            }
        }
    }

    async fn release(&self, selection: &TargetSelection) {
        // Return the in-flight connection slot tracked in `select`.
        if let Some(count) = self.connections.write().await.get_mut(&selection.address) {
            *count = count.saturating_sub(1);
        }
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "least_connections",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
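
// Illustrative sketch, not part of the original module: checks the connection
// tracking added above (increment on select, decrement on release). Assumes
// tokio's test macro is available as a dev-dependency; addresses are test data.
#[cfg(test)]
mod least_connections_tests {
    use super::*;

    #[tokio::test]
    async fn prefers_target_with_fewer_tracked_connections() {
        let targets = vec![
            UpstreamTarget::new("10.0.0.1", 8080, 100),
            UpstreamTarget::new("10.0.0.2", 8080, 100),
        ];
        let balancer = LeastConnectionsBalancer::new(targets);

        let first = balancer.select(None).await.expect("healthy targets");
        let second = balancer.select(None).await.expect("healthy targets");
        // With one connection tracked against the first target, the second
        // selection should land on the other one.
        assert_ne!(first.address, second.address);

        // Releasing both selections returns the counts to zero, so the
        // rotation starts over from the first target.
        balancer.release(&first).await;
        balancer.release(&second).await;
        let third = balancer.select(None).await.expect("healthy targets");
        assert_eq!(third.address, first.address);
    }
}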

/// Weighted load balancer
///
/// Note: the target weight is carried through in the selection result, but the
/// rotation itself is currently a plain round robin over healthy targets.
struct WeightedBalancer {
    targets: Vec<UpstreamTarget>,
    weights: Vec<u32>,
    current_index: AtomicUsize,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

#[async_trait]
impl LoadBalancer for WeightedBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "weighted",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_indices: Vec<_> = self
            .targets
            .iter()
            .enumerate()
            .filter(|(_, t)| *health.get(&t.full_address()).unwrap_or(&true))
            .map(|(i, _)| i)
            .collect();

        if healthy_indices.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "weighted",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        let idx = self.current_index.fetch_add(1, Ordering::Relaxed) % healthy_indices.len();
        let target_idx = healthy_indices[idx];
        let target = &self.targets[target_idx];
        let weight = self.weights.get(target_idx).copied().unwrap_or(1);

        trace!(
            selected_target = %target.full_address(),
            weight = weight,
            healthy_count = healthy_indices.len(),
            algorithm = "weighted",
            "Selected target via weighted round robin"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "weighted",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}

/// IP hash load balancer
struct IpHashBalancer {
    targets: Vec<UpstreamTarget>,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

#[async_trait]
impl LoadBalancer for IpHashBalancer {
    async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "ip_hash",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_targets: Vec<_> = self
            .targets
            .iter()
            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
            .collect();

        if healthy_targets.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "ip_hash",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        // Hash the client IP to select a target
        let (hash, client_ip_str) = if let Some(ctx) = context {
            if let Some(ip) = &ctx.client_ip {
                use std::hash::{Hash, Hasher};
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                ip.hash(&mut hasher);
                (hasher.finish(), Some(ip.to_string()))
            } else {
                (0, None)
            }
        } else {
            (0, None)
        };

        let idx = (hash as usize) % healthy_targets.len();
        let target = healthy_targets[idx];

        trace!(
            selected_target = %target.full_address(),
            client_ip = client_ip_str.as_deref().unwrap_or("unknown"),
            hash = hash,
            index = idx,
            healthy_count = healthy_targets.len(),
            algorithm = "ip_hash",
            "Selected target via IP hash"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight: target.weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "ip_hash",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
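
// Illustrative sketch, not part of the original module: verifies that the
// IP-hash balancer above is sticky for a given client address. Assumes tokio's
// test macro is available as a dev-dependency; addresses are test data.
#[cfg(test)]
mod ip_hash_tests {
    use super::*;

    #[tokio::test]
    async fn same_client_ip_maps_to_same_target() {
        let balancer = IpHashBalancer {
            targets: vec![
                UpstreamTarget::new("10.0.0.1", 8080, 100),
                UpstreamTarget::new("10.0.0.2", 8080, 100),
            ],
            health_status: Arc::new(RwLock::new(HashMap::new())),
        };
        let context = RequestContext {
            client_ip: Some("192.0.2.10:50000".parse().expect("valid socket address")),
            headers: HashMap::new(),
            path: "/".to_string(),
            method: "GET".to_string(),
        };

        let first = balancer.select(Some(&context)).await.expect("healthy targets");
        let second = balancer.select(Some(&context)).await.expect("healthy targets");
        assert_eq!(first.address, second.address);
    }
}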

impl UpstreamPool {
    /// Create new upstream pool from configuration
    pub async fn new(config: UpstreamConfig) -> SentinelResult<Self> {
        let id = UpstreamId::new(&config.id);

        info!(
            upstream_id = %config.id,
            target_count = config.targets.len(),
            algorithm = ?config.load_balancing,
            "Creating upstream pool"
        );

        // Convert config targets to internal targets
        let targets: Vec<UpstreamTarget> = config
            .targets
            .iter()
            .filter_map(UpstreamTarget::from_config)
            .collect();

        if targets.is_empty() {
            error!(
                upstream_id = %config.id,
                "No valid upstream targets configured"
            );
            return Err(SentinelError::Config {
                message: "No valid upstream targets".to_string(),
                source: None,
            });
        }

        for target in &targets {
            debug!(
                upstream_id = %config.id,
                target = %target.full_address(),
                weight = target.weight,
                "Registered upstream target"
            );
        }

        // Create load balancer
        debug!(
            upstream_id = %config.id,
            algorithm = ?config.load_balancing,
            "Creating load balancer"
        );
        let load_balancer = Self::create_load_balancer(&config.load_balancing, &targets)?;

        // Create connection pool configuration (Pingora handles actual pooling)
        debug!(
            upstream_id = %config.id,
            max_connections = config.connection_pool.max_connections,
            max_idle = config.connection_pool.max_idle,
            idle_timeout_secs = config.connection_pool.idle_timeout_secs,
            connect_timeout_secs = config.timeouts.connect_secs,
            read_timeout_secs = config.timeouts.read_secs,
            write_timeout_secs = config.timeouts.write_secs,
            "Creating connection pool configuration"
        );
        let pool_config =
            ConnectionPoolConfig::from_config(&config.connection_pool, &config.timeouts);

        // Create HTTP version configuration
        let http_version = HttpVersionOptions {
            min_version: config.http_version.min_version,
            max_version: config.http_version.max_version,
            h2_ping_interval: if config.http_version.h2_ping_interval_secs > 0 {
                Duration::from_secs(config.http_version.h2_ping_interval_secs)
            } else {
                Duration::ZERO
            },
            max_h2_streams: config.http_version.max_h2_streams,
        };

        // TLS configuration
        let tls_enabled = config.tls.is_some();
        let tls_sni = config.tls.as_ref().and_then(|t| t.sni.clone());
        let tls_config = config.tls.clone();

        // Log mTLS configuration if present
        if let Some(ref tls) = tls_config {
            if tls.client_cert.is_some() {
                info!(
                    upstream_id = %config.id,
                    "mTLS enabled for upstream (client certificate configured)"
                );
            }
        }

        if http_version.max_version >= 2 && tls_enabled {
            info!(
                upstream_id = %config.id,
                "HTTP/2 enabled for upstream (via ALPN)"
            );
        }

        // Initialize circuit breakers for each target
        let mut circuit_breakers = HashMap::new();
        for target in &targets {
            trace!(
                upstream_id = %config.id,
                target = %target.full_address(),
                "Initializing circuit breaker for target"
            );
            circuit_breakers.insert(
                target.full_address(),
                CircuitBreaker::new(CircuitBreakerConfig::default()),
            );
        }

        let pool = Self {
            id: id.clone(),
            targets,
            load_balancer,
            pool_config,
            http_version,
            tls_enabled,
            tls_sni,
            tls_config,
            circuit_breakers: Arc::new(RwLock::new(circuit_breakers)),
            stats: Arc::new(PoolStats::default()),
        };

        info!(
            upstream_id = %id,
            target_count = pool.targets.len(),
            "Upstream pool created successfully"
        );

        Ok(pool)
    }

    /// Create load balancer based on algorithm
    fn create_load_balancer(
        algorithm: &LoadBalancingAlgorithm,
        targets: &[UpstreamTarget],
    ) -> SentinelResult<Arc<dyn LoadBalancer>> {
        // Start every target as healthy so healthy_targets() reflects the full
        // set before any health reports arrive (matches the other balancers).
        let initial_health: HashMap<String, bool> = targets
            .iter()
            .map(|t| (t.full_address(), true))
            .collect();

        let balancer: Arc<dyn LoadBalancer> = match algorithm {
            LoadBalancingAlgorithm::RoundRobin => {
                Arc::new(RoundRobinBalancer::new(targets.to_vec()))
            }
            LoadBalancingAlgorithm::LeastConnections => {
                Arc::new(LeastConnectionsBalancer::new(targets.to_vec()))
            }
            LoadBalancingAlgorithm::Weighted => {
                let weights: Vec<u32> = targets.iter().map(|t| t.weight).collect();
                Arc::new(WeightedBalancer {
                    targets: targets.to_vec(),
                    weights,
                    current_index: AtomicUsize::new(0),
                    health_status: Arc::new(RwLock::new(initial_health.clone())),
                })
            }
            LoadBalancingAlgorithm::IpHash => Arc::new(IpHashBalancer {
                targets: targets.to_vec(),
                health_status: Arc::new(RwLock::new(initial_health)),
            }),
            // Random currently falls back to round-robin selection.
            LoadBalancingAlgorithm::Random => Arc::new(RoundRobinBalancer::new(targets.to_vec())),
            LoadBalancingAlgorithm::ConsistentHash => Arc::new(ConsistentHashBalancer::new(
                targets.to_vec(),
                ConsistentHashConfig::default(),
            )),
            LoadBalancingAlgorithm::PowerOfTwoChoices => {
                Arc::new(P2cBalancer::new(targets.to_vec(), P2cConfig::default()))
            }
            LoadBalancingAlgorithm::Adaptive => Arc::new(AdaptiveBalancer::new(
                targets.to_vec(),
                AdaptiveConfig::default(),
            )),
        };
        Ok(balancer)
    }

    /// Select next upstream peer
    pub async fn select_peer(&self, context: Option<&RequestContext>) -> SentinelResult<HttpPeer> {
        let request_num = self.stats.requests.fetch_add(1, Ordering::Relaxed) + 1;

        trace!(
            upstream_id = %self.id,
            request_num = request_num,
            target_count = self.targets.len(),
            "Starting peer selection"
        );

        let mut attempts = 0;
        let max_attempts = self.targets.len() * 2;

        while attempts < max_attempts {
            attempts += 1;

            trace!(
                upstream_id = %self.id,
                attempt = attempts,
                max_attempts = max_attempts,
                "Attempting to select peer"
            );

            let selection = match self.load_balancer.select(context).await {
                Ok(s) => s,
                Err(e) => {
                    warn!(
                        upstream_id = %self.id,
                        attempt = attempts,
                        error = %e,
                        "Load balancer selection failed"
                    );
                    continue;
                }
            };

            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                attempt = attempts,
                "Load balancer selected target"
            );

            // Check circuit breaker
            let breakers = self.circuit_breakers.read().await;
            if let Some(breaker) = breakers.get(&selection.address) {
                if !breaker.is_closed().await {
                    debug!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        attempt = attempts,
                        "Circuit breaker is open, skipping target"
                    );
                    self.stats
                        .circuit_breaker_trips
                        .fetch_add(1, Ordering::Relaxed);
                    continue;
                }
            }

            // Create peer with pooling options
            // Note: Pingora handles actual connection pooling internally based on
            // peer.options.idle_timeout and ServerConf.upstream_keepalive_pool_size
            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                "Creating peer for upstream (Pingora handles connection reuse)"
            );
            let peer = self.create_peer(&selection)?;

            debug!(
                upstream_id = %self.id,
                target = %selection.address,
                attempt = attempts,
                "Selected upstream peer"
            );

            self.stats.successes.fetch_add(1, Ordering::Relaxed);
            return Ok(peer);
        }

        self.stats.failures.fetch_add(1, Ordering::Relaxed);
        error!(
            upstream_id = %self.id,
            attempts = attempts,
            max_attempts = max_attempts,
            "Failed to select upstream after max attempts"
        );
        Err(SentinelError::upstream(
            self.id.to_string(),
            "Failed to select upstream after max attempts",
        ))
    }

    /// Create new peer connection with connection pooling options
    ///
    /// Pingora handles actual connection pooling internally. When idle_timeout
    /// is set on the peer options, Pingora will keep the connection alive and
    /// reuse it for subsequent requests to the same upstream.
    fn create_peer(&self, selection: &TargetSelection) -> SentinelResult<HttpPeer> {
        // Determine SNI hostname for TLS connections
        let sni_hostname = self.tls_sni.clone().unwrap_or_else(|| {
            // Extract hostname from address (strip port)
            selection
                .address
                .split(':')
                .next()
                .unwrap_or(&selection.address)
                .to_string()
        });

        // Pre-resolve the address to avoid panics in Pingora's HttpPeer::new
        // when DNS resolution fails (e.g., when a container is killed)
        let resolved_address = selection
            .address
            .to_socket_addrs()
            .map_err(|e| {
                error!(
                    upstream = %self.id,
                    address = %selection.address,
                    error = %e,
                    "Failed to resolve upstream address"
                );
                SentinelError::Upstream {
                    upstream: self.id.to_string(),
                    message: format!("DNS resolution failed for {}: {}", selection.address, e),
                    retryable: true,
                    source: None,
                }
            })?
            .next()
            .ok_or_else(|| {
                error!(
                    upstream = %self.id,
                    address = %selection.address,
                    "No addresses returned from DNS resolution"
                );
                SentinelError::Upstream {
                    upstream: self.id.to_string(),
                    message: format!("No addresses for {}", selection.address),
                    retryable: true,
                    source: None,
                }
            })?;

        // Use the resolved IP address to create the peer
        let mut peer = HttpPeer::new(resolved_address, self.tls_enabled, sni_hostname.clone());

        // Configure connection pooling options for better performance
        // idle_timeout enables Pingora's connection pooling - connections are
        // kept alive and reused for this duration
        peer.options.idle_timeout = Some(self.pool_config.idle_timeout);

        // Connection timeouts
        peer.options.connection_timeout = Some(self.pool_config.connection_timeout);
        peer.options.total_connection_timeout = Some(Duration::from_secs(10));

        // Read/write timeouts
        peer.options.read_timeout = Some(self.pool_config.read_timeout);
        peer.options.write_timeout = Some(self.pool_config.write_timeout);

        // Enable TCP keepalive for long-lived connections
        peer.options.tcp_keepalive = Some(pingora::protocols::TcpKeepalive {
            idle: Duration::from_secs(60),
            interval: Duration::from_secs(10),
            count: 3,
            // user_timeout is Linux-only
            #[cfg(target_os = "linux")]
            user_timeout: Duration::from_secs(60),
        });

        // Configure HTTP version and ALPN for TLS connections
        if self.tls_enabled {
            // Set ALPN protocols based on configured HTTP version range
            let alpn = match (self.http_version.min_version, self.http_version.max_version) {
                (2, _) => {
                    // HTTP/2 only - use h2 ALPN
                    pingora::upstreams::peer::ALPN::H2
                }
                (_, 2) => {
                    // Prefer HTTP/2 but fall back to HTTP/1.1
                    pingora::upstreams::peer::ALPN::H2H1
                }
                _ => {
                    // HTTP/1.1 only
                    pingora::upstreams::peer::ALPN::H1
                }
            };
            peer.options.alpn = alpn;

            // Configure TLS verification options based on upstream config
            if let Some(ref tls_config) = self.tls_config {
                // Skip certificate verification if configured (DANGEROUS - testing only)
                if tls_config.insecure_skip_verify {
                    peer.options.verify_cert = false;
                    peer.options.verify_hostname = false;
                    warn!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        "TLS certificate verification DISABLED (insecure_skip_verify=true)"
                    );
                }

                // Set alternative CN for verification if SNI differs from actual hostname
                if let Some(ref sni) = tls_config.sni {
                    peer.options.alternative_cn = Some(sni.clone());
                    trace!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        alternative_cn = %sni,
                        "Set alternative CN for TLS verification"
                    );
                }

                // Log mTLS client certificate configuration
                // Note: Full mTLS client cert support requires Pingora connector customization
                // The certificates are loaded but Pingora's standard connector doesn't expose
                // a direct way to inject client certs. For production mTLS to backends,
                // consider using a custom Pingora Connector implementation.
                if tls_config.client_cert.is_some() {
                    debug!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        client_cert = ?tls_config.client_cert,
                        "mTLS client certificate configured (requires custom connector for full support)"
                    );
                }
            }

            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                alpn = ?peer.options.alpn,
                min_version = self.http_version.min_version,
                max_version = self.http_version.max_version,
                verify_cert = peer.options.verify_cert,
                verify_hostname = peer.options.verify_hostname,
                "Configured ALPN and TLS options for HTTP version negotiation"
            );
        }

        // Configure H2-specific settings when HTTP/2 is enabled
        if self.http_version.max_version >= 2 {
            // H2 ping interval for connection health monitoring
            if !self.http_version.h2_ping_interval.is_zero() {
                peer.options.h2_ping_interval = Some(self.http_version.h2_ping_interval);
                trace!(
                    upstream_id = %self.id,
                    target = %selection.address,
                    h2_ping_interval_secs = self.http_version.h2_ping_interval.as_secs(),
                    "Configured H2 ping interval"
                );
            }
        }

        trace!(
            upstream_id = %self.id,
            target = %selection.address,
            tls = self.tls_enabled,
            sni = %sni_hostname,
            idle_timeout_secs = self.pool_config.idle_timeout.as_secs(),
            http_max_version = self.http_version.max_version,
            "Created peer with Pingora connection pooling enabled"
        );

        Ok(peer)
    }

    /// Report connection result for a target
    pub async fn report_result(&self, target: &str, success: bool) {
        trace!(
            upstream_id = %self.id,
            target = %target,
            success = success,
            "Reporting connection result"
        );

        if success {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_success().await;
                trace!(
                    upstream_id = %self.id,
                    target = %target,
                    "Recorded success in circuit breaker"
                );
            }
            self.load_balancer.report_health(target, true).await;
        } else {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_failure().await;
                debug!(
                    upstream_id = %self.id,
                    target = %target,
                    "Recorded failure in circuit breaker"
                );
            }
            self.load_balancer.report_health(target, false).await;
            self.stats.failures.fetch_add(1, Ordering::Relaxed);
            warn!(
                upstream_id = %self.id,
                target = %target,
                "Connection failure reported for target"
            );
        }
    }

    /// Report request result with latency for adaptive load balancing
    ///
    /// This method passes latency information to the load balancer for
    /// adaptive weight adjustment. It updates circuit breakers, health
    /// status, and load balancer metrics.
    pub async fn report_result_with_latency(
        &self,
        target: &str,
        success: bool,
        latency: Option<Duration>,
    ) {
        trace!(
            upstream_id = %self.id,
            target = %target,
            success = success,
            latency_ms = latency.map(|l| l.as_millis() as u64),
            "Reporting result with latency for adaptive LB"
        );

        // Update circuit breaker
        if success {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_success().await;
            }
        } else {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_failure().await;
            }
            self.stats.failures.fetch_add(1, Ordering::Relaxed);
        }

        // Report to load balancer with latency (enables adaptive weight adjustment)
        self.load_balancer
            .report_result_with_latency(target, success, latency)
            .await;
    }

    /// Get pool statistics
    pub fn stats(&self) -> &PoolStats {
        &self.stats
    }

    /// Get pool ID
    pub fn id(&self) -> &UpstreamId {
        &self.id
    }

    /// Get target count
    pub fn target_count(&self) -> usize {
        self.targets.len()
    }

    /// Get pool configuration (for metrics/debugging)
    pub fn pool_config(&self) -> PoolConfigSnapshot {
        PoolConfigSnapshot {
            max_connections: self.pool_config.max_connections,
            max_idle: self.pool_config.max_idle,
            idle_timeout_secs: self.pool_config.idle_timeout.as_secs(),
            max_lifetime_secs: self.pool_config.max_lifetime.map(|d| d.as_secs()),
            connection_timeout_secs: self.pool_config.connection_timeout.as_secs(),
            read_timeout_secs: self.pool_config.read_timeout.as_secs(),
            write_timeout_secs: self.pool_config.write_timeout.as_secs(),
        }
    }

    /// Shutdown the pool
    ///
    /// Note: Pingora manages connection pooling internally, so we just log stats.
    pub async fn shutdown(&self) {
        info!(
            upstream_id = %self.id,
            target_count = self.targets.len(),
            total_requests = self.stats.requests.load(Ordering::Relaxed),
            total_successes = self.stats.successes.load(Ordering::Relaxed),
            total_failures = self.stats.failures.load(Ordering::Relaxed),
            "Shutting down upstream pool"
        );
        // Pingora handles connection cleanup internally
        debug!(upstream_id = %self.id, "Upstream pool shutdown complete");
    }
}
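
// Illustrative sketch, not part of the original module: checks the
// load-balancer factory on the pool. Assumes tokio's test macro is available
// as a dev-dependency; the target addresses are placeholder test data.
#[cfg(test)]
mod pool_factory_tests {
    use super::*;

    #[tokio::test]
    async fn round_robin_factory_starts_with_all_targets_healthy() {
        let targets = vec![
            UpstreamTarget::new("10.0.0.1", 8080, 100),
            UpstreamTarget::new("10.0.0.2", 8080, 200),
        ];
        let balancer =
            UpstreamPool::create_load_balancer(&LoadBalancingAlgorithm::RoundRobin, &targets)
                .expect("balancer should be constructed");

        let mut healthy = balancer.healthy_targets().await;
        healthy.sort();
        assert_eq!(
            healthy,
            vec!["10.0.0.1:8080".to_string(), "10.0.0.2:8080".to_string()]
        );
    }
}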