pingora_load_balancing/
background.rs1use std::time::{Duration, Instant};
18
19use super::{BackendIter, BackendSelection, LoadBalancer};
20use async_trait::async_trait;
21use pingora_core::services::{background::BackgroundService, ServiceReadyNotifier};
22
23impl<S: Send + Sync + BackendSelection + 'static> LoadBalancer<S>
24where
25 S::Iter: BackendIter,
26{
27 pub async fn run(
28 &self,
29 shutdown: pingora_core::server::ShutdownWatch,
30 mut ready_opt: Option<ServiceReadyNotifier>,
31 ) -> () {
32 const NEVER: Duration = Duration::from_secs(u32::MAX as u64);
34 let mut now = Instant::now();
35 let mut next_update = now;
37 let mut next_health_check = now;
38
39 loop {
40 if *shutdown.borrow() {
41 return;
42 }
43
44 if next_update <= now {
45 let _ = self.update().await;
47 next_update = now + self.update_frequency.unwrap_or(NEVER);
48 }
49
50 if let Some(ready) = ready_opt.take() {
53 ServiceReadyNotifier::notify_ready(ready)
54 }
55
56 if next_health_check <= now {
57 self.backends
58 .run_health_check(self.parallel_health_check)
59 .await;
60 next_health_check = now + self.health_check_frequency.unwrap_or(NEVER);
61 }
62
63 if self.update_frequency.is_none() && self.health_check_frequency.is_none() {
64 return;
65 }
66 let to_wake = std::cmp::min(next_update, next_health_check);
67 tokio::time::sleep_until(to_wake.into()).await;
68 now = Instant::now();
69 }
70 }
71}
72
73#[async_trait]
77impl<S: Send + Sync + BackendSelection + 'static> BackgroundService for LoadBalancer<S>
78where
79 S::Iter: BackendIter,
80{
81 async fn start_with_ready_notifier(
82 &self,
83 shutdown: pingora_core::server::ShutdownWatch,
84 ready: ServiceReadyNotifier,
85 ) -> () {
86 self.run(shutdown, Some(ready)).await
87 }
88
89 async fn start(&self, shutdown: pingora_core::server::ShutdownWatch) -> () {
90 self.run(shutdown, None).await
91 }
92}