1use crate::{
8 balancer::{extract_domain_name, LoadBalancer},
9 config::try_load_config,
10 endpoint::{EndpointMetrics, LoadBalancerError, RpcEndpoint},
11 metrics::{
12 COOLDOWNS_TRIGGERED, COOLDOWN_SECONDS_GAUGE, ENDPOINT_RATE_LIMIT_DEFERRED,
13 HEALTHCHECK_FAILED, HEALTHY_ENDPOINTS, REQUEST_LATENCY_PER_ENDPOINT, RPC_REQUESTS_FAILED,
14 RPC_REQUESTS_SUCCEEDED, TOTAL_ENDPOINTS,
15 },
16};
17use parking_lot::Mutex;
18use ratelimit_meter::DirectRateLimiter;
19use reqwest::Client;
20use serde::Serialize;
21use std::{
22 collections::{HashMap, HashSet},
23 num::NonZeroU32,
24 sync::{atomic::AtomicU64, Arc},
25 time::{Duration, Instant},
26};
27use tracing::info;
28
/// Serializable outcome of a configuration reload, as returned by [`reload`].
#[derive(Serialize)]
pub struct ReloadResponse {
    /// Whether the reload was applied; `reload` sets this to `true` on its success path.
    pub success: bool,
    /// Human-readable summary of the outcome.
    pub message: String,
    /// Optional error detail; left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// What changed between the previous and the newly loaded configuration.
    pub changes: ConfigChanges,
}
37
38#[derive(Serialize, Default)]
39pub struct ConfigChanges {
40 pub endpoints_added: Vec<String>,
41 pub endpoints_removed: Vec<String>,
42 pub endpoints_modified: Vec<String>,
43 pub config_updated: bool,
44 pub client_rebuilt: bool,
45}
46
47fn cleanup_endpoint_metrics(url: &str) {
49 let labels = &[url];
50 let _ = COOLDOWN_SECONDS_GAUGE.remove_label_values(labels);
51 let _ = COOLDOWNS_TRIGGERED.remove_label_values(labels);
52 let _ = ENDPOINT_RATE_LIMIT_DEFERRED.remove_label_values(labels);
53 let _ = RPC_REQUESTS_SUCCEEDED.remove_label_values(labels);
54 let _ = RPC_REQUESTS_FAILED.remove_label_values(labels);
55 let _ = HEALTHCHECK_FAILED.remove_label_values(labels);
56 let _ = REQUEST_LATENCY_PER_ENDPOINT.remove_label_values(labels);
57 info!(url = %url, "Cleaned up metrics for removed endpoint");
58}
59
60fn initialize_endpoint_metrics(url: &str) {
62 let labels = &[url];
63 COOLDOWN_SECONDS_GAUGE.with_label_values(labels).set(0);
64 COOLDOWNS_TRIGGERED.with_label_values(labels).inc_by(0);
65 ENDPOINT_RATE_LIMIT_DEFERRED.with_label_values(labels).inc_by(0);
66 RPC_REQUESTS_SUCCEEDED.with_label_values(labels).inc_by(0);
67 RPC_REQUESTS_FAILED.with_label_values(labels).inc_by(0);
68 HEALTHCHECK_FAILED.with_label_values(labels).inc_by(0);
69 info!(url = %url, "Initialized metrics for new endpoint");
70}
71
72pub fn create_rate_limiter(
74 burst_size: u32,
75 rate_limit_per_sec: u32,
76) -> Arc<Mutex<DirectRateLimiter>> {
77 let capacity = NonZeroU32::new(burst_size).expect("Burst size from config must be > 0");
80 let period_nanos = (burst_size as u64 * 1_000_000_000) / rate_limit_per_sec as u64;
81 let period = Duration::from_nanos(period_nanos);
82 Arc::new(Mutex::new(DirectRateLimiter::new(capacity, period)))
83}
84
85pub fn reload(balancer: &LoadBalancer) -> Result<ReloadResponse, LoadBalancerError> {
87 let config_path = balancer.config_path.read().clone();
88 info!(config_path = %config_path, "Starting config reload");
89
90 let new_config_raw = try_load_config(&config_path)?.unwrap_or_default();
92 let new_config = new_config_raw.finalize()?;
93
94 let new_balancer_cfg = new_config.balancer.unwrap();
96 let crate::config::BalancerConfig {
97 max_batch_size: Some(new_max_batch_size),
98 base_cooldown_secs: Some(new_base_cooldown_secs),
99 max_cooldown_secs: Some(new_max_cooldown_secs),
100 health_check_interval_secs: Some(new_health_check_interval_secs),
101 health_check_timeout_secs: Some(new_health_check_timeout_secs),
102 latency_smoothing_factor: Some(new_latency_smoothing_factor),
103 connect_timeout_ms: Some(new_connect_timeout_ms),
104 timeout_secs: Some(new_timeout_secs),
105 pool_idle_timeout_secs: Some(new_pool_idle_timeout_secs),
106 pool_max_idle_per_host: Some(new_pool_max_idle_per_host),
107 endpoints: Some(new_endpoints_list),
108 ..
109 } = new_balancer_cfg
110 else {
111 return Err(LoadBalancerError::ConfigError(
113 "Finalized config is missing required values.".to_string(),
114 ));
115 };
116
117 let client_settings_changed = {
118 *balancer.connect_timeout_ms.read() != new_connect_timeout_ms
119 || *balancer.timeout_secs.read() != new_timeout_secs
120 || *balancer.pool_idle_timeout_secs.read() != new_pool_idle_timeout_secs
121 || *balancer.pool_max_idle_per_host.read() != new_pool_max_idle_per_host
122 };
123
124 if client_settings_changed {
125 info!("HTTP client settings have changed, rebuilding client.");
126 let new_client = Client::builder()
127 .tcp_nodelay(true)
128 .connect_timeout(Duration::from_millis(new_connect_timeout_ms))
129 .timeout(Duration::from_secs(new_timeout_secs))
130 .pool_idle_timeout(Some(Duration::from_secs(new_pool_idle_timeout_secs)))
131 .pool_max_idle_per_host(new_pool_max_idle_per_host)
132 .http1_title_case_headers()
133 .build()
134 .expect("Failed to build new HTTP client");
135
136 *balancer.client.write() = new_client;
137 *balancer.connect_timeout_ms.write() = new_connect_timeout_ms;
138 *balancer.timeout_secs.write() = new_timeout_secs;
139 *balancer.pool_idle_timeout_secs.write() = new_pool_idle_timeout_secs;
140 *balancer.pool_max_idle_per_host.write() = new_pool_max_idle_per_host;
141 }
142
143 let current_endpoints = balancer.endpoints.read();
144 let current_urls: HashSet<String> = current_endpoints.iter().map(|e| e.url.clone()).collect();
145 let new_urls: HashSet<String> = new_endpoints_list.iter().map(|e| e.url.clone()).collect();
146
147 let endpoints_added: Vec<String> = new_urls.difference(¤t_urls).cloned().collect();
148 let endpoints_removed: Vec<String> = current_urls.difference(&new_urls).cloned().collect();
149
150 let mut endpoints_modified = Vec::new();
151 for (index, new_ep_cfg) in new_endpoints_list.iter().enumerate() {
152 if let Some(current_ep) = current_endpoints.iter().find(|e| e.url == new_ep_cfg.url) {
153 let new_name = new_ep_cfg.name.clone().unwrap_or_else(|| {
154 let domain_name = extract_domain_name(&new_ep_cfg.url);
155 format!("{:03}_{}", index + 1, domain_name)
156 });
157
158 if current_ep.rate_limit_per_sec != new_ep_cfg.rate_limit_per_sec
159 || current_ep.burst_size != new_ep_cfg.burst_size
160 || current_ep.weight != new_ep_cfg.weight.unwrap()
161 || current_ep.name != new_name
162 {
163 endpoints_modified.push(new_ep_cfg.url.clone());
164 }
165 }
166 }
167
168 drop(current_endpoints);
169
170 for removed_url in &endpoints_removed {
171 cleanup_endpoint_metrics(removed_url);
172 }
173
174 let mut new_rpc_endpoints = Vec::new();
175 let mut new_rate_limiters = HashMap::new();
176
177 {
178 let current_endpoints = balancer.endpoints.read();
179 let current_limiters = balancer.rate_limiters.read();
180
181 for (index, new_ep_cfg) in new_endpoints_list.iter().enumerate() {
182 let new_weight = new_ep_cfg.weight.unwrap();
183 let new_rate_limit = new_ep_cfg.rate_limit_per_sec;
184 let new_burst = new_ep_cfg.burst_size;
185
186 let name = new_ep_cfg.name.clone().unwrap_or_else(|| {
187 let domain_name = extract_domain_name(&new_ep_cfg.url);
188 format!("{:03}_{}", index + 1, domain_name)
189 });
190
191 if let Some(existing_ep) = current_endpoints.iter().find(|e| e.url == new_ep_cfg.url) {
192 let mut updated_ep = existing_ep.clone();
194 updated_ep.name = name;
195 updated_ep.rate_limit_per_sec = new_rate_limit;
196 updated_ep.burst_size = new_burst;
197 updated_ep.weight = new_weight;
198 new_rpc_endpoints.push(updated_ep);
199
200 if existing_ep.rate_limit_per_sec != new_rate_limit
202 || existing_ep.burst_size != new_burst
203 {
204 new_rate_limiters.insert(
205 new_ep_cfg.url.clone(),
206 create_rate_limiter(new_burst, new_rate_limit),
207 );
208 } else if let Some(existing_limiter) = current_limiters.get(&new_ep_cfg.url) {
209 new_rate_limiters.insert(new_ep_cfg.url.clone(), existing_limiter.clone());
210 }
211 } else {
212 let new_ep = RpcEndpoint {
214 name,
215 url: new_ep_cfg.url.clone(),
216 healthy: true,
217 last_check: Instant::now(),
218 cooldown_until: None,
219 cooldown_attempts: 0,
220 rate_limit_per_sec: new_rate_limit,
221 burst_size: new_burst,
222 weight: new_weight,
223 metrics: Arc::new(EndpointMetrics {
224 ema_latency_ms: AtomicU64::new(*balancer.timeout_secs.read() * 1000),
225 ..Default::default()
226 }),
227 };
228 new_rpc_endpoints.push(new_ep);
229
230 new_rate_limiters
231 .insert(new_ep_cfg.url.clone(), create_rate_limiter(new_burst, new_rate_limit));
232
233 initialize_endpoint_metrics(&new_ep_cfg.url);
234 }
235 }
236 }
237
238 *balancer.endpoints.write() = new_rpc_endpoints;
240 *balancer.rate_limiters.write() = new_rate_limiters;
241 *balancer.max_batch_size.write() = new_max_batch_size;
242 *balancer.base_cooldown_secs.write() = new_base_cooldown_secs;
243 *balancer.max_cooldown_secs.write() = new_max_cooldown_secs;
244 *balancer.health_check_interval_secs.write() = new_health_check_interval_secs;
245 *balancer.health_check_timeout_secs.write() = new_health_check_timeout_secs;
246 *balancer.latency_smoothing_factor.write() = new_latency_smoothing_factor;
247
248 TOTAL_ENDPOINTS.set(new_endpoints_list.len() as i64);
249
250 let healthy_count = {
251 let endpoints = balancer.endpoints.read();
252 endpoints.iter().filter(|e| e.is_available()).count() as i64
253 };
254 HEALTHY_ENDPOINTS.set(healthy_count);
255
256 let changes = ConfigChanges {
257 endpoints_added: endpoints_added.clone(),
258 endpoints_removed: endpoints_removed.clone(),
259 endpoints_modified: endpoints_modified.clone(),
260 config_updated: true,
261 client_rebuilt: client_settings_changed,
262 };
263
264 info!(
265 added = endpoints_added.len(),
266 removed = endpoints_removed.len(),
267 modified = endpoints_modified.len(),
268 total_endpoints = new_endpoints_list.len(),
269 client_rebuilt = client_settings_changed,
270 "Configuration reload complete."
271 );
272
273 Ok(ReloadResponse {
274 success: true,
275 message: "Configuration reloaded successfully".to_string(),
276 error: None,
277 changes,
278 })
279}