1use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::time::Instant;
9
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13use crate::slots::{SlotMap, SlotRange, SLOT_COUNT};
14use crate::ClusterError;
15
/// Errors returned while parsing a persisted cluster configuration
/// (see `ClusterState::from_nodes_conf`).
#[derive(Debug, thiserror::Error)]
pub enum ConfigParseError {
    /// The input contained no `vars ...` header line.
    #[error("missing vars header line")]
    MissingVarsLine,
    /// The `vars` header line was malformed; carries the offending line.
    #[error("invalid vars line: {0}")]
    InvalidVarsLine(String),
    /// No node line carried the `myself` flag, so the local node is unknown.
    #[error("no node with 'myself' flag found")]
    NoMyselfNode,
    /// A node line had too few fields or an unparsable config epoch;
    /// carries the offending line.
    #[error("invalid node line: {0}")]
    InvalidNodeLine(String),
    /// A node id (or a replica's primary id) was not a valid UUID;
    /// carries the offending text.
    #[error("invalid node id: {0}")]
    InvalidNodeId(String),
    /// The `addr@bus_port` field could not be parsed; carries the field.
    #[error("invalid address: {0}")]
    InvalidAddress(String),
    /// A slot token (`N` or `N-M`) could not be parsed; carries the token.
    #[error("invalid slot range: {0}")]
    InvalidSlotRange(String),
}
34
/// Identifier of a cluster node: a newtype wrapper around a [`Uuid`].
///
/// Its `Display` output is a shortened form; use the inner `Uuid` when the
/// full identifier is needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeId(pub Uuid);
40
41impl NodeId {
42 pub fn new() -> Self {
44 Self(Uuid::new_v4())
45 }
46
47 pub fn parse(s: &str) -> Result<Self, uuid::Error> {
49 Ok(Self(Uuid::parse_str(s)?))
50 }
51
52 pub(crate) fn as_key(&self) -> String {
58 self.0.to_string()
59 }
60}
61
62impl Default for NodeId {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl std::fmt::Display for NodeId {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 write!(f, "{}", &self.0.to_string()[..8])
72 }
73}
74
/// Role a node plays in the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// A primary node; rendered as `master` in node lines.
    Primary,
    /// A replica of some primary; rendered as `slave` in node lines.
    Replica,
}
83
84impl std::fmt::Display for NodeRole {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 match self {
87 NodeRole::Primary => write!(f, "primary"),
88 NodeRole::Replica => write!(f, "replica"),
89 }
90 }
91}
92
/// Boolean status flags attached to a node. All flags default to `false`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct NodeFlags {
    /// Set on the node that represents the local process.
    pub myself: bool,
    /// Rendered as `fail?` in node lines; presumably "possibly failing" —
    /// NOTE(review): confirm against the failure detector.
    pub pfail: bool,
    /// Rendered as `fail` in node lines; takes precedence over `pfail` there.
    pub fail: bool,
    /// Rendered as `handshake`; presumably the node is still being
    /// introduced to the cluster — TODO confirm.
    pub handshake: bool,
    /// Rendered as `noaddr`; presumably the node's address is unknown —
    /// TODO confirm.
    pub noaddr: bool,
}
107
108impl NodeFlags {
109 pub fn is_healthy(&self) -> bool {
111 !self.fail && !self.pfail
112 }
113}
114
115impl std::fmt::Display for NodeFlags {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 let mut flags = Vec::new();
118 if self.myself {
119 flags.push("myself");
120 }
121 if self.pfail {
122 flags.push("pfail");
123 }
124 if self.fail {
125 flags.push("fail");
126 }
127 if self.handshake {
128 flags.push("handshake");
129 }
130 if self.noaddr {
131 flags.push("noaddr");
132 }
133 if flags.is_empty() {
134 write!(f, "-")
135 } else {
136 write!(f, "{}", flags.join(","))
137 }
138 }
139}
140
/// Everything the local process tracks about one node of the cluster.
#[derive(Debug, Clone)]
pub struct ClusterNode {
    /// Unique identifier of the node.
    pub id: NodeId,
    /// Client-facing address of the node (the address used in redirects).
    pub addr: SocketAddr,
    /// Cluster bus address: same IP as `addr`, with a separate bus port.
    pub cluster_bus_addr: SocketAddr,
    /// Whether the node is a primary or a replica.
    pub role: NodeRole,
    /// Slot ranges recorded on the node itself. Node lines are rendered from
    /// the cluster `SlotMap`, not from this field.
    pub slots: Vec<SlotRange>,
    /// For a replica, the id of the primary it replicates; `None` otherwise.
    pub replicates: Option<NodeId>,
    /// Ids of the nodes replicating this node.
    pub replicas: Vec<NodeId>,
    /// Initialised to the construction time of this record.
    pub last_seen: Instant,
    /// When the last ping was sent, if any.
    pub last_ping_sent: Option<Instant>,
    /// When the last pong was received, if any.
    pub last_pong_received: Option<Instant>,
    /// Status flags of the node.
    pub flags: NodeFlags,
    /// Configuration epoch recorded for this node.
    pub config_epoch: u64,
}
170
171impl ClusterNode {
172 pub fn new_primary(id: NodeId, addr: SocketAddr) -> Self {
174 Self::new_primary_with_offset(id, addr, 10000)
175 }
176
177 pub fn new_primary_with_offset(id: NodeId, addr: SocketAddr, bus_port_offset: u16) -> Self {
179 let cluster_bus_addr =
180 SocketAddr::new(addr.ip(), addr.port().saturating_add(bus_port_offset));
181 Self {
182 id,
183 addr,
184 cluster_bus_addr,
185 role: NodeRole::Primary,
186 slots: Vec::new(),
187 replicates: None,
188 replicas: Vec::new(),
189 last_seen: Instant::now(),
190 last_ping_sent: None,
191 last_pong_received: None,
192 flags: NodeFlags::default(),
193 config_epoch: 0,
194 }
195 }
196
197 pub fn new_replica(id: NodeId, addr: SocketAddr, primary_id: NodeId) -> Self {
199 let cluster_bus_addr = SocketAddr::new(addr.ip(), addr.port().saturating_add(10000));
200 Self {
201 id,
202 addr,
203 cluster_bus_addr,
204 role: NodeRole::Replica,
205 slots: Vec::new(),
206 replicates: Some(primary_id),
207 replicas: Vec::new(),
208 last_seen: Instant::now(),
209 last_ping_sent: None,
210 last_pong_received: None,
211 flags: NodeFlags::default(),
212 config_epoch: 0,
213 }
214 }
215
216 pub fn set_myself(&mut self) {
218 self.flags.myself = true;
219 }
220
221 pub fn is_healthy(&self) -> bool {
223 self.flags.is_healthy()
224 }
225
226 pub fn slot_count(&self) -> u16 {
228 self.slots.iter().map(|r| r.len()).sum()
229 }
230
231 pub fn to_cluster_nodes_line(&self, slot_map: &SlotMap) -> String {
233 let slots_str = if self.role == NodeRole::Primary {
234 let ranges = slot_map.slots_for_node(self.id);
235 if ranges.is_empty() {
236 String::new()
237 } else {
238 ranges
239 .iter()
240 .map(|r| r.to_string())
241 .collect::<Vec<_>>()
242 .join(" ")
243 }
244 } else {
245 String::new()
246 };
247
248 let replicates_str = self
249 .replicates
250 .map(|id| id.0.to_string())
251 .unwrap_or_else(|| "-".to_string());
252
253 let ping_ms = self
254 .last_ping_sent
255 .map(|t| t.elapsed().as_millis() as u64)
256 .unwrap_or(0);
257 let pong_ms = self
258 .last_pong_received
259 .map(|t| t.elapsed().as_millis() as u64)
260 .unwrap_or(0);
261
262 format!(
264 "{} {}@{} {} {} {} {} {} connected {}",
265 self.id.0,
266 self.addr,
267 self.cluster_bus_addr.port(),
268 self.format_flags(),
269 replicates_str,
270 ping_ms,
271 pong_ms,
272 self.config_epoch,
273 slots_str
274 )
275 .trim()
276 .to_string()
277 }
278
279 fn format_flags(&self) -> String {
286 let mut flags = Vec::new();
287
288 if self.flags.myself {
289 flags.push("myself");
290 }
291
292 match self.role {
293 NodeRole::Primary => flags.push("master"),
294 NodeRole::Replica => flags.push("slave"),
295 }
296
297 if self.flags.fail {
298 flags.push("fail");
299 } else if self.flags.pfail {
300 flags.push("fail?");
301 }
302
303 if self.flags.handshake {
304 flags.push("handshake");
305 }
306
307 if self.flags.noaddr {
308 flags.push("noaddr");
309 }
310
311 flags.join(",")
312 }
313}
314
/// The local process's view of the whole cluster.
#[derive(Debug)]
pub struct ClusterState {
    /// All known nodes, keyed by node id.
    pub nodes: HashMap<NodeId, ClusterNode>,
    /// Id of the node representing the local process.
    pub local_id: NodeId,
    /// Cluster-wide configuration epoch; incremented by `promote_replica`.
    pub config_epoch: u64,
    /// Mapping from slot to owning node.
    pub slot_map: SlotMap,
    /// Last health verdict computed by `update_health`.
    pub state: ClusterHealth,
}
329
/// Overall health verdict for the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterHealth {
    /// Every slot is assigned and every slot owner is known and healthy.
    Ok,
    /// Some slot is unassigned, or a slot owner is unknown or unhealthy.
    Fail,
    /// Health has not been evaluated yet (initial state of `ClusterState::new`).
    Unknown,
}
340
341impl std::fmt::Display for ClusterHealth {
342 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343 match self {
344 ClusterHealth::Ok => write!(f, "ok"),
345 ClusterHealth::Fail => write!(f, "fail"),
346 ClusterHealth::Unknown => write!(f, "unknown"),
347 }
348 }
349}
350
351impl ClusterState {
352 pub fn single_node(local_node: ClusterNode) -> Self {
354 let local_id = local_node.id;
355 let slot_map = SlotMap::single_node(local_id);
356 let mut nodes = HashMap::new();
357 nodes.insert(local_id, local_node);
358
359 Self {
360 nodes,
361 local_id,
362 config_epoch: 1,
363 slot_map,
364 state: ClusterHealth::Ok,
365 }
366 }
367
368 pub fn new(local_id: NodeId) -> Self {
370 Self {
371 nodes: HashMap::new(),
372 local_id,
373 config_epoch: 0,
374 slot_map: SlotMap::new(),
375 state: ClusterHealth::Unknown,
376 }
377 }
378
379 pub fn local_node(&self) -> Option<&ClusterNode> {
381 self.nodes.get(&self.local_id)
382 }
383
384 pub fn local_node_mut(&mut self) -> Option<&mut ClusterNode> {
386 self.nodes.get_mut(&self.local_id)
387 }
388
389 pub fn add_node(&mut self, node: ClusterNode) {
391 self.nodes.insert(node.id, node);
392 }
393
394 pub fn remove_node(&mut self, node_id: NodeId) -> Option<ClusterNode> {
396 self.nodes.remove(&node_id)
397 }
398
399 pub fn slot_owner(&self, slot: u16) -> Option<&ClusterNode> {
401 let node_id = self.slot_map.owner(slot)?;
402 self.nodes.get(&node_id)
403 }
404
405 pub fn owns_slot(&self, slot: u16) -> bool {
407 self.slot_map.owner(slot) == Some(self.local_id)
408 }
409
410 pub fn primaries(&self) -> impl Iterator<Item = &ClusterNode> {
412 self.nodes.values().filter(|n| n.role == NodeRole::Primary)
413 }
414
415 pub fn replicas(&self) -> impl Iterator<Item = &ClusterNode> {
417 self.nodes.values().filter(|n| n.role == NodeRole::Replica)
418 }
419
420 pub fn replicas_of(&self, primary_id: NodeId) -> impl Iterator<Item = &ClusterNode> {
422 self.nodes
423 .values()
424 .filter(move |n| n.replicates == Some(primary_id))
425 }
426
427 pub fn update_health(&mut self) {
429 if !self.slot_map.is_complete() {
431 self.state = ClusterHealth::Fail;
432 return;
433 }
434
435 for slot in 0..crate::slots::SLOT_COUNT {
437 if let Some(owner_id) = self.slot_map.owner(slot) {
438 if let Some(node) = self.nodes.get(&owner_id) {
439 if !node.is_healthy() {
440 self.state = ClusterHealth::Fail;
441 return;
442 }
443 } else {
444 self.state = ClusterHealth::Fail;
446 return;
447 }
448 }
449 }
450
451 self.state = ClusterHealth::Ok;
452 }
453
454 pub fn promote_replica(&mut self, replica_id: NodeId) -> Result<(), String> {
464 let primary_id = {
466 let replica = self
467 .nodes
468 .get(&replica_id)
469 .ok_or_else(|| format!("node {replica_id} not found in cluster state"))?;
470 if replica.role != NodeRole::Replica {
471 return Err(format!("node {replica_id} is not a replica"));
472 }
473 replica
474 .replicates
475 .ok_or_else(|| format!("replica {replica_id} has no primary configured"))?
476 };
477
478 for slot in 0..SLOT_COUNT {
480 if self.slot_map.owner(slot) == Some(primary_id) {
481 self.slot_map.assign(slot, replica_id);
482 }
483 }
484 let new_primary_slots = self.slot_map.slots_for_node(replica_id);
485
486 self.config_epoch += 1;
488 let new_epoch = self.config_epoch;
489
490 if let Some(old_primary) = self.nodes.get_mut(&primary_id) {
492 old_primary.role = NodeRole::Replica;
493 old_primary.replicates = Some(replica_id);
494 old_primary.replicas.retain(|&id| id != replica_id);
495 old_primary.slots.clear();
496 old_primary.config_epoch = new_epoch;
497 }
498
499 if let Some(new_primary) = self.nodes.get_mut(&replica_id) {
501 new_primary.role = NodeRole::Primary;
502 new_primary.replicates = None;
503 if !new_primary.replicas.contains(&primary_id) {
504 new_primary.replicas.push(primary_id);
505 }
506 new_primary.slots = new_primary_slots;
507 new_primary.config_epoch = new_epoch;
508 }
509
510 self.update_health();
511 Ok(())
512 }
513
514 pub fn cluster_info(&self) -> String {
516 let assigned_slots = (SLOT_COUNT as usize - self.slot_map.unassigned_count()) as u16;
517 let primaries_count = self.primaries().count();
518
519 format!(
520 "cluster_state:{}\r\n\
521 cluster_slots_assigned:{}\r\n\
522 cluster_slots_ok:{}\r\n\
523 cluster_slots_pfail:0\r\n\
524 cluster_slots_fail:0\r\n\
525 cluster_known_nodes:{}\r\n\
526 cluster_size:{}\r\n\
527 cluster_current_epoch:{}\r\n\
528 cluster_my_epoch:{}\r\n",
529 self.state,
530 assigned_slots,
531 if self.state == ClusterHealth::Ok {
532 assigned_slots
533 } else {
534 0
535 },
536 self.nodes.len(),
537 primaries_count,
538 self.config_epoch,
539 self.local_node().map(|n| n.config_epoch).unwrap_or(0),
540 )
541 }
542
543 pub fn cluster_nodes(&self) -> String {
545 let mut lines: Vec<String> = self
546 .nodes
547 .values()
548 .map(|node| node.to_cluster_nodes_line(&self.slot_map))
549 .collect();
550 lines.sort(); lines.join("\n")
552 }
553
554 pub fn moved_redirect(&self, slot: u16) -> Result<(u16, SocketAddr), ClusterError> {
556 let node = self
557 .slot_owner(slot)
558 .ok_or(ClusterError::SlotNotAssigned(slot))?;
559 Ok((slot, node.addr))
560 }
561
562 pub fn to_nodes_conf(&self, incarnation: u64) -> String {
568 let mut out = String::new();
569 out.push_str("# ember cluster config — do not edit\n");
570 out.push_str(&format!(
571 "vars currentEpoch {} lastIncarnation {}\n",
572 self.config_epoch, incarnation
573 ));
574
575 let mut nodes: Vec<&ClusterNode> = self.nodes.values().collect();
577 nodes.sort_by_key(|n| n.id.0);
578
579 for node in nodes {
580 out.push_str(&node.to_cluster_nodes_line(&self.slot_map));
581 out.push('\n');
582 }
583
584 out
585 }
586
587 pub fn from_nodes_conf(data: &str) -> Result<(Self, u64), ConfigParseError> {
593 let mut config_epoch = 0u64;
594 let mut incarnation = 0u64;
595 let mut found_vars = false;
596 let mut local_id = None;
597 let mut nodes = HashMap::new();
598 let mut slot_map = SlotMap::new();
599
600 for line in data.lines() {
601 let line = line.trim();
602
603 if line.is_empty() || line.starts_with('#') {
605 continue;
606 }
607
608 if line.starts_with("vars ") {
610 let parts: Vec<&str> = line.split_whitespace().collect();
611 if parts.len() < 5 {
613 return Err(ConfigParseError::InvalidVarsLine(line.to_string()));
614 }
615 config_epoch = parts[2]
616 .parse()
617 .map_err(|_| ConfigParseError::InvalidVarsLine(line.to_string()))?;
618 incarnation = parts[4]
619 .parse()
620 .map_err(|_| ConfigParseError::InvalidVarsLine(line.to_string()))?;
621 found_vars = true;
622 continue;
623 }
624
625 let parts: Vec<&str> = line.split_whitespace().collect();
628 if parts.len() < 8 {
629 return Err(ConfigParseError::InvalidNodeLine(line.to_string()));
630 }
631
632 let node_id = NodeId::parse(parts[0])
633 .map_err(|_| ConfigParseError::InvalidNodeId(parts[0].to_string()))?;
634
635 let addr_str = parts[1];
637 let (client_addr, bus_port) = parse_addr_field(addr_str)?;
638
639 let flags_str = parts[2];
640 let is_myself = flags_str.contains("myself");
641 let is_replica = flags_str.contains("slave");
642
643 let replicates = if parts[3] == "-" {
644 None
645 } else {
646 Some(
647 NodeId::parse(parts[3])
648 .map_err(|_| ConfigParseError::InvalidNodeId(parts[3].to_string()))?,
649 )
650 };
651
652 let node_config_epoch: u64 = parts[6]
653 .parse()
654 .map_err(|_| ConfigParseError::InvalidNodeLine(line.to_string()))?;
655
656 let mut slot_ranges = Vec::new();
658 for part in &parts[8..] {
659 let range = parse_slot_range(part)?;
660 slot_ranges.push(range);
661 }
662
663 for range in &slot_ranges {
665 for slot in range.iter() {
666 slot_map.assign(slot, node_id);
667 }
668 }
669
670 let bus_addr = SocketAddr::new(client_addr.ip(), bus_port);
671 let role = if is_replica {
672 NodeRole::Replica
673 } else {
674 NodeRole::Primary
675 };
676
677 let node = ClusterNode {
678 id: node_id,
679 addr: client_addr,
680 cluster_bus_addr: bus_addr,
681 role,
682 slots: slot_ranges,
683 replicates,
684 replicas: Vec::new(),
685 last_seen: Instant::now(),
686 last_ping_sent: None,
687 last_pong_received: None,
688 flags: NodeFlags {
689 myself: is_myself,
690 ..NodeFlags::default()
691 },
692 config_epoch: node_config_epoch,
693 };
694
695 if is_myself {
696 local_id = Some(node_id);
697 }
698
699 nodes.insert(node_id, node);
700 }
701
702 if !found_vars {
703 return Err(ConfigParseError::MissingVarsLine);
704 }
705
706 let local_id = local_id.ok_or(ConfigParseError::NoMyselfNode)?;
707
708 let replica_links: Vec<(NodeId, NodeId)> = nodes
710 .values()
711 .filter_map(|n| n.replicates.map(|primary| (primary, n.id)))
712 .collect();
713
714 for (primary_id, replica_id) in replica_links {
715 if let Some(primary) = nodes.get_mut(&primary_id) {
716 primary.replicas.push(replica_id);
717 }
718 }
719
720 let mut state = ClusterState {
721 nodes,
722 local_id,
723 config_epoch,
724 slot_map,
725 state: ClusterHealth::Unknown,
726 };
727 state.update_health();
728
729 Ok((state, incarnation))
730 }
731}
732
733fn parse_addr_field(s: &str) -> Result<(SocketAddr, u16), ConfigParseError> {
735 let at_pos = s
737 .find('@')
738 .ok_or_else(|| ConfigParseError::InvalidAddress(s.to_string()))?;
739
740 let client_part = &s[..at_pos];
741 let bus_part = &s[at_pos + 1..];
742
743 let client_addr: SocketAddr = client_part
744 .parse()
745 .map_err(|_| ConfigParseError::InvalidAddress(s.to_string()))?;
746
747 let bus_port: u16 = bus_part
748 .parse()
749 .map_err(|_| ConfigParseError::InvalidAddress(s.to_string()))?;
750
751 Ok((client_addr, bus_port))
752}
753
754fn parse_slot_range(s: &str) -> Result<SlotRange, ConfigParseError> {
756 if let Some((start_str, end_str)) = s.split_once('-') {
757 let start: u16 = start_str
758 .parse()
759 .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))?;
760 let end: u16 = end_str
761 .parse()
762 .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))?;
763 SlotRange::try_new(start, end)
764 .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))
765 } else {
766 let slot: u16 = s
767 .parse()
768 .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))?;
769 SlotRange::try_new(slot, slot)
770 .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))
771 }
772}
773
// Unit tests for node identity, flags, cluster state queries, the
// nodes.conf round trip and replica promotion.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback address with the given port.
    fn test_addr(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port)
    }

    // `Display` for `NodeId` is the 8-character short form.
    #[test]
    fn node_id_display() {
        let id = NodeId::new();
        let display = id.to_string();
        assert_eq!(display.len(), 8);
    }

    // Parsing the full UUID string yields the same id.
    #[test]
    fn node_id_parse() {
        let id = NodeId::new();
        let parsed = NodeId::parse(&id.0.to_string()).unwrap();
        assert_eq!(id, parsed);
    }

    // No flags renders as "-"; set flags are comma-separated.
    #[test]
    fn node_flags_display() {
        let mut flags = NodeFlags::default();
        assert_eq!(flags.to_string(), "-");

        flags.myself = true;
        assert_eq!(flags.to_string(), "myself");

        flags.pfail = true;
        assert_eq!(flags.to_string(), "myself,pfail");
    }

    // A new primary gets the bus port at client port + 10000.
    #[test]
    fn cluster_node_primary() {
        let id = NodeId::new();
        let node = ClusterNode::new_primary(id, test_addr(6379));

        assert_eq!(node.id, id);
        assert_eq!(node.role, NodeRole::Primary);
        assert_eq!(node.addr.port(), 6379);
        assert_eq!(node.cluster_bus_addr.port(), 16379);
        assert!(node.replicates.is_none());
        assert!(node.is_healthy());
    }

    // A new replica records the primary it replicates.
    #[test]
    fn cluster_node_replica() {
        let primary_id = NodeId::new();
        let replica_id = NodeId::new();
        let node = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);

        assert_eq!(node.id, replica_id);
        assert_eq!(node.role, NodeRole::Replica);
        assert_eq!(node.replicates, Some(primary_id));
    }

    // A single-node cluster owns the first and last slot and is healthy.
    #[test]
    fn cluster_state_single_node() {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();

        let state = ClusterState::single_node(node);

        assert_eq!(state.local_id, id);
        assert!(state.owns_slot(0));
        assert!(state.owns_slot(16383));
        assert_eq!(state.state, ClusterHealth::Ok);
    }

    // `slot_owner` resolves a slot to the owning node.
    #[test]
    fn cluster_state_slot_owner() {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();

        let state = ClusterState::single_node(node);

        let owner = state.slot_owner(100).unwrap();
        assert_eq!(owner.id, id);
    }

    // Health flips to `Fail` as soon as one slot is unassigned.
    #[test]
    fn cluster_state_health_check() {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();

        let mut state = ClusterState::single_node(node);
        state.update_health();
        assert_eq!(state.state, ClusterHealth::Ok);

        // Leave a hole in the slot map.
        state.slot_map.unassign(0);
        state.update_health();
        assert_eq!(state.state, ClusterHealth::Fail);
    }

    // The info report contains the expected key:value pairs.
    #[test]
    fn cluster_info_format() {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();

        let state = ClusterState::single_node(node);
        let info = state.cluster_info();

        assert!(info.contains("cluster_state:ok"));
        assert!(info.contains("cluster_slots_assigned:16384"));
        assert!(info.contains("cluster_known_nodes:1"));
    }

    // A redirect carries the slot and the owner's client address.
    #[test]
    fn moved_redirect() {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();

        let state = ClusterState::single_node(node);

        let (slot, addr) = state.moved_redirect(100).unwrap();
        assert_eq!(slot, 100);
        assert_eq!(addr.port(), 6379);
    }

    // Serialise and parse back a single-node cluster.
    #[test]
    fn nodes_conf_roundtrip_single_node() {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();

        let state = ClusterState::single_node(node);
        let conf = state.to_nodes_conf(42);

        let (restored, incarnation) = ClusterState::from_nodes_conf(&conf).unwrap();
        assert_eq!(incarnation, 42);
        assert_eq!(restored.local_id, id);
        assert_eq!(restored.config_epoch, 1);
        assert!(restored.owns_slot(0));
        assert!(restored.owns_slot(16383));
        assert_eq!(restored.nodes.len(), 1);
        assert!(restored.nodes[&id].flags.myself);
    }

    // Serialise and parse back two primaries that split the slot space.
    #[test]
    fn nodes_conf_roundtrip_multi_node() {
        let id1 = NodeId::new();
        let id2 = NodeId::new();

        let mut node1 = ClusterNode::new_primary(id1, test_addr(6379));
        node1.set_myself();

        let mut state = ClusterState::new(id1);
        state.add_node(node1);

        let node2 = ClusterNode::new_primary(id2, test_addr(6380));
        state.add_node(node2);

        // First half of the slots goes to node 1, second half to node 2.
        for slot in 0..8192 {
            state.slot_map.assign(slot, id1);
        }
        for slot in 8192..16384 {
            state.slot_map.assign(slot, id2);
        }
        if let Some(n) = state.nodes.get_mut(&id1) {
            n.slots = state.slot_map.slots_for_node(id1);
        }
        if let Some(n) = state.nodes.get_mut(&id2) {
            n.slots = state.slot_map.slots_for_node(id2);
        }
        state.config_epoch = 5;
        state.update_health();

        let conf = state.to_nodes_conf(10);
        let (restored, incarnation) = ClusterState::from_nodes_conf(&conf).unwrap();

        assert_eq!(incarnation, 10);
        assert_eq!(restored.config_epoch, 5);
        assert_eq!(restored.local_id, id1);
        assert_eq!(restored.nodes.len(), 2);
        assert_eq!(restored.slot_map.owner(0), Some(id1));
        assert_eq!(restored.slot_map.owner(8192), Some(id2));
        assert_eq!(restored.slot_map.owner(16383), Some(id2));
        assert_eq!(restored.state, ClusterHealth::Ok);
    }

    // Replica links survive the round trip in both directions.
    #[test]
    fn nodes_conf_roundtrip_with_replica() {
        let primary_id = NodeId::new();
        let replica_id = NodeId::new();

        let mut primary = ClusterNode::new_primary(primary_id, test_addr(6379));
        primary.set_myself();

        let mut state = ClusterState::single_node(primary);

        let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);
        state.add_node(replica);

        if let Some(p) = state.nodes.get_mut(&primary_id) {
            // `add_node` does not link the replica to its primary.
            p.replicas.push(replica_id);
        }

        let conf = state.to_nodes_conf(1);
        let (restored, _) = ClusterState::from_nodes_conf(&conf).unwrap();

        assert_eq!(restored.nodes.len(), 2);
        let restored_replica = &restored.nodes[&replica_id];
        assert_eq!(restored_replica.role, NodeRole::Replica);
        assert_eq!(restored_replica.replicates, Some(primary_id));

        let restored_primary = &restored.nodes[&primary_id];
        // The primary's replica list is rebuilt from the replica's link.
        assert!(restored_primary.replicas.contains(&replica_id));
    }

    // Malformed configurations are rejected.
    #[test]
    fn nodes_conf_parse_errors() {
        let result = ClusterState::from_nodes_conf("# comment\n");
        // No vars header at all.
        assert!(result.is_err());

        let result = ClusterState::from_nodes_conf(
            // Valid header and node line, but no node is flagged `myself`.
            "vars currentEpoch 0 lastIncarnation 0\n\
             00000000-0000-0000-0000-000000000001 127.0.0.1:6379@16379 master - 0 0 0 connected\n",
        );
        assert!(matches!(
            result.unwrap_err(),
            ConfigParseError::NoMyselfNode
        ));

        let result = ClusterState::from_nodes_conf("vars bad\n");
        // Header with too few fields.
        assert!(result.is_err());
    }

    // Role-based iterators count primaries and replicas separately.
    #[test]
    fn primaries_and_replicas() {
        let primary_id = NodeId::new();
        let replica_id = NodeId::new();

        let mut primary = ClusterNode::new_primary(primary_id, test_addr(6379));
        primary.set_myself();

        let mut state = ClusterState::single_node(primary);

        let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);
        state.add_node(replica);

        assert_eq!(state.primaries().count(), 1);
        assert_eq!(state.replicas().count(), 1);
        assert_eq!(state.replicas_of(primary_id).count(), 1);
    }

    // Promotion swaps roles, moves every slot and bumps the epoch.
    #[test]
    fn promote_replica_transfers_slots() {
        let primary_id = NodeId::new();
        let replica_id = NodeId::new();

        let mut primary = ClusterNode::new_primary(primary_id, test_addr(6379));
        primary.set_myself();

        let mut state = ClusterState::single_node(primary);
        let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);
        state.add_node(replica);

        // Link the replica on the primary's side.
        state
            .nodes
            .get_mut(&primary_id)
            .unwrap()
            .replicas
            .push(replica_id);

        let initial_epoch = state.config_epoch;
        state.promote_replica(replica_id).unwrap();

        // Exactly one epoch bump.
        assert_eq!(state.config_epoch, initial_epoch + 1);

        // The replica is now a primary holding the slots.
        let new_primary = state.nodes.get(&replica_id).unwrap();
        assert_eq!(new_primary.role, NodeRole::Primary);
        assert_eq!(new_primary.replicates, None);
        assert!(new_primary.replicas.contains(&primary_id));
        assert!(!new_primary.slots.is_empty());

        // The old primary now replicates the promoted node.
        let old_primary = state.nodes.get(&primary_id).unwrap();
        assert_eq!(old_primary.role, NodeRole::Replica);
        assert_eq!(old_primary.replicates, Some(replica_id));
        assert!(old_primary.slots.is_empty());

        // Every slot points at the promoted node.
        for slot in 0..SLOT_COUNT {
            assert_eq!(state.slot_map.owner(slot), Some(replica_id));
        }

        // The cluster is still fully covered and healthy.
        assert_eq!(state.state, ClusterHealth::Ok);
    }

    // Promoting a node that is already a primary is an error.
    #[test]
    fn promote_replica_rejects_non_replica() {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();
        let mut state = ClusterState::single_node(node);

        let err = state.promote_replica(id).unwrap_err();
        assert!(err.contains("not a replica"), "unexpected error: {err}");
    }

    // Promoting an id that is not in the cluster is an error.
    #[test]
    fn promote_replica_rejects_unknown_node() {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();
        let mut state = ClusterState::single_node(node);

        let missing = NodeId::new();
        let err = state.promote_replica(missing).unwrap_err();
        assert!(err.contains("not found"), "unexpected error: {err}");
    }
}