// ember_cluster/topology.rs
1//! Cluster topology management.
2//!
3//! Defines the structure of a cluster: nodes, their roles, health states,
4//! and the overall cluster configuration.
5
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::time::Instant;
9
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13use crate::slots::{SlotMap, SlotRange, SLOT_COUNT};
14use crate::ClusterError;
15
/// Error returned when parsing a `nodes.conf` file fails.
///
/// Each variant that carries a `String` includes the offending input so the
/// caller can surface it in diagnostics.
#[derive(Debug, thiserror::Error)]
pub enum ConfigParseError {
    /// The file contained no `vars ...` header line.
    #[error("missing vars header line")]
    MissingVarsLine,
    /// A `vars` line was present but had too few fields or non-numeric values.
    #[error("invalid vars line: {0}")]
    InvalidVarsLine(String),
    /// No node line carried the `myself` flag, so the local node is unknown.
    #[error("no node with 'myself' flag found")]
    NoMyselfNode,
    /// A node line had fewer than the required fields or a bad epoch.
    #[error("invalid node line: {0}")]
    InvalidNodeLine(String),
    /// A node id field was not a parseable UUID.
    #[error("invalid node id: {0}")]
    InvalidNodeId(String),
    /// An `ip:port@bus-port` address field was malformed.
    #[error("invalid address: {0}")]
    InvalidAddress(String),
    /// A slot range token (e.g. `0-5460` or `100`) could not be parsed.
    #[error("invalid slot range: {0}")]
    InvalidSlotRange(String),
}
34
/// Unique identifier for a cluster node.
///
/// Wraps a UUID v4 for guaranteed uniqueness across the cluster.
/// Derives `Copy`/`Hash`/`Eq` so it can be used directly as a `HashMap` key,
/// and serde traits for wire/persistence formats.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeId(pub Uuid);
40
41impl NodeId {
42    /// Generates a new random node ID.
43    pub fn new() -> Self {
44        Self(Uuid::new_v4())
45    }
46
47    /// Creates a node ID from a UUID string.
48    pub fn parse(s: &str) -> Result<Self, uuid::Error> {
49        Ok(Self(Uuid::parse_str(s)?))
50    }
51
52    /// Returns the full UUID string for use as a map key.
53    ///
54    /// Note: `Display` for `NodeId` shows only the first 8 characters (for
55    /// readability in logs). This method returns the full UUID needed when
56    /// `NodeId` is used as a key in `BTreeMap<String, ...>` storage.
57    pub(crate) fn as_key(&self) -> String {
58        self.0.to_string()
59    }
60}
61
62impl Default for NodeId {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl std::fmt::Display for NodeId {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        // Show first 8 chars for readability (similar to git short hashes)
71        write!(f, "{}", &self.0.to_string()[..8])
72    }
73}
74
/// The role of a node in the cluster.
///
/// A node is exactly one of the two; failover transitions (see
/// `ClusterState::promote_replica`) swap the roles of a replica and its
/// primary.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// Primary node that owns slots and accepts writes.
    Primary,
    /// Replica node that mirrors a primary's data.
    Replica,
}
83
84impl std::fmt::Display for NodeRole {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self {
87            NodeRole::Primary => write!(f, "primary"),
88            NodeRole::Replica => write!(f, "replica"),
89        }
90    }
91}
92
93/// Status flags for a node.
/// Status flags for a node.
///
/// All flags default to `false`; `pfail` is a local suspicion while `fail`
/// is a cluster-wide verdict (see `is_healthy`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct NodeFlags {
    /// Node is the local node (myself).
    pub myself: bool,
    /// Node is suspected to be failing.
    pub pfail: bool,
    /// Node has been confirmed as failed by the cluster.
    pub fail: bool,
    /// Node is performing a handshake (not yet part of cluster).
    pub handshake: bool,
    /// Node has no address yet.
    pub noaddr: bool,
}
107
108impl NodeFlags {
109    /// Returns true if the node is considered healthy.
110    pub fn is_healthy(&self) -> bool {
111        !self.fail && !self.pfail
112    }
113}
114
115impl std::fmt::Display for NodeFlags {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        let mut flags = Vec::new();
118        if self.myself {
119            flags.push("myself");
120        }
121        if self.pfail {
122            flags.push("pfail");
123        }
124        if self.fail {
125            flags.push("fail");
126        }
127        if self.handshake {
128            flags.push("handshake");
129        }
130        if self.noaddr {
131            flags.push("noaddr");
132        }
133        if flags.is_empty() {
134            write!(f, "-")
135        } else {
136            write!(f, "{}", flags.join(","))
137        }
138    }
139}
140
/// Information about a single node in the cluster.
///
/// `Instant`-based fields track gossip liveness and are not persisted; they
/// are reset to "now" when state is reloaded from `nodes.conf`.
#[derive(Debug, Clone)]
pub struct ClusterNode {
    /// Unique node identifier.
    pub id: NodeId,
    /// Address for client connections.
    pub addr: SocketAddr,
    /// Address for cluster bus (gossip) connections.
    /// Typically addr.port + 10000.
    pub cluster_bus_addr: SocketAddr,
    /// Node's role in the cluster.
    pub role: NodeRole,
    /// Slot ranges assigned to this node (only for primaries).
    pub slots: Vec<SlotRange>,
    /// If this is a replica, the ID of its primary.
    pub replicates: Option<NodeId>,
    /// IDs of nodes replicating this one (if primary).
    pub replicas: Vec<NodeId>,
    /// Last time we received a message from this node.
    pub last_seen: Instant,
    /// Last time we sent a ping to this node.
    pub last_ping_sent: Option<Instant>,
    /// Last time we received a pong from this node.
    pub last_pong_received: Option<Instant>,
    /// Status flags.
    pub flags: NodeFlags,
    /// Configuration epoch (used for conflict resolution).
    pub config_epoch: u64,
}
170
171impl ClusterNode {
172    /// Creates a new primary node with the default bus port offset (10000).
173    pub fn new_primary(id: NodeId, addr: SocketAddr) -> Self {
174        Self::new_primary_with_offset(id, addr, 10000)
175    }
176
177    /// Creates a new primary node with a custom bus port offset.
178    pub fn new_primary_with_offset(id: NodeId, addr: SocketAddr, bus_port_offset: u16) -> Self {
179        let cluster_bus_addr =
180            SocketAddr::new(addr.ip(), addr.port().saturating_add(bus_port_offset));
181        Self {
182            id,
183            addr,
184            cluster_bus_addr,
185            role: NodeRole::Primary,
186            slots: Vec::new(),
187            replicates: None,
188            replicas: Vec::new(),
189            last_seen: Instant::now(),
190            last_ping_sent: None,
191            last_pong_received: None,
192            flags: NodeFlags::default(),
193            config_epoch: 0,
194        }
195    }
196
197    /// Creates a new replica node.
198    pub fn new_replica(id: NodeId, addr: SocketAddr, primary_id: NodeId) -> Self {
199        let cluster_bus_addr = SocketAddr::new(addr.ip(), addr.port().saturating_add(10000));
200        Self {
201            id,
202            addr,
203            cluster_bus_addr,
204            role: NodeRole::Replica,
205            slots: Vec::new(),
206            replicates: Some(primary_id),
207            replicas: Vec::new(),
208            last_seen: Instant::now(),
209            last_ping_sent: None,
210            last_pong_received: None,
211            flags: NodeFlags::default(),
212            config_epoch: 0,
213        }
214    }
215
216    /// Marks this node as the local node.
217    pub fn set_myself(&mut self) {
218        self.flags.myself = true;
219    }
220
221    /// Returns true if this node is healthy and can serve requests.
222    pub fn is_healthy(&self) -> bool {
223        self.flags.is_healthy()
224    }
225
226    /// Returns the total number of slots owned by this node.
227    pub fn slot_count(&self) -> u16 {
228        self.slots.iter().map(|r| r.len()).sum()
229    }
230
231    /// Formats the node in CLUSTER NODES output format.
232    pub fn to_cluster_nodes_line(&self, slot_map: &SlotMap) -> String {
233        let slots_str = if self.role == NodeRole::Primary {
234            let ranges = slot_map.slots_for_node(self.id);
235            if ranges.is_empty() {
236                String::new()
237            } else {
238                ranges
239                    .iter()
240                    .map(|r| r.to_string())
241                    .collect::<Vec<_>>()
242                    .join(" ")
243            }
244        } else {
245            String::new()
246        };
247
248        let replicates_str = self
249            .replicates
250            .map(|id| id.0.to_string())
251            .unwrap_or_else(|| "-".to_string());
252
253        let ping_ms = self
254            .last_ping_sent
255            .map(|t| t.elapsed().as_millis() as u64)
256            .unwrap_or(0);
257        let pong_ms = self
258            .last_pong_received
259            .map(|t| t.elapsed().as_millis() as u64)
260            .unwrap_or(0);
261
262        // Format: <id> <addr>@<bus-port> <flags> <master-id> <ping-sent> <pong-recv> <config-epoch> <link-state> <slots>
263        format!(
264            "{} {}@{} {} {} {} {} {} connected {}",
265            self.id.0,
266            self.addr,
267            self.cluster_bus_addr.port(),
268            self.format_flags(),
269            replicates_str,
270            ping_ms,
271            pong_ms,
272            self.config_epoch,
273            slots_str
274        )
275        .trim()
276        .to_string()
277    }
278
279    /// Formats the node flags for the CLUSTER NODES wire response.
280    ///
281    /// Intentionally diverges from `NodeFlags::Display` in two ways:
282    /// 1. Role (`master`/`slave`) is included here but is not a flag field.
283    /// 2. `pfail` renders as `fail?` per the Redis cluster protocol spec,
284    ///    whereas `NodeFlags::Display` uses the field name `pfail`.
285    fn format_flags(&self) -> String {
286        let mut flags = Vec::new();
287
288        if self.flags.myself {
289            flags.push("myself");
290        }
291
292        match self.role {
293            NodeRole::Primary => flags.push("master"),
294            NodeRole::Replica => flags.push("slave"),
295        }
296
297        if self.flags.fail {
298            flags.push("fail");
299        } else if self.flags.pfail {
300            flags.push("fail?");
301        }
302
303        if self.flags.handshake {
304            flags.push("handshake");
305        }
306
307        if self.flags.noaddr {
308            flags.push("noaddr");
309        }
310
311        flags.join(",")
312    }
313}
314
/// The complete state of the cluster as seen by a node.
///
/// This is a local view: other nodes may disagree until gossip converges.
#[derive(Debug)]
pub struct ClusterState {
    /// All known nodes in the cluster, indexed by ID.
    pub nodes: HashMap<NodeId, ClusterNode>,
    /// This node's ID.
    pub local_id: NodeId,
    /// Current configuration epoch (increases on topology changes).
    pub config_epoch: u64,
    /// Slot-to-node mapping.
    pub slot_map: SlotMap,
    /// Cluster state: ok, fail, or unknown.
    pub state: ClusterHealth,
}
329
/// Overall cluster health status.
///
/// Recomputed by `ClusterState::update_health` from slot coverage and the
/// health of each slot's owner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterHealth {
    /// Cluster is operational and all slots are covered.
    Ok,
    /// Cluster has failed nodes or uncovered slots.
    Fail,
    /// Cluster state is being computed.
    Unknown,
}
340
341impl std::fmt::Display for ClusterHealth {
342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343        match self {
344            ClusterHealth::Ok => write!(f, "ok"),
345            ClusterHealth::Fail => write!(f, "fail"),
346            ClusterHealth::Unknown => write!(f, "unknown"),
347        }
348    }
349}
350
impl ClusterState {
    /// Creates a new cluster state for a single-node cluster.
    ///
    /// The node is given ownership of every slot (via `SlotMap::single_node`),
    /// the epoch starts at 1, and health is immediately `Ok`.
    pub fn single_node(local_node: ClusterNode) -> Self {
        let local_id = local_node.id;
        let slot_map = SlotMap::single_node(local_id);
        let mut nodes = HashMap::new();
        nodes.insert(local_id, local_node);

        Self {
            nodes,
            local_id,
            config_epoch: 1,
            slot_map,
            state: ClusterHealth::Ok,
        }
    }

    /// Creates a new empty cluster state (for joining an existing cluster).
    ///
    /// No nodes, no slot assignments, epoch 0, health `Unknown` until the
    /// first `update_health` after topology is learned.
    pub fn new(local_id: NodeId) -> Self {
        Self {
            nodes: HashMap::new(),
            local_id,
            config_epoch: 0,
            slot_map: SlotMap::new(),
            state: ClusterHealth::Unknown,
        }
    }

    /// Returns the local node, if it has been added to `nodes`.
    pub fn local_node(&self) -> Option<&ClusterNode> {
        self.nodes.get(&self.local_id)
    }

    /// Returns a mutable reference to the local node.
    pub fn local_node_mut(&mut self) -> Option<&mut ClusterNode> {
        self.nodes.get_mut(&self.local_id)
    }

    /// Adds a node to the cluster (replacing any existing node with the same ID).
    pub fn add_node(&mut self, node: ClusterNode) {
        self.nodes.insert(node.id, node);
    }

    /// Removes a node from the cluster, returning it if it was present.
    pub fn remove_node(&mut self, node_id: NodeId) -> Option<ClusterNode> {
        self.nodes.remove(&node_id)
    }

    /// Returns the node that owns the given slot.
    ///
    /// `None` if the slot is unassigned or its owner is unknown to `nodes`.
    pub fn slot_owner(&self, slot: u16) -> Option<&ClusterNode> {
        let node_id = self.slot_map.owner(slot)?;
        self.nodes.get(&node_id)
    }

    /// Returns true if the local node owns the given slot.
    pub fn owns_slot(&self, slot: u16) -> bool {
        self.slot_map.owner(slot) == Some(self.local_id)
    }

    /// Returns all primary nodes (arbitrary `HashMap` iteration order).
    pub fn primaries(&self) -> impl Iterator<Item = &ClusterNode> {
        self.nodes.values().filter(|n| n.role == NodeRole::Primary)
    }

    /// Returns all replica nodes (arbitrary `HashMap` iteration order).
    pub fn replicas(&self) -> impl Iterator<Item = &ClusterNode> {
        self.nodes.values().filter(|n| n.role == NodeRole::Replica)
    }

    /// Returns replicas of a specific primary.
    pub fn replicas_of(&self, primary_id: NodeId) -> impl Iterator<Item = &ClusterNode> {
        self.nodes
            .values()
            .filter(move |n| n.replicates == Some(primary_id))
    }

    /// Computes and updates the cluster health state.
    ///
    /// `Fail` when any slot is unassigned, or any assigned slot's owner is
    /// missing from `nodes` or unhealthy; otherwise `Ok`.
    pub fn update_health(&mut self) {
        // Check if all slots are covered by healthy primaries
        if !self.slot_map.is_complete() {
            self.state = ClusterHealth::Fail;
            return;
        }

        // Check if any slot's owner is unhealthy
        for slot in 0..crate::slots::SLOT_COUNT {
            if let Some(owner_id) = self.slot_map.owner(slot) {
                if let Some(node) = self.nodes.get(&owner_id) {
                    if !node.is_healthy() {
                        self.state = ClusterHealth::Fail;
                        return;
                    }
                } else {
                    // Owner node not found
                    self.state = ClusterHealth::Fail;
                    return;
                }
            }
        }

        self.state = ClusterHealth::Ok;
    }

    /// Promotes a replica to primary, transferring slots from its current primary.
    ///
    /// Performs the full state transition for a failover:
    /// - Transfers all slots from the old primary to the promoted replica.
    /// - Demotes the old primary to a replica of the new primary.
    /// - Updates replica lists on both nodes.
    /// - Bumps the global config epoch.
    ///
    /// Returns an error if the target is not a replica with a configured primary.
    pub fn promote_replica(&mut self, replica_id: NodeId) -> Result<(), String> {
        // locate the replica and find its current primary
        let primary_id = {
            let replica = self
                .nodes
                .get(&replica_id)
                .ok_or_else(|| format!("node {replica_id} not found in cluster state"))?;
            if replica.role != NodeRole::Replica {
                return Err(format!("node {replica_id} is not a replica"));
            }
            replica
                .replicates
                .ok_or_else(|| format!("replica {replica_id} has no primary configured"))?
        };

        // transfer every slot the old primary owned to the promoted replica
        for slot in 0..SLOT_COUNT {
            if self.slot_map.owner(slot) == Some(primary_id) {
                self.slot_map.assign(slot, replica_id);
            }
        }
        // snapshot the new ranges now; the node's cached `slots` is set below
        let new_primary_slots = self.slot_map.slots_for_node(replica_id);

        // bump epoch before touching node state so both nodes land on the same epoch
        self.config_epoch += 1;
        let new_epoch = self.config_epoch;

        // demote the old primary: clear its slots and make it a replica of the winner
        if let Some(old_primary) = self.nodes.get_mut(&primary_id) {
            old_primary.role = NodeRole::Replica;
            old_primary.replicates = Some(replica_id);
            // the winner is no longer a replica of the demoted node
            old_primary.replicas.retain(|&id| id != replica_id);
            old_primary.slots.clear();
            old_primary.config_epoch = new_epoch;
        }

        // promote the winner: take ownership of all slots and clear the replicates pointer
        if let Some(new_primary) = self.nodes.get_mut(&replica_id) {
            new_primary.role = NodeRole::Primary;
            new_primary.replicates = None;
            if !new_primary.replicas.contains(&primary_id) {
                new_primary.replicas.push(primary_id);
            }
            new_primary.slots = new_primary_slots;
            new_primary.config_epoch = new_epoch;
        }

        // topology changed; recompute Ok/Fail
        self.update_health();
        Ok(())
    }

    /// Generates the response for CLUSTER INFO command.
    ///
    /// `cluster_slots_ok` mirrors the assigned count only while the cluster
    /// is healthy; it reports 0 otherwise.
    pub fn cluster_info(&self) -> String {
        let assigned_slots = (SLOT_COUNT as usize - self.slot_map.unassigned_count()) as u16;
        let primaries_count = self.primaries().count();

        format!(
            "cluster_state:{}\r\n\
             cluster_slots_assigned:{}\r\n\
             cluster_slots_ok:{}\r\n\
             cluster_slots_pfail:0\r\n\
             cluster_slots_fail:0\r\n\
             cluster_known_nodes:{}\r\n\
             cluster_size:{}\r\n\
             cluster_current_epoch:{}\r\n\
             cluster_my_epoch:{}\r\n",
            self.state,
            assigned_slots,
            if self.state == ClusterHealth::Ok {
                assigned_slots
            } else {
                0
            },
            self.nodes.len(),
            primaries_count,
            self.config_epoch,
            self.local_node().map(|n| n.config_epoch).unwrap_or(0),
        )
    }

    /// Generates the response for CLUSTER NODES command.
    pub fn cluster_nodes(&self) -> String {
        let mut lines: Vec<String> = self
            .nodes
            .values()
            .map(|node| node.to_cluster_nodes_line(&self.slot_map))
            .collect();
        lines.sort(); // Consistent ordering
        lines.join("\n")
    }

    /// Generates MOVED redirect information for a slot.
    ///
    /// Errors with `SlotNotAssigned` when the slot has no (known) owner.
    pub fn moved_redirect(&self, slot: u16) -> Result<(u16, SocketAddr), ClusterError> {
        let node = self
            .slot_owner(slot)
            .ok_or(ClusterError::SlotNotAssigned(slot))?;
        Ok((slot, node.addr))
    }

    /// Serializes the cluster state to the `nodes.conf` format.
    ///
    /// The output consists of a comment header, a `vars` line with the current
    /// epoch and gossip incarnation, followed by one line per node in the
    /// standard `CLUSTER NODES` format.
    pub fn to_nodes_conf(&self, incarnation: u64) -> String {
        let mut out = String::new();
        out.push_str("# ember cluster config — do not edit\n");
        out.push_str(&format!(
            "vars currentEpoch {} lastIncarnation {}\n",
            self.config_epoch, incarnation
        ));

        // sort by node id for deterministic output
        let mut nodes: Vec<&ClusterNode> = self.nodes.values().collect();
        nodes.sort_by_key(|n| n.id.0);

        for node in nodes {
            out.push_str(&node.to_cluster_nodes_line(&self.slot_map));
            out.push('\n');
        }

        out
    }

    /// Parses a `nodes.conf` file and reconstructs the cluster state.
    ///
    /// Returns the reconstructed state and the saved gossip incarnation number.
    /// `Instant` fields on nodes are initialized to `Instant::now()` since
    /// wall-clock timestamps aren't persisted.
    ///
    /// NOTE(review): only the `myself` flag is restored from the flags field;
    /// fail/fail?/handshake/noaddr are dropped on reload — confirm this is
    /// intentional (transient gossip state).
    pub fn from_nodes_conf(data: &str) -> Result<(Self, u64), ConfigParseError> {
        let mut config_epoch = 0u64;
        let mut incarnation = 0u64;
        let mut found_vars = false;
        let mut local_id = None;
        let mut nodes = HashMap::new();
        let mut slot_map = SlotMap::new();

        for line in data.lines() {
            let line = line.trim();

            // skip blank lines and comments
            if line.is_empty() || line.starts_with('#') {
                continue;
            }

            // parse the vars header
            if line.starts_with("vars ") {
                let parts: Vec<&str> = line.split_whitespace().collect();
                // vars currentEpoch <n> lastIncarnation <n>
                if parts.len() < 5 {
                    return Err(ConfigParseError::InvalidVarsLine(line.to_string()));
                }
                config_epoch = parts[2]
                    .parse()
                    .map_err(|_| ConfigParseError::InvalidVarsLine(line.to_string()))?;
                incarnation = parts[4]
                    .parse()
                    .map_err(|_| ConfigParseError::InvalidVarsLine(line.to_string()))?;
                found_vars = true;
                continue;
            }

            // parse a node line:
            // <id> <ip:port@bus-port> <flags> <master-id> <ping-sent> <pong-recv> <config-epoch> <link-state> [slots...]
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() < 8 {
                return Err(ConfigParseError::InvalidNodeLine(line.to_string()));
            }

            let node_id = NodeId::parse(parts[0])
                .map_err(|_| ConfigParseError::InvalidNodeId(parts[0].to_string()))?;

            // parse address: "ip:port@bus-port"
            let addr_str = parts[1];
            let (client_addr, bus_port) = parse_addr_field(addr_str)?;

            // substring checks mirror the writer's flag vocabulary
            let flags_str = parts[2];
            let is_myself = flags_str.contains("myself");
            let is_replica = flags_str.contains("slave");

            // "-" means no primary (i.e. this node is a primary)
            let replicates = if parts[3] == "-" {
                None
            } else {
                Some(
                    NodeId::parse(parts[3])
                        .map_err(|_| ConfigParseError::InvalidNodeId(parts[3].to_string()))?,
                )
            };

            let node_config_epoch: u64 = parts[6]
                .parse()
                .map_err(|_| ConfigParseError::InvalidNodeLine(line.to_string()))?;

            // parse slot ranges (fields 8+)
            let mut slot_ranges = Vec::new();
            for part in &parts[8..] {
                let range = parse_slot_range(part)?;
                slot_ranges.push(range);
            }

            // assign slots in the slot map
            for range in &slot_ranges {
                for slot in range.iter() {
                    slot_map.assign(slot, node_id);
                }
            }

            // bus address reuses the client IP; only the port is persisted
            let bus_addr = SocketAddr::new(client_addr.ip(), bus_port);
            let role = if is_replica {
                NodeRole::Replica
            } else {
                NodeRole::Primary
            };

            let node = ClusterNode {
                id: node_id,
                addr: client_addr,
                cluster_bus_addr: bus_addr,
                role,
                slots: slot_ranges,
                replicates,
                // rebuilt from `replicates` links after all nodes are parsed
                replicas: Vec::new(),
                last_seen: Instant::now(),
                last_ping_sent: None,
                last_pong_received: None,
                flags: NodeFlags {
                    myself: is_myself,
                    ..NodeFlags::default()
                },
                config_epoch: node_config_epoch,
            };

            if is_myself {
                local_id = Some(node_id);
            }

            nodes.insert(node_id, node);
        }

        if !found_vars {
            return Err(ConfigParseError::MissingVarsLine);
        }

        let local_id = local_id.ok_or(ConfigParseError::NoMyselfNode)?;

        // rebuild replica links
        let replica_links: Vec<(NodeId, NodeId)> = nodes
            .values()
            .filter_map(|n| n.replicates.map(|primary| (primary, n.id)))
            .collect();

        for (primary_id, replica_id) in replica_links {
            if let Some(primary) = nodes.get_mut(&primary_id) {
                primary.replicas.push(replica_id);
            }
        }

        let mut state = ClusterState {
            nodes,
            local_id,
            config_epoch,
            slot_map,
            state: ClusterHealth::Unknown,
        };
        // derive Ok/Fail from the reloaded topology
        state.update_health();

        Ok((state, incarnation))
    }
}
732
733/// Parses the `ip:port@bus-port` field from a nodes.conf line.
734fn parse_addr_field(s: &str) -> Result<(SocketAddr, u16), ConfigParseError> {
735    // format: "ip:port@bus-port"
736    let at_pos = s
737        .find('@')
738        .ok_or_else(|| ConfigParseError::InvalidAddress(s.to_string()))?;
739
740    let client_part = &s[..at_pos];
741    let bus_part = &s[at_pos + 1..];
742
743    let client_addr: SocketAddr = client_part
744        .parse()
745        .map_err(|_| ConfigParseError::InvalidAddress(s.to_string()))?;
746
747    let bus_port: u16 = bus_part
748        .parse()
749        .map_err(|_| ConfigParseError::InvalidAddress(s.to_string()))?;
750
751    Ok((client_addr, bus_port))
752}
753
754/// Parses a slot range string like "0-5460" or "100".
755fn parse_slot_range(s: &str) -> Result<SlotRange, ConfigParseError> {
756    if let Some((start_str, end_str)) = s.split_once('-') {
757        let start: u16 = start_str
758            .parse()
759            .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))?;
760        let end: u16 = end_str
761            .parse()
762            .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))?;
763        SlotRange::try_new(start, end)
764            .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))
765    } else {
766        let slot: u16 = s
767            .parse()
768            .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))?;
769        SlotRange::try_new(slot, slot)
770            .map_err(|_| ConfigParseError::InvalidSlotRange(s.to_string()))
771    }
772}
773
774#[cfg(test)]
775mod tests {
776    use super::*;
777    use std::net::{IpAddr, Ipv4Addr};
778
779    fn test_addr(port: u16) -> SocketAddr {
780        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port)
781    }
782
783    #[test]
784    fn node_id_display() {
785        let id = NodeId::new();
786        let display = id.to_string();
787        assert_eq!(display.len(), 8);
788    }
789
790    #[test]
791    fn node_id_parse() {
792        let id = NodeId::new();
793        let parsed = NodeId::parse(&id.0.to_string()).unwrap();
794        assert_eq!(id, parsed);
795    }
796
797    #[test]
798    fn node_flags_display() {
799        let mut flags = NodeFlags::default();
800        assert_eq!(flags.to_string(), "-");
801
802        flags.myself = true;
803        assert_eq!(flags.to_string(), "myself");
804
805        flags.pfail = true;
806        assert_eq!(flags.to_string(), "myself,pfail");
807    }
808
809    #[test]
810    fn cluster_node_primary() {
811        let id = NodeId::new();
812        let node = ClusterNode::new_primary(id, test_addr(6379));
813
814        assert_eq!(node.id, id);
815        assert_eq!(node.role, NodeRole::Primary);
816        assert_eq!(node.addr.port(), 6379);
817        assert_eq!(node.cluster_bus_addr.port(), 16379);
818        assert!(node.replicates.is_none());
819        assert!(node.is_healthy());
820    }
821
822    #[test]
823    fn cluster_node_replica() {
824        let primary_id = NodeId::new();
825        let replica_id = NodeId::new();
826        let node = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);
827
828        assert_eq!(node.id, replica_id);
829        assert_eq!(node.role, NodeRole::Replica);
830        assert_eq!(node.replicates, Some(primary_id));
831    }
832
833    #[test]
834    fn cluster_state_single_node() {
835        let id = NodeId::new();
836        let mut node = ClusterNode::new_primary(id, test_addr(6379));
837        node.set_myself();
838
839        let state = ClusterState::single_node(node);
840
841        assert_eq!(state.local_id, id);
842        assert!(state.owns_slot(0));
843        assert!(state.owns_slot(16383));
844        assert_eq!(state.state, ClusterHealth::Ok);
845    }
846
847    #[test]
848    fn cluster_state_slot_owner() {
849        let id = NodeId::new();
850        let mut node = ClusterNode::new_primary(id, test_addr(6379));
851        node.set_myself();
852
853        let state = ClusterState::single_node(node);
854
855        let owner = state.slot_owner(100).unwrap();
856        assert_eq!(owner.id, id);
857    }
858
859    #[test]
860    fn cluster_state_health_check() {
861        let id = NodeId::new();
862        let mut node = ClusterNode::new_primary(id, test_addr(6379));
863        node.set_myself();
864
865        let mut state = ClusterState::single_node(node);
866        state.update_health();
867        assert_eq!(state.state, ClusterHealth::Ok);
868
869        // Unassign a slot
870        state.slot_map.unassign(0);
871        state.update_health();
872        assert_eq!(state.state, ClusterHealth::Fail);
873    }
874
875    #[test]
876    fn cluster_info_format() {
877        let id = NodeId::new();
878        let mut node = ClusterNode::new_primary(id, test_addr(6379));
879        node.set_myself();
880
881        let state = ClusterState::single_node(node);
882        let info = state.cluster_info();
883
884        assert!(info.contains("cluster_state:ok"));
885        assert!(info.contains("cluster_slots_assigned:16384"));
886        assert!(info.contains("cluster_known_nodes:1"));
887    }
888
889    #[test]
890    fn moved_redirect() {
891        let id = NodeId::new();
892        let mut node = ClusterNode::new_primary(id, test_addr(6379));
893        node.set_myself();
894
895        let state = ClusterState::single_node(node);
896
897        let (slot, addr) = state.moved_redirect(100).unwrap();
898        assert_eq!(slot, 100);
899        assert_eq!(addr.port(), 6379);
900    }
901
902    #[test]
903    fn nodes_conf_roundtrip_single_node() {
904        let id = NodeId::new();
905        let mut node = ClusterNode::new_primary(id, test_addr(6379));
906        node.set_myself();
907
908        let state = ClusterState::single_node(node);
909        let conf = state.to_nodes_conf(42);
910
911        let (restored, incarnation) = ClusterState::from_nodes_conf(&conf).unwrap();
912        assert_eq!(incarnation, 42);
913        assert_eq!(restored.local_id, id);
914        assert_eq!(restored.config_epoch, 1);
915        assert!(restored.owns_slot(0));
916        assert!(restored.owns_slot(16383));
917        assert_eq!(restored.nodes.len(), 1);
918        assert!(restored.nodes[&id].flags.myself);
919    }
920
921    #[test]
922    fn nodes_conf_roundtrip_multi_node() {
923        let id1 = NodeId::new();
924        let id2 = NodeId::new();
925
926        let mut node1 = ClusterNode::new_primary(id1, test_addr(6379));
927        node1.set_myself();
928
929        let mut state = ClusterState::new(id1);
930        state.add_node(node1);
931
932        let node2 = ClusterNode::new_primary(id2, test_addr(6380));
933        state.add_node(node2);
934
935        // assign slots to each
936        for slot in 0..8192 {
937            state.slot_map.assign(slot, id1);
938        }
939        for slot in 8192..16384 {
940            state.slot_map.assign(slot, id2);
941        }
942        if let Some(n) = state.nodes.get_mut(&id1) {
943            n.slots = state.slot_map.slots_for_node(id1);
944        }
945        if let Some(n) = state.nodes.get_mut(&id2) {
946            n.slots = state.slot_map.slots_for_node(id2);
947        }
948        state.config_epoch = 5;
949        state.update_health();
950
951        let conf = state.to_nodes_conf(10);
952        let (restored, incarnation) = ClusterState::from_nodes_conf(&conf).unwrap();
953
954        assert_eq!(incarnation, 10);
955        assert_eq!(restored.config_epoch, 5);
956        assert_eq!(restored.local_id, id1);
957        assert_eq!(restored.nodes.len(), 2);
958        assert_eq!(restored.slot_map.owner(0), Some(id1));
959        assert_eq!(restored.slot_map.owner(8192), Some(id2));
960        assert_eq!(restored.slot_map.owner(16383), Some(id2));
961        assert_eq!(restored.state, ClusterHealth::Ok);
962    }
963
964    #[test]
965    fn nodes_conf_roundtrip_with_replica() {
966        let primary_id = NodeId::new();
967        let replica_id = NodeId::new();
968
969        let mut primary = ClusterNode::new_primary(primary_id, test_addr(6379));
970        primary.set_myself();
971
972        let mut state = ClusterState::single_node(primary);
973
974        let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);
975        state.add_node(replica);
976
977        // add replica to primary's replica list
978        if let Some(p) = state.nodes.get_mut(&primary_id) {
979            p.replicas.push(replica_id);
980        }
981
982        let conf = state.to_nodes_conf(1);
983        let (restored, _) = ClusterState::from_nodes_conf(&conf).unwrap();
984
985        assert_eq!(restored.nodes.len(), 2);
986        let restored_replica = &restored.nodes[&replica_id];
987        assert_eq!(restored_replica.role, NodeRole::Replica);
988        assert_eq!(restored_replica.replicates, Some(primary_id));
989
990        // replica links are rebuilt
991        let restored_primary = &restored.nodes[&primary_id];
992        assert!(restored_primary.replicas.contains(&replica_id));
993    }
994
995    #[test]
996    fn nodes_conf_parse_errors() {
997        // missing vars line
998        let result = ClusterState::from_nodes_conf("# comment\n");
999        assert!(result.is_err());
1000
1001        // no myself flag
1002        let result = ClusterState::from_nodes_conf(
1003            "vars currentEpoch 0 lastIncarnation 0\n\
1004             00000000-0000-0000-0000-000000000001 127.0.0.1:6379@16379 master - 0 0 0 connected\n",
1005        );
1006        assert!(matches!(
1007            result.unwrap_err(),
1008            ConfigParseError::NoMyselfNode
1009        ));
1010
1011        // malformed vars
1012        let result = ClusterState::from_nodes_conf("vars bad\n");
1013        assert!(result.is_err());
1014    }
1015
1016    #[test]
1017    fn primaries_and_replicas() {
1018        let primary_id = NodeId::new();
1019        let replica_id = NodeId::new();
1020
1021        let mut primary = ClusterNode::new_primary(primary_id, test_addr(6379));
1022        primary.set_myself();
1023
1024        let mut state = ClusterState::single_node(primary);
1025
1026        let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);
1027        state.add_node(replica);
1028
1029        assert_eq!(state.primaries().count(), 1);
1030        assert_eq!(state.replicas().count(), 1);
1031        assert_eq!(state.replicas_of(primary_id).count(), 1);
1032    }
1033
1034    #[test]
1035    fn promote_replica_transfers_slots() {
1036        let primary_id = NodeId::new();
1037        let replica_id = NodeId::new();
1038
1039        let mut primary = ClusterNode::new_primary(primary_id, test_addr(6379));
1040        primary.set_myself();
1041
1042        let mut state = ClusterState::single_node(primary);
1043        let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);
1044        state.add_node(replica);
1045
1046        // register the replica in the primary's replica list
1047        state
1048            .nodes
1049            .get_mut(&primary_id)
1050            .unwrap()
1051            .replicas
1052            .push(replica_id);
1053
1054        let initial_epoch = state.config_epoch;
1055        state.promote_replica(replica_id).unwrap();
1056
1057        // epoch should have been bumped
1058        assert_eq!(state.config_epoch, initial_epoch + 1);
1059
1060        // the promoted node is now a primary with all slots
1061        let new_primary = state.nodes.get(&replica_id).unwrap();
1062        assert_eq!(new_primary.role, NodeRole::Primary);
1063        assert_eq!(new_primary.replicates, None);
1064        assert!(new_primary.replicas.contains(&primary_id));
1065        assert!(!new_primary.slots.is_empty());
1066
1067        // the old primary is now a replica
1068        let old_primary = state.nodes.get(&primary_id).unwrap();
1069        assert_eq!(old_primary.role, NodeRole::Replica);
1070        assert_eq!(old_primary.replicates, Some(replica_id));
1071        assert!(old_primary.slots.is_empty());
1072
1073        // slot ownership must be transferred
1074        for slot in 0..SLOT_COUNT {
1075            assert_eq!(state.slot_map.owner(slot), Some(replica_id));
1076        }
1077
1078        // cluster should still be healthy
1079        assert_eq!(state.state, ClusterHealth::Ok);
1080    }
1081
1082    #[test]
1083    fn promote_replica_rejects_non_replica() {
1084        let id = NodeId::new();
1085        let mut node = ClusterNode::new_primary(id, test_addr(6379));
1086        node.set_myself();
1087        let mut state = ClusterState::single_node(node);
1088
1089        let err = state.promote_replica(id).unwrap_err();
1090        assert!(err.contains("not a replica"), "unexpected error: {err}");
1091    }
1092
1093    #[test]
1094    fn promote_replica_rejects_unknown_node() {
1095        let id = NodeId::new();
1096        let mut node = ClusterNode::new_primary(id, test_addr(6379));
1097        node.set_myself();
1098        let mut state = ClusterState::single_node(node);
1099
1100        let missing = NodeId::new();
1101        let err = state.promote_replica(missing).unwrap_err();
1102        assert!(err.contains("not found"), "unexpected error: {err}");
1103    }
1104}