Skip to main content

perpcity_sdk/transport/
provider.rs

1//! Multi-endpoint RPC transport with health-aware routing.
2//!
3//! `HftTransport` implements [`tower::Service<RequestPacket>`], which makes it
4//! a valid Alloy [`Transport`](alloy::transports::Transport) via the blanket
5//! impl in `alloy-transport`. This means it can be used directly with
6//! [`RootProvider`](alloy::providers::RootProvider) and all Alloy provider
7//! methods.
8//!
9//! # Features
10//!
11//! - **Per-endpoint circuit breaker**: automatically routes around dead endpoints
12//! - **Strategy-based selection**: round-robin, latency-based, or hedged reads
13//! - **Read/write classification**: reads are retried on failure; writes never are
14//! - **Hedged requests**: fan out reads to N endpoints, take the fastest response;
15//!   losing requests are **cancelled** via `JoinSet::abort_all` to save RPC rate limits
16//! - **Lock-free endpoint selection**: read path uses atomic mirrors, zero mutex
17//!   contention in steady state (all endpoints healthy)
18//! - **Tower integration**: composes with tower timeout and retry middleware
19//!
20//! # Example
21//!
22//! ```rust,no_run
23//! use perpcity_sdk::transport::config::TransportConfig;
24//! use perpcity_sdk::transport::provider::HftTransport;
25//! use alloy::providers::RootProvider;
26//! use alloy::transports::BoxTransport;
27//! use alloy::network::Ethereum;
28//!
29//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
30//! let config = TransportConfig::builder()
31//!     .shared_endpoint("https://base.g.alchemy.com/v2/KEY")
32//!     .read_endpoint("https://base-rpc.publicnode.com")
33//!     .build()?;
34//!
35//! let transport = HftTransport::new(config)?;
36//! let client = alloy::rpc::client::RpcClient::new(BoxTransport::new(transport), false);
37//! let provider: RootProvider<Ethereum> = RootProvider::new(client);
38//! # Ok(())
39//! # }
40//! ```
41
42use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
43use std::sync::{Arc, Mutex};
44use std::task::{Context, Poll};
45use std::time::Instant;
46
47use alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
48use alloy::transports::{TransportError, TransportFut};
49use tower::Service;
50
51use super::config::{Strategy, TransportConfig};
52use super::health::{CircuitState, EndpointHealth, EndpointStatus};
53
// ── Packed atomic state constants ────────────────────────────────────
//
// Circuit state is packed into a single AtomicU64 for lock-free reads:
//   bits[63:62] = state tag (00=Closed, 01=Open, 10=HalfOpen)
//   bits[61:0]  = since_ms (Open) or probes_in_flight (HalfOpen) or 0 (Closed)

/// Tag for `CircuitState::Closed` (payload bits are always 0).
const TAG_CLOSED: u64 = 0;
/// Tag for `CircuitState::Open`; payload bits carry `since_ms`.
const TAG_OPEN: u64 = 1 << 62;
/// Tag for `CircuitState::HalfOpen`; payload bits carry `probes_in_flight`.
const TAG_HALFOPEN: u64 = 2 << 62;
/// Mask selecting the two tag bits (bits 63:62).
const TAG_MASK: u64 = 3 << 62;
64
65#[inline]
66fn pack_state(state: CircuitState) -> u64 {
67    match state {
68        CircuitState::Closed => TAG_CLOSED,
69        CircuitState::Open { since_ms } => TAG_OPEN | (since_ms & !TAG_MASK),
70        CircuitState::HalfOpen { probes_in_flight } => TAG_HALFOPEN | (probes_in_flight as u64),
71    }
72}
73
/// A managed RPC endpoint: transport + health tracker + atomic mirrors.
///
/// The `Mutex<EndpointHealth>` is the source of truth; the two atomics below
/// are eventually-consistent mirrors refreshed after every health mutation so
/// the selection hot path never takes a lock.
struct ManagedEndpoint {
    /// The underlying Alloy boxed transport for this endpoint.
    transport: alloy::transports::BoxTransport,
    /// Per-endpoint health state (circuit breaker + latency). Protected by Mutex
    /// for mutations only; reads use atomic mirrors below.
    health: Mutex<EndpointHealth>,
    /// The endpoint URL (for diagnostics).
    url: String,
    // ── Lock-free mirrors (eventually consistent with Mutex state) ──
    // Updated after every health mutation. Reads never take locks.
    // Follows the evmap pattern: reads are lock-free, writes sync atomics.
    /// Atomic mirror of `EndpointHealth::avg_latency_ns`.
    /// Read by `select_latency_based` without locking.
    /// Initialized to 0, so a never-measured endpoint sorts as "fastest".
    atomic_latency_ns: AtomicU64,
    /// Packed circuit state for lock-free reads (see `pack_state`).
    /// Read by `select_*` to filter callable endpoints without locking.
    atomic_state: AtomicU64,
}
93
94impl ManagedEndpoint {
95    /// Record a successful request. Updates Mutex state + atomic mirrors.
96    #[inline]
97    fn record_success(&self, latency_ns: u64) {
98        let mut h = self.health.lock().unwrap();
99        let old_state = h.state();
100        h.record_success(latency_ns);
101        let new_state = h.state();
102        // Sync atomic mirrors (Relaxed is sufficient: no cross-field ordering needed,
103        // eventual consistency is acceptable for endpoint selection heuristics)
104        self.atomic_latency_ns
105            .store(h.avg_latency_ns(), Ordering::Relaxed);
106        self.atomic_state
107            .store(pack_state(new_state), Ordering::Relaxed);
108        if old_state != new_state {
109            tracing::info!(
110                endpoint = %self.url,
111                from = ?old_state,
112                to = ?new_state,
113                "circuit breaker state changed"
114            );
115        }
116    }
117
118    /// Record a failed request. Updates Mutex state + atomic mirrors.
119    #[inline]
120    fn record_failure(&self, now_ms: u64) {
121        let mut h = self.health.lock().unwrap();
122        let old_state = h.state();
123        h.record_failure(now_ms);
124        let new_state = h.state();
125        self.atomic_state
126            .store(pack_state(new_state), Ordering::Relaxed);
127        // Latency is not updated on failure (EMA stays the same).
128        if old_state != new_state {
129            tracing::warn!(
130                endpoint = %self.url,
131                from = ?old_state,
132                to = ?new_state,
133                consecutive_failures = h.status().consecutive_failures,
134                "circuit breaker state changed"
135            );
136        }
137    }
138}
139
140impl std::fmt::Debug for ManagedEndpoint {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct("ManagedEndpoint")
143            .field("url", &self.url)
144            .finish_non_exhaustive()
145    }
146}
147
/// A pool of RPC endpoints with health-aware selection.
///
/// Each pool owns its endpoints and round-robin counter, and operates
/// independently. Selection logic (round-robin, latency-based) is
/// encapsulated here — the [`Router`] delegates to the appropriate pool
/// based on whether the request is a read or write.
#[doc(hidden)]
#[derive(Debug)]
pub struct EndpointPool {
    /// Endpoints in configuration order; indices returned by the `select*`
    /// methods refer into this vector.
    endpoints: Vec<ManagedEndpoint>,
    /// Monotonically increasing counter; its value mod `endpoints.len()`
    /// is the round-robin start offset.
    round_robin: AtomicUsize,
}
160
impl EndpointPool {
    /// Build a pool from a list of endpoint URLs.
    ///
    /// Returns `InvalidConfig` if any URL fails to parse. Each endpoint gets
    /// its own health tracker seeded from `cb_config`. No network calls are
    /// made here; HTTP transports connect lazily.
    pub fn from_urls(
        urls: &[String],
        cb_config: super::config::CircuitBreakerConfig,
    ) -> crate::Result<Self> {
        let endpoints = urls
            .iter()
            .map(|url| {
                let parsed: url::Url = url.parse().map_err(|e: url::ParseError| {
                    crate::PerpCityError::InvalidConfig {
                        reason: format!("invalid endpoint URL '{url}': {e}"),
                    }
                })?;
                let http = alloy::transports::http::Http::new(parsed);
                let boxed = alloy::transports::BoxTransport::new(http);
                // Mirrors start as Closed with latency 0 — a fresh endpoint is
                // immediately selectable and sorts as "fastest" until measured.
                Ok(ManagedEndpoint {
                    transport: boxed,
                    health: Mutex::new(EndpointHealth::new(cb_config)),
                    url: url.clone(),
                    atomic_latency_ns: AtomicU64::new(0),
                    atomic_state: AtomicU64::new(TAG_CLOSED),
                })
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Self {
            endpoints,
            round_robin: AtomicUsize::new(0),
        })
    }

    /// True if this pool has no endpoints.
    pub fn is_empty(&self) -> bool {
        self.endpoints.is_empty()
    }

    /// Select the best endpoint index based on strategy.
    ///
    /// Returns `None` if all endpoints are unavailable.
    /// `Hedged` shares the latency-based path here; fan-out selection is
    /// done separately via [`Self::select_n`].
    pub fn select(&self, strategy: Strategy, now_ms: u64) -> Option<usize> {
        match strategy {
            Strategy::RoundRobin => self.select_round_robin(now_ms),
            Strategy::LatencyBased | Strategy::Hedged { .. } => self.select_latency_based(now_ms),
        }
    }

    /// Round-robin selection with lock-free fast path.
    ///
    /// Fast path: scan atomic state tags — if a Closed endpoint is found,
    /// return it immediately without locking. Only falls back to Mutex
    /// when all endpoints are non-Closed (rare: circuit breaker tripped).
    fn select_round_robin(&self, now_ms: u64) -> Option<usize> {
        let n = self.endpoints.len();
        // fetch_add may wrap on overflow; `% n` keeps the index valid regardless.
        let start = self.round_robin.fetch_add(1, Ordering::Relaxed);

        // Lock-free fast path: find first Closed endpoint in round-robin order
        for i in 0..n {
            let idx = (start + i) % n;
            if self.endpoints[idx].atomic_state.load(Ordering::Relaxed) & TAG_MASK == TAG_CLOSED {
                return Some(idx);
            }
        }

        // Slow path: all non-Closed, try is_callable (may transition Open→HalfOpen)
        for i in 0..n {
            let idx = (start + i) % n;
            let ep = &self.endpoints[idx];
            let mut h = ep.health.lock().unwrap();
            if h.is_callable(now_ms) {
                // Mirror is refreshed before returning because is_callable may
                // have transitioned the state under the lock.
                ep.atomic_state
                    .store(pack_state(h.state()), Ordering::Relaxed);
                return Some(idx);
            }
            ep.atomic_state
                .store(pack_state(h.state()), Ordering::Relaxed);
        }

        None
    }

    /// Latency-based selection with lock-free fast path.
    ///
    /// Fast path: scan atomic latency + state for all endpoints without locking.
    /// Among Closed endpoints, pick the one with lowest latency. This is the
    /// steady-state hot path — zero mutex contention.
    ///
    /// Slow path: no Closed endpoints available. Lock each non-Closed endpoint
    /// and call `is_callable()` which may transition Open→HalfOpen. Only entered
    /// when circuit breakers have tripped (error condition, rare).
    fn select_latency_based(&self, now_ms: u64) -> Option<usize> {
        // Lock-free fast path: find best Closed endpoint by latency
        let mut best_idx = None;
        let mut best_latency = u64::MAX;
        let mut any_non_closed = false;

        for (i, ep) in self.endpoints.iter().enumerate() {
            let state = ep.atomic_state.load(Ordering::Relaxed);
            if state & TAG_MASK == TAG_CLOSED {
                let lat = ep.atomic_latency_ns.load(Ordering::Relaxed);
                if lat < best_latency {
                    best_latency = lat;
                    best_idx = Some(i);
                }
            } else {
                any_non_closed = true;
            }
        }

        if best_idx.is_some() {
            return best_idx;
        }

        // Slow path: no Closed endpoints, try Open/HalfOpen with locks
        if any_non_closed {
            for (i, ep) in self.endpoints.iter().enumerate() {
                let mut h = ep.health.lock().unwrap();
                if h.is_callable(now_ms) {
                    let lat = h.avg_latency_ns();
                    // Sync atomics after potential state transition
                    ep.atomic_latency_ns
                        .store(h.avg_latency_ns(), Ordering::Relaxed);
                    ep.atomic_state
                        .store(pack_state(h.state()), Ordering::Relaxed);
                    if lat < best_latency {
                        best_latency = lat;
                        best_idx = Some(i);
                    }
                } else {
                    ep.atomic_state
                        .store(pack_state(h.state()), Ordering::Relaxed);
                }
            }
        }

        best_idx
    }

    /// Select up to `n` callable endpoints for hedged requests, ordered by latency.
    ///
    /// Uses a fixed-size stack buffer (max 16 endpoints) to avoid heap allocation
    /// in the common case. Pools with more than 16 endpoints only have the first
    /// 16 candidates considered (the `count >= 16` breaks below).
    ///
    /// NOTE(review): if an endpoint flips Closed→non-Closed between the fast
    /// scan and the slow scan, it can appear in `candidates` twice (collected
    /// as Closed, then again via `is_callable`). A duplicate hedge target is
    /// benign but worth confirming against intended semantics.
    pub fn select_n(&self, n: usize, now_ms: u64) -> Vec<usize> {
        // Stack buffer avoids Vec allocation for up to 16 endpoints
        let mut candidates: [(usize, u64); 16] = [(0, u64::MAX); 16];
        let mut count = 0;
        let mut any_non_closed = false;

        // Lock-free fast path: collect Closed endpoints
        for (i, ep) in self.endpoints.iter().enumerate() {
            if count >= 16 {
                break;
            }
            let state = ep.atomic_state.load(Ordering::Relaxed);
            if state & TAG_MASK == TAG_CLOSED {
                let lat = ep.atomic_latency_ns.load(Ordering::Relaxed);
                candidates[count] = (i, lat);
                count += 1;
            } else {
                any_non_closed = true;
            }
        }

        // If we have enough Closed endpoints, sort and return top-n
        if count >= n {
            candidates[..count].sort_unstable_by_key(|&(_, lat)| lat);
            return candidates[..n].iter().map(|&(i, _)| i).collect();
        }

        // Slow path: not enough Closed, add recoverable Open/HalfOpen
        if any_non_closed {
            for (i, ep) in self.endpoints.iter().enumerate() {
                if count >= 16 {
                    break;
                }
                // Skip already-collected Closed endpoints
                let state = ep.atomic_state.load(Ordering::Relaxed);
                if state & TAG_MASK == TAG_CLOSED {
                    continue;
                }
                let mut h = ep.health.lock().unwrap();
                if h.is_callable(now_ms) {
                    let lat = h.avg_latency_ns();
                    ep.atomic_state
                        .store(pack_state(h.state()), Ordering::Relaxed);
                    candidates[count] = (i, lat);
                    count += 1;
                } else {
                    ep.atomic_state
                        .store(pack_state(h.state()), Ordering::Relaxed);
                }
            }
        }

        candidates[..count].sort_unstable_by_key(|&(_, lat)| lat);
        candidates[..count.min(n)].iter().map(|&(i, _)| i).collect()
    }

    /// Number of endpoints in this pool.
    pub fn len(&self) -> usize {
        self.endpoints.len()
    }

    /// Record a successful request on endpoint `idx`.
    ///
    /// Panics if `idx` is out of bounds (callers pass indices obtained
    /// from `select*` on this same pool).
    pub fn record_success(&self, idx: usize, latency_ns: u64) {
        self.endpoints[idx].record_success(latency_ns);
    }

    /// Record a failed request on endpoint `idx`.
    ///
    /// Panics if `idx` is out of bounds.
    pub fn record_failure(&self, idx: usize, now_ms: u64) {
        self.endpoints[idx].record_failure(now_ms);
    }

    /// Clone the transport for endpoint `idx` (cheap, clones the inner Arc).
    fn transport(&self, idx: usize) -> alloy::transports::BoxTransport {
        self.endpoints[idx].transport.clone()
    }

    /// URL of endpoint `idx` (for diagnostics/tracing).
    fn url(&self, idx: usize) -> &str {
        &self.endpoints[idx].url
    }

    /// Number of endpoints currently in Closed (healthy) state.
    ///
    /// Lock-free: reads atomic state mirrors without taking any mutexes.
    pub fn healthy_count(&self) -> usize {
        self.endpoints
            .iter()
            .filter(|ep| ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK == TAG_CLOSED)
            .count()
    }

    /// Health status of all endpoints in this pool.
    ///
    /// Takes each endpoint's Mutex in turn (diagnostic path, not hot).
    pub fn health_status(&self) -> Vec<EndpointStatus> {
        self.endpoints
            .iter()
            .map(|ep| ep.health.lock().unwrap().status())
            .collect()
    }

    /// URLs of all endpoints in this pool.
    pub fn endpoint_urls(&self) -> Vec<&str> {
        self.endpoints.iter().map(|ep| ep.url.as_str()).collect()
    }
}
407
/// Request router — manages endpoint pools and dispatches requests.
///
/// Holds three pools (shared, read, write) and routes each request to the
/// appropriate pool based on whether it is a read or write. If the dedicated
/// pool is empty or all its endpoints are unhealthy, the request falls back
/// to the shared pool.
#[derive(Debug)]
struct Router {
    /// Endpoints usable for any request; also the fallback pool.
    shared: EndpointPool,
    /// Dedicated read endpoints (tried first for reads).
    read: EndpointPool,
    /// Dedicated write endpoints (tried first for writes).
    write: EndpointPool,
    /// Endpoint-selection strategy (round-robin / latency-based / hedged).
    strategy: Strategy,
    /// Full transport configuration (retry policies, request timeout).
    config: TransportConfig,
}
422
/// Multi-endpoint RPC transport with health-aware routing.
///
/// Implements `tower::Service<RequestPacket>` → Alloy `Transport` (blanket impl)
/// → usable with `RootProvider`.
///
/// Clone is cheap (Arc).
#[derive(Clone, Debug)]
pub struct HftTransport {
    /// Shared router; all clones of this transport dispatch through the
    /// same pools and circuit breakers.
    router: Arc<Router>,
}
433
434impl HftTransport {
435    /// Create a new transport from configuration.
436    ///
437    /// Initializes one HTTP transport per configured endpoint. Each gets its own
438    /// circuit breaker. This does NOT make any network calls — the transports
439    /// connect lazily on first request.
440    pub fn new(config: TransportConfig) -> crate::Result<Self> {
441        let cb = config.circuit_breaker;
442        let shared = EndpointPool::from_urls(&config.shared_endpoints, cb)?;
443        let read = EndpointPool::from_urls(&config.read_endpoints, cb)?;
444        let write = EndpointPool::from_urls(&config.write_endpoints, cb)?;
445
446        Ok(Self {
447            router: Arc::new(Router {
448                shared,
449                read,
450                write,
451                strategy: config.strategy,
452                config,
453            }),
454        })
455    }
456
457    /// Get the health status of all endpoints across all pools.
458    pub fn health_status(&self) -> Vec<EndpointStatus> {
459        let r = &self.router;
460        let mut out = r.shared.health_status();
461        out.extend(r.read.health_status());
462        out.extend(r.write.health_status());
463        out
464    }
465
466    /// Number of endpoints currently in Closed (healthy) state across all pools.
467    ///
468    /// Lock-free: reads atomic state mirrors without taking any mutexes.
469    pub fn healthy_count(&self) -> usize {
470        let r = &self.router;
471        r.shared.healthy_count() + r.read.healthy_count() + r.write.healthy_count()
472    }
473
474    /// URLs of all configured endpoints across all pools.
475    pub fn endpoint_urls(&self) -> Vec<&str> {
476        let r = &self.router;
477        let mut out = r.shared.endpoint_urls();
478        out.extend(r.read.endpoint_urls());
479        out.extend(r.write.endpoint_urls());
480        out
481    }
482}
483
484// ── JSON-RPC method classification ──────────────────────────────────
485
486/// Returns true if the JSON-RPC method is a write (state-changing) operation.
487///
488/// Write methods must NOT be retried — double-sends could cause double spends.
489/// All other methods are treated as reads (safe to retry/hedge).
490fn is_write_method(req: &RequestPacket) -> bool {
491    match req {
492        RequestPacket::Single(call) => is_write_method_name(call.method()),
493        RequestPacket::Batch(calls) => calls.iter().any(|c| is_write_method_name(c.method())),
494    }
495}
496
/// True for the transaction-submission methods; everything else is a read.
fn is_write_method_name(method: &str) -> bool {
    method == "eth_sendRawTransaction" || method == "eth_sendTransaction"
}
500
501// ── Pool-aware routing ─────────────────────────────────────────────
502
503impl Router {
504    /// Select the pool and endpoint index for a request.
505    ///
506    /// Tries the dedicated pool first (read or write), then falls back to
507    /// the shared pool. Returns a reference to the chosen pool and the
508    /// endpoint index within that pool, or `None` if all endpoints across
509    /// both pools are unavailable.
510    fn select_for(&self, is_write: bool, now_ms: u64) -> Option<(&EndpointPool, usize)> {
511        let dedicated = if is_write { &self.write } else { &self.read };
512
513        // Try dedicated pool first
514        if !dedicated.is_empty() {
515            if let Some(idx) = dedicated.select(self.strategy, now_ms) {
516                return Some((dedicated, idx));
517            }
518        }
519
520        // Fall back to shared pool
521        self.shared
522            .select(self.strategy, now_ms)
523            .map(|idx| (&self.shared, idx))
524    }
525
526    /// Select the pool for hedged reads. Prefers the read pool if it has
527    /// healthy endpoints, otherwise falls back to shared.
528    fn read_pool(&self) -> &EndpointPool {
529        if self.read.healthy_count() > 0 {
530            &self.read
531        } else {
532            &self.shared
533        }
534    }
535
536    /// Route a request through the best endpoint, with retry.
537    ///
538    /// Reads retry on any transport or RPC error. Writes only retry when the
539    /// RPC node rejects the transaction before mempool inclusion (e.g. `-32003
540    /// insufficient funds` from a stale read replica). A rejected tx never
541    /// lands on-chain, so resending the same signed bytes is idempotent.
542    async fn route_request(
543        self: &Arc<Self>,
544        req: RequestPacket,
545    ) -> Result<ResponsePacket, TransportError> {
546        let is_write = is_write_method(&req);
547        let (max_attempts, base_delay) = if is_write {
548            (
549                1 + self.config.write_retry.max_retries,
550                self.config.write_retry.base_delay,
551            )
552        } else {
553            (
554                1 + self.config.read_retry.max_retries,
555                self.config.read_retry.base_delay,
556            )
557        };
558        let timeout = self.config.request_timeout;
559
560        // Handle hedged reads
561        if !is_write && let Strategy::Hedged { fan_out } = self.strategy {
562            let pool = self.read_pool();
563            return self.hedged_request(pool, req, fan_out, timeout).await;
564        }
565
566        // Standard path: select endpoint from appropriate pool, try with retry
567        let mut last_err = None;
568        let now_ms = now_ms();
569
570        for attempt in 0..max_attempts {
571            let Some((pool, idx)) = self.select_for(is_write, now_ms) else {
572                tracing::error!("all RPC endpoints unavailable (circuits open)");
573                return Err(TransportError::local_usage_str(
574                    "all RPC endpoints unavailable (circuits open)",
575                ));
576            };
577
578            let start = Instant::now();
579            let mut transport = pool.transport(idx);
580
581            // Apply tower timeout
582            let result = tokio::time::timeout(timeout, transport.call(req.clone())).await;
583
584            match result {
585                Ok(Ok(response)) => {
586                    // For writes, check if the response is a pre-mempool rejection
587                    // that is safe to retry (tx was never accepted).
588                    if is_write && self.config.write_retry.is_retriable(&response) {
589                        // Stale-replica rejections are not evidence of an
590                        // unhealthy endpoint — don't touch the circuit breaker.
591                        if attempt + 1 < max_attempts {
592                            tracing::warn!(
593                                attempt = attempt + 1,
594                                max_attempts,
595                                endpoint = %pool.url(idx),
596                                error_code = response.first_error_code(),
597                                "write rejected pre-mempool, retrying"
598                            );
599                        } else {
600                            tracing::warn!(
601                                endpoint = %pool.url(idx),
602                                error_code = response.first_error_code(),
603                                "write rejected after all retries exhausted"
604                            );
605                            return Ok(response);
606                        }
607                    } else {
608                        let latency_ns = start.elapsed().as_nanos() as u64;
609                        pool.record_success(idx, latency_ns);
610                        return Ok(response);
611                    }
612                }
613                Ok(Err(e)) => {
614                    pool.record_failure(idx, now_ms);
615                    tracing::warn!(
616                        attempt = attempt + 1,
617                        max_attempts,
618                        endpoint = %pool.url(idx),
619                        error = %e,
620                        is_write,
621                        "transport error"
622                    );
623                    last_err = Some(e);
624                }
625                Err(_timeout) => {
626                    pool.record_failure(idx, now_ms);
627                    tracing::warn!(
628                        attempt = attempt + 1,
629                        max_attempts,
630                        endpoint = %pool.url(idx),
631                        is_write,
632                        "request timed out"
633                    );
634                    last_err = Some(TransportError::local_usage_str("request timed out"));
635                }
636            }
637
638            // Backoff between retries (exponential: base * 2^attempt)
639            if attempt + 1 < max_attempts {
640                let delay = base_delay * 2u32.saturating_pow(attempt);
641                tokio::time::sleep(delay).await;
642            }
643        }
644
645        Err(last_err.unwrap_or_else(|| TransportError::local_usage_str("no endpoints available")))
646    }
647
648    /// Fan out a read request to multiple endpoints in a pool, return the
649    /// fastest success.
650    ///
651    /// Uses [`JoinSet`] to properly cancel losing requests via `abort_all()`,
652    /// saving RPC rate limits and network bandwidth. Health is recorded for all
653    /// endpoints that complete before cancellation.
654    async fn hedged_request(
655        &self,
656        pool: &EndpointPool,
657        req: RequestPacket,
658        fan_out: usize,
659        timeout: std::time::Duration,
660    ) -> Result<ResponsePacket, TransportError> {
661        let now_ms = now_ms();
662        let indices = pool.select_n(fan_out, now_ms);
663
664        if indices.is_empty() {
665            return Err(TransportError::local_usage_str(
666                "all RPC endpoints unavailable (circuits open)",
667            ));
668        }
669
670        // If only one endpoint is available, fall back to single request
671        if indices.len() == 1 {
672            let idx = indices[0];
673            let start = Instant::now();
674            let mut transport = pool.transport(idx);
675            let result = tokio::time::timeout(timeout, transport.call(req)).await;
676
677            return match result {
678                Ok(Ok(resp)) => {
679                    pool.record_success(idx, start.elapsed().as_nanos() as u64);
680                    Ok(resp)
681                }
682                Ok(Err(e)) => {
683                    pool.record_failure(idx, now_ms);
684                    Err(e)
685                }
686                Err(_) => {
687                    pool.record_failure(idx, now_ms);
688                    Err(TransportError::local_usage_str("request timed out"))
689                }
690            };
691        }
692
693        // Fan out to multiple endpoints using JoinSet for proper cancellation
694        let mut join_set = tokio::task::JoinSet::new();
695
696        for &idx in &indices {
697            let mut transport = pool.transport(idx);
698            let req_clone = req.clone();
699
700            join_set.spawn(async move {
701                let start = Instant::now();
702                let result = tokio::time::timeout(timeout, transport.call(req_clone)).await;
703                let result = match result {
704                    Ok(r) => r,
705                    Err(_) => Err(TransportError::local_usage_str("request timed out")),
706                };
707                (idx, result, start)
708            });
709        }
710
711        let mut last_err = None;
712
713        while let Some(join_result) = join_set.join_next().await {
714            match join_result {
715                Ok((idx, Ok(response), start)) => {
716                    let latency_ns = start.elapsed().as_nanos() as u64;
717                    pool.record_success(idx, latency_ns);
718                    // Cancel remaining in-flight requests — saves RPC rate limits.
719                    // JoinSet::drop also aborts, but explicit abort_all is clearer.
720                    join_set.abort_all();
721                    return Ok(response);
722                }
723                Ok((idx, Err(e), _start)) => {
724                    pool.record_failure(idx, now_ms);
725                    last_err = Some(e);
726                }
727                // Task was aborted (by our abort_all or JoinSet::drop) — expected
728                Err(e) if e.is_cancelled() => {}
729                // Task panicked — treat as failure
730                Err(_) => {
731                    last_err = Some(TransportError::local_usage_str(
732                        "hedged request task panicked",
733                    ));
734                }
735            }
736        }
737
738        Err(last_err
739            .unwrap_or_else(|| TransportError::local_usage_str("all hedged requests failed")))
740    }
741}
742
/// Get current time in milliseconds. Used for health tracking timestamps.
///
/// Saturates to 0 if the system clock reads earlier than the Unix epoch
/// (matching the original's `unwrap_or_default` behavior).
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
750
751// ── tower::Service implementation ───────────────────────────────────
752//
753// This blanket-qualifies HftTransport as an Alloy Transport:
754//   Service<RequestPacket, Response=ResponsePacket, Error=TransportError,
755//           Future=TransportFut<'static>> + Clone + Send + Sync + 'static
756//   → impl Transport for HftTransport
757//   → BoxTransport::new(hft_transport) works
758//   → RootProvider::new(hft_transport) works
759
760impl Service<RequestPacket> for HftTransport {
761    type Response = ResponsePacket;
762    type Error = TransportError;
763    type Future = TransportFut<'static>;
764
765    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
766        // We're always ready to accept requests. Endpoint availability
767        // is checked in `call` (fail-fast on route_request).
768        Poll::Ready(Ok(()))
769    }
770
771    fn call(&mut self, req: RequestPacket) -> Self::Future {
772        let router = Arc::clone(&self.router);
773        Box::pin(async move { router.route_request(req).await })
774    }
775}
776
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::config::TransportConfig;

    /// Build a serialized JSON-RPC request with empty params for testing.
    fn make_request(method: &'static str, id: u64) -> alloy::rpc::json_rpc::SerializedRequest {
        use alloy::rpc::json_rpc::{Id, Request};
        let empty_params = serde_json::value::RawValue::from_string("[]".to_owned()).unwrap();
        let request = Request::new(method, Id::Number(id), empty_params);
        request.serialize().unwrap()
    }

    #[test]
    fn classify_write_methods() {
        let read_pkt = RequestPacket::Single(make_request("eth_getBlockByNumber", 1));
        let write_pkt = RequestPacket::Single(make_request("eth_sendRawTransaction", 2));
        assert!(!is_write_method(&read_pkt));
        assert!(is_write_method(&write_pkt));
    }

    #[test]
    fn classify_batch_with_write() {
        // A single write anywhere in a batch makes the whole batch a write.
        let requests = vec![
            make_request("eth_getBalance", 1),
            make_request("eth_sendRawTransaction", 2),
        ];
        assert!(is_write_method(&RequestPacket::Batch(requests)));
    }

    #[test]
    fn new_transport_shared_only() {
        let transport = HftTransport::new(
            TransportConfig::builder()
                .shared_endpoint("https://mainnet.base.org")
                .shared_endpoint("https://base-rpc.publicnode.com")
                .build()
                .unwrap(),
        )
        .unwrap();
        assert_eq!(transport.endpoint_urls().len(), 2);
        assert_eq!(transport.healthy_count(), 2);
    }

    #[test]
    fn new_transport_read_write_split() {
        let transport = HftTransport::new(
            TransportConfig::builder()
                .shared_endpoint("https://alchemy.example.com")
                .read_endpoint("https://public.example.com")
                .build()
                .unwrap(),
        )
        .unwrap();
        assert_eq!(transport.endpoint_urls().len(), 2);
        assert_eq!(transport.healthy_count(), 2);
    }

    #[test]
    fn new_transport_invalid_url() {
        let config = TransportConfig::builder()
            .shared_endpoint("not a valid url")
            .build()
            .unwrap();
        // Construction must reject the malformed endpoint URL.
        assert!(HftTransport::new(config).is_err());
    }

    #[test]
    fn transport_is_clone_send_sync() {
        fn require<T: Clone + Send + Sync + 'static>() {}
        require::<HftTransport>();
    }

    #[test]
    fn transport_implements_tower_service() {
        fn require<S: tower::Service<RequestPacket>>() {}
        require::<HftTransport>();
    }

    // ── EndpointPool selection tests ─────────────────────────────────

    #[test]
    fn pool_round_robin_selection() {
        let urls: Vec<String> = vec![
            "https://rpc1.example.com".into(),
            "https://rpc2.example.com".into(),
            "https://rpc3.example.com".into(),
        ];
        let pool = EndpointPool::from_urls(&urls, Default::default()).unwrap();

        let now = now_ms();
        let picks: Vec<_> = (0..4)
            .map(|_| pool.select(Strategy::RoundRobin, now).unwrap())
            .collect();

        // Round-robin wraps: 0, 1, 2, then back to 0.
        assert_eq!(picks, vec![0, 1, 2, 0]);
    }

    #[test]
    fn pool_latency_based_prefers_lower() {
        let urls: Vec<String> = vec![
            "https://rpc1.example.com".into(),
            "https://rpc2.example.com".into(),
        ];
        let pool = EndpointPool::from_urls(&urls, Default::default()).unwrap();

        // Endpoint 1 is 10x faster than endpoint 0 (10ms vs 1ms).
        pool.record_success(0, 10_000_000);
        pool.record_success(1, 1_000_000);

        assert_eq!(pool.select(Strategy::LatencyBased, now_ms()), Some(1));
    }

    #[test]
    fn pool_skips_open_circuit() {
        let urls: Vec<String> = vec![
            "https://rpc1.example.com".into(),
            "https://rpc2.example.com".into(),
        ];
        let pool = EndpointPool::from_urls(&urls, Default::default()).unwrap();

        // Three consecutive failures open endpoint 0's circuit breaker.
        let now = now_ms();
        for _ in 0..3 {
            pool.record_failure(0, now);
        }

        // Only the healthy endpoint remains selectable.
        assert_eq!(pool.select(Strategy::LatencyBased, now), Some(1));
    }

    #[test]
    fn pool_select_n_ordered_by_latency() {
        let urls: Vec<String> = vec![
            "https://rpc1.example.com".into(),
            "https://rpc2.example.com".into(),
            "https://rpc3.example.com".into(),
        ];
        let pool = EndpointPool::from_urls(&urls, Default::default()).unwrap();

        // Latencies: ep0 = 5ms, ep1 = 1ms, ep2 = 3ms.
        pool.record_success(0, 5_000_000);
        pool.record_success(1, 1_000_000);
        pool.record_success(2, 3_000_000);

        // Fastest two, in ascending-latency order.
        assert_eq!(pool.select_n(2, now_ms()), vec![1, 2]);
    }

    #[test]
    fn pool_all_circuits_open_returns_none() {
        let urls: Vec<String> = vec![
            "https://rpc1.example.com".into(),
            "https://rpc2.example.com".into(),
        ];
        let pool = EndpointPool::from_urls(&urls, Default::default()).unwrap();

        // Trip every endpoint's breaker with three failures apiece.
        for idx in 0..pool.len() {
            (1..=3).for_each(|t| pool.record_failure(idx, t * 1000));
        }

        assert!(pool.select(Strategy::LatencyBased, 5000).is_none());
    }

    // ── Router fallback tests ────────────────────────────────────────

    #[test]
    fn router_read_uses_read_pool() {
        let transport = HftTransport::new(
            TransportConfig::builder()
                .shared_endpoint("https://shared.example.com")
                .read_endpoint("https://read.example.com")
                .build()
                .unwrap(),
        )
        .unwrap();

        let (pool, _idx) = transport.router.select_for(false, now_ms()).unwrap();
        // The dedicated read pool holds exactly the one read endpoint.
        assert_eq!(pool.len(), 1);
        assert_eq!(pool.endpoint_urls()[0], "https://read.example.com");
    }

    #[test]
    fn router_write_uses_shared_when_no_write_pool() {
        let transport = HftTransport::new(
            TransportConfig::builder()
                .shared_endpoint("https://shared.example.com")
                .read_endpoint("https://read.example.com")
                .build()
                .unwrap(),
        )
        .unwrap();

        // Without a dedicated write pool, writes land on the shared pool.
        let (pool, _idx) = transport.router.select_for(true, now_ms()).unwrap();
        assert_eq!(pool.endpoint_urls()[0], "https://shared.example.com");
    }

    #[test]
    fn router_read_falls_back_to_shared() {
        let transport = HftTransport::new(
            TransportConfig::builder()
                .shared_endpoint("https://shared.example.com")
                .read_endpoint("https://read.example.com")
                .build()
                .unwrap(),
        )
        .unwrap();
        let router = &transport.router;

        // Open the circuit on the sole read endpoint.
        let now = now_ms();
        for _ in 0..3 {
            router.read.record_failure(0, now);
        }

        // With the read pool dark, reads route to the shared pool.
        let (pool, _idx) = router.select_for(false, now).unwrap();
        assert_eq!(pool.endpoint_urls()[0], "https://shared.example.com");
    }

    #[test]
    fn router_hedged_read_falls_back_to_shared() {
        let transport = HftTransport::new(
            TransportConfig::builder()
                .shared_endpoint("https://shared.example.com")
                .read_endpoint("https://read.example.com")
                .strategy(Strategy::Hedged { fan_out: 2 })
                .build()
                .unwrap(),
        )
        .unwrap();
        let router = &transport.router;

        // Open the circuit on the sole read endpoint.
        let now = now_ms();
        for _ in 0..3 {
            router.read.record_failure(0, now);
        }

        // read_pool() must substitute the shared pool when reads are unhealthy.
        let pool = router.read_pool();
        assert_eq!(pool.endpoint_urls()[0], "https://shared.example.com");
    }

    // ── Lock-free verification tests ─────────────────────────────────

    #[test]
    fn atomic_state_reflects_mutations() {
        // Checks that the lock-free atomic mirrors stay in sync with the
        // Mutex-protected health state; reaches into internal fields on
        // purpose because the mirror consistency IS the property under test.
        let urls: Vec<String> = vec!["https://rpc1.example.com".into()];
        let pool = EndpointPool::from_urls(&urls, Default::default()).unwrap();
        let ep = &pool.endpoints[0];
        let tag = || ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK;

        // Fresh endpoint: circuit Closed, no latency sample yet.
        assert_eq!(tag(), TAG_CLOSED);
        assert_eq!(ep.atomic_latency_ns.load(Ordering::Relaxed), 0);

        // A success keeps the circuit Closed and mirrors the latency.
        pool.record_success(0, 5_000_000);
        assert_eq!(tag(), TAG_CLOSED);
        assert_eq!(ep.atomic_latency_ns.load(Ordering::Relaxed), 5_000_000);

        // Three failures trip the breaker: circuit Open.
        pool.record_failure(0, 1000);
        pool.record_failure(0, 2000);
        pool.record_failure(0, 3000);
        assert_eq!(tag(), TAG_OPEN);
    }

    #[test]
    fn healthy_count_across_pools() {
        let transport = HftTransport::new(
            TransportConfig::builder()
                .shared_endpoint("https://shared1.example.com")
                .shared_endpoint("https://shared2.example.com")
                .read_endpoint("https://read.example.com")
                .build()
                .unwrap(),
        )
        .unwrap();

        assert_eq!(transport.healthy_count(), 3);

        // Opening the read endpoint's circuit drops the aggregate count.
        transport.router.read.record_failure(0, 1000);
        transport.router.read.record_failure(0, 2000);
        transport.router.read.record_failure(0, 3000);

        assert_eq!(transport.healthy_count(), 2);
    }

    #[test]
    fn pool_latency_fast_path_no_locks() {
        let urls: Vec<String> = vec![
            "https://rpc1.example.com".into(),
            "https://rpc2.example.com".into(),
        ];
        let pool = EndpointPool::from_urls(&urls, Default::default()).unwrap();

        pool.record_success(0, 10_000_000);
        pool.record_success(1, 2_000_000);

        // Repeated selections are stable: always the lower-latency endpoint.
        assert!((0..100).all(|_| pool.select(Strategy::LatencyBased, 1000) == Some(1)));
    }

    #[test]
    fn pool_select_n_fast_path_with_enough_closed() {
        let urls: Vec<String> = vec![
            "https://rpc1.example.com".into(),
            "https://rpc2.example.com".into(),
            "https://rpc3.example.com".into(),
        ];
        let pool = EndpointPool::from_urls(&urls, Default::default()).unwrap();

        // Latencies: ep0 = 8ms, ep1 = 2ms, ep2 = 5ms.
        pool.record_success(0, 8_000_000);
        pool.record_success(1, 2_000_000);
        pool.record_success(2, 5_000_000);

        // Fastest two, ascending by latency.
        assert_eq!(pool.select_n(2, 1000), vec![1, 2]);
    }
}