sentinel_proxy/upstream/
mod.rs

//! Upstream pool management module for Sentinel proxy
//!
//! This module handles upstream server pools, load balancing, health checking,
//! connection pooling, and retry logic with circuit breakers.
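//!
//! A minimal end-to-end sketch (an `ignore`d doctest: it assumes an
//! `UpstreamConfig` built or deserialized elsewhere, and that this module is
//! reachable as `sentinel_proxy::upstream`):
//!
//! ```ignore
//! use sentinel_proxy::upstream::UpstreamPool;
//!
//! async fn route(config: sentinel_config::UpstreamConfig)
//!     -> sentinel_common::errors::SentinelResult<()>
//! {
//!     // Builds the targets, load balancer, per-target circuit breakers and
//!     // the connection pool from the config.
//!     let pool = UpstreamPool::new(config).await?;
//!     // Pick the next upstream peer; `None` means no per-request context
//!     // (client IP, headers, path) is available to the balancer.
//!     let _peer = pool.select_peer(None).await?;
//!     // After proxying, feed the outcome back (keyed by "host:port") so the
//!     // circuit breaker and health state stay current.
//!     pool.report_result("10.0.0.1:8080", true).await;
//!     Ok(())
//! }
//! ```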

use async_trait::async_trait;
use pingora::upstreams::peer::HttpPeer;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn};

use sentinel_common::{
    errors::{SentinelError, SentinelResult},
    types::{CircuitBreakerConfig, LoadBalancingAlgorithm, RetryPolicy},
    CircuitBreaker, UpstreamId,
};
use sentinel_config::{HealthCheck as HealthCheckConfig, UpstreamConfig};

// ============================================================================
// Internal Upstream Target Type
// ============================================================================

/// Internal upstream target representation for load balancers
///
/// This is a simplified representation used internally by load balancers,
/// separate from the user-facing config UpstreamTarget.
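///
/// A quick construction sketch (`ignore`d doctest):
///
/// ```ignore
/// let target = UpstreamTarget::new("10.0.0.1", 8080, 100);
/// assert_eq!(target.full_address(), "10.0.0.1:8080");
/// ```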
#[derive(Debug, Clone)]
pub struct UpstreamTarget {
    /// Target IP address or hostname
    pub address: String,
    /// Target port
    pub port: u16,
    /// Weight for weighted load balancing
    pub weight: u32,
}

impl UpstreamTarget {
    /// Create a new upstream target
    pub fn new(address: impl Into<String>, port: u16, weight: u32) -> Self {
        Self {
            address: address.into(),
            port,
            weight,
        }
    }

    /// Create from a "host:port" string with the default weight (100)
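    ///
    /// Expected parsing behaviour (a sketch, `ignore`d doctest):
    ///
    /// ```ignore
    /// let t = UpstreamTarget::from_address("10.0.0.1:8080").unwrap();
    /// assert_eq!((t.address.as_str(), t.port, t.weight), ("10.0.0.1", 8080, 100));
    /// assert!(UpstreamTarget::from_address("no-port").is_none());
    /// ```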
    pub fn from_address(addr: &str) -> Option<Self> {
        // Split on the last ':' so bracketed IPv6 literals such as "[::1]:8080"
        // keep their internal colons in the address part.
        let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
        if parts.len() == 2 {
            let port = parts[0].parse().ok()?;
            let address = parts[1].to_string();
            Some(Self {
                address,
                port,
                weight: 100,
            })
        } else {
            None
        }
    }

    /// Convert from config UpstreamTarget
    pub fn from_config(config: &sentinel_config::UpstreamTarget) -> Option<Self> {
        Self::from_address(&config.address).map(|mut t| {
            t.weight = config.weight;
            t
        })
    }

    /// Get the full address string
    pub fn full_address(&self) -> String {
        format!("{}:{}", self.address, self.port)
    }
}

// ============================================================================
// Load Balancing
// ============================================================================

// Load balancing algorithm implementations
pub mod adaptive;
pub mod consistent_hash;
pub mod p2c;

// Re-export commonly used types from sub-modules
pub use adaptive::{AdaptiveBalancer, AdaptiveConfig};
pub use consistent_hash::{ConsistentHashBalancer, ConsistentHashConfig};
pub use p2c::{P2cBalancer, P2cConfig};

/// Request context for load balancer decisions
#[derive(Debug, Clone)]
pub struct RequestContext {
    pub client_ip: Option<std::net::SocketAddr>,
    pub headers: HashMap<String, String>,
    pub path: String,
    pub method: String,
}

/// Load balancer trait for different algorithms
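///
/// The pool drives an implementation through roughly this sequence (a sketch,
/// `ignore`d because `balancer`, `ctx` and `latency` are placeholders):
///
/// ```ignore
/// // Pick a target, honouring per-target health.
/// let selection = balancer.select(Some(&ctx)).await?;
/// // ... proxy the request to `selection.address` ...
/// // Feed the outcome back so adaptive balancers can adjust.
/// balancer.report_result(&selection, true, Some(latency)).await;
/// // Return any connection-tracking slot held for this selection.
/// balancer.release(&selection).await;
/// ```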
#[async_trait]
pub trait LoadBalancer: Send + Sync {
    /// Select next upstream target
    async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection>;

    /// Report target health status
    async fn report_health(&self, address: &str, healthy: bool);

    /// Get all healthy targets
    async fn healthy_targets(&self) -> Vec<String>;

    /// Release connection (for connection tracking)
    async fn release(&self, _selection: &TargetSelection) {
        // Default implementation - no-op
    }

    /// Report request result (for adaptive algorithms)
    async fn report_result(
        &self,
        _selection: &TargetSelection,
        _success: bool,
        _latency: Option<Duration>,
    ) {
        // Default implementation - no-op
    }
}

/// Selected upstream target
#[derive(Debug, Clone)]
pub struct TargetSelection {
    /// Target address
    pub address: String,
    /// Target weight
    pub weight: u32,
    /// Target metadata
    pub metadata: HashMap<String, String>,
}

/// Upstream pool managing multiple backend servers
pub struct UpstreamPool {
    /// Pool identifier
    id: UpstreamId,
    /// Configured targets
    targets: Vec<UpstreamTarget>,
    /// Load balancer implementation
    load_balancer: Arc<dyn LoadBalancer>,
    /// Health checker
    health_checker: Option<Arc<UpstreamHealthChecker>>,
    /// Connection pool
    connection_pool: Arc<ConnectionPool>,
    /// Circuit breakers per target
    circuit_breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
    /// Retry policy
    retry_policy: Option<RetryPolicy>,
    /// Pool statistics
    stats: Arc<PoolStats>,
}

/// Health checker for upstream targets
///
/// Performs active health checking on upstream targets to determine
/// their availability for load balancing.
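///
/// Construction sketch (`ignore`d; `hc_config` stands in for the upstream's
/// optional `health_check` section):
///
/// ```ignore
/// let checker = UpstreamHealthChecker::new(hc_config);
/// ```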
pub struct UpstreamHealthChecker {
    /// Check configuration
    config: HealthCheckConfig,
    /// Health status per target
    health_status: Arc<RwLock<HashMap<String, TargetHealthStatus>>>,
    /// Handles for spawned health check tasks
    check_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}

impl UpstreamHealthChecker {
    /// Create a new health checker
    pub fn new(config: HealthCheckConfig) -> Self {
        Self {
            config,
            health_status: Arc::new(RwLock::new(HashMap::new())),
            check_handles: Arc::new(RwLock::new(Vec::new())),
        }
    }
}

/// Health status for an upstream target
#[derive(Debug, Clone)]
struct TargetHealthStatus {
    /// Is target healthy
    healthy: bool,
    /// Consecutive successes
    consecutive_successes: u32,
    /// Consecutive failures
    consecutive_failures: u32,
    /// Last check time
    last_check: Instant,
    /// Last successful check
    last_success: Option<Instant>,
    /// Last error message
    last_error: Option<String>,
}

/// Connection pool for upstream connections
pub struct ConnectionPool {
    /// Pool configuration
    max_connections: usize,
    max_idle: usize,
    idle_timeout: Duration,
    max_lifetime: Option<Duration>,
    /// Active connections per target
    connections: Arc<RwLock<HashMap<String, Vec<PooledConnection>>>>,
    /// Connection statistics
    stats: Arc<ConnectionPoolStats>,
}

impl ConnectionPool {
    /// Create a new connection pool
    pub fn new(
        max_connections: usize,
        max_idle: usize,
        idle_timeout: Duration,
        max_lifetime: Option<Duration>,
    ) -> Self {
        Self {
            max_connections,
            max_idle,
            idle_timeout,
            max_lifetime,
            connections: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(ConnectionPoolStats::default()),
        }
    }

    /// Acquire a connection from the pool
    pub async fn acquire(&self, _address: &str) -> SentinelResult<Option<HttpPeer>> {
        // TODO: Implement actual connection pooling logic
        // For now, return None to always create new connections
        Ok(None)
    }

    /// Close all connections in the pool
    pub async fn close_all(&self) {
        let mut connections = self.connections.write().await;
        connections.clear();
    }
}

/// Pooled connection wrapper
struct PooledConnection {
    /// The actual connection/peer
    peer: HttpPeer,
    /// Creation time
    created: Instant,
    /// Last used time
    last_used: Instant,
    /// Is currently in use
    in_use: bool,
}

/// Connection pool statistics
#[derive(Default)]
struct ConnectionPoolStats {
    /// Total connections created
    created: AtomicU64,
    /// Total connections reused
    reused: AtomicU64,
    /// Total connections closed
    closed: AtomicU64,
    /// Current active connections
    active: AtomicU64,
    /// Current idle connections
    idle: AtomicU64,
}

// CircuitBreaker is imported from sentinel_common

/// Pool statistics
#[derive(Default)]
pub struct PoolStats {
    /// Total requests
    pub requests: AtomicU64,
    /// Successful requests
    pub successes: AtomicU64,
    /// Failed requests
    pub failures: AtomicU64,
    /// Retried requests
    pub retries: AtomicU64,
    /// Circuit breaker trips
    pub circuit_breaker_trips: AtomicU64,
}

/// Round-robin load balancer
struct RoundRobinBalancer {
    targets: Vec<UpstreamTarget>,
    current: AtomicUsize,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

impl RoundRobinBalancer {
    fn new(targets: Vec<UpstreamTarget>) -> Self {
        let mut health_status = HashMap::new();
        for target in &targets {
            health_status.insert(target.full_address(), true);
        }

        Self {
            targets,
            current: AtomicUsize::new(0),
            health_status: Arc::new(RwLock::new(health_status)),
        }
    }
}

#[async_trait]
impl LoadBalancer for RoundRobinBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "round_robin",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_targets: Vec<_> = self
            .targets
            .iter()
            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
            .collect();

        if healthy_targets.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "round_robin",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        let index = self.current.fetch_add(1, Ordering::Relaxed) % healthy_targets.len();
        let target = healthy_targets[index];

        trace!(
            selected_target = %target.full_address(),
            healthy_count = healthy_targets.len(),
            index = index,
            algorithm = "round_robin",
            "Selected target via round robin"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight: target.weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "round_robin",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}

/// Least connections load balancer
///
/// Note: the per-target connection counts are initialized to zero and are not
/// currently updated anywhere in this module, so ties resolve to the first
/// healthy target in configuration order.
struct LeastConnectionsBalancer {
    targets: Vec<UpstreamTarget>,
    connections: Arc<RwLock<HashMap<String, usize>>>,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

impl LeastConnectionsBalancer {
    fn new(targets: Vec<UpstreamTarget>) -> Self {
        let mut health_status = HashMap::new();
        let mut connections = HashMap::new();

        for target in &targets {
            let addr = target.full_address();
            health_status.insert(addr.clone(), true);
            connections.insert(addr, 0);
        }

        Self {
            targets,
            connections: Arc::new(RwLock::new(connections)),
            health_status: Arc::new(RwLock::new(health_status)),
        }
    }
}

#[async_trait]
impl LoadBalancer for LeastConnectionsBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "least_connections",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let conns = self.connections.read().await;

        let mut best_target = None;
        let mut min_connections = usize::MAX;

        for target in &self.targets {
            let addr = target.full_address();
            if !*health.get(&addr).unwrap_or(&true) {
                trace!(
                    target = %addr,
                    algorithm = "least_connections",
                    "Skipping unhealthy target"
                );
                continue;
            }

            let conn_count = *conns.get(&addr).unwrap_or(&0);
            trace!(
                target = %addr,
                connections = conn_count,
                "Evaluating target connection count"
            );
            if conn_count < min_connections {
                min_connections = conn_count;
                best_target = Some(target);
            }
        }

        match best_target {
            Some(target) => {
                trace!(
                    selected_target = %target.full_address(),
                    connections = min_connections,
                    algorithm = "least_connections",
                    "Selected target with fewest connections"
                );
                Ok(TargetSelection {
                    address: target.full_address(),
                    weight: target.weight,
                    metadata: HashMap::new(),
                })
            }
            None => {
                warn!(
                    total_targets = self.targets.len(),
                    algorithm = "least_connections",
                    "No healthy upstream targets available"
                );
                Err(SentinelError::NoHealthyUpstream)
            }
        }
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "least_connections",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}

/// Weighted load balancer
///
/// Note: selection currently cycles through healthy targets evenly; the
/// configured weight is returned on the selection but does not yet bias
/// which target is chosen.
struct WeightedBalancer {
    targets: Vec<UpstreamTarget>,
    weights: Vec<u32>,
    current_index: AtomicUsize,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

#[async_trait]
impl LoadBalancer for WeightedBalancer {
    async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "weighted",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_indices: Vec<_> = self
            .targets
            .iter()
            .enumerate()
            .filter(|(_, t)| *health.get(&t.full_address()).unwrap_or(&true))
            .map(|(i, _)| i)
            .collect();

        if healthy_indices.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "weighted",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        let idx = self.current_index.fetch_add(1, Ordering::Relaxed) % healthy_indices.len();
        let target_idx = healthy_indices[idx];
        let target = &self.targets[target_idx];
        let weight = self.weights.get(target_idx).copied().unwrap_or(1);

        trace!(
            selected_target = %target.full_address(),
            weight = weight,
            healthy_count = healthy_indices.len(),
            algorithm = "weighted",
            "Selected target via weighted round robin"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "weighted",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}

/// IP hash load balancer
struct IpHashBalancer {
    targets: Vec<UpstreamTarget>,
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}

#[async_trait]
impl LoadBalancer for IpHashBalancer {
    async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
        trace!(
            total_targets = self.targets.len(),
            algorithm = "ip_hash",
            "Selecting upstream target"
        );

        let health = self.health_status.read().await;
        let healthy_targets: Vec<_> = self
            .targets
            .iter()
            .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
            .collect();

        if healthy_targets.is_empty() {
            warn!(
                total_targets = self.targets.len(),
                algorithm = "ip_hash",
                "No healthy upstream targets available"
            );
            return Err(SentinelError::NoHealthyUpstream);
        }

        // Hash the client IP to select a target.
        //
        // Note: this is a simple modulo hash, so the IP-to-target mapping
        // shifts whenever the healthy target set changes (use the consistent
        // hash balancer for stickiness across membership changes). Requests
        // without a client IP hash to 0 and all land on the first healthy
        // target.
        let (hash, client_ip_str) = if let Some(ctx) = context {
            if let Some(ip) = &ctx.client_ip {
                use std::hash::{Hash, Hasher};
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                ip.hash(&mut hasher);
                (hasher.finish(), Some(ip.to_string()))
            } else {
                (0, None)
            }
        } else {
            (0, None)
        };

        let idx = (hash as usize) % healthy_targets.len();
        let target = healthy_targets[idx];

        trace!(
            selected_target = %target.full_address(),
            client_ip = client_ip_str.as_deref().unwrap_or("unknown"),
            hash = hash,
            index = idx,
            healthy_count = healthy_targets.len(),
            algorithm = "ip_hash",
            "Selected target via IP hash"
        );

        Ok(TargetSelection {
            address: target.full_address(),
            weight: target.weight,
            metadata: HashMap::new(),
        })
    }

    async fn report_health(&self, address: &str, healthy: bool) {
        trace!(
            target = %address,
            healthy = healthy,
            algorithm = "ip_hash",
            "Updating target health status"
        );
        self.health_status
            .write()
            .await
            .insert(address.to_string(), healthy);
    }

    async fn healthy_targets(&self) -> Vec<String> {
        self.health_status
            .read()
            .await
            .iter()
            .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
            .collect()
    }
}

impl UpstreamPool {
    /// Create new upstream pool from configuration
    pub async fn new(config: UpstreamConfig) -> SentinelResult<Self> {
        let id = UpstreamId::new(&config.id);

        info!(
            upstream_id = %config.id,
            target_count = config.targets.len(),
            algorithm = ?config.load_balancing,
            "Creating upstream pool"
        );

        // Convert config targets to internal targets
        let targets: Vec<UpstreamTarget> = config
            .targets
            .iter()
            .filter_map(|t| UpstreamTarget::from_config(t))
            .collect();

        if targets.is_empty() {
            error!(
                upstream_id = %config.id,
                "No valid upstream targets configured"
            );
            return Err(SentinelError::Config {
                message: "No valid upstream targets".to_string(),
                source: None,
            });
        }

        for target in &targets {
            debug!(
                upstream_id = %config.id,
                target = %target.full_address(),
                weight = target.weight,
                "Registered upstream target"
            );
        }

        // Create load balancer
        debug!(
            upstream_id = %config.id,
            algorithm = ?config.load_balancing,
            "Creating load balancer"
        );
        let load_balancer = Self::create_load_balancer(&config.load_balancing, &targets)?;

        // Create health checker if configured
        let health_checker = config
            .health_check
            .as_ref()
            .map(|hc_config| {
                debug!(
                    upstream_id = %config.id,
                    check_type = ?hc_config.check_type,
                    interval_secs = hc_config.interval_secs,
                    "Creating health checker"
                );
                Arc::new(UpstreamHealthChecker::new(hc_config.clone()))
            });

        // Create connection pool
        debug!(
            upstream_id = %config.id,
            max_connections = config.connection_pool.max_connections,
            max_idle = config.connection_pool.max_idle,
            idle_timeout_secs = config.connection_pool.idle_timeout_secs,
            "Creating connection pool"
        );
        let connection_pool = Arc::new(ConnectionPool::new(
            config.connection_pool.max_connections,
            config.connection_pool.max_idle,
            Duration::from_secs(config.connection_pool.idle_timeout_secs),
            config
                .connection_pool
                .max_lifetime_secs
                .map(Duration::from_secs),
        ));

        // Initialize circuit breakers for each target
        let mut circuit_breakers = HashMap::new();
        for target in &targets {
            trace!(
                upstream_id = %config.id,
                target = %target.full_address(),
                "Initializing circuit breaker for target"
            );
            circuit_breakers.insert(
                target.full_address(),
                CircuitBreaker::new(CircuitBreakerConfig::default()),
            );
        }

        let pool = Self {
            id: id.clone(),
            targets,
            load_balancer,
            health_checker,
            connection_pool,
            circuit_breakers: Arc::new(RwLock::new(circuit_breakers)),
            retry_policy: None,
            stats: Arc::new(PoolStats::default()),
        };

        info!(
            upstream_id = %id,
            target_count = pool.targets.len(),
            "Upstream pool created successfully"
        );

        Ok(pool)
    }

    /// Create load balancer based on algorithm
    fn create_load_balancer(
        algorithm: &LoadBalancingAlgorithm,
        targets: &[UpstreamTarget],
    ) -> SentinelResult<Arc<dyn LoadBalancer>> {
        let balancer: Arc<dyn LoadBalancer> = match algorithm {
            LoadBalancingAlgorithm::RoundRobin => {
                Arc::new(RoundRobinBalancer::new(targets.to_vec()))
            }
            LoadBalancingAlgorithm::LeastConnections => {
                Arc::new(LeastConnectionsBalancer::new(targets.to_vec()))
            }
            LoadBalancingAlgorithm::Weighted => {
                let weights: Vec<u32> = targets.iter().map(|t| t.weight).collect();
                Arc::new(WeightedBalancer {
                    targets: targets.to_vec(),
                    weights,
                    current_index: AtomicUsize::new(0),
                    health_status: Arc::new(RwLock::new(HashMap::new())),
                })
            }
            LoadBalancingAlgorithm::IpHash => Arc::new(IpHashBalancer {
                targets: targets.to_vec(),
                health_status: Arc::new(RwLock::new(HashMap::new())),
            }),
            LoadBalancingAlgorithm::Random => {
                // Random selection is not implemented yet; fall back to
                // round-robin, which gives a similar even spread.
                Arc::new(RoundRobinBalancer::new(targets.to_vec()))
            }
            LoadBalancingAlgorithm::ConsistentHash => Arc::new(ConsistentHashBalancer::new(
                targets.to_vec(),
                ConsistentHashConfig::default(),
            )),
            LoadBalancingAlgorithm::PowerOfTwoChoices => {
                Arc::new(P2cBalancer::new(targets.to_vec(), P2cConfig::default()))
            }
            LoadBalancingAlgorithm::Adaptive => Arc::new(AdaptiveBalancer::new(
                targets.to_vec(),
                AdaptiveConfig::default(),
            )),
        };
        Ok(balancer)
    }

    /// Select next upstream peer
    pub async fn select_peer(&self, context: Option<&RequestContext>) -> SentinelResult<HttpPeer> {
        let request_num = self.stats.requests.fetch_add(1, Ordering::Relaxed) + 1;

        trace!(
            upstream_id = %self.id,
            request_num = request_num,
            target_count = self.targets.len(),
            "Starting peer selection"
        );

        let mut attempts = 0;
        let max_attempts = self.targets.len() * 2;

        while attempts < max_attempts {
            attempts += 1;

            trace!(
                upstream_id = %self.id,
                attempt = attempts,
                max_attempts = max_attempts,
                "Attempting to select peer"
            );

            let selection = match self.load_balancer.select(context).await {
                Ok(s) => s,
                Err(e) => {
                    warn!(
                        upstream_id = %self.id,
                        attempt = attempts,
                        error = %e,
                        "Load balancer selection failed"
                    );
                    continue;
                }
            };

            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                attempt = attempts,
                "Load balancer selected target"
            );

            // Check circuit breaker
            let breakers = self.circuit_breakers.read().await;
            if let Some(breaker) = breakers.get(&selection.address) {
                if !breaker.is_closed().await {
                    debug!(
                        upstream_id = %self.id,
                        target = %selection.address,
                        attempt = attempts,
                        "Circuit breaker is open, skipping target"
                    );
                    self.stats.circuit_breaker_trips.fetch_add(1, Ordering::Relaxed);
                    continue;
                }
            }

            // Try to get connection from pool
            if let Some(peer) = self.connection_pool.acquire(&selection.address).await? {
                debug!(
                    upstream_id = %self.id,
                    target = %selection.address,
                    attempt = attempts,
                    "Reusing pooled connection"
                );
                return Ok(peer);
            }

            // Create new connection
            trace!(
                upstream_id = %self.id,
                target = %selection.address,
                "Creating new connection to upstream"
            );
            let peer = self.create_peer(&selection)?;

            debug!(
                upstream_id = %self.id,
                target = %selection.address,
                attempt = attempts,
                "Selected upstream peer"
            );

            self.stats.successes.fetch_add(1, Ordering::Relaxed);
            return Ok(peer);
        }

        self.stats.failures.fetch_add(1, Ordering::Relaxed);
        error!(
            upstream_id = %self.id,
            attempts = attempts,
            max_attempts = max_attempts,
            "Failed to select upstream after max attempts"
        );
        Err(SentinelError::upstream(
            &self.id.to_string(),
            "Failed to select upstream after max attempts",
        ))
    }

    /// Create new peer connection with connection pooling options
    fn create_peer(&self, selection: &TargetSelection) -> SentinelResult<HttpPeer> {
        // Plain (non-TLS) upstream peer with an empty SNI.
        let mut peer = HttpPeer::new(&selection.address, false, String::new());

        // Configure connection pooling options for better performance.
        // idle_timeout enables Pingora's connection pooling - connections are
        // kept alive and reused for this duration.
        peer.options.idle_timeout = Some(self.connection_pool.idle_timeout);

        // Connection timeouts
        peer.options.connection_timeout = Some(Duration::from_secs(5));
        peer.options.total_connection_timeout = Some(Duration::from_secs(10));

        // Read/write timeouts
        peer.options.read_timeout = Some(Duration::from_secs(60));
        peer.options.write_timeout = Some(Duration::from_secs(60));

        // Enable TCP keepalive for long-lived connections
        peer.options.tcp_keepalive = Some(pingora::protocols::TcpKeepalive {
            idle: Duration::from_secs(60),
            interval: Duration::from_secs(10),
            count: 3,
            // user_timeout is Linux-only
            #[cfg(target_os = "linux")]
            user_timeout: Duration::from_secs(60),
        });

        trace!(
            upstream_id = %self.id,
            target = %selection.address,
            idle_timeout_secs = self.connection_pool.idle_timeout.as_secs(),
            "Created peer with connection pooling options"
        );

        Ok(peer)
    }

    /// Report connection result for a target
    pub async fn report_result(&self, target: &str, success: bool) {
        trace!(
            upstream_id = %self.id,
            target = %target,
            success = success,
            "Reporting connection result"
        );

        if success {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_success().await;
                trace!(
                    upstream_id = %self.id,
                    target = %target,
                    "Recorded success in circuit breaker"
                );
            }
            self.load_balancer.report_health(target, true).await;
        } else {
            if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
                breaker.record_failure().await;
                debug!(
                    upstream_id = %self.id,
                    target = %target,
                    "Recorded failure in circuit breaker"
                );
            }
            self.load_balancer.report_health(target, false).await;
            self.stats.failures.fetch_add(1, Ordering::Relaxed);
            warn!(
                upstream_id = %self.id,
                target = %target,
                "Connection failure reported for target"
            );
        }
    }

    /// Get pool statistics
    pub fn stats(&self) -> &PoolStats {
        &self.stats
    }

    /// Shutdown the pool
    pub async fn shutdown(&self) {
        info!(
            upstream_id = %self.id,
            target_count = self.targets.len(),
            total_requests = self.stats.requests.load(Ordering::Relaxed),
            total_successes = self.stats.successes.load(Ordering::Relaxed),
            total_failures = self.stats.failures.load(Ordering::Relaxed),
            "Shutting down upstream pool"
        );
        self.connection_pool.close_all().await;
        debug!(upstream_id = %self.id, "Connection pool closed");
    }
}
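
// ----------------------------------------------------------------------------
// A small sanity-test sketch for the pure helpers above. Only `UpstreamTarget`
// is exercised here because it needs no async runtime or external services.
// ----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::UpstreamTarget;

    #[test]
    fn from_address_parses_host_and_port() {
        let t = UpstreamTarget::from_address("10.0.0.1:8080").expect("valid address");
        assert_eq!(t.address, "10.0.0.1");
        assert_eq!(t.port, 8080);
        assert_eq!(t.weight, 100); // default weight
        assert_eq!(t.full_address(), "10.0.0.1:8080");
    }

    #[test]
    fn from_address_rejects_invalid_input() {
        assert!(UpstreamTarget::from_address("no-port").is_none());
        assert!(UpstreamTarget::from_address("bad:port").is_none()); // non-numeric port
    }
}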