use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use serde::Serialize;
use crate::error::TransportError;
use crate::request::{JsonRpcRequest, JsonRpcResponse};
use crate::transport::{HealthStatus, RpcTransport};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Region {
UsEast,
UsWest,
EuWest,
EuCentral,
AsiaSoutheast,
AsiaEast,
SouthAmerica,
Oceania,
}
impl Region {
pub fn as_str(&self) -> &'static str {
match self {
Region::UsEast => "us-east",
Region::UsWest => "us-west",
Region::EuWest => "eu-west",
Region::EuCentral => "eu-central",
Region::AsiaSoutheast => "asia-southeast",
Region::AsiaEast => "asia-east",
Region::SouthAmerica => "south-america",
Region::Oceania => "oceania",
}
}
pub fn proximity_order(&self) -> Vec<Region> {
use Region::*;
match self {
UsEast => vec![
UsWest,
EuWest,
SouthAmerica,
EuCentral,
AsiaSoutheast,
AsiaEast,
Oceania,
],
UsWest => vec![
UsEast,
AsiaEast,
AsiaSoutheast,
Oceania,
SouthAmerica,
EuWest,
EuCentral,
],
EuWest => vec![
EuCentral,
UsEast,
SouthAmerica,
AsiaSoutheast,
UsWest,
AsiaEast,
Oceania,
],
EuCentral => vec![
EuWest,
AsiaSoutheast,
UsEast,
AsiaEast,
SouthAmerica,
UsWest,
Oceania,
],
AsiaSoutheast => vec![
AsiaEast,
Oceania,
EuCentral,
EuWest,
UsWest,
UsEast,
SouthAmerica,
],
AsiaEast => vec![
AsiaSoutheast,
Oceania,
UsWest,
EuCentral,
EuWest,
UsEast,
SouthAmerica,
],
SouthAmerica => vec![
UsEast,
UsWest,
EuWest,
EuCentral,
AsiaSoutheast,
AsiaEast,
Oceania,
],
Oceania => vec![
AsiaSoutheast,
AsiaEast,
UsWest,
EuCentral,
EuWest,
UsEast,
SouthAmerica,
],
}
}
pub fn all() -> &'static [Region] {
&[
Region::UsEast,
Region::UsWest,
Region::EuWest,
Region::EuCentral,
Region::AsiaSoutheast,
Region::AsiaEast,
Region::SouthAmerica,
Region::Oceania,
]
}
}
impl fmt::Display for Region {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl std::str::FromStr for Region {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let lower = s.to_lowercase();
match lower.as_str() {
"us-east" | "us_east" | "useast" => Ok(Region::UsEast),
"us-west" | "us_west" | "uswest" => Ok(Region::UsWest),
"eu-west" | "eu_west" | "euwest" => Ok(Region::EuWest),
"eu-central" | "eu_central" | "eucentral" => Ok(Region::EuCentral),
"asia-southeast" | "asia_southeast" | "asiasoutheast" => Ok(Region::AsiaSoutheast),
"asia-east" | "asia_east" | "asiaeast" => Ok(Region::AsiaEast),
"south-america" | "south_america" | "southamerica" => Ok(Region::SouthAmerica),
"oceania" => Ok(Region::Oceania),
_ => Err(format!("unknown region: {s}")),
}
}
}
#[derive(Debug)]
struct RegionalHealth {
healthy: bool,
avg_latency: Duration,
last_checked: Instant,
success_count: u64,
failure_count: u64,
latency_sum_us: u128,
}
impl RegionalHealth {
fn new() -> Self {
Self {
healthy: true,
avg_latency: Duration::ZERO,
last_checked: Instant::now(),
success_count: 0,
failure_count: 0,
latency_sum_us: 0,
}
}
fn record_success(&mut self, latency: Duration) {
self.success_count += 1;
self.latency_sum_us += latency.as_micros();
let total = self.success_count + self.failure_count;
if total > 0 {
self.avg_latency = Duration::from_micros((self.latency_sum_us / total as u128) as u64);
}
self.last_checked = Instant::now();
self.healthy = self.success_rate() > 0.5;
}
fn record_failure(&mut self) {
self.failure_count += 1;
self.last_checked = Instant::now();
self.healthy = self.success_rate() > 0.5;
}
fn success_rate(&self) -> f64 {
let total = self.success_count + self.failure_count;
if total == 0 {
return 1.0; }
self.success_count as f64 / total as f64
}
}
#[derive(Debug, Clone, Serialize)]
pub struct RegionHealthSummary {
pub region: String,
pub provider_count: usize,
pub healthy: bool,
pub avg_latency_ms: u64,
pub success_count: u64,
pub failure_count: u64,
}
pub struct GeoRouter {
local_region: Region,
providers: HashMap<Region, Vec<Arc<dyn RpcTransport>>>,
health: Mutex<HashMap<Region, RegionalHealth>>,
cursors: Mutex<HashMap<Region, usize>>,
}
impl GeoRouter {
pub fn new(local_region: Region) -> Self {
Self {
local_region,
providers: HashMap::new(),
health: Mutex::new(HashMap::new()),
cursors: Mutex::new(HashMap::new()),
}
}
pub fn add_provider(&mut self, region: Region, transport: Arc<dyn RpcTransport>) {
self.providers.entry(region).or_default().push(transport);
self.health
.lock()
.unwrap()
.entry(region)
.or_insert_with(RegionalHealth::new);
}
pub fn add_providers(&mut self, region: Region, transports: Vec<Arc<dyn RpcTransport>>) {
for t in transports {
self.add_provider(region, t);
}
}
pub fn local_region(&self) -> Region {
self.local_region
}
pub fn provider_count(&self) -> usize {
self.providers.values().map(|v| v.len()).sum()
}
pub fn regions(&self) -> Vec<Region> {
self.providers.keys().copied().collect()
}
pub fn health_summary(&self) -> Vec<RegionHealthSummary> {
let health = self.health.lock().unwrap();
self.providers
.iter()
.map(|(region, provs)| {
let h = health.get(region);
RegionHealthSummary {
region: region.to_string(),
provider_count: provs.len(),
healthy: h.is_none_or(|h| h.healthy),
avg_latency_ms: h.map_or(0, |h| h.avg_latency.as_millis() as u64),
success_count: h.map_or(0, |h| h.success_count),
failure_count: h.map_or(0, |h| h.failure_count),
}
})
.collect()
}
fn routing_order(&self) -> Vec<Region> {
let mut order = vec![self.local_region];
order.extend(self.local_region.proximity_order());
order.retain(|r| self.providers.contains_key(r));
order
}
fn select_from_region(&self, region: &Region) -> Option<Arc<dyn RpcTransport>> {
let providers = self.providers.get(region)?;
if providers.is_empty() {
return None;
}
let mut cursors = self.cursors.lock().unwrap();
let cursor = cursors.entry(*region).or_insert(0);
let idx = *cursor % providers.len();
*cursor = cursor.wrapping_add(1);
Some(providers[idx].clone())
}
fn record_success(&self, region: &Region, latency: Duration) {
let mut health = self.health.lock().unwrap();
let entry = health.entry(*region).or_insert_with(RegionalHealth::new);
entry.record_success(latency);
}
fn record_failure(&self, region: &Region) {
let mut health = self.health.lock().unwrap();
let entry = health.entry(*region).or_insert_with(RegionalHealth::new);
entry.record_failure();
}
fn is_region_healthy(&self, region: &Region) -> bool {
let health = self.health.lock().unwrap();
match health.get(region) {
Some(h) => h.healthy,
None => true, }
}
}
#[async_trait]
impl RpcTransport for GeoRouter {
async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
for region in self.routing_order() {
if !self.is_region_healthy(®ion) {
continue;
}
if let Some(provider) = self.select_from_region(®ion) {
let start = Instant::now();
match provider.send(req.clone()).await {
Ok(resp) => {
self.record_success(®ion, start.elapsed());
return Ok(resp);
}
Err(e) if e.is_retryable() => {
self.record_failure(®ion);
continue; }
Err(e) => return Err(e), }
}
}
Err(TransportError::AllProvidersDown)
}
fn url(&self) -> &str {
"geo-router"
}
fn health(&self) -> HealthStatus {
if self.is_region_healthy(&self.local_region) {
HealthStatus::Healthy
} else if self
.routing_order()
.iter()
.any(|r| self.is_region_healthy(r))
{
HealthStatus::Degraded
} else {
HealthStatus::Unhealthy
}
}
}
pub fn detect_region_from_env() -> Option<Region> {
for var in &[
"AWS_REGION",
"AWS_DEFAULT_REGION",
"CLOUD_REGION",
"FLY_REGION",
] {
if let Ok(val) = std::env::var(var) {
if let Some(region) = parse_cloud_region(&val) {
return Some(region);
}
}
}
None
}
fn parse_cloud_region(value: &str) -> Option<Region> {
let lower = value.to_lowercase();
if lower.contains("us-east") || lower.contains("iad") || lower.contains("ewr") {
Some(Region::UsEast)
} else if lower.contains("us-west")
|| lower.contains("lax")
|| lower.contains("sjc")
|| lower.contains("sea")
{
Some(Region::UsWest)
} else if lower.contains("eu-west")
|| lower.contains("lhr")
|| lower.contains("dub")
|| lower.contains("cdg")
{
Some(Region::EuWest)
} else if lower.contains("eu-central") || lower.contains("fra") {
Some(Region::EuCentral)
} else if lower.contains("ap-southeast") || lower.contains("sin") || lower.contains("sgp") {
Some(Region::AsiaSoutheast)
} else if lower.contains("ap-northeast")
|| lower.contains("ap-east")
|| lower.contains("nrt")
|| lower.contains("hnd")
|| lower.contains("hkg")
{
Some(Region::AsiaEast)
} else if lower.contains("sa-east") || lower.contains("gru") {
Some(Region::SouthAmerica)
} else if lower.contains("ap-south") || lower.contains("syd") || lower.contains("mel") {
Some(Region::Oceania)
} else {
None
}
}
pub struct RegionalEndpoints;
impl RegionalEndpoints {
pub fn alchemy(api_key: &str) -> Vec<(Region, String)> {
vec![
(
Region::UsEast,
format!("https://eth-mainnet.g.alchemy.com/v2/{api_key}"),
),
(
Region::EuCentral,
format!("https://eth-mainnet.g.alchemy.com/v2/{api_key}"),
),
]
}
pub fn ankr() -> Vec<(Region, String)> {
vec![
(Region::UsEast, "https://rpc.ankr.com/eth".into()),
(Region::EuWest, "https://rpc.ankr.com/eth".into()),
(Region::AsiaSoutheast, "https://rpc.ankr.com/eth".into()),
]
}
pub fn public_solana() -> Vec<(Region, String)> {
vec![(Region::UsEast, "https://api.mainnet-beta.solana.com".into())]
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::request::RpcId;
use std::sync::atomic::{AtomicU64, Ordering};
struct SuccessTransport {
name: String,
call_count: AtomicU64,
}
impl SuccessTransport {
fn new(name: &str) -> Self {
Self {
name: name.to_string(),
call_count: AtomicU64::new(0),
}
}
fn calls(&self) -> u64 {
self.call_count.load(Ordering::Relaxed)
}
}
#[async_trait]
impl RpcTransport for SuccessTransport {
async fn send(&self, _req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
self.call_count.fetch_add(1, Ordering::Relaxed);
Ok(JsonRpcResponse {
jsonrpc: "2.0".into(),
id: RpcId::Number(1),
result: Some(serde_json::Value::String(format!("ok-{}", self.name))),
error: None,
})
}
fn url(&self) -> &str {
&self.name
}
}
struct FailTransport {
name: String,
}
impl FailTransport {
fn new(name: &str) -> Self {
Self {
name: name.to_string(),
}
}
}
#[async_trait]
impl RpcTransport for FailTransport {
async fn send(&self, _req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
Err(TransportError::Http("down".into()))
}
fn url(&self) -> &str {
&self.name
}
}
struct NonRetryableFailTransport {
name: String,
}
impl NonRetryableFailTransport {
fn new(name: &str) -> Self {
Self {
name: name.to_string(),
}
}
}
#[async_trait]
impl RpcTransport for NonRetryableFailTransport {
async fn send(&self, _req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
Err(TransportError::Other("permanent failure".into()))
}
fn url(&self) -> &str {
&self.name
}
}
fn make_request() -> JsonRpcRequest {
JsonRpcRequest::new(1, "eth_blockNumber", vec![])
}
#[test]
fn region_proximity_us_east() {
let order = Region::UsEast.proximity_order();
assert_eq!(
order[0],
Region::UsWest,
"UsEast's closest should be UsWest"
);
assert_eq!(order.len(), 7, "should list all other regions");
assert!(!order.contains(&Region::UsEast));
}
#[test]
fn region_proximity_asia() {
let order = Region::AsiaEast.proximity_order();
assert_eq!(
order[0],
Region::AsiaSoutheast,
"AsiaEast's closest should be AsiaSoutheast"
);
assert!(!order.contains(&Region::AsiaEast));
}
#[test]
fn region_display() {
assert_eq!(Region::UsEast.as_str(), "us-east");
assert_eq!(Region::EuCentral.as_str(), "eu-central");
assert_eq!(Region::AsiaSoutheast.as_str(), "asia-southeast");
assert_eq!(Region::Oceania.as_str(), "oceania");
assert_eq!(Region::SouthAmerica.to_string(), "south-america");
}
#[test]
fn region_from_str() {
assert_eq!("us-east".parse::<Region>().unwrap(), Region::UsEast);
assert_eq!("us_west".parse::<Region>().unwrap(), Region::UsWest);
assert_eq!("eucentral".parse::<Region>().unwrap(), Region::EuCentral);
assert_eq!("OCEANIA".parse::<Region>().unwrap(), Region::Oceania);
assert!("mars".parse::<Region>().is_err());
}
#[test]
fn routing_order_local_first() {
let mut router = GeoRouter::new(Region::EuWest);
router.add_provider(Region::EuWest, Arc::new(SuccessTransport::new("eu1")));
router.add_provider(Region::UsEast, Arc::new(SuccessTransport::new("us1")));
let order = router.routing_order();
assert_eq!(order[0], Region::EuWest, "local region must be first");
}
#[test]
fn routing_order_excludes_unconfigured() {
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(Region::UsEast, Arc::new(SuccessTransport::new("us1")));
router.add_provider(Region::EuWest, Arc::new(SuccessTransport::new("eu1")));
let order = router.routing_order();
for r in &order {
assert!(
*r == Region::UsEast || *r == Region::EuWest,
"unexpected region {:?} in routing order",
r,
);
}
assert_eq!(order.len(), 2);
}
#[test]
fn provider_count_and_regions() {
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(Region::UsEast, Arc::new(SuccessTransport::new("us1")));
router.add_provider(Region::UsEast, Arc::new(SuccessTransport::new("us2")));
router.add_provider(Region::EuWest, Arc::new(SuccessTransport::new("eu1")));
assert_eq!(router.provider_count(), 3);
let regions = router.regions();
assert!(regions.contains(&Region::UsEast));
assert!(regions.contains(&Region::EuWest));
assert_eq!(regions.len(), 2);
}
#[tokio::test]
async fn send_routes_to_local() {
let us = Arc::new(SuccessTransport::new("us-east-1"));
let eu = Arc::new(SuccessTransport::new("eu-west-1"));
let us_ref = us.clone();
let eu_ref = eu.clone();
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(Region::UsEast, us);
router.add_provider(Region::EuWest, eu);
let resp = router.send(make_request()).await.unwrap();
let result = resp.result.unwrap();
assert_eq!(result, serde_json::Value::String("ok-us-east-1".into()));
assert_eq!(us_ref.calls(), 1, "local provider should have been called");
assert_eq!(
eu_ref.calls(),
0,
"remote provider should NOT have been called"
);
}
#[tokio::test]
async fn send_falls_back_on_failure() {
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(Region::UsEast, Arc::new(FailTransport::new("us-fail")));
router.add_provider(Region::EuWest, Arc::new(SuccessTransport::new("eu-ok")));
let resp = router.send(make_request()).await.unwrap();
let result = resp.result.unwrap();
assert_eq!(result, serde_json::Value::String("ok-eu-ok".into()));
}
#[tokio::test]
async fn all_down_returns_error() {
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(Region::UsEast, Arc::new(FailTransport::new("us-fail")));
router.add_provider(Region::EuWest, Arc::new(FailTransport::new("eu-fail")));
let err = router.send(make_request()).await.unwrap_err();
assert!(
matches!(err, TransportError::AllProvidersDown),
"expected AllProvidersDown, got {:?}",
err,
);
}
#[tokio::test]
async fn non_retryable_error_propagates_immediately() {
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(
Region::UsEast,
Arc::new(NonRetryableFailTransport::new("us-perm-fail")),
);
router.add_provider(Region::EuWest, Arc::new(SuccessTransport::new("eu-ok")));
let err = router.send(make_request()).await.unwrap_err();
assert!(
matches!(err, TransportError::Other(_)),
"expected Other error, got {:?}",
err,
);
}
#[tokio::test]
async fn round_robin_within_region() {
let a = Arc::new(SuccessTransport::new("a"));
let b = Arc::new(SuccessTransport::new("b"));
let a_ref = a.clone();
let b_ref = b.clone();
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(Region::UsEast, a);
router.add_provider(Region::UsEast, b);
for _ in 0..4 {
router.send(make_request()).await.unwrap();
}
assert_eq!(a_ref.calls(), 2, "provider A should get half the requests");
assert_eq!(b_ref.calls(), 2, "provider B should get half the requests");
}
#[test]
fn health_summary_reports() {
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(Region::UsEast, Arc::new(SuccessTransport::new("us1")));
router.add_provider(Region::UsEast, Arc::new(SuccessTransport::new("us2")));
router.add_provider(Region::EuWest, Arc::new(SuccessTransport::new("eu1")));
let summary = router.health_summary();
assert_eq!(summary.len(), 2, "should have summaries for 2 regions");
for entry in &summary {
assert!(entry.healthy);
assert!(
entry.region == "us-east" || entry.region == "eu-west",
"unexpected region: {}",
entry.region,
);
if entry.region == "us-east" {
assert_eq!(entry.provider_count, 2);
} else {
assert_eq!(entry.provider_count, 1);
}
}
}
#[tokio::test]
async fn health_status_reflects_region_state() {
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(Region::UsEast, Arc::new(SuccessTransport::new("us1")));
router.add_provider(Region::EuWest, Arc::new(SuccessTransport::new("eu1")));
assert_eq!(router.health(), HealthStatus::Healthy);
}
#[test]
fn detect_region_aws() {
let result = parse_cloud_region("us-east-1");
assert_eq!(result, Some(Region::UsEast));
let result = parse_cloud_region("eu-west-1");
assert_eq!(result, Some(Region::EuWest));
let result = parse_cloud_region("ap-southeast-1");
assert_eq!(result, Some(Region::AsiaSoutheast));
let result = parse_cloud_region("sa-east-1");
assert_eq!(result, Some(Region::SouthAmerica));
}
#[test]
fn detect_region_fly() {
assert_eq!(parse_cloud_region("ewr"), Some(Region::UsEast));
assert_eq!(parse_cloud_region("lax"), Some(Region::UsWest));
assert_eq!(parse_cloud_region("lhr"), Some(Region::EuWest));
assert_eq!(parse_cloud_region("fra"), Some(Region::EuCentral));
assert_eq!(parse_cloud_region("sin"), Some(Region::AsiaSoutheast));
assert_eq!(parse_cloud_region("nrt"), Some(Region::AsiaEast));
assert_eq!(parse_cloud_region("gru"), Some(Region::SouthAmerica));
assert_eq!(parse_cloud_region("syd"), Some(Region::Oceania));
}
#[test]
fn detect_region_unknown() {
assert_eq!(parse_cloud_region("mars-north-1"), None);
assert_eq!(parse_cloud_region(""), None);
}
#[test]
fn alchemy_endpoints() {
let eps = RegionalEndpoints::alchemy("test-key");
assert_eq!(eps.len(), 2);
assert!(eps[0].1.contains("test-key"));
}
#[test]
fn ankr_endpoints() {
let eps = RegionalEndpoints::ankr();
assert_eq!(eps.len(), 3);
for (_, url) in &eps {
assert!(url.starts_with("https://rpc.ankr.com"));
}
}
#[test]
fn regional_health_starts_healthy() {
let h = RegionalHealth::new();
assert!(h.healthy);
assert_eq!(h.success_count, 0);
assert_eq!(h.failure_count, 0);
}
#[test]
fn regional_health_success_rate() {
let mut h = RegionalHealth::new();
h.record_success(Duration::from_millis(10));
h.record_success(Duration::from_millis(20));
h.record_failure();
assert!(h.success_rate() > 0.6);
assert!(h.healthy);
}
#[test]
fn regional_health_becomes_unhealthy() {
let mut h = RegionalHealth::new();
for _ in 0..5 {
h.record_failure();
}
assert!(!h.healthy);
assert_eq!(h.failure_count, 5);
}
#[tokio::test]
async fn unhealthy_region_is_skipped() {
let mut router = GeoRouter::new(Region::UsEast);
router.add_provider(Region::UsEast, Arc::new(SuccessTransport::new("us1")));
router.add_provider(Region::EuWest, Arc::new(SuccessTransport::new("eu1")));
{
let mut health = router.health.lock().unwrap();
let entry = health
.entry(Region::UsEast)
.or_insert_with(RegionalHealth::new);
for _ in 0..10 {
entry.record_failure();
}
}
let resp = router.send(make_request()).await.unwrap();
let result = resp.result.unwrap();
assert_eq!(result, serde_json::Value::String("ok-eu1".into()));
}
#[test]
fn geo_router_url() {
let router = GeoRouter::new(Region::UsEast);
assert_eq!(router.url(), "geo-router");
}
#[test]
fn local_region_accessor() {
let router = GeoRouter::new(Region::AsiaSoutheast);
assert_eq!(router.local_region(), Region::AsiaSoutheast);
}
}