1use serde::{Deserialize, Serialize};
4use std::net::SocketAddr;
5use std::time::Instant;
6
/// Identifier of a cluster node, stored as an owned `String`.
pub type NodeId = String;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
12#[serde(rename_all = "lowercase")]
13#[derive(Default)]
14pub enum NodeState {
15 Alive,
17 Suspect,
19 Dead,
21 Leaving,
23 #[default]
25 Unknown,
26}
27
28impl NodeState {
29 pub fn is_healthy(&self) -> bool {
31 matches!(self, NodeState::Alive)
32 }
33
34 pub fn is_reachable(&self) -> bool {
36 matches!(self, NodeState::Alive | NodeState::Suspect)
37 }
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
42pub struct NodeCapabilities {
43 pub voter: bool,
45 pub leader_eligible: bool,
47 pub replica_eligible: bool,
49}
50
51impl NodeCapabilities {
52 pub fn full() -> Self {
54 Self {
55 voter: true,
56 leader_eligible: true,
57 replica_eligible: true,
58 }
59 }
60
61 pub fn observer() -> Self {
63 Self {
64 voter: false,
65 leader_eligible: false,
66 replica_eligible: true,
67 }
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
73pub struct NodeInfo {
74 pub id: NodeId,
76
77 pub name: Option<String>,
79
80 pub rack: Option<String>,
82
83 pub client_addr: SocketAddr,
85
86 pub cluster_addr: SocketAddr,
88
89 pub capabilities: NodeCapabilities,
91
92 pub version: String,
94
95 pub tags: std::collections::HashMap<String, String>,
97}
98
99impl NodeInfo {
100 pub fn new(id: impl Into<String>, client_addr: SocketAddr, cluster_addr: SocketAddr) -> Self {
102 Self {
103 id: id.into(),
104 name: None,
105 rack: None,
106 client_addr,
107 cluster_addr,
108 capabilities: NodeCapabilities::full(),
109 version: env!("CARGO_PKG_VERSION").to_string(),
110 tags: std::collections::HashMap::new(),
111 }
112 }
113
114 pub fn with_name(mut self, name: impl Into<String>) -> Self {
116 self.name = Some(name.into());
117 self
118 }
119
120 pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
122 self.rack = Some(rack.into());
123 self
124 }
125
126 pub fn with_capabilities(mut self, capabilities: NodeCapabilities) -> Self {
128 self.capabilities = capabilities;
129 self
130 }
131
132 pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
134 self.tags.insert(key.into(), value.into());
135 self
136 }
137}
138
139#[derive(Debug, Clone)]
141pub struct Node {
142 pub info: NodeInfo,
144
145 pub state: NodeState,
147
148 pub incarnation: u64,
150
151 pub last_seen: Instant,
153
154 pub partition_leader_count: u32,
156
157 pub partition_replica_count: u32,
159
160 pub is_raft_leader: bool,
162}
163
164impl Node {
165 pub fn new(info: NodeInfo) -> Self {
167 Self {
168 info,
169 state: NodeState::Unknown,
170 incarnation: 0,
171 last_seen: Instant::now(),
172 partition_leader_count: 0,
173 partition_replica_count: 0,
174 is_raft_leader: false,
175 }
176 }
177
178 pub fn touch(&mut self) {
180 self.last_seen = Instant::now();
181 }
182
183 pub fn mark_alive(&mut self, incarnation: u64) -> bool {
192 match self.state {
193 NodeState::Dead => {
194 if incarnation <= self.incarnation {
196 return false;
197 }
198 }
199 NodeState::Alive => {
200 if incarnation < self.incarnation {
202 return false;
203 }
204 }
205 NodeState::Unknown | NodeState::Suspect => {
206 if incarnation < self.incarnation {
208 return false;
209 }
210 }
211 NodeState::Leaving => {
212 if incarnation <= self.incarnation {
214 return false;
215 }
216 }
217 }
218 self.state = NodeState::Alive;
219 self.incarnation = incarnation;
220 self.touch();
221 true
222 }
223
224 pub fn mark_suspect(&mut self) -> bool {
232 match self.state {
236 NodeState::Alive | NodeState::Unknown => {
237 self.state = NodeState::Suspect;
238 true
239 }
240 _ => false,
242 }
243 }
244
245 pub fn mark_dead(&mut self) -> bool {
253 match self.state {
254 NodeState::Suspect => {
255 self.state = NodeState::Dead;
256 true
257 }
258 NodeState::Dead => false, _ => false, }
261 }
262
263 pub fn mark_leaving(&mut self) -> bool {
268 match self.state {
269 NodeState::Dead => false,
270 _ => {
271 self.state = NodeState::Leaving;
272 true
273 }
274 }
275 }
276
277 pub fn is_healthy(&self) -> bool {
279 self.state.is_healthy()
280 }
281
282 pub fn id(&self) -> &str {
284 &self.info.id
285 }
286
287 pub fn cluster_addr(&self) -> SocketAddr {
289 self.info.cluster_addr
290 }
291
292 pub fn client_addr(&self) -> SocketAddr {
294 self.info.client_addr
295 }
296
297 pub fn load_score(&self) -> u32 {
299 self.partition_leader_count * 3 + self.partition_replica_count
301 }
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize)]
306pub struct NodeGossipState {
307 pub id: NodeId,
308 pub state: NodeState,
309 pub incarnation: u64,
310 pub cluster_addr: SocketAddr,
311 pub client_addr: SocketAddr,
312 pub rack: Option<String>,
313 pub capabilities: NodeCapabilities,
314}
315
316impl From<&Node> for NodeGossipState {
317 fn from(node: &Node) -> Self {
318 Self {
319 id: node.info.id.clone(),
320 state: node.state,
321 incarnation: node.incarnation,
322 cluster_addr: node.info.cluster_addr,
323 client_addr: node.info.client_addr,
324 rack: node.info.rack.clone(),
325 capabilities: node.info.capabilities,
326 }
327 }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the loopback node used as a fixture by the tests below.
    fn sample_node() -> Node {
        Node::new(NodeInfo::new(
            "node-1",
            "127.0.0.1:9092".parse().unwrap(),
            "127.0.0.1:9093".parse().unwrap(),
        ))
    }

    #[test]
    fn test_node_state_transitions() {
        let mut node = sample_node();

        // A fresh node is Unknown and therefore not healthy.
        assert_eq!(node.state, NodeState::Unknown);
        assert!(!node.is_healthy());

        // Unknown -> Alive
        assert!(node.mark_alive(1));
        assert_eq!(node.state, NodeState::Alive);
        assert!(node.is_healthy());

        // Alive -> Suspect: still reachable, no longer healthy.
        assert!(node.mark_suspect());
        assert_eq!(node.state, NodeState::Suspect);
        assert!(!node.is_healthy());
        assert!(node.state.is_reachable());

        // Suspect -> Dead
        assert!(node.mark_dead());
        assert_eq!(node.state, NodeState::Dead);
        assert!(!node.state.is_reachable());

        // A dead node cannot be suspected again.
        assert!(!node.mark_suspect());
        assert_eq!(node.state, NodeState::Dead);

        // Revival is rejected at the same incarnation and accepted at a higher one.
        assert!(!node.mark_alive(1));
        assert_eq!(node.state, NodeState::Dead);
        assert!(node.mark_alive(2));
        assert_eq!(node.state, NodeState::Alive);
    }

    #[test]
    fn test_load_score() {
        let mut node = sample_node();
        node.partition_leader_count = 2;
        node.partition_replica_count = 4;

        // Each led partition counts three times, each replica once.
        assert_eq!(node.load_score(), 2 * 3 + 4);
    }

    #[test]
    fn test_node_capabilities() {
        let full = NodeCapabilities::full();
        assert!(full.voter && full.leader_eligible && full.replica_eligible);

        let observer = NodeCapabilities::observer();
        assert!(!observer.voter && !observer.leader_eligible && observer.replica_eligible);
    }
}