Skip to main content

chainrpc_http/
client.rs

1//! HTTP JSON-RPC client backed by `reqwest`.
2//!
3//! Features:
4//! - Automatic retry with exponential backoff for transient errors
5//! - Circuit breaker per provider
6//! - Rate limiter (token bucket)
7//! - Batch request support (true HTTP batching)
8
9use async_trait::async_trait;
10use std::sync::Arc;
11use std::time::Duration;
12
13use chainrpc_core::error::TransportError;
14use chainrpc_core::metrics::ProviderMetrics;
15use chainrpc_core::policy::{
16    CircuitBreaker, CircuitBreakerConfig, RateLimiter, RateLimiterConfig, RetryConfig, RetryPolicy,
17};
18use chainrpc_core::request::{JsonRpcRequest, JsonRpcResponse};
19use chainrpc_core::transport::{HealthStatus, RpcTransport};
20
/// Configuration for `HttpRpcClient`.
#[derive(Debug, Clone)]
pub struct HttpClientConfig {
    /// Backoff policy used when auto-retrying transient failures
    /// (applied only to methods classified as safe to retry).
    pub retry: RetryConfig,
    /// Per-provider circuit-breaker thresholds.
    pub circuit_breaker: CircuitBreakerConfig,
    /// Token-bucket rate-limiter settings for outgoing requests.
    pub rate_limiter: RateLimiterConfig,
    /// Per-request timeout applied to the underlying `reqwest` client.
    pub request_timeout: Duration,
}
29
30impl Default for HttpClientConfig {
31    fn default() -> Self {
32        Self {
33            retry: RetryConfig::default(),
34            circuit_breaker: CircuitBreakerConfig::default(),
35            rate_limiter: RateLimiterConfig::default(),
36            request_timeout: Duration::from_secs(30),
37        }
38    }
39}
40
/// HTTP JSON-RPC client with built-in reliability features.
pub struct HttpRpcClient {
    /// JSON-RPC endpoint URL; also used as the provider label in errors.
    url: String,
    /// Underlying `reqwest` HTTP client, built with `request_timeout`.
    http: reqwest::Client,
    /// Backoff schedule consulted between retry attempts in `send`.
    retry: RetryPolicy,
    /// Per-provider circuit breaker; an open circuit short-circuits requests.
    circuit: CircuitBreaker,
    /// Local token-bucket limiter checked before each request.
    rate_limiter: RateLimiter,
    // NOTE(review): stored but currently unused outside client construction —
    // presumably kept for future per-request timeout overrides; confirm.
    #[allow(dead_code)]
    request_timeout: Duration,
    /// Optional metrics sink; `None` disables all recording.
    metrics: Option<Arc<ProviderMetrics>>,
    /// Adaptive rate limit state from response headers.
    /// Initialized to `u32::MAX`, meaning "no header information seen yet".
    adaptive_remaining: std::sync::atomic::AtomicU32,
}
54
55impl HttpRpcClient {
56    /// Create a new client for the given JSON-RPC endpoint URL.
57    pub fn new(url: impl Into<String>, config: HttpClientConfig) -> Self {
58        let http = reqwest::Client::builder()
59            .timeout(config.request_timeout)
60            .build()
61            .expect("failed to build reqwest client");
62
63        Self {
64            url: url.into(),
65            http,
66            retry: RetryPolicy::new(config.retry),
67            circuit: CircuitBreaker::new(config.circuit_breaker),
68            rate_limiter: RateLimiter::new(config.rate_limiter),
69            request_timeout: config.request_timeout,
70            metrics: None,
71            adaptive_remaining: std::sync::atomic::AtomicU32::new(u32::MAX),
72        }
73    }
74
75    /// Create a new client with metrics recording enabled.
76    pub fn with_metrics(
77        url: impl Into<String>,
78        config: HttpClientConfig,
79        metrics: Arc<ProviderMetrics>,
80    ) -> Self {
81        let mut client = Self::new(url, config);
82        client.metrics = Some(metrics);
83        client
84    }
85
86    /// Create with default configuration.
87    pub fn default_for(url: impl Into<String>) -> Self {
88        Self::new(url, HttpClientConfig::default())
89    }
90
91    async fn send_once(&self, req: &JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
92        let resp = self
93            .http
94            .post(&self.url)
95            .json(req)
96            .send()
97            .await
98            .map_err(|e| TransportError::Http(e.to_string()))?;
99
100        // Parse rate limit headers from response
101        let rl_info = chainrpc_core::rate_limit_headers::RateLimitInfo::from_headers(
102            resp.headers()
103                .iter()
104                .map(|(k, v)| (k.as_str(), v.to_str().unwrap_or(""))),
105        );
106        if let Some(remaining) = rl_info.remaining {
107            self.adaptive_remaining
108                .store(remaining, std::sync::atomic::Ordering::Relaxed);
109        }
110
111        if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
112            if let Some(ref m) = self.metrics {
113                m.record_rate_limit();
114            }
115            let _wait = rl_info
116                .retry_after
117                .unwrap_or(std::time::Duration::from_secs(1));
118            return Err(TransportError::RateLimited {
119                provider: self.url.clone(),
120            });
121        }
122
123        if !resp.status().is_success() {
124            let status = resp.status().as_u16();
125            let body = resp.text().await.unwrap_or_default();
126            return Err(TransportError::Http(format!("HTTP {status}: {body}")));
127        }
128
129        resp.json::<JsonRpcResponse>()
130            .await
131            .map_err(|e| TransportError::Http(e.to_string()))
132    }
133}
134
135#[async_trait]
136impl RpcTransport for HttpRpcClient {
137    async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
138        // Rate limiter check
139        if !self.rate_limiter.try_acquire() {
140            if let Some(ref m) = self.metrics {
141                m.record_rate_limit();
142            }
143            let wait = self.rate_limiter.wait_time();
144            tracing::debug!(wait_ms = wait.as_millis(), "rate limited — backing off");
145            tokio::time::sleep(wait).await;
146        }
147
148        // Circuit breaker check
149        if !self.circuit.is_allowed() {
150            if let Some(ref m) = self.metrics {
151                m.record_circuit_open();
152            }
153            return Err(TransportError::CircuitOpen {
154                provider: self.url.clone(),
155            });
156        }
157
158        let start = std::time::Instant::now();
159
160        // Method safety classification — only Safe methods are auto-retried.
161        let safety = chainrpc_core::method_safety::classify_method(&req.method);
162
163        // Retry loop
164        let mut attempt = 0u32;
165        loop {
166            attempt += 1;
167            match self.send_once(&req).await {
168                Ok(resp) => {
169                    self.circuit.record_success();
170                    if let Some(ref m) = self.metrics {
171                        m.record_success(start.elapsed());
172                    }
173                    return Ok(resp);
174                }
175                Err(e)
176                    if e.is_retryable()
177                        && safety == chainrpc_core::method_safety::MethodSafety::Safe =>
178                {
179                    self.circuit.record_failure();
180                    match self.retry.next_delay(attempt) {
181                        Some(delay) => {
182                            tracing::warn!(
183                                attempt,
184                                delay_ms = delay.as_millis(),
185                                error = %e,
186                                url = %self.url,
187                                "retrying request"
188                            );
189                            tokio::time::sleep(delay).await;
190                        }
191                        None => {
192                            tracing::error!(
193                                attempt,
194                                error = %e,
195                                url = %self.url,
196                                "max retries exceeded"
197                            );
198                            if let Some(ref m) = self.metrics {
199                                m.record_failure();
200                            }
201                            return Err(e);
202                        }
203                    }
204                }
205                Err(e) => {
206                    // Non-retryable: either the error itself isn't retryable,
207                    // or the method is Idempotent/Unsafe and must not be auto-retried.
208                    if let Some(ref m) = self.metrics {
209                        m.record_failure();
210                    }
211                    return Err(e);
212                }
213            }
214        }
215    }
216
217    /// True HTTP batch: send all requests as a JSON array in one HTTP call.
218    async fn send_batch(
219        &self,
220        reqs: Vec<JsonRpcRequest>,
221    ) -> Result<Vec<JsonRpcResponse>, TransportError> {
222        if reqs.is_empty() {
223            return Ok(vec![]);
224        }
225
226        let resp = self
227            .http
228            .post(&self.url)
229            .json(&reqs)
230            .send()
231            .await
232            .map_err(|e| TransportError::Http(e.to_string()))?;
233
234        if !resp.status().is_success() {
235            let status = resp.status().as_u16();
236            let body = resp.text().await.unwrap_or_default();
237            return Err(TransportError::Http(format!("HTTP {status}: {body}")));
238        }
239
240        resp.json::<Vec<JsonRpcResponse>>()
241            .await
242            .map_err(|e| TransportError::Http(e.to_string()))
243    }
244
245    fn health(&self) -> HealthStatus {
246        match self.circuit.state() {
247            chainrpc_core::policy::CircuitState::Open => HealthStatus::Unhealthy,
248            chainrpc_core::policy::CircuitState::HalfOpen => HealthStatus::Degraded,
249            chainrpc_core::policy::CircuitState::Closed => HealthStatus::Healthy,
250        }
251    }
252
253    fn url(&self) -> &str {
254        &self.url
255    }
256}