Skip to main content

k8s_operator_raft/
discovery.rs

1use std::collections::HashMap;
2use std::net::ToSocketAddrs;
3use std::time::Duration;
4
5use tracing::debug;
6
7use k8s_operator_storage::{BasicNode, NodeId};
8
9pub struct HeadlessServiceDiscovery {
10    service_name: String,
11    namespace: String,
12    port: u16,
13    refresh_interval: Duration,
14}
15
16impl HeadlessServiceDiscovery {
17    pub fn new(service_name: &str, namespace: &str, port: u16) -> Self {
18        Self {
19            service_name: service_name.to_string(),
20            namespace: namespace.to_string(),
21            port,
22            refresh_interval: Duration::from_secs(10),
23        }
24    }
25
26    pub fn with_refresh_interval(mut self, interval: Duration) -> Self {
27        self.refresh_interval = interval;
28        self
29    }
30
31    pub fn dns_name(&self) -> String {
32        format!("{}.{}.svc.cluster.local", self.service_name, self.namespace)
33    }
34
35    pub fn pod_dns_name(&self, ordinal: u64) -> String {
36        format!(
37            "{}-{}.{}.{}.svc.cluster.local",
38            self.service_name.trim_end_matches("-headless"),
39            ordinal,
40            self.service_name,
41            self.namespace
42        )
43    }
44
45    pub fn discover_by_ordinal(&self, num_replicas: u64) -> HashMap<NodeId, BasicNode> {
46        let mut peers = HashMap::new();
47
48        for ordinal in 0..num_replicas {
49            let dns_name = self.pod_dns_name(ordinal);
50            let addr = format!("{}:{}", dns_name, self.port);
51
52            if let Ok(mut addrs) = addr.to_socket_addrs() {
53                if let Some(socket_addr) = addrs.next() {
54                    let node = BasicNode {
55                        addr: format!("{}:{}", socket_addr.ip(), self.port),
56                    };
57                    peers.insert(ordinal, node);
58                    debug!("Discovered peer {}: {}", ordinal, dns_name);
59                }
60            } else {
61                debug!("Could not resolve peer {}: {}", ordinal, dns_name);
62            }
63        }
64
65        peers
66    }
67}
68
69pub struct StaticDiscovery {
70    peers: HashMap<NodeId, BasicNode>,
71}
72
73impl StaticDiscovery {
74    pub fn new() -> Self {
75        Self {
76            peers: HashMap::new(),
77        }
78    }
79
80    pub fn add_peer(mut self, node_id: NodeId, addr: impl Into<String>) -> Self {
81        self.peers.insert(
82            node_id,
83            BasicNode {
84                addr: addr.into(),
85            },
86        );
87        self
88    }
89
90    pub fn from_addresses(addresses: Vec<String>) -> Self {
91        let mut discovery = Self::new();
92        for (idx, addr) in addresses.into_iter().enumerate() {
93            discovery.peers.insert(
94                idx as NodeId,
95                BasicNode { addr },
96            );
97        }
98        discovery
99    }
100
101    pub fn peers(&self) -> &HashMap<NodeId, BasicNode> {
102        &self.peers
103    }
104
105    pub fn into_peers(self) -> HashMap<NodeId, BasicNode> {
106        self.peers
107    }
108}
109
110impl Default for StaticDiscovery {
111    fn default() -> Self {
112        Self::new()
113    }
114}