Skip to main content

ember_cluster/
topology.rs

1//! Cluster topology management.
2//!
3//! Defines the structure of a cluster: nodes, their roles, health states,
4//! and the overall cluster configuration.
5
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::time::Instant;
9
10use serde::{Deserialize, Serialize};
11use uuid::Uuid;
12
13use crate::slots::{SlotMap, SlotRange};
14use crate::ClusterError;
15
/// Unique identifier for a cluster node.
///
/// A thin, `Copy` newtype around a [`Uuid`]. IDs produced by `NodeId::new`
/// are random (v4) UUIDs, so collisions are statistically negligible rather
/// than strictly impossible. The inner UUID is public: the `Display` impl
/// prints only a shortened form, so use `.0` when the full value is needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeId(pub Uuid);
21
22impl NodeId {
23    /// Generates a new random node ID.
24    pub fn new() -> Self {
25        Self(Uuid::new_v4())
26    }
27
28    /// Creates a node ID from a UUID string.
29    pub fn parse(s: &str) -> Result<Self, uuid::Error> {
30        Ok(Self(Uuid::parse_str(s)?))
31    }
32}
33
34impl Default for NodeId {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl std::fmt::Display for NodeId {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        // Show first 8 chars for readability (similar to git short hashes)
43        write!(f, "{}", &self.0.to_string()[..8])
44    }
45}
46
/// The role of a node in the cluster.
///
/// The `Display` impl renders the variants as `primary` / `replica`; the
/// CLUSTER NODES formatter in this file uses `master` / `slave` instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// Primary node that owns slots and accepts writes.
    Primary,
    /// Replica node that mirrors a primary's data.
    Replica,
}
55
56impl std::fmt::Display for NodeRole {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        match self {
59            NodeRole::Primary => write!(f, "primary"),
60            NodeRole::Replica => write!(f, "replica"),
61        }
62    }
63}
64
/// Status flags for a node.
///
/// Every flag is an independent boolean and all of them are `false` in the
/// derived `Default` value. Only `pfail` and `fail` influence
/// `NodeFlags::is_healthy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct NodeFlags {
    /// Node is the local node (myself).
    pub myself: bool,
    /// Node is suspected to be failing (not yet confirmed).
    pub pfail: bool,
    /// Node has been confirmed as failed by the cluster.
    pub fail: bool,
    /// Node is performing a handshake (not yet part of cluster).
    pub handshake: bool,
    /// Node has no address yet.
    pub noaddr: bool,
}
79
80impl NodeFlags {
81    /// Returns true if the node is considered healthy.
82    pub fn is_healthy(&self) -> bool {
83        !self.fail && !self.pfail
84    }
85}
86
87impl std::fmt::Display for NodeFlags {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        let mut flags = Vec::new();
90        if self.myself {
91            flags.push("myself");
92        }
93        if self.pfail {
94            flags.push("pfail");
95        }
96        if self.fail {
97            flags.push("fail");
98        }
99        if self.handshake {
100            flags.push("handshake");
101        }
102        if self.noaddr {
103            flags.push("noaddr");
104        }
105        if flags.is_empty() {
106            write!(f, "-")
107        } else {
108            write!(f, "{}", flags.join(","))
109        }
110    }
111}
112
/// Information about a single node in the cluster.
///
/// Holds identity, addressing, role/replication links, liveness timestamps
/// and status flags for one node. The timestamps are monotonic [`Instant`]s,
/// so they are only meaningful within the current process.
#[derive(Debug, Clone)]
pub struct ClusterNode {
    /// Unique node identifier.
    pub id: NodeId,
    /// Address for client connections.
    pub addr: SocketAddr,
    /// Address for cluster bus (gossip) connections.
    /// Typically addr.port + 10000; the constructors in this file derive it
    /// that way from `addr`.
    pub cluster_bus_addr: SocketAddr,
    /// Node's role in the cluster.
    pub role: NodeRole,
    /// Slot ranges assigned to this node (only for primaries).
    /// Starts empty for nodes built by the constructors in this file.
    pub slots: Vec<SlotRange>,
    /// If this is a replica, the ID of its primary.
    pub replicates: Option<NodeId>,
    /// IDs of nodes replicating this one (if primary).
    pub replicas: Vec<NodeId>,
    /// Last time we received a message from this node.
    /// Initialized to the construction time by the constructors.
    pub last_seen: Instant,
    /// Last time we sent a ping to this node (`None` if never).
    pub last_ping_sent: Option<Instant>,
    /// Last time we received a pong from this node (`None` if never).
    pub last_pong_received: Option<Instant>,
    /// Status flags.
    pub flags: NodeFlags,
    /// Configuration epoch (used for conflict resolution).
    pub config_epoch: u64,
}
142
143impl ClusterNode {
144    /// Creates a new primary node.
145    pub fn new_primary(id: NodeId, addr: SocketAddr) -> Self {
146        let cluster_bus_addr = SocketAddr::new(addr.ip(), addr.port() + 10000);
147        Self {
148            id,
149            addr,
150            cluster_bus_addr,
151            role: NodeRole::Primary,
152            slots: Vec::new(),
153            replicates: None,
154            replicas: Vec::new(),
155            last_seen: Instant::now(),
156            last_ping_sent: None,
157            last_pong_received: None,
158            flags: NodeFlags::default(),
159            config_epoch: 0,
160        }
161    }
162
163    /// Creates a new replica node.
164    pub fn new_replica(id: NodeId, addr: SocketAddr, primary_id: NodeId) -> Self {
165        let cluster_bus_addr = SocketAddr::new(addr.ip(), addr.port() + 10000);
166        Self {
167            id,
168            addr,
169            cluster_bus_addr,
170            role: NodeRole::Replica,
171            slots: Vec::new(),
172            replicates: Some(primary_id),
173            replicas: Vec::new(),
174            last_seen: Instant::now(),
175            last_ping_sent: None,
176            last_pong_received: None,
177            flags: NodeFlags::default(),
178            config_epoch: 0,
179        }
180    }
181
182    /// Marks this node as the local node.
183    pub fn set_myself(&mut self) {
184        self.flags.myself = true;
185    }
186
187    /// Returns true if this node is healthy and can serve requests.
188    pub fn is_healthy(&self) -> bool {
189        self.flags.is_healthy()
190    }
191
192    /// Returns the total number of slots owned by this node.
193    pub fn slot_count(&self) -> u16 {
194        self.slots.iter().map(|r| r.len()).sum()
195    }
196
197    /// Formats the node in CLUSTER NODES output format.
198    pub fn to_cluster_nodes_line(&self, slot_map: &SlotMap) -> String {
199        let slots_str = if self.role == NodeRole::Primary {
200            let ranges = slot_map.slots_for_node(self.id);
201            if ranges.is_empty() {
202                String::new()
203            } else {
204                ranges
205                    .iter()
206                    .map(|r| r.to_string())
207                    .collect::<Vec<_>>()
208                    .join(" ")
209            }
210        } else {
211            String::new()
212        };
213
214        let replicates_str = self
215            .replicates
216            .map(|id| id.0.to_string())
217            .unwrap_or_else(|| "-".to_string());
218
219        // Format: <id> <addr>@<bus-port> <flags> <master-id> <ping-sent> <pong-recv> <config-epoch> <link-state> <slots>
220        format!(
221            "{} {}@{} {} {} {} {} {} connected {}",
222            self.id.0,
223            self.addr,
224            self.cluster_bus_addr.port(),
225            self.format_flags(),
226            replicates_str,
227            self.last_ping_sent
228                .map(|t| t.elapsed().as_millis() as u64)
229                .unwrap_or(0),
230            self.last_pong_received
231                .map(|t| t.elapsed().as_millis() as u64)
232                .unwrap_or(0),
233            self.config_epoch,
234            slots_str
235        )
236        .trim()
237        .to_string()
238    }
239
240    fn format_flags(&self) -> String {
241        let mut flags = Vec::new();
242
243        if self.flags.myself {
244            flags.push("myself");
245        }
246
247        match self.role {
248            NodeRole::Primary => flags.push("master"),
249            NodeRole::Replica => flags.push("slave"),
250        }
251
252        if self.flags.fail {
253            flags.push("fail");
254        } else if self.flags.pfail {
255            flags.push("fail?");
256        }
257
258        if self.flags.handshake {
259            flags.push("handshake");
260        }
261
262        if self.flags.noaddr {
263            flags.push("noaddr");
264        }
265
266        flags.join(",")
267    }
268}
269
/// The complete state of the cluster as seen by a node.
///
/// This is the local node's view: the set of known nodes, the slot
/// ownership table and the health derived from them.
#[derive(Debug)]
pub struct ClusterState {
    /// All known nodes in the cluster, indexed by ID.
    pub nodes: HashMap<NodeId, ClusterNode>,
    /// This node's ID. The matching entry in `nodes` may be absent (for
    /// example right after `ClusterState::new`).
    pub local_id: NodeId,
    /// Current configuration epoch (increases on topology changes).
    pub config_epoch: u64,
    /// Slot-to-node mapping.
    pub slot_map: SlotMap,
    /// Cluster state: ok, fail, or unknown.
    /// Recomputed by `update_health`; not updated automatically when nodes
    /// or slots change.
    pub state: ClusterHealth,
}
284
/// Overall cluster health status.
///
/// Displayed in lowercase (`ok`, `fail`, `unknown`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterHealth {
    /// Cluster is operational and all slots are covered.
    Ok,
    /// Cluster has failed nodes or uncovered slots.
    Fail,
    /// Cluster state is being computed.
    Unknown,
}

impl std::fmt::Display for ClusterHealth {
    /// Writes the lowercase name of the health state.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            ClusterHealth::Ok => "ok",
            ClusterHealth::Fail => "fail",
            ClusterHealth::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
305
306impl ClusterState {
307    /// Creates a new cluster state for a single-node cluster.
308    pub fn single_node(local_node: ClusterNode) -> Self {
309        let local_id = local_node.id;
310        let slot_map = SlotMap::single_node(local_id);
311        let mut nodes = HashMap::new();
312        nodes.insert(local_id, local_node);
313
314        Self {
315            nodes,
316            local_id,
317            config_epoch: 1,
318            slot_map,
319            state: ClusterHealth::Ok,
320        }
321    }
322
323    /// Creates a new empty cluster state (for joining an existing cluster).
324    pub fn new(local_id: NodeId) -> Self {
325        Self {
326            nodes: HashMap::new(),
327            local_id,
328            config_epoch: 0,
329            slot_map: SlotMap::new(),
330            state: ClusterHealth::Unknown,
331        }
332    }
333
334    /// Returns the local node.
335    pub fn local_node(&self) -> Option<&ClusterNode> {
336        self.nodes.get(&self.local_id)
337    }
338
339    /// Returns a mutable reference to the local node.
340    pub fn local_node_mut(&mut self) -> Option<&mut ClusterNode> {
341        self.nodes.get_mut(&self.local_id)
342    }
343
344    /// Adds a node to the cluster.
345    pub fn add_node(&mut self, node: ClusterNode) {
346        self.nodes.insert(node.id, node);
347    }
348
349    /// Removes a node from the cluster.
350    pub fn remove_node(&mut self, node_id: NodeId) -> Option<ClusterNode> {
351        self.nodes.remove(&node_id)
352    }
353
354    /// Returns the node that owns the given slot.
355    pub fn slot_owner(&self, slot: u16) -> Option<&ClusterNode> {
356        let node_id = self.slot_map.owner(slot)?;
357        self.nodes.get(&node_id)
358    }
359
360    /// Returns true if the local node owns the given slot.
361    pub fn owns_slot(&self, slot: u16) -> bool {
362        self.slot_map.owner(slot) == Some(self.local_id)
363    }
364
365    /// Returns all primary nodes.
366    pub fn primaries(&self) -> impl Iterator<Item = &ClusterNode> {
367        self.nodes.values().filter(|n| n.role == NodeRole::Primary)
368    }
369
370    /// Returns all replica nodes.
371    pub fn replicas(&self) -> impl Iterator<Item = &ClusterNode> {
372        self.nodes.values().filter(|n| n.role == NodeRole::Replica)
373    }
374
375    /// Returns replicas of a specific primary.
376    pub fn replicas_of(&self, primary_id: NodeId) -> impl Iterator<Item = &ClusterNode> {
377        self.nodes
378            .values()
379            .filter(move |n| n.replicates == Some(primary_id))
380    }
381
382    /// Computes and updates the cluster health state.
383    pub fn update_health(&mut self) {
384        // Check if all slots are covered by healthy primaries
385        if !self.slot_map.is_complete() {
386            self.state = ClusterHealth::Fail;
387            return;
388        }
389
390        // Check if any slot's owner is unhealthy
391        for slot in 0..crate::slots::SLOT_COUNT {
392            if let Some(owner_id) = self.slot_map.owner(slot) {
393                if let Some(node) = self.nodes.get(&owner_id) {
394                    if !node.is_healthy() {
395                        self.state = ClusterHealth::Fail;
396                        return;
397                    }
398                } else {
399                    // Owner node not found
400                    self.state = ClusterHealth::Fail;
401                    return;
402                }
403            }
404        }
405
406        self.state = ClusterHealth::Ok;
407    }
408
409    /// Generates the response for CLUSTER INFO command.
410    pub fn cluster_info(&self) -> String {
411        let primaries: Vec<_> = self.primaries().collect();
412        let assigned_slots: u16 = primaries.iter().map(|n| n.slot_count()).sum();
413
414        format!(
415            "cluster_state:{}\r\n\
416             cluster_slots_assigned:{}\r\n\
417             cluster_slots_ok:{}\r\n\
418             cluster_slots_pfail:0\r\n\
419             cluster_slots_fail:0\r\n\
420             cluster_known_nodes:{}\r\n\
421             cluster_size:{}\r\n\
422             cluster_current_epoch:{}\r\n\
423             cluster_my_epoch:{}\r\n",
424            self.state,
425            assigned_slots,
426            if self.state == ClusterHealth::Ok {
427                assigned_slots
428            } else {
429                0
430            },
431            self.nodes.len(),
432            primaries.len(),
433            self.config_epoch,
434            self.local_node().map(|n| n.config_epoch).unwrap_or(0),
435        )
436    }
437
438    /// Generates the response for CLUSTER NODES command.
439    pub fn cluster_nodes(&self) -> String {
440        let mut lines: Vec<String> = self
441            .nodes
442            .values()
443            .map(|node| node.to_cluster_nodes_line(&self.slot_map))
444            .collect();
445        lines.sort(); // Consistent ordering
446        lines.join("\n")
447    }
448
449    /// Generates MOVED redirect information for a slot.
450    pub fn moved_redirect(&self, slot: u16) -> Result<(u16, SocketAddr), ClusterError> {
451        let node = self
452            .slot_owner(slot)
453            .ok_or(ClusterError::SlotNotAssigned(slot))?;
454        Ok((slot, node.addr))
455    }
456}
457
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback socket address on the given port.
    fn test_addr(port: u16) -> SocketAddr {
        let loopback = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
        SocketAddr::new(loopback, port)
    }

    /// A primary on port 6379, flagged as `myself`, together with its ID.
    fn local_primary() -> (NodeId, ClusterNode) {
        let id = NodeId::new();
        let mut me = ClusterNode::new_primary(id, test_addr(6379));
        me.set_myself();
        (id, me)
    }

    /// A one-node cluster built from `local_primary`, together with the
    /// local node's ID.
    fn single_node_cluster() -> (NodeId, ClusterState) {
        let (id, me) = local_primary();
        (id, ClusterState::single_node(me))
    }

    #[test]
    fn node_id_display() {
        let shown = NodeId::new().to_string();
        assert_eq!(shown.len(), 8);
    }

    #[test]
    fn node_id_parse() {
        let original = NodeId::new();
        let text = original.0.to_string();
        let round_tripped = NodeId::parse(&text).unwrap();
        assert_eq!(original, round_tripped);
    }

    #[test]
    fn node_flags_display() {
        let mut flags = NodeFlags::default();
        assert_eq!(flags.to_string(), "-");

        flags.myself = true;
        assert_eq!(flags.to_string(), "myself");

        flags.pfail = true;
        assert_eq!(flags.to_string(), "myself,pfail");
    }

    #[test]
    fn cluster_node_primary() {
        let id = NodeId::new();
        let primary = ClusterNode::new_primary(id, test_addr(6379));

        assert_eq!(primary.id, id);
        assert_eq!(primary.role, NodeRole::Primary);
        assert_eq!(primary.addr.port(), 6379);
        assert_eq!(primary.cluster_bus_addr.port(), 16379);
        assert!(primary.replicates.is_none());
        assert!(primary.is_healthy());
    }

    #[test]
    fn cluster_node_replica() {
        let primary_id = NodeId::new();
        let replica_id = NodeId::new();
        let replica = ClusterNode::new_replica(replica_id, test_addr(6380), primary_id);

        assert_eq!(replica.id, replica_id);
        assert_eq!(replica.role, NodeRole::Replica);
        assert_eq!(replica.replicates, Some(primary_id));
    }

    #[test]
    fn cluster_state_single_node() {
        let (id, cluster) = single_node_cluster();

        assert_eq!(cluster.local_id, id);
        assert!(cluster.owns_slot(0));
        assert!(cluster.owns_slot(16383));
        assert_eq!(cluster.state, ClusterHealth::Ok);
    }

    #[test]
    fn cluster_state_slot_owner() {
        let (id, cluster) = single_node_cluster();

        let owner = cluster.slot_owner(100).unwrap();
        assert_eq!(owner.id, id);
    }

    #[test]
    fn cluster_state_health_check() {
        let (_, mut cluster) = single_node_cluster();

        cluster.update_health();
        assert_eq!(cluster.state, ClusterHealth::Ok);

        // Leaving a slot without an owner must flip the cluster to Fail.
        cluster.slot_map.unassign(0);
        cluster.update_health();
        assert_eq!(cluster.state, ClusterHealth::Fail);
    }

    #[test]
    fn cluster_info_format() {
        let (_, cluster) = single_node_cluster();
        let info = cluster.cluster_info();

        assert!(info.contains("cluster_state:ok"));
        // The count comes from node.slots, which is empty here, not from slot_map.
        assert!(info.contains("cluster_slots_assigned:0"));
        assert!(info.contains("cluster_known_nodes:1"));
    }

    #[test]
    fn moved_redirect() {
        let (_, cluster) = single_node_cluster();

        let (slot, addr) = cluster.moved_redirect(100).unwrap();
        assert_eq!(slot, 100);
        assert_eq!(addr.port(), 6379);
    }

    #[test]
    fn primaries_and_replicas() {
        let (primary_id, mut cluster) = single_node_cluster();

        let replica_id = NodeId::new();
        cluster.add_node(ClusterNode::new_replica(
            replica_id,
            test_addr(6380),
            primary_id,
        ));

        assert_eq!(cluster.primaries().count(), 1);
        assert_eq!(cluster.replicas().count(), 1);
        assert_eq!(cluster.replicas_of(primary_id).count(), 1);
    }
}
603}