1use async_trait::async_trait;
7use pingora::upstreams::peer::HttpPeer;
8use std::collections::HashMap;
9use std::net::ToSocketAddrs;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::RwLock;
14use tracing::{debug, error, info, trace, warn};
15
16use sentinel_common::{
17 errors::{SentinelError, SentinelResult},
18 types::{CircuitBreakerConfig, LoadBalancingAlgorithm},
19 CircuitBreaker, UpstreamId,
20};
21use sentinel_config::UpstreamConfig;
22
23#[derive(Debug, Clone)]
32pub struct UpstreamTarget {
33 pub address: String,
35 pub port: u16,
37 pub weight: u32,
39}
40
41impl UpstreamTarget {
42 pub fn new(address: impl Into<String>, port: u16, weight: u32) -> Self {
44 Self {
45 address: address.into(),
46 port,
47 weight,
48 }
49 }
50
51 pub fn from_address(addr: &str) -> Option<Self> {
53 let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
54 if parts.len() == 2 {
55 let port = parts[0].parse().ok()?;
56 let address = parts[1].to_string();
57 Some(Self {
58 address,
59 port,
60 weight: 100,
61 })
62 } else {
63 None
64 }
65 }
66
67 pub fn from_config(config: &sentinel_config::UpstreamTarget) -> Option<Self> {
69 Self::from_address(&config.address).map(|mut t| {
70 t.weight = config.weight;
71 t
72 })
73 }
74
75 pub fn full_address(&self) -> String {
77 format!("{}:{}", self.address, self.port)
78 }
79}
80
81pub mod adaptive;
87pub mod consistent_hash;
88pub mod health;
89pub mod p2c;
90
91pub use adaptive::{AdaptiveBalancer, AdaptiveConfig};
93pub use consistent_hash::{ConsistentHashBalancer, ConsistentHashConfig};
94pub use health::{ActiveHealthChecker, HealthCheckRunner};
95pub use p2c::{P2cBalancer, P2cConfig};
96
97#[derive(Debug, Clone)]
99pub struct RequestContext {
100 pub client_ip: Option<std::net::SocketAddr>,
101 pub headers: HashMap<String, String>,
102 pub path: String,
103 pub method: String,
104}
105
106#[async_trait]
108pub trait LoadBalancer: Send + Sync {
109 async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection>;
111
112 async fn report_health(&self, address: &str, healthy: bool);
114
115 async fn healthy_targets(&self) -> Vec<String>;
117
118 async fn release(&self, _selection: &TargetSelection) {
120 }
122
123 async fn report_result(
125 &self,
126 _selection: &TargetSelection,
127 _success: bool,
128 _latency: Option<Duration>,
129 ) {
130 }
132}
133
134#[derive(Debug, Clone)]
136pub struct TargetSelection {
137 pub address: String,
139 pub weight: u32,
141 pub metadata: HashMap<String, String>,
143}
144
145pub struct UpstreamPool {
147 id: UpstreamId,
149 targets: Vec<UpstreamTarget>,
151 load_balancer: Arc<dyn LoadBalancer>,
153 pool_config: ConnectionPoolConfig,
155 http_version: HttpVersionOptions,
157 tls_enabled: bool,
159 tls_sni: Option<String>,
161 tls_config: Option<sentinel_config::UpstreamTlsConfig>,
163 circuit_breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
165 stats: Arc<PoolStats>,
167}
168
169pub struct ConnectionPoolConfig {
178 pub max_connections: usize,
180 pub max_idle: usize,
182 pub idle_timeout: Duration,
184 pub max_lifetime: Option<Duration>,
186 pub connection_timeout: Duration,
188 pub read_timeout: Duration,
190 pub write_timeout: Duration,
192}
193
194pub struct HttpVersionOptions {
196 pub min_version: u8,
198 pub max_version: u8,
200 pub h2_ping_interval: Duration,
202 pub max_h2_streams: usize,
204}
205
206impl ConnectionPoolConfig {
207 pub fn from_config(
209 pool_config: &sentinel_config::ConnectionPoolConfig,
210 timeouts: &sentinel_config::UpstreamTimeouts,
211 ) -> Self {
212 Self {
213 max_connections: pool_config.max_connections,
214 max_idle: pool_config.max_idle,
215 idle_timeout: Duration::from_secs(pool_config.idle_timeout_secs),
216 max_lifetime: pool_config.max_lifetime_secs.map(Duration::from_secs),
217 connection_timeout: Duration::from_secs(timeouts.connect_secs),
218 read_timeout: Duration::from_secs(timeouts.read_secs),
219 write_timeout: Duration::from_secs(timeouts.write_secs),
220 }
221 }
222}
223
224#[derive(Default)]
228pub struct PoolStats {
229 pub requests: AtomicU64,
231 pub successes: AtomicU64,
233 pub failures: AtomicU64,
235 pub retries: AtomicU64,
237 pub circuit_breaker_trips: AtomicU64,
239}
240
241#[derive(Debug, Clone)]
243pub struct PoolConfigSnapshot {
244 pub max_connections: usize,
246 pub max_idle: usize,
248 pub idle_timeout_secs: u64,
250 pub max_lifetime_secs: Option<u64>,
252 pub connection_timeout_secs: u64,
254 pub read_timeout_secs: u64,
256 pub write_timeout_secs: u64,
258}
259
260struct RoundRobinBalancer {
262 targets: Vec<UpstreamTarget>,
263 current: AtomicUsize,
264 health_status: Arc<RwLock<HashMap<String, bool>>>,
265}
266
267impl RoundRobinBalancer {
268 fn new(targets: Vec<UpstreamTarget>) -> Self {
269 let mut health_status = HashMap::new();
270 for target in &targets {
271 health_status.insert(target.full_address(), true);
272 }
273
274 Self {
275 targets,
276 current: AtomicUsize::new(0),
277 health_status: Arc::new(RwLock::new(health_status)),
278 }
279 }
280}
281
282#[async_trait]
283impl LoadBalancer for RoundRobinBalancer {
284 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
285 trace!(
286 total_targets = self.targets.len(),
287 algorithm = "round_robin",
288 "Selecting upstream target"
289 );
290
291 let health = self.health_status.read().await;
292 let healthy_targets: Vec<_> = self
293 .targets
294 .iter()
295 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
296 .collect();
297
298 if healthy_targets.is_empty() {
299 warn!(
300 total_targets = self.targets.len(),
301 algorithm = "round_robin",
302 "No healthy upstream targets available"
303 );
304 return Err(SentinelError::NoHealthyUpstream);
305 }
306
307 let index = self.current.fetch_add(1, Ordering::Relaxed) % healthy_targets.len();
308 let target = healthy_targets[index];
309
310 trace!(
311 selected_target = %target.full_address(),
312 healthy_count = healthy_targets.len(),
313 index = index,
314 algorithm = "round_robin",
315 "Selected target via round robin"
316 );
317
318 Ok(TargetSelection {
319 address: target.full_address(),
320 weight: target.weight,
321 metadata: HashMap::new(),
322 })
323 }
324
325 async fn report_health(&self, address: &str, healthy: bool) {
326 trace!(
327 target = %address,
328 healthy = healthy,
329 algorithm = "round_robin",
330 "Updating target health status"
331 );
332 self.health_status
333 .write()
334 .await
335 .insert(address.to_string(), healthy);
336 }
337
338 async fn healthy_targets(&self) -> Vec<String> {
339 self.health_status
340 .read()
341 .await
342 .iter()
343 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
344 .collect()
345 }
346}
347
348struct LeastConnectionsBalancer {
350 targets: Vec<UpstreamTarget>,
351 connections: Arc<RwLock<HashMap<String, usize>>>,
352 health_status: Arc<RwLock<HashMap<String, bool>>>,
353}
354
355impl LeastConnectionsBalancer {
356 fn new(targets: Vec<UpstreamTarget>) -> Self {
357 let mut health_status = HashMap::new();
358 let mut connections = HashMap::new();
359
360 for target in &targets {
361 let addr = target.full_address();
362 health_status.insert(addr.clone(), true);
363 connections.insert(addr, 0);
364 }
365
366 Self {
367 targets,
368 connections: Arc::new(RwLock::new(connections)),
369 health_status: Arc::new(RwLock::new(health_status)),
370 }
371 }
372}
373
374#[async_trait]
375impl LoadBalancer for LeastConnectionsBalancer {
376 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
377 trace!(
378 total_targets = self.targets.len(),
379 algorithm = "least_connections",
380 "Selecting upstream target"
381 );
382
383 let health = self.health_status.read().await;
384 let conns = self.connections.read().await;
385
386 let mut best_target = None;
387 let mut min_connections = usize::MAX;
388
389 for target in &self.targets {
390 let addr = target.full_address();
391 if !*health.get(&addr).unwrap_or(&true) {
392 trace!(
393 target = %addr,
394 algorithm = "least_connections",
395 "Skipping unhealthy target"
396 );
397 continue;
398 }
399
400 let conn_count = *conns.get(&addr).unwrap_or(&0);
401 trace!(
402 target = %addr,
403 connections = conn_count,
404 "Evaluating target connection count"
405 );
406 if conn_count < min_connections {
407 min_connections = conn_count;
408 best_target = Some(target);
409 }
410 }
411
412 match best_target {
413 Some(target) => {
414 trace!(
415 selected_target = %target.full_address(),
416 connections = min_connections,
417 algorithm = "least_connections",
418 "Selected target with fewest connections"
419 );
420 Ok(TargetSelection {
421 address: target.full_address(),
422 weight: target.weight,
423 metadata: HashMap::new(),
424 })
425 }
426 None => {
427 warn!(
428 total_targets = self.targets.len(),
429 algorithm = "least_connections",
430 "No healthy upstream targets available"
431 );
432 Err(SentinelError::NoHealthyUpstream)
433 }
434 }
435 }
436
437 async fn report_health(&self, address: &str, healthy: bool) {
438 trace!(
439 target = %address,
440 healthy = healthy,
441 algorithm = "least_connections",
442 "Updating target health status"
443 );
444 self.health_status
445 .write()
446 .await
447 .insert(address.to_string(), healthy);
448 }
449
450 async fn healthy_targets(&self) -> Vec<String> {
451 self.health_status
452 .read()
453 .await
454 .iter()
455 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
456 .collect()
457 }
458}
459
460struct WeightedBalancer {
462 targets: Vec<UpstreamTarget>,
463 weights: Vec<u32>,
464 current_index: AtomicUsize,
465 health_status: Arc<RwLock<HashMap<String, bool>>>,
466}
467
468#[async_trait]
469impl LoadBalancer for WeightedBalancer {
470 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
471 trace!(
472 total_targets = self.targets.len(),
473 algorithm = "weighted",
474 "Selecting upstream target"
475 );
476
477 let health = self.health_status.read().await;
478 let healthy_indices: Vec<_> = self
479 .targets
480 .iter()
481 .enumerate()
482 .filter(|(_, t)| *health.get(&t.full_address()).unwrap_or(&true))
483 .map(|(i, _)| i)
484 .collect();
485
486 if healthy_indices.is_empty() {
487 warn!(
488 total_targets = self.targets.len(),
489 algorithm = "weighted",
490 "No healthy upstream targets available"
491 );
492 return Err(SentinelError::NoHealthyUpstream);
493 }
494
495 let idx = self.current_index.fetch_add(1, Ordering::Relaxed) % healthy_indices.len();
496 let target_idx = healthy_indices[idx];
497 let target = &self.targets[target_idx];
498 let weight = self.weights.get(target_idx).copied().unwrap_or(1);
499
500 trace!(
501 selected_target = %target.full_address(),
502 weight = weight,
503 healthy_count = healthy_indices.len(),
504 algorithm = "weighted",
505 "Selected target via weighted round robin"
506 );
507
508 Ok(TargetSelection {
509 address: target.full_address(),
510 weight,
511 metadata: HashMap::new(),
512 })
513 }
514
515 async fn report_health(&self, address: &str, healthy: bool) {
516 trace!(
517 target = %address,
518 healthy = healthy,
519 algorithm = "weighted",
520 "Updating target health status"
521 );
522 self.health_status
523 .write()
524 .await
525 .insert(address.to_string(), healthy);
526 }
527
528 async fn healthy_targets(&self) -> Vec<String> {
529 self.health_status
530 .read()
531 .await
532 .iter()
533 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
534 .collect()
535 }
536}
537
538struct IpHashBalancer {
540 targets: Vec<UpstreamTarget>,
541 health_status: Arc<RwLock<HashMap<String, bool>>>,
542}
543
544#[async_trait]
545impl LoadBalancer for IpHashBalancer {
546 async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
547 trace!(
548 total_targets = self.targets.len(),
549 algorithm = "ip_hash",
550 "Selecting upstream target"
551 );
552
553 let health = self.health_status.read().await;
554 let healthy_targets: Vec<_> = self
555 .targets
556 .iter()
557 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
558 .collect();
559
560 if healthy_targets.is_empty() {
561 warn!(
562 total_targets = self.targets.len(),
563 algorithm = "ip_hash",
564 "No healthy upstream targets available"
565 );
566 return Err(SentinelError::NoHealthyUpstream);
567 }
568
569 let (hash, client_ip_str) = if let Some(ctx) = context {
571 if let Some(ip) = &ctx.client_ip {
572 use std::hash::{Hash, Hasher};
573 let mut hasher = std::collections::hash_map::DefaultHasher::new();
574 ip.hash(&mut hasher);
575 (hasher.finish(), Some(ip.to_string()))
576 } else {
577 (0, None)
578 }
579 } else {
580 (0, None)
581 };
582
583 let idx = (hash as usize) % healthy_targets.len();
584 let target = healthy_targets[idx];
585
586 trace!(
587 selected_target = %target.full_address(),
588 client_ip = client_ip_str.as_deref().unwrap_or("unknown"),
589 hash = hash,
590 index = idx,
591 healthy_count = healthy_targets.len(),
592 algorithm = "ip_hash",
593 "Selected target via IP hash"
594 );
595
596 Ok(TargetSelection {
597 address: target.full_address(),
598 weight: target.weight,
599 metadata: HashMap::new(),
600 })
601 }
602
603 async fn report_health(&self, address: &str, healthy: bool) {
604 trace!(
605 target = %address,
606 healthy = healthy,
607 algorithm = "ip_hash",
608 "Updating target health status"
609 );
610 self.health_status
611 .write()
612 .await
613 .insert(address.to_string(), healthy);
614 }
615
616 async fn healthy_targets(&self) -> Vec<String> {
617 self.health_status
618 .read()
619 .await
620 .iter()
621 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
622 .collect()
623 }
624}
625
626impl UpstreamPool {
627 pub async fn new(config: UpstreamConfig) -> SentinelResult<Self> {
629 let id = UpstreamId::new(&config.id);
630
631 info!(
632 upstream_id = %config.id,
633 target_count = config.targets.len(),
634 algorithm = ?config.load_balancing,
635 "Creating upstream pool"
636 );
637
638 let targets: Vec<UpstreamTarget> = config
640 .targets
641 .iter()
642 .filter_map(UpstreamTarget::from_config)
643 .collect();
644
645 if targets.is_empty() {
646 error!(
647 upstream_id = %config.id,
648 "No valid upstream targets configured"
649 );
650 return Err(SentinelError::Config {
651 message: "No valid upstream targets".to_string(),
652 source: None,
653 });
654 }
655
656 for target in &targets {
657 debug!(
658 upstream_id = %config.id,
659 target = %target.full_address(),
660 weight = target.weight,
661 "Registered upstream target"
662 );
663 }
664
665 debug!(
667 upstream_id = %config.id,
668 algorithm = ?config.load_balancing,
669 "Creating load balancer"
670 );
671 let load_balancer = Self::create_load_balancer(&config.load_balancing, &targets)?;
672
673 debug!(
675 upstream_id = %config.id,
676 max_connections = config.connection_pool.max_connections,
677 max_idle = config.connection_pool.max_idle,
678 idle_timeout_secs = config.connection_pool.idle_timeout_secs,
679 connect_timeout_secs = config.timeouts.connect_secs,
680 read_timeout_secs = config.timeouts.read_secs,
681 write_timeout_secs = config.timeouts.write_secs,
682 "Creating connection pool configuration"
683 );
684 let pool_config =
685 ConnectionPoolConfig::from_config(&config.connection_pool, &config.timeouts);
686
687 let http_version = HttpVersionOptions {
689 min_version: config.http_version.min_version,
690 max_version: config.http_version.max_version,
691 h2_ping_interval: if config.http_version.h2_ping_interval_secs > 0 {
692 Duration::from_secs(config.http_version.h2_ping_interval_secs)
693 } else {
694 Duration::ZERO
695 },
696 max_h2_streams: config.http_version.max_h2_streams,
697 };
698
699 let tls_enabled = config.tls.is_some();
701 let tls_sni = config.tls.as_ref().and_then(|t| t.sni.clone());
702 let tls_config = config.tls.clone();
703
704 if let Some(ref tls) = tls_config {
706 if tls.client_cert.is_some() {
707 info!(
708 upstream_id = %config.id,
709 "mTLS enabled for upstream (client certificate configured)"
710 );
711 }
712 }
713
714 if http_version.max_version >= 2 && tls_enabled {
715 info!(
716 upstream_id = %config.id,
717 "HTTP/2 enabled for upstream (via ALPN)"
718 );
719 }
720
721 let mut circuit_breakers = HashMap::new();
723 for target in &targets {
724 trace!(
725 upstream_id = %config.id,
726 target = %target.full_address(),
727 "Initializing circuit breaker for target"
728 );
729 circuit_breakers.insert(
730 target.full_address(),
731 CircuitBreaker::new(CircuitBreakerConfig::default()),
732 );
733 }
734
735 let pool = Self {
736 id: id.clone(),
737 targets,
738 load_balancer,
739 pool_config,
740 http_version,
741 tls_enabled,
742 tls_sni,
743 tls_config,
744 circuit_breakers: Arc::new(RwLock::new(circuit_breakers)),
745 stats: Arc::new(PoolStats::default()),
746 };
747
748 info!(
749 upstream_id = %id,
750 target_count = pool.targets.len(),
751 "Upstream pool created successfully"
752 );
753
754 Ok(pool)
755 }
756
757 fn create_load_balancer(
759 algorithm: &LoadBalancingAlgorithm,
760 targets: &[UpstreamTarget],
761 ) -> SentinelResult<Arc<dyn LoadBalancer>> {
762 let balancer: Arc<dyn LoadBalancer> = match algorithm {
763 LoadBalancingAlgorithm::RoundRobin => {
764 Arc::new(RoundRobinBalancer::new(targets.to_vec()))
765 }
766 LoadBalancingAlgorithm::LeastConnections => {
767 Arc::new(LeastConnectionsBalancer::new(targets.to_vec()))
768 }
769 LoadBalancingAlgorithm::Weighted => {
770 let weights: Vec<u32> = targets.iter().map(|t| t.weight).collect();
771 Arc::new(WeightedBalancer {
772 targets: targets.to_vec(),
773 weights,
774 current_index: AtomicUsize::new(0),
775 health_status: Arc::new(RwLock::new(HashMap::new())),
776 })
777 }
778 LoadBalancingAlgorithm::IpHash => Arc::new(IpHashBalancer {
779 targets: targets.to_vec(),
780 health_status: Arc::new(RwLock::new(HashMap::new())),
781 }),
782 LoadBalancingAlgorithm::Random => Arc::new(RoundRobinBalancer::new(targets.to_vec())),
783 LoadBalancingAlgorithm::ConsistentHash => Arc::new(ConsistentHashBalancer::new(
784 targets.to_vec(),
785 ConsistentHashConfig::default(),
786 )),
787 LoadBalancingAlgorithm::PowerOfTwoChoices => {
788 Arc::new(P2cBalancer::new(targets.to_vec(), P2cConfig::default()))
789 }
790 LoadBalancingAlgorithm::Adaptive => Arc::new(AdaptiveBalancer::new(
791 targets.to_vec(),
792 AdaptiveConfig::default(),
793 )),
794 };
795 Ok(balancer)
796 }
797
798 pub async fn select_peer(&self, context: Option<&RequestContext>) -> SentinelResult<HttpPeer> {
800 let request_num = self.stats.requests.fetch_add(1, Ordering::Relaxed) + 1;
801
802 trace!(
803 upstream_id = %self.id,
804 request_num = request_num,
805 target_count = self.targets.len(),
806 "Starting peer selection"
807 );
808
809 let mut attempts = 0;
810 let max_attempts = self.targets.len() * 2;
811
812 while attempts < max_attempts {
813 attempts += 1;
814
815 trace!(
816 upstream_id = %self.id,
817 attempt = attempts,
818 max_attempts = max_attempts,
819 "Attempting to select peer"
820 );
821
822 let selection = match self.load_balancer.select(context).await {
823 Ok(s) => s,
824 Err(e) => {
825 warn!(
826 upstream_id = %self.id,
827 attempt = attempts,
828 error = %e,
829 "Load balancer selection failed"
830 );
831 continue;
832 }
833 };
834
835 trace!(
836 upstream_id = %self.id,
837 target = %selection.address,
838 attempt = attempts,
839 "Load balancer selected target"
840 );
841
842 let breakers = self.circuit_breakers.read().await;
844 if let Some(breaker) = breakers.get(&selection.address) {
845 if !breaker.is_closed().await {
846 debug!(
847 upstream_id = %self.id,
848 target = %selection.address,
849 attempt = attempts,
850 "Circuit breaker is open, skipping target"
851 );
852 self.stats
853 .circuit_breaker_trips
854 .fetch_add(1, Ordering::Relaxed);
855 continue;
856 }
857 }
858
859 trace!(
863 upstream_id = %self.id,
864 target = %selection.address,
865 "Creating peer for upstream (Pingora handles connection reuse)"
866 );
867 let peer = self.create_peer(&selection)?;
868
869 debug!(
870 upstream_id = %self.id,
871 target = %selection.address,
872 attempt = attempts,
873 "Selected upstream peer"
874 );
875
876 self.stats.successes.fetch_add(1, Ordering::Relaxed);
877 return Ok(peer);
878 }
879
880 self.stats.failures.fetch_add(1, Ordering::Relaxed);
881 error!(
882 upstream_id = %self.id,
883 attempts = attempts,
884 max_attempts = max_attempts,
885 "Failed to select upstream after max attempts"
886 );
887 Err(SentinelError::upstream(
888 self.id.to_string(),
889 "Failed to select upstream after max attempts",
890 ))
891 }
892
893 fn create_peer(&self, selection: &TargetSelection) -> SentinelResult<HttpPeer> {
899 let sni_hostname = self.tls_sni.clone().unwrap_or_else(|| {
901 selection
903 .address
904 .split(':')
905 .next()
906 .unwrap_or(&selection.address)
907 .to_string()
908 });
909
910 let resolved_address = selection
913 .address
914 .to_socket_addrs()
915 .map_err(|e| {
916 error!(
917 upstream = %self.id,
918 address = %selection.address,
919 error = %e,
920 "Failed to resolve upstream address"
921 );
922 SentinelError::Upstream {
923 upstream: self.id.to_string(),
924 message: format!("DNS resolution failed for {}: {}", selection.address, e),
925 retryable: true,
926 source: None,
927 }
928 })?
929 .next()
930 .ok_or_else(|| {
931 error!(
932 upstream = %self.id,
933 address = %selection.address,
934 "No addresses returned from DNS resolution"
935 );
936 SentinelError::Upstream {
937 upstream: self.id.to_string(),
938 message: format!("No addresses for {}", selection.address),
939 retryable: true,
940 source: None,
941 }
942 })?;
943
944 let mut peer = HttpPeer::new(resolved_address, self.tls_enabled, sni_hostname.clone());
946
947 peer.options.idle_timeout = Some(self.pool_config.idle_timeout);
951
952 peer.options.connection_timeout = Some(self.pool_config.connection_timeout);
954 peer.options.total_connection_timeout = Some(Duration::from_secs(10));
955
956 peer.options.read_timeout = Some(self.pool_config.read_timeout);
958 peer.options.write_timeout = Some(self.pool_config.write_timeout);
959
960 peer.options.tcp_keepalive = Some(pingora::protocols::TcpKeepalive {
962 idle: Duration::from_secs(60),
963 interval: Duration::from_secs(10),
964 count: 3,
965 #[cfg(target_os = "linux")]
967 user_timeout: Duration::from_secs(60),
968 });
969
970 if self.tls_enabled {
972 let alpn = match (self.http_version.min_version, self.http_version.max_version) {
974 (2, _) => {
975 pingora::upstreams::peer::ALPN::H2
977 }
978 (1, 2) | (_, 2) => {
979 pingora::upstreams::peer::ALPN::H2H1
981 }
982 _ => {
983 pingora::upstreams::peer::ALPN::H1
985 }
986 };
987 peer.options.alpn = alpn;
988
989 if let Some(ref tls_config) = self.tls_config {
991 if tls_config.insecure_skip_verify {
993 peer.options.verify_cert = false;
994 peer.options.verify_hostname = false;
995 warn!(
996 upstream_id = %self.id,
997 target = %selection.address,
998 "TLS certificate verification DISABLED (insecure_skip_verify=true)"
999 );
1000 }
1001
1002 if let Some(ref sni) = tls_config.sni {
1004 peer.options.alternative_cn = Some(sni.clone());
1005 trace!(
1006 upstream_id = %self.id,
1007 target = %selection.address,
1008 alternative_cn = %sni,
1009 "Set alternative CN for TLS verification"
1010 );
1011 }
1012
1013 if tls_config.client_cert.is_some() {
1019 debug!(
1020 upstream_id = %self.id,
1021 target = %selection.address,
1022 client_cert = ?tls_config.client_cert,
1023 "mTLS client certificate configured (requires custom connector for full support)"
1024 );
1025 }
1026 }
1027
1028 trace!(
1029 upstream_id = %self.id,
1030 target = %selection.address,
1031 alpn = ?peer.options.alpn,
1032 min_version = self.http_version.min_version,
1033 max_version = self.http_version.max_version,
1034 verify_cert = peer.options.verify_cert,
1035 verify_hostname = peer.options.verify_hostname,
1036 "Configured ALPN and TLS options for HTTP version negotiation"
1037 );
1038 }
1039
1040 if self.http_version.max_version >= 2 {
1042 if !self.http_version.h2_ping_interval.is_zero() {
1044 peer.options.h2_ping_interval = Some(self.http_version.h2_ping_interval);
1045 trace!(
1046 upstream_id = %self.id,
1047 target = %selection.address,
1048 h2_ping_interval_secs = self.http_version.h2_ping_interval.as_secs(),
1049 "Configured H2 ping interval"
1050 );
1051 }
1052 }
1053
1054 trace!(
1055 upstream_id = %self.id,
1056 target = %selection.address,
1057 tls = self.tls_enabled,
1058 sni = %sni_hostname,
1059 idle_timeout_secs = self.pool_config.idle_timeout.as_secs(),
1060 http_max_version = self.http_version.max_version,
1061 "Created peer with Pingora connection pooling enabled"
1062 );
1063
1064 Ok(peer)
1065 }
1066
1067 pub async fn report_result(&self, target: &str, success: bool) {
1069 trace!(
1070 upstream_id = %self.id,
1071 target = %target,
1072 success = success,
1073 "Reporting connection result"
1074 );
1075
1076 if success {
1077 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
1078 breaker.record_success().await;
1079 trace!(
1080 upstream_id = %self.id,
1081 target = %target,
1082 "Recorded success in circuit breaker"
1083 );
1084 }
1085 self.load_balancer.report_health(target, true).await;
1086 } else {
1087 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
1088 breaker.record_failure().await;
1089 debug!(
1090 upstream_id = %self.id,
1091 target = %target,
1092 "Recorded failure in circuit breaker"
1093 );
1094 }
1095 self.load_balancer.report_health(target, false).await;
1096 self.stats.failures.fetch_add(1, Ordering::Relaxed);
1097 warn!(
1098 upstream_id = %self.id,
1099 target = %target,
1100 "Connection failure reported for target"
1101 );
1102 }
1103 }
1104
1105 pub fn stats(&self) -> &PoolStats {
1107 &self.stats
1108 }
1109
1110 pub fn id(&self) -> &UpstreamId {
1112 &self.id
1113 }
1114
1115 pub fn target_count(&self) -> usize {
1117 self.targets.len()
1118 }
1119
1120 pub fn pool_config(&self) -> PoolConfigSnapshot {
1122 PoolConfigSnapshot {
1123 max_connections: self.pool_config.max_connections,
1124 max_idle: self.pool_config.max_idle,
1125 idle_timeout_secs: self.pool_config.idle_timeout.as_secs(),
1126 max_lifetime_secs: self.pool_config.max_lifetime.map(|d| d.as_secs()),
1127 connection_timeout_secs: self.pool_config.connection_timeout.as_secs(),
1128 read_timeout_secs: self.pool_config.read_timeout.as_secs(),
1129 write_timeout_secs: self.pool_config.write_timeout.as_secs(),
1130 }
1131 }
1132
1133 pub async fn shutdown(&self) {
1137 info!(
1138 upstream_id = %self.id,
1139 target_count = self.targets.len(),
1140 total_requests = self.stats.requests.load(Ordering::Relaxed),
1141 total_successes = self.stats.successes.load(Ordering::Relaxed),
1142 total_failures = self.stats.failures.load(Ordering::Relaxed),
1143 "Shutting down upstream pool"
1144 );
1145 debug!(upstream_id = %self.id, "Upstream pool shutdown complete");
1147 }
1148}