1use async_trait::async_trait;
7use pingora::upstreams::peer::HttpPeer;
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::RwLock;
13use tracing::{debug, error, info, trace, warn};
14
15use sentinel_common::{
16 errors::{SentinelError, SentinelResult},
17 types::{CircuitBreakerConfig, LoadBalancingAlgorithm},
18 CircuitBreaker, UpstreamId,
19};
20use sentinel_config::UpstreamConfig;
21
22#[derive(Debug, Clone)]
31pub struct UpstreamTarget {
32 pub address: String,
34 pub port: u16,
36 pub weight: u32,
38}
39
40impl UpstreamTarget {
41 pub fn new(address: impl Into<String>, port: u16, weight: u32) -> Self {
43 Self {
44 address: address.into(),
45 port,
46 weight,
47 }
48 }
49
50 pub fn from_address(addr: &str) -> Option<Self> {
52 let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
53 if parts.len() == 2 {
54 let port = parts[0].parse().ok()?;
55 let address = parts[1].to_string();
56 Some(Self {
57 address,
58 port,
59 weight: 100,
60 })
61 } else {
62 None
63 }
64 }
65
66 pub fn from_config(config: &sentinel_config::UpstreamTarget) -> Option<Self> {
68 Self::from_address(&config.address).map(|mut t| {
69 t.weight = config.weight;
70 t
71 })
72 }
73
74 pub fn full_address(&self) -> String {
76 format!("{}:{}", self.address, self.port)
77 }
78}
79
80pub mod adaptive;
86pub mod consistent_hash;
87pub mod health;
88pub mod p2c;
89
90pub use adaptive::{AdaptiveBalancer, AdaptiveConfig};
92pub use consistent_hash::{ConsistentHashBalancer, ConsistentHashConfig};
93pub use health::{ActiveHealthChecker, HealthCheckRunner};
94pub use p2c::{P2cBalancer, P2cConfig};
95
96#[derive(Debug, Clone)]
98pub struct RequestContext {
99 pub client_ip: Option<std::net::SocketAddr>,
100 pub headers: HashMap<String, String>,
101 pub path: String,
102 pub method: String,
103}
104
105#[async_trait]
107pub trait LoadBalancer: Send + Sync {
108 async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection>;
110
111 async fn report_health(&self, address: &str, healthy: bool);
113
114 async fn healthy_targets(&self) -> Vec<String>;
116
117 async fn release(&self, _selection: &TargetSelection) {
119 }
121
122 async fn report_result(
124 &self,
125 _selection: &TargetSelection,
126 _success: bool,
127 _latency: Option<Duration>,
128 ) {
129 }
131}
132
133#[derive(Debug, Clone)]
135pub struct TargetSelection {
136 pub address: String,
138 pub weight: u32,
140 pub metadata: HashMap<String, String>,
142}
143
144pub struct UpstreamPool {
146 id: UpstreamId,
148 targets: Vec<UpstreamTarget>,
150 load_balancer: Arc<dyn LoadBalancer>,
152 pool_config: ConnectionPoolConfig,
154 http_version: HttpVersionOptions,
156 tls_enabled: bool,
158 tls_sni: Option<String>,
160 tls_config: Option<sentinel_config::UpstreamTlsConfig>,
162 circuit_breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
164 stats: Arc<PoolStats>,
166}
167
168pub struct ConnectionPoolConfig {
177 pub max_connections: usize,
179 pub max_idle: usize,
181 pub idle_timeout: Duration,
183 pub max_lifetime: Option<Duration>,
185 pub connection_timeout: Duration,
187 pub read_timeout: Duration,
189 pub write_timeout: Duration,
191}
192
193pub struct HttpVersionOptions {
195 pub min_version: u8,
197 pub max_version: u8,
199 pub h2_ping_interval: Duration,
201 pub max_h2_streams: usize,
203}
204
205impl ConnectionPoolConfig {
206 pub fn from_config(
208 pool_config: &sentinel_config::ConnectionPoolConfig,
209 timeouts: &sentinel_config::UpstreamTimeouts,
210 ) -> Self {
211 Self {
212 max_connections: pool_config.max_connections,
213 max_idle: pool_config.max_idle,
214 idle_timeout: Duration::from_secs(pool_config.idle_timeout_secs),
215 max_lifetime: pool_config.max_lifetime_secs.map(Duration::from_secs),
216 connection_timeout: Duration::from_secs(timeouts.connect_secs),
217 read_timeout: Duration::from_secs(timeouts.read_secs),
218 write_timeout: Duration::from_secs(timeouts.write_secs),
219 }
220 }
221}
222
223#[derive(Default)]
227pub struct PoolStats {
228 pub requests: AtomicU64,
230 pub successes: AtomicU64,
232 pub failures: AtomicU64,
234 pub retries: AtomicU64,
236 pub circuit_breaker_trips: AtomicU64,
238}
239
240#[derive(Debug, Clone)]
242pub struct PoolConfigSnapshot {
243 pub max_connections: usize,
245 pub max_idle: usize,
247 pub idle_timeout_secs: u64,
249 pub max_lifetime_secs: Option<u64>,
251 pub connection_timeout_secs: u64,
253 pub read_timeout_secs: u64,
255 pub write_timeout_secs: u64,
257}
258
259struct RoundRobinBalancer {
261 targets: Vec<UpstreamTarget>,
262 current: AtomicUsize,
263 health_status: Arc<RwLock<HashMap<String, bool>>>,
264}
265
266impl RoundRobinBalancer {
267 fn new(targets: Vec<UpstreamTarget>) -> Self {
268 let mut health_status = HashMap::new();
269 for target in &targets {
270 health_status.insert(target.full_address(), true);
271 }
272
273 Self {
274 targets,
275 current: AtomicUsize::new(0),
276 health_status: Arc::new(RwLock::new(health_status)),
277 }
278 }
279}
280
281#[async_trait]
282impl LoadBalancer for RoundRobinBalancer {
283 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
284 trace!(
285 total_targets = self.targets.len(),
286 algorithm = "round_robin",
287 "Selecting upstream target"
288 );
289
290 let health = self.health_status.read().await;
291 let healthy_targets: Vec<_> = self
292 .targets
293 .iter()
294 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
295 .collect();
296
297 if healthy_targets.is_empty() {
298 warn!(
299 total_targets = self.targets.len(),
300 algorithm = "round_robin",
301 "No healthy upstream targets available"
302 );
303 return Err(SentinelError::NoHealthyUpstream);
304 }
305
306 let index = self.current.fetch_add(1, Ordering::Relaxed) % healthy_targets.len();
307 let target = healthy_targets[index];
308
309 trace!(
310 selected_target = %target.full_address(),
311 healthy_count = healthy_targets.len(),
312 index = index,
313 algorithm = "round_robin",
314 "Selected target via round robin"
315 );
316
317 Ok(TargetSelection {
318 address: target.full_address(),
319 weight: target.weight,
320 metadata: HashMap::new(),
321 })
322 }
323
324 async fn report_health(&self, address: &str, healthy: bool) {
325 trace!(
326 target = %address,
327 healthy = healthy,
328 algorithm = "round_robin",
329 "Updating target health status"
330 );
331 self.health_status
332 .write()
333 .await
334 .insert(address.to_string(), healthy);
335 }
336
337 async fn healthy_targets(&self) -> Vec<String> {
338 self.health_status
339 .read()
340 .await
341 .iter()
342 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
343 .collect()
344 }
345}
346
347struct LeastConnectionsBalancer {
349 targets: Vec<UpstreamTarget>,
350 connections: Arc<RwLock<HashMap<String, usize>>>,
351 health_status: Arc<RwLock<HashMap<String, bool>>>,
352}
353
354impl LeastConnectionsBalancer {
355 fn new(targets: Vec<UpstreamTarget>) -> Self {
356 let mut health_status = HashMap::new();
357 let mut connections = HashMap::new();
358
359 for target in &targets {
360 let addr = target.full_address();
361 health_status.insert(addr.clone(), true);
362 connections.insert(addr, 0);
363 }
364
365 Self {
366 targets,
367 connections: Arc::new(RwLock::new(connections)),
368 health_status: Arc::new(RwLock::new(health_status)),
369 }
370 }
371}
372
373#[async_trait]
374impl LoadBalancer for LeastConnectionsBalancer {
375 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
376 trace!(
377 total_targets = self.targets.len(),
378 algorithm = "least_connections",
379 "Selecting upstream target"
380 );
381
382 let health = self.health_status.read().await;
383 let conns = self.connections.read().await;
384
385 let mut best_target = None;
386 let mut min_connections = usize::MAX;
387
388 for target in &self.targets {
389 let addr = target.full_address();
390 if !*health.get(&addr).unwrap_or(&true) {
391 trace!(
392 target = %addr,
393 algorithm = "least_connections",
394 "Skipping unhealthy target"
395 );
396 continue;
397 }
398
399 let conn_count = *conns.get(&addr).unwrap_or(&0);
400 trace!(
401 target = %addr,
402 connections = conn_count,
403 "Evaluating target connection count"
404 );
405 if conn_count < min_connections {
406 min_connections = conn_count;
407 best_target = Some(target);
408 }
409 }
410
411 match best_target {
412 Some(target) => {
413 trace!(
414 selected_target = %target.full_address(),
415 connections = min_connections,
416 algorithm = "least_connections",
417 "Selected target with fewest connections"
418 );
419 Ok(TargetSelection {
420 address: target.full_address(),
421 weight: target.weight,
422 metadata: HashMap::new(),
423 })
424 }
425 None => {
426 warn!(
427 total_targets = self.targets.len(),
428 algorithm = "least_connections",
429 "No healthy upstream targets available"
430 );
431 Err(SentinelError::NoHealthyUpstream)
432 }
433 }
434 }
435
436 async fn report_health(&self, address: &str, healthy: bool) {
437 trace!(
438 target = %address,
439 healthy = healthy,
440 algorithm = "least_connections",
441 "Updating target health status"
442 );
443 self.health_status
444 .write()
445 .await
446 .insert(address.to_string(), healthy);
447 }
448
449 async fn healthy_targets(&self) -> Vec<String> {
450 self.health_status
451 .read()
452 .await
453 .iter()
454 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
455 .collect()
456 }
457}
458
459struct WeightedBalancer {
461 targets: Vec<UpstreamTarget>,
462 weights: Vec<u32>,
463 current_index: AtomicUsize,
464 health_status: Arc<RwLock<HashMap<String, bool>>>,
465}
466
467#[async_trait]
468impl LoadBalancer for WeightedBalancer {
469 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
470 trace!(
471 total_targets = self.targets.len(),
472 algorithm = "weighted",
473 "Selecting upstream target"
474 );
475
476 let health = self.health_status.read().await;
477 let healthy_indices: Vec<_> = self
478 .targets
479 .iter()
480 .enumerate()
481 .filter(|(_, t)| *health.get(&t.full_address()).unwrap_or(&true))
482 .map(|(i, _)| i)
483 .collect();
484
485 if healthy_indices.is_empty() {
486 warn!(
487 total_targets = self.targets.len(),
488 algorithm = "weighted",
489 "No healthy upstream targets available"
490 );
491 return Err(SentinelError::NoHealthyUpstream);
492 }
493
494 let idx = self.current_index.fetch_add(1, Ordering::Relaxed) % healthy_indices.len();
495 let target_idx = healthy_indices[idx];
496 let target = &self.targets[target_idx];
497 let weight = self.weights.get(target_idx).copied().unwrap_or(1);
498
499 trace!(
500 selected_target = %target.full_address(),
501 weight = weight,
502 healthy_count = healthy_indices.len(),
503 algorithm = "weighted",
504 "Selected target via weighted round robin"
505 );
506
507 Ok(TargetSelection {
508 address: target.full_address(),
509 weight,
510 metadata: HashMap::new(),
511 })
512 }
513
514 async fn report_health(&self, address: &str, healthy: bool) {
515 trace!(
516 target = %address,
517 healthy = healthy,
518 algorithm = "weighted",
519 "Updating target health status"
520 );
521 self.health_status
522 .write()
523 .await
524 .insert(address.to_string(), healthy);
525 }
526
527 async fn healthy_targets(&self) -> Vec<String> {
528 self.health_status
529 .read()
530 .await
531 .iter()
532 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
533 .collect()
534 }
535}
536
537struct IpHashBalancer {
539 targets: Vec<UpstreamTarget>,
540 health_status: Arc<RwLock<HashMap<String, bool>>>,
541}
542
543#[async_trait]
544impl LoadBalancer for IpHashBalancer {
545 async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
546 trace!(
547 total_targets = self.targets.len(),
548 algorithm = "ip_hash",
549 "Selecting upstream target"
550 );
551
552 let health = self.health_status.read().await;
553 let healthy_targets: Vec<_> = self
554 .targets
555 .iter()
556 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
557 .collect();
558
559 if healthy_targets.is_empty() {
560 warn!(
561 total_targets = self.targets.len(),
562 algorithm = "ip_hash",
563 "No healthy upstream targets available"
564 );
565 return Err(SentinelError::NoHealthyUpstream);
566 }
567
568 let (hash, client_ip_str) = if let Some(ctx) = context {
570 if let Some(ip) = &ctx.client_ip {
571 use std::hash::{Hash, Hasher};
572 let mut hasher = std::collections::hash_map::DefaultHasher::new();
573 ip.hash(&mut hasher);
574 (hasher.finish(), Some(ip.to_string()))
575 } else {
576 (0, None)
577 }
578 } else {
579 (0, None)
580 };
581
582 let idx = (hash as usize) % healthy_targets.len();
583 let target = healthy_targets[idx];
584
585 trace!(
586 selected_target = %target.full_address(),
587 client_ip = client_ip_str.as_deref().unwrap_or("unknown"),
588 hash = hash,
589 index = idx,
590 healthy_count = healthy_targets.len(),
591 algorithm = "ip_hash",
592 "Selected target via IP hash"
593 );
594
595 Ok(TargetSelection {
596 address: target.full_address(),
597 weight: target.weight,
598 metadata: HashMap::new(),
599 })
600 }
601
602 async fn report_health(&self, address: &str, healthy: bool) {
603 trace!(
604 target = %address,
605 healthy = healthy,
606 algorithm = "ip_hash",
607 "Updating target health status"
608 );
609 self.health_status
610 .write()
611 .await
612 .insert(address.to_string(), healthy);
613 }
614
615 async fn healthy_targets(&self) -> Vec<String> {
616 self.health_status
617 .read()
618 .await
619 .iter()
620 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
621 .collect()
622 }
623}
624
625impl UpstreamPool {
626 pub async fn new(config: UpstreamConfig) -> SentinelResult<Self> {
628 let id = UpstreamId::new(&config.id);
629
630 info!(
631 upstream_id = %config.id,
632 target_count = config.targets.len(),
633 algorithm = ?config.load_balancing,
634 "Creating upstream pool"
635 );
636
637 let targets: Vec<UpstreamTarget> = config
639 .targets
640 .iter()
641 .filter_map(UpstreamTarget::from_config)
642 .collect();
643
644 if targets.is_empty() {
645 error!(
646 upstream_id = %config.id,
647 "No valid upstream targets configured"
648 );
649 return Err(SentinelError::Config {
650 message: "No valid upstream targets".to_string(),
651 source: None,
652 });
653 }
654
655 for target in &targets {
656 debug!(
657 upstream_id = %config.id,
658 target = %target.full_address(),
659 weight = target.weight,
660 "Registered upstream target"
661 );
662 }
663
664 debug!(
666 upstream_id = %config.id,
667 algorithm = ?config.load_balancing,
668 "Creating load balancer"
669 );
670 let load_balancer = Self::create_load_balancer(&config.load_balancing, &targets)?;
671
672 debug!(
674 upstream_id = %config.id,
675 max_connections = config.connection_pool.max_connections,
676 max_idle = config.connection_pool.max_idle,
677 idle_timeout_secs = config.connection_pool.idle_timeout_secs,
678 connect_timeout_secs = config.timeouts.connect_secs,
679 read_timeout_secs = config.timeouts.read_secs,
680 write_timeout_secs = config.timeouts.write_secs,
681 "Creating connection pool configuration"
682 );
683 let pool_config =
684 ConnectionPoolConfig::from_config(&config.connection_pool, &config.timeouts);
685
686 let http_version = HttpVersionOptions {
688 min_version: config.http_version.min_version,
689 max_version: config.http_version.max_version,
690 h2_ping_interval: if config.http_version.h2_ping_interval_secs > 0 {
691 Duration::from_secs(config.http_version.h2_ping_interval_secs)
692 } else {
693 Duration::ZERO
694 },
695 max_h2_streams: config.http_version.max_h2_streams,
696 };
697
698 let tls_enabled = config.tls.is_some();
700 let tls_sni = config.tls.as_ref().and_then(|t| t.sni.clone());
701 let tls_config = config.tls.clone();
702
703 if let Some(ref tls) = tls_config {
705 if tls.client_cert.is_some() {
706 info!(
707 upstream_id = %config.id,
708 "mTLS enabled for upstream (client certificate configured)"
709 );
710 }
711 }
712
713 if http_version.max_version >= 2 && tls_enabled {
714 info!(
715 upstream_id = %config.id,
716 "HTTP/2 enabled for upstream (via ALPN)"
717 );
718 }
719
720 let mut circuit_breakers = HashMap::new();
722 for target in &targets {
723 trace!(
724 upstream_id = %config.id,
725 target = %target.full_address(),
726 "Initializing circuit breaker for target"
727 );
728 circuit_breakers.insert(
729 target.full_address(),
730 CircuitBreaker::new(CircuitBreakerConfig::default()),
731 );
732 }
733
734 let pool = Self {
735 id: id.clone(),
736 targets,
737 load_balancer,
738 pool_config,
739 http_version,
740 tls_enabled,
741 tls_sni,
742 tls_config,
743 circuit_breakers: Arc::new(RwLock::new(circuit_breakers)),
744 stats: Arc::new(PoolStats::default()),
745 };
746
747 info!(
748 upstream_id = %id,
749 target_count = pool.targets.len(),
750 "Upstream pool created successfully"
751 );
752
753 Ok(pool)
754 }
755
756 fn create_load_balancer(
758 algorithm: &LoadBalancingAlgorithm,
759 targets: &[UpstreamTarget],
760 ) -> SentinelResult<Arc<dyn LoadBalancer>> {
761 let balancer: Arc<dyn LoadBalancer> = match algorithm {
762 LoadBalancingAlgorithm::RoundRobin => {
763 Arc::new(RoundRobinBalancer::new(targets.to_vec()))
764 }
765 LoadBalancingAlgorithm::LeastConnections => {
766 Arc::new(LeastConnectionsBalancer::new(targets.to_vec()))
767 }
768 LoadBalancingAlgorithm::Weighted => {
769 let weights: Vec<u32> = targets.iter().map(|t| t.weight).collect();
770 Arc::new(WeightedBalancer {
771 targets: targets.to_vec(),
772 weights,
773 current_index: AtomicUsize::new(0),
774 health_status: Arc::new(RwLock::new(HashMap::new())),
775 })
776 }
777 LoadBalancingAlgorithm::IpHash => Arc::new(IpHashBalancer {
778 targets: targets.to_vec(),
779 health_status: Arc::new(RwLock::new(HashMap::new())),
780 }),
781 LoadBalancingAlgorithm::Random => Arc::new(RoundRobinBalancer::new(targets.to_vec())),
782 LoadBalancingAlgorithm::ConsistentHash => Arc::new(ConsistentHashBalancer::new(
783 targets.to_vec(),
784 ConsistentHashConfig::default(),
785 )),
786 LoadBalancingAlgorithm::PowerOfTwoChoices => {
787 Arc::new(P2cBalancer::new(targets.to_vec(), P2cConfig::default()))
788 }
789 LoadBalancingAlgorithm::Adaptive => Arc::new(AdaptiveBalancer::new(
790 targets.to_vec(),
791 AdaptiveConfig::default(),
792 )),
793 };
794 Ok(balancer)
795 }
796
797 pub async fn select_peer(&self, context: Option<&RequestContext>) -> SentinelResult<HttpPeer> {
799 let request_num = self.stats.requests.fetch_add(1, Ordering::Relaxed) + 1;
800
801 trace!(
802 upstream_id = %self.id,
803 request_num = request_num,
804 target_count = self.targets.len(),
805 "Starting peer selection"
806 );
807
808 let mut attempts = 0;
809 let max_attempts = self.targets.len() * 2;
810
811 while attempts < max_attempts {
812 attempts += 1;
813
814 trace!(
815 upstream_id = %self.id,
816 attempt = attempts,
817 max_attempts = max_attempts,
818 "Attempting to select peer"
819 );
820
821 let selection = match self.load_balancer.select(context).await {
822 Ok(s) => s,
823 Err(e) => {
824 warn!(
825 upstream_id = %self.id,
826 attempt = attempts,
827 error = %e,
828 "Load balancer selection failed"
829 );
830 continue;
831 }
832 };
833
834 trace!(
835 upstream_id = %self.id,
836 target = %selection.address,
837 attempt = attempts,
838 "Load balancer selected target"
839 );
840
841 let breakers = self.circuit_breakers.read().await;
843 if let Some(breaker) = breakers.get(&selection.address) {
844 if !breaker.is_closed().await {
845 debug!(
846 upstream_id = %self.id,
847 target = %selection.address,
848 attempt = attempts,
849 "Circuit breaker is open, skipping target"
850 );
851 self.stats
852 .circuit_breaker_trips
853 .fetch_add(1, Ordering::Relaxed);
854 continue;
855 }
856 }
857
858 trace!(
862 upstream_id = %self.id,
863 target = %selection.address,
864 "Creating peer for upstream (Pingora handles connection reuse)"
865 );
866 let peer = self.create_peer(&selection)?;
867
868 debug!(
869 upstream_id = %self.id,
870 target = %selection.address,
871 attempt = attempts,
872 "Selected upstream peer"
873 );
874
875 self.stats.successes.fetch_add(1, Ordering::Relaxed);
876 return Ok(peer);
877 }
878
879 self.stats.failures.fetch_add(1, Ordering::Relaxed);
880 error!(
881 upstream_id = %self.id,
882 attempts = attempts,
883 max_attempts = max_attempts,
884 "Failed to select upstream after max attempts"
885 );
886 Err(SentinelError::upstream(
887 self.id.to_string(),
888 "Failed to select upstream after max attempts",
889 ))
890 }
891
892 fn create_peer(&self, selection: &TargetSelection) -> SentinelResult<HttpPeer> {
898 let sni_hostname = self.tls_sni.clone().unwrap_or_else(|| {
900 selection
902 .address
903 .split(':')
904 .next()
905 .unwrap_or(&selection.address)
906 .to_string()
907 });
908
909 let mut peer = HttpPeer::new(&selection.address, self.tls_enabled, sni_hostname.clone());
910
911 peer.options.idle_timeout = Some(self.pool_config.idle_timeout);
915
916 peer.options.connection_timeout = Some(self.pool_config.connection_timeout);
918 peer.options.total_connection_timeout = Some(Duration::from_secs(10));
919
920 peer.options.read_timeout = Some(self.pool_config.read_timeout);
922 peer.options.write_timeout = Some(self.pool_config.write_timeout);
923
924 peer.options.tcp_keepalive = Some(pingora::protocols::TcpKeepalive {
926 idle: Duration::from_secs(60),
927 interval: Duration::from_secs(10),
928 count: 3,
929 #[cfg(target_os = "linux")]
931 user_timeout: Duration::from_secs(60),
932 });
933
934 if self.tls_enabled {
936 let alpn = match (self.http_version.min_version, self.http_version.max_version) {
938 (2, _) => {
939 pingora::upstreams::peer::ALPN::H2
941 }
942 (1, 2) | (_, 2) => {
943 pingora::upstreams::peer::ALPN::H2H1
945 }
946 _ => {
947 pingora::upstreams::peer::ALPN::H1
949 }
950 };
951 peer.options.alpn = alpn;
952
953 if let Some(ref tls_config) = self.tls_config {
955 if tls_config.insecure_skip_verify {
957 peer.options.verify_cert = false;
958 peer.options.verify_hostname = false;
959 warn!(
960 upstream_id = %self.id,
961 target = %selection.address,
962 "TLS certificate verification DISABLED (insecure_skip_verify=true)"
963 );
964 }
965
966 if let Some(ref sni) = tls_config.sni {
968 peer.options.alternative_cn = Some(sni.clone());
969 trace!(
970 upstream_id = %self.id,
971 target = %selection.address,
972 alternative_cn = %sni,
973 "Set alternative CN for TLS verification"
974 );
975 }
976
977 if tls_config.client_cert.is_some() {
983 debug!(
984 upstream_id = %self.id,
985 target = %selection.address,
986 client_cert = ?tls_config.client_cert,
987 "mTLS client certificate configured (requires custom connector for full support)"
988 );
989 }
990 }
991
992 trace!(
993 upstream_id = %self.id,
994 target = %selection.address,
995 alpn = ?peer.options.alpn,
996 min_version = self.http_version.min_version,
997 max_version = self.http_version.max_version,
998 verify_cert = peer.options.verify_cert,
999 verify_hostname = peer.options.verify_hostname,
1000 "Configured ALPN and TLS options for HTTP version negotiation"
1001 );
1002 }
1003
1004 if self.http_version.max_version >= 2 {
1006 if !self.http_version.h2_ping_interval.is_zero() {
1008 peer.options.h2_ping_interval = Some(self.http_version.h2_ping_interval);
1009 trace!(
1010 upstream_id = %self.id,
1011 target = %selection.address,
1012 h2_ping_interval_secs = self.http_version.h2_ping_interval.as_secs(),
1013 "Configured H2 ping interval"
1014 );
1015 }
1016 }
1017
1018 trace!(
1019 upstream_id = %self.id,
1020 target = %selection.address,
1021 tls = self.tls_enabled,
1022 sni = %sni_hostname,
1023 idle_timeout_secs = self.pool_config.idle_timeout.as_secs(),
1024 http_max_version = self.http_version.max_version,
1025 "Created peer with Pingora connection pooling enabled"
1026 );
1027
1028 Ok(peer)
1029 }
1030
1031 pub async fn report_result(&self, target: &str, success: bool) {
1033 trace!(
1034 upstream_id = %self.id,
1035 target = %target,
1036 success = success,
1037 "Reporting connection result"
1038 );
1039
1040 if success {
1041 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
1042 breaker.record_success().await;
1043 trace!(
1044 upstream_id = %self.id,
1045 target = %target,
1046 "Recorded success in circuit breaker"
1047 );
1048 }
1049 self.load_balancer.report_health(target, true).await;
1050 } else {
1051 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
1052 breaker.record_failure().await;
1053 debug!(
1054 upstream_id = %self.id,
1055 target = %target,
1056 "Recorded failure in circuit breaker"
1057 );
1058 }
1059 self.load_balancer.report_health(target, false).await;
1060 self.stats.failures.fetch_add(1, Ordering::Relaxed);
1061 warn!(
1062 upstream_id = %self.id,
1063 target = %target,
1064 "Connection failure reported for target"
1065 );
1066 }
1067 }
1068
1069 pub fn stats(&self) -> &PoolStats {
1071 &self.stats
1072 }
1073
1074 pub fn id(&self) -> &UpstreamId {
1076 &self.id
1077 }
1078
1079 pub fn target_count(&self) -> usize {
1081 self.targets.len()
1082 }
1083
1084 pub fn pool_config(&self) -> PoolConfigSnapshot {
1086 PoolConfigSnapshot {
1087 max_connections: self.pool_config.max_connections,
1088 max_idle: self.pool_config.max_idle,
1089 idle_timeout_secs: self.pool_config.idle_timeout.as_secs(),
1090 max_lifetime_secs: self.pool_config.max_lifetime.map(|d| d.as_secs()),
1091 connection_timeout_secs: self.pool_config.connection_timeout.as_secs(),
1092 read_timeout_secs: self.pool_config.read_timeout.as_secs(),
1093 write_timeout_secs: self.pool_config.write_timeout.as_secs(),
1094 }
1095 }
1096
1097 pub async fn shutdown(&self) {
1101 info!(
1102 upstream_id = %self.id,
1103 target_count = self.targets.len(),
1104 total_requests = self.stats.requests.load(Ordering::Relaxed),
1105 total_successes = self.stats.successes.load(Ordering::Relaxed),
1106 total_failures = self.stats.failures.load(Ordering::Relaxed),
1107 "Shutting down upstream pool"
1108 );
1109 debug!(upstream_id = %self.id, "Upstream pool shutdown complete");
1111 }
1112}