sentinel_proxy/upstream/mod.rs

//! Upstream pool management module for Sentinel proxy
//!
//! This module handles upstream server pools, load balancing, health checking,
//! connection pooling, and retry logic with circuit breakers.

use async_trait::async_trait;
use pingora::upstreams::peer::HttpPeer;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn};

use sentinel_common::{
    errors::{SentinelError, SentinelResult},
    types::{CircuitBreakerConfig, LoadBalancingAlgorithm},
    CircuitBreaker, UpstreamId,
};
use sentinel_config::UpstreamConfig;

// ============================================================================
// Internal Upstream Target Type
// ============================================================================

/// Internal upstream target representation for load balancers
///
/// This is a simplified representation used internally by load balancers,
/// separate from the user-facing config UpstreamTarget.
#[derive(Debug, Clone)]
pub struct UpstreamTarget {
    /// Target IP address or hostname
    pub address: String,
    /// Target port
    pub port: u16,
    /// Weight for weighted load balancing
    pub weight: u32,
}

impl UpstreamTarget {
    /// Create a new upstream target
    pub fn new(address: impl Into<String>, port: u16, weight: u32) -> Self {
        Self {
            address: address.into(),
            port,
            weight,
        }
    }

    /// Create from a "host:port" string with default weight
    pub fn from_address(addr: &str) -> Option<Self> {
        let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
        if parts.len() == 2 {
            let port = parts[0].parse().ok()?;
            let address = parts[1].to_string();
            Some(Self {
                address,
                port,
                weight: 100,
            })
        } else {
            None
        }
    }

    /// Convert from config UpstreamTarget
    pub fn from_config(config: &sentinel_config::UpstreamTarget) -> Option<Self> {
        Self::from_address(&config.address).map(|mut t| {
            t.weight = config.weight;
            t
        })
    }

    /// Get the full address string
    pub fn full_address(&self) -> String {
        format!("{}:{}", self.address, self.port)
    }
}
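
// Illustrative sketch: a few unit checks for the parsing helpers above. These are
// not part of the original module's test surface; they only exercise behavior that
// is visible in this file (default weight of 100, "host:port" splitting, and
// `full_address` formatting).
#[cfg(test)]
mod upstream_target_tests {
    use super::*;

    #[test]
    fn from_address_parses_host_and_port_with_default_weight() {
        let target = UpstreamTarget::from_address("10.0.0.1:8080").expect("valid host:port");
        assert_eq!(target.address, "10.0.0.1");
        assert_eq!(target.port, 8080);
        assert_eq!(target.weight, 100);
        assert_eq!(target.full_address(), "10.0.0.1:8080");
    }

    #[test]
    fn from_address_rejects_strings_without_a_port() {
        assert!(UpstreamTarget::from_address("no-port-here").is_none());
    }
}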

// ============================================================================
// Load Balancing
// ============================================================================

// Load balancing algorithm implementations
pub mod adaptive;
pub mod consistent_hash;
pub mod health;
pub mod p2c;

// Re-export commonly used types from sub-modules
pub use adaptive::{AdaptiveBalancer, AdaptiveConfig};
pub use consistent_hash::{ConsistentHashBalancer, ConsistentHashConfig};
pub use health::{ActiveHealthChecker, HealthCheckRunner};
pub use p2c::{P2cBalancer, P2cConfig};

/// Request context for load balancer decisions
#[derive(Debug, Clone)]
pub struct RequestContext {
    pub client_ip: Option<std::net::SocketAddr>,
    pub headers: HashMap<String, String>,
    pub path: String,
    pub method: String,
}

/// Load balancer trait for different algorithms
#[async_trait]
pub trait LoadBalancer: Send + Sync {
    /// Select next upstream target
    async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection>;

    /// Report target health status
    async fn report_health(&self, address: &str, healthy: bool);

    /// Get all healthy targets
    async fn healthy_targets(&self) -> Vec<String>;

    /// Release connection (for connection tracking)
    async fn release(&self, _selection: &TargetSelection) {
        // Default implementation - no-op
    }

    /// Report request result (for adaptive algorithms)
    async fn report_result(
        &self,
        _selection: &TargetSelection,
        _success: bool,
        _latency: Option<Duration>,
    ) {
        // Default implementation - no-op
    }
}
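
// Illustrative sketch of how a caller is expected to drive a `LoadBalancer`:
// select a target, proxy the request, then feed the outcome back via
// `report_result` and `release`. This helper is only compiled for tests and is
// not wired into the proxy; the latency value is a placeholder.
#[cfg(test)]
#[allow(dead_code)]
async fn drive_balancer_once(balancer: &dyn LoadBalancer) -> SentinelResult<TargetSelection> {
    let selection = balancer.select(None).await?;
    // Pretend the proxied request succeeded and took 5 ms.
    balancer
        .report_result(&selection, true, Some(Duration::from_millis(5)))
        .await;
    balancer.release(&selection).await;
    Ok(selection)
}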

/// Selected upstream target
#[derive(Debug, Clone)]
pub struct TargetSelection {
    /// Target address
    pub address: String,
    /// Target weight
    pub weight: u32,
    /// Target metadata
    pub metadata: HashMap<String, String>,
}

/// Upstream pool managing multiple backend servers
pub struct UpstreamPool {
    /// Pool identifier
    id: UpstreamId,
    /// Configured targets
    targets: Vec<UpstreamTarget>,
    /// Load balancer implementation
    load_balancer: Arc<dyn LoadBalancer>,
    /// Connection pool configuration (Pingora handles actual pooling)
    pool_config: ConnectionPoolConfig,
    /// HTTP version configuration
    http_version: HttpVersionOptions,
    /// Whether TLS is enabled for this upstream
    tls_enabled: bool,
    /// SNI for TLS connections
    tls_sni: Option<String>,
    /// TLS configuration for upstream mTLS (client certificates)
    tls_config: Option<sentinel_config::UpstreamTlsConfig>,
    /// Circuit breakers per target
    circuit_breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
    /// Pool statistics
    stats: Arc<PoolStats>,
}

// Note: Health checking is handled by the PassiveHealthChecker in health.rs
// and via load balancer health reporting. A future enhancement could add active
// HTTP/TCP health probes here.

/// Connection pool configuration for Pingora's built-in pooling
///
/// Note: Actual connection pooling is handled by Pingora internally.
/// This struct holds configuration that is applied to peer options.
pub struct ConnectionPoolConfig {
    /// Maximum connections per target (informational - Pingora manages actual pooling)
    pub max_connections: usize,
    /// Maximum idle connections (informational - Pingora manages actual pooling)
    pub max_idle: usize,
    /// Maximum idle timeout for pooled connections
    pub idle_timeout: Duration,
    /// Maximum connection lifetime (None = unlimited)
    pub max_lifetime: Option<Duration>,
    /// Connection timeout
    pub connection_timeout: Duration,
    /// Read timeout
    pub read_timeout: Duration,
    /// Write timeout
    pub write_timeout: Duration,
}

/// HTTP version configuration for upstream connections
pub struct HttpVersionOptions {
    /// Minimum HTTP version (1 or 2)
    pub min_version: u8,
    /// Maximum HTTP version (1 or 2)
    pub max_version: u8,
    /// H2 ping interval (0 to disable)
    pub h2_ping_interval: Duration,
    /// Maximum concurrent H2 streams per connection
    pub max_h2_streams: usize,
}

impl ConnectionPoolConfig {
    /// Create from upstream config
    pub fn from_config(
        pool_config: &sentinel_config::ConnectionPoolConfig,
        timeouts: &sentinel_config::UpstreamTimeouts,
    ) -> Self {
        Self {
            max_connections: pool_config.max_connections,
            max_idle: pool_config.max_idle,
            idle_timeout: Duration::from_secs(pool_config.idle_timeout_secs),
            max_lifetime: pool_config.max_lifetime_secs.map(Duration::from_secs),
            connection_timeout: Duration::from_secs(timeouts.connect_secs),
            read_timeout: Duration::from_secs(timeouts.read_secs),
            write_timeout: Duration::from_secs(timeouts.write_secs),
        }
    }
}

// CircuitBreaker is imported from sentinel_common

/// Pool statistics
#[derive(Default)]
pub struct PoolStats {
    /// Total requests
    pub requests: AtomicU64,
    /// Successful requests
    pub successes: AtomicU64,
    /// Failed requests
    pub failures: AtomicU64,
    /// Retried requests
    pub retries: AtomicU64,
    /// Circuit breaker trips
    pub circuit_breaker_trips: AtomicU64,
}

/// Snapshot of pool configuration for metrics/debugging
#[derive(Debug, Clone)]
pub struct PoolConfigSnapshot {
    /// Maximum connections per target
    pub max_connections: usize,
    /// Maximum idle connections
    pub max_idle: usize,
    /// Idle timeout in seconds
    pub idle_timeout_secs: u64,
    /// Maximum connection lifetime in seconds (None = unlimited)
    pub max_lifetime_secs: Option<u64>,
    /// Connection timeout in seconds
    pub connection_timeout_secs: u64,
    /// Read timeout in seconds
    pub read_timeout_secs: u64,
    /// Write timeout in seconds
    pub write_timeout_secs: u64,
}

/// Round-robin load balancer
struct RoundRobinBalancer {
    targets: Vec<UpstreamTarget>,
    current: AtomicUsize,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

impl RoundRobinBalancer {
    fn new(targets: Vec<UpstreamTarget>) -> Self {
        let mut health_status = HashMap::new();
        for target in &targets {
            health_status.insert(target.full_address(), true);
        }

        Self {
            targets,
            current: AtomicUsize::new(0),
            health_status: Arc::new(RwLock::new(health_status)),
        }
    }
}

#[async_trait]
impl LoadBalancer for RoundRobinBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "round_robin",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_targets: Vec<_> = self
            .targets
            .iter()
            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
            .collect();

        if healthy_targets.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "round_robin",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        let index = self.current.fetch_add(1, Ordering::Relaxed) % healthy_targets.len();
        let target = healthy_targets[index];

        trace!(
            selected_target = %target.full_address(),
            healthy_count = healthy_targets.len(),
            index = index,
            algorithm = "round_robin",
            "Selected target via round robin"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight: target.weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "round_robin",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
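
// Illustrative sketch: a small test exercising the round-robin balancer through the
// `LoadBalancer` trait. It assumes tokio's `rt` feature is available in
// dev-dependencies so a current-thread runtime can be built; the addresses are
// placeholders.
#[cfg(test)]
mod round_robin_tests {
    use super::*;

    #[test]
    fn rotates_over_healthy_targets_and_skips_unhealthy_ones() {
        let balancer = RoundRobinBalancer::new(vec![
            UpstreamTarget::new("10.0.0.1", 80, 100),
            UpstreamTarget::new("10.0.0.2", 80, 100),
            UpstreamTarget::new("10.0.0.3", 80, 100),
        ]);

        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("build current-thread runtime");
        rt.block_on(async {
            // Mark one target unhealthy; selection should never return it.
            balancer.report_health("10.0.0.2:80", false).await;

            let mut seen = Vec::new();
            for _ in 0..4 {
                let selection = balancer.select(None).await.expect("healthy targets remain");
                seen.push(selection.address);
            }

            assert!(!seen.contains(&"10.0.0.2:80".to_string()));
            // The two remaining targets alternate.
            assert_eq!(seen[0], seen[2]);
            assert_eq!(seen[1], seen[3]);
        });
    }
}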

/// Least connections load balancer
struct LeastConnectionsBalancer {
    targets: Vec<UpstreamTarget>,
    connections: Arc<RwLock<HashMap<String, usize>>>,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

impl LeastConnectionsBalancer {
    fn new(targets: Vec<UpstreamTarget>) -> Self {
        let mut health_status = HashMap::new();
        let mut connections = HashMap::new();

        for target in &targets {
            let addr = target.full_address();
            health_status.insert(addr.clone(), true);
            connections.insert(addr, 0);
        }

        Self {
            targets,
            connections: Arc::new(RwLock::new(connections)),
            health_status: Arc::new(RwLock::new(health_status)),
        }
    }
}

#[async_trait]
impl LoadBalancer for LeastConnectionsBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "least_connections",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let conns = self.connections.read().await;

        let mut best_target = None;
        let mut min_connections = usize::MAX;

        for target in &self.targets {
            let addr = target.full_address();
            if !*health.get(&addr).unwrap_or(&true) {
                trace!(
                    target = %addr,
                    algorithm = "least_connections",
                    "Skipping unhealthy target"
                );
                continue;
            }

            let conn_count = *conns.get(&addr).unwrap_or(&0);
            trace!(
                target = %addr,
                connections = conn_count,
                "Evaluating target connection count"
            );
            if conn_count < min_connections {
                min_connections = conn_count;
                best_target = Some(target);
            }
        }

        match best_target {
            Some(target) => {
                let address = target.full_address();
                // Release the read guards before taking the write lock to record the
                // new in-flight connection; without this bookkeeping every selection
                // would see identical counts and always pick the first healthy target.
                drop(conns);
                drop(health);
                *self
                    .connections
                    .write()
                    .await
                    .entry(address.clone())
                    .or_insert(0) += 1;

                trace!(
                    selected_target = %address,
                    connections = min_connections,
                    algorithm = "least_connections",
                    "Selected target with fewest connections"
                );
                Ok(TargetSelection {
                    address,
                    weight: target.weight,
                    metadata: HashMap::new(),
                })
            }
            None => {
                warn!(
                    total_targets = self.targets.len(),
                    algorithm = "least_connections",
                    "No healthy upstream targets available"
                );
                Err(SentinelError::NoHealthyUpstream)
            }
        }
    }

    async fn release(&self, selection: &TargetSelection) {
        // Decrement the in-flight count recorded in `select` when the caller is done
        // with the connection.
        if let Some(count) = self.connections.write().await.get_mut(&selection.address) {
            *count = count.saturating_sub(1);
        }
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "least_connections",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
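
// Illustrative sketch: verifies the in-flight connection bookkeeping above. Like the
// other test sketches, it assumes tokio's `rt` feature is available for a
// current-thread runtime and uses placeholder addresses.
#[cfg(test)]
mod least_connections_tests {
    use super::*;

    #[test]
    fn spreads_selections_by_in_flight_count() {
        let balancer = LeastConnectionsBalancer::new(vec![
            UpstreamTarget::new("10.0.0.1", 80, 100),
            UpstreamTarget::new("10.0.0.2", 80, 100),
        ]);

        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("build current-thread runtime");
        rt.block_on(async {
            let first = balancer.select(None).await.expect("select first target");
            let second = balancer.select(None).await.expect("select second target");
            // With one in-flight request on the first target, the second selection
            // goes to the other target.
            assert_ne!(first.address, second.address);

            // Releasing the first connection makes that target least loaded again.
            balancer.release(&first).await;
            let third = balancer.select(None).await.expect("select after release");
            assert_eq!(third.address, first.address);
        });
    }
}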

/// Weighted load balancer
struct WeightedBalancer {
    targets: Vec<UpstreamTarget>,
    weights: Vec<u32>,
    current_index: AtomicUsize,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

#[async_trait]
impl LoadBalancer for WeightedBalancer {
    // Note: rotation is currently a plain round robin over healthy targets; the
    // configured weight is surfaced on the returned selection rather than biasing
    // which target gets picked.
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "weighted",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_indices: Vec<_> = self
            .targets
            .iter()
            .enumerate()
            .filter(|(_, t)| *health.get(&t.full_address()).unwrap_or(&true))
            .map(|(i, _)| i)
            .collect();

        if healthy_indices.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "weighted",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        let idx = self.current_index.fetch_add(1, Ordering::Relaxed) % healthy_indices.len();
        let target_idx = healthy_indices[idx];
        let target = &self.targets[target_idx];
        let weight = self.weights.get(target_idx).copied().unwrap_or(1);

        trace!(
            selected_target = %target.full_address(),
            weight = weight,
            healthy_count = healthy_indices.len(),
            algorithm = "weighted",
            "Selected target via weighted round robin"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "weighted",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
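
// Illustrative sketch: checks that the weighted balancer surfaces each target's
// configured weight on the selection (rotation itself is uniform, as noted above).
// Placeholder addresses; assumes tokio's `rt` feature for the test runtime.
#[cfg(test)]
mod weighted_tests {
    use super::*;

    #[test]
    fn selections_carry_the_configured_weights() {
        let targets = vec![
            UpstreamTarget::new("10.0.0.1", 80, 100),
            UpstreamTarget::new("10.0.0.2", 80, 300),
        ];
        let balancer = WeightedBalancer {
            weights: targets.iter().map(|t| t.weight).collect(),
            targets,
            current_index: AtomicUsize::new(0),
            health_status: Arc::new(RwLock::new(HashMap::new())),
        };

        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("build current-thread runtime");
        rt.block_on(async {
            let first = balancer.select(None).await.expect("first selection");
            let second = balancer.select(None).await.expect("second selection");
            assert_eq!((first.address.as_str(), first.weight), ("10.0.0.1:80", 100));
            assert_eq!((second.address.as_str(), second.weight), ("10.0.0.2:80", 300));
        });
    }
}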

/// IP hash load balancer
struct IpHashBalancer {
    targets: Vec<UpstreamTarget>,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

#[async_trait]
impl LoadBalancer for IpHashBalancer {
    async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "ip_hash",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_targets: Vec<_> = self
            .targets
            .iter()
            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
            .collect();

        if healthy_targets.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "ip_hash",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        // Hash the client IP to select a target
        let (hash, client_ip_str) = if let Some(ctx) = context {
            if let Some(ip) = &ctx.client_ip {
                use std::hash::{Hash, Hasher};
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                ip.hash(&mut hasher);
                (hasher.finish(), Some(ip.to_string()))
            } else {
                (0, None)
            }
        } else {
            (0, None)
        };

        let idx = (hash as usize) % healthy_targets.len();
        let target = healthy_targets[idx];

        trace!(
            selected_target = %target.full_address(),
            client_ip = client_ip_str.as_deref().unwrap_or("unknown"),
            hash = hash,
            index = idx,
            healthy_count = healthy_targets.len(),
            algorithm = "ip_hash",
            "Selected target via IP hash"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight: target.weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "ip_hash",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}
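
// Illustrative sketch: the IP-hash balancer should map the same client address to the
// same target on every selection. Placeholder addresses; assumes tokio's `rt`
// feature for the test runtime.
#[cfg(test)]
mod ip_hash_tests {
    use super::*;

    #[test]
    fn same_client_ip_maps_to_the_same_target() {
        let balancer = IpHashBalancer {
            targets: vec![
                UpstreamTarget::new("10.0.0.1", 80, 100),
                UpstreamTarget::new("10.0.0.2", 80, 100),
                UpstreamTarget::new("10.0.0.3", 80, 100),
            ],
            health_status: Arc::new(RwLock::new(HashMap::new())),
        };
        let context = RequestContext {
            client_ip: Some("192.0.2.7:51000".parse().expect("valid socket address")),
            headers: HashMap::new(),
            path: "/".to_string(),
            method: "GET".to_string(),
        };

        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("build current-thread runtime");
        rt.block_on(async {
            let first = balancer.select(Some(&context)).await.expect("first selection");
            let second = balancer.select(Some(&context)).await.expect("second selection");
            assert_eq!(first.address, second.address);
        });
    }
}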

impl UpstreamPool {
    /// Create new upstream pool from configuration
    pub async fn new(config: UpstreamConfig) -> SentinelResult<Self> {
        let id = UpstreamId::new(&config.id);

        info!(
            upstream_id = %config.id,
            target_count = config.targets.len(),
            algorithm = ?config.load_balancing,
            "Creating upstream pool"
        );

        // Convert config targets to internal targets
        let targets: Vec<UpstreamTarget> = config
            .targets
            .iter()
            .filter_map(UpstreamTarget::from_config)
            .collect();

        if targets.is_empty() {
            error!(
                upstream_id = %config.id,
                "No valid upstream targets configured"
            );
            return Err(SentinelError::Config {
                message: "No valid upstream targets".to_string(),
                source: None,
            });
        }

        for target in &targets {
            debug!(
                upstream_id = %config.id,
                target = %target.full_address(),
                weight = target.weight,
                "Registered upstream target"
            );
        }

        // Create load balancer
        debug!(
            upstream_id = %config.id,
            algorithm = ?config.load_balancing,
            "Creating load balancer"
        );
        let load_balancer = Self::create_load_balancer(&config.load_balancing, &targets)?;

        // Create connection pool configuration (Pingora handles actual pooling)
        debug!(
            upstream_id = %config.id,
            max_connections = config.connection_pool.max_connections,
            max_idle = config.connection_pool.max_idle,
            idle_timeout_secs = config.connection_pool.idle_timeout_secs,
            connect_timeout_secs = config.timeouts.connect_secs,
            read_timeout_secs = config.timeouts.read_secs,
            write_timeout_secs = config.timeouts.write_secs,
            "Creating connection pool configuration"
        );
        let pool_config =
            ConnectionPoolConfig::from_config(&config.connection_pool, &config.timeouts);

        // Create HTTP version configuration
        let http_version = HttpVersionOptions {
            min_version: config.http_version.min_version,
            max_version: config.http_version.max_version,
            h2_ping_interval: if config.http_version.h2_ping_interval_secs > 0 {
                Duration::from_secs(config.http_version.h2_ping_interval_secs)
            } else {
                Duration::ZERO
            },
            max_h2_streams: config.http_version.max_h2_streams,
        };

        // TLS configuration
        let tls_enabled = config.tls.is_some();
        let tls_sni = config.tls.as_ref().and_then(|t| t.sni.clone());
        let tls_config = config.tls.clone();

        // Log mTLS configuration if present
        if let Some(ref tls) = tls_config {
            if tls.client_cert.is_some() {
                info!(
                    upstream_id = %config.id,
                    "mTLS enabled for upstream (client certificate configured)"
                );
            }
        }

        if http_version.max_version >= 2 && tls_enabled {
            info!(
                upstream_id = %config.id,
                "HTTP/2 enabled for upstream (via ALPN)"
            );
        }

        // Initialize circuit breakers for each target
        let mut circuit_breakers = HashMap::new();
        for target in &targets {
            trace!(
                upstream_id = %config.id,
                target = %target.full_address(),
                "Initializing circuit breaker for target"
            );
            circuit_breakers.insert(
                target.full_address(),
                CircuitBreaker::new(CircuitBreakerConfig::default()),
            );
        }

        let pool = Self {
            id: id.clone(),
            targets,
            load_balancer,
            pool_config,
            http_version,
            tls_enabled,
            tls_sni,
            tls_config,
            circuit_breakers: Arc::new(RwLock::new(circuit_breakers)),
            stats: Arc::new(PoolStats::default()),
        };

        info!(
            upstream_id = %id,
            target_count = pool.targets.len(),
            "Upstream pool created successfully"
        );

        Ok(pool)
    }

    /// Create load balancer based on algorithm
    fn create_load_balancer(
        algorithm: &LoadBalancingAlgorithm,
        targets: &[UpstreamTarget],
    ) -> SentinelResult<Arc<dyn LoadBalancer>> {
        let balancer: Arc<dyn LoadBalancer> = match algorithm {
            LoadBalancingAlgorithm::RoundRobin => {
                Arc::new(RoundRobinBalancer::new(targets.to_vec()))
            }
            LoadBalancingAlgorithm::LeastConnections => {
                Arc::new(LeastConnectionsBalancer::new(targets.to_vec()))
            }
            LoadBalancingAlgorithm::Weighted => {
                let weights: Vec<u32> = targets.iter().map(|t| t.weight).collect();
                Arc::new(WeightedBalancer {
                    targets: targets.to_vec(),
                    weights,
                    current_index: AtomicUsize::new(0),
                    health_status: Arc::new(RwLock::new(HashMap::new())),
                })
            }
            LoadBalancingAlgorithm::IpHash => Arc::new(IpHashBalancer {
                targets: targets.to_vec(),
                health_status: Arc::new(RwLock::new(HashMap::new())),
            }),
            // Random currently falls back to round-robin selection.
            LoadBalancingAlgorithm::Random => Arc::new(RoundRobinBalancer::new(targets.to_vec())),
            LoadBalancingAlgorithm::ConsistentHash => Arc::new(ConsistentHashBalancer::new(
                targets.to_vec(),
                ConsistentHashConfig::default(),
            )),
            LoadBalancingAlgorithm::PowerOfTwoChoices => {
                Arc::new(P2cBalancer::new(targets.to_vec(), P2cConfig::default()))
            }
            LoadBalancingAlgorithm::Adaptive => Arc::new(AdaptiveBalancer::new(
                targets.to_vec(),
                AdaptiveConfig::default(),
            )),
        };
        Ok(balancer)
    }

    /// Select next upstream peer
    pub async fn select_peer(&self, context: Option<&RequestContext>) -> SentinelResult<HttpPeer> {
        let request_num = self.stats.requests.fetch_add(1, Ordering::Relaxed) + 1;

        trace!(
            upstream_id = %self.id,
            request_num = request_num,
            target_count = self.targets.len(),
            "Starting peer selection"
        );

        let mut attempts = 0;
        let max_attempts = self.targets.len() * 2;

        while attempts < max_attempts {
            attempts += 1;

            trace!(
                upstream_id = %self.id,
                attempt = attempts,
                max_attempts = max_attempts,
                "Attempting to select peer"
            );

            let selection = match self.load_balancer.select(context).await {
                Ok(s) => s,
                Err(e) => {
                    warn!(
                        upstream_id = %self.id,
                        attempt = attempts,
                        error = %e,
                        "Load balancer selection failed"
                    );
                    continue;
                }
            };

            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                attempt = attempts,
                "Load balancer selected target"
            );

            // Check circuit breaker
            let breakers = self.circuit_breakers.read().await;
            if let Some(breaker) = breakers.get(&selection.address) {
                if !breaker.is_closed().await {
                    debug!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        attempt = attempts,
                        "Circuit breaker is open, skipping target"
                    );
                    self.stats
                        .circuit_breaker_trips
                        .fetch_add(1, Ordering::Relaxed);
                    continue;
                }
            }

            // Create peer with pooling options
            // Note: Pingora handles actual connection pooling internally based on
            // peer.options.idle_timeout and ServerConf.upstream_keepalive_pool_size
            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                "Creating peer for upstream (Pingora handles connection reuse)"
            );
            let peer = self.create_peer(&selection)?;

            debug!(
                upstream_id = %self.id,
                target = %selection.address,
                attempt = attempts,
                "Selected upstream peer"
            );

            self.stats.successes.fetch_add(1, Ordering::Relaxed);
            return Ok(peer);
        }

        self.stats.failures.fetch_add(1, Ordering::Relaxed);
        error!(
            upstream_id = %self.id,
            attempts = attempts,
            max_attempts = max_attempts,
            "Failed to select upstream after max attempts"
        );
        Err(SentinelError::upstream(
            self.id.to_string(),
            "Failed to select upstream after max attempts",
        ))
    }

    /// Create new peer connection with connection pooling options
    ///
    /// Pingora handles actual connection pooling internally. When idle_timeout
    /// is set on the peer options, Pingora will keep the connection alive and
    /// reuse it for subsequent requests to the same upstream.
    fn create_peer(&self, selection: &TargetSelection) -> SentinelResult<HttpPeer> {
        // Determine SNI hostname for TLS connections
        let sni_hostname = self.tls_sni.clone().unwrap_or_else(|| {
            // Extract hostname from address (strip port)
            selection
                .address
                .split(':')
                .next()
                .unwrap_or(&selection.address)
                .to_string()
        });

        // Pre-resolve the address to avoid panics in Pingora's HttpPeer::new
        // when DNS resolution fails (e.g., when a container is killed)
        let resolved_address = selection
            .address
            .to_socket_addrs()
            .map_err(|e| {
                error!(
                    upstream = %self.id,
                    address = %selection.address,
                    error = %e,
                    "Failed to resolve upstream address"
                );
                SentinelError::Upstream {
                    upstream: self.id.to_string(),
                    message: format!("DNS resolution failed for {}: {}", selection.address, e),
                    retryable: true,
                    source: None,
                }
            })?
            .next()
            .ok_or_else(|| {
                error!(
                    upstream = %self.id,
                    address = %selection.address,
                    "No addresses returned from DNS resolution"
                );
                SentinelError::Upstream {
                    upstream: self.id.to_string(),
                    message: format!("No addresses for {}", selection.address),
                    retryable: true,
                    source: None,
                }
            })?;

        // Use the resolved IP address to create the peer
        let mut peer = HttpPeer::new(resolved_address, self.tls_enabled, sni_hostname.clone());

        // Configure connection pooling options for better performance
        // idle_timeout enables Pingora's connection pooling - connections are
        // kept alive and reused for this duration
        peer.options.idle_timeout = Some(self.pool_config.idle_timeout);

        // Connection timeouts
        peer.options.connection_timeout = Some(self.pool_config.connection_timeout);
        peer.options.total_connection_timeout = Some(Duration::from_secs(10));

        // Read/write timeouts
        peer.options.read_timeout = Some(self.pool_config.read_timeout);
        peer.options.write_timeout = Some(self.pool_config.write_timeout);

        // Enable TCP keepalive for long-lived connections
        peer.options.tcp_keepalive = Some(pingora::protocols::TcpKeepalive {
            idle: Duration::from_secs(60),
            interval: Duration::from_secs(10),
            count: 3,
            // user_timeout is Linux-only
            #[cfg(target_os = "linux")]
            user_timeout: Duration::from_secs(60),
        });

        // Configure HTTP version and ALPN for TLS connections
        if self.tls_enabled {
            // Set ALPN protocols based on configured HTTP version range
            let alpn = match (self.http_version.min_version, self.http_version.max_version) {
                (2, _) => {
                    // HTTP/2 only - use h2 ALPN
                    pingora::upstreams::peer::ALPN::H2
                }
                (1, 2) | (_, 2) => {
                    // Prefer HTTP/2 but fall back to HTTP/1.1
                    pingora::upstreams::peer::ALPN::H2H1
                }
                _ => {
                    // HTTP/1.1 only
                    pingora::upstreams::peer::ALPN::H1
                }
            };
            peer.options.alpn = alpn;

            // Configure TLS verification options based on upstream config
            if let Some(ref tls_config) = self.tls_config {
                // Skip certificate verification if configured (DANGEROUS - testing only)
                if tls_config.insecure_skip_verify {
                    peer.options.verify_cert = false;
                    peer.options.verify_hostname = false;
                    warn!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        "TLS certificate verification DISABLED (insecure_skip_verify=true)"
                    );
                }

                // Set alternative CN for verification if SNI differs from actual hostname
                if let Some(ref sni) = tls_config.sni {
                    peer.options.alternative_cn = Some(sni.clone());
                    trace!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        alternative_cn = %sni,
                        "Set alternative CN for TLS verification"
                    );
                }

                // Log mTLS client certificate configuration
                // Note: Full mTLS client cert support requires Pingora connector customization
                // The certificates are loaded but Pingora's standard connector doesn't expose
                // a direct way to inject client certs. For production mTLS to backends,
                // consider using a custom Pingora Connector implementation.
                if tls_config.client_cert.is_some() {
                    debug!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        client_cert = ?tls_config.client_cert,
                        "mTLS client certificate configured (requires custom connector for full support)"
                    );
                }
            }

            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                alpn = ?peer.options.alpn,
                min_version = self.http_version.min_version,
                max_version = self.http_version.max_version,
                verify_cert = peer.options.verify_cert,
                verify_hostname = peer.options.verify_hostname,
                "Configured ALPN and TLS options for HTTP version negotiation"
            );
        }

        // Configure H2-specific settings when HTTP/2 is enabled
        if self.http_version.max_version >= 2 {
            // H2 ping interval for connection health monitoring
            if !self.http_version.h2_ping_interval.is_zero() {
                peer.options.h2_ping_interval = Some(self.http_version.h2_ping_interval);
                trace!(
                    upstream_id = %self.id,
                    target = %selection.address,
                    h2_ping_interval_secs = self.http_version.h2_ping_interval.as_secs(),
                    "Configured H2 ping interval"
                );
            }
        }

        trace!(
            upstream_id = %self.id,
            target = %selection.address,
            tls = self.tls_enabled,
            sni = %sni_hostname,
            idle_timeout_secs = self.pool_config.idle_timeout.as_secs(),
            http_max_version = self.http_version.max_version,
            "Created peer with Pingora connection pooling enabled"
        );

        Ok(peer)
    }

    /// Report connection result for a target
    pub async fn report_result(&self, target: &str, success: bool) {
        trace!(
            upstream_id = %self.id,
            target = %target,
            success = success,
            "Reporting connection result"
        );

        if success {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_success().await;
                trace!(
                    upstream_id = %self.id,
                    target = %target,
                    "Recorded success in circuit breaker"
                );
            }
            self.load_balancer.report_health(target, true).await;
        } else {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_failure().await;
                debug!(
                    upstream_id = %self.id,
                    target = %target,
                    "Recorded failure in circuit breaker"
                );
            }
            self.load_balancer.report_health(target, false).await;
            self.stats.failures.fetch_add(1, Ordering::Relaxed);
            warn!(
                upstream_id = %self.id,
                target = %target,
                "Connection failure reported for target"
            );
        }
    }

    /// Get pool statistics
    pub fn stats(&self) -> &PoolStats {
        &self.stats
    }

    /// Get pool ID
    pub fn id(&self) -> &UpstreamId {
        &self.id
    }

    /// Get target count
    pub fn target_count(&self) -> usize {
        self.targets.len()
    }

    /// Get pool configuration (for metrics/debugging)
    pub fn pool_config(&self) -> PoolConfigSnapshot {
        PoolConfigSnapshot {
            max_connections: self.pool_config.max_connections,
            max_idle: self.pool_config.max_idle,
            idle_timeout_secs: self.pool_config.idle_timeout.as_secs(),
            max_lifetime_secs: self.pool_config.max_lifetime.map(|d| d.as_secs()),
            connection_timeout_secs: self.pool_config.connection_timeout.as_secs(),
            read_timeout_secs: self.pool_config.read_timeout.as_secs(),
            write_timeout_secs: self.pool_config.write_timeout.as_secs(),
        }
    }

    /// Shutdown the pool
    ///
    /// Note: Pingora manages connection pooling internally, so we just log stats.
    pub async fn shutdown(&self) {
        info!(
            upstream_id = %self.id,
            target_count = self.targets.len(),
            total_requests = self.stats.requests.load(Ordering::Relaxed),
            total_successes = self.stats.successes.load(Ordering::Relaxed),
            total_failures = self.stats.failures.load(Ordering::Relaxed),
            "Shutting down upstream pool"
        );
        // Pingora handles connection cleanup internally
        debug!(upstream_id = %self.id, "Upstream pool shutdown complete");
    }
}
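
// Illustrative sketches of the address handling used in `create_peer`: literal
// "ip:port" strings resolve via `ToSocketAddrs` without hitting DNS, and the SNI
// fallback strips the port from the selected address. The hostnames below are
// placeholders, and the second test mirrors the inline logic rather than calling it.
#[cfg(test)]
mod peer_address_tests {
    #[test]
    fn literal_addresses_resolve_without_dns() {
        use std::net::ToSocketAddrs;

        let resolved = "127.0.0.1:8080"
            .to_socket_addrs()
            .expect("literal address resolves")
            .next()
            .expect("at least one socket address");
        assert_eq!(resolved.port(), 8080);
    }

    #[test]
    fn sni_fallback_strips_the_port() {
        let address = "backend.internal:8443";
        let host = address.split(':').next().unwrap_or(address);
        assert_eq!(host, "backend.internal");
    }
}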