1use async_trait::async_trait;
7use pingora::upstreams::peer::HttpPeer;
8use std::collections::HashMap;
9use std::net::ToSocketAddrs;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::RwLock;
14use tracing::{debug, error, info, trace, warn};
15
16use sentinel_common::{
17 errors::{SentinelError, SentinelResult},
18 types::{CircuitBreakerConfig, LoadBalancingAlgorithm},
19 CircuitBreaker, UpstreamId,
20};
21use sentinel_config::UpstreamConfig;
22
/// A single backend endpoint that requests can be forwarded to.
#[derive(Debug, Clone)]
pub struct UpstreamTarget {
    /// Host part of the endpoint (everything before the final `:port`).
    pub address: String,
    /// TCP port of the endpoint.
    pub port: u16,
    /// Relative weight of this target; targets parsed from a bare
    /// `host:port` string start at 100.
    pub weight: u32,
}
40
41impl UpstreamTarget {
42 pub fn new(address: impl Into<String>, port: u16, weight: u32) -> Self {
44 Self {
45 address: address.into(),
46 port,
47 weight,
48 }
49 }
50
51 pub fn from_address(addr: &str) -> Option<Self> {
53 let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
54 if parts.len() == 2 {
55 let port = parts[0].parse().ok()?;
56 let address = parts[1].to_string();
57 Some(Self {
58 address,
59 port,
60 weight: 100,
61 })
62 } else {
63 None
64 }
65 }
66
67 pub fn from_config(config: &sentinel_config::UpstreamTarget) -> Option<Self> {
69 Self::from_address(&config.address).map(|mut t| {
70 t.weight = config.weight;
71 t
72 })
73 }
74
75 pub fn full_address(&self) -> String {
77 format!("{}:{}", self.address, self.port)
78 }
79}
80
81pub mod adaptive;
87pub mod consistent_hash;
88pub mod health;
89pub mod p2c;
90
91pub use adaptive::{AdaptiveBalancer, AdaptiveConfig};
93pub use consistent_hash::{ConsistentHashBalancer, ConsistentHashConfig};
94pub use health::{ActiveHealthChecker, HealthCheckRunner};
95pub use p2c::{P2cBalancer, P2cConfig};
96
/// Per-request information handed to a load balancer when it picks a target.
#[derive(Debug, Clone)]
pub struct RequestContext {
    /// Socket address of the client, when known.
    pub client_ip: Option<std::net::SocketAddr>,
    /// Request headers as name/value pairs.
    pub headers: HashMap<String, String>,
    /// Request path.
    pub path: String,
    /// Request method.
    pub method: String,
}
105
/// Strategy interface for choosing an upstream target and tracking its health.
#[async_trait]
pub trait LoadBalancer: Send + Sync {
    /// Picks a target for a request.
    ///
    /// `context`, when provided, carries request details an implementation
    /// may use for its decision. The implementations in this file return
    /// `SentinelError::NoHealthyUpstream` when every target is unhealthy.
    async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection>;

    /// Records whether the target at `address` is currently healthy.
    async fn report_health(&self, address: &str, healthy: bool);

    /// Returns the addresses the balancer currently considers healthy.
    async fn healthy_targets(&self) -> Vec<String>;

    /// Hook for giving a selection back to the balancer; does nothing by default.
    ///
    /// NOTE(review): presumably meant to be called once a selected target is
    /// no longer in use — confirm against callers.
    async fn release(&self, _selection: &TargetSelection) {
        // Default: no per-selection state to undo.
    }

    /// Hook for reporting the outcome of a request sent to `_selection`;
    /// does nothing by default.
    async fn report_result(
        &self,
        _selection: &TargetSelection,
        _success: bool,
        _latency: Option<Duration>,
    ) {
        // Default: outcome is ignored.
    }

    /// Reports the outcome of a request by target address.
    ///
    /// The default implementation ignores `_latency` and forwards `success`
    /// to [`LoadBalancer::report_health`].
    async fn report_result_with_latency(
        &self,
        address: &str,
        success: bool,
        _latency: Option<Duration>,
    ) {
        self.report_health(address, success).await;
    }
}
149
/// The outcome of a load-balancing decision.
#[derive(Debug, Clone)]
pub struct TargetSelection {
    /// Chosen target as `host:port`.
    pub address: String,
    /// Weight associated with the chosen target.
    pub weight: u32,
    /// Free-form key/value data attached by the balancer.
    pub metadata: HashMap<String, String>,
}
160
/// A named group of upstream targets together with the load balancer,
/// connection settings, TLS settings, circuit breakers and counters used to
/// pick and build peers for them.
pub struct UpstreamPool {
    /// Identifier of this upstream; used in log fields and error messages.
    id: UpstreamId,
    /// Targets that were successfully parsed from the configuration.
    targets: Vec<UpstreamTarget>,
    /// Strategy used to pick a target per request.
    load_balancer: Arc<dyn LoadBalancer>,
    /// Timeouts and pool limits derived from the configuration.
    pool_config: ConnectionPoolConfig,
    /// HTTP version bounds and HTTP/2 tuning.
    http_version: HttpVersionOptions,
    /// `true` when the configuration contains a TLS section.
    tls_enabled: bool,
    /// SNI override taken from the TLS configuration, if any.
    tls_sni: Option<String>,
    /// Full TLS configuration, if any.
    tls_config: Option<sentinel_config::UpstreamTlsConfig>,
    /// One circuit breaker per target, keyed by `host:port`.
    circuit_breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
    /// Request/outcome counters for this pool.
    stats: Arc<PoolStats>,
}
184
/// Connection limits and timeouts for an upstream, expressed as durations.
pub struct ConnectionPoolConfig {
    /// Maximum number of connections.
    pub max_connections: usize,
    /// Maximum number of idle connections.
    pub max_idle: usize,
    /// How long a connection may sit idle.
    pub idle_timeout: Duration,
    /// Optional upper bound on a connection's lifetime.
    pub max_lifetime: Option<Duration>,
    /// Timeout for establishing a connection.
    pub connection_timeout: Duration,
    /// Timeout for reads from the upstream.
    pub read_timeout: Duration,
    /// Timeout for writes to the upstream.
    pub write_timeout: Duration,
}
209
/// HTTP version bounds and HTTP/2 settings for an upstream.
pub struct HttpVersionOptions {
    /// Lowest HTTP major version to use.
    pub min_version: u8,
    /// Highest HTTP major version to use.
    pub max_version: u8,
    /// Interval between HTTP/2 pings; zero means no ping interval is set.
    pub h2_ping_interval: Duration,
    /// Maximum number of HTTP/2 streams.
    pub max_h2_streams: usize,
}
221
222impl ConnectionPoolConfig {
223 pub fn from_config(
225 pool_config: &sentinel_config::ConnectionPoolConfig,
226 timeouts: &sentinel_config::UpstreamTimeouts,
227 ) -> Self {
228 Self {
229 max_connections: pool_config.max_connections,
230 max_idle: pool_config.max_idle,
231 idle_timeout: Duration::from_secs(pool_config.idle_timeout_secs),
232 max_lifetime: pool_config.max_lifetime_secs.map(Duration::from_secs),
233 connection_timeout: Duration::from_secs(timeouts.connect_secs),
234 read_timeout: Duration::from_secs(timeouts.read_secs),
235 write_timeout: Duration::from_secs(timeouts.write_secs),
236 }
237 }
238}
239
/// Lock-free counters describing a pool's activity. All start at zero.
#[derive(Default)]
pub struct PoolStats {
    /// Number of peer selections requested.
    pub requests: AtomicU64,
    /// Number of successful peer selections.
    pub successes: AtomicU64,
    /// Number of failures recorded.
    pub failures: AtomicU64,
    /// Number of retries recorded.
    pub retries: AtomicU64,
    /// Number of times a target was skipped because its circuit breaker was
    /// not closed.
    pub circuit_breaker_trips: AtomicU64,
}
256
/// Plain-data copy of a pool's connection settings, with all durations
/// expressed in whole seconds.
#[derive(Debug, Clone)]
pub struct PoolConfigSnapshot {
    /// Maximum number of connections.
    pub max_connections: usize,
    /// Maximum number of idle connections.
    pub max_idle: usize,
    /// Idle timeout in seconds.
    pub idle_timeout_secs: u64,
    /// Maximum connection lifetime in seconds, if one is set.
    pub max_lifetime_secs: Option<u64>,
    /// Connection timeout in seconds.
    pub connection_timeout_secs: u64,
    /// Read timeout in seconds.
    pub read_timeout_secs: u64,
    /// Write timeout in seconds.
    pub write_timeout_secs: u64,
}
275
/// Balancer that cycles through the healthy targets in order.
struct RoundRobinBalancer {
    /// Configured targets, in selection order.
    targets: Vec<UpstreamTarget>,
    /// Monotonic counter; its value modulo the healthy-target count picks the
    /// next target.
    current: AtomicUsize,
    /// Health flag per `host:port`; addresses without an entry are treated as
    /// healthy during selection.
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}
282
283impl RoundRobinBalancer {
284 fn new(targets: Vec<UpstreamTarget>) -> Self {
285 let mut health_status = HashMap::new();
286 for target in &targets {
287 health_status.insert(target.full_address(), true);
288 }
289
290 Self {
291 targets,
292 current: AtomicUsize::new(0),
293 health_status: Arc::new(RwLock::new(health_status)),
294 }
295 }
296}
297
298#[async_trait]
299impl LoadBalancer for RoundRobinBalancer {
300 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
301 trace!(
302 total_targets = self.targets.len(),
303 algorithm = "round_robin",
304 "Selecting upstream target"
305 );
306
307 let health = self.health_status.read().await;
308 let healthy_targets: Vec<_> = self
309 .targets
310 .iter()
311 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
312 .collect();
313
314 if healthy_targets.is_empty() {
315 warn!(
316 total_targets = self.targets.len(),
317 algorithm = "round_robin",
318 "No healthy upstream targets available"
319 );
320 return Err(SentinelError::NoHealthyUpstream);
321 }
322
323 let index = self.current.fetch_add(1, Ordering::Relaxed) % healthy_targets.len();
324 let target = healthy_targets[index];
325
326 trace!(
327 selected_target = %target.full_address(),
328 healthy_count = healthy_targets.len(),
329 index = index,
330 algorithm = "round_robin",
331 "Selected target via round robin"
332 );
333
334 Ok(TargetSelection {
335 address: target.full_address(),
336 weight: target.weight,
337 metadata: HashMap::new(),
338 })
339 }
340
341 async fn report_health(&self, address: &str, healthy: bool) {
342 trace!(
343 target = %address,
344 healthy = healthy,
345 algorithm = "round_robin",
346 "Updating target health status"
347 );
348 self.health_status
349 .write()
350 .await
351 .insert(address.to_string(), healthy);
352 }
353
354 async fn healthy_targets(&self) -> Vec<String> {
355 self.health_status
356 .read()
357 .await
358 .iter()
359 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
360 .collect()
361 }
362}
363
/// Balancer that prefers the healthy target with the lowest connection count.
struct LeastConnectionsBalancer {
    /// Configured targets, in evaluation order.
    targets: Vec<UpstreamTarget>,
    /// Connection count per `host:port`; missing entries count as zero.
    connections: Arc<RwLock<HashMap<String, usize>>>,
    /// Health flag per `host:port`; addresses without an entry are treated as
    /// healthy during selection.
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}
370
371impl LeastConnectionsBalancer {
372 fn new(targets: Vec<UpstreamTarget>) -> Self {
373 let mut health_status = HashMap::new();
374 let mut connections = HashMap::new();
375
376 for target in &targets {
377 let addr = target.full_address();
378 health_status.insert(addr.clone(), true);
379 connections.insert(addr, 0);
380 }
381
382 Self {
383 targets,
384 connections: Arc::new(RwLock::new(connections)),
385 health_status: Arc::new(RwLock::new(health_status)),
386 }
387 }
388}
389
390#[async_trait]
391impl LoadBalancer for LeastConnectionsBalancer {
392 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
393 trace!(
394 total_targets = self.targets.len(),
395 algorithm = "least_connections",
396 "Selecting upstream target"
397 );
398
399 let health = self.health_status.read().await;
400 let conns = self.connections.read().await;
401
402 let mut best_target = None;
403 let mut min_connections = usize::MAX;
404
405 for target in &self.targets {
406 let addr = target.full_address();
407 if !*health.get(&addr).unwrap_or(&true) {
408 trace!(
409 target = %addr,
410 algorithm = "least_connections",
411 "Skipping unhealthy target"
412 );
413 continue;
414 }
415
416 let conn_count = *conns.get(&addr).unwrap_or(&0);
417 trace!(
418 target = %addr,
419 connections = conn_count,
420 "Evaluating target connection count"
421 );
422 if conn_count < min_connections {
423 min_connections = conn_count;
424 best_target = Some(target);
425 }
426 }
427
428 match best_target {
429 Some(target) => {
430 trace!(
431 selected_target = %target.full_address(),
432 connections = min_connections,
433 algorithm = "least_connections",
434 "Selected target with fewest connections"
435 );
436 Ok(TargetSelection {
437 address: target.full_address(),
438 weight: target.weight,
439 metadata: HashMap::new(),
440 })
441 }
442 None => {
443 warn!(
444 total_targets = self.targets.len(),
445 algorithm = "least_connections",
446 "No healthy upstream targets available"
447 );
448 Err(SentinelError::NoHealthyUpstream)
449 }
450 }
451 }
452
453 async fn report_health(&self, address: &str, healthy: bool) {
454 trace!(
455 target = %address,
456 healthy = healthy,
457 algorithm = "least_connections",
458 "Updating target health status"
459 );
460 self.health_status
461 .write()
462 .await
463 .insert(address.to_string(), healthy);
464 }
465
466 async fn healthy_targets(&self) -> Vec<String> {
467 self.health_status
468 .read()
469 .await
470 .iter()
471 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
472 .collect()
473 }
474}
475
/// Balancer that carries a weight per target alongside a rotation counter.
struct WeightedBalancer {
    /// Configured targets, in selection order.
    targets: Vec<UpstreamTarget>,
    /// Weight for the target at the same index in `targets`; a missing entry
    /// is read as 1 during selection.
    weights: Vec<u32>,
    /// Monotonic counter used to derive the next pick.
    current_index: AtomicUsize,
    /// Health flag per `host:port`; addresses without an entry are treated as
    /// healthy during selection.
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}
483
484#[async_trait]
485impl LoadBalancer for WeightedBalancer {
486 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
487 trace!(
488 total_targets = self.targets.len(),
489 algorithm = "weighted",
490 "Selecting upstream target"
491 );
492
493 let health = self.health_status.read().await;
494 let healthy_indices: Vec<_> = self
495 .targets
496 .iter()
497 .enumerate()
498 .filter(|(_, t)| *health.get(&t.full_address()).unwrap_or(&true))
499 .map(|(i, _)| i)
500 .collect();
501
502 if healthy_indices.is_empty() {
503 warn!(
504 total_targets = self.targets.len(),
505 algorithm = "weighted",
506 "No healthy upstream targets available"
507 );
508 return Err(SentinelError::NoHealthyUpstream);
509 }
510
511 let idx = self.current_index.fetch_add(1, Ordering::Relaxed) % healthy_indices.len();
512 let target_idx = healthy_indices[idx];
513 let target = &self.targets[target_idx];
514 let weight = self.weights.get(target_idx).copied().unwrap_or(1);
515
516 trace!(
517 selected_target = %target.full_address(),
518 weight = weight,
519 healthy_count = healthy_indices.len(),
520 algorithm = "weighted",
521 "Selected target via weighted round robin"
522 );
523
524 Ok(TargetSelection {
525 address: target.full_address(),
526 weight,
527 metadata: HashMap::new(),
528 })
529 }
530
531 async fn report_health(&self, address: &str, healthy: bool) {
532 trace!(
533 target = %address,
534 healthy = healthy,
535 algorithm = "weighted",
536 "Updating target health status"
537 );
538 self.health_status
539 .write()
540 .await
541 .insert(address.to_string(), healthy);
542 }
543
544 async fn healthy_targets(&self) -> Vec<String> {
545 self.health_status
546 .read()
547 .await
548 .iter()
549 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
550 .collect()
551 }
552}
553
/// Balancer that maps a client to a target by hashing the client address.
struct IpHashBalancer {
    /// Configured targets, in selection order.
    targets: Vec<UpstreamTarget>,
    /// Health flag per `host:port`; addresses without an entry are treated as
    /// healthy during selection.
    health_status: Arc<RwLock<HashMap<String, bool>>>,
}
559
560#[async_trait]
561impl LoadBalancer for IpHashBalancer {
562 async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
563 trace!(
564 total_targets = self.targets.len(),
565 algorithm = "ip_hash",
566 "Selecting upstream target"
567 );
568
569 let health = self.health_status.read().await;
570 let healthy_targets: Vec<_> = self
571 .targets
572 .iter()
573 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
574 .collect();
575
576 if healthy_targets.is_empty() {
577 warn!(
578 total_targets = self.targets.len(),
579 algorithm = "ip_hash",
580 "No healthy upstream targets available"
581 );
582 return Err(SentinelError::NoHealthyUpstream);
583 }
584
585 let (hash, client_ip_str) = if let Some(ctx) = context {
587 if let Some(ip) = &ctx.client_ip {
588 use std::hash::{Hash, Hasher};
589 let mut hasher = std::collections::hash_map::DefaultHasher::new();
590 ip.hash(&mut hasher);
591 (hasher.finish(), Some(ip.to_string()))
592 } else {
593 (0, None)
594 }
595 } else {
596 (0, None)
597 };
598
599 let idx = (hash as usize) % healthy_targets.len();
600 let target = healthy_targets[idx];
601
602 trace!(
603 selected_target = %target.full_address(),
604 client_ip = client_ip_str.as_deref().unwrap_or("unknown"),
605 hash = hash,
606 index = idx,
607 healthy_count = healthy_targets.len(),
608 algorithm = "ip_hash",
609 "Selected target via IP hash"
610 );
611
612 Ok(TargetSelection {
613 address: target.full_address(),
614 weight: target.weight,
615 metadata: HashMap::new(),
616 })
617 }
618
619 async fn report_health(&self, address: &str, healthy: bool) {
620 trace!(
621 target = %address,
622 healthy = healthy,
623 algorithm = "ip_hash",
624 "Updating target health status"
625 );
626 self.health_status
627 .write()
628 .await
629 .insert(address.to_string(), healthy);
630 }
631
632 async fn healthy_targets(&self) -> Vec<String> {
633 self.health_status
634 .read()
635 .await
636 .iter()
637 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
638 .collect()
639 }
640}
641
642impl UpstreamPool {
643 pub async fn new(config: UpstreamConfig) -> SentinelResult<Self> {
645 let id = UpstreamId::new(&config.id);
646
647 info!(
648 upstream_id = %config.id,
649 target_count = config.targets.len(),
650 algorithm = ?config.load_balancing,
651 "Creating upstream pool"
652 );
653
654 let targets: Vec<UpstreamTarget> = config
656 .targets
657 .iter()
658 .filter_map(UpstreamTarget::from_config)
659 .collect();
660
661 if targets.is_empty() {
662 error!(
663 upstream_id = %config.id,
664 "No valid upstream targets configured"
665 );
666 return Err(SentinelError::Config {
667 message: "No valid upstream targets".to_string(),
668 source: None,
669 });
670 }
671
672 for target in &targets {
673 debug!(
674 upstream_id = %config.id,
675 target = %target.full_address(),
676 weight = target.weight,
677 "Registered upstream target"
678 );
679 }
680
681 debug!(
683 upstream_id = %config.id,
684 algorithm = ?config.load_balancing,
685 "Creating load balancer"
686 );
687 let load_balancer = Self::create_load_balancer(&config.load_balancing, &targets)?;
688
689 debug!(
691 upstream_id = %config.id,
692 max_connections = config.connection_pool.max_connections,
693 max_idle = config.connection_pool.max_idle,
694 idle_timeout_secs = config.connection_pool.idle_timeout_secs,
695 connect_timeout_secs = config.timeouts.connect_secs,
696 read_timeout_secs = config.timeouts.read_secs,
697 write_timeout_secs = config.timeouts.write_secs,
698 "Creating connection pool configuration"
699 );
700 let pool_config =
701 ConnectionPoolConfig::from_config(&config.connection_pool, &config.timeouts);
702
703 let http_version = HttpVersionOptions {
705 min_version: config.http_version.min_version,
706 max_version: config.http_version.max_version,
707 h2_ping_interval: if config.http_version.h2_ping_interval_secs > 0 {
708 Duration::from_secs(config.http_version.h2_ping_interval_secs)
709 } else {
710 Duration::ZERO
711 },
712 max_h2_streams: config.http_version.max_h2_streams,
713 };
714
715 let tls_enabled = config.tls.is_some();
717 let tls_sni = config.tls.as_ref().and_then(|t| t.sni.clone());
718 let tls_config = config.tls.clone();
719
720 if let Some(ref tls) = tls_config {
722 if tls.client_cert.is_some() {
723 info!(
724 upstream_id = %config.id,
725 "mTLS enabled for upstream (client certificate configured)"
726 );
727 }
728 }
729
730 if http_version.max_version >= 2 && tls_enabled {
731 info!(
732 upstream_id = %config.id,
733 "HTTP/2 enabled for upstream (via ALPN)"
734 );
735 }
736
737 let mut circuit_breakers = HashMap::new();
739 for target in &targets {
740 trace!(
741 upstream_id = %config.id,
742 target = %target.full_address(),
743 "Initializing circuit breaker for target"
744 );
745 circuit_breakers.insert(
746 target.full_address(),
747 CircuitBreaker::new(CircuitBreakerConfig::default()),
748 );
749 }
750
751 let pool = Self {
752 id: id.clone(),
753 targets,
754 load_balancer,
755 pool_config,
756 http_version,
757 tls_enabled,
758 tls_sni,
759 tls_config,
760 circuit_breakers: Arc::new(RwLock::new(circuit_breakers)),
761 stats: Arc::new(PoolStats::default()),
762 };
763
764 info!(
765 upstream_id = %id,
766 target_count = pool.targets.len(),
767 "Upstream pool created successfully"
768 );
769
770 Ok(pool)
771 }
772
773 fn create_load_balancer(
775 algorithm: &LoadBalancingAlgorithm,
776 targets: &[UpstreamTarget],
777 ) -> SentinelResult<Arc<dyn LoadBalancer>> {
778 let balancer: Arc<dyn LoadBalancer> = match algorithm {
779 LoadBalancingAlgorithm::RoundRobin => {
780 Arc::new(RoundRobinBalancer::new(targets.to_vec()))
781 }
782 LoadBalancingAlgorithm::LeastConnections => {
783 Arc::new(LeastConnectionsBalancer::new(targets.to_vec()))
784 }
785 LoadBalancingAlgorithm::Weighted => {
786 let weights: Vec<u32> = targets.iter().map(|t| t.weight).collect();
787 Arc::new(WeightedBalancer {
788 targets: targets.to_vec(),
789 weights,
790 current_index: AtomicUsize::new(0),
791 health_status: Arc::new(RwLock::new(HashMap::new())),
792 })
793 }
794 LoadBalancingAlgorithm::IpHash => Arc::new(IpHashBalancer {
795 targets: targets.to_vec(),
796 health_status: Arc::new(RwLock::new(HashMap::new())),
797 }),
798 LoadBalancingAlgorithm::Random => Arc::new(RoundRobinBalancer::new(targets.to_vec())),
799 LoadBalancingAlgorithm::ConsistentHash => Arc::new(ConsistentHashBalancer::new(
800 targets.to_vec(),
801 ConsistentHashConfig::default(),
802 )),
803 LoadBalancingAlgorithm::PowerOfTwoChoices => {
804 Arc::new(P2cBalancer::new(targets.to_vec(), P2cConfig::default()))
805 }
806 LoadBalancingAlgorithm::Adaptive => Arc::new(AdaptiveBalancer::new(
807 targets.to_vec(),
808 AdaptiveConfig::default(),
809 )),
810 };
811 Ok(balancer)
812 }
813
814 pub async fn select_peer(&self, context: Option<&RequestContext>) -> SentinelResult<HttpPeer> {
816 let request_num = self.stats.requests.fetch_add(1, Ordering::Relaxed) + 1;
817
818 trace!(
819 upstream_id = %self.id,
820 request_num = request_num,
821 target_count = self.targets.len(),
822 "Starting peer selection"
823 );
824
825 let mut attempts = 0;
826 let max_attempts = self.targets.len() * 2;
827
828 while attempts < max_attempts {
829 attempts += 1;
830
831 trace!(
832 upstream_id = %self.id,
833 attempt = attempts,
834 max_attempts = max_attempts,
835 "Attempting to select peer"
836 );
837
838 let selection = match self.load_balancer.select(context).await {
839 Ok(s) => s,
840 Err(e) => {
841 warn!(
842 upstream_id = %self.id,
843 attempt = attempts,
844 error = %e,
845 "Load balancer selection failed"
846 );
847 continue;
848 }
849 };
850
851 trace!(
852 upstream_id = %self.id,
853 target = %selection.address,
854 attempt = attempts,
855 "Load balancer selected target"
856 );
857
858 let breakers = self.circuit_breakers.read().await;
860 if let Some(breaker) = breakers.get(&selection.address) {
861 if !breaker.is_closed().await {
862 debug!(
863 upstream_id = %self.id,
864 target = %selection.address,
865 attempt = attempts,
866 "Circuit breaker is open, skipping target"
867 );
868 self.stats
869 .circuit_breaker_trips
870 .fetch_add(1, Ordering::Relaxed);
871 continue;
872 }
873 }
874
875 trace!(
879 upstream_id = %self.id,
880 target = %selection.address,
881 "Creating peer for upstream (Pingora handles connection reuse)"
882 );
883 let peer = self.create_peer(&selection)?;
884
885 debug!(
886 upstream_id = %self.id,
887 target = %selection.address,
888 attempt = attempts,
889 "Selected upstream peer"
890 );
891
892 self.stats.successes.fetch_add(1, Ordering::Relaxed);
893 return Ok(peer);
894 }
895
896 self.stats.failures.fetch_add(1, Ordering::Relaxed);
897 error!(
898 upstream_id = %self.id,
899 attempts = attempts,
900 max_attempts = max_attempts,
901 "Failed to select upstream after max attempts"
902 );
903 Err(SentinelError::upstream(
904 self.id.to_string(),
905 "Failed to select upstream after max attempts",
906 ))
907 }
908
909 fn create_peer(&self, selection: &TargetSelection) -> SentinelResult<HttpPeer> {
915 let sni_hostname = self.tls_sni.clone().unwrap_or_else(|| {
917 selection
919 .address
920 .split(':')
921 .next()
922 .unwrap_or(&selection.address)
923 .to_string()
924 });
925
926 let resolved_address = selection
929 .address
930 .to_socket_addrs()
931 .map_err(|e| {
932 error!(
933 upstream = %self.id,
934 address = %selection.address,
935 error = %e,
936 "Failed to resolve upstream address"
937 );
938 SentinelError::Upstream {
939 upstream: self.id.to_string(),
940 message: format!("DNS resolution failed for {}: {}", selection.address, e),
941 retryable: true,
942 source: None,
943 }
944 })?
945 .next()
946 .ok_or_else(|| {
947 error!(
948 upstream = %self.id,
949 address = %selection.address,
950 "No addresses returned from DNS resolution"
951 );
952 SentinelError::Upstream {
953 upstream: self.id.to_string(),
954 message: format!("No addresses for {}", selection.address),
955 retryable: true,
956 source: None,
957 }
958 })?;
959
960 let mut peer = HttpPeer::new(resolved_address, self.tls_enabled, sni_hostname.clone());
962
963 peer.options.idle_timeout = Some(self.pool_config.idle_timeout);
967
968 peer.options.connection_timeout = Some(self.pool_config.connection_timeout);
970 peer.options.total_connection_timeout = Some(Duration::from_secs(10));
971
972 peer.options.read_timeout = Some(self.pool_config.read_timeout);
974 peer.options.write_timeout = Some(self.pool_config.write_timeout);
975
976 peer.options.tcp_keepalive = Some(pingora::protocols::TcpKeepalive {
978 idle: Duration::from_secs(60),
979 interval: Duration::from_secs(10),
980 count: 3,
981 #[cfg(target_os = "linux")]
983 user_timeout: Duration::from_secs(60),
984 });
985
986 if self.tls_enabled {
988 let alpn = match (self.http_version.min_version, self.http_version.max_version) {
990 (2, _) => {
991 pingora::upstreams::peer::ALPN::H2
993 }
994 (1, 2) | (_, 2) => {
995 pingora::upstreams::peer::ALPN::H2H1
997 }
998 _ => {
999 pingora::upstreams::peer::ALPN::H1
1001 }
1002 };
1003 peer.options.alpn = alpn;
1004
1005 if let Some(ref tls_config) = self.tls_config {
1007 if tls_config.insecure_skip_verify {
1009 peer.options.verify_cert = false;
1010 peer.options.verify_hostname = false;
1011 warn!(
1012 upstream_id = %self.id,
1013 target = %selection.address,
1014 "TLS certificate verification DISABLED (insecure_skip_verify=true)"
1015 );
1016 }
1017
1018 if let Some(ref sni) = tls_config.sni {
1020 peer.options.alternative_cn = Some(sni.clone());
1021 trace!(
1022 upstream_id = %self.id,
1023 target = %selection.address,
1024 alternative_cn = %sni,
1025 "Set alternative CN for TLS verification"
1026 );
1027 }
1028
1029 if tls_config.client_cert.is_some() {
1035 debug!(
1036 upstream_id = %self.id,
1037 target = %selection.address,
1038 client_cert = ?tls_config.client_cert,
1039 "mTLS client certificate configured (requires custom connector for full support)"
1040 );
1041 }
1042 }
1043
1044 trace!(
1045 upstream_id = %self.id,
1046 target = %selection.address,
1047 alpn = ?peer.options.alpn,
1048 min_version = self.http_version.min_version,
1049 max_version = self.http_version.max_version,
1050 verify_cert = peer.options.verify_cert,
1051 verify_hostname = peer.options.verify_hostname,
1052 "Configured ALPN and TLS options for HTTP version negotiation"
1053 );
1054 }
1055
1056 if self.http_version.max_version >= 2 {
1058 if !self.http_version.h2_ping_interval.is_zero() {
1060 peer.options.h2_ping_interval = Some(self.http_version.h2_ping_interval);
1061 trace!(
1062 upstream_id = %self.id,
1063 target = %selection.address,
1064 h2_ping_interval_secs = self.http_version.h2_ping_interval.as_secs(),
1065 "Configured H2 ping interval"
1066 );
1067 }
1068 }
1069
1070 trace!(
1071 upstream_id = %self.id,
1072 target = %selection.address,
1073 tls = self.tls_enabled,
1074 sni = %sni_hostname,
1075 idle_timeout_secs = self.pool_config.idle_timeout.as_secs(),
1076 http_max_version = self.http_version.max_version,
1077 "Created peer with Pingora connection pooling enabled"
1078 );
1079
1080 Ok(peer)
1081 }
1082
1083 pub async fn report_result(&self, target: &str, success: bool) {
1085 trace!(
1086 upstream_id = %self.id,
1087 target = %target,
1088 success = success,
1089 "Reporting connection result"
1090 );
1091
1092 if success {
1093 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
1094 breaker.record_success().await;
1095 trace!(
1096 upstream_id = %self.id,
1097 target = %target,
1098 "Recorded success in circuit breaker"
1099 );
1100 }
1101 self.load_balancer.report_health(target, true).await;
1102 } else {
1103 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
1104 breaker.record_failure().await;
1105 debug!(
1106 upstream_id = %self.id,
1107 target = %target,
1108 "Recorded failure in circuit breaker"
1109 );
1110 }
1111 self.load_balancer.report_health(target, false).await;
1112 self.stats.failures.fetch_add(1, Ordering::Relaxed);
1113 warn!(
1114 upstream_id = %self.id,
1115 target = %target,
1116 "Connection failure reported for target"
1117 );
1118 }
1119 }
1120
1121 pub async fn report_result_with_latency(
1127 &self,
1128 target: &str,
1129 success: bool,
1130 latency: Option<Duration>,
1131 ) {
1132 trace!(
1133 upstream_id = %self.id,
1134 target = %target,
1135 success = success,
1136 latency_ms = latency.map(|l| l.as_millis() as u64),
1137 "Reporting result with latency for adaptive LB"
1138 );
1139
1140 if success {
1142 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
1143 breaker.record_success().await;
1144 }
1145 } else {
1146 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
1147 breaker.record_failure().await;
1148 }
1149 self.stats.failures.fetch_add(1, Ordering::Relaxed);
1150 }
1151
1152 self.load_balancer
1154 .report_result_with_latency(target, success, latency)
1155 .await;
1156 }
1157
    /// Returns the pool's counters.
    pub fn stats(&self) -> &PoolStats {
        &self.stats
    }
1162
    /// Returns the identifier of this upstream.
    pub fn id(&self) -> &UpstreamId {
        &self.id
    }
1167
    /// Returns the number of targets in this pool.
    pub fn target_count(&self) -> usize {
        self.targets.len()
    }
1172
1173 pub fn pool_config(&self) -> PoolConfigSnapshot {
1175 PoolConfigSnapshot {
1176 max_connections: self.pool_config.max_connections,
1177 max_idle: self.pool_config.max_idle,
1178 idle_timeout_secs: self.pool_config.idle_timeout.as_secs(),
1179 max_lifetime_secs: self.pool_config.max_lifetime.map(|d| d.as_secs()),
1180 connection_timeout_secs: self.pool_config.connection_timeout.as_secs(),
1181 read_timeout_secs: self.pool_config.read_timeout.as_secs(),
1182 write_timeout_secs: self.pool_config.write_timeout.as_secs(),
1183 }
1184 }
1185
    /// Logs the pool's final counters and a completion message.
    ///
    /// No connections or other resources are touched here; the function only
    /// emits log events.
    pub async fn shutdown(&self) {
        info!(
            upstream_id = %self.id,
            target_count = self.targets.len(),
            total_requests = self.stats.requests.load(Ordering::Relaxed),
            total_successes = self.stats.successes.load(Ordering::Relaxed),
            total_failures = self.stats.failures.load(Ordering::Relaxed),
            "Shutting down upstream pool"
        );
        debug!(upstream_id = %self.id, "Upstream pool shutdown complete");
    }
1201}