azoth_balancer/
config_reloader.rs

//! This module handles hot-reloading the configuration from disk.
//!
//! On each reload it re-reads the configuration file, compares it with the load balancer's
//! current state, and applies the differences — adding, removing, or modifying endpoints and
//! rebuilding the HTTP client when its settings change — without a full application restart.
6
7use crate::{
8    balancer::{extract_domain_name, LoadBalancer},
9    config::try_load_config,
10    endpoint::{EndpointMetrics, LoadBalancerError, RpcEndpoint},
11    metrics::{
12        COOLDOWNS_TRIGGERED, COOLDOWN_SECONDS_GAUGE, ENDPOINT_RATE_LIMIT_DEFERRED,
13        HEALTHCHECK_FAILED, HEALTHY_ENDPOINTS, REQUEST_LATENCY_PER_ENDPOINT, RPC_REQUESTS_FAILED,
14        RPC_REQUESTS_SUCCEEDED, TOTAL_ENDPOINTS,
15    },
16};
17use parking_lot::Mutex;
18use ratelimit_meter::DirectRateLimiter;
19use reqwest::Client;
20use serde::Serialize;
21use std::{
22    collections::{HashMap, HashSet},
23    num::NonZeroU32,
24    sync::{atomic::AtomicU64, Arc},
25    time::{Duration, Instant},
26};
27use tracing::info;
28
29#[derive(Serialize)]
30pub struct ReloadResponse {
31    pub success: bool,
32    pub message: String,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub error: Option<String>,
35    pub changes: ConfigChanges,
36}
37
38#[derive(Serialize, Default)]
39pub struct ConfigChanges {
40    pub endpoints_added: Vec<String>,
41    pub endpoints_removed: Vec<String>,
42    pub endpoints_modified: Vec<String>,
43    pub config_updated: bool,
44    pub client_rebuilt: bool,
45}
46
47/// Helper to remove all Prometheus metrics associated with an endpoint URL.
48fn cleanup_endpoint_metrics(url: &str) {
49    let labels = &[url];
50    let _ = COOLDOWN_SECONDS_GAUGE.remove_label_values(labels);
51    let _ = COOLDOWNS_TRIGGERED.remove_label_values(labels);
52    let _ = ENDPOINT_RATE_LIMIT_DEFERRED.remove_label_values(labels);
53    let _ = RPC_REQUESTS_SUCCEEDED.remove_label_values(labels);
54    let _ = RPC_REQUESTS_FAILED.remove_label_values(labels);
55    let _ = HEALTHCHECK_FAILED.remove_label_values(labels);
56    let _ = REQUEST_LATENCY_PER_ENDPOINT.remove_label_values(labels);
57    info!(url = %url, "Cleaned up metrics for removed endpoint");
58}
59
60/// Helper to initialize all Prometheus metrics for a new endpoint URL.
61fn initialize_endpoint_metrics(url: &str) {
62    let labels = &[url];
63    COOLDOWN_SECONDS_GAUGE.with_label_values(labels).set(0);
64    COOLDOWNS_TRIGGERED.with_label_values(labels).inc_by(0);
65    ENDPOINT_RATE_LIMIT_DEFERRED.with_label_values(labels).inc_by(0);
66    RPC_REQUESTS_SUCCEEDED.with_label_values(labels).inc_by(0);
67    RPC_REQUESTS_FAILED.with_label_values(labels).inc_by(0);
68    HEALTHCHECK_FAILED.with_label_values(labels).inc_by(0);
69    info!(url = %url, "Initialized metrics for new endpoint");
70}
71
72/// Creates a new thread-safe rate limiter.
73pub fn create_rate_limiter(
74    burst_size: u32,
75    rate_limit_per_sec: u32,
76) -> Arc<Mutex<DirectRateLimiter>> {
77    // This logic can panic if burst_size or rate_limit_per_sec are 0.
78    // The config finalizer should prevent this by ensuring they are > 0.
79    let capacity = NonZeroU32::new(burst_size).expect("Burst size from config must be > 0");
80    let period_nanos = (burst_size as u64 * 1_000_000_000) / rate_limit_per_sec as u64;
81    let period = Duration::from_nanos(period_nanos);
82    Arc::new(Mutex::new(DirectRateLimiter::new(capacity, period)))
83}
84
85/// Reloads the configuration and applies changes to the load balancer state.
86pub fn reload(balancer: &LoadBalancer) -> Result<ReloadResponse, LoadBalancerError> {
87    let config_path = balancer.config_path.read().clone();
88    info!(config_path = %config_path, "Starting config reload");
89
90    // Load and finalize the new configuration. This handles all validation and defaults.
91    let new_config_raw = try_load_config(&config_path)?.unwrap_or_default();
92    let new_config = new_config_raw.finalize()?;
93
94    // Destructure the config for cleaner access. `finalize` guarantees values are present.
95    let new_balancer_cfg = new_config.balancer.unwrap();
96    let crate::config::BalancerConfig {
97        max_batch_size: Some(new_max_batch_size),
98        base_cooldown_secs: Some(new_base_cooldown_secs),
99        max_cooldown_secs: Some(new_max_cooldown_secs),
100        health_check_interval_secs: Some(new_health_check_interval_secs),
101        health_check_timeout_secs: Some(new_health_check_timeout_secs),
102        latency_smoothing_factor: Some(new_latency_smoothing_factor),
103        connect_timeout_ms: Some(new_connect_timeout_ms),
104        timeout_secs: Some(new_timeout_secs),
105        pool_idle_timeout_secs: Some(new_pool_idle_timeout_secs),
106        pool_max_idle_per_host: Some(new_pool_max_idle_per_host),
107        endpoints: Some(new_endpoints_list),
108        ..
109    } = new_balancer_cfg
110    else {
111        // This case should be unreachable due to `finalize()`
112        return Err(LoadBalancerError::ConfigError(
113            "Finalized config is missing required values.".to_string(),
114        ));
115    };
116
117    let client_settings_changed = {
118        *balancer.connect_timeout_ms.read() != new_connect_timeout_ms
119            || *balancer.timeout_secs.read() != new_timeout_secs
120            || *balancer.pool_idle_timeout_secs.read() != new_pool_idle_timeout_secs
121            || *balancer.pool_max_idle_per_host.read() != new_pool_max_idle_per_host
122    };
123
124    if client_settings_changed {
125        info!("HTTP client settings have changed, rebuilding client.");
126        let new_client = Client::builder()
127            .tcp_nodelay(true)
128            .connect_timeout(Duration::from_millis(new_connect_timeout_ms))
129            .timeout(Duration::from_secs(new_timeout_secs))
130            .pool_idle_timeout(Some(Duration::from_secs(new_pool_idle_timeout_secs)))
131            .pool_max_idle_per_host(new_pool_max_idle_per_host)
132            .http1_title_case_headers()
133            .build()
134            .expect("Failed to build new HTTP client");
135
136        *balancer.client.write() = new_client;
137        *balancer.connect_timeout_ms.write() = new_connect_timeout_ms;
138        *balancer.timeout_secs.write() = new_timeout_secs;
139        *balancer.pool_idle_timeout_secs.write() = new_pool_idle_timeout_secs;
140        *balancer.pool_max_idle_per_host.write() = new_pool_max_idle_per_host;
141    }
142
143    let current_endpoints = balancer.endpoints.read();
144    let current_urls: HashSet<String> = current_endpoints.iter().map(|e| e.url.clone()).collect();
145    let new_urls: HashSet<String> = new_endpoints_list.iter().map(|e| e.url.clone()).collect();
146
147    let endpoints_added: Vec<String> = new_urls.difference(&current_urls).cloned().collect();
148    let endpoints_removed: Vec<String> = current_urls.difference(&new_urls).cloned().collect();
149
150    let mut endpoints_modified = Vec::new();
151    for (index, new_ep_cfg) in new_endpoints_list.iter().enumerate() {
152        if let Some(current_ep) = current_endpoints.iter().find(|e| e.url == new_ep_cfg.url) {
153            let new_name = new_ep_cfg.name.clone().unwrap_or_else(|| {
154                let domain_name = extract_domain_name(&new_ep_cfg.url);
155                format!("{:03}_{}", index + 1, domain_name)
156            });
157
158            if current_ep.rate_limit_per_sec != new_ep_cfg.rate_limit_per_sec
159                || current_ep.burst_size != new_ep_cfg.burst_size
160                || current_ep.weight != new_ep_cfg.weight.unwrap()
161                || current_ep.name != new_name
162            {
163                endpoints_modified.push(new_ep_cfg.url.clone());
164            }
165        }
166    }
167
168    drop(current_endpoints);
169
170    for removed_url in &endpoints_removed {
171        cleanup_endpoint_metrics(removed_url);
172    }
173
174    let mut new_rpc_endpoints = Vec::new();
175    let mut new_rate_limiters = HashMap::new();
176
177    {
178        let current_endpoints = balancer.endpoints.read();
179        let current_limiters = balancer.rate_limiters.read();
180
181        for (index, new_ep_cfg) in new_endpoints_list.iter().enumerate() {
182            let new_weight = new_ep_cfg.weight.unwrap();
183            let new_rate_limit = new_ep_cfg.rate_limit_per_sec;
184            let new_burst = new_ep_cfg.burst_size;
185
186            let name = new_ep_cfg.name.clone().unwrap_or_else(|| {
187                let domain_name = extract_domain_name(&new_ep_cfg.url);
188                format!("{:03}_{}", index + 1, domain_name)
189            });
190
191            if let Some(existing_ep) = current_endpoints.iter().find(|e| e.url == new_ep_cfg.url) {
192                // Preserve state of existing endpoint, including metrics
193                let mut updated_ep = existing_ep.clone();
194                updated_ep.name = name;
195                updated_ep.rate_limit_per_sec = new_rate_limit;
196                updated_ep.burst_size = new_burst;
197                updated_ep.weight = new_weight;
198                new_rpc_endpoints.push(updated_ep);
199
200                // Update rate limiter only if changed
201                if existing_ep.rate_limit_per_sec != new_rate_limit
202                    || existing_ep.burst_size != new_burst
203                {
204                    new_rate_limiters.insert(
205                        new_ep_cfg.url.clone(),
206                        create_rate_limiter(new_burst, new_rate_limit),
207                    );
208                } else if let Some(existing_limiter) = current_limiters.get(&new_ep_cfg.url) {
209                    new_rate_limiters.insert(new_ep_cfg.url.clone(), existing_limiter.clone());
210                }
211            } else {
212                // This is a new endpoint
213                let new_ep = RpcEndpoint {
214                    name,
215                    url: new_ep_cfg.url.clone(),
216                    healthy: true,
217                    last_check: Instant::now(),
218                    cooldown_until: None,
219                    cooldown_attempts: 0,
220                    rate_limit_per_sec: new_rate_limit,
221                    burst_size: new_burst,
222                    weight: new_weight,
223                    metrics: Arc::new(EndpointMetrics {
224                        ema_latency_ms: AtomicU64::new(*balancer.timeout_secs.read() * 1000),
225                        ..Default::default()
226                    }),
227                };
228                new_rpc_endpoints.push(new_ep);
229
230                new_rate_limiters
231                    .insert(new_ep_cfg.url.clone(), create_rate_limiter(new_burst, new_rate_limit));
232
233                initialize_endpoint_metrics(&new_ep_cfg.url);
234            }
235        }
236    }
237
238    // Atomically update the load balancer state
239    *balancer.endpoints.write() = new_rpc_endpoints;
240    *balancer.rate_limiters.write() = new_rate_limiters;
241    *balancer.max_batch_size.write() = new_max_batch_size;
242    *balancer.base_cooldown_secs.write() = new_base_cooldown_secs;
243    *balancer.max_cooldown_secs.write() = new_max_cooldown_secs;
244    *balancer.health_check_interval_secs.write() = new_health_check_interval_secs;
245    *balancer.health_check_timeout_secs.write() = new_health_check_timeout_secs;
246    *balancer.latency_smoothing_factor.write() = new_latency_smoothing_factor;
247
248    TOTAL_ENDPOINTS.set(new_endpoints_list.len() as i64);
249
250    let healthy_count = {
251        let endpoints = balancer.endpoints.read();
252        endpoints.iter().filter(|e| e.is_available()).count() as i64
253    };
254    HEALTHY_ENDPOINTS.set(healthy_count);
255
256    let changes = ConfigChanges {
257        endpoints_added: endpoints_added.clone(),
258        endpoints_removed: endpoints_removed.clone(),
259        endpoints_modified: endpoints_modified.clone(),
260        config_updated: true,
261        client_rebuilt: client_settings_changed,
262    };
263
264    info!(
265        added = endpoints_added.len(),
266        removed = endpoints_removed.len(),
267        modified = endpoints_modified.len(),
268        total_endpoints = new_endpoints_list.len(),
269        client_rebuilt = client_settings_changed,
270        "Configuration reload complete."
271    );
272
273    Ok(ReloadResponse {
274        success: true,
275        message: "Configuration reloaded successfully".to_string(),
276        error: None,
277        changes,
278    })
279}