use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use crate::error::TransportError;
use crate::metrics::ProviderMetrics;
use crate::policy::{CircuitBreaker, CircuitBreakerConfig};
use crate::request::{JsonRpcRequest, JsonRpcResponse};
use crate::transport::{HealthStatus, RpcTransport};
#[derive(Debug, Clone)]
pub struct ProviderPoolConfig {
pub circuit_breaker: CircuitBreakerConfig,
pub request_timeout: Duration,
}
impl Default for ProviderPoolConfig {
fn default() -> Self {
Self {
circuit_breaker: CircuitBreakerConfig::default(),
request_timeout: Duration::from_secs(30),
}
}
}
struct ProviderSlot {
transport: Arc<dyn RpcTransport>,
circuit: CircuitBreaker,
metrics: Option<Arc<ProviderMetrics>>,
}
pub struct ProviderPool {
slots: Vec<ProviderSlot>,
cursor: AtomicUsize,
config: ProviderPoolConfig,
}
impl ProviderPool {
pub fn new(transports: Vec<Arc<dyn RpcTransport>>, config: ProviderPoolConfig) -> Self {
let slots = transports
.into_iter()
.map(|t| ProviderSlot {
transport: t,
circuit: CircuitBreaker::new(config.circuit_breaker.clone()),
metrics: None,
})
.collect();
Self {
slots,
cursor: AtomicUsize::new(0),
config,
}
}
pub fn new_with_metrics(
transports: Vec<Arc<dyn RpcTransport>>,
config: ProviderPoolConfig,
) -> Self {
let slots = transports
.into_iter()
.map(|t| {
let m = Arc::new(ProviderMetrics::new(t.url()));
ProviderSlot {
transport: t,
circuit: CircuitBreaker::new(config.circuit_breaker.clone()),
metrics: Some(m),
}
})
.collect();
Self {
slots,
cursor: AtomicUsize::new(0),
config,
}
}
pub fn len(&self) -> usize {
self.slots.len()
}
pub fn is_empty(&self) -> bool {
self.slots.is_empty()
}
pub fn health_summary(&self) -> Vec<(String, HealthStatus, String)> {
self.slots
.iter()
.map(|s| {
let url = s.transport.url().to_string();
let health = s.transport.health();
let circuit = s.circuit.state().to_string();
(url, health, circuit)
})
.collect()
}
pub fn healthy_count(&self) -> usize {
self.slots.iter().filter(|s| s.circuit.is_allowed()).count()
}
pub fn metrics(&self) -> Vec<crate::metrics::MetricsSnapshot> {
self.slots
.iter()
.filter_map(|s| s.metrics.as_ref().map(|m| m.snapshot()))
.collect()
}
pub fn health_report(&self) -> Vec<serde_json::Value> {
self.slots
.iter()
.map(|s| {
let mut report = serde_json::json!({
"url": s.transport.url(),
"health": s.transport.health().to_string(),
"circuit": s.circuit.state().to_string(),
});
if let Some(ref m) = s.metrics {
let snap = m.snapshot();
let obj = report.as_object_mut().unwrap();
obj.insert(
"total_requests".into(),
serde_json::json!(snap.total_requests),
);
obj.insert(
"successful_requests".into(),
serde_json::json!(snap.successful_requests),
);
obj.insert(
"failed_requests".into(),
serde_json::json!(snap.failed_requests),
);
obj.insert("success_rate".into(), serde_json::json!(snap.success_rate));
obj.insert(
"avg_latency_ms".into(),
serde_json::json!(snap.avg_latency_ms),
);
obj.insert(
"rate_limit_hits".into(),
serde_json::json!(snap.rate_limit_hits),
);
obj.insert(
"circuit_open_count".into(),
serde_json::json!(snap.circuit_open_count),
);
}
report
})
.collect()
}
fn next_slot(&self) -> Option<&ProviderSlot> {
if self.slots.is_empty() {
return None;
}
let start = self.cursor.fetch_add(1, Ordering::Relaxed) % self.slots.len();
for i in 0..self.slots.len() {
let idx = (start + i) % self.slots.len();
let slot = &self.slots[idx];
if slot.circuit.is_allowed() {
return Some(slot);
}
}
None
}
}
#[async_trait]
impl RpcTransport for ProviderPool {
async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
let slot = self.next_slot().ok_or(TransportError::AllProvidersDown)?;
let timeout = self.config.request_timeout;
let start = std::time::Instant::now();
let result = tokio::time::timeout(timeout, slot.transport.send(req))
.await
.map_err(|_| TransportError::Timeout {
ms: timeout.as_millis() as u64,
})?;
match result {
Ok(resp) => {
slot.circuit.record_success();
if let Some(ref m) = slot.metrics {
m.record_success(start.elapsed());
}
Ok(resp)
}
Err(e) if e.is_retryable() => {
slot.circuit.record_failure();
if let Some(ref m) = slot.metrics {
m.record_failure();
}
Err(e)
}
Err(e) => {
if let Some(ref m) = slot.metrics {
m.record_failure();
}
Err(e)
}
}
}
fn health(&self) -> HealthStatus {
let healthy_count = self.slots.iter().filter(|s| s.circuit.is_allowed()).count();
match healthy_count {
0 => HealthStatus::Unhealthy,
n if n == self.slots.len() => HealthStatus::Healthy,
_ => HealthStatus::Degraded,
}
}
fn url(&self) -> &str {
"pool"
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::request::RpcId;
struct MockTransport {
url: String,
should_fail: bool,
}
#[async_trait]
impl RpcTransport for MockTransport {
async fn send(&self, _req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
if self.should_fail {
Err(TransportError::Http("mock error".into()))
} else {
Ok(JsonRpcResponse {
jsonrpc: "2.0".into(),
id: RpcId::Number(1),
result: Some(serde_json::Value::String("0x1".into())),
error: None,
})
}
}
fn url(&self) -> &str {
&self.url
}
}
fn mock(url: &str, fail: bool) -> Arc<dyn RpcTransport> {
Arc::new(MockTransport {
url: url.to_string(),
should_fail: fail,
})
}
#[test]
fn pool_len() {
let pool = ProviderPool::new(
vec![mock("https://a.com", false), mock("https://b.com", false)],
ProviderPoolConfig::default(),
);
assert_eq!(pool.len(), 2);
}
#[test]
fn health_all_healthy() {
let pool = ProviderPool::new(
vec![mock("https://a.com", false)],
ProviderPoolConfig::default(),
);
assert_eq!(pool.health(), HealthStatus::Healthy);
}
#[test]
fn health_all_down() {
let pool = ProviderPool::new(vec![], ProviderPoolConfig::default());
assert!(pool.next_slot().is_none());
}
}