1use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::time::Instant;
9
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13use crate::slots::{SlotMap, SlotRange};
14use crate::ClusterError;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct NodeId(pub Uuid);
21
22impl NodeId {
23 pub fn new() -> Self {
25 Self(Uuid::new_v4())
26 }
27
28 pub fn parse(s: &str) -> Result<Self, uuid::Error> {
30 Ok(Self(Uuid::parse_str(s)?))
31 }
32}
33
34impl Default for NodeId {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl std::fmt::Display for NodeId {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}", &self.0.to_string()[..8])
44 }
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum NodeRole {
50 Primary,
52 Replica,
54}
55
56impl std::fmt::Display for NodeRole {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 match self {
59 NodeRole::Primary => write!(f, "primary"),
60 NodeRole::Replica => write!(f, "replica"),
61 }
62 }
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
67pub struct NodeFlags {
68 pub myself: bool,
70 pub pfail: bool,
72 pub fail: bool,
74 pub handshake: bool,
76 pub noaddr: bool,
78}
79
80impl NodeFlags {
81 pub fn is_healthy(&self) -> bool {
83 !self.fail && !self.pfail
84 }
85}
86
87impl std::fmt::Display for NodeFlags {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 let mut flags = Vec::new();
90 if self.myself {
91 flags.push("myself");
92 }
93 if self.pfail {
94 flags.push("pfail");
95 }
96 if self.fail {
97 flags.push("fail");
98 }
99 if self.handshake {
100 flags.push("handshake");
101 }
102 if self.noaddr {
103 flags.push("noaddr");
104 }
105 if flags.is_empty() {
106 write!(f, "-")
107 } else {
108 write!(f, "{}", flags.join(","))
109 }
110 }
111}
112
113#[derive(Debug, Clone)]
115pub struct ClusterNode {
116 pub id: NodeId,
118 pub addr: SocketAddr,
120 pub cluster_bus_addr: SocketAddr,
123 pub role: NodeRole,
125 pub slots: Vec<SlotRange>,
127 pub replicates: Option<NodeId>,
129 pub replicas: Vec<NodeId>,
131 pub last_seen: Instant,
133 pub last_ping_sent: Option<Instant>,
135 pub last_pong_received: Option<Instant>,
137 pub flags: NodeFlags,
139 pub config_epoch: u64,
141}
142
143impl ClusterNode {
144 pub fn new_primary(id: NodeId, addr: SocketAddr) -> Self {
146 let cluster_bus_addr = SocketAddr::new(addr.ip(), addr.port() + 10000);
147 Self {
148 id,
149 addr,
150 cluster_bus_addr,
151 role: NodeRole::Primary,
152 slots: Vec::new(),
153 replicates: None,
154 replicas: Vec::new(),
155 last_seen: Instant::now(),
156 last_ping_sent: None,
157 last_pong_received: None,
158 flags: NodeFlags::default(),
159 config_epoch: 0,
160 }
161 }
162
163 pub fn new_replica(id: NodeId, addr: SocketAddr, primary_id: NodeId) -> Self {
165 let cluster_bus_addr = SocketAddr::new(addr.ip(), addr.port() + 10000);
166 Self {
167 id,
168 addr,
169 cluster_bus_addr,
170 role: NodeRole::Replica,
171 slots: Vec::new(),
172 replicates: Some(primary_id),
173 replicas: Vec::new(),
174 last_seen: Instant::now(),
175 last_ping_sent: None,
176 last_pong_received: None,
177 flags: NodeFlags::default(),
178 config_epoch: 0,
179 }
180 }
181
182 pub fn set_myself(&mut self) {
184 self.flags.myself = true;
185 }
186
187 pub fn is_healthy(&self) -> bool {
189 self.flags.is_healthy()
190 }
191
192 pub fn slot_count(&self) -> u16 {
194 self.slots.iter().map(|r| r.len()).sum()
195 }
196
197 pub fn to_cluster_nodes_line(&self, slot_map: &SlotMap) -> String {
199 let slots_str = if self.role == NodeRole::Primary {
200 let ranges = slot_map.slots_for_node(self.id);
201 if ranges.is_empty() {
202 String::new()
203 } else {
204 ranges
205 .iter()
206 .map(|r| r.to_string())
207 .collect::<Vec<_>>()
208 .join(" ")
209 }
210 } else {
211 String::new()
212 };
213
214 let replicates_str = self
215 .replicates
216 .map(|id| id.0.to_string())
217 .unwrap_or_else(|| "-".to_string());
218
219 format!(
221 "{} {}@{} {} {} {} {} {} connected {}",
222 self.id.0,
223 self.addr,
224 self.cluster_bus_addr.port(),
225 self.format_flags(),
226 replicates_str,
227 self.last_ping_sent
228 .map(|t| t.elapsed().as_millis() as u64)
229 .unwrap_or(0),
230 self.last_pong_received
231 .map(|t| t.elapsed().as_millis() as u64)
232 .unwrap_or(0),
233 self.config_epoch,
234 slots_str
235 )
236 .trim()
237 .to_string()
238 }
239
240 fn format_flags(&self) -> String {
241 let mut flags = Vec::new();
242
243 if self.flags.myself {
244 flags.push("myself");
245 }
246
247 match self.role {
248 NodeRole::Primary => flags.push("master"),
249 NodeRole::Replica => flags.push("slave"),
250 }
251
252 if self.flags.fail {
253 flags.push("fail");
254 } else if self.flags.pfail {
255 flags.push("fail?");
256 }
257
258 if self.flags.handshake {
259 flags.push("handshake");
260 }
261
262 if self.flags.noaddr {
263 flags.push("noaddr");
264 }
265
266 flags.join(",")
267 }
268}
269
270#[derive(Debug)]
272pub struct ClusterState {
273 pub nodes: HashMap<NodeId, ClusterNode>,
275 pub local_id: NodeId,
277 pub config_epoch: u64,
279 pub slot_map: SlotMap,
281 pub state: ClusterHealth,
283}
284
/// Overall health of the cluster as reported by `cluster_state:` in `CLUSTER INFO`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterHealth {
    /// Every slot is assigned to a known, healthy node.
    Ok,
    /// At least one slot is unassigned or owned by a missing or unhealthy node.
    Fail,
    /// Health has not been computed yet (the state of `ClusterState::new`).
    Unknown,
}
295
296impl std::fmt::Display for ClusterHealth {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 match self {
299 ClusterHealth::Ok => write!(f, "ok"),
300 ClusterHealth::Fail => write!(f, "fail"),
301 ClusterHealth::Unknown => write!(f, "unknown"),
302 }
303 }
304}
305
306impl ClusterState {
307 pub fn single_node(local_node: ClusterNode) -> Self {
309 let local_id = local_node.id;
310 let slot_map = SlotMap::single_node(local_id);
311 let mut nodes = HashMap::new();
312 nodes.insert(local_id, local_node);
313
314 Self {
315 nodes,
316 local_id,
317 config_epoch: 1,
318 slot_map,
319 state: ClusterHealth::Ok,
320 }
321 }
322
323 pub fn new(local_id: NodeId) -> Self {
325 Self {
326 nodes: HashMap::new(),
327 local_id,
328 config_epoch: 0,
329 slot_map: SlotMap::new(),
330 state: ClusterHealth::Unknown,
331 }
332 }
333
334 pub fn local_node(&self) -> Option<&ClusterNode> {
336 self.nodes.get(&self.local_id)
337 }
338
339 pub fn local_node_mut(&mut self) -> Option<&mut ClusterNode> {
341 self.nodes.get_mut(&self.local_id)
342 }
343
344 pub fn add_node(&mut self, node: ClusterNode) {
346 self.nodes.insert(node.id, node);
347 }
348
349 pub fn remove_node(&mut self, node_id: NodeId) -> Option<ClusterNode> {
351 self.nodes.remove(&node_id)
352 }
353
354 pub fn slot_owner(&self, slot: u16) -> Option<&ClusterNode> {
356 let node_id = self.slot_map.owner(slot)?;
357 self.nodes.get(&node_id)
358 }
359
360 pub fn owns_slot(&self, slot: u16) -> bool {
362 self.slot_map.owner(slot) == Some(self.local_id)
363 }
364
365 pub fn primaries(&self) -> impl Iterator<Item = &ClusterNode> {
367 self.nodes.values().filter(|n| n.role == NodeRole::Primary)
368 }
369
370 pub fn replicas(&self) -> impl Iterator<Item = &ClusterNode> {
372 self.nodes.values().filter(|n| n.role == NodeRole::Replica)
373 }
374
375 pub fn replicas_of(&self, primary_id: NodeId) -> impl Iterator<Item = &ClusterNode> {
377 self.nodes
378 .values()
379 .filter(move |n| n.replicates == Some(primary_id))
380 }
381
382 pub fn update_health(&mut self) {
384 if !self.slot_map.is_complete() {
386 self.state = ClusterHealth::Fail;
387 return;
388 }
389
390 for slot in 0..crate::slots::SLOT_COUNT {
392 if let Some(owner_id) = self.slot_map.owner(slot) {
393 if let Some(node) = self.nodes.get(&owner_id) {
394 if !node.is_healthy() {
395 self.state = ClusterHealth::Fail;
396 return;
397 }
398 } else {
399 self.state = ClusterHealth::Fail;
401 return;
402 }
403 }
404 }
405
406 self.state = ClusterHealth::Ok;
407 }
408
409 pub fn cluster_info(&self) -> String {
411 let primaries: Vec<_> = self.primaries().collect();
412 let assigned_slots: u16 = primaries.iter().map(|n| n.slot_count()).sum();
413
414 format!(
415 "cluster_state:{}\r\n\
416 cluster_slots_assigned:{}\r\n\
417 cluster_slots_ok:{}\r\n\
418 cluster_slots_pfail:0\r\n\
419 cluster_slots_fail:0\r\n\
420 cluster_known_nodes:{}\r\n\
421 cluster_size:{}\r\n\
422 cluster_current_epoch:{}\r\n\
423 cluster_my_epoch:{}\r\n",
424 self.state,
425 assigned_slots,
426 if self.state == ClusterHealth::Ok {
427 assigned_slots
428 } else {
429 0
430 },
431 self.nodes.len(),
432 primaries.len(),
433 self.config_epoch,
434 self.local_node().map(|n| n.config_epoch).unwrap_or(0),
435 )
436 }
437
438 pub fn cluster_nodes(&self) -> String {
440 let mut lines: Vec<String> = self
441 .nodes
442 .values()
443 .map(|node| node.to_cluster_nodes_line(&self.slot_map))
444 .collect();
445 lines.sort(); lines.join("\n")
447 }
448
449 pub fn moved_redirect(&self, slot: u16) -> Result<(u16, SocketAddr), ClusterError> {
451 let node = self
452 .slot_owner(slot)
453 .ok_or(ClusterError::SlotNotAssigned(slot))?;
454 Ok((slot, node.addr))
455 }
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461 use std::net::{IpAddr, Ipv4Addr};
462
463 fn test_addr(port: u16) -> SocketAddr {
464 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port)
465 }
466
467 #[test]
468 fn node_id_display() {
469 let id = NodeId::new();
470 let display = id.to_string();
471 assert_eq!(display.len(), 8);
472 }
473
474 #[test]
475 fn node_id_parse() {
476 let id = NodeId::new();
477 let parsed = NodeId::parse(&id.0.to_string()).unwrap();
478 assert_eq!(id, parsed);
479 }
480
481 #[test]
482 fn node_flags_display() {
483 let mut flags = NodeFlags::default();
484 assert_eq!(flags.to_string(), "-");
485
486 flags.myself = true;
487 assert_eq!(flags.to_string(), "myself");
488
489 flags.pfail = true;
490 assert_eq!(flags.to_string(), "myself,pfail");
491 }
492
493 #[test]
494 fn cluster_node_primary() {
495 let id = NodeId::new();
496 let node = ClusterNode::new_primary(id, test_addr(6379));
497
498 assert_eq!(node.id, id);
499 assert_eq!(node.role, NodeRole::Primary);
500 assert_eq!(node.addr.port(), 6379);
501 assert_eq!(node.cluster_bus_addr.port(), 16379);
502 assert!(node.replicates.is_none());
503 assert!(node.is_healthy());
504 }
505
506 #[test]
507 fn cluster_node_replica() {
508 let primary_id = NodeId::new();
509 let replica_id = NodeId::new();
510 let node = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);
511
512 assert_eq!(node.id, replica_id);
513 assert_eq!(node.role, NodeRole::Replica);
514 assert_eq!(node.replicates, Some(primary_id));
515 }
516
517 #[test]
518 fn cluster_state_single_node() {
519 let id = NodeId::new();
520 let mut node = ClusterNode::new_primary(id, test_addr(6379));
521 node.set_myself();
522
523 let state = ClusterState::single_node(node);
524
525 assert_eq!(state.local_id, id);
526 assert!(state.owns_slot(0));
527 assert!(state.owns_slot(16383));
528 assert_eq!(state.state, ClusterHealth::Ok);
529 }
530
531 #[test]
532 fn cluster_state_slot_owner() {
533 let id = NodeId::new();
534 let mut node = ClusterNode::new_primary(id, test_addr(6379));
535 node.set_myself();
536
537 let state = ClusterState::single_node(node);
538
539 let owner = state.slot_owner(100).unwrap();
540 assert_eq!(owner.id, id);
541 }
542
543 #[test]
544 fn cluster_state_health_check() {
545 let id = NodeId::new();
546 let mut node = ClusterNode::new_primary(id, test_addr(6379));
547 node.set_myself();
548
549 let mut state = ClusterState::single_node(node);
550 state.update_health();
551 assert_eq!(state.state, ClusterHealth::Ok);
552
553 state.slot_map.unassign(0);
555 state.update_health();
556 assert_eq!(state.state, ClusterHealth::Fail);
557 }
558
559 #[test]
560 fn cluster_info_format() {
561 let id = NodeId::new();
562 let mut node = ClusterNode::new_primary(id, test_addr(6379));
563 node.set_myself();
564
565 let state = ClusterState::single_node(node);
566 let info = state.cluster_info();
567
568 assert!(info.contains("cluster_state:ok"));
569 assert!(info.contains("cluster_slots_assigned:0")); assert!(info.contains("cluster_known_nodes:1"));
571 }
572
573 #[test]
574 fn moved_redirect() {
575 let id = NodeId::new();
576 let mut node = ClusterNode::new_primary(id, test_addr(6379));
577 node.set_myself();
578
579 let state = ClusterState::single_node(node);
580
581 let (slot, addr) = state.moved_redirect(100).unwrap();
582 assert_eq!(slot, 100);
583 assert_eq!(addr.port(), 6379);
584 }
585
586 #[test]
587 fn primaries_and_replicas() {
588 let primary_id = NodeId::new();
589 let replica_id = NodeId::new();
590
591 let mut primary = ClusterNode::new_primary(primary_id, test_addr(6379));
592 primary.set_myself();
593
594 let mut state = ClusterState::single_node(primary);
595
596 let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);
597 state.add_node(replica);
598
599 assert_eq!(state.primaries().count(), 1);
600 assert_eq!(state.replicas().count(), 1);
601 assert_eq!(state.replicas_of(primary_id).count(), 1);
602 }
603}