Skip to main content

perpcity_sdk/transport/
provider.rs

1//! Multi-endpoint RPC transport with health-aware routing.
2//!
3//! `HftTransport` implements [`tower::Service<RequestPacket>`], which makes it
4//! a valid Alloy [`Transport`](alloy::transports::Transport) via the blanket
5//! impl in `alloy-transport`. This means it can be used directly with
6//! [`RootProvider`](alloy::providers::RootProvider) and all Alloy provider
7//! methods.
8//!
9//! # Features
10//!
11//! - **Per-endpoint circuit breaker**: automatically routes around dead endpoints
12//! - **Strategy-based selection**: round-robin, latency-based, or hedged reads
13//! - **Read/write classification**: reads are retried on failure; writes never are
14//! - **Hedged requests**: fan out reads to N endpoints, take the fastest response;
15//!   losing requests are **cancelled** via `JoinSet::abort_all` to save RPC rate limits
16//! - **Lock-free endpoint selection**: read path uses atomic mirrors, zero mutex
17//!   contention in steady state (all endpoints healthy)
18//! - **Tower integration**: composes with tower timeout and retry middleware
19//!
20//! # Example
21//!
22//! ```rust,no_run
23//! use perpcity_sdk::transport::config::TransportConfig;
24//! use perpcity_sdk::transport::provider::HftTransport;
25//! use alloy::providers::RootProvider;
26//! use alloy::transports::BoxTransport;
27//! use alloy::network::Ethereum;
28//!
29//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
30//! let config = TransportConfig::builder()
31//!     .endpoint("https://mainnet.base.org")
32//!     .endpoint("https://base-rpc.publicnode.com")
33//!     .build()?;
34//!
35//! let transport = HftTransport::new(config)?;
36//! let client = alloy::rpc::client::RpcClient::new(BoxTransport::new(transport), false);
37//! let provider: RootProvider<Ethereum> = RootProvider::new(client);
38//! # Ok(())
39//! # }
40//! ```
41
42use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
43use std::sync::{Arc, Mutex};
44use std::task::{Context, Poll};
45use std::time::Instant;
46
47use alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
48use alloy::transports::{TransportError, TransportFut};
49use tower::Service;
50
51use super::config::{Strategy, TransportConfig};
52use super::health::{CircuitState, EndpointHealth, EndpointStatus};
53
// ── Packed atomic state constants ────────────────────────────────────
//
// Circuit state is packed into a single AtomicU64 for lock-free reads:
//   bits[63:62] = state tag (00=Closed, 01=Open, 10=HalfOpen)
//   bits[61:0]  = since_ms (Open) or probes_in_flight (HalfOpen) or 0 (Closed)

// Tag for a healthy (Closed) circuit; payload bits are zero.
const TAG_CLOSED: u64 = 0;
// Tag for a tripped (Open) circuit; payload carries `since_ms`.
const TAG_OPEN: u64 = 1 << 62;
// Tag for a recovering (HalfOpen) circuit; payload carries the probe count.
const TAG_HALFOPEN: u64 = 2 << 62;
// AND a packed value with this to extract the 2-bit tag.
const TAG_MASK: u64 = 3 << 62;
64
65#[inline]
66fn pack_state(state: CircuitState) -> u64 {
67    match state {
68        CircuitState::Closed => TAG_CLOSED,
69        CircuitState::Open { since_ms } => TAG_OPEN | (since_ms & !TAG_MASK),
70        CircuitState::HalfOpen { probes_in_flight } => TAG_HALFOPEN | (probes_in_flight as u64),
71    }
72}
73
/// A managed RPC endpoint: transport + health tracker + atomic mirrors.
///
/// Writers mutate `health` under its Mutex and then publish the new values to
/// the atomic mirrors; readers (endpoint selection) consult only the mirrors.
struct ManagedEndpoint {
    /// The underlying Alloy boxed transport for this endpoint.
    transport: alloy::transports::BoxTransport,
    /// Per-endpoint health state (circuit breaker + latency). Protected by Mutex
    /// for mutations only; reads use atomic mirrors below.
    health: Mutex<EndpointHealth>,
    /// The endpoint URL (for diagnostics).
    url: String,
    // ── Lock-free mirrors (eventually consistent with Mutex state) ──
    // Updated after every health mutation. Reads never take locks.
    // Follows the evmap pattern: reads are lock-free, writes sync atomics.
    /// Atomic mirror of `EndpointHealth::avg_latency_ns`.
    /// Read by `select_latency_based` without locking.
    atomic_latency_ns: AtomicU64,
    /// Packed circuit state for lock-free reads (see `pack_state`).
    /// Read by `select_*` to filter callable endpoints without locking.
    atomic_state: AtomicU64,
}
93
94impl ManagedEndpoint {
95    /// Record a successful request. Updates Mutex state + atomic mirrors.
96    #[inline]
97    fn record_success(&self, latency_ns: u64) {
98        let mut h = self.health.lock().unwrap();
99        h.record_success(latency_ns);
100        // Sync atomic mirrors (Relaxed is sufficient: no cross-field ordering needed,
101        // eventual consistency is acceptable for endpoint selection heuristics)
102        self.atomic_latency_ns
103            .store(h.avg_latency_ns(), Ordering::Relaxed);
104        self.atomic_state
105            .store(pack_state(h.state()), Ordering::Relaxed);
106    }
107
108    /// Record a failed request. Updates Mutex state + atomic mirrors.
109    #[inline]
110    fn record_failure(&self, now_ms: u64) {
111        let mut h = self.health.lock().unwrap();
112        h.record_failure(now_ms);
113        self.atomic_state
114            .store(pack_state(h.state()), Ordering::Relaxed);
115        // Latency is not updated on failure (EMA stays the same).
116    }
117}
118
119impl std::fmt::Debug for ManagedEndpoint {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        f.debug_struct("ManagedEndpoint")
122            .field("url", &self.url)
123            .finish_non_exhaustive()
124    }
125}
126
/// Shared inner state for the transport.
///
/// Wrapped in an `Arc` by [`HftTransport`] so clones share the endpoint set,
/// strategy, and round-robin cursor.
#[derive(Debug)]
struct TransportInner {
    /// All managed endpoints, in configuration order.
    endpoints: Vec<ManagedEndpoint>,
    /// Endpoint-selection strategy (round-robin, latency-based, or hedged).
    strategy: Strategy,
    /// Full transport configuration (retry policy, request timeout, ...).
    config: TransportConfig,
    /// Cursor for round-robin selection; wraps on overflow.
    round_robin: AtomicUsize,
}
135
/// Multi-endpoint RPC transport with health-aware routing.
///
/// Implements `tower::Service<RequestPacket>` → Alloy `Transport` (blanket impl)
/// → usable with `RootProvider`.
///
/// Clone is cheap (Arc).
#[derive(Clone, Debug)]
pub struct HftTransport {
    /// Shared state; cloning the transport only bumps this refcount.
    inner: Arc<TransportInner>,
}
146
147impl HftTransport {
148    /// Create a new transport from configuration.
149    ///
150    /// Initializes one HTTP transport per configured endpoint. Each gets its own
151    /// circuit breaker. This does NOT make any network calls — the transports
152    /// connect lazily on first request.
153    pub fn new(config: TransportConfig) -> crate::Result<Self> {
154        let endpoints = config
155            .http_endpoints
156            .iter()
157            .map(|url| {
158                let parsed: url::Url = url.parse().map_err(|e: url::ParseError| {
159                    crate::PerpCityError::InvalidConfig {
160                        reason: format!("invalid endpoint URL '{url}': {e}"),
161                    }
162                })?;
163                let http = alloy::transports::http::Http::new(parsed);
164                let boxed = alloy::transports::BoxTransport::new(http);
165                Ok(ManagedEndpoint {
166                    transport: boxed,
167                    health: Mutex::new(EndpointHealth::new(config.circuit_breaker)),
168                    url: url.clone(),
169                    atomic_latency_ns: AtomicU64::new(0),
170                    atomic_state: AtomicU64::new(TAG_CLOSED),
171                })
172            })
173            .collect::<crate::Result<Vec<_>>>()?;
174
175        Ok(Self {
176            inner: Arc::new(TransportInner {
177                endpoints,
178                strategy: config.strategy,
179                config,
180                round_robin: AtomicUsize::new(0),
181            }),
182        })
183    }
184
185    /// Get the health status of all endpoints.
186    pub fn health_status(&self) -> Vec<EndpointStatus> {
187        self.inner
188            .endpoints
189            .iter()
190            .map(|ep| ep.health.lock().unwrap().status())
191            .collect()
192    }
193
194    /// Number of endpoints currently in Closed (healthy) state.
195    ///
196    /// Lock-free: reads atomic state mirrors without taking any mutexes.
197    // healthy_count: lock-free via atomics (was N mutex locks)
198    pub fn healthy_count(&self) -> usize {
199        self.inner
200            .endpoints
201            .iter()
202            .filter(|ep| ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK == TAG_CLOSED)
203            .count()
204    }
205
206    /// URLs of all configured endpoints.
207    pub fn endpoint_urls(&self) -> Vec<&str> {
208        self.inner
209            .endpoints
210            .iter()
211            .map(|ep| ep.url.as_str())
212            .collect()
213    }
214
215    // ── Benchmark accessors ─────────────────────────────────────────
216
217    /// Select the best endpoint index based on the current strategy.
218    ///
219    /// Exposed for benchmarking endpoint selection latency.
220    #[doc(hidden)]
221    pub fn select_endpoint(&self, now_ms: u64) -> Option<usize> {
222        self.inner.select_endpoint(now_ms)
223    }
224
225    /// Select up to `n` callable endpoints for hedged requests, ordered by latency.
226    ///
227    /// Exposed for benchmarking hedged fan-out overhead.
228    #[doc(hidden)]
229    pub fn select_n_endpoints(&self, n: usize, now_ms: u64) -> Vec<usize> {
230        self.inner.select_n_endpoints(n, now_ms)
231    }
232
233    /// Record a success with latency on a specific endpoint.
234    ///
235    /// Exposed for benchmarking health recording.
236    #[doc(hidden)]
237    pub fn record_success(&self, idx: usize, latency_ns: u64) {
238        self.inner.endpoints[idx].record_success(latency_ns);
239    }
240
241    /// Record a failure on a specific endpoint.
242    ///
243    /// Exposed for benchmarking health recording.
244    #[doc(hidden)]
245    pub fn record_failure(&self, idx: usize, now_ms: u64) {
246        self.inner.endpoints[idx].record_failure(now_ms);
247    }
248}
249
250// ── JSON-RPC method classification ──────────────────────────────────
251
252/// Returns true if the JSON-RPC method is a write (state-changing) operation.
253///
254/// Write methods must NOT be retried — double-sends could cause double spends.
255/// All other methods are treated as reads (safe to retry/hedge).
256fn is_write_method(req: &RequestPacket) -> bool {
257    match req {
258        RequestPacket::Single(call) => is_write_method_name(call.method()),
259        RequestPacket::Batch(calls) => calls.iter().any(|c| is_write_method_name(c.method())),
260    }
261}
262
/// True for JSON-RPC method names that submit transactions (state-changing).
fn is_write_method_name(method: &str) -> bool {
    // Kept as a named table so adding future write methods is one edit.
    const WRITE_METHODS: [&str; 2] = ["eth_sendRawTransaction", "eth_sendTransaction"];
    WRITE_METHODS.contains(&method)
}
266
267// ── Endpoint selection ──────────────────────────────────────────────
268
269impl TransportInner {
270    /// Select the best endpoint index based on the current strategy.
271    ///
272    /// Returns `None` if all endpoints are unavailable (circuit open + not yet
273    /// past recovery timeout).
274    fn select_endpoint(&self, now_ms: u64) -> Option<usize> {
275        match self.strategy {
276            Strategy::RoundRobin => self.select_round_robin(now_ms),
277            Strategy::LatencyBased | Strategy::Hedged { .. } => self.select_latency_based(now_ms),
278        }
279    }
280
281    /// Round-robin selection with lock-free fast path.
282    ///
283    /// Fast path: scan atomic state tags — if a Closed endpoint is found,
284    /// return it immediately without locking. Only falls back to Mutex
285    /// when all endpoints are non-Closed (rare: circuit breaker tripped).
286    fn select_round_robin(&self, now_ms: u64) -> Option<usize> {
287        let n = self.endpoints.len();
288        let start = self.round_robin.fetch_add(1, Ordering::Relaxed);
289
290        // Lock-free fast path: find first Closed endpoint in round-robin order
291        for i in 0..n {
292            let idx = (start + i) % n;
293            if self.endpoints[idx].atomic_state.load(Ordering::Relaxed) & TAG_MASK == TAG_CLOSED {
294                return Some(idx);
295            }
296        }
297
298        // Slow path: all non-Closed, try is_callable (may transition Open→HalfOpen)
299        for i in 0..n {
300            let idx = (start + i) % n;
301            let ep = &self.endpoints[idx];
302            let mut h = ep.health.lock().unwrap();
303            if h.is_callable(now_ms) {
304                ep.atomic_state
305                    .store(pack_state(h.state()), Ordering::Relaxed);
306                return Some(idx);
307            }
308            ep.atomic_state
309                .store(pack_state(h.state()), Ordering::Relaxed);
310        }
311
312        None
313    }
314
315    /// Latency-based selection with lock-free fast path.
316    ///
317    /// Fast path: scan atomic latency + state for all endpoints without locking.
318    /// Among Closed endpoints, pick the one with lowest latency. This is the
319    /// steady-state hot path — zero mutex contention.
320    ///
321    /// Slow path: no Closed endpoints available. Lock each non-Closed endpoint
322    /// and call `is_callable()` which may transition Open→HalfOpen. Only entered
323    /// when circuit breakers have tripped (error condition, rare).
324    // select_latency_based: lock-free fast path via atomics (was N mutex locks)
325    fn select_latency_based(&self, now_ms: u64) -> Option<usize> {
326        // Lock-free fast path: find best Closed endpoint by latency
327        let mut best_idx = None;
328        let mut best_latency = u64::MAX;
329        let mut any_non_closed = false;
330
331        for (i, ep) in self.endpoints.iter().enumerate() {
332            let state = ep.atomic_state.load(Ordering::Relaxed);
333            if state & TAG_MASK == TAG_CLOSED {
334                let lat = ep.atomic_latency_ns.load(Ordering::Relaxed);
335                if lat < best_latency {
336                    best_latency = lat;
337                    best_idx = Some(i);
338                }
339            } else {
340                any_non_closed = true;
341            }
342        }
343
344        if best_idx.is_some() {
345            return best_idx;
346        }
347
348        // Slow path: no Closed endpoints, try Open/HalfOpen with locks
349        if any_non_closed {
350            for (i, ep) in self.endpoints.iter().enumerate() {
351                let mut h = ep.health.lock().unwrap();
352                if h.is_callable(now_ms) {
353                    let lat = h.avg_latency_ns();
354                    // Sync atomics after potential state transition
355                    ep.atomic_latency_ns
356                        .store(h.avg_latency_ns(), Ordering::Relaxed);
357                    ep.atomic_state
358                        .store(pack_state(h.state()), Ordering::Relaxed);
359                    if lat < best_latency {
360                        best_latency = lat;
361                        best_idx = Some(i);
362                    }
363                } else {
364                    ep.atomic_state
365                        .store(pack_state(h.state()), Ordering::Relaxed);
366                }
367            }
368        }
369
370        best_idx
371    }
372
    /// Select up to `n` callable endpoints for hedged requests, ordered by latency.
    ///
    /// Uses a fixed-size stack buffer (max 16 endpoints) to avoid heap allocation
    /// in the common case.
    // select_n_endpoints: stack-allocated buffer + lock-free fast path
    fn select_n_endpoints(&self, n: usize, now_ms: u64) -> Vec<usize> {
        // Stack buffer avoids Vec allocation for up to 16 endpoints
        let mut candidates: [(usize, u64); 16] = [(0, u64::MAX); 16];
        let mut count = 0;
        let mut any_non_closed = false;

        // Lock-free fast path: collect Closed endpoints
        for (i, ep) in self.endpoints.iter().enumerate() {
            if count >= 16 {
                break;
            }
            let state = ep.atomic_state.load(Ordering::Relaxed);
            if state & TAG_MASK == TAG_CLOSED {
                let lat = ep.atomic_latency_ns.load(Ordering::Relaxed);
                candidates[count] = (i, lat);
                count += 1;
            } else {
                any_non_closed = true;
            }
        }

        // If we have enough Closed endpoints, sort and return top-n
        if count >= n {
            candidates[..count].sort_unstable_by_key(|&(_, lat)| lat);
            return candidates[..n].iter().map(|&(i, _)| i).collect();
        }

        // Slow path: not enough Closed, add recoverable Open/HalfOpen
        if any_non_closed {
            for (i, ep) in self.endpoints.iter().enumerate() {
                if count >= 16 {
                    break;
                }
                // Skip already-collected Closed endpoints
                // NOTE(review): this re-reads the mirror, which may have changed
                // since the fast path — a concurrent transition could let an
                // index be skipped or collected twice; confirm this race is
                // acceptable for a hedging heuristic.
                let state = ep.atomic_state.load(Ordering::Relaxed);
                if state & TAG_MASK == TAG_CLOSED {
                    continue;
                }
                let mut h = ep.health.lock().unwrap();
                if h.is_callable(now_ms) {
                    let lat = h.avg_latency_ns();
                    // Publish the potential Open→HalfOpen transition.
                    ep.atomic_state
                        .store(pack_state(h.state()), Ordering::Relaxed);
                    candidates[count] = (i, lat);
                    count += 1;
                } else {
                    ep.atomic_state
                        .store(pack_state(h.state()), Ordering::Relaxed);
                }
            }
        }

        // Sort whatever was gathered by latency and cap the result at n.
        candidates[..count].sort_unstable_by_key(|&(_, lat)| lat);
        candidates[..count.min(n)].iter().map(|&(i, _)| i).collect()
    }
433
434    /// Route a request through the best endpoint, with retry for reads.
435    async fn route_request(
436        self: &Arc<Self>,
437        req: RequestPacket,
438    ) -> Result<ResponsePacket, TransportError> {
439        let is_write = is_write_method(&req);
440        let max_attempts = if is_write {
441            1
442        } else {
443            1 + self.config.retry.max_retries
444        };
445        let timeout = self.config.request_timeout;
446
447        // Handle hedged reads
448        if !is_write && let Strategy::Hedged { fan_out } = self.strategy {
449            return self.hedged_request(req, fan_out, timeout).await;
450        }
451
452        // Standard path: select endpoint, try with retry
453        let mut last_err = None;
454        let now_ms = now_ms();
455
456        for attempt in 0..max_attempts {
457            let Some(idx) = self.select_endpoint(now_ms) else {
458                return Err(TransportError::local_usage_str(
459                    "all RPC endpoints unavailable (circuits open)",
460                ));
461            };
462
463            let start = Instant::now();
464            let mut transport = self.endpoints[idx].transport.clone();
465
466            // Apply tower timeout
467            let result = tokio::time::timeout(timeout, transport.call(req.clone())).await;
468
469            match result {
470                Ok(Ok(response)) => {
471                    let latency_ns = start.elapsed().as_nanos() as u64;
472                    self.endpoints[idx].record_success(latency_ns);
473                    return Ok(response);
474                }
475                Ok(Err(e)) => {
476                    self.endpoints[idx].record_failure(now_ms);
477                    last_err = Some(e);
478                }
479                Err(_timeout) => {
480                    self.endpoints[idx].record_failure(now_ms);
481                    last_err = Some(TransportError::local_usage_str("request timed out"));
482                }
483            }
484
485            // Backoff between retries (exponential: base * 2^attempt)
486            if attempt + 1 < max_attempts {
487                let delay = self.config.retry.base_delay * 2u32.saturating_pow(attempt);
488                tokio::time::sleep(delay).await;
489            }
490        }
491
492        Err(last_err.unwrap_or_else(|| TransportError::local_usage_str("no endpoints available")))
493    }
494
495    /// Fan out a read request to multiple endpoints, return the fastest success.
496    ///
497    /// Uses [`JoinSet`] to properly cancel losing requests via `abort_all()`,
498    /// saving RPC rate limits and network bandwidth. Health is recorded for all
499    /// endpoints that complete before cancellation.
500    // hedged_request: JoinSet + abort_all (was mpsc + leaked tasks)
501    async fn hedged_request(
502        &self,
503        req: RequestPacket,
504        fan_out: usize,
505        timeout: std::time::Duration,
506    ) -> Result<ResponsePacket, TransportError> {
507        let now_ms = now_ms();
508        let indices = self.select_n_endpoints(fan_out, now_ms);
509
510        if indices.is_empty() {
511            return Err(TransportError::local_usage_str(
512                "all RPC endpoints unavailable (circuits open)",
513            ));
514        }
515
516        // If only one endpoint is available, fall back to single request
517        if indices.len() == 1 {
518            let idx = indices[0];
519            let start = Instant::now();
520            let mut transport = self.endpoints[idx].transport.clone();
521            let result = tokio::time::timeout(timeout, transport.call(req)).await;
522
523            return match result {
524                Ok(Ok(resp)) => {
525                    self.endpoints[idx].record_success(start.elapsed().as_nanos() as u64);
526                    Ok(resp)
527                }
528                Ok(Err(e)) => {
529                    self.endpoints[idx].record_failure(now_ms);
530                    Err(e)
531                }
532                Err(_) => {
533                    self.endpoints[idx].record_failure(now_ms);
534                    Err(TransportError::local_usage_str("request timed out"))
535                }
536            };
537        }
538
539        // Fan out to multiple endpoints using JoinSet for proper cancellation
540        let mut join_set = tokio::task::JoinSet::new();
541
542        for &idx in &indices {
543            let mut transport = self.endpoints[idx].transport.clone();
544            let req_clone = req.clone();
545
546            join_set.spawn(async move {
547                let start = Instant::now();
548                let result = tokio::time::timeout(timeout, transport.call(req_clone)).await;
549                let result = match result {
550                    Ok(r) => r,
551                    Err(_) => Err(TransportError::local_usage_str("request timed out")),
552                };
553                (idx, result, start)
554            });
555        }
556
557        let mut last_err = None;
558
559        while let Some(join_result) = join_set.join_next().await {
560            match join_result {
561                Ok((idx, Ok(response), start)) => {
562                    let latency_ns = start.elapsed().as_nanos() as u64;
563                    self.endpoints[idx].record_success(latency_ns);
564                    // Cancel remaining in-flight requests — saves RPC rate limits.
565                    // JoinSet::drop also aborts, but explicit abort_all is clearer.
566                    join_set.abort_all();
567                    return Ok(response);
568                }
569                Ok((idx, Err(e), _start)) => {
570                    self.endpoints[idx].record_failure(now_ms);
571                    last_err = Some(e);
572                }
573                // Task was aborted (by our abort_all or JoinSet::drop) — expected
574                Err(e) if e.is_cancelled() => {}
575                // Task panicked — treat as failure
576                Err(_) => {
577                    last_err = Some(TransportError::local_usage_str(
578                        "hedged request task panicked",
579                    ));
580                }
581            }
582        }
583
584        Err(last_err
585            .unwrap_or_else(|| TransportError::local_usage_str("all hedged requests failed")))
586    }
587}
588
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Used for health-tracking timestamps; clamps to 0 if the system clock
/// reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0)
}
596
597// ── tower::Service implementation ───────────────────────────────────
598//
599// This blanket-qualifies HftTransport as an Alloy Transport:
600//   Service<RequestPacket, Response=ResponsePacket, Error=TransportError,
601//           Future=TransportFut<'static>> + Clone + Send + Sync + 'static
602//   → impl Transport for HftTransport
603//   → BoxTransport::new(hft_transport) works
604//   → RootProvider::new(hft_transport) works
605
606impl Service<RequestPacket> for HftTransport {
607    type Response = ResponsePacket;
608    type Error = TransportError;
609    type Future = TransportFut<'static>;
610
611    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
612        // We're always ready to accept requests. Endpoint availability
613        // is checked in `call` (fail-fast on route_request).
614        Poll::Ready(Ok(()))
615    }
616
617    fn call(&mut self, req: RequestPacket) -> Self::Future {
618        let inner = Arc::clone(&self.inner);
619        Box::pin(async move { inner.route_request(req).await })
620    }
621}
622
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::config::TransportConfig;

    /// Helper to create a SerializedRequest for testing.
    fn make_request(method: &'static str, id: u64) -> alloy::rpc::json_rpc::SerializedRequest {
        use alloy::rpc::json_rpc::{Id, Request};
        let params = serde_json::value::RawValue::from_string("[]".to_string()).unwrap();
        Request::new(method, Id::Number(id), params)
            .serialize()
            .unwrap()
    }

    // Single-call packets: reads are non-write, send methods are writes.
    #[test]
    fn classify_write_methods() {
        let read = RequestPacket::Single(make_request("eth_getBlockByNumber", 1));
        assert!(!is_write_method(&read));

        let write = RequestPacket::Single(make_request("eth_sendRawTransaction", 2));
        assert!(is_write_method(&write));
    }

    // A batch containing any write call must be classified as a write.
    #[test]
    fn classify_batch_with_write() {
        let batch = RequestPacket::Batch(vec![
            make_request("eth_getBalance", 1),
            make_request("eth_sendRawTransaction", 2),
        ]);
        assert!(is_write_method(&batch));
    }

    // Construction makes no network calls, so real-looking URLs are fine.
    #[test]
    fn new_transport_valid_config() {
        let config = TransportConfig::builder()
            .endpoint("https://mainnet.base.org")
            .endpoint("https://base-rpc.publicnode.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        assert_eq!(transport.healthy_count(), 2);
        assert_eq!(transport.endpoint_urls().len(), 2);
    }

    // URL parse failure surfaces as an error from new(), not from build().
    #[test]
    fn new_transport_invalid_url() {
        let config = TransportConfig::builder()
            .endpoint("not a valid url")
            .build()
            .unwrap();
        let result = HftTransport::new(config);
        assert!(result.is_err());
    }

    // Compile-time checks for the trait bounds Alloy's blanket impl requires.
    #[test]
    fn transport_is_clone_send_sync() {
        fn assert_clone_send_sync<T: Clone + Send + Sync + 'static>() {}
        assert_clone_send_sync::<HftTransport>();
    }

    #[test]
    fn transport_implements_tower_service() {
        fn assert_service<T: tower::Service<RequestPacket>>() {}
        assert_service::<HftTransport>();
    }

    #[test]
    fn round_robin_selection() {
        let config = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .endpoint("https://rpc3.example.com")
            .strategy(crate::transport::config::Strategy::RoundRobin)
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let inner = &transport.inner;

        let now = now_ms();
        let a = inner.select_endpoint(now).unwrap();
        let b = inner.select_endpoint(now).unwrap();
        let c = inner.select_endpoint(now).unwrap();
        let d = inner.select_endpoint(now).unwrap();

        // Should cycle through 0, 1, 2, 0
        assert_eq!(a, 0);
        assert_eq!(b, 1);
        assert_eq!(c, 2);
        assert_eq!(d, 0);
    }

    #[test]
    fn latency_based_selection_prefers_lower_latency() {
        let config = TransportConfig::builder()
            .endpoint("https://rpc1.example.com") // idx 0
            .endpoint("https://rpc2.example.com") // idx 1
            .strategy(crate::transport::config::Strategy::LatencyBased)
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let inner = &transport.inner;

        // Use the centralized method that syncs atomics
        inner.endpoints[0].record_success(10_000_000); // 10ms
        inner.endpoints[1].record_success(1_000_000); // 1ms

        let now = now_ms();
        let selected = inner.select_endpoint(now).unwrap();
        assert_eq!(selected, 1); // lower latency
    }

    #[test]
    fn selection_skips_open_circuit() {
        let config = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .strategy(crate::transport::config::Strategy::LatencyBased)
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let inner = &transport.inner;

        let now = now_ms();
        // Trip circuit breaker on endpoint 0 using centralized method
        // (3 consecutive failures — assumes the default failure threshold is 3;
        // verify against CircuitBreakerConfig defaults).
        inner.endpoints[0].record_failure(now);
        inner.endpoints[0].record_failure(now);
        inner.endpoints[0].record_failure(now);

        // Select at `now` — still within 30s recovery timeout
        let selected = inner.select_endpoint(now).unwrap();
        assert_eq!(selected, 1); // only healthy endpoint
    }

    #[test]
    fn select_n_endpoints_ordered_by_latency() {
        let config = TransportConfig::builder()
            .endpoint("https://rpc1.example.com") // idx 0
            .endpoint("https://rpc2.example.com") // idx 1
            .endpoint("https://rpc3.example.com") // idx 2
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let inner = &transport.inner;

        // Set latencies via centralized method (syncs atomics)
        inner.endpoints[0].record_success(5_000_000);
        inner.endpoints[1].record_success(1_000_000);
        inner.endpoints[2].record_success(3_000_000);

        let now = now_ms();
        let selected = inner.select_n_endpoints(2, now);
        assert_eq!(selected, vec![1, 2]); // ordered by latency, take 2
    }

    #[test]
    fn all_circuits_open_returns_none() {
        let config = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let inner = &transport.inner;

        // Trip both circuit breakers
        for ep in &inner.endpoints {
            for t in 1..=3 {
                ep.record_failure(t * 1000);
            }
        }

        let now_ms = 5000; // within recovery timeout
        assert!(inner.select_endpoint(now_ms).is_none());
    }

    // ── Lock-free verification tests ─────────────────────────────────

    // The atomic mirrors must track every mutation made under the Mutex.
    #[test]
    fn atomic_state_reflects_mutations() {
        let config = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let inner = &transport.inner;
        let ep = &inner.endpoints[0];

        // Initial state: Closed
        assert_eq!(
            ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK,
            TAG_CLOSED
        );
        assert_eq!(ep.atomic_latency_ns.load(Ordering::Relaxed), 0);

        // After success: still Closed, latency updated
        ep.record_success(5_000_000);
        assert_eq!(
            ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK,
            TAG_CLOSED
        );
        assert_eq!(ep.atomic_latency_ns.load(Ordering::Relaxed), 5_000_000);

        // After 3 failures: state is Open
        ep.record_failure(1000);
        ep.record_failure(2000);
        ep.record_failure(3000);
        assert_eq!(ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK, TAG_OPEN);
    }

    #[test]
    fn healthy_count_is_lock_free() {
        let config = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .endpoint("https://rpc3.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();

        assert_eq!(transport.healthy_count(), 3);

        // Trip one circuit
        transport.record_failure(0, 1000);
        transport.record_failure(0, 2000);
        transport.record_failure(0, 3000);

        assert_eq!(transport.healthy_count(), 2);
    }

    #[test]
    fn latency_based_fast_path_no_locks() {
        // Verify that with all Closed endpoints, select_latency_based
        // uses the atomic fast path (doesn't modify state)
        let config = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .strategy(Strategy::LatencyBased)
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let inner = &transport.inner;

        inner.endpoints[0].record_success(10_000_000);
        inner.endpoints[1].record_success(2_000_000);

        // Multiple selections should consistently pick endpoint 1 (lower latency)
        for _ in 0..100 {
            assert_eq!(inner.select_endpoint(1000).unwrap(), 1);
        }
    }

    #[test]
    fn select_n_fast_path_with_enough_closed() {
        let config = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .endpoint("https://rpc3.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let inner = &transport.inner;

        inner.endpoints[0].record_success(8_000_000);
        inner.endpoints[1].record_success(2_000_000);
        inner.endpoints[2].record_success(5_000_000);

        // All Closed, requesting 2 — should use fast path
        let selected = inner.select_n_endpoints(2, 1000);
        assert_eq!(selected, vec![1, 2]); // ordered by latency
    }
}