k8s_operator_raft/
discovery.rs1use std::collections::HashMap;
2use std::net::ToSocketAddrs;
3use std::time::Duration;
4
5use tracing::debug;
6
7use k8s_operator_storage::{BasicNode, NodeId};
8
9pub struct HeadlessServiceDiscovery {
10 service_name: String,
11 namespace: String,
12 port: u16,
13 refresh_interval: Duration,
14}
15
16impl HeadlessServiceDiscovery {
17 pub fn new(service_name: &str, namespace: &str, port: u16) -> Self {
18 Self {
19 service_name: service_name.to_string(),
20 namespace: namespace.to_string(),
21 port,
22 refresh_interval: Duration::from_secs(10),
23 }
24 }
25
26 pub fn with_refresh_interval(mut self, interval: Duration) -> Self {
27 self.refresh_interval = interval;
28 self
29 }
30
31 pub fn dns_name(&self) -> String {
32 format!("{}.{}.svc.cluster.local", self.service_name, self.namespace)
33 }
34
35 pub fn pod_dns_name(&self, ordinal: u64) -> String {
36 format!(
37 "{}-{}.{}.{}.svc.cluster.local",
38 self.service_name.trim_end_matches("-headless"),
39 ordinal,
40 self.service_name,
41 self.namespace
42 )
43 }
44
45 pub fn discover_by_ordinal(&self, num_replicas: u64) -> HashMap<NodeId, BasicNode> {
46 let mut peers = HashMap::new();
47
48 for ordinal in 0..num_replicas {
49 let dns_name = self.pod_dns_name(ordinal);
50 let addr = format!("{}:{}", dns_name, self.port);
51
52 if let Ok(mut addrs) = addr.to_socket_addrs() {
53 if let Some(socket_addr) = addrs.next() {
54 let node = BasicNode {
55 addr: format!("{}:{}", socket_addr.ip(), self.port),
56 };
57 peers.insert(ordinal, node);
58 debug!("Discovered peer {}: {}", ordinal, dns_name);
59 }
60 } else {
61 debug!("Could not resolve peer {}: {}", ordinal, dns_name);
62 }
63 }
64
65 peers
66 }
67}
68
69pub struct StaticDiscovery {
70 peers: HashMap<NodeId, BasicNode>,
71}
72
73impl StaticDiscovery {
74 pub fn new() -> Self {
75 Self {
76 peers: HashMap::new(),
77 }
78 }
79
80 pub fn add_peer(mut self, node_id: NodeId, addr: impl Into<String>) -> Self {
81 self.peers.insert(
82 node_id,
83 BasicNode {
84 addr: addr.into(),
85 },
86 );
87 self
88 }
89
90 pub fn from_addresses(addresses: Vec<String>) -> Self {
91 let mut discovery = Self::new();
92 for (idx, addr) in addresses.into_iter().enumerate() {
93 discovery.peers.insert(
94 idx as NodeId,
95 BasicNode { addr },
96 );
97 }
98 discovery
99 }
100
101 pub fn peers(&self) -> &HashMap<NodeId, BasicNode> {
102 &self.peers
103 }
104
105 pub fn into_peers(self) -> HashMap<NodeId, BasicNode> {
106 self.peers
107 }
108}
109
110impl Default for StaticDiscovery {
111 fn default() -> Self {
112 Self::new()
113 }
114}