1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
43use std::sync::{Arc, Mutex};
44use std::task::{Context, Poll};
45use std::time::Instant;
46
47use alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
48use alloy::transports::{TransportError, TransportFut};
49use tower::Service;
50
51use super::config::{Strategy, TransportConfig};
52use super::health::{CircuitState, EndpointHealth, EndpointStatus};
53
// Circuit-breaker state is packed into a single `AtomicU64` so the selection
// hot paths can read it without taking the `health` mutex. The top two bits
// carry the state tag; the low 62 bits carry the payload (see `pack_state`).
const TAG_CLOSED: u64 = 0;
const TAG_OPEN: u64 = 1 << 62;
const TAG_HALFOPEN: u64 = 2 << 62;
const TAG_MASK: u64 = 3 << 62;
64
65#[inline]
66fn pack_state(state: CircuitState) -> u64 {
67 match state {
68 CircuitState::Closed => TAG_CLOSED,
69 CircuitState::Open { since_ms } => TAG_OPEN | (since_ms & !TAG_MASK),
70 CircuitState::HalfOpen { probes_in_flight } => TAG_HALFOPEN | (probes_in_flight as u64),
71 }
72}
73
/// One upstream RPC endpoint together with its health bookkeeping.
struct ManagedEndpoint {
    /// Cloneable handle to the underlying HTTP transport for this endpoint.
    transport: alloy::transports::BoxTransport,
    /// Authoritative health state (latency tracking, circuit breaker),
    /// guarded by a mutex; mutated on every recorded success/failure.
    health: Mutex<EndpointHealth>,
    /// Endpoint URL, kept for diagnostics and `Debug` output.
    url: String,
    /// Lock-free cache of `EndpointHealth::avg_latency_ns`, refreshed on
    /// each recorded success; read by the latency-based selection fast path.
    atomic_latency_ns: AtomicU64,
    /// Lock-free cache of the packed circuit state (see `pack_state`),
    /// refreshed whenever the health state is mutated.
    atomic_state: AtomicU64,
}
93
94impl ManagedEndpoint {
95 #[inline]
97 fn record_success(&self, latency_ns: u64) {
98 let mut h = self.health.lock().unwrap();
99 h.record_success(latency_ns);
100 self.atomic_latency_ns
103 .store(h.avg_latency_ns(), Ordering::Relaxed);
104 self.atomic_state
105 .store(pack_state(h.state()), Ordering::Relaxed);
106 }
107
108 #[inline]
110 fn record_failure(&self, now_ms: u64) {
111 let mut h = self.health.lock().unwrap();
112 h.record_failure(now_ms);
113 self.atomic_state
114 .store(pack_state(h.state()), Ordering::Relaxed);
115 }
117}
118
119impl std::fmt::Debug for ManagedEndpoint {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 f.debug_struct("ManagedEndpoint")
122 .field("url", &self.url)
123 .finish_non_exhaustive()
124 }
125}
126
/// Shared state behind `HftTransport`: the endpoint set, the selection
/// strategy, and the round-robin cursor.
#[derive(Debug)]
struct TransportInner {
    endpoints: Vec<ManagedEndpoint>,
    /// Copied out of `config` at construction so selection reads it directly.
    strategy: Strategy,
    config: TransportConfig,
    /// Ever-increasing cursor for round-robin selection (wraps via `%`).
    round_robin: AtomicUsize,
}
135
/// Multi-endpoint RPC transport with per-endpoint circuit breakers and
/// configurable selection strategies (round-robin, latency-based, hedged).
/// Cheap to clone: all state lives behind a shared `Arc`.
#[derive(Clone, Debug)]
pub struct HftTransport {
    inner: Arc<TransportInner>,
}
146
147impl HftTransport {
148 pub fn new(config: TransportConfig) -> crate::Result<Self> {
154 let endpoints = config
155 .http_endpoints
156 .iter()
157 .map(|url| {
158 let parsed: url::Url = url.parse().map_err(|e: url::ParseError| {
159 crate::PerpCityError::InvalidConfig {
160 reason: format!("invalid endpoint URL '{url}': {e}"),
161 }
162 })?;
163 let http = alloy::transports::http::Http::new(parsed);
164 let boxed = alloy::transports::BoxTransport::new(http);
165 Ok(ManagedEndpoint {
166 transport: boxed,
167 health: Mutex::new(EndpointHealth::new(config.circuit_breaker)),
168 url: url.clone(),
169 atomic_latency_ns: AtomicU64::new(0),
170 atomic_state: AtomicU64::new(TAG_CLOSED),
171 })
172 })
173 .collect::<crate::Result<Vec<_>>>()?;
174
175 Ok(Self {
176 inner: Arc::new(TransportInner {
177 endpoints,
178 strategy: config.strategy,
179 config,
180 round_robin: AtomicUsize::new(0),
181 }),
182 })
183 }
184
185 pub fn health_status(&self) -> Vec<EndpointStatus> {
187 self.inner
188 .endpoints
189 .iter()
190 .map(|ep| ep.health.lock().unwrap().status())
191 .collect()
192 }
193
194 pub fn healthy_count(&self) -> usize {
199 self.inner
200 .endpoints
201 .iter()
202 .filter(|ep| ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK == TAG_CLOSED)
203 .count()
204 }
205
206 pub fn endpoint_urls(&self) -> Vec<&str> {
208 self.inner
209 .endpoints
210 .iter()
211 .map(|ep| ep.url.as_str())
212 .collect()
213 }
214
215 #[doc(hidden)]
221 pub fn select_endpoint(&self, now_ms: u64) -> Option<usize> {
222 self.inner.select_endpoint(now_ms)
223 }
224
225 #[doc(hidden)]
229 pub fn select_n_endpoints(&self, n: usize, now_ms: u64) -> Vec<usize> {
230 self.inner.select_n_endpoints(n, now_ms)
231 }
232
233 #[doc(hidden)]
237 pub fn record_success(&self, idx: usize, latency_ns: u64) {
238 self.inner.endpoints[idx].record_success(latency_ns);
239 }
240
241 #[doc(hidden)]
245 pub fn record_failure(&self, idx: usize, now_ms: u64) {
246 self.inner.endpoints[idx].record_failure(now_ms);
247 }
248}
249
250fn is_write_method(req: &RequestPacket) -> bool {
257 match req {
258 RequestPacket::Single(call) => is_write_method_name(call.method()),
259 RequestPacket::Batch(calls) => calls.iter().any(|c| is_write_method_name(c.method())),
260 }
261}
262
/// Methods that submit transactions; these are treated as writes by
/// `is_write_method`.
fn is_write_method_name(method: &str) -> bool {
    method == "eth_sendRawTransaction" || method == "eth_sendTransaction"
}
266
267impl TransportInner {
270 fn select_endpoint(&self, now_ms: u64) -> Option<usize> {
275 match self.strategy {
276 Strategy::RoundRobin => self.select_round_robin(now_ms),
277 Strategy::LatencyBased | Strategy::Hedged { .. } => self.select_latency_based(now_ms),
278 }
279 }
280
281 fn select_round_robin(&self, now_ms: u64) -> Option<usize> {
287 let n = self.endpoints.len();
288 let start = self.round_robin.fetch_add(1, Ordering::Relaxed);
289
290 for i in 0..n {
292 let idx = (start + i) % n;
293 if self.endpoints[idx].atomic_state.load(Ordering::Relaxed) & TAG_MASK == TAG_CLOSED {
294 return Some(idx);
295 }
296 }
297
298 for i in 0..n {
300 let idx = (start + i) % n;
301 let ep = &self.endpoints[idx];
302 let mut h = ep.health.lock().unwrap();
303 if h.is_callable(now_ms) {
304 ep.atomic_state
305 .store(pack_state(h.state()), Ordering::Relaxed);
306 return Some(idx);
307 }
308 ep.atomic_state
309 .store(pack_state(h.state()), Ordering::Relaxed);
310 }
311
312 None
313 }
314
315 fn select_latency_based(&self, now_ms: u64) -> Option<usize> {
326 let mut best_idx = None;
328 let mut best_latency = u64::MAX;
329 let mut any_non_closed = false;
330
331 for (i, ep) in self.endpoints.iter().enumerate() {
332 let state = ep.atomic_state.load(Ordering::Relaxed);
333 if state & TAG_MASK == TAG_CLOSED {
334 let lat = ep.atomic_latency_ns.load(Ordering::Relaxed);
335 if lat < best_latency {
336 best_latency = lat;
337 best_idx = Some(i);
338 }
339 } else {
340 any_non_closed = true;
341 }
342 }
343
344 if best_idx.is_some() {
345 return best_idx;
346 }
347
348 if any_non_closed {
350 for (i, ep) in self.endpoints.iter().enumerate() {
351 let mut h = ep.health.lock().unwrap();
352 if h.is_callable(now_ms) {
353 let lat = h.avg_latency_ns();
354 ep.atomic_latency_ns
356 .store(h.avg_latency_ns(), Ordering::Relaxed);
357 ep.atomic_state
358 .store(pack_state(h.state()), Ordering::Relaxed);
359 if lat < best_latency {
360 best_latency = lat;
361 best_idx = Some(i);
362 }
363 } else {
364 ep.atomic_state
365 .store(pack_state(h.state()), Ordering::Relaxed);
366 }
367 }
368 }
369
370 best_idx
371 }
372
373 fn select_n_endpoints(&self, n: usize, now_ms: u64) -> Vec<usize> {
379 let mut candidates: [(usize, u64); 16] = [(0, u64::MAX); 16];
381 let mut count = 0;
382 let mut any_non_closed = false;
383
384 for (i, ep) in self.endpoints.iter().enumerate() {
386 if count >= 16 {
387 break;
388 }
389 let state = ep.atomic_state.load(Ordering::Relaxed);
390 if state & TAG_MASK == TAG_CLOSED {
391 let lat = ep.atomic_latency_ns.load(Ordering::Relaxed);
392 candidates[count] = (i, lat);
393 count += 1;
394 } else {
395 any_non_closed = true;
396 }
397 }
398
399 if count >= n {
401 candidates[..count].sort_unstable_by_key(|&(_, lat)| lat);
402 return candidates[..n].iter().map(|&(i, _)| i).collect();
403 }
404
405 if any_non_closed {
407 for (i, ep) in self.endpoints.iter().enumerate() {
408 if count >= 16 {
409 break;
410 }
411 let state = ep.atomic_state.load(Ordering::Relaxed);
413 if state & TAG_MASK == TAG_CLOSED {
414 continue;
415 }
416 let mut h = ep.health.lock().unwrap();
417 if h.is_callable(now_ms) {
418 let lat = h.avg_latency_ns();
419 ep.atomic_state
420 .store(pack_state(h.state()), Ordering::Relaxed);
421 candidates[count] = (i, lat);
422 count += 1;
423 } else {
424 ep.atomic_state
425 .store(pack_state(h.state()), Ordering::Relaxed);
426 }
427 }
428 }
429
430 candidates[..count].sort_unstable_by_key(|&(_, lat)| lat);
431 candidates[..count.min(n)].iter().map(|&(i, _)| i).collect()
432 }
433
434 async fn route_request(
436 self: &Arc<Self>,
437 req: RequestPacket,
438 ) -> Result<ResponsePacket, TransportError> {
439 let is_write = is_write_method(&req);
440 let max_attempts = if is_write {
441 1
442 } else {
443 1 + self.config.retry.max_retries
444 };
445 let timeout = self.config.request_timeout;
446
447 if !is_write && let Strategy::Hedged { fan_out } = self.strategy {
449 return self.hedged_request(req, fan_out, timeout).await;
450 }
451
452 let mut last_err = None;
454 let now_ms = now_ms();
455
456 for attempt in 0..max_attempts {
457 let Some(idx) = self.select_endpoint(now_ms) else {
458 return Err(TransportError::local_usage_str(
459 "all RPC endpoints unavailable (circuits open)",
460 ));
461 };
462
463 let start = Instant::now();
464 let mut transport = self.endpoints[idx].transport.clone();
465
466 let result = tokio::time::timeout(timeout, transport.call(req.clone())).await;
468
469 match result {
470 Ok(Ok(response)) => {
471 let latency_ns = start.elapsed().as_nanos() as u64;
472 self.endpoints[idx].record_success(latency_ns);
473 return Ok(response);
474 }
475 Ok(Err(e)) => {
476 self.endpoints[idx].record_failure(now_ms);
477 last_err = Some(e);
478 }
479 Err(_timeout) => {
480 self.endpoints[idx].record_failure(now_ms);
481 last_err = Some(TransportError::local_usage_str("request timed out"));
482 }
483 }
484
485 if attempt + 1 < max_attempts {
487 let delay = self.config.retry.base_delay * 2u32.saturating_pow(attempt);
488 tokio::time::sleep(delay).await;
489 }
490 }
491
492 Err(last_err.unwrap_or_else(|| TransportError::local_usage_str("no endpoints available")))
493 }
494
495 async fn hedged_request(
502 &self,
503 req: RequestPacket,
504 fan_out: usize,
505 timeout: std::time::Duration,
506 ) -> Result<ResponsePacket, TransportError> {
507 let now_ms = now_ms();
508 let indices = self.select_n_endpoints(fan_out, now_ms);
509
510 if indices.is_empty() {
511 return Err(TransportError::local_usage_str(
512 "all RPC endpoints unavailable (circuits open)",
513 ));
514 }
515
516 if indices.len() == 1 {
518 let idx = indices[0];
519 let start = Instant::now();
520 let mut transport = self.endpoints[idx].transport.clone();
521 let result = tokio::time::timeout(timeout, transport.call(req)).await;
522
523 return match result {
524 Ok(Ok(resp)) => {
525 self.endpoints[idx].record_success(start.elapsed().as_nanos() as u64);
526 Ok(resp)
527 }
528 Ok(Err(e)) => {
529 self.endpoints[idx].record_failure(now_ms);
530 Err(e)
531 }
532 Err(_) => {
533 self.endpoints[idx].record_failure(now_ms);
534 Err(TransportError::local_usage_str("request timed out"))
535 }
536 };
537 }
538
539 let mut join_set = tokio::task::JoinSet::new();
541
542 for &idx in &indices {
543 let mut transport = self.endpoints[idx].transport.clone();
544 let req_clone = req.clone();
545
546 join_set.spawn(async move {
547 let start = Instant::now();
548 let result = tokio::time::timeout(timeout, transport.call(req_clone)).await;
549 let result = match result {
550 Ok(r) => r,
551 Err(_) => Err(TransportError::local_usage_str("request timed out")),
552 };
553 (idx, result, start)
554 });
555 }
556
557 let mut last_err = None;
558
559 while let Some(join_result) = join_set.join_next().await {
560 match join_result {
561 Ok((idx, Ok(response), start)) => {
562 let latency_ns = start.elapsed().as_nanos() as u64;
563 self.endpoints[idx].record_success(latency_ns);
564 join_set.abort_all();
567 return Ok(response);
568 }
569 Ok((idx, Err(e), _start)) => {
570 self.endpoints[idx].record_failure(now_ms);
571 last_err = Some(e);
572 }
573 Err(e) if e.is_cancelled() => {}
575 Err(_) => {
577 last_err = Some(TransportError::local_usage_str(
578 "hedged request task panicked",
579 ));
580 }
581 }
582 }
583
584 Err(last_err
585 .unwrap_or_else(|| TransportError::local_usage_str("all hedged requests failed")))
586 }
587}
588
/// Current wall-clock time as milliseconds since the Unix epoch.
/// Returns 0 if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0)
}
596
597impl Service<RequestPacket> for HftTransport {
607 type Response = ResponsePacket;
608 type Error = TransportError;
609 type Future = TransportFut<'static>;
610
611 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
612 Poll::Ready(Ok(()))
615 }
616
617 fn call(&mut self, req: RequestPacket) -> Self::Future {
618 let inner = Arc::clone(&self.inner);
619 Box::pin(async move { inner.route_request(req).await })
620 }
621}
622
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::config::TransportConfig;

    /// Build a serialized JSON-RPC request with empty params.
    fn make_request(method: &'static str, id: u64) -> alloy::rpc::json_rpc::SerializedRequest {
        use alloy::rpc::json_rpc::{Id, Request};
        let params = serde_json::value::RawValue::from_string("[]".to_string()).unwrap();
        Request::new(method, Id::Number(id), params)
            .serialize()
            .unwrap()
    }

    #[test]
    fn classify_write_methods() {
        let read_req = RequestPacket::Single(make_request("eth_getBlockByNumber", 1));
        assert!(!is_write_method(&read_req));

        let write_req = RequestPacket::Single(make_request("eth_sendRawTransaction", 2));
        assert!(is_write_method(&write_req));
    }

    #[test]
    fn classify_batch_with_write() {
        // A batch counts as a write if any member is a write.
        let batch = RequestPacket::Batch(vec![
            make_request("eth_getBalance", 1),
            make_request("eth_sendRawTransaction", 2),
        ]);
        assert!(is_write_method(&batch));
    }

    #[test]
    fn new_transport_valid_config() {
        let cfg = TransportConfig::builder()
            .endpoint("https://mainnet.base.org")
            .endpoint("https://base-rpc.publicnode.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();
        assert_eq!(transport.healthy_count(), 2);
        assert_eq!(transport.endpoint_urls().len(), 2);
    }

    #[test]
    fn new_transport_invalid_url() {
        let cfg = TransportConfig::builder()
            .endpoint("not a valid url")
            .build()
            .unwrap();
        assert!(HftTransport::new(cfg).is_err());
    }

    #[test]
    fn transport_is_clone_send_sync() {
        fn assert_clone_send_sync<T: Clone + Send + Sync + 'static>() {}
        assert_clone_send_sync::<HftTransport>();
    }

    #[test]
    fn transport_implements_tower_service() {
        fn assert_service<T: tower::Service<RequestPacket>>() {}
        assert_service::<HftTransport>();
    }

    #[test]
    fn round_robin_selection() {
        let cfg = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .endpoint("https://rpc3.example.com")
            .strategy(crate::transport::config::Strategy::RoundRobin)
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();
        let inner = &transport.inner;

        // Four picks over three healthy endpoints wrap back to the first.
        let now = now_ms();
        let picks = [
            inner.select_endpoint(now).unwrap(),
            inner.select_endpoint(now).unwrap(),
            inner.select_endpoint(now).unwrap(),
            inner.select_endpoint(now).unwrap(),
        ];
        assert_eq!(picks, [0, 1, 2, 0]);
    }

    #[test]
    fn latency_based_selection_prefers_lower_latency() {
        let cfg = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .strategy(crate::transport::config::Strategy::LatencyBased)
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();
        let inner = &transport.inner;

        inner.endpoints[0].record_success(10_000_000); // slower
        inner.endpoints[1].record_success(1_000_000); // faster

        // The faster endpoint wins.
        assert_eq!(inner.select_endpoint(now_ms()).unwrap(), 1);
    }

    #[test]
    fn selection_skips_open_circuit() {
        let cfg = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .strategy(crate::transport::config::Strategy::LatencyBased)
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();
        let inner = &transport.inner;

        // Trip endpoint 0's breaker with repeated failures.
        let now = now_ms();
        for _ in 0..3 {
            inner.endpoints[0].record_failure(now);
        }

        // Only the healthy endpoint remains selectable.
        assert_eq!(inner.select_endpoint(now).unwrap(), 1);
    }

    #[test]
    fn select_n_endpoints_ordered_by_latency() {
        let cfg = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .endpoint("https://rpc3.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();
        let inner = &transport.inner;

        inner.endpoints[0].record_success(5_000_000);
        inner.endpoints[1].record_success(1_000_000);
        inner.endpoints[2].record_success(3_000_000);

        // Fastest two, ascending by recorded latency.
        assert_eq!(inner.select_n_endpoints(2, now_ms()), vec![1, 2]);
    }

    #[test]
    fn all_circuits_open_returns_none() {
        let cfg = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();
        let inner = &transport.inner;

        // Trip every breaker with three failures each.
        for ep in &inner.endpoints {
            for t in 1..=3 {
                ep.record_failure(t * 1000);
            }
        }

        // Shortly after the failures, nothing is callable yet.
        assert!(inner.select_endpoint(5000).is_none());
    }

    #[test]
    fn atomic_state_reflects_mutations() {
        let cfg = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();
        let ep = &transport.inner.endpoints[0];

        // Fresh endpoint: closed circuit, zero cached latency.
        assert_eq!(
            ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK,
            TAG_CLOSED
        );
        assert_eq!(ep.atomic_latency_ns.load(Ordering::Relaxed), 0);

        // A success refreshes the latency cache and stays closed.
        ep.record_success(5_000_000);
        assert_eq!(
            ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK,
            TAG_CLOSED
        );
        assert_eq!(ep.atomic_latency_ns.load(Ordering::Relaxed), 5_000_000);

        // Three failures trip the breaker to Open.
        ep.record_failure(1000);
        ep.record_failure(2000);
        ep.record_failure(3000);
        assert_eq!(ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK, TAG_OPEN);
    }

    #[test]
    fn healthy_count_is_lock_free() {
        let cfg = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .endpoint("https://rpc3.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();

        assert_eq!(transport.healthy_count(), 3);

        // Trip one breaker via the public hook.
        transport.record_failure(0, 1000);
        transport.record_failure(0, 2000);
        transport.record_failure(0, 3000);

        assert_eq!(transport.healthy_count(), 2);
    }

    #[test]
    fn latency_based_fast_path_no_locks() {
        let cfg = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .strategy(Strategy::LatencyBased)
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();
        let inner = &transport.inner;

        inner.endpoints[0].record_success(10_000_000);
        inner.endpoints[1].record_success(2_000_000);

        // With both circuits closed, repeated selection keeps hitting the
        // atomic fast path and always returns the faster endpoint.
        for _ in 0..100 {
            assert_eq!(inner.select_endpoint(1000).unwrap(), 1);
        }
    }

    #[test]
    fn select_n_fast_path_with_enough_closed() {
        let cfg = TransportConfig::builder()
            .endpoint("https://rpc1.example.com")
            .endpoint("https://rpc2.example.com")
            .endpoint("https://rpc3.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(cfg).unwrap();
        let inner = &transport.inner;

        inner.endpoints[0].record_success(8_000_000);
        inner.endpoints[1].record_success(2_000_000);
        inner.endpoints[2].record_success(5_000_000);

        // All closed: the fast path returns the two fastest, in order.
        assert_eq!(inner.select_n_endpoints(2, 1000), vec![1, 2]);
    }
}