1use async_trait::async_trait;
10use std::sync::Arc;
11use std::time::Duration;
12
13use chainrpc_core::error::TransportError;
14use chainrpc_core::metrics::ProviderMetrics;
15use chainrpc_core::policy::{
16 CircuitBreaker, CircuitBreakerConfig, RateLimiter, RateLimiterConfig, RetryConfig, RetryPolicy,
17};
18use chainrpc_core::request::{JsonRpcRequest, JsonRpcResponse};
19use chainrpc_core::transport::{HealthStatus, RpcTransport};
20
21#[derive(Debug, Clone)]
23pub struct HttpClientConfig {
24 pub retry: RetryConfig,
25 pub circuit_breaker: CircuitBreakerConfig,
26 pub rate_limiter: RateLimiterConfig,
27 pub request_timeout: Duration,
28}
29
30impl Default for HttpClientConfig {
31 fn default() -> Self {
32 Self {
33 retry: RetryConfig::default(),
34 circuit_breaker: CircuitBreakerConfig::default(),
35 rate_limiter: RateLimiterConfig::default(),
36 request_timeout: Duration::from_secs(30),
37 }
38 }
39}
40
41pub struct HttpRpcClient {
43 url: String,
44 http: reqwest::Client,
45 retry: RetryPolicy,
46 circuit: CircuitBreaker,
47 rate_limiter: RateLimiter,
48 #[allow(dead_code)]
49 request_timeout: Duration,
50 metrics: Option<Arc<ProviderMetrics>>,
51 adaptive_remaining: std::sync::atomic::AtomicU32,
53}
54
55impl HttpRpcClient {
56 pub fn new(url: impl Into<String>, config: HttpClientConfig) -> Self {
58 let http = reqwest::Client::builder()
59 .timeout(config.request_timeout)
60 .build()
61 .expect("failed to build reqwest client");
62
63 Self {
64 url: url.into(),
65 http,
66 retry: RetryPolicy::new(config.retry),
67 circuit: CircuitBreaker::new(config.circuit_breaker),
68 rate_limiter: RateLimiter::new(config.rate_limiter),
69 request_timeout: config.request_timeout,
70 metrics: None,
71 adaptive_remaining: std::sync::atomic::AtomicU32::new(u32::MAX),
72 }
73 }
74
75 pub fn with_metrics(
77 url: impl Into<String>,
78 config: HttpClientConfig,
79 metrics: Arc<ProviderMetrics>,
80 ) -> Self {
81 let mut client = Self::new(url, config);
82 client.metrics = Some(metrics);
83 client
84 }
85
86 pub fn default_for(url: impl Into<String>) -> Self {
88 Self::new(url, HttpClientConfig::default())
89 }
90
91 async fn send_once(&self, req: &JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
92 let resp = self
93 .http
94 .post(&self.url)
95 .json(req)
96 .send()
97 .await
98 .map_err(|e| TransportError::Http(e.to_string()))?;
99
100 let rl_info = chainrpc_core::rate_limit_headers::RateLimitInfo::from_headers(
102 resp.headers()
103 .iter()
104 .map(|(k, v)| (k.as_str(), v.to_str().unwrap_or(""))),
105 );
106 if let Some(remaining) = rl_info.remaining {
107 self.adaptive_remaining
108 .store(remaining, std::sync::atomic::Ordering::Relaxed);
109 }
110
111 if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
112 if let Some(ref m) = self.metrics {
113 m.record_rate_limit();
114 }
115 let _wait = rl_info
116 .retry_after
117 .unwrap_or(std::time::Duration::from_secs(1));
118 return Err(TransportError::RateLimited {
119 provider: self.url.clone(),
120 });
121 }
122
123 if !resp.status().is_success() {
124 let status = resp.status().as_u16();
125 let body = resp.text().await.unwrap_or_default();
126 return Err(TransportError::Http(format!("HTTP {status}: {body}")));
127 }
128
129 resp.json::<JsonRpcResponse>()
130 .await
131 .map_err(|e| TransportError::Http(e.to_string()))
132 }
133}
134
135#[async_trait]
136impl RpcTransport for HttpRpcClient {
137 async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
138 if !self.rate_limiter.try_acquire() {
140 if let Some(ref m) = self.metrics {
141 m.record_rate_limit();
142 }
143 let wait = self.rate_limiter.wait_time();
144 tracing::debug!(wait_ms = wait.as_millis(), "rate limited — backing off");
145 tokio::time::sleep(wait).await;
146 }
147
148 if !self.circuit.is_allowed() {
150 if let Some(ref m) = self.metrics {
151 m.record_circuit_open();
152 }
153 return Err(TransportError::CircuitOpen {
154 provider: self.url.clone(),
155 });
156 }
157
158 let start = std::time::Instant::now();
159
160 let safety = chainrpc_core::method_safety::classify_method(&req.method);
162
163 let mut attempt = 0u32;
165 loop {
166 attempt += 1;
167 match self.send_once(&req).await {
168 Ok(resp) => {
169 self.circuit.record_success();
170 if let Some(ref m) = self.metrics {
171 m.record_success(start.elapsed());
172 }
173 return Ok(resp);
174 }
175 Err(e)
176 if e.is_retryable()
177 && safety == chainrpc_core::method_safety::MethodSafety::Safe =>
178 {
179 self.circuit.record_failure();
180 match self.retry.next_delay(attempt) {
181 Some(delay) => {
182 tracing::warn!(
183 attempt,
184 delay_ms = delay.as_millis(),
185 error = %e,
186 url = %self.url,
187 "retrying request"
188 );
189 tokio::time::sleep(delay).await;
190 }
191 None => {
192 tracing::error!(
193 attempt,
194 error = %e,
195 url = %self.url,
196 "max retries exceeded"
197 );
198 if let Some(ref m) = self.metrics {
199 m.record_failure();
200 }
201 return Err(e);
202 }
203 }
204 }
205 Err(e) => {
206 if let Some(ref m) = self.metrics {
209 m.record_failure();
210 }
211 return Err(e);
212 }
213 }
214 }
215 }
216
217 async fn send_batch(
219 &self,
220 reqs: Vec<JsonRpcRequest>,
221 ) -> Result<Vec<JsonRpcResponse>, TransportError> {
222 if reqs.is_empty() {
223 return Ok(vec![]);
224 }
225
226 let resp = self
227 .http
228 .post(&self.url)
229 .json(&reqs)
230 .send()
231 .await
232 .map_err(|e| TransportError::Http(e.to_string()))?;
233
234 if !resp.status().is_success() {
235 let status = resp.status().as_u16();
236 let body = resp.text().await.unwrap_or_default();
237 return Err(TransportError::Http(format!("HTTP {status}: {body}")));
238 }
239
240 resp.json::<Vec<JsonRpcResponse>>()
241 .await
242 .map_err(|e| TransportError::Http(e.to_string()))
243 }
244
245 fn health(&self) -> HealthStatus {
246 match self.circuit.state() {
247 chainrpc_core::policy::CircuitState::Open => HealthStatus::Unhealthy,
248 chainrpc_core::policy::CircuitState::HalfOpen => HealthStatus::Degraded,
249 chainrpc_core::policy::CircuitState::Closed => HealthStatus::Healthy,
250 }
251 }
252
253 fn url(&self) -> &str {
254 &self.url
255 }
256}