azoth_balancer/
endpoint.rs

1//! This module defines the core data structures for the load balancer.
2//!
3//! It contains the `RpcEndpoint` struct, which represents the state and
4//! configuration of a single upstream node, and the `LoadBalancerError` enum
5//! for handling all possible error conditions within the application.
6
7use crate::config::ConfigError;
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9use std::{sync::Arc, time::Instant};
10use thiserror::Error;
11
/// The unified error type for all failure modes in the load balancer.
///
/// `thiserror` derives `Display` from each variant's `#[error]` attribute,
/// so these values can be logged or rendered to clients directly.
#[derive(Debug, Error)]
pub enum LoadBalancerError {
    /// A request was rejected by an endpoint's rate limiter; the payload is the endpoint name.
    #[error("Rate limit exceeded for endpoint: {0}")]
    RateLimited(String),
    /// The upstream node failed or misbehaved; the payload describes the failure.
    #[error("Upstream error: {0}")]
    UpstreamError(String),
    /// Configuration loading or validation failed; the payload is the rendered `ConfigError`.
    #[error("Configuration error: {0}")]
    ConfigError(String),
    /// The incoming client request was malformed; the payload describes the problem.
    #[error("Bad request: {0}")]
    BadRequest(String),
}
23
24impl From<ConfigError> for LoadBalancerError {
25    fn from(err: ConfigError) -> Self {
26        LoadBalancerError::ConfigError(err.to_string())
27    }
28}
29
// --- New Endpoint Metrics Logic ---

/// EMA smoothing factor used to decay the error penalty toward zero after a success.
const PENALTY_DECAY_ALPHA: f64 = 0.5;
/// EMA smoothing factor used to pull the error penalty toward a fresh failure penalty.
const PENALTY_ATTACK_ALPHA: f64 = 0.8;
/// Lower bound on the reported total cost (milliseconds), preventing near-zero
/// costs from concentrating all traffic on one endpoint in a feedback loop.
const MIN_TOTAL_COST_MS: u64 = 1_000;

/// Represents the type of error encountered during a request.
#[derive(Copy, Clone, Debug)]
pub enum ErrorKind {
    Timeout,
    ConnectionError,
    Http5xx,
    Http4xx,
    RateLimit,
}

/// Represents the outcome of a forwarded request for metrics tracking.
///
/// Derives `Copy`/`Clone`/`Debug` for parity with `ErrorKind` so outcomes can
/// be logged and passed around freely.
#[derive(Copy, Clone, Debug)]
pub enum RequestOutcome {
    /// The request completed; `latency_ms` is the observed round-trip latency.
    Success { latency_ms: u64 },
    /// The request failed; `latency_ms` is the time spent before the failure surfaced.
    Failure { error_kind: ErrorKind, latency_ms: u64 },
}

/// A dedicated structure for tracking the performance metrics of an endpoint.
///
/// All fields are atomics so a shared `Arc<EndpointMetrics>` can be updated
/// from many request handlers without locking.
#[derive(Debug, Default)]
pub struct EndpointMetrics {
    /// EMA of latency for successful requests (in milliseconds).
    pub ema_latency_ms: AtomicU64,
    /// EMA of a "penalty" score that increases on failures (in milliseconds).
    pub ema_error_penalty_ms: AtomicU64,
    /// Count of consecutive failures. NOTE(review): described as driving an
    /// exponential backoff of the penalty, but nothing in this module reads it
    /// for that purpose — confirm against the selection/cooldown code.
    pub consecutive_failures: AtomicU32,
    /// Timestamp (seconds since UNIX_EPOCH) of when this endpoint was last selected.
    pub last_selected: AtomicU64,
}

impl EndpointMetrics {
    /// Updates the endpoint's metrics based on the outcome of a request.
    ///
    /// `latency_smoothing_factor` is the EMA alpha applied to the success-latency
    /// series; the error-penalty series uses the fixed `PENALTY_DECAY_ALPHA` /
    /// `PENALTY_ATTACK_ALPHA` constants.
    pub fn update(&self, outcome: &RequestOutcome, latency_smoothing_factor: f64) {
        match outcome {
            RequestOutcome::Success { latency_ms } => {
                // Fold the observed latency into the success EMA.
                self.update_ema(&self.ema_latency_ms, *latency_ms, latency_smoothing_factor);
                // A success ends the failure streak and rapidly decays the penalty.
                self.consecutive_failures.store(0, Ordering::Relaxed);
                self.update_ema(&self.ema_error_penalty_ms, 0, PENALTY_DECAY_ALPHA);
            }
            RequestOutcome::Failure { error_kind, latency_ms } => {
                let penalty = self.calculate_penalty(*error_kind, *latency_ms);
                // Pull the penalty EMA sharply toward the new failure penalty.
                self.update_ema(&self.ema_error_penalty_ms, penalty, PENALTY_ATTACK_ALPHA);
                let _ = self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// A generic helper to update an atomic u64 using the EMA formula
    /// `ema = alpha * new_value + (1 - alpha) * ema`, rounded to whole ms.
    ///
    /// `fetch_update` retries the closure on contention, so concurrent
    /// updates are never silently lost.
    pub fn update_ema(&self, atomic: &AtomicU64, new_value: u64, alpha: f64) {
        let _ = atomic.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_ema| {
            // Use floating point for accurate calculation, then convert back to u64.
            let new_ema = (alpha * new_value as f64) + ((1.0 - alpha) * current_ema as f64);
            Some(new_ema.round() as u64)
        });
    }

    /// Calculates a penalty value (in milliseconds) based on the type of error.
    fn calculate_penalty(&self, error_kind: ErrorKind, latency_ms: u64) -> u64 {
        let base_penalty = match error_kind {
            ErrorKind::Timeout => 30_000,         // 30 seconds - very bad
            ErrorKind::ConnectionError => 20_000, // 20 seconds - very bad
            ErrorKind::Http5xx => 15_000,         // 15 seconds - bad
            ErrorKind::Http4xx => 5_000,          // 5 seconds - not great
            ErrorKind::RateLimit => 10_000,       // 10 seconds - bad, but often temporary
        };
        // The penalty is the worse of the actual latency or the base penalty for the error type.
        base_penalty.max(latency_ms)
    }

    /// Calculates the total "cost" of using an endpoint, combining latency and penalties.
    /// This is the primary value used by the selection strategy.
    pub fn get_total_cost(&self) -> u64 {
        let success_latency = self.ema_latency_ms.load(Ordering::Relaxed);
        let error_penalty = self.ema_error_penalty_ms.load(Ordering::Relaxed);
        let raw_cost = success_latency.saturating_add(error_penalty);

        // Clamp to a floor so costs cannot collapse toward zero and create
        // a feedback loop where one endpoint absorbs all traffic.
        raw_cost.max(MIN_TOTAL_COST_MS)
    }
}
114
/// Represents the state and configuration of a single upstream JSON-RPC endpoint.
///
/// Cloning is cheap for the `metrics` field: it clones the `Arc`, so every
/// clone reports into the same shared `EndpointMetrics` counters.
#[derive(Debug, Clone)]
pub struct RpcEndpoint {
    /// The name of the endpoint for identification.
    pub name: String,
    /// The URL of the RPC endpoint.
    pub url: String,
    /// Whether the endpoint is currently considered healthy based on the last health check.
    pub healthy: bool,
    /// The timestamp of the last health check.
    pub last_check: Instant,
    /// If the endpoint is in cooldown, this holds the time until it is available again.
    /// `None` means the endpoint is not cooling down.
    pub cooldown_until: Option<Instant>,
    /// The number of times this endpoint has been put into cooldown. Used for exponential backoff.
    pub cooldown_attempts: u32,
    /// The configured rate limit in requests per second.
    pub rate_limit_per_sec: u32,
    /// The configured burst size for the rate limiter.
    pub burst_size: u32,
    /// The manual priority weight assigned to this endpoint. Higher is better.
    pub weight: u32,
    /// A shared, atomically updatable struct for tracking performance metrics.
    pub metrics: Arc<EndpointMetrics>,
}
139
140impl RpcEndpoint {
141    /// Returns `true` if the endpoint is currently in a cooldown period.
142    pub fn is_in_cooldown(&self) -> bool {
143        if let Some(until) = self.cooldown_until {
144            Instant::now() < until
145        } else {
146            false
147        }
148    }
149
150    /// Returns the number of seconds remaining in the cooldown period.
151    /// Returns 0 if the endpoint is not in cooldown.
152    pub fn cooldown_remaining_secs(&self) -> i64 {
153        match self.cooldown_until {
154            Some(until) => {
155                let now = Instant::now();
156                if until > now {
157                    (until.duration_since(now).as_secs()) as i64
158                } else {
159                    0
160                }
161            }
162            None => 0,
163        }
164    }
165
166    /// Returns `true` if the endpoint is healthy and not in cooldown.
167    /// This is the primary check to determine if an endpoint can be used.
168    pub fn is_available(&self) -> bool {
169        self.healthy && !self.is_in_cooldown()
170    }
171}