1use async_trait::async_trait;
7use pingora::upstreams::peer::HttpPeer;
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::RwLock;
13use tracing::{debug, error, info, trace, warn};
14
15use sentinel_common::{
16 errors::{SentinelError, SentinelResult},
17 types::{CircuitBreakerConfig, LoadBalancingAlgorithm, RetryPolicy},
18 CircuitBreaker, UpstreamId,
19};
20use sentinel_config::{HealthCheck as HealthCheckConfig, UpstreamConfig};
21
/// A single backend endpoint, stored as separate host and port parts.
///
/// [`UpstreamTarget::full_address`] joins the parts back into `host:port`.
#[derive(Clone, Debug)]
pub struct UpstreamTarget {
    /// Host portion of the endpoint (the text before the final `:` when the
    /// target was parsed from a string).
    pub address: String,
    /// Port portion of the endpoint.
    pub port: u16,
    /// Weight attached to this target; `from_address` defaults it to 100.
    pub weight: u32,
}
39
40impl UpstreamTarget {
41 pub fn new(address: impl Into<String>, port: u16, weight: u32) -> Self {
43 Self {
44 address: address.into(),
45 port,
46 weight,
47 }
48 }
49
50 pub fn from_address(addr: &str) -> Option<Self> {
52 let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
53 if parts.len() == 2 {
54 let port = parts[0].parse().ok()?;
55 let address = parts[1].to_string();
56 Some(Self {
57 address,
58 port,
59 weight: 100,
60 })
61 } else {
62 None
63 }
64 }
65
66 pub fn from_config(config: &sentinel_config::UpstreamTarget) -> Option<Self> {
68 Self::from_address(&config.address).map(|mut t| {
69 t.weight = config.weight;
70 t
71 })
72 }
73
74 pub fn full_address(&self) -> String {
76 format!("{}:{}", self.address, self.port)
77 }
78}
79
80pub mod adaptive;
86pub mod consistent_hash;
87pub mod p2c;
88
89pub use adaptive::{AdaptiveBalancer, AdaptiveConfig};
91pub use consistent_hash::{
92 ConsistentHashBalancer, ConsistentHashConfig,
93};
94pub use p2c::{P2cBalancer, P2cConfig};
95
/// Per-request information handed to a load balancer when it picks a target.
#[derive(Clone, Debug)]
pub struct RequestContext {
    /// Socket address of the client, when known.
    pub client_ip: Option<std::net::SocketAddr>,
    /// Request headers as name/value pairs.
    pub headers: HashMap<String, String>,
    /// Request path.
    pub path: String,
    /// Request method.
    pub method: String,
}
104
105#[async_trait]
107pub trait LoadBalancer: Send + Sync {
108 async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection>;
110
111 async fn report_health(&self, address: &str, healthy: bool);
113
114 async fn healthy_targets(&self) -> Vec<String>;
116
117 async fn release(&self, _selection: &TargetSelection) {
119 }
121
122 async fn report_result(
124 &self,
125 _selection: &TargetSelection,
126 _success: bool,
127 _latency: Option<Duration>,
128 ) {
129 }
131}
132
/// The outcome of a load-balancing decision.
#[derive(Clone, Debug)]
pub struct TargetSelection {
    /// Chosen target in `host:port` form.
    pub address: String,
    /// Weight associated with the chosen target.
    pub weight: u32,
    /// Free-form key/value data attached by the balancer; the balancers
    /// defined alongside this type leave it empty.
    pub metadata: HashMap<String, String>,
}
143
144pub struct UpstreamPool {
146 id: UpstreamId,
148 targets: Vec<UpstreamTarget>,
150 load_balancer: Arc<dyn LoadBalancer>,
152 health_checker: Option<Arc<UpstreamHealthChecker>>,
154 connection_pool: Arc<ConnectionPool>,
156 circuit_breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
158 retry_policy: Option<RetryPolicy>,
160 stats: Arc<PoolStats>,
162}
163
164pub struct UpstreamHealthChecker {
169 config: HealthCheckConfig,
171 health_status: Arc<RwLock<HashMap<String, TargetHealthStatus>>>,
173 check_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
175}
176
177impl UpstreamHealthChecker {
178 pub fn new(config: HealthCheckConfig) -> Self {
180 Self {
181 config,
182 health_status: Arc::new(RwLock::new(HashMap::new())),
183 check_handles: Arc::new(RwLock::new(Vec::new())),
184 }
185 }
186}
187
/// Health record kept for one target.
#[derive(Clone, Debug)]
struct TargetHealthStatus {
    /// Whether the target is currently regarded as healthy.
    healthy: bool,
    /// Number of successes in a row.
    consecutive_successes: u32,
    /// Number of failures in a row.
    consecutive_failures: u32,
    /// When the target was last checked.
    last_check: Instant,
    /// When a check last succeeded, if it ever did.
    last_success: Option<Instant>,
    /// Description of the most recent error, if any.
    last_error: Option<String>,
}
204
205pub struct ConnectionPool {
207 max_connections: usize,
209 max_idle: usize,
210 idle_timeout: Duration,
211 max_lifetime: Option<Duration>,
212 connections: Arc<RwLock<HashMap<String, Vec<PooledConnection>>>>,
214 stats: Arc<ConnectionPoolStats>,
216}
217
218impl ConnectionPool {
219 pub fn new(
221 max_connections: usize,
222 max_idle: usize,
223 idle_timeout: Duration,
224 max_lifetime: Option<Duration>,
225 ) -> Self {
226 Self {
227 max_connections,
228 max_idle,
229 idle_timeout,
230 max_lifetime,
231 connections: Arc::new(RwLock::new(HashMap::new())),
232 stats: Arc::new(ConnectionPoolStats::default()),
233 }
234 }
235
236 pub async fn acquire(&self, _address: &str) -> SentinelResult<Option<HttpPeer>> {
238 Ok(None)
241 }
242
243 pub async fn close_all(&self) {
245 let mut connections = self.connections.write().await;
246 connections.clear();
247 }
248}
249
250struct PooledConnection {
252 peer: HttpPeer,
254 created: Instant,
256 last_used: Instant,
258 in_use: bool,
260}
261
/// Atomic counters describing connection pool activity; all start at zero.
#[derive(Default)]
struct ConnectionPoolStats {
    /// Connections created.
    created: AtomicU64,
    /// Connections reused.
    reused: AtomicU64,
    /// Connections closed.
    closed: AtomicU64,
    /// Connections currently active.
    active: AtomicU64,
    /// Connections currently idle.
    idle: AtomicU64,
}
276
/// Atomic request counters for an upstream pool; all start at zero.
#[derive(Default)]
pub struct PoolStats {
    /// Peer selections started.
    pub requests: AtomicU64,
    /// Peer selections that produced a peer.
    pub successes: AtomicU64,
    /// Failed selections and reported connection failures.
    pub failures: AtomicU64,
    /// Retries performed.
    pub retries: AtomicU64,
    /// Times a target was skipped because its circuit breaker was not closed.
    pub circuit_breaker_trips: AtomicU64,
}
293
294struct RoundRobinBalancer {
296 targets: Vec<UpstreamTarget>,
297 current: AtomicUsize,
298 health_status: Arc<RwLock<HashMap<String, bool>>>,
299}
300
301impl RoundRobinBalancer {
302 fn new(targets: Vec<UpstreamTarget>) -> Self {
303 let mut health_status = HashMap::new();
304 for target in &targets {
305 health_status.insert(target.full_address(), true);
306 }
307
308 Self {
309 targets,
310 current: AtomicUsize::new(0),
311 health_status: Arc::new(RwLock::new(health_status)),
312 }
313 }
314}
315
316#[async_trait]
317impl LoadBalancer for RoundRobinBalancer {
318 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
319 trace!(
320 total_targets = self.targets.len(),
321 algorithm = "round_robin",
322 "Selecting upstream target"
323 );
324
325 let health = self.health_status.read().await;
326 let healthy_targets: Vec<_> = self
327 .targets
328 .iter()
329 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
330 .collect();
331
332 if healthy_targets.is_empty() {
333 warn!(
334 total_targets = self.targets.len(),
335 algorithm = "round_robin",
336 "No healthy upstream targets available"
337 );
338 return Err(SentinelError::NoHealthyUpstream);
339 }
340
341 let index = self.current.fetch_add(1, Ordering::Relaxed) % healthy_targets.len();
342 let target = healthy_targets[index];
343
344 trace!(
345 selected_target = %target.full_address(),
346 healthy_count = healthy_targets.len(),
347 index = index,
348 algorithm = "round_robin",
349 "Selected target via round robin"
350 );
351
352 Ok(TargetSelection {
353 address: target.full_address(),
354 weight: target.weight,
355 metadata: HashMap::new(),
356 })
357 }
358
359 async fn report_health(&self, address: &str, healthy: bool) {
360 trace!(
361 target = %address,
362 healthy = healthy,
363 algorithm = "round_robin",
364 "Updating target health status"
365 );
366 self.health_status
367 .write()
368 .await
369 .insert(address.to_string(), healthy);
370 }
371
372 async fn healthy_targets(&self) -> Vec<String> {
373 self.health_status
374 .read()
375 .await
376 .iter()
377 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
378 .collect()
379 }
380}
381
382struct LeastConnectionsBalancer {
384 targets: Vec<UpstreamTarget>,
385 connections: Arc<RwLock<HashMap<String, usize>>>,
386 health_status: Arc<RwLock<HashMap<String, bool>>>,
387}
388
389impl LeastConnectionsBalancer {
390 fn new(targets: Vec<UpstreamTarget>) -> Self {
391 let mut health_status = HashMap::new();
392 let mut connections = HashMap::new();
393
394 for target in &targets {
395 let addr = target.full_address();
396 health_status.insert(addr.clone(), true);
397 connections.insert(addr, 0);
398 }
399
400 Self {
401 targets,
402 connections: Arc::new(RwLock::new(connections)),
403 health_status: Arc::new(RwLock::new(health_status)),
404 }
405 }
406}
407
408#[async_trait]
409impl LoadBalancer for LeastConnectionsBalancer {
410 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
411 trace!(
412 total_targets = self.targets.len(),
413 algorithm = "least_connections",
414 "Selecting upstream target"
415 );
416
417 let health = self.health_status.read().await;
418 let conns = self.connections.read().await;
419
420 let mut best_target = None;
421 let mut min_connections = usize::MAX;
422
423 for target in &self.targets {
424 let addr = target.full_address();
425 if !*health.get(&addr).unwrap_or(&true) {
426 trace!(
427 target = %addr,
428 algorithm = "least_connections",
429 "Skipping unhealthy target"
430 );
431 continue;
432 }
433
434 let conn_count = *conns.get(&addr).unwrap_or(&0);
435 trace!(
436 target = %addr,
437 connections = conn_count,
438 "Evaluating target connection count"
439 );
440 if conn_count < min_connections {
441 min_connections = conn_count;
442 best_target = Some(target);
443 }
444 }
445
446 match best_target {
447 Some(target) => {
448 trace!(
449 selected_target = %target.full_address(),
450 connections = min_connections,
451 algorithm = "least_connections",
452 "Selected target with fewest connections"
453 );
454 Ok(TargetSelection {
455 address: target.full_address(),
456 weight: target.weight,
457 metadata: HashMap::new(),
458 })
459 }
460 None => {
461 warn!(
462 total_targets = self.targets.len(),
463 algorithm = "least_connections",
464 "No healthy upstream targets available"
465 );
466 Err(SentinelError::NoHealthyUpstream)
467 }
468 }
469 }
470
471 async fn report_health(&self, address: &str, healthy: bool) {
472 trace!(
473 target = %address,
474 healthy = healthy,
475 algorithm = "least_connections",
476 "Updating target health status"
477 );
478 self.health_status
479 .write()
480 .await
481 .insert(address.to_string(), healthy);
482 }
483
484 async fn healthy_targets(&self) -> Vec<String> {
485 self.health_status
486 .read()
487 .await
488 .iter()
489 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
490 .collect()
491 }
492}
493
494struct WeightedBalancer {
496 targets: Vec<UpstreamTarget>,
497 weights: Vec<u32>,
498 current_index: AtomicUsize,
499 health_status: Arc<RwLock<HashMap<String, bool>>>,
500}
501
502#[async_trait]
503impl LoadBalancer for WeightedBalancer {
504 async fn select(&self, _context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
505 trace!(
506 total_targets = self.targets.len(),
507 algorithm = "weighted",
508 "Selecting upstream target"
509 );
510
511 let health = self.health_status.read().await;
512 let healthy_indices: Vec<_> = self
513 .targets
514 .iter()
515 .enumerate()
516 .filter(|(_, t)| *health.get(&t.full_address()).unwrap_or(&true))
517 .map(|(i, _)| i)
518 .collect();
519
520 if healthy_indices.is_empty() {
521 warn!(
522 total_targets = self.targets.len(),
523 algorithm = "weighted",
524 "No healthy upstream targets available"
525 );
526 return Err(SentinelError::NoHealthyUpstream);
527 }
528
529 let idx = self.current_index.fetch_add(1, Ordering::Relaxed) % healthy_indices.len();
530 let target_idx = healthy_indices[idx];
531 let target = &self.targets[target_idx];
532 let weight = self.weights.get(target_idx).copied().unwrap_or(1);
533
534 trace!(
535 selected_target = %target.full_address(),
536 weight = weight,
537 healthy_count = healthy_indices.len(),
538 algorithm = "weighted",
539 "Selected target via weighted round robin"
540 );
541
542 Ok(TargetSelection {
543 address: target.full_address(),
544 weight,
545 metadata: HashMap::new(),
546 })
547 }
548
549 async fn report_health(&self, address: &str, healthy: bool) {
550 trace!(
551 target = %address,
552 healthy = healthy,
553 algorithm = "weighted",
554 "Updating target health status"
555 );
556 self.health_status
557 .write()
558 .await
559 .insert(address.to_string(), healthy);
560 }
561
562 async fn healthy_targets(&self) -> Vec<String> {
563 self.health_status
564 .read()
565 .await
566 .iter()
567 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
568 .collect()
569 }
570}
571
572struct IpHashBalancer {
574 targets: Vec<UpstreamTarget>,
575 health_status: Arc<RwLock<HashMap<String, bool>>>,
576}
577
578#[async_trait]
579impl LoadBalancer for IpHashBalancer {
580 async fn select(&self, context: Option<&RequestContext>) -> SentinelResult<TargetSelection> {
581 trace!(
582 total_targets = self.targets.len(),
583 algorithm = "ip_hash",
584 "Selecting upstream target"
585 );
586
587 let health = self.health_status.read().await;
588 let healthy_targets: Vec<_> = self
589 .targets
590 .iter()
591 .filter(|t| *health.get(&t.full_address()).unwrap_or(&true))
592 .collect();
593
594 if healthy_targets.is_empty() {
595 warn!(
596 total_targets = self.targets.len(),
597 algorithm = "ip_hash",
598 "No healthy upstream targets available"
599 );
600 return Err(SentinelError::NoHealthyUpstream);
601 }
602
603 let (hash, client_ip_str) = if let Some(ctx) = context {
605 if let Some(ip) = &ctx.client_ip {
606 use std::hash::{Hash, Hasher};
607 let mut hasher = std::collections::hash_map::DefaultHasher::new();
608 ip.hash(&mut hasher);
609 (hasher.finish(), Some(ip.to_string()))
610 } else {
611 (0, None)
612 }
613 } else {
614 (0, None)
615 };
616
617 let idx = (hash as usize) % healthy_targets.len();
618 let target = healthy_targets[idx];
619
620 trace!(
621 selected_target = %target.full_address(),
622 client_ip = client_ip_str.as_deref().unwrap_or("unknown"),
623 hash = hash,
624 index = idx,
625 healthy_count = healthy_targets.len(),
626 algorithm = "ip_hash",
627 "Selected target via IP hash"
628 );
629
630 Ok(TargetSelection {
631 address: target.full_address(),
632 weight: target.weight,
633 metadata: HashMap::new(),
634 })
635 }
636
637 async fn report_health(&self, address: &str, healthy: bool) {
638 trace!(
639 target = %address,
640 healthy = healthy,
641 algorithm = "ip_hash",
642 "Updating target health status"
643 );
644 self.health_status
645 .write()
646 .await
647 .insert(address.to_string(), healthy);
648 }
649
650 async fn healthy_targets(&self) -> Vec<String> {
651 self.health_status
652 .read()
653 .await
654 .iter()
655 .filter_map(|(addr, &healthy)| if healthy { Some(addr.clone()) } else { None })
656 .collect()
657 }
658}
659
660impl UpstreamPool {
661 pub async fn new(config: UpstreamConfig) -> SentinelResult<Self> {
663 let id = UpstreamId::new(&config.id);
664
665 info!(
666 upstream_id = %config.id,
667 target_count = config.targets.len(),
668 algorithm = ?config.load_balancing,
669 "Creating upstream pool"
670 );
671
672 let targets: Vec<UpstreamTarget> = config
674 .targets
675 .iter()
676 .filter_map(|t| UpstreamTarget::from_config(t))
677 .collect();
678
679 if targets.is_empty() {
680 error!(
681 upstream_id = %config.id,
682 "No valid upstream targets configured"
683 );
684 return Err(SentinelError::Config {
685 message: "No valid upstream targets".to_string(),
686 source: None,
687 });
688 }
689
690 for target in &targets {
691 debug!(
692 upstream_id = %config.id,
693 target = %target.full_address(),
694 weight = target.weight,
695 "Registered upstream target"
696 );
697 }
698
699 debug!(
701 upstream_id = %config.id,
702 algorithm = ?config.load_balancing,
703 "Creating load balancer"
704 );
705 let load_balancer = Self::create_load_balancer(&config.load_balancing, &targets)?;
706
707 let health_checker = config
709 .health_check
710 .as_ref()
711 .map(|hc_config| {
712 debug!(
713 upstream_id = %config.id,
714 check_type = ?hc_config.check_type,
715 interval_secs = hc_config.interval_secs,
716 "Creating health checker"
717 );
718 Arc::new(UpstreamHealthChecker::new(hc_config.clone()))
719 });
720
721 debug!(
723 upstream_id = %config.id,
724 max_connections = config.connection_pool.max_connections,
725 max_idle = config.connection_pool.max_idle,
726 idle_timeout_secs = config.connection_pool.idle_timeout_secs,
727 "Creating connection pool"
728 );
729 let connection_pool = Arc::new(ConnectionPool::new(
730 config.connection_pool.max_connections,
731 config.connection_pool.max_idle,
732 Duration::from_secs(config.connection_pool.idle_timeout_secs),
733 config
734 .connection_pool
735 .max_lifetime_secs
736 .map(Duration::from_secs),
737 ));
738
739 let mut circuit_breakers = HashMap::new();
741 for target in &targets {
742 trace!(
743 upstream_id = %config.id,
744 target = %target.full_address(),
745 "Initializing circuit breaker for target"
746 );
747 circuit_breakers.insert(
748 target.full_address(),
749 CircuitBreaker::new(CircuitBreakerConfig::default()),
750 );
751 }
752
753 let pool = Self {
754 id: id.clone(),
755 targets,
756 load_balancer,
757 health_checker,
758 connection_pool,
759 circuit_breakers: Arc::new(RwLock::new(circuit_breakers)),
760 retry_policy: None,
761 stats: Arc::new(PoolStats::default()),
762 };
763
764 info!(
765 upstream_id = %id,
766 target_count = pool.targets.len(),
767 "Upstream pool created successfully"
768 );
769
770 Ok(pool)
771 }
772
773 fn create_load_balancer(
775 algorithm: &LoadBalancingAlgorithm,
776 targets: &[UpstreamTarget],
777 ) -> SentinelResult<Arc<dyn LoadBalancer>> {
778 let balancer: Arc<dyn LoadBalancer> = match algorithm {
779 LoadBalancingAlgorithm::RoundRobin => {
780 Arc::new(RoundRobinBalancer::new(targets.to_vec()))
781 }
782 LoadBalancingAlgorithm::LeastConnections => {
783 Arc::new(LeastConnectionsBalancer::new(targets.to_vec()))
784 }
785 LoadBalancingAlgorithm::Weighted => {
786 let weights: Vec<u32> = targets.iter().map(|t| t.weight).collect();
787 Arc::new(WeightedBalancer {
788 targets: targets.to_vec(),
789 weights,
790 current_index: AtomicUsize::new(0),
791 health_status: Arc::new(RwLock::new(HashMap::new())),
792 })
793 }
794 LoadBalancingAlgorithm::IpHash => Arc::new(IpHashBalancer {
795 targets: targets.to_vec(),
796 health_status: Arc::new(RwLock::new(HashMap::new())),
797 }),
798 LoadBalancingAlgorithm::Random => {
799 Arc::new(RoundRobinBalancer::new(targets.to_vec()))
800 }
801 LoadBalancingAlgorithm::ConsistentHash => Arc::new(ConsistentHashBalancer::new(
802 targets.to_vec(),
803 ConsistentHashConfig::default(),
804 )),
805 LoadBalancingAlgorithm::PowerOfTwoChoices => {
806 Arc::new(P2cBalancer::new(targets.to_vec(), P2cConfig::default()))
807 }
808 LoadBalancingAlgorithm::Adaptive => Arc::new(AdaptiveBalancer::new(
809 targets.to_vec(),
810 AdaptiveConfig::default(),
811 )),
812 };
813 Ok(balancer)
814 }
815
816 pub async fn select_peer(&self, context: Option<&RequestContext>) -> SentinelResult<HttpPeer> {
818 let request_num = self.stats.requests.fetch_add(1, Ordering::Relaxed) + 1;
819
820 trace!(
821 upstream_id = %self.id,
822 request_num = request_num,
823 target_count = self.targets.len(),
824 "Starting peer selection"
825 );
826
827 let mut attempts = 0;
828 let max_attempts = self.targets.len() * 2;
829
830 while attempts < max_attempts {
831 attempts += 1;
832
833 trace!(
834 upstream_id = %self.id,
835 attempt = attempts,
836 max_attempts = max_attempts,
837 "Attempting to select peer"
838 );
839
840 let selection = match self.load_balancer.select(context).await {
841 Ok(s) => s,
842 Err(e) => {
843 warn!(
844 upstream_id = %self.id,
845 attempt = attempts,
846 error = %e,
847 "Load balancer selection failed"
848 );
849 continue;
850 }
851 };
852
853 trace!(
854 upstream_id = %self.id,
855 target = %selection.address,
856 attempt = attempts,
857 "Load balancer selected target"
858 );
859
860 let breakers = self.circuit_breakers.read().await;
862 if let Some(breaker) = breakers.get(&selection.address) {
863 if !breaker.is_closed().await {
864 debug!(
865 upstream_id = %self.id,
866 target = %selection.address,
867 attempt = attempts,
868 "Circuit breaker is open, skipping target"
869 );
870 self.stats.circuit_breaker_trips.fetch_add(1, Ordering::Relaxed);
871 continue;
872 }
873 }
874
875 if let Some(peer) = self.connection_pool.acquire(&selection.address).await? {
877 debug!(
878 upstream_id = %self.id,
879 target = %selection.address,
880 attempt = attempts,
881 "Reusing pooled connection"
882 );
883 return Ok(peer);
884 }
885
886 trace!(
888 upstream_id = %self.id,
889 target = %selection.address,
890 "Creating new connection to upstream"
891 );
892 let peer = self.create_peer(&selection)?;
893
894 debug!(
895 upstream_id = %self.id,
896 target = %selection.address,
897 attempt = attempts,
898 "Selected upstream peer"
899 );
900
901 self.stats.successes.fetch_add(1, Ordering::Relaxed);
902 return Ok(peer);
903 }
904
905 self.stats.failures.fetch_add(1, Ordering::Relaxed);
906 error!(
907 upstream_id = %self.id,
908 attempts = attempts,
909 max_attempts = max_attempts,
910 "Failed to select upstream after max attempts"
911 );
912 Err(SentinelError::upstream(
913 &self.id.to_string(),
914 "Failed to select upstream after max attempts",
915 ))
916 }
917
918 fn create_peer(&self, selection: &TargetSelection) -> SentinelResult<HttpPeer> {
920 let mut peer = HttpPeer::new(
921 &selection.address,
922 false,
923 String::new(),
924 );
925
926 peer.options.idle_timeout = Some(self.connection_pool.idle_timeout);
930
931 peer.options.connection_timeout = Some(Duration::from_secs(5));
933 peer.options.total_connection_timeout = Some(Duration::from_secs(10));
934
935 peer.options.read_timeout = Some(Duration::from_secs(60));
937 peer.options.write_timeout = Some(Duration::from_secs(60));
938
939 peer.options.tcp_keepalive = Some(pingora::protocols::TcpKeepalive {
941 idle: Duration::from_secs(60),
942 interval: Duration::from_secs(10),
943 count: 3,
944 #[cfg(target_os = "linux")]
946 user_timeout: Duration::from_secs(60),
947 });
948
949 trace!(
950 upstream_id = %self.id,
951 target = %selection.address,
952 idle_timeout_secs = self.connection_pool.idle_timeout.as_secs(),
953 "Created peer with connection pooling options"
954 );
955
956 Ok(peer)
957 }
958
959 pub async fn report_result(&self, target: &str, success: bool) {
961 trace!(
962 upstream_id = %self.id,
963 target = %target,
964 success = success,
965 "Reporting connection result"
966 );
967
968 if success {
969 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
970 breaker.record_success().await;
971 trace!(
972 upstream_id = %self.id,
973 target = %target,
974 "Recorded success in circuit breaker"
975 );
976 }
977 self.load_balancer.report_health(target, true).await;
978 } else {
979 if let Some(breaker) = self.circuit_breakers.read().await.get(target) {
980 breaker.record_failure().await;
981 debug!(
982 upstream_id = %self.id,
983 target = %target,
984 "Recorded failure in circuit breaker"
985 );
986 }
987 self.load_balancer.report_health(target, false).await;
988 self.stats.failures.fetch_add(1, Ordering::Relaxed);
989 warn!(
990 upstream_id = %self.id,
991 target = %target,
992 "Connection failure reported for target"
993 );
994 }
995 }
996
997 pub fn stats(&self) -> &PoolStats {
999 &self.stats
1000 }
1001
1002 pub async fn shutdown(&self) {
1004 info!(
1005 upstream_id = %self.id,
1006 target_count = self.targets.len(),
1007 total_requests = self.stats.requests.load(Ordering::Relaxed),
1008 total_successes = self.stats.successes.load(Ordering::Relaxed),
1009 total_failures = self.stats.failures.load(Ordering::Relaxed),
1010 "Shutting down upstream pool"
1011 );
1012 self.connection_pool.close_all().await;
1013 debug!(upstream_id = %self.id, "Connection pool closed");
1014 }
1015}