sentinel_proxy/upstream/mod.rs

//! Upstream pool management module for Sentinel proxy
//!
//! This module handles upstream server pools, load balancing, health checking,
//! connection pooling, and retry logic with circuit breakers.

use async_trait::async_trait;
use pingora::upstreams::peer::HttpPeer;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn};

use sentinel_common::{
    errors::{SentinelError, SentinelResult},
    types::{CircuitBreakerConfig, LoadBalancingAlgorithm},
    CircuitBreaker, UpstreamId,
};
use sentinel_config::UpstreamConfig;

// ============================================================================
// Internal Upstream Target Type
// ============================================================================

/// Internal upstream target representation for load balancers
///
/// This is a simplified representation used internally by load balancers,
/// separate from the user-facing config UpstreamTarget.
#[derive(Debug, Clone)]
pub struct UpstreamTarget {
    /// Target IP address or hostname
    pub address: String,
    /// Target port
    pub port: u16,
    /// Weight for weighted load balancing
    pub weight: u32,
}

impl UpstreamTarget {
    /// Create a new upstream target
    pub fn new(address: impl Into<String>, port: u16, weight: u32) -> Self {
        Self {
            address: address.into(),
            port,
            weight,
        }
    }

    /// Create from a "host:port" string with default weight
    pub fn from_address(addr: &str) -> Option<Self> {
        let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
        if parts.len() == 2 {
            let port = parts[0].parse().ok()?;
            let address = parts[1].to_string();
            Some(Self {
                address,
                port,
                weight: 100,
            })
        } else {
            None
        }
    }

    /// Convert from config UpstreamTarget
    pub fn from_config(config: &sentinel_config::UpstreamTarget) -> Option<Self> {
        Self::from_address(&config.address).map(|mut t| {
            t.weight = config.weight;
            t
        })
    }

    /// Get the full address string
    pub fn full_address(&self) -> String {
        format!("{}:{}", self.address, self.port)
    }
}

// ============================================================================
// Load Balancing
// ============================================================================

// Load balancing algorithm implementations
pub mod adaptive;
pub mod consistent_hash;
pub mod health;
pub mod p2c;

// Re-export commonly used types from sub-modules
pub use adaptive::{AdaptiveBalancer, AdaptiveConfig};
pub use consistent_hash::{ConsistentHashBalancer, ConsistentHashConfig};
pub use health::{ActiveHealthChecker, HealthCheckRunner};
pub use p2c::{P2cBalancer, P2cConfig};

/// Request context for load balancer decisions
#[derive(Debug, Clone)]
pub struct RequestContext {
    pub client_ip: Option<std::net::SocketAddr>,
    pub headers: HashMap<String, String>,
    pub path: String,
    pub method: String,
}

/// Load balancer trait for different algorithms
#[async_trait]
pub trait LoadBalancer: Send + Sync {
    /// Select next upstream target
    async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection>;

    /// Report target health status
    async fn report_health(&self, address: &str, healthy: bool);

    /// Get all healthy targets
    async fn healthy_targets(&self) -> Vec<String>;

    /// Release connection (for connection tracking)
    async fn release(&self, _selection: &TargetSelection) {
        // Default implementation - no-op
    }

    /// Report request result (for adaptive algorithms)
    async fn report_result(
        &self,
        _selection: &TargetSelection,
        _success: bool,
        _latency: Option<Duration>,
    ) {
        // Default implementation - no-op
    }
}
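// Typical call flow in this module: a pool calls `select` to pick a target, builds an
// `HttpPeer` from the returned `TargetSelection`, and later feeds outcomes back through
// `report_health`. The `release` and `report_result` hooks default to no-ops and exist
// for balancers that track in-flight connections or latency.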

/// Selected upstream target
#[derive(Debug, Clone)]
pub struct TargetSelection {
    /// Target address
    pub address: String,
    /// Target weight
    pub weight: u32,
    /// Target metadata
    pub metadata: HashMap<String, String>,
}

/// Upstream pool managing multiple backend servers
pub struct UpstreamPool {
    /// Pool identifier
    id: UpstreamId,
    /// Configured targets
    targets: Vec<UpstreamTarget>,
    /// Load balancer implementation
    load_balancer: Arc<dyn LoadBalancer>,
    /// Connection pool configuration (Pingora handles actual pooling)
    pool_config: ConnectionPoolConfig,
    /// HTTP version configuration
    http_version: HttpVersionOptions,
    /// Whether TLS is enabled for this upstream
    tls_enabled: bool,
    /// SNI for TLS connections
    tls_sni: Option<String>,
    /// TLS configuration for upstream mTLS (client certificates)
    tls_config: Option<sentinel_config::UpstreamTlsConfig>,
    /// Circuit breakers per target
    circuit_breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
    /// Pool statistics
    stats: Arc<PoolStats>,
}

// Note: Health checking is provided by the checkers in health.rs (see the
// ActiveHealthChecker and HealthCheckRunner re-exports above) together with
// passive health reporting through the load balancers' `report_health` hook.

/// Connection pool configuration for Pingora's built-in pooling
///
/// Note: Actual connection pooling is handled by Pingora internally.
/// This struct holds configuration that is applied to peer options.
pub struct ConnectionPoolConfig {
    /// Maximum connections per target (informational - Pingora manages actual pooling)
    pub max_connections: usize,
    /// Maximum idle connections (informational - Pingora manages actual pooling)
    pub max_idle: usize,
    /// Maximum idle timeout for pooled connections
    pub idle_timeout: Duration,
    /// Maximum connection lifetime (None = unlimited)
    pub max_lifetime: Option<Duration>,
    /// Connection timeout
    pub connection_timeout: Duration,
    /// Read timeout
    pub read_timeout: Duration,
    /// Write timeout
    pub write_timeout: Duration,
}

/// HTTP version configuration for upstream connections
pub struct HttpVersionOptions {
    /// Minimum HTTP version (1 or 2)
    pub min_version: u8,
    /// Maximum HTTP version (1 or 2)
    pub max_version: u8,
    /// H2 ping interval (0 to disable)
    pub h2_ping_interval: Duration,
    /// Maximum concurrent H2 streams per connection
    pub max_h2_streams: usize,
}

impl ConnectionPoolConfig {
    /// Create from upstream config
    pub fn from_config(
        pool_config: &sentinel_config::ConnectionPoolConfig,
        timeouts: &sentinel_config::UpstreamTimeouts,
    ) -> Self {
        Self {
            max_connections: pool_config.max_connections,
            max_idle: pool_config.max_idle,
            idle_timeout: Duration::from_secs(pool_config.idle_timeout_secs),
            max_lifetime: pool_config.max_lifetime_secs.map(Duration::from_secs),
            connection_timeout: Duration::from_secs(timeouts.connect_secs),
            read_timeout: Duration::from_secs(timeouts.read_secs),
            write_timeout: Duration::from_secs(timeouts.write_secs),
        }
    }
}
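// The seconds-based fields from sentinel_config are converted to `Duration`s here; the
// timeout values are later applied to `peer.options` in `create_peer`, while
// `max_connections` / `max_idle` remain informational because Pingora owns the actual
// connection pool.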

// CircuitBreaker is imported from sentinel_common

/// Pool statistics
#[derive(Default)]
pub struct PoolStats {
    /// Total requests
    pub requests: AtomicU64,
    /// Successful requests
    pub successes: AtomicU64,
    /// Failed requests
    pub failures: AtomicU64,
    /// Retried requests
    pub retries: AtomicU64,
    /// Circuit breaker trips
    pub circuit_breaker_trips: AtomicU64,
}
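// Within this module, `requests` is bumped on every `select_peer` call, `successes` when
// a peer is successfully selected, `failures` on selection exhaustion and on reported
// connection failures, and `circuit_breaker_trips` when an open breaker causes a target
// to be skipped. `retries` is not incremented here.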

/// Snapshot of pool configuration for metrics/debugging
#[derive(Debug, Clone)]
pub struct PoolConfigSnapshot {
    /// Maximum connections per target
    pub max_connections: usize,
    /// Maximum idle connections
    pub max_idle: usize,
    /// Idle timeout in seconds
    pub idle_timeout_secs: u64,
    /// Maximum connection lifetime in seconds (None = unlimited)
    pub max_lifetime_secs: Option<u64>,
    /// Connection timeout in seconds
    pub connection_timeout_secs: u64,
    /// Read timeout in seconds
    pub read_timeout_secs: u64,
    /// Write timeout in seconds
    pub write_timeout_secs: u64,
}

/// Round-robin load balancer
struct RoundRobinBalancer {
    targets: Vec<UpstreamTarget>,
    current: AtomicUsize,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

impl RoundRobinBalancer {
    fn new(targets: Vec<UpstreamTarget>) -> Self {
        let mut health_status = HashMap::new();
        for target in &targets {
            health_status.insert(target.full_address(), true);
        }

        Self {
            targets,
            current: AtomicUsize::new(0),
            health_status: Arc::new(RwLock::new(health_status)),
        }
    }
}

#[async_trait]
impl LoadBalancer for RoundRobinBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "round_robin",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_targets: Vec<_> = self
            .targets
            .iter()
            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
            .collect();

        if healthy_targets.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "round_robin",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        let index = self.current.fetch_add(1, Ordering::Relaxed) % healthy_targets.len();
        let target = healthy_targets[index];

        trace!(
            selected_target = %target.full_address(),
            healthy_count = healthy_targets.len(),
            index = index,
            algorithm = "round_robin",
            "Selected target via round robin"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight: target.weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "round_robin",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
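// Selection stays cheap on the hot path aside from the health-map read: the shared
// counter advances with a relaxed `fetch_add` and is reduced modulo the current healthy
// subset, so the rotation shifts whenever targets change health state.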

/// Least connections load balancer
struct LeastConnectionsBalancer {
    targets: Vec<UpstreamTarget>,
    connections: Arc<RwLock<HashMap<String, usize>>>,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

impl LeastConnectionsBalancer {
    fn new(targets: Vec<UpstreamTarget>) -> Self {
        let mut health_status = HashMap::new();
        let mut connections = HashMap::new();

        for target in &targets {
            let addr = target.full_address();
            health_status.insert(addr.clone(), true);
            connections.insert(addr, 0);
        }

        Self {
            targets,
            connections: Arc::new(RwLock::new(connections)),
            health_status: Arc::new(RwLock::new(health_status)),
        }
    }
}

#[async_trait]
impl LoadBalancer for LeastConnectionsBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "least_connections",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let conns = self.connections.read().await;

        let mut best_target = None;
        let mut min_connections = usize::MAX;

        for target in &self.targets {
            let addr = target.full_address();
            if !*health.get(&addr).unwrap_or(&true) {
                trace!(
                    target = %addr,
                    algorithm = "least_connections",
                    "Skipping unhealthy target"
                );
                continue;
            }

            let conn_count = *conns.get(&addr).unwrap_or(&0);
            trace!(
                target = %addr,
                connections = conn_count,
                "Evaluating target connection count"
            );
            if conn_count < min_connections {
                min_connections = conn_count;
                best_target = Some(target);
            }
        }

        match best_target {
            Some(target) => {
                trace!(
                    selected_target = %target.full_address(),
                    connections = min_connections,
                    algorithm = "least_connections",
                    "Selected target with fewest connections"
                );
                Ok(TargetSelection {
                    address: target.full_address(),
                    weight: target.weight,
                    metadata: HashMap::new(),
                })
            }
            None => {
                warn!(
                    total_targets = self.targets.len(),
                    algorithm = "least_connections",
                    "No healthy upstream targets available"
                );
                Err(SentinelError::NoHealthyUpstream)
            }
        }
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "least_connections",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
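// Note: the `connections` map is only read during selection in this module; counts start
// at zero and are expected to be maintained through the `release` / connection-tracking
// hooks. If nothing updates them, ties resolve to the first healthy target in
// declaration order.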

/// Weighted load balancer
struct WeightedBalancer {
    targets: Vec<UpstreamTarget>,
    weights: Vec<u32>,
    current_index: AtomicUsize,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

#[async_trait]
impl LoadBalancer for WeightedBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "weighted",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_indices: Vec<_> = self
            .targets
            .iter()
            .enumerate()
            .filter(|(_, t)| *health.get(&t.full_address()).unwrap_or(&true))
            .map(|(i, _)| i)
            .collect();

        if healthy_indices.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "weighted",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        let idx = self.current_index.fetch_add(1, Ordering::Relaxed) % healthy_indices.len();
        let target_idx = healthy_indices[idx];
        let target = &self.targets[target_idx];
        let weight = self.weights.get(target_idx).copied().unwrap_or(1);

        trace!(
            selected_target = %target.full_address(),
            weight = weight,
            healthy_count = healthy_indices.len(),
            algorithm = "weighted",
            "Selected target via weighted round robin"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "weighted",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
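// As written, this balancer rotates round-robin over the healthy targets and surfaces
// the configured weight on the returned `TargetSelection`; the weight does not yet bias
// how often a target is chosen.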

/// IP hash load balancer
struct IpHashBalancer {
    targets: Vec<UpstreamTarget>,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

#[async_trait]
impl LoadBalancer for IpHashBalancer {
    async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "ip_hash",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_targets: Vec<_> = self
            .targets
            .iter()
            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
            .collect();

        if healthy_targets.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "ip_hash",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        // Hash the client IP to select a target
        let (hash, client_ip_str) = if let Some(ctx) = context {
            if let Some(ip) = &ctx.client_ip {
                use std::hash::{Hash, Hasher};
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                ip.hash(&mut hasher);
                (hasher.finish(), Some(ip.to_string()))
            } else {
                (0, None)
            }
        } else {
            (0, None)
        };

        let idx = (hash as usize) % healthy_targets.len();
        let target = healthy_targets[idx];

        trace!(
            selected_target = %target.full_address(),
            client_ip = client_ip_str.as_deref().unwrap_or("unknown"),
            hash = hash,
            index = idx,
            healthy_count = healthy_targets.len(),
            algorithm = "ip_hash",
            "Selected target via IP hash"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight: target.weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "ip_hash",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
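// Requests without a client IP hash to 0 and therefore consistently land on the first
// healthy target. Because the hash is taken modulo the current healthy set, stickiness
// shifts when targets change health; the consistent_hash module provides the variant
// intended to minimize such remapping.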

impl UpstreamPool {
    /// Create new upstream pool from configuration
    pub async fn new(config: UpstreamConfig) -> SentinelResult<Self> {
        let id = UpstreamId::new(&config.id);

        info!(
            upstream_id = %config.id,
            target_count = config.targets.len(),
            algorithm = ?config.load_balancing,
            "Creating upstream pool"
        );

        // Convert config targets to internal targets
        let targets: Vec<UpstreamTarget> = config
            .targets
            .iter()
            .filter_map(UpstreamTarget::from_config)
            .collect();

        if targets.is_empty() {
            error!(
                upstream_id = %config.id,
                "No valid upstream targets configured"
            );
            return Err(SentinelError::Config {
                message: "No valid upstream targets".to_string(),
                source: None,
            });
        }

        for target in &targets {
            debug!(
                upstream_id = %config.id,
                target = %target.full_address(),
                weight = target.weight,
                "Registered upstream target"
            );
        }

        // Create load balancer
        debug!(
            upstream_id = %config.id,
            algorithm = ?config.load_balancing,
            "Creating load balancer"
        );
        let load_balancer = Self::create_load_balancer(&config.load_balancing, &targets)?;

        // Create connection pool configuration (Pingora handles actual pooling)
        debug!(
            upstream_id = %config.id,
            max_connections = config.connection_pool.max_connections,
            max_idle = config.connection_pool.max_idle,
            idle_timeout_secs = config.connection_pool.idle_timeout_secs,
            connect_timeout_secs = config.timeouts.connect_secs,
            read_timeout_secs = config.timeouts.read_secs,
            write_timeout_secs = config.timeouts.write_secs,
            "Creating connection pool configuration"
        );
        let pool_config =
            ConnectionPoolConfig::from_config(&config.connection_pool, &config.timeouts);

        // Create HTTP version configuration
        let http_version = HttpVersionOptions {
            min_version: config.http_version.min_version,
            max_version: config.http_version.max_version,
            h2_ping_interval: if config.http_version.h2_ping_interval_secs > 0 {
                Duration::from_secs(config.http_version.h2_ping_interval_secs)
            } else {
                Duration::ZERO
            },
            max_h2_streams: config.http_version.max_h2_streams,
        };

        // TLS configuration
        let tls_enabled = config.tls.is_some();
        let tls_sni = config.tls.as_ref().and_then(|t| t.sni.clone());
        let tls_config = config.tls.clone();

        // Log mTLS configuration if present
        if let Some(ref tls) = tls_config {
            if tls.client_cert.is_some() {
                info!(
                    upstream_id = %config.id,
                    "mTLS enabled for upstream (client certificate configured)"
                );
            }
        }

        if http_version.max_version >= 2 && tls_enabled {
            info!(
                upstream_id = %config.id,
                "HTTP/2 enabled for upstream (via ALPN)"
            );
        }

        // Initialize circuit breakers for each target
        let mut circuit_breakers = HashMap::new();
        for target in &targets {
            trace!(
                upstream_id = %config.id,
                target = %target.full_address(),
                "Initializing circuit breaker for target"
            );
            circuit_breakers.insert(
                target.full_address(),
                CircuitBreaker::new(CircuitBreakerConfig::default()),
            );
        }

        let pool = Self {
            id: id.clone(),
            targets,
            load_balancer,
            pool_config,
            http_version,
            tls_enabled,
            tls_sni,
            tls_config,
            circuit_breakers: Arc::new(RwLock::new(circuit_breakers)),
            stats: Arc::new(PoolStats::default()),
        };

        info!(
            upstream_id = %id,
            target_count = pool.targets.len(),
            "Upstream pool created successfully"
        );

        Ok(pool)
    }

    /// Create load balancer based on algorithm
    fn create_load_balancer(
        algorithm: &LoadBalancingAlgorithm,
        targets: &[UpstreamTarget],
    ) -> SentinelResult<Arc<dyn LoadBalancer>> {
        let balancer: Arc<dyn LoadBalancer> = match algorithm {
            LoadBalancingAlgorithm::RoundRobin => {
                Arc::new(RoundRobinBalancer::new(targets.to_vec()))
            }
            LoadBalancingAlgorithm::LeastConnections => {
                Arc::new(LeastConnectionsBalancer::new(targets.to_vec()))
            }
            LoadBalancingAlgorithm::Weighted => {
                let weights: Vec<u32> = targets.iter().map(|t| t.weight).collect();
                Arc::new(WeightedBalancer {
                    targets: targets.to_vec(),
                    weights,
                    current_index: AtomicUsize::new(0),
                    health_status: Arc::new(RwLock::new(HashMap::new())),
                })
            }
            LoadBalancingAlgorithm::IpHash => Arc::new(IpHashBalancer {
                targets: targets.to_vec(),
                health_status: Arc::new(RwLock::new(HashMap::new())),
            }),
            LoadBalancingAlgorithm::Random => Arc::new(RoundRobinBalancer::new(targets.to_vec())),
            LoadBalancingAlgorithm::ConsistentHash => Arc::new(ConsistentHashBalancer::new(
                targets.to_vec(),
                ConsistentHashConfig::default(),
            )),
            LoadBalancingAlgorithm::PowerOfTwoChoices => {
                Arc::new(P2cBalancer::new(targets.to_vec(), P2cConfig::default()))
            }
            LoadBalancingAlgorithm::Adaptive => Arc::new(AdaptiveBalancer::new(
                targets.to_vec(),
                AdaptiveConfig::default(),
            )),
        };
        Ok(balancer)
    }
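    // RoundRobin, LeastConnections, Weighted, and IpHash are defined in this file;
    // ConsistentHash, PowerOfTwoChoices, and Adaptive come from the sub-modules.
    // Random currently reuses the round-robin balancer rather than sampling randomly.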

    /// Select next upstream peer
    pub async fn select_peer(&self, context: Option<&RequestContext>) -> SentinelResult<HttpPeer> {
        let request_num = self.stats.requests.fetch_add(1, Ordering::Relaxed) + 1;

        trace!(
            upstream_id = %self.id,
            request_num = request_num,
            target_count = self.targets.len(),
            "Starting peer selection"
        );

        let mut attempts = 0;
        let max_attempts = self.targets.len() * 2;

        while attempts < max_attempts {
            attempts += 1;

            trace!(
                upstream_id = %self.id,
                attempt = attempts,
                max_attempts = max_attempts,
                "Attempting to select peer"
            );

            let selection = match self.load_balancer.select(context).await {
                Ok(s) => s,
                Err(e) => {
                    warn!(
                        upstream_id = %self.id,
                        attempt = attempts,
                        error = %e,
                        "Load balancer selection failed"
                    );
                    continue;
                }
            };

            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                attempt = attempts,
                "Load balancer selected target"
            );

            // Check circuit breaker
            let breakers = self.circuit_breakers.read().await;
            if let Some(breaker) = breakers.get(&selection.address) {
                if !breaker.is_closed().await {
                    debug!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        attempt = attempts,
                        "Circuit breaker is open, skipping target"
                    );
                    self.stats
                        .circuit_breaker_trips
                        .fetch_add(1, Ordering::Relaxed);
                    continue;
                }
            }

            // Create peer with pooling options
            // Note: Pingora handles actual connection pooling internally based on
            // peer.options.idle_timeout and ServerConf.upstream_keepalive_pool_size
            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                "Creating peer for upstream (Pingora handles connection reuse)"
            );
            let peer = self.create_peer(&selection)?;

            debug!(
                upstream_id = %self.id,
                target = %selection.address,
                attempt = attempts,
                "Selected upstream peer"
            );

            self.stats.successes.fetch_add(1, Ordering::Relaxed);
            return Ok(peer);
        }

        self.stats.failures.fetch_add(1, Ordering::Relaxed);
        error!(
            upstream_id = %self.id,
            attempts = attempts,
            max_attempts = max_attempts,
            "Failed to select upstream after max attempts"
        );
        Err(SentinelError::upstream(
            self.id.to_string(),
            "Failed to select upstream after max attempts",
        ))
    }
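    // The loop makes at most two passes over the target count so that targets with an
    // open circuit breaker can be skipped without retrying forever; `successes` here
    // counts successful peer selections, while request outcomes are reported separately
    // through `report_result`.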

    /// Create new peer connection with connection pooling options
    ///
    /// Pingora handles actual connection pooling internally. When idle_timeout
    /// is set on the peer options, Pingora will keep the connection alive and
    /// reuse it for subsequent requests to the same upstream.
    fn create_peer(&self, selection: &TargetSelection) -> SentinelResult<HttpPeer> {
        // Determine SNI hostname for TLS connections
        let sni_hostname = self.tls_sni.clone().unwrap_or_else(|| {
            // Extract hostname from address (strip port)
            selection
                .address
                .split(':')
                .next()
                .unwrap_or(&selection.address)
                .to_string()
        });

        let mut peer = HttpPeer::new(&selection.address, self.tls_enabled, sni_hostname.clone());

        // Configure connection pooling options for better performance
        // idle_timeout enables Pingora's connection pooling - connections are
        // kept alive and reused for this duration
        peer.options.idle_timeout = Some(self.pool_config.idle_timeout);

        // Connection timeouts
        peer.options.connection_timeout = Some(self.pool_config.connection_timeout);
        peer.options.total_connection_timeout = Some(Duration::from_secs(10));

        // Read/write timeouts
        peer.options.read_timeout = Some(self.pool_config.read_timeout);
        peer.options.write_timeout = Some(self.pool_config.write_timeout);

        // Enable TCP keepalive for long-lived connections
        peer.options.tcp_keepalive = Some(pingora::protocols::TcpKeepalive {
            idle: Duration::from_secs(60),
            interval: Duration::from_secs(10),
            count: 3,
            // user_timeout is Linux-only
            #[cfg(target_os = "linux")]
            user_timeout: Duration::from_secs(60),
        });

        // Configure HTTP version and ALPN for TLS connections
        if self.tls_enabled {
            // Set ALPN protocols based on configured HTTP version range
            let alpn = match (self.http_version.min_version, self.http_version.max_version) {
                (2, _) => {
                    // HTTP/2 only - use h2 ALPN
                    pingora::upstreams::peer::ALPN::H2
                }
                (1, 2) | (_, 2) => {
                    // Prefer HTTP/2 but fall back to HTTP/1.1
                    pingora::upstreams::peer::ALPN::H2H1
                }
                _ => {
                    // HTTP/1.1 only
                    pingora::upstreams::peer::ALPN::H1
                }
            };
            peer.options.alpn = alpn;

            // Configure TLS verification options based on upstream config
            if let Some(ref tls_config) = self.tls_config {
                // Skip certificate verification if configured (DANGEROUS - testing only)
                if tls_config.insecure_skip_verify {
                    peer.options.verify_cert = false;
                    peer.options.verify_hostname = false;
                    warn!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        "TLS certificate verification DISABLED (insecure_skip_verify=true)"
                    );
                }

                // Set alternative CN for verification if SNI differs from actual hostname
                if let Some(ref sni) = tls_config.sni {
                    peer.options.alternative_cn = Some(sni.clone());
                    trace!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        alternative_cn = %sni,
                        "Set alternative CN for TLS verification"
                    );
                }

                // Log mTLS client certificate configuration
                // Note: Full mTLS client cert support requires Pingora connector customization
                // The certificates are loaded but Pingora's standard connector doesn't expose
                // a direct way to inject client certs. For production mTLS to backends,
                // consider using a custom Pingora Connector implementation.
                if tls_config.client_cert.is_some() {
                    debug!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        client_cert = ?tls_config.client_cert,
                        "mTLS client certificate configured (requires custom connector for full support)"
                    );
                }
            }

            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                alpn = ?peer.options.alpn,
                min_version = self.http_version.min_version,
                max_version = self.http_version.max_version,
                verify_cert = peer.options.verify_cert,
                verify_hostname = peer.options.verify_hostname,
                "Configured ALPN and TLS options for HTTP version negotiation"
            );
        }

        // Configure H2-specific settings when HTTP/2 is enabled
        if self.http_version.max_version >= 2 {
            // H2 ping interval for connection health monitoring
            if !self.http_version.h2_ping_interval.is_zero() {
                peer.options.h2_ping_interval = Some(self.http_version.h2_ping_interval);
                trace!(
                    upstream_id = %self.id,
                    target = %selection.address,
                    h2_ping_interval_secs = self.http_version.h2_ping_interval.as_secs(),
                    "Configured H2 ping interval"
                );
            }
        }

        trace!(
            upstream_id = %self.id,
            target = %selection.address,
            tls = self.tls_enabled,
            sni = %sni_hostname,
            idle_timeout_secs = self.pool_config.idle_timeout.as_secs(),
            http_max_version = self.http_version.max_version,
            "Created peer with Pingora connection pooling enabled"
        );

        Ok(peer)
    }
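    // Summary of what ends up on the peer: idle/connect/read/write timeouts from the
    // pool configuration, a TCP keepalive of 60s idle / 10s interval / 3 probes, ALPN
    // and certificate-verification options when TLS is enabled, and an H2 ping interval
    // when HTTP/2 is allowed. The total connection timeout is currently hard-coded to
    // 10 seconds.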

    /// Report connection result for a target
    pub async fn report_result(&self, target: &str, success: bool) {
        trace!(
            upstream_id = %self.id,
            target = %target,
            success = success,
            "Reporting connection result"
        );

        if success {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_success().await;
                trace!(
                    upstream_id = %self.id,
                    target = %target,
                    "Recorded success in circuit breaker"
                );
            }
            self.load_balancer.report_health(target, true).await;
        } else {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_failure().await;
                debug!(
                    upstream_id = %self.id,
                    target = %target,
                    "Recorded failure in circuit breaker"
                );
            }
            self.load_balancer.report_health(target, false).await;
            self.stats.failures.fetch_add(1, Ordering::Relaxed);
            warn!(
                upstream_id = %self.id,
                target = %target,
                "Connection failure reported for target"
            );
        }
    }

    /// Get pool statistics
    pub fn stats(&self) -> &PoolStats {
        &self.stats
    }

    /// Get pool ID
    pub fn id(&self) -> &UpstreamId {
        &self.id
    }

    /// Get target count
    pub fn target_count(&self) -> usize {
        self.targets.len()
    }

    /// Get pool configuration (for metrics/debugging)
    pub fn pool_config(&self) -> PoolConfigSnapshot {
        PoolConfigSnapshot {
            max_connections: self.pool_config.max_connections,
            max_idle: self.pool_config.max_idle,
            idle_timeout_secs: self.pool_config.idle_timeout.as_secs(),
            max_lifetime_secs: self.pool_config.max_lifetime.map(|d| d.as_secs()),
            connection_timeout_secs: self.pool_config.connection_timeout.as_secs(),
            read_timeout_secs: self.pool_config.read_timeout.as_secs(),
            write_timeout_secs: self.pool_config.write_timeout.as_secs(),
        }
    }

    /// Shutdown the pool
    ///
    /// Note: Pingora manages connection pooling internally, so we just log stats.
    pub async fn shutdown(&self) {
        info!(
            upstream_id = %self.id,
            target_count = self.targets.len(),
            total_requests = self.stats.requests.load(Ordering::Relaxed),
            total_successes = self.stats.successes.load(Ordering::Relaxed),
            total_failures = self.stats.failures.load(Ordering::Relaxed),
            "Shutting down upstream pool"
        );
        // Pingora handles connection cleanup internally
        debug!(upstream_id = %self.id, "Upstream pool shutdown complete");
    }
}
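
// Minimal illustrative tests for the helpers defined in this file. These are a sketch:
// the async case assumes the crate's dev-dependencies enable tokio's `macros` and `rt`
// features for `#[tokio::test]`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn from_address_parses_host_and_port_with_default_weight() {
        let target = UpstreamTarget::from_address("127.0.0.1:8080").expect("valid address");
        assert_eq!(target.address, "127.0.0.1");
        assert_eq!(target.port, 8080);
        assert_eq!(target.weight, 100);
        assert_eq!(target.full_address(), "127.0.0.1:8080");

        // An input without a port separator is rejected rather than defaulted.
        assert!(UpstreamTarget::from_address("localhost").is_none());
    }

    #[tokio::test]
    async fn round_robin_skips_unhealthy_targets() {
        let balancer = RoundRobinBalancer::new(vec![
            UpstreamTarget::new("10.0.0.1", 80, 100),
            UpstreamTarget::new("10.0.0.2", 80, 100),
        ]);

        // Mark the first target unhealthy; every selection should land on the second.
        balancer.report_health("10.0.0.1:80", false).await;
        for _ in 0..3 {
            let selection = balancer
                .select(None)
                .await
                .expect("one healthy target remains");
            assert_eq!(selection.address, "10.0.0.2:80");
        }
        assert_eq!(
            balancer.healthy_targets().await,
            vec!["10.0.0.2:80".to_string()]
        );
    }
}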