Skip to main content

perpcity_sdk/transport/
config.rs

//! Transport configuration with builder pattern.
//!
//! Configure multi-endpoint RPC transport with per-endpoint timeouts,
//! retry policies, circuit breaker thresholds, and routing strategies.
//!
//! Endpoints are organized into three pools:
//!
//! - **Shared** (`.shared_endpoint()`) — handles any request type and serves as
//!   fallback when dedicated read/write endpoints are unhealthy.
//! - **Read** (`.read_endpoint()`) — dedicated to read operations (`eth_call`,
//!   `eth_getBalance`, etc.). Falls back to the shared pool if all read endpoints
//!   are unhealthy.
//! - **Write** (`.write_endpoint()`) — dedicated to write operations
//!   (`eth_sendRawTransaction`). Falls back to the shared pool if all write
//!   endpoints are unhealthy.
//!
//! # Example
//!
//! ```
//! use perpcity_sdk::transport::config::{TransportConfig, Strategy};
//! use std::time::Duration;
//!
//! // Single shared endpoint (simplest setup, all requests go here)
//! let config = TransportConfig::builder()
//!     .shared_endpoint("https://base-rpc.publicnode.com")
//!     .build()
//!     .unwrap();
//!
//! // Read/write split: free public RPC for reads, paid for writes + fallback
//! let config = TransportConfig::builder()
//!     .shared_endpoint("https://base.g.alchemy.com/v2/KEY")
//!     .read_endpoint("https://base-rpc.publicnode.com")
//!     .ws_endpoint("wss://base-rpc.publicnode.com")
//!     .strategy(Strategy::LatencyBased)
//!     .request_timeout(Duration::from_millis(2000))
//!     .build()
//!     .unwrap();
//!
//! assert_eq!(config.shared_endpoints.len(), 1);
//! assert_eq!(config.read_endpoints.len(), 1);
//! assert!(config.ws_endpoint.is_some());
//! ```
43
44use std::time::Duration;
45
46use alloy::rpc::json_rpc::ResponsePacket;
47
48use crate::errors::PerpCityError;
49
/// How the transport picks an endpoint for each RPC request.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Strategy {
    /// Rotate through the healthy endpoints one after another.
    RoundRobin,
    /// Prefer the endpoint with the lowest observed latency.
    ///
    /// This is the variant returned by `Strategy::default()`.
    #[default]
    LatencyBased,
    /// Send each read to `fan_out` endpoints and keep the fastest response.
    ///
    /// Writes are never fanned out; they always go to a single best endpoint.
    Hedged {
        /// How many endpoints a read is fanned out to.
        ///
        /// `TransportConfigBuilder::build` rejects values below 2.
        fan_out: usize,
    },
}
65
/// Per-endpoint circuit breaker tuning.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures tolerated before the circuit opens.
    pub failure_threshold: u32,
    /// How long the circuit stays Open before a HalfOpen probe is attempted.
    pub recovery_timeout: Duration,
    /// Upper bound on concurrent probe requests while HalfOpen.
    pub half_open_max_requests: u32,
}

impl Default for CircuitBreakerConfig {
    /// Defaults: open after 3 consecutive failures, wait 30 seconds before
    /// probing, and allow a single probe request at a time.
    fn default() -> Self {
        let recovery_timeout = Duration::from_secs(30);
        CircuitBreakerConfig {
            failure_threshold: 3,
            half_open_max_requests: 1,
            recovery_timeout,
        }
    }
}
86
/// Retry policy for read operations, applied to any transport or RPC error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReadRetryConfig {
    /// How many times a failed read is retried after the initial attempt
    /// (0 disables retries).
    pub max_retries: u32,
    /// Starting delay between attempts; scaled by 2^attempt for exponential
    /// backoff.
    pub base_delay: Duration,
}

impl Default for ReadRetryConfig {
    /// Defaults: 2 retries with a 100 ms base delay.
    fn default() -> Self {
        let base_delay = Duration::from_millis(100);
        ReadRetryConfig {
            max_retries: 2,
            base_delay,
        }
    }
}
104
/// Retry policy for write operations.
///
/// A write is retried only when the RPC node *rejects* the transaction before
/// it reaches the mempool (for example `-32003 insufficient funds` reported by
/// a stale read replica). Such a transaction never lands on-chain, so sending
/// the same signed bytes again is safe and idempotent.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WriteRetryConfig {
    /// How many times a rejected write is retried after the initial attempt
    /// (0 disables retries).
    pub max_retries: u32,
    /// Starting delay between attempts; scaled by 2^attempt for exponential
    /// backoff.
    pub base_delay: Duration,
}

impl Default for WriteRetryConfig {
    /// Defaults: 3 retries with a 500 ms base delay.
    fn default() -> Self {
        let base_delay = Duration::from_millis(500);
        WriteRetryConfig {
            max_retries: 3,
            base_delay,
        }
    }
}
127
128impl WriteRetryConfig {
129    /// Check if a JSON-RPC response is a pre-mempool rejection safe to retry.
130    ///
131    /// Any error response to `eth_sendRawTransaction` means the RPC node
132    /// rejected the transaction before mempool inclusion — the signed bytes
133    /// never landed on-chain, so resending them is always safe and idempotent.
134    ///
135    /// Rather than maintaining a fragile allow-list of specific error codes
136    /// (e.g. `-32003`, `-32000` for "insufficient funds"), we retry on any
137    /// error. The worst case for genuinely invalid transactions is a bounded
138    /// delay (~1.75s) as retries exhaust harmlessly.
139    pub fn is_retriable(&self, response: &ResponsePacket) -> bool {
140        response.first_error_code().is_some()
141    }
142}
143
144/// Complete transport configuration.
145#[derive(Debug, Clone)]
146pub struct TransportConfig {
147    /// Shared HTTP RPC endpoints — handle any request type and serve as
148    /// fallback when dedicated read/write endpoints are unhealthy.
149    pub shared_endpoints: Vec<String>,
150    /// Dedicated read endpoints. Read requests prefer these; falls back to
151    /// shared endpoints if all read endpoints are unhealthy.
152    pub read_endpoints: Vec<String>,
153    /// Dedicated write endpoints. Write requests prefer these; falls back to
154    /// shared endpoints if all write endpoints are unhealthy.
155    pub write_endpoints: Vec<String>,
156    /// Optional WebSocket endpoint URL for subscriptions.
157    pub ws_endpoint: Option<String>,
158    /// Per-request timeout.
159    pub request_timeout: Duration,
160    /// Endpoint selection strategy.
161    pub strategy: Strategy,
162    /// Circuit breaker settings (applied per endpoint).
163    pub circuit_breaker: CircuitBreakerConfig,
164    /// Retry settings for read operations.
165    pub read_retry: ReadRetryConfig,
166    /// Retry settings for write operations (pre-mempool rejections only).
167    pub write_retry: WriteRetryConfig,
168}
169
170impl TransportConfig {
171    /// Create a new builder for `TransportConfig`.
172    pub fn builder() -> TransportConfigBuilder {
173        TransportConfigBuilder::default()
174    }
175}
176
177/// Builder for [`TransportConfig`].
178#[derive(Debug, Clone)]
179pub struct TransportConfigBuilder {
180    shared_endpoints: Vec<String>,
181    read_endpoints: Vec<String>,
182    write_endpoints: Vec<String>,
183    ws_endpoint: Option<String>,
184    request_timeout: Duration,
185    strategy: Strategy,
186    circuit_breaker: CircuitBreakerConfig,
187    read_retry: ReadRetryConfig,
188    write_retry: WriteRetryConfig,
189}
190
191impl Default for TransportConfigBuilder {
192    fn default() -> Self {
193        Self {
194            shared_endpoints: Vec::new(),
195            read_endpoints: Vec::new(),
196            write_endpoints: Vec::new(),
197            ws_endpoint: None,
198            request_timeout: Duration::from_secs(5),
199            strategy: Strategy::default(),
200            circuit_breaker: CircuitBreakerConfig::default(),
201            read_retry: ReadRetryConfig::default(),
202            write_retry: WriteRetryConfig::default(),
203        }
204    }
205}
206
207impl TransportConfigBuilder {
208    /// Add a shared HTTP RPC endpoint.
209    ///
210    /// Shared endpoints handle any request type (reads and writes) and serve
211    /// as the fallback when dedicated read or write endpoints are unhealthy.
212    ///
213    /// At least one endpoint must be configured across all pools.
214    pub fn shared_endpoint(mut self, url: impl Into<String>) -> Self {
215        self.shared_endpoints.push(url.into());
216        self
217    }
218
219    /// Add a dedicated read endpoint.
220    ///
221    /// Read requests (`eth_call`, `eth_getBalance`, etc.) prefer these
222    /// endpoints. If all read endpoints are unhealthy, reads fall back to
223    /// the shared pool.
224    pub fn read_endpoint(mut self, url: impl Into<String>) -> Self {
225        self.read_endpoints.push(url.into());
226        self
227    }
228
229    /// Add a dedicated write endpoint.
230    ///
231    /// Write requests (`eth_sendRawTransaction`) prefer these endpoints.
232    /// If all write endpoints are unhealthy, writes fall back to the shared
233    /// pool.
234    pub fn write_endpoint(mut self, url: impl Into<String>) -> Self {
235        self.write_endpoints.push(url.into());
236        self
237    }
238
239    /// Set the WebSocket endpoint URL for subscriptions.
240    pub fn ws_endpoint(mut self, url: impl Into<String>) -> Self {
241        self.ws_endpoint = Some(url.into());
242        self
243    }
244
245    /// Set the per-request timeout.
246    pub fn request_timeout(mut self, timeout: Duration) -> Self {
247        self.request_timeout = timeout;
248        self
249    }
250
251    /// Set the endpoint selection strategy.
252    pub fn strategy(mut self, strategy: Strategy) -> Self {
253        self.strategy = strategy;
254        self
255    }
256
257    /// Set the circuit breaker configuration.
258    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
259        self.circuit_breaker = config;
260        self
261    }
262
263    /// Set the retry configuration for read operations.
264    pub fn read_retry(mut self, config: ReadRetryConfig) -> Self {
265        self.read_retry = config;
266        self
267    }
268
269    /// Set the retry configuration for write operations.
270    pub fn write_retry(mut self, config: WriteRetryConfig) -> Self {
271        self.write_retry = config;
272        self
273    }
274
275    /// Build the [`TransportConfig`].
276    ///
277    /// Returns an error if no endpoints are configured across any pool, or
278    /// if writes have no reachable endpoint (write + shared pools both empty).
279    pub fn build(self) -> crate::Result<TransportConfig> {
280        let total =
281            self.shared_endpoints.len() + self.read_endpoints.len() + self.write_endpoints.len();
282        if total == 0 {
283            return Err(PerpCityError::InvalidConfig {
284                reason: "no endpoints configured".into(),
285            });
286        }
287        if self.write_endpoints.is_empty() && self.shared_endpoints.is_empty() {
288            return Err(PerpCityError::InvalidConfig {
289                reason: "writes have no reachable endpoint: \
290                         configure at least one shared or write endpoint"
291                    .into(),
292            });
293        }
294        if let Strategy::Hedged { fan_out } = self.strategy
295            && fan_out < 2
296        {
297            return Err(PerpCityError::InvalidConfig {
298                reason: "hedged strategy requires fan_out >= 2".into(),
299            });
300        }
301        Ok(TransportConfig {
302            shared_endpoints: self.shared_endpoints,
303            read_endpoints: self.read_endpoints,
304            write_endpoints: self.write_endpoints,
305            ws_endpoint: self.ws_endpoint,
306            request_timeout: self.request_timeout,
307            strategy: self.strategy,
308            circuit_breaker: self.circuit_breaker,
309            read_retry: self.read_retry,
310            write_retry: self.write_retry,
311        })
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// With only a shared endpoint set, every other field keeps its default.
    #[test]
    fn builder_defaults() {
        let built = TransportConfig::builder()
            .shared_endpoint("https://rpc1.example.com")
            .build()
            .unwrap();

        assert_eq!(built.shared_endpoints.len(), 1);
        assert!(built.read_endpoints.is_empty());
        assert!(built.write_endpoints.is_empty());
        assert!(built.ws_endpoint.is_none());
        assert_eq!(built.request_timeout, Duration::from_secs(5));
        assert_eq!(built.strategy, Strategy::LatencyBased);
        assert_eq!(built.circuit_breaker.failure_threshold, 3);
        assert_eq!(built.read_retry.max_retries, 2);
        assert_eq!(built.write_retry.max_retries, 3);
    }

    /// Every setter is exercised once and its value must reach the config.
    #[test]
    fn builder_all_options() {
        let breaker = CircuitBreakerConfig {
            failure_threshold: 5,
            recovery_timeout: Duration::from_secs(60),
            half_open_max_requests: 2,
        };
        let reads = ReadRetryConfig {
            max_retries: 5,
            base_delay: Duration::from_millis(50),
        };
        let writes = WriteRetryConfig {
            max_retries: 1,
            base_delay: Duration::from_millis(500),
        };

        let built = TransportConfig::builder()
            .shared_endpoint("https://rpc1.example.com")
            .shared_endpoint("https://rpc2.example.com")
            .read_endpoint("https://read.example.com")
            .write_endpoint("https://write.example.com")
            .ws_endpoint("wss://ws.example.com")
            .request_timeout(Duration::from_millis(500))
            .strategy(Strategy::Hedged { fan_out: 3 })
            .circuit_breaker(breaker)
            .read_retry(reads)
            .write_retry(writes)
            .build()
            .unwrap();

        assert_eq!(built.shared_endpoints.len(), 2);
        assert_eq!(built.read_endpoints.len(), 1);
        assert_eq!(built.write_endpoints.len(), 1);
        assert_eq!(built.ws_endpoint.as_deref(), Some("wss://ws.example.com"));
        assert_eq!(built.request_timeout, Duration::from_millis(500));
        assert_eq!(built.strategy, Strategy::Hedged { fan_out: 3 });
        assert_eq!(built.circuit_breaker.failure_threshold, 5);
        assert_eq!(built.read_retry.max_retries, 5);
        assert_eq!(built.write_retry.max_retries, 1);
    }

    /// A shared endpoint plus a read endpoint leaves the write pool empty.
    #[test]
    fn read_write_split() {
        let built = TransportConfig::builder()
            .shared_endpoint("https://alchemy.example.com")
            .read_endpoint("https://public.example.com")
            .build()
            .unwrap();

        assert_eq!(built.shared_endpoints.len(), 1);
        assert_eq!(built.read_endpoints.len(), 1);
        assert!(built.write_endpoints.is_empty());
    }

    /// Building without any endpoint is rejected.
    #[test]
    fn no_endpoints_errors() {
        assert!(TransportConfig::builder().build().is_err());
    }

    /// Read endpoints alone are rejected: with no shared or write endpoint,
    /// writes would have nowhere to go.
    #[test]
    fn read_only_endpoints_errors() {
        let outcome = TransportConfig::builder()
            .read_endpoint("https://read.example.com")
            .build();
        assert!(outcome.is_err());
    }

    /// Write endpoints alone are accepted. Reads fall back to the shared
    /// pool, which is empty here, but writes work; this is a valid (if
    /// unusual) setup for a caller who only sends writes.
    #[test]
    fn write_only_endpoints_ok() {
        let outcome = TransportConfig::builder()
            .write_endpoint("https://write.example.com")
            .build();
        assert!(outcome.is_ok());
    }

    /// Hedging to a single endpoint is not hedging, so `fan_out: 1` is
    /// rejected.
    #[test]
    fn hedged_fan_out_one_errors() {
        let outcome = TransportConfig::builder()
            .shared_endpoint("https://rpc1.example.com")
            .strategy(Strategy::Hedged { fan_out: 1 })
            .build();
        assert!(outcome.is_err());
    }
}