use std::sync::atomic::{AtomicUsize, Ordering};
use crate::discovery::ServiceInstance;
/// A resolved RPC target: which service it serves, which instance backs it,
/// and the URI to reach it.
///
/// In this file values are produced by `endpoint_from_instance`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcEndpoint {
/// Service name the endpoint was resolved for (copied from the `service`
/// argument of `endpoint_from_instance`).
pub service: String,
/// Identifier of the backing instance (a clone of the instance's `id`).
pub instance_id: String,
/// Address of the instance, built by `endpoint_from_instance` as
/// `{scheme}://{authority}`.
pub uri: String,
}
/// Strategy for picking one endpoint out of a service's instances.
///
/// Implementors must be `Send + Sync`, as required by the supertrait bounds.
pub trait RpcEndpointSelector: Send + Sync {
/// Chooses an endpoint for `service` from the candidate `instances`.
///
/// `None` means no endpoint was chosen. The implementation visible in this
/// file (`WeightedRoundRobinBalancer`) returns it when no instance is
/// marked healthy.
fn select(&self, service: &str, instances: &[ServiceInstance]) -> Option<RpcEndpoint>;
}
/// Endpoint selector that rotates through instances using a shared counter.
///
/// The only state is an atomic cursor, so a single balancer can be used
/// through a shared reference.
#[derive(Debug, Default)]
pub struct WeightedRoundRobinBalancer {
    /// Number of selections handed out so far; advanced atomically on each pick.
    cursor: AtomicUsize,
}

impl WeightedRoundRobinBalancer {
    /// Creates a balancer whose rotation starts at position zero.
    ///
    /// Produces the same state as `Default::default()`.
    pub fn new() -> Self {
        Self {
            cursor: AtomicUsize::new(0),
        }
    }
}
impl RpcEndpointSelector for WeightedRoundRobinBalancer {
    /// Picks the next healthy instance in weighted round-robin order.
    ///
    /// Each healthy instance owns `weight` consecutive slots in a virtual ring
    /// (a weight below 1 counts as 1). The shared cursor is advanced by one on
    /// every successful pick and mapped onto that ring, so over a full cycle
    /// an instance is chosen proportionally to its weight.
    ///
    /// Returns `None`, without advancing the cursor, when `instances` holds no
    /// healthy instance.
    ///
    /// The ring is never materialised: the slot is located by walking the
    /// healthy instances and subtracting weights, so a call costs
    /// O(instances) time and no allocation regardless of how large the
    /// weights are.
    fn select(&self, service: &str, instances: &[ServiceInstance]) -> Option<RpcEndpoint> {
        // Number of ring slots an instance occupies; never fewer than one.
        let slots = |instance: &ServiceInstance| instance.weight.max(1) as usize;

        // Saturate instead of overflowing on absurd weight totals; the walk
        // below still terminates on a real instance in that case.
        let total_slots = instances
            .iter()
            .filter(|instance| instance.healthy)
            .map(slots)
            .fold(0usize, usize::saturating_add);
        if total_slots == 0 {
            return None;
        }

        // Relaxed is enough: only the atomicity of the counter matters, and
        // `fetch_add` wraps around on overflow rather than panicking.
        let mut offset = self.cursor.fetch_add(1, Ordering::Relaxed) % total_slots;

        for instance in instances.iter().filter(|instance| instance.healthy) {
            let width = slots(instance);
            if offset < width {
                return Some(endpoint_from_instance(service, instance));
            }
            offset -= width;
        }

        // Unreachable in practice: `offset < total_slots`, which never
        // exceeds the true sum of the widths walked above.
        None
    }
}
/// Builds the [`RpcEndpoint`] describing `instance` as a target for `service`.
///
/// The URI scheme is read from the instance's `"scheme"` metadata entry and
/// falls back to `"http"` when that entry is absent. The resulting URI has the
/// form `{scheme}://{authority}`, where the authority comes from the
/// instance's endpoint.
pub fn endpoint_from_instance(service: &str, instance: &ServiceInstance) -> RpcEndpoint {
    let scheme = match instance.metadata.get("scheme") {
        Some(value) => value.as_str(),
        None => "http",
    };

    let service_name = service.to_owned();
    let instance_id = instance.id.clone();
    let authority = instance.endpoint.authority();
    let uri = format!("{scheme}://{authority}");

    RpcEndpoint {
        service: service_name,
        instance_id,
        uri,
    }
}