1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
43use std::sync::{Arc, Mutex};
44use std::task::{Context, Poll};
45use std::time::Instant;
46
47use alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
48use alloy::transports::{TransportError, TransportFut};
49use tower::Service;
50
51use super::config::{Strategy, TransportConfig};
52use super::health::{CircuitState, EndpointHealth, EndpointStatus};
53
// Packed circuit-state encoding: the top two bits (62-63) of the u64 carry
// the variant tag; the low 62 bits carry the variant payload (see
// `pack_state`). This lets the selection fast path read circuit state with a
// single relaxed atomic load instead of taking the health mutex.
const TAG_CLOSED: u64 = 0;
const TAG_OPEN: u64 = 1 << 62;
const TAG_HALFOPEN: u64 = 2 << 62;
const TAG_MASK: u64 = 3 << 62;
64
65#[inline]
66fn pack_state(state: CircuitState) -> u64 {
67 match state {
68 CircuitState::Closed => TAG_CLOSED,
69 CircuitState::Open { since_ms } => TAG_OPEN | (since_ms & !TAG_MASK),
70 CircuitState::HalfOpen { probes_in_flight } => TAG_HALFOPEN | (probes_in_flight as u64),
71 }
72}
73
/// One RPC endpoint together with its health tracking and the lock-free
/// snapshot of that state used by the selection fast paths.
struct ManagedEndpoint {
    /// Underlying transport used to issue requests to this endpoint.
    transport: alloy::transports::BoxTransport,
    /// Authoritative circuit-breaker and latency state, guarded by a mutex.
    health: Mutex<EndpointHealth>,
    /// Endpoint URL, kept for logging and introspection.
    url: String,
    /// Last published average latency in nanoseconds (0 until the first
    /// recorded success); read lock-free during endpoint selection.
    atomic_latency_ns: AtomicU64,
    /// Last published packed circuit state (see `pack_state`); read
    /// lock-free during endpoint selection.
    atomic_state: AtomicU64,
}
93
94impl ManagedEndpoint {
95 #[inline]
97 fn record_success(&self, latency_ns: u64) {
98 let mut h = self.health.lock().unwrap();
99 let old_state = h.state();
100 h.record_success(latency_ns);
101 let new_state = h.state();
102 self.atomic_latency_ns
105 .store(h.avg_latency_ns(), Ordering::Relaxed);
106 self.atomic_state
107 .store(pack_state(new_state), Ordering::Relaxed);
108 if old_state != new_state {
109 tracing::info!(
110 endpoint = %self.url,
111 from = ?old_state,
112 to = ?new_state,
113 "circuit breaker state changed"
114 );
115 }
116 }
117
118 #[inline]
120 fn record_failure(&self, now_ms: u64) {
121 let mut h = self.health.lock().unwrap();
122 let old_state = h.state();
123 h.record_failure(now_ms);
124 let new_state = h.state();
125 self.atomic_state
126 .store(pack_state(new_state), Ordering::Relaxed);
127 if old_state != new_state {
129 tracing::warn!(
130 endpoint = %self.url,
131 from = ?old_state,
132 to = ?new_state,
133 consecutive_failures = h.status().consecutive_failures,
134 "circuit breaker state changed"
135 );
136 }
137 }
138}
139
140impl std::fmt::Debug for ManagedEndpoint {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.debug_struct("ManagedEndpoint")
143 .field("url", &self.url)
144 .finish_non_exhaustive()
145 }
146}
147
/// A set of managed RPC endpoints with round-robin and latency-based
/// selection over lock-free health snapshots.
#[doc(hidden)]
#[derive(Debug)]
pub struct EndpointPool {
    /// Endpoints in configuration order.
    endpoints: Vec<ManagedEndpoint>,
    /// Monotonically increasing cursor for round-robin selection.
    round_robin: AtomicUsize,
}
160
161impl EndpointPool {
162 pub fn from_urls(
164 urls: &[String],
165 cb_config: super::config::CircuitBreakerConfig,
166 ) -> crate::Result<Self> {
167 let endpoints = urls
168 .iter()
169 .map(|url| {
170 let parsed: url::Url = url.parse().map_err(|e: url::ParseError| {
171 crate::PerpCityError::InvalidConfig {
172 reason: format!("invalid endpoint URL '{url}': {e}"),
173 }
174 })?;
175 let http = alloy::transports::http::Http::new(parsed);
176 let boxed = alloy::transports::BoxTransport::new(http);
177 Ok(ManagedEndpoint {
178 transport: boxed,
179 health: Mutex::new(EndpointHealth::new(cb_config)),
180 url: url.clone(),
181 atomic_latency_ns: AtomicU64::new(0),
182 atomic_state: AtomicU64::new(TAG_CLOSED),
183 })
184 })
185 .collect::<crate::Result<Vec<_>>>()?;
186
187 Ok(Self {
188 endpoints,
189 round_robin: AtomicUsize::new(0),
190 })
191 }
192
193 pub fn is_empty(&self) -> bool {
195 self.endpoints.is_empty()
196 }
197
198 pub fn select(&self, strategy: Strategy, now_ms: u64) -> Option<usize> {
202 match strategy {
203 Strategy::RoundRobin => self.select_round_robin(now_ms),
204 Strategy::LatencyBased | Strategy::Hedged { .. } => self.select_latency_based(now_ms),
205 }
206 }
207
208 fn select_round_robin(&self, now_ms: u64) -> Option<usize> {
214 let n = self.endpoints.len();
215 let start = self.round_robin.fetch_add(1, Ordering::Relaxed);
216
217 for i in 0..n {
219 let idx = (start + i) % n;
220 if self.endpoints[idx].atomic_state.load(Ordering::Relaxed) & TAG_MASK == TAG_CLOSED {
221 return Some(idx);
222 }
223 }
224
225 for i in 0..n {
227 let idx = (start + i) % n;
228 let ep = &self.endpoints[idx];
229 let mut h = ep.health.lock().unwrap();
230 if h.is_callable(now_ms) {
231 ep.atomic_state
232 .store(pack_state(h.state()), Ordering::Relaxed);
233 return Some(idx);
234 }
235 ep.atomic_state
236 .store(pack_state(h.state()), Ordering::Relaxed);
237 }
238
239 None
240 }
241
242 fn select_latency_based(&self, now_ms: u64) -> Option<usize> {
252 let mut best_idx = None;
254 let mut best_latency = u64::MAX;
255 let mut any_non_closed = false;
256
257 for (i, ep) in self.endpoints.iter().enumerate() {
258 let state = ep.atomic_state.load(Ordering::Relaxed);
259 if state & TAG_MASK == TAG_CLOSED {
260 let lat = ep.atomic_latency_ns.load(Ordering::Relaxed);
261 if lat < best_latency {
262 best_latency = lat;
263 best_idx = Some(i);
264 }
265 } else {
266 any_non_closed = true;
267 }
268 }
269
270 if best_idx.is_some() {
271 return best_idx;
272 }
273
274 if any_non_closed {
276 for (i, ep) in self.endpoints.iter().enumerate() {
277 let mut h = ep.health.lock().unwrap();
278 if h.is_callable(now_ms) {
279 let lat = h.avg_latency_ns();
280 ep.atomic_latency_ns
282 .store(h.avg_latency_ns(), Ordering::Relaxed);
283 ep.atomic_state
284 .store(pack_state(h.state()), Ordering::Relaxed);
285 if lat < best_latency {
286 best_latency = lat;
287 best_idx = Some(i);
288 }
289 } else {
290 ep.atomic_state
291 .store(pack_state(h.state()), Ordering::Relaxed);
292 }
293 }
294 }
295
296 best_idx
297 }
298
299 pub fn select_n(&self, n: usize, now_ms: u64) -> Vec<usize> {
304 let mut candidates: [(usize, u64); 16] = [(0, u64::MAX); 16];
306 let mut count = 0;
307 let mut any_non_closed = false;
308
309 for (i, ep) in self.endpoints.iter().enumerate() {
311 if count >= 16 {
312 break;
313 }
314 let state = ep.atomic_state.load(Ordering::Relaxed);
315 if state & TAG_MASK == TAG_CLOSED {
316 let lat = ep.atomic_latency_ns.load(Ordering::Relaxed);
317 candidates[count] = (i, lat);
318 count += 1;
319 } else {
320 any_non_closed = true;
321 }
322 }
323
324 if count >= n {
326 candidates[..count].sort_unstable_by_key(|&(_, lat)| lat);
327 return candidates[..n].iter().map(|&(i, _)| i).collect();
328 }
329
330 if any_non_closed {
332 for (i, ep) in self.endpoints.iter().enumerate() {
333 if count >= 16 {
334 break;
335 }
336 let state = ep.atomic_state.load(Ordering::Relaxed);
338 if state & TAG_MASK == TAG_CLOSED {
339 continue;
340 }
341 let mut h = ep.health.lock().unwrap();
342 if h.is_callable(now_ms) {
343 let lat = h.avg_latency_ns();
344 ep.atomic_state
345 .store(pack_state(h.state()), Ordering::Relaxed);
346 candidates[count] = (i, lat);
347 count += 1;
348 } else {
349 ep.atomic_state
350 .store(pack_state(h.state()), Ordering::Relaxed);
351 }
352 }
353 }
354
355 candidates[..count].sort_unstable_by_key(|&(_, lat)| lat);
356 candidates[..count.min(n)].iter().map(|&(i, _)| i).collect()
357 }
358
359 pub fn len(&self) -> usize {
361 self.endpoints.len()
362 }
363
364 pub fn record_success(&self, idx: usize, latency_ns: u64) {
366 self.endpoints[idx].record_success(latency_ns);
367 }
368
369 pub fn record_failure(&self, idx: usize, now_ms: u64) {
371 self.endpoints[idx].record_failure(now_ms);
372 }
373
374 fn transport(&self, idx: usize) -> alloy::transports::BoxTransport {
376 self.endpoints[idx].transport.clone()
377 }
378
379 fn url(&self, idx: usize) -> &str {
381 &self.endpoints[idx].url
382 }
383
384 pub fn healthy_count(&self) -> usize {
388 self.endpoints
389 .iter()
390 .filter(|ep| ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK == TAG_CLOSED)
391 .count()
392 }
393
394 pub fn health_status(&self) -> Vec<EndpointStatus> {
396 self.endpoints
397 .iter()
398 .map(|ep| ep.health.lock().unwrap().status())
399 .collect()
400 }
401
402 pub fn endpoint_urls(&self) -> Vec<&str> {
404 self.endpoints.iter().map(|ep| ep.url.as_str()).collect()
405 }
406}
407
/// Routing state shared behind an `Arc` by every clone of the transport.
#[derive(Debug)]
struct Router {
    /// Pool serving both reads and writes, and the fallback when a
    /// dedicated pool has no callable endpoint.
    shared: EndpointPool,
    /// Pool dedicated to read requests (may be empty).
    read: EndpointPool,
    /// Pool dedicated to write requests (may be empty).
    write: EndpointPool,
    /// Endpoint selection strategy (cached out of `config`).
    strategy: Strategy,
    /// Full transport configuration (retry policies, request timeout, ...).
    config: TransportConfig,
}
422
/// Cheaply cloneable multi-endpoint RPC transport with per-endpoint circuit
/// breaking, retries, and optional hedged reads; usable as a
/// `tower::Service` over JSON-RPC request packets.
#[derive(Clone, Debug)]
pub struct HftTransport {
    /// Shared routing state; clones share the same pools and health tracking.
    router: Arc<Router>,
}
433
434impl HftTransport {
435 pub fn new(config: TransportConfig) -> crate::Result<Self> {
441 let cb = config.circuit_breaker;
442 let shared = EndpointPool::from_urls(&config.shared_endpoints, cb)?;
443 let read = EndpointPool::from_urls(&config.read_endpoints, cb)?;
444 let write = EndpointPool::from_urls(&config.write_endpoints, cb)?;
445
446 Ok(Self {
447 router: Arc::new(Router {
448 shared,
449 read,
450 write,
451 strategy: config.strategy,
452 config,
453 }),
454 })
455 }
456
457 pub fn health_status(&self) -> Vec<EndpointStatus> {
459 let r = &self.router;
460 let mut out = r.shared.health_status();
461 out.extend(r.read.health_status());
462 out.extend(r.write.health_status());
463 out
464 }
465
466 pub fn healthy_count(&self) -> usize {
470 let r = &self.router;
471 r.shared.healthy_count() + r.read.healthy_count() + r.write.healthy_count()
472 }
473
474 pub fn endpoint_urls(&self) -> Vec<&str> {
476 let r = &self.router;
477 let mut out = r.shared.endpoint_urls();
478 out.extend(r.read.endpoint_urls());
479 out.extend(r.write.endpoint_urls());
480 out
481 }
482}
483
484fn is_write_method(req: &RequestPacket) -> bool {
491 match req {
492 RequestPacket::Single(call) => is_write_method_name(call.method()),
493 RequestPacket::Batch(calls) => calls.iter().any(|c| is_write_method_name(c.method())),
494 }
495}
496
/// Returns `true` for JSON-RPC method names that submit transactions.
fn is_write_method_name(method: &str) -> bool {
    method == "eth_sendRawTransaction" || method == "eth_sendTransaction"
}
500
impl Router {
    /// Picks a pool and endpoint index for a request: the dedicated pool
    /// (write or read) is tried first; if it is empty or has no callable
    /// endpoint, selection falls back to the shared pool.
    fn select_for(&self, is_write: bool, now_ms: u64) -> Option<(&EndpointPool, usize)> {
        let dedicated = if is_write { &self.write } else { &self.read };

        if !dedicated.is_empty() {
            if let Some(idx) = dedicated.select(self.strategy, now_ms) {
                return Some((dedicated, idx));
            }
        }

        self.shared
            .select(self.strategy, now_ms)
            .map(|idx| (&self.shared, idx))
    }

    /// Pool used for hedged reads: the dedicated read pool while it still
    /// has healthy endpoints, otherwise the shared pool.
    fn read_pool(&self) -> &EndpointPool {
        if self.read.healthy_count() > 0 {
            &self.read
        } else {
            &self.shared
        }
    }

    /// Routes one request: classifies it as read or write, applies the
    /// matching retry policy with exponential backoff, and dispatches hedged
    /// reads when the strategy calls for it.
    ///
    /// # Errors
    /// Returns a `TransportError` when every endpoint is unavailable, every
    /// attempt fails, or the per-request timeout elapses on the last attempt.
    async fn route_request(
        self: &Arc<Self>,
        req: RequestPacket,
    ) -> Result<ResponsePacket, TransportError> {
        let is_write = is_write_method(&req);
        // max_attempts includes the initial try plus configured retries.
        let (max_attempts, base_delay) = if is_write {
            (
                1 + self.config.write_retry.max_retries,
                self.config.write_retry.base_delay,
            )
        } else {
            (
                1 + self.config.read_retry.max_retries,
                self.config.read_retry.base_delay,
            )
        };
        let timeout = self.config.request_timeout;

        // Reads under a hedged strategy bypass the retry loop entirely and
        // fan out instead.
        if !is_write && let Strategy::Hedged { fan_out } = self.strategy {
            let pool = self.read_pool();
            return self.hedged_request(pool, req, fan_out, timeout).await;
        }

        let mut last_err = None;
        // NOTE(review): now_ms is captured once before the loop; failures
        // recorded on later retries reuse it even though backoff sleeps make
        // it slightly stale — confirm this is acceptable for the breaker.
        let now_ms = now_ms();

        for attempt in 0..max_attempts {
            let Some((pool, idx)) = self.select_for(is_write, now_ms) else {
                tracing::error!("all RPC endpoints unavailable (circuits open)");
                return Err(TransportError::local_usage_str(
                    "all RPC endpoints unavailable (circuits open)",
                ));
            };

            let start = Instant::now();
            let mut transport = pool.transport(idx);

            let result = tokio::time::timeout(timeout, transport.call(req.clone())).await;

            match result {
                Ok(Ok(response)) => {
                    // A JSON-RPC-level rejection of a write (pre-mempool) is
                    // retried; note that neither success nor failure is
                    // recorded against the endpoint in that case.
                    if is_write && self.config.write_retry.is_retriable(&response) {
                        if attempt + 1 < max_attempts {
                            tracing::warn!(
                                attempt = attempt + 1,
                                max_attempts,
                                endpoint = %pool.url(idx),
                                error_code = response.first_error_code(),
                                "write rejected pre-mempool, retrying"
                            );
                        } else {
                            // Retries exhausted: surface the (error) response
                            // to the caller rather than a transport error.
                            tracing::warn!(
                                endpoint = %pool.url(idx),
                                error_code = response.first_error_code(),
                                "write rejected after all retries exhausted"
                            );
                            return Ok(response);
                        }
                    } else {
                        let latency_ns = start.elapsed().as_nanos() as u64;
                        pool.record_success(idx, latency_ns);
                        return Ok(response);
                    }
                }
                Ok(Err(e)) => {
                    pool.record_failure(idx, now_ms);
                    tracing::warn!(
                        attempt = attempt + 1,
                        max_attempts,
                        endpoint = %pool.url(idx),
                        error = %e,
                        is_write,
                        "transport error"
                    );
                    last_err = Some(e);
                }
                Err(_timeout) => {
                    pool.record_failure(idx, now_ms);
                    tracing::warn!(
                        attempt = attempt + 1,
                        max_attempts,
                        endpoint = %pool.url(idx),
                        is_write,
                        "request timed out"
                    );
                    last_err = Some(TransportError::local_usage_str("request timed out"));
                }
            }

            // Exponential backoff before the next attempt: base * 2^attempt.
            if attempt + 1 < max_attempts {
                let delay = base_delay * 2u32.saturating_pow(attempt);
                tokio::time::sleep(delay).await;
            }
        }

        Err(last_err.unwrap_or_else(|| TransportError::local_usage_str("no endpoints available")))
    }

    /// Sends the request to up to `fan_out` endpoints concurrently and
    /// returns the first successful response, cancelling the losers. With a
    /// single callable endpoint this degenerates to one direct call.
    ///
    /// # Errors
    /// Returns the last per-endpoint error when every hedge fails, or an
    /// "unavailable" error when no endpoint is callable.
    async fn hedged_request(
        &self,
        pool: &EndpointPool,
        req: RequestPacket,
        fan_out: usize,
        timeout: std::time::Duration,
    ) -> Result<ResponsePacket, TransportError> {
        let now_ms = now_ms();
        let indices = pool.select_n(fan_out, now_ms);

        if indices.is_empty() {
            return Err(TransportError::local_usage_str(
                "all RPC endpoints unavailable (circuits open)",
            ));
        }

        // Single-endpoint fast path: no task spawning, no request clone.
        if indices.len() == 1 {
            let idx = indices[0];
            let start = Instant::now();
            let mut transport = pool.transport(idx);
            let result = tokio::time::timeout(timeout, transport.call(req)).await;

            return match result {
                Ok(Ok(resp)) => {
                    pool.record_success(idx, start.elapsed().as_nanos() as u64);
                    Ok(resp)
                }
                Ok(Err(e)) => {
                    pool.record_failure(idx, now_ms);
                    Err(e)
                }
                Err(_) => {
                    pool.record_failure(idx, now_ms);
                    Err(TransportError::local_usage_str("request timed out"))
                }
            };
        }

        // One spawned task per hedged endpoint; each returns its index, the
        // call result (timeout folded into a TransportError), and its start
        // time for latency measurement.
        let mut join_set = tokio::task::JoinSet::new();

        for &idx in &indices {
            let mut transport = pool.transport(idx);
            let req_clone = req.clone();

            join_set.spawn(async move {
                let start = Instant::now();
                let result = tokio::time::timeout(timeout, transport.call(req_clone)).await;
                let result = match result {
                    Ok(r) => r,
                    Err(_) => Err(TransportError::local_usage_str("request timed out")),
                };
                (idx, result, start)
            });
        }

        let mut last_err = None;

        // Take results as they complete; first success wins and the
        // remaining in-flight hedges are aborted.
        while let Some(join_result) = join_set.join_next().await {
            match join_result {
                Ok((idx, Ok(response), start)) => {
                    let latency_ns = start.elapsed().as_nanos() as u64;
                    pool.record_success(idx, latency_ns);
                    join_set.abort_all();
                    return Ok(response);
                }
                Ok((idx, Err(e), _start)) => {
                    pool.record_failure(idx, now_ms);
                    last_err = Some(e);
                }
                // Cancelled losers are expected after a winner; ignore them.
                Err(e) if e.is_cancelled() => {}
                Err(_) => {
                    last_err = Some(TransportError::local_usage_str(
                        "hedged request task panicked",
                    ));
                }
            }
        }

        Err(last_err
            .unwrap_or_else(|| TransportError::local_usage_str("all hedged requests failed")))
    }
}
742
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock is set before the epoch. This is
/// wall-clock time, so it is not monotonic across clock adjustments.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64)
}
750
impl Service<RequestPacket> for HftTransport {
    type Response = ResponsePacket;
    type Error = TransportError;
    type Future = TransportFut<'static>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Endpoint selection, retries, and timeouts all happen per-request
        // inside `call`; the service itself is always ready.
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: RequestPacket) -> Self::Future {
        // Clone the shared router so the returned future is 'static and
        // independent of `self`.
        let router = Arc::clone(&self.router);
        Box::pin(async move { router.route_request(req).await })
    }
}
776
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::config::TransportConfig;

    /// Builds a serialized JSON-RPC request with empty params.
    fn make_request(method: &'static str, id: u64) -> alloy::rpc::json_rpc::SerializedRequest {
        use alloy::rpc::json_rpc::{Id, Request};
        let params = serde_json::value::RawValue::from_string("[]".to_string()).unwrap();
        Request::new(method, Id::Number(id), params)
            .serialize()
            .unwrap()
    }

    #[test]
    fn classify_write_methods() {
        let read = RequestPacket::Single(make_request("eth_getBlockByNumber", 1));
        assert!(!is_write_method(&read));

        let write = RequestPacket::Single(make_request("eth_sendRawTransaction", 2));
        assert!(is_write_method(&write));
    }

    #[test]
    fn classify_batch_with_write() {
        // A batch counts as a write if any call in it is a write.
        let batch = RequestPacket::Batch(vec![
            make_request("eth_getBalance", 1),
            make_request("eth_sendRawTransaction", 2),
        ]);
        assert!(is_write_method(&batch));
    }

    #[test]
    fn new_transport_shared_only() {
        let config = TransportConfig::builder()
            .shared_endpoint("https://mainnet.base.org")
            .shared_endpoint("https://base-rpc.publicnode.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        assert_eq!(transport.healthy_count(), 2);
        assert_eq!(transport.endpoint_urls().len(), 2);
    }

    #[test]
    fn new_transport_read_write_split() {
        let config = TransportConfig::builder()
            .shared_endpoint("https://alchemy.example.com")
            .read_endpoint("https://public.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        assert_eq!(transport.healthy_count(), 2);
        assert_eq!(transport.endpoint_urls().len(), 2);
    }

    #[test]
    fn new_transport_invalid_url() {
        let config = TransportConfig::builder()
            .shared_endpoint("not a valid url")
            .build()
            .unwrap();
        let result = HftTransport::new(config);
        assert!(result.is_err());
    }

    #[test]
    fn transport_is_clone_send_sync() {
        fn assert_clone_send_sync<T: Clone + Send + Sync + 'static>() {}
        assert_clone_send_sync::<HftTransport>();
    }

    #[test]
    fn transport_implements_tower_service() {
        fn assert_service<T: tower::Service<RequestPacket>>() {}
        assert_service::<HftTransport>();
    }

    #[test]
    fn pool_round_robin_selection() {
        let pool = EndpointPool::from_urls(
            &[
                "https://rpc1.example.com".into(),
                "https://rpc2.example.com".into(),
                "https://rpc3.example.com".into(),
            ],
            Default::default(),
        )
        .unwrap();

        let now = now_ms();
        let a = pool.select(Strategy::RoundRobin, now).unwrap();
        let b = pool.select(Strategy::RoundRobin, now).unwrap();
        let c = pool.select(Strategy::RoundRobin, now).unwrap();
        let d = pool.select(Strategy::RoundRobin, now).unwrap();

        // The cursor wraps back to the first endpoint after a full cycle.
        assert_eq!(a, 0);
        assert_eq!(b, 1);
        assert_eq!(c, 2);
        assert_eq!(d, 0);
    }

    #[test]
    fn pool_latency_based_prefers_lower() {
        let pool = EndpointPool::from_urls(
            &[
                "https://rpc1.example.com".into(),
                "https://rpc2.example.com".into(),
            ],
            Default::default(),
        )
        .unwrap();

        // Endpoint 1 is 10x faster, so latency-based selection picks it.
        pool.record_success(0, 10_000_000);
        pool.record_success(1, 1_000_000);

        let selected = pool.select(Strategy::LatencyBased, now_ms()).unwrap();
        assert_eq!(selected, 1);
    }

    #[test]
    fn pool_skips_open_circuit() {
        let pool = EndpointPool::from_urls(
            &[
                "https://rpc1.example.com".into(),
                "https://rpc2.example.com".into(),
            ],
            Default::default(),
        )
        .unwrap();

        // Three consecutive failures trip endpoint 0's circuit open.
        let now = now_ms();
        pool.record_failure(0, now);
        pool.record_failure(0, now);
        pool.record_failure(0, now);

        let selected = pool.select(Strategy::LatencyBased, now).unwrap();
        assert_eq!(selected, 1);
    }

    #[test]
    fn pool_select_n_ordered_by_latency() {
        let pool = EndpointPool::from_urls(
            &[
                "https://rpc1.example.com".into(),
                "https://rpc2.example.com".into(),
                "https://rpc3.example.com".into(),
            ],
            Default::default(),
        )
        .unwrap();

        pool.record_success(0, 5_000_000);
        pool.record_success(1, 1_000_000);
        pool.record_success(2, 3_000_000);

        // The two lowest-latency endpoints, in ascending latency order.
        let selected = pool.select_n(2, now_ms());
        assert_eq!(selected, vec![1, 2]);
    }

    #[test]
    fn pool_all_circuits_open_returns_none() {
        let pool = EndpointPool::from_urls(
            &[
                "https://rpc1.example.com".into(),
                "https://rpc2.example.com".into(),
            ],
            Default::default(),
        )
        .unwrap();

        // Trip every endpoint's circuit open.
        for idx in 0..pool.len() {
            for t in 1..=3 {
                pool.record_failure(idx, t * 1000);
            }
        }

        assert!(pool.select(Strategy::LatencyBased, 5000).is_none());
    }

    #[test]
    fn router_read_uses_read_pool() {
        let config = TransportConfig::builder()
            .shared_endpoint("https://shared.example.com")
            .read_endpoint("https://read.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let router = &transport.router;

        let (pool, _idx) = router.select_for(false, now_ms()).unwrap();
        assert_eq!(pool.len(), 1);
        assert_eq!(pool.endpoint_urls()[0], "https://read.example.com");
    }

    #[test]
    fn router_write_uses_shared_when_no_write_pool() {
        let config = TransportConfig::builder()
            .shared_endpoint("https://shared.example.com")
            .read_endpoint("https://read.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let router = &transport.router;

        let (pool, _idx) = router.select_for(true, now_ms()).unwrap();
        assert_eq!(pool.endpoint_urls()[0], "https://shared.example.com");
    }

    #[test]
    fn router_read_falls_back_to_shared() {
        let config = TransportConfig::builder()
            .shared_endpoint("https://shared.example.com")
            .read_endpoint("https://read.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let router = &transport.router;

        // Trip the only read endpoint open; selection must fall back.
        let now = now_ms();
        router.read.record_failure(0, now);
        router.read.record_failure(0, now);
        router.read.record_failure(0, now);

        let (pool, _idx) = router.select_for(false, now).unwrap();
        assert_eq!(pool.endpoint_urls()[0], "https://shared.example.com");
    }

    #[test]
    fn router_hedged_read_falls_back_to_shared() {
        let config = TransportConfig::builder()
            .shared_endpoint("https://shared.example.com")
            .read_endpoint("https://read.example.com")
            .strategy(Strategy::Hedged { fan_out: 2 })
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();
        let router = &transport.router;

        // Trip the only read endpoint open; hedged reads use shared.
        let now = now_ms();
        router.read.record_failure(0, now);
        router.read.record_failure(0, now);
        router.read.record_failure(0, now);

        let pool = router.read_pool();
        assert_eq!(pool.endpoint_urls()[0], "https://shared.example.com");
    }

    #[test]
    fn atomic_state_reflects_mutations() {
        let pool =
            EndpointPool::from_urls(&["https://rpc1.example.com".into()], Default::default())
                .unwrap();
        let ep = &pool.endpoints[0];

        // Fresh endpoints start Closed with no recorded latency.
        assert_eq!(
            ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK,
            TAG_CLOSED
        );
        assert_eq!(ep.atomic_latency_ns.load(Ordering::Relaxed), 0);

        // A success publishes the latency and keeps the circuit Closed.
        pool.record_success(0, 5_000_000);
        assert_eq!(
            ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK,
            TAG_CLOSED
        );
        assert_eq!(ep.atomic_latency_ns.load(Ordering::Relaxed), 5_000_000);

        // Three failures trip the circuit and the snapshot reflects it.
        pool.record_failure(0, 1000);
        pool.record_failure(0, 2000);
        pool.record_failure(0, 3000);
        assert_eq!(ep.atomic_state.load(Ordering::Relaxed) & TAG_MASK, TAG_OPEN);
    }

    #[test]
    fn healthy_count_across_pools() {
        let config = TransportConfig::builder()
            .shared_endpoint("https://shared1.example.com")
            .shared_endpoint("https://shared2.example.com")
            .read_endpoint("https://read.example.com")
            .build()
            .unwrap();
        let transport = HftTransport::new(config).unwrap();

        assert_eq!(transport.healthy_count(), 3);

        // Tripping the read endpoint open drops the aggregate count.
        transport.router.read.record_failure(0, 1000);
        transport.router.read.record_failure(0, 2000);
        transport.router.read.record_failure(0, 3000);

        assert_eq!(transport.healthy_count(), 2);
    }

    #[test]
    fn pool_latency_fast_path_no_locks() {
        let pool = EndpointPool::from_urls(
            &[
                "https://rpc1.example.com".into(),
                "https://rpc2.example.com".into(),
            ],
            Default::default(),
        )
        .unwrap();

        pool.record_success(0, 10_000_000);
        pool.record_success(1, 2_000_000);

        // Repeated selection stays on the lock-free fast path and keeps
        // returning the lower-latency endpoint.
        for _ in 0..100 {
            assert_eq!(pool.select(Strategy::LatencyBased, 1000).unwrap(), 1);
        }
    }

    #[test]
    fn pool_select_n_fast_path_with_enough_closed() {
        let pool = EndpointPool::from_urls(
            &[
                "https://rpc1.example.com".into(),
                "https://rpc2.example.com".into(),
                "https://rpc3.example.com".into(),
            ],
            Default::default(),
        )
        .unwrap();

        pool.record_success(0, 8_000_000);
        pool.record_success(1, 2_000_000);
        pool.record_success(2, 5_000_000);

        let selected = pool.select_n(2, 1000);
        assert_eq!(selected, vec![1, 2]);
    }
}