// azoth_balancer/endpoint.rs
1//! This module defines the core data structures for the load balancer.
2//!
3//! It contains the `RpcEndpoint` struct, which represents the state and
4//! configuration of a single upstream node, and the `LoadBalancerError` enum
5//! for handling all possible error conditions within the application.
6
7use crate::config::ConfigError;
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9use std::{sync::Arc, time::Instant};
10use thiserror::Error;
11
/// All error conditions the load balancer can surface to callers.
///
/// The user-facing text of each variant comes from its `thiserror`
/// `#[error]` attribute.
#[derive(Debug, Error)]
pub enum LoadBalancerError {
    /// An endpoint's configured rate limit was hit; payload names the endpoint.
    #[error("Rate limit exceeded for endpoint: {0}")]
    RateLimited(String),
    /// The upstream node failed; payload carries the detail message.
    #[error("Upstream error: {0}")]
    UpstreamError(String),
    /// Invalid or unusable configuration; also produced via `From<ConfigError>`.
    #[error("Configuration error: {0}")]
    ConfigError(String),
    /// The incoming client request was malformed.
    #[error("Bad request: {0}")]
    BadRequest(String),
}
23
24impl From<ConfigError> for LoadBalancerError {
25 fn from(err: ConfigError) -> Self {
26 LoadBalancerError::ConfigError(err.to_string())
27 }
28}
29
// --- Endpoint metrics ---
31
/// Represents the type of error encountered during a request.
///
/// Used by `EndpointMetrics` to decide how heavily a failure should
/// penalize an endpoint's selection cost.
#[derive(Copy, Clone, Debug)]
pub enum ErrorKind {
    /// The request exceeded its deadline.
    Timeout,
    /// The connection to the upstream failed or was dropped.
    ConnectionError,
    /// The upstream answered with a 5xx status.
    Http5xx,
    /// The upstream answered with a 4xx status.
    Http4xx,
    /// The upstream signalled rate limiting.
    RateLimit,
}
41
42/// Represents the outcome of a forwarded request for metrics tracking.
43pub enum RequestOutcome {
44 Success { latency_ms: u64 },
45 Failure { error_kind: ErrorKind, latency_ms: u64 },
46}
47
/// A dedicated structure for tracking the performance metrics of an endpoint.
///
/// Every field is an atomic, so the struct can be shared across threads
/// (it is held behind an `Arc` in `RpcEndpoint`) and updated without
/// locking. `Default` starts all counters at zero.
#[derive(Debug, Default)]
pub struct EndpointMetrics {
    /// EMA of latency for successful requests (in milliseconds).
    pub ema_latency_ms: AtomicU64,
    /// EMA of a "penalty" score that increases on failures (in milliseconds).
    pub ema_error_penalty_ms: AtomicU64,
    /// Count of consecutive failures for exponential backoff of penalty.
    pub consecutive_failures: AtomicU32,
    /// Timestamp (seconds since UNIX_EPOCH) of when this endpoint was last selected.
    pub last_selected: AtomicU64,
}
60
61impl EndpointMetrics {
62 /// Updates the endpoint's metrics based on the outcome of a request.
63 pub fn update(&self, outcome: &RequestOutcome, latency_smoothing_factor: f64) {
64 match outcome {
65 RequestOutcome::Success { latency_ms } => {
66 // Update the success latency EMA.
67 self.update_ema(&self.ema_latency_ms, *latency_ms, latency_smoothing_factor);
68 // Reset failure count and decay error penalty quickly on success.
69 self.consecutive_failures.store(0, Ordering::Relaxed);
70 self.update_ema(&self.ema_error_penalty_ms, 0, 0.5); // Fast decay
71 }
72 RequestOutcome::Failure { error_kind, latency_ms } => {
73 let penalty = self.calculate_penalty(*error_kind, *latency_ms);
74 // Update the error penalty EMA aggressively.
75 self.update_ema(&self.ema_error_penalty_ms, penalty, 0.8); // Fast attack
76 let _ = self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
77 }
78 }
79 }
80
81 /// A generic helper to update an atomic U64 using an EMA formula.
82 pub fn update_ema(&self, atomic: &AtomicU64, new_value: u64, alpha: f64) {
83 let _ = atomic.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_ema| {
84 // Use floating point for accurate calculation, then convert back to u64.
85 let new_ema = (alpha * new_value as f64) + ((1.0 - alpha) * current_ema as f64);
86 Some(new_ema.round() as u64)
87 });
88 }
89
90 /// Calculates a penalty value based on the type of error.
91 fn calculate_penalty(&self, error_kind: ErrorKind, latency_ms: u64) -> u64 {
92 let base_penalty = match error_kind {
93 ErrorKind::Timeout => 30_000, // 30 seconds - very bad
94 ErrorKind::ConnectionError => 20_000, // 20 seconds - very bad
95 ErrorKind::Http5xx => 15_000, // 15 seconds - bad
96 ErrorKind::Http4xx => 5_000, // 5 seconds - not great
97 ErrorKind::RateLimit => 10_000, // 10 seconds - bad, but often temporary
98 };
99 // The penalty is the worse of the actual latency or the base penalty for the error type.
100 base_penalty.max(latency_ms)
101 }
102
103 /// Calculates the total "cost" of using an endpoint, combining latency and penalties.
104 /// This is the primary value used by the selection strategy.
105 pub fn get_total_cost(&self) -> u64 {
106 let success_latency = self.ema_latency_ms.load(Ordering::Relaxed);
107 let error_penalty = self.ema_error_penalty_ms.load(Ordering::Relaxed);
108 let raw_cost = success_latency.saturating_add(error_penalty);
109
110 // Prevent costs from dropping too low and creating feedback loops
111 raw_cost.max(1000) // Minimum 1000ms cost
112 }
113}
114
/// Represents the state and configuration of a single upstream JSON-RPC endpoint.
///
/// Note: `Clone` duplicates the configuration fields, but `metrics` is an
/// `Arc`, so clones share one live `EndpointMetrics` instance.
#[derive(Debug, Clone)]
pub struct RpcEndpoint {
    /// The name of the endpoint for identification
    pub name: String,
    /// The URL of the RPC endpoint.
    pub url: String,
    /// Whether the endpoint is currently considered healthy based on the last health check.
    pub healthy: bool,
    /// The timestamp of the last health check.
    pub last_check: Instant,
    /// If the endpoint is in cooldown, this holds the time until it is available again.
    pub cooldown_until: Option<Instant>,
    /// The number of times this endpoint has been put into cooldown. Used for exponential backoff.
    pub cooldown_attempts: u32,
    /// The configured rate limit in requests per second.
    pub rate_limit_per_sec: u32,
    /// The configured burst size for the rate limiter.
    pub burst_size: u32,
    /// The manual priority weight assigned to this endpoint. Higher is better.
    pub weight: u32,
    /// A shared, atomically updatable struct for tracking performance metrics.
    pub metrics: Arc<EndpointMetrics>,
}
139
140impl RpcEndpoint {
141 /// Returns `true` if the endpoint is currently in a cooldown period.
142 pub fn is_in_cooldown(&self) -> bool {
143 if let Some(until) = self.cooldown_until {
144 Instant::now() < until
145 } else {
146 false
147 }
148 }
149
150 /// Returns the number of seconds remaining in the cooldown period.
151 /// Returns 0 if the endpoint is not in cooldown.
152 pub fn cooldown_remaining_secs(&self) -> i64 {
153 match self.cooldown_until {
154 Some(until) => {
155 let now = Instant::now();
156 if until > now {
157 (until.duration_since(now).as_secs()) as i64
158 } else {
159 0
160 }
161 }
162 None => 0,
163 }
164 }
165
166 /// Returns `true` if the endpoint is healthy and not in cooldown.
167 /// This is the primary check to determine if an endpoint can be used.
168 pub fn is_available(&self) -> bool {
169 self.healthy && !self.is_in_cooldown()
170 }
171}