Skip to main content

rs_zero/rpc/
balancer.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use crate::discovery::ServiceInstance;
4
/// A concrete RPC target chosen from the instances known to discovery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcEndpoint {
    /// Name of the logical service this endpoint belongs to.
    pub service: String,
    /// Stable identifier of the instance behind this endpoint.
    pub instance_id: String,
    /// URI of the endpoint, in a form suitable for tonic.
    pub uri: String,
}
15
16/// Selects RPC endpoints from instances.
17pub trait RpcEndpointSelector: Send + Sync {
18    /// Selects one endpoint.
19    fn select(&self, service: &str, instances: &[ServiceInstance]) -> Option<RpcEndpoint>;
20}
21
/// Endpoint selector implementing weighted round-robin.
#[derive(Debug, Default)]
pub struct WeightedRoundRobinBalancer {
    /// Running pick counter from which the next rotation position is derived.
    cursor: AtomicUsize,
}
27
28impl WeightedRoundRobinBalancer {
29    /// Creates a selector.
30    pub fn new() -> Self {
31        Self::default()
32    }
33}
34
35impl RpcEndpointSelector for WeightedRoundRobinBalancer {
36    fn select(&self, service: &str, instances: &[ServiceInstance]) -> Option<RpcEndpoint> {
37        let weighted = instances
38            .iter()
39            .filter(|instance| instance.healthy)
40            .flat_map(|instance| std::iter::repeat_n(instance, instance.weight.max(1) as usize))
41            .collect::<Vec<_>>();
42        if weighted.is_empty() {
43            return None;
44        }
45        let index = self.cursor.fetch_add(1, Ordering::Relaxed) % weighted.len();
46        let instance = weighted[index];
47        Some(endpoint_from_instance(service, instance))
48    }
49}
50
51/// Builds a tonic HTTP endpoint from a service instance.
52pub fn endpoint_from_instance(service: &str, instance: &ServiceInstance) -> RpcEndpoint {
53    let scheme = instance
54        .metadata
55        .get("scheme")
56        .map(String::as_str)
57        .unwrap_or("http");
58    RpcEndpoint {
59        service: service.to_string(),
60        instance_id: instance.id.clone(),
61        uri: format!("{scheme}://{}", instance.endpoint.authority()),
62    }
63}