1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use crate::discovery::ServiceInstance;
4
/// A resolved RPC target: one service instance together with the URI used to
/// reach it.
///
/// Values of this type are produced by [`endpoint_from_instance`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcEndpoint {
    /// Name of the service the endpoint was resolved for.
    pub service: String,
    /// Identifier of the chosen instance (copied from the instance's `id`).
    pub instance_id: String,
    /// Address in the form `<scheme>://<authority>`.
    pub uri: String,
}
15
/// Strategy for choosing one endpoint out of a set of discovered instances.
///
/// Implementors must be `Send + Sync`, so a selector can be shared across
/// threads.
pub trait RpcEndpointSelector: Send + Sync {
    /// Chooses an endpoint for `service` from `instances`.
    ///
    /// Returns `None` when the implementation has nothing to pick from; the
    /// round-robin balancer in this file does so when no instance is healthy.
    fn select(&self, service: &str, instances: &[ServiceInstance]) -> Option<RpcEndpoint>;
}
21
/// Endpoint selector that rotates through healthy instances, giving each one a
/// number of turns proportional to its weight.
///
/// The only state is a shared pick counter, so a single balancer can be used
/// through `&self` from several threads.
#[derive(Debug, Default)]
pub struct WeightedRoundRobinBalancer {
    /// Number of picks made so far; reduced modulo the total weight on each
    /// selection to find the next slot.
    cursor: AtomicUsize,
}

impl WeightedRoundRobinBalancer {
    /// Creates a balancer whose rotation starts at the first slot.
    ///
    /// Equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self {
            cursor: AtomicUsize::new(0),
        }
    }
}
34
35impl RpcEndpointSelector for WeightedRoundRobinBalancer {
36 fn select(&self, service: &str, instances: &[ServiceInstance]) -> Option<RpcEndpoint> {
37 let weighted = instances
38 .iter()
39 .filter(|instance| instance.healthy)
40 .flat_map(|instance| std::iter::repeat_n(instance, instance.weight.max(1) as usize))
41 .collect::<Vec<_>>();
42 if weighted.is_empty() {
43 return None;
44 }
45 let index = self.cursor.fetch_add(1, Ordering::Relaxed) % weighted.len();
46 let instance = weighted[index];
47 Some(endpoint_from_instance(service, instance))
48 }
49}
50
51pub fn endpoint_from_instance(service: &str, instance: &ServiceInstance) -> RpcEndpoint {
53 let scheme = instance
54 .metadata
55 .get("scheme")
56 .map(String::as_str)
57 .unwrap_or("http");
58 RpcEndpoint {
59 service: service.to_string(),
60 instance_id: instance.id.clone(),
61 uri: format!("{scheme}://{}", instance.endpoint.authority()),
62 }
63}