Skip to main content

ember_cluster/
topology.rs

1//! Cluster topology management.
2//!
3//! Defines the structure of a cluster: nodes, their roles, health states,
4//! and the overall cluster configuration.
5
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::time::Instant;
9
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13use crate::slots::{SlotMap, SlotRange, SLOT_COUNT};
14use crate::ClusterError;
15
16/// Unique identifier for a cluster node.
17///
18/// Wraps a UUID v4 for guaranteed uniqueness across the cluster.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct NodeId(pub Uuid);
21
22impl NodeId {
23    /// Generates a new random node ID.
24    pub fn new() -> Self {
25        Self(Uuid::new_v4())
26    }
27
28    /// Creates a node ID from a UUID string.
29    pub fn parse(s: &str) -> Result<Self, uuid::Error> {
30        Ok(Self(Uuid::parse_str(s)?))
31    }
32}
33
34impl Default for NodeId {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl std::fmt::Display for NodeId {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        // Show first 8 chars for readability (similar to git short hashes)
43        write!(f, "{}", &self.0.to_string()[..8])
44    }
45}
46
47/// The role of a node in the cluster.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum NodeRole {
50    /// Primary node that owns slots and accepts writes.
51    Primary,
52    /// Replica node that mirrors a primary's data.
53    Replica,
54}
55
56impl std::fmt::Display for NodeRole {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        match self {
59            NodeRole::Primary => write!(f, "primary"),
60            NodeRole::Replica => write!(f, "replica"),
61        }
62    }
63}
64
65/// Status flags for a node.
66#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
67pub struct NodeFlags {
68    /// Node is the local node (myself).
69    pub myself: bool,
70    /// Node is suspected to be failing.
71    pub pfail: bool,
72    /// Node has been confirmed as failed by the cluster.
73    pub fail: bool,
74    /// Node is performing a handshake (not yet part of cluster).
75    pub handshake: bool,
76    /// Node has no address yet.
77    pub noaddr: bool,
78}
79
80impl NodeFlags {
81    /// Returns true if the node is considered healthy.
82    pub fn is_healthy(&self) -> bool {
83        !self.fail && !self.pfail
84    }
85}
86
87impl std::fmt::Display for NodeFlags {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        let mut flags = Vec::new();
90        if self.myself {
91            flags.push("myself");
92        }
93        if self.pfail {
94            flags.push("pfail");
95        }
96        if self.fail {
97            flags.push("fail");
98        }
99        if self.handshake {
100            flags.push("handshake");
101        }
102        if self.noaddr {
103            flags.push("noaddr");
104        }
105        if flags.is_empty() {
106            write!(f, "-")
107        } else {
108            write!(f, "{}", flags.join(","))
109        }
110    }
111}
112
113/// Information about a single node in the cluster.
114#[derive(Debug, Clone)]
115pub struct ClusterNode {
116    /// Unique node identifier.
117    pub id: NodeId,
118    /// Address for client connections.
119    pub addr: SocketAddr,
120    /// Address for cluster bus (gossip) connections.
121    /// Typically addr.port + 10000.
122    pub cluster_bus_addr: SocketAddr,
123    /// Node's role in the cluster.
124    pub role: NodeRole,
125    /// Slot ranges assigned to this node (only for primaries).
126    pub slots: Vec<SlotRange>,
127    /// If this is a replica, the ID of its primary.
128    pub replicates: Option<NodeId>,
129    /// IDs of nodes replicating this one (if primary).
130    pub replicas: Vec<NodeId>,
131    /// Last time we received a message from this node.
132    pub last_seen: Instant,
133    /// Last time we sent a ping to this node.
134    pub last_ping_sent: Option<Instant>,
135    /// Last time we received a pong from this node.
136    pub last_pong_received: Option<Instant>,
137    /// Status flags.
138    pub flags: NodeFlags,
139    /// Configuration epoch (used for conflict resolution).
140    pub config_epoch: u64,
141}
142
143impl ClusterNode {
144    /// Creates a new primary node with the default bus port offset (10000).
145    pub fn new_primary(id: NodeId, addr: SocketAddr) -> Self {
146        Self::new_primary_with_offset(id, addr, 10000)
147    }
148
149    /// Creates a new primary node with a custom bus port offset.
150    pub fn new_primary_with_offset(id: NodeId, addr: SocketAddr, bus_port_offset: u16) -> Self {
151        let cluster_bus_addr =
152            SocketAddr::new(addr.ip(), addr.port().wrapping_add(bus_port_offset));
153        Self {
154            id,
155            addr,
156            cluster_bus_addr,
157            role: NodeRole::Primary,
158            slots: Vec::new(),
159            replicates: None,
160            replicas: Vec::new(),
161            last_seen: Instant::now(),
162            last_ping_sent: None,
163            last_pong_received: None,
164            flags: NodeFlags::default(),
165            config_epoch: 0,
166        }
167    }
168
169    /// Creates a new replica node.
170    pub fn new_replica(id: NodeId, addr: SocketAddr, primary_id: NodeId) -> Self {
171        let cluster_bus_addr = SocketAddr::new(addr.ip(), addr.port() + 10000);
172        Self {
173            id,
174            addr,
175            cluster_bus_addr,
176            role: NodeRole::Replica,
177            slots: Vec::new(),
178            replicates: Some(primary_id),
179            replicas: Vec::new(),
180            last_seen: Instant::now(),
181            last_ping_sent: None,
182            last_pong_received: None,
183            flags: NodeFlags::default(),
184            config_epoch: 0,
185        }
186    }
187
188    /// Marks this node as the local node.
189    pub fn set_myself(&mut self) {
190        self.flags.myself = true;
191    }
192
193    /// Returns true if this node is healthy and can serve requests.
194    pub fn is_healthy(&self) -> bool {
195        self.flags.is_healthy()
196    }
197
198    /// Returns the total number of slots owned by this node.
199    pub fn slot_count(&self) -> u16 {
200        self.slots.iter().map(|r| r.len()).sum()
201    }
202
203    /// Formats the node in CLUSTER NODES output format.
204    pub fn to_cluster_nodes_line(&self, slot_map: &SlotMap) -> String {
205        let slots_str = if self.role == NodeRole::Primary {
206            let ranges = slot_map.slots_for_node(self.id);
207            if ranges.is_empty() {
208                String::new()
209            } else {
210                ranges
211                    .iter()
212                    .map(|r| r.to_string())
213                    .collect::<Vec<_>>()
214                    .join(" ")
215            }
216        } else {
217            String::new()
218        };
219
220        let replicates_str = self
221            .replicates
222            .map(|id| id.0.to_string())
223            .unwrap_or_else(|| "-".to_string());
224
225        // Format: <id> <addr>@<bus-port> <flags> <master-id> <ping-sent> <pong-recv> <config-epoch> <link-state> <slots>
226        format!(
227            "{} {}@{} {} {} {} {} {} connected {}",
228            self.id.0,
229            self.addr,
230            self.cluster_bus_addr.port(),
231            self.format_flags(),
232            replicates_str,
233            self.last_ping_sent
234                .map(|t| t.elapsed().as_millis() as u64)
235                .unwrap_or(0),
236            self.last_pong_received
237                .map(|t| t.elapsed().as_millis() as u64)
238                .unwrap_or(0),
239            self.config_epoch,
240            slots_str
241        )
242        .trim()
243        .to_string()
244    }
245
246    fn format_flags(&self) -> String {
247        let mut flags = Vec::new();
248
249        if self.flags.myself {
250            flags.push("myself");
251        }
252
253        match self.role {
254            NodeRole::Primary => flags.push("master"),
255            NodeRole::Replica => flags.push("slave"),
256        }
257
258        if self.flags.fail {
259            flags.push("fail");
260        } else if self.flags.pfail {
261            flags.push("fail?");
262        }
263
264        if self.flags.handshake {
265            flags.push("handshake");
266        }
267
268        if self.flags.noaddr {
269            flags.push("noaddr");
270        }
271
272        flags.join(",")
273    }
274}
275
276/// The complete state of the cluster as seen by a node.
277#[derive(Debug)]
278pub struct ClusterState {
279    /// All known nodes in the cluster, indexed by ID.
280    pub nodes: HashMap<NodeId, ClusterNode>,
281    /// This node's ID.
282    pub local_id: NodeId,
283    /// Current configuration epoch (increases on topology changes).
284    pub config_epoch: u64,
285    /// Slot-to-node mapping.
286    pub slot_map: SlotMap,
287    /// Cluster state: ok, fail, or unknown.
288    pub state: ClusterHealth,
289}
290
/// Health of the cluster as a whole.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterHealth {
    /// The cluster is operational and every slot is covered.
    Ok,
    /// The cluster has failed nodes or slots without coverage.
    Fail,
    /// Health has not been determined yet.
    Unknown,
}

impl std::fmt::Display for ClusterHealth {
    /// Writes the lowercase state name: `ok`, `fail` or `unknown`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ClusterHealth::Ok => "ok",
            ClusterHealth::Fail => "fail",
            ClusterHealth::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
311
312impl ClusterState {
313    /// Creates a new cluster state for a single-node cluster.
314    pub fn single_node(local_node: ClusterNode) -> Self {
315        let local_id = local_node.id;
316        let slot_map = SlotMap::single_node(local_id);
317        let mut nodes = HashMap::new();
318        nodes.insert(local_id, local_node);
319
320        Self {
321            nodes,
322            local_id,
323            config_epoch: 1,
324            slot_map,
325            state: ClusterHealth::Ok,
326        }
327    }
328
329    /// Creates a new empty cluster state (for joining an existing cluster).
330    pub fn new(local_id: NodeId) -> Self {
331        Self {
332            nodes: HashMap::new(),
333            local_id,
334            config_epoch: 0,
335            slot_map: SlotMap::new(),
336            state: ClusterHealth::Unknown,
337        }
338    }
339
340    /// Returns the local node.
341    pub fn local_node(&self) -> Option<&ClusterNode> {
342        self.nodes.get(&self.local_id)
343    }
344
345    /// Returns a mutable reference to the local node.
346    pub fn local_node_mut(&mut self) -> Option<&mut ClusterNode> {
347        self.nodes.get_mut(&self.local_id)
348    }
349
350    /// Adds a node to the cluster.
351    pub fn add_node(&mut self, node: ClusterNode) {
352        self.nodes.insert(node.id, node);
353    }
354
355    /// Removes a node from the cluster.
356    pub fn remove_node(&mut self, node_id: NodeId) -> Option<ClusterNode> {
357        self.nodes.remove(&node_id)
358    }
359
360    /// Returns the node that owns the given slot.
361    pub fn slot_owner(&self, slot: u16) -> Option<&ClusterNode> {
362        let node_id = self.slot_map.owner(slot)?;
363        self.nodes.get(&node_id)
364    }
365
366    /// Returns true if the local node owns the given slot.
367    pub fn owns_slot(&self, slot: u16) -> bool {
368        self.slot_map.owner(slot) == Some(self.local_id)
369    }
370
371    /// Returns all primary nodes.
372    pub fn primaries(&self) -> impl Iterator<Item = &ClusterNode> {
373        self.nodes.values().filter(|n| n.role == NodeRole::Primary)
374    }
375
376    /// Returns all replica nodes.
377    pub fn replicas(&self) -> impl Iterator<Item = &ClusterNode> {
378        self.nodes.values().filter(|n| n.role == NodeRole::Replica)
379    }
380
381    /// Returns replicas of a specific primary.
382    pub fn replicas_of(&self, primary_id: NodeId) -> impl Iterator<Item = &ClusterNode> {
383        self.nodes
384            .values()
385            .filter(move |n| n.replicates == Some(primary_id))
386    }
387
388    /// Computes and updates the cluster health state.
389    pub fn update_health(&mut self) {
390        // Check if all slots are covered by healthy primaries
391        if !self.slot_map.is_complete() {
392            self.state = ClusterHealth::Fail;
393            return;
394        }
395
396        // Check if any slot's owner is unhealthy
397        for slot in 0..crate::slots::SLOT_COUNT {
398            if let Some(owner_id) = self.slot_map.owner(slot) {
399                if let Some(node) = self.nodes.get(&owner_id) {
400                    if !node.is_healthy() {
401                        self.state = ClusterHealth::Fail;
402                        return;
403                    }
404                } else {
405                    // Owner node not found
406                    self.state = ClusterHealth::Fail;
407                    return;
408                }
409            }
410        }
411
412        self.state = ClusterHealth::Ok;
413    }
414
415    /// Generates the response for CLUSTER INFO command.
416    pub fn cluster_info(&self) -> String {
417        let assigned_slots = (SLOT_COUNT as usize - self.slot_map.unassigned_count()) as u16;
418        let primaries_count = self.primaries().count();
419
420        format!(
421            "cluster_state:{}\r\n\
422             cluster_slots_assigned:{}\r\n\
423             cluster_slots_ok:{}\r\n\
424             cluster_slots_pfail:0\r\n\
425             cluster_slots_fail:0\r\n\
426             cluster_known_nodes:{}\r\n\
427             cluster_size:{}\r\n\
428             cluster_current_epoch:{}\r\n\
429             cluster_my_epoch:{}\r\n",
430            self.state,
431            assigned_slots,
432            if self.state == ClusterHealth::Ok {
433                assigned_slots
434            } else {
435                0
436            },
437            self.nodes.len(),
438            primaries_count,
439            self.config_epoch,
440            self.local_node().map(|n| n.config_epoch).unwrap_or(0),
441        )
442    }
443
444    /// Generates the response for CLUSTER NODES command.
445    pub fn cluster_nodes(&self) -> String {
446        let mut lines: Vec<String> = self
447            .nodes
448            .values()
449            .map(|node| node.to_cluster_nodes_line(&self.slot_map))
450            .collect();
451        lines.sort(); // Consistent ordering
452        lines.join("\n")
453    }
454
455    /// Generates MOVED redirect information for a slot.
456    pub fn moved_redirect(&self, slot: u16) -> Result<(u16, SocketAddr), ClusterError> {
457        let node = self
458            .slot_owner(slot)
459            .ok_or(ClusterError::SlotNotAssigned(slot))?;
460        Ok((slot, node.addr))
461    }
462}
463
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback (127.0.0.1) socket address on the given port.
    fn test_addr(port: u16) -> SocketAddr {
        let loopback = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
        SocketAddr::new(loopback, port)
    }

    /// Builds a single-node cluster whose only member is a primary on port
    /// 6379 flagged as the local node; returns that node's ID with the state.
    fn single_node_cluster() -> (NodeId, ClusterState) {
        let id = NodeId::new();
        let mut node = ClusterNode::new_primary(id, test_addr(6379));
        node.set_myself();
        (id, ClusterState::single_node(node))
    }

    #[test]
    fn node_id_display() {
        // Display shows the short, eight-character form.
        let shown = NodeId::new().to_string();
        assert_eq!(shown.len(), 8);
    }

    #[test]
    fn node_id_parse() {
        // Round-trip through the full UUID string.
        let original = NodeId::new();
        let text = original.0.to_string();
        let parsed = NodeId::parse(&text).unwrap();
        assert_eq!(original, parsed);
    }

    #[test]
    fn node_flags_display() {
        let none = NodeFlags::default();
        assert_eq!(none.to_string(), "-");

        let only_myself = NodeFlags {
            myself: true,
            ..NodeFlags::default()
        };
        assert_eq!(only_myself.to_string(), "myself");

        let myself_and_pfail = NodeFlags {
            pfail: true,
            ..only_myself
        };
        assert_eq!(myself_and_pfail.to_string(), "myself,pfail");
    }

    #[test]
    fn cluster_node_primary() {
        let id = NodeId::new();
        let primary = ClusterNode::new_primary(id, test_addr(6379));

        assert_eq!(primary.id, id);
        assert_eq!(primary.role, NodeRole::Primary);
        assert_eq!(primary.addr.port(), 6379);
        assert_eq!(primary.cluster_bus_addr.port(), 16379);
        assert!(primary.replicates.is_none());
        assert!(primary.is_healthy());
    }

    #[test]
    fn cluster_node_replica() {
        let primary_id = NodeId::new();
        let replica_id = NodeId::new();
        let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);

        assert_eq!(replica.id, replica_id);
        assert_eq!(replica.role, NodeRole::Replica);
        assert_eq!(replica.replicates, Some(primary_id));
    }

    #[test]
    fn cluster_state_single_node() {
        let (id, state) = single_node_cluster();

        assert_eq!(state.local_id, id);
        // First and last slot both belong to the only node.
        assert!(state.owns_slot(0));
        assert!(state.owns_slot(16383));
        assert_eq!(state.state, ClusterHealth::Ok);
    }

    #[test]
    fn cluster_state_slot_owner() {
        let (id, state) = single_node_cluster();

        let owner = state.slot_owner(100).unwrap();
        assert_eq!(owner.id, id);
    }

    #[test]
    fn cluster_state_health_check() {
        let (_, mut state) = single_node_cluster();

        state.update_health();
        assert_eq!(state.state, ClusterHealth::Ok);

        // A single unassigned slot is enough to fail the cluster.
        state.slot_map.unassign(0);
        state.update_health();
        assert_eq!(state.state, ClusterHealth::Fail);
    }

    #[test]
    fn cluster_info_format() {
        let (_, state) = single_node_cluster();
        let info = state.cluster_info();

        for expected in [
            "cluster_state:ok",
            "cluster_slots_assigned:16384",
            "cluster_known_nodes:1",
        ]
        .iter()
        {
            assert!(info.contains(expected));
        }
    }

    #[test]
    fn moved_redirect() {
        let (_, state) = single_node_cluster();

        let (slot, addr) = state.moved_redirect(100).unwrap();
        assert_eq!(slot, 100);
        assert_eq!(addr.port(), 6379);
    }

    #[test]
    fn primaries_and_replicas() {
        let (primary_id, mut state) = single_node_cluster();

        let replica_id = NodeId::new();
        state.add_node(ClusterNode::new_replica(
            replica_id,
            test_addr(6380),
            primary_id,
        ));

        assert_eq!(state.primaries().count(), 1);
        assert_eq!(state.replicas().count(), 1);
        assert_eq!(state.replicas_of(primary_id).count(), 1);
    }
}