1use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::time::Instant;
9
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13use crate::slots::{SlotMap, SlotRange, SLOT_COUNT};
14use crate::ClusterError;
15
/// Identifier of a cluster node: a newtype around a [`Uuid`].
///
/// The `Display` impl in this file prints only a shortened form; use the inner
/// `Uuid` (field `0`) when the full identifier is needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeId(pub Uuid);
21
22impl NodeId {
23 pub fn new() -> Self {
25 Self(Uuid::new_v4())
26 }
27
28 pub fn parse(s: &str) -> Result<Self, uuid::Error> {
30 Ok(Self(Uuid::parse_str(s)?))
31 }
32}
33
34impl Default for NodeId {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl std::fmt::Display for NodeId {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}", &self.0.to_string()[..8])
44 }
45}
46
/// Role a node plays in the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// Displayed as `primary`; the only role for which
    /// `ClusterNode::to_cluster_nodes_line` lists slot ranges.
    Primary,
    /// Displayed as `replica`; `ClusterNode::new_replica` pairs this role with
    /// the id of the primary being replicated.
    Replica,
}
55
56impl std::fmt::Display for NodeRole {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 match self {
59 NodeRole::Primary => write!(f, "primary"),
60 NodeRole::Replica => write!(f, "replica"),
61 }
62 }
63}
64
/// Boolean status flags attached to a node. All flags default to `false`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct NodeFlags {
    /// Set by `ClusterNode::set_myself`; presumably marks the node this process runs as.
    pub myself: bool,
    /// Counts as unhealthy in `is_healthy`; rendered as `fail?` in the node listing.
    /// NOTE(review): looks like Redis Cluster's "possibly failing" state; confirm.
    pub pfail: bool,
    /// Counts as unhealthy in `is_healthy`; takes precedence over `pfail` in the node listing.
    pub fail: bool,
    /// Rendered as `handshake`; not consulted by `is_healthy`.
    pub handshake: bool,
    /// Rendered as `noaddr`; not consulted by `is_healthy`.
    pub noaddr: bool,
}
79
80impl NodeFlags {
81 pub fn is_healthy(&self) -> bool {
83 !self.fail && !self.pfail
84 }
85}
86
87impl std::fmt::Display for NodeFlags {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 let mut flags = Vec::new();
90 if self.myself {
91 flags.push("myself");
92 }
93 if self.pfail {
94 flags.push("pfail");
95 }
96 if self.fail {
97 flags.push("fail");
98 }
99 if self.handshake {
100 flags.push("handshake");
101 }
102 if self.noaddr {
103 flags.push("noaddr");
104 }
105 if flags.is_empty() {
106 write!(f, "-")
107 } else {
108 write!(f, "{}", flags.join(","))
109 }
110 }
111}
112
/// A single node as known to the local view of the cluster.
#[derive(Debug, Clone)]
pub struct ClusterNode {
    /// Identifier of this node.
    pub id: NodeId,
    /// Address printed in the node listing and returned by `ClusterState::moved_redirect`.
    pub addr: SocketAddr,
    /// Cluster-bus address; the constructors derive it from `addr` by adding a port offset.
    pub cluster_bus_addr: SocketAddr,
    /// Whether the node is a primary or a replica.
    pub role: NodeRole,
    /// Slot ranges recorded on the node itself; summed by `slot_count`.
    /// Note that `to_cluster_nodes_line` reads ranges from the `SlotMap` instead.
    pub slots: Vec<SlotRange>,
    /// Id of the primary this node replicates; `None` for nodes built as primaries.
    pub replicates: Option<NodeId>,
    /// Ids of replicas attached to this node; the constructors leave it empty.
    pub replicas: Vec<NodeId>,
    /// Initialised to the construction time by the constructors.
    pub last_seen: Instant,
    /// Time the last ping was sent, if any; reported as elapsed milliseconds in the listing.
    pub last_ping_sent: Option<Instant>,
    /// Time the last pong was received, if any; reported as elapsed milliseconds in the listing.
    pub last_pong_received: Option<Instant>,
    /// Status flags of the node.
    pub flags: NodeFlags,
    /// Per-node configuration epoch; starts at 0.
    pub config_epoch: u64,
}
142
143impl ClusterNode {
144 pub fn new_primary(id: NodeId, addr: SocketAddr) -> Self {
146 Self::new_primary_with_offset(id, addr, 10000)
147 }
148
149 pub fn new_primary_with_offset(id: NodeId, addr: SocketAddr, bus_port_offset: u16) -> Self {
151 let cluster_bus_addr =
152 SocketAddr::new(addr.ip(), addr.port().wrapping_add(bus_port_offset));
153 Self {
154 id,
155 addr,
156 cluster_bus_addr,
157 role: NodeRole::Primary,
158 slots: Vec::new(),
159 replicates: None,
160 replicas: Vec::new(),
161 last_seen: Instant::now(),
162 last_ping_sent: None,
163 last_pong_received: None,
164 flags: NodeFlags::default(),
165 config_epoch: 0,
166 }
167 }
168
169 pub fn new_replica(id: NodeId, addr: SocketAddr, primary_id: NodeId) -> Self {
171 let cluster_bus_addr = SocketAddr::new(addr.ip(), addr.port() + 10000);
172 Self {
173 id,
174 addr,
175 cluster_bus_addr,
176 role: NodeRole::Replica,
177 slots: Vec::new(),
178 replicates: Some(primary_id),
179 replicas: Vec::new(),
180 last_seen: Instant::now(),
181 last_ping_sent: None,
182 last_pong_received: None,
183 flags: NodeFlags::default(),
184 config_epoch: 0,
185 }
186 }
187
188 pub fn set_myself(&mut self) {
190 self.flags.myself = true;
191 }
192
193 pub fn is_healthy(&self) -> bool {
195 self.flags.is_healthy()
196 }
197
198 pub fn slot_count(&self) -> u16 {
200 self.slots.iter().map(|r| r.len()).sum()
201 }
202
203 pub fn to_cluster_nodes_line(&self, slot_map: &SlotMap) -> String {
205 let slots_str = if self.role == NodeRole::Primary {
206 let ranges = slot_map.slots_for_node(self.id);
207 if ranges.is_empty() {
208 String::new()
209 } else {
210 ranges
211 .iter()
212 .map(|r| r.to_string())
213 .collect::<Vec<_>>()
214 .join(" ")
215 }
216 } else {
217 String::new()
218 };
219
220 let replicates_str = self
221 .replicates
222 .map(|id| id.0.to_string())
223 .unwrap_or_else(|| "-".to_string());
224
225 format!(
227 "{} {}@{} {} {} {} {} {} connected {}",
228 self.id.0,
229 self.addr,
230 self.cluster_bus_addr.port(),
231 self.format_flags(),
232 replicates_str,
233 self.last_ping_sent
234 .map(|t| t.elapsed().as_millis() as u64)
235 .unwrap_or(0),
236 self.last_pong_received
237 .map(|t| t.elapsed().as_millis() as u64)
238 .unwrap_or(0),
239 self.config_epoch,
240 slots_str
241 )
242 .trim()
243 .to_string()
244 }
245
246 fn format_flags(&self) -> String {
247 let mut flags = Vec::new();
248
249 if self.flags.myself {
250 flags.push("myself");
251 }
252
253 match self.role {
254 NodeRole::Primary => flags.push("master"),
255 NodeRole::Replica => flags.push("slave"),
256 }
257
258 if self.flags.fail {
259 flags.push("fail");
260 } else if self.flags.pfail {
261 flags.push("fail?");
262 }
263
264 if self.flags.handshake {
265 flags.push("handshake");
266 }
267
268 if self.flags.noaddr {
269 flags.push("noaddr");
270 }
271
272 flags.join(",")
273 }
274}
275
/// The local node's view of the whole cluster.
#[derive(Debug)]
pub struct ClusterState {
    /// All known nodes, keyed by their id.
    pub nodes: HashMap<NodeId, ClusterNode>,
    /// Id used by `local_node` and `owns_slot` to identify the local node.
    pub local_id: NodeId,
    /// Cluster-wide epoch, reported as `cluster_current_epoch` by `cluster_info`.
    pub config_epoch: u64,
    /// Mapping from slots to the ids of the nodes that own them.
    pub slot_map: SlotMap,
    /// Health last computed by `update_health` (or the initial value set by a constructor).
    pub state: ClusterHealth,
}
290
/// Overall health of the cluster as seen by the local node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterHealth {
    /// Displayed as `ok`.
    Ok,
    /// Displayed as `fail`.
    Fail,
    /// Displayed as `unknown`.
    Unknown,
}

impl std::fmt::Display for ClusterHealth {
    /// Writes the lowercase health label: `ok`, `fail` or `unknown`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ClusterHealth::Ok => "ok",
            ClusterHealth::Fail => "fail",
            ClusterHealth::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
311
312impl ClusterState {
313 pub fn single_node(local_node: ClusterNode) -> Self {
315 let local_id = local_node.id;
316 let slot_map = SlotMap::single_node(local_id);
317 let mut nodes = HashMap::new();
318 nodes.insert(local_id, local_node);
319
320 Self {
321 nodes,
322 local_id,
323 config_epoch: 1,
324 slot_map,
325 state: ClusterHealth::Ok,
326 }
327 }
328
329 pub fn new(local_id: NodeId) -> Self {
331 Self {
332 nodes: HashMap::new(),
333 local_id,
334 config_epoch: 0,
335 slot_map: SlotMap::new(),
336 state: ClusterHealth::Unknown,
337 }
338 }
339
340 pub fn local_node(&self) -> Option<&ClusterNode> {
342 self.nodes.get(&self.local_id)
343 }
344
345 pub fn local_node_mut(&mut self) -> Option<&mut ClusterNode> {
347 self.nodes.get_mut(&self.local_id)
348 }
349
350 pub fn add_node(&mut self, node: ClusterNode) {
352 self.nodes.insert(node.id, node);
353 }
354
355 pub fn remove_node(&mut self, node_id: NodeId) -> Option<ClusterNode> {
357 self.nodes.remove(&node_id)
358 }
359
360 pub fn slot_owner(&self, slot: u16) -> Option<&ClusterNode> {
362 let node_id = self.slot_map.owner(slot)?;
363 self.nodes.get(&node_id)
364 }
365
366 pub fn owns_slot(&self, slot: u16) -> bool {
368 self.slot_map.owner(slot) == Some(self.local_id)
369 }
370
371 pub fn primaries(&self) -> impl Iterator<Item = &ClusterNode> {
373 self.nodes.values().filter(|n| n.role == NodeRole::Primary)
374 }
375
376 pub fn replicas(&self) -> impl Iterator<Item = &ClusterNode> {
378 self.nodes.values().filter(|n| n.role == NodeRole::Replica)
379 }
380
381 pub fn replicas_of(&self, primary_id: NodeId) -> impl Iterator<Item = &ClusterNode> {
383 self.nodes
384 .values()
385 .filter(move |n| n.replicates == Some(primary_id))
386 }
387
388 pub fn update_health(&mut self) {
390 if !self.slot_map.is_complete() {
392 self.state = ClusterHealth::Fail;
393 return;
394 }
395
396 for slot in 0..crate::slots::SLOT_COUNT {
398 if let Some(owner_id) = self.slot_map.owner(slot) {
399 if let Some(node) = self.nodes.get(&owner_id) {
400 if !node.is_healthy() {
401 self.state = ClusterHealth::Fail;
402 return;
403 }
404 } else {
405 self.state = ClusterHealth::Fail;
407 return;
408 }
409 }
410 }
411
412 self.state = ClusterHealth::Ok;
413 }
414
415 pub fn cluster_info(&self) -> String {
417 let assigned_slots = (SLOT_COUNT as usize - self.slot_map.unassigned_count()) as u16;
418 let primaries_count = self.primaries().count();
419
420 format!(
421 "cluster_state:{}\r\n\
422 cluster_slots_assigned:{}\r\n\
423 cluster_slots_ok:{}\r\n\
424 cluster_slots_pfail:0\r\n\
425 cluster_slots_fail:0\r\n\
426 cluster_known_nodes:{}\r\n\
427 cluster_size:{}\r\n\
428 cluster_current_epoch:{}\r\n\
429 cluster_my_epoch:{}\r\n",
430 self.state,
431 assigned_slots,
432 if self.state == ClusterHealth::Ok {
433 assigned_slots
434 } else {
435 0
436 },
437 self.nodes.len(),
438 primaries_count,
439 self.config_epoch,
440 self.local_node().map(|n| n.config_epoch).unwrap_or(0),
441 )
442 }
443
444 pub fn cluster_nodes(&self) -> String {
446 let mut lines: Vec<String> = self
447 .nodes
448 .values()
449 .map(|node| node.to_cluster_nodes_line(&self.slot_map))
450 .collect();
451 lines.sort(); lines.join("\n")
453 }
454
455 pub fn moved_redirect(&self, slot: u16) -> Result<(u16, SocketAddr), ClusterError> {
457 let node = self
458 .slot_owner(slot)
459 .ok_or(ClusterError::SlotNotAssigned(slot))?;
460 Ok((slot, node.addr))
461 }
462}
463
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback socket address on `port`.
    fn test_addr(port: u16) -> SocketAddr {
        let loopback = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
        SocketAddr::new(loopback, port)
    }

    /// Single-node cluster whose local primary listens on port 6379 and is flagged `myself`.
    fn single_node_fixture() -> (NodeId, ClusterState) {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();
        (id, ClusterState::single_node(node))
    }

    #[test]
    fn node_id_display() {
        let rendered = NodeId::new().to_string();
        assert_eq!(rendered.len(), 8);
    }

    #[test]
    fn node_id_parse() {
        let original = NodeId::new();
        let text = original.0.to_string();
        let round_tripped = NodeId::parse(&text).unwrap();
        assert_eq!(original, round_tripped);
    }

    #[test]
    fn node_flags_display() {
        let none_set = NodeFlags::default();
        assert_eq!(none_set.to_string(), "-");

        let only_myself = NodeFlags {
            myself: true,
            ..none_set
        };
        assert_eq!(only_myself.to_string(), "myself");

        let myself_and_pfail = NodeFlags {
            pfail: true,
            ..only_myself
        };
        assert_eq!(myself_and_pfail.to_string(), "myself,pfail");
    }

    #[test]
    fn cluster_node_primary() {
        let id = NodeId::new();
        let primary = ClusterNode::new_primary(id, test_addr(6379));

        assert_eq!(primary.id, id);
        assert_eq!(primary.role, NodeRole::Primary);
        assert_eq!(primary.addr.port(), 6379);
        assert_eq!(primary.cluster_bus_addr.port(), 16379);
        assert!(primary.replicates.is_none());
        assert!(primary.is_healthy());
    }

    #[test]
    fn cluster_node_replica() {
        let primary_id = NodeId::new();
        let replica_id = NodeId::new();
        let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);

        assert_eq!(replica.id, replica_id);
        assert_eq!(replica.role, NodeRole::Replica);
        assert_eq!(replica.replicates, Some(primary_id));
    }

    #[test]
    fn cluster_state_single_node() {
        let (id, state) = single_node_fixture();

        assert_eq!(state.local_id, id);
        // First and last slot both belong to the only node.
        assert!(state.owns_slot(0));
        assert!(state.owns_slot(16383));
        assert_eq!(state.state, ClusterHealth::Ok);
    }

    #[test]
    fn cluster_state_slot_owner() {
        let (id, state) = single_node_fixture();

        let owner = state.slot_owner(100).unwrap();
        assert_eq!(owner.id, id);
    }

    #[test]
    fn cluster_state_health_check() {
        let (_, mut state) = single_node_fixture();

        state.update_health();
        assert_eq!(state.state, ClusterHealth::Ok);

        // One unassigned slot makes the slot map incomplete.
        state.slot_map.unassign(0);
        state.update_health();
        assert_eq!(state.state, ClusterHealth::Fail);
    }

    #[test]
    fn cluster_info_format() {
        let (_, state) = single_node_fixture();
        let info = state.cluster_info();

        for expected in [
            "cluster_state:ok",
            "cluster_slots_assigned:16384",
            "cluster_known_nodes:1",
        ]
        .iter()
        {
            assert!(info.contains(expected));
        }
    }

    #[test]
    fn moved_redirect() {
        let (_, state) = single_node_fixture();

        let (slot, addr) = state.moved_redirect(100).unwrap();
        assert_eq!(slot, 100);
        assert_eq!(addr.port(), 6379);
    }

    #[test]
    fn primaries_and_replicas() {
        let (primary_id, mut state) = single_node_fixture();

        let replica_id = NodeId::new();
        state.add_node(ClusterNode::new_replica(
            replica_id,
            test_addr(6380),
            primary_id,
        ));

        assert_eq!(state.primaries().count(), 1);
        assert_eq!(state.replicas().count(), 1);
        assert_eq!(state.replicas_of(primary_id).count(), 1);
    }
}