Skip to main content

rivven_cluster/
node.rs

1//! Node types and management
2
3use serde::{Deserialize, Serialize};
4use std::net::SocketAddr;
5use std::time::Instant;
6
/// Identifier that uniquely names a node within the cluster.
///
/// Kept as an owned `String` so it can carry either a UUID or a
/// human-readable name.
pub type NodeId = String;
9
10/// Node state in the cluster
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
12#[serde(rename_all = "lowercase")]
13#[derive(Default)]
14pub enum NodeState {
15    /// Node is healthy and responding
16    Alive,
17    /// Node missed some pings, suspected but not confirmed dead
18    Suspect,
19    /// Node confirmed dead, will be removed
20    Dead,
21    /// Node is leaving gracefully
22    Leaving,
23    /// Node state is unknown (just joined)
24    #[default]
25    Unknown,
26}
27
28impl NodeState {
29    /// Check if node is considered healthy for routing
30    pub fn is_healthy(&self) -> bool {
31        matches!(self, NodeState::Alive)
32    }
33
34    /// Check if node might be reachable
35    pub fn is_reachable(&self) -> bool {
36        matches!(self, NodeState::Alive | NodeState::Suspect)
37    }
38}
39
40/// Node capabilities and roles
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
42pub struct NodeCapabilities {
43    /// Can this node be a Raft voter?
44    pub voter: bool,
45    /// Can this node host partition leaders?
46    pub leader_eligible: bool,
47    /// Can this node host partition replicas?
48    pub replica_eligible: bool,
49}
50
51impl NodeCapabilities {
52    /// Full capabilities (voter + leader + replica)
53    pub fn full() -> Self {
54        Self {
55            voter: true,
56            leader_eligible: true,
57            replica_eligible: true,
58        }
59    }
60
61    /// Observer capabilities (replica only, no voting/leading)
62    pub fn observer() -> Self {
63        Self {
64            voter: false,
65            leader_eligible: false,
66            replica_eligible: true,
67        }
68    }
69}
70
71/// Information about a cluster node
72#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
73pub struct NodeInfo {
74    /// Unique node identifier
75    pub id: NodeId,
76
77    /// Human-readable name
78    pub name: Option<String>,
79
80    /// Rack identifier for rack-aware placement
81    pub rack: Option<String>,
82
83    /// Client-facing address
84    pub client_addr: SocketAddr,
85
86    /// Cluster communication address
87    pub cluster_addr: SocketAddr,
88
89    /// Node capabilities
90    pub capabilities: NodeCapabilities,
91
92    /// Node version (for compatibility checking)
93    pub version: String,
94
95    /// Custom metadata/tags
96    pub tags: std::collections::HashMap<String, String>,
97}
98
99impl NodeInfo {
100    /// Create new node info
101    pub fn new(id: impl Into<String>, client_addr: SocketAddr, cluster_addr: SocketAddr) -> Self {
102        Self {
103            id: id.into(),
104            name: None,
105            rack: None,
106            client_addr,
107            cluster_addr,
108            capabilities: NodeCapabilities::full(),
109            version: env!("CARGO_PKG_VERSION").to_string(),
110            tags: std::collections::HashMap::new(),
111        }
112    }
113
114    /// Set human-readable name
115    pub fn with_name(mut self, name: impl Into<String>) -> Self {
116        self.name = Some(name.into());
117        self
118    }
119
120    /// Set rack identifier
121    pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
122        self.rack = Some(rack.into());
123        self
124    }
125
126    /// Set capabilities
127    pub fn with_capabilities(mut self, capabilities: NodeCapabilities) -> Self {
128        self.capabilities = capabilities;
129        self
130    }
131
132    /// Add a tag
133    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
134        self.tags.insert(key.into(), value.into());
135        self
136    }
137}
138
139/// Full node state including runtime information
140#[derive(Debug, Clone)]
141pub struct Node {
142    /// Static node information
143    pub info: NodeInfo,
144
145    /// Current node state
146    pub state: NodeState,
147
148    /// Incarnation number (for SWIM protocol)
149    pub incarnation: u64,
150
151    /// Last time we heard from this node
152    pub last_seen: Instant,
153
154    /// Number of partitions led by this node
155    pub partition_leader_count: u32,
156
157    /// Number of partition replicas on this node
158    pub partition_replica_count: u32,
159
160    /// Whether this node is the Raft leader
161    pub is_raft_leader: bool,
162}
163
164impl Node {
165    /// Create a new node from info
166    pub fn new(info: NodeInfo) -> Self {
167        Self {
168            info,
169            state: NodeState::Unknown,
170            incarnation: 0,
171            last_seen: Instant::now(),
172            partition_leader_count: 0,
173            partition_replica_count: 0,
174            is_raft_leader: false,
175        }
176    }
177
178    /// Update last seen time
179    pub fn touch(&mut self) {
180        self.last_seen = Instant::now();
181    }
182
183    /// Mark as alive with incarnation-based CAS.
184    ///
185    /// Returns `true` if the transition was accepted.
186    ///
187    /// Enforces state machine rules:
188    /// - Unknown/Suspect → Alive: always valid with >= incarnation
189    /// - Dead → Alive: only valid with strictly higher incarnation (rejoin)
190    /// - Alive → Alive: valid with higher incarnation (state refresh)
191    pub fn mark_alive(&mut self, incarnation: u64) -> bool {
192        match self.state {
193            NodeState::Dead => {
194                // Dead → Alive requires strictly higher incarnation (rejoin)
195                if incarnation <= self.incarnation {
196                    return false;
197                }
198            }
199            NodeState::Alive => {
200                // Alive → Alive refresh: allow >= incarnation
201                if incarnation < self.incarnation {
202                    return false;
203                }
204            }
205            NodeState::Unknown | NodeState::Suspect => {
206                // Allow transition with >= incarnation
207                if incarnation < self.incarnation {
208                    return false;
209                }
210            }
211            NodeState::Leaving => {
212                // Once leaving, only a higher incarnation rejoin is valid
213                if incarnation <= self.incarnation {
214                    return false;
215                }
216            }
217        }
218        self.state = NodeState::Alive;
219        self.incarnation = incarnation;
220        self.touch();
221        true
222    }
223
224    /// Mark as suspect with state machine guard.
225    ///
226    /// Returns `true` if the transition was accepted.
227    ///
228    /// Only Alive|Unknown → Suspect is valid.
229    /// Dead → Suspect is explicitly rejected to prevent the
230    /// Suspect→Dead→Suspect race condition.
231    pub fn mark_suspect(&mut self) -> bool {
232        // Allow Unknown → Suspect transition in addition to Alive → Suspect.
233        // A node just discovered (Unknown) can be suspected if it misses pings
234        // before ever being confirmed Alive.
235        match self.state {
236            NodeState::Alive | NodeState::Unknown => {
237                self.state = NodeState::Suspect;
238                true
239            }
240            // Already suspect, dead, or leaving — no-op
241            _ => false,
242        }
243    }
244
245    /// Mark as dead with state machine guard.
246    ///
247    /// Returns `true` if the transition was accepted.
248    ///
249    /// Only Suspect → Dead is valid in SWIM.
250    /// Direct Alive → Dead is rejected (must go through Suspect first).
251    /// Already Dead is a no-op (idempotent).
252    pub fn mark_dead(&mut self) -> bool {
253        match self.state {
254            NodeState::Suspect => {
255                self.state = NodeState::Dead;
256                true
257            }
258            NodeState::Dead => false, // Already dead, idempotent
259            _ => false,               // Invalid transition
260        }
261    }
262
263    /// Mark as leaving.
264    ///
265    /// Returns `true` if the transition was accepted.
266    /// Valid from any non-Dead state (graceful shutdown).
267    pub fn mark_leaving(&mut self) -> bool {
268        match self.state {
269            NodeState::Dead => false,
270            _ => {
271                self.state = NodeState::Leaving;
272                true
273            }
274        }
275    }
276
277    /// Check if node is healthy
278    pub fn is_healthy(&self) -> bool {
279        self.state.is_healthy()
280    }
281
282    /// Get node ID
283    pub fn id(&self) -> &str {
284        &self.info.id
285    }
286
287    /// Get cluster address
288    pub fn cluster_addr(&self) -> SocketAddr {
289        self.info.cluster_addr
290    }
291
292    /// Get client address  
293    pub fn client_addr(&self) -> SocketAddr {
294        self.info.client_addr
295    }
296
297    /// Calculate load score (lower is better for placement)
298    pub fn load_score(&self) -> u32 {
299        // Weight leaders more than replicas
300        self.partition_leader_count * 3 + self.partition_replica_count
301    }
302}
303
304/// Serializable node state for gossip
305#[derive(Debug, Clone, Serialize, Deserialize)]
306pub struct NodeGossipState {
307    pub id: NodeId,
308    pub state: NodeState,
309    pub incarnation: u64,
310    pub cluster_addr: SocketAddr,
311    pub client_addr: SocketAddr,
312    pub rack: Option<String>,
313    pub capabilities: NodeCapabilities,
314}
315
316impl From<&Node> for NodeGossipState {
317    fn from(node: &Node) -> Self {
318        Self {
319            id: node.info.id.clone(),
320            state: node.state,
321            incarnation: node.incarnation,
322            cluster_addr: node.info.cluster_addr,
323            client_addr: node.info.client_addr,
324            rack: node.info.rack.clone(),
325            capabilities: node.info.capabilities,
326        }
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a node with fixed loopback addresses; it starts in `Unknown`.
    fn test_node() -> Node {
        Node::new(NodeInfo::new(
            "node-1",
            "127.0.0.1:9092".parse().unwrap(),
            "127.0.0.1:9093".parse().unwrap(),
        ))
    }

    #[test]
    fn test_node_state_transitions() {
        let mut node = test_node();

        assert_eq!(node.state, NodeState::Unknown);
        assert!(!node.is_healthy());

        assert!(node.mark_alive(1));
        assert_eq!(node.state, NodeState::Alive);
        assert!(node.is_healthy());

        assert!(node.mark_suspect());
        assert_eq!(node.state, NodeState::Suspect);
        assert!(!node.is_healthy());
        assert!(node.state.is_reachable());

        assert!(node.mark_dead());
        assert_eq!(node.state, NodeState::Dead);
        assert!(!node.state.is_reachable());

        // Dead → Suspect must be rejected
        assert!(!node.mark_suspect());
        assert_eq!(node.state, NodeState::Dead);

        // Dead → Alive requires strictly higher incarnation
        assert!(!node.mark_alive(1));
        assert_eq!(node.state, NodeState::Dead);
        assert!(node.mark_alive(2));
        assert_eq!(node.state, NodeState::Alive);
    }

    #[test]
    fn test_alive_must_pass_through_suspect() {
        let mut node = test_node();
        assert!(node.mark_alive(1));

        // Direct Alive → Dead is not a valid SWIM transition.
        assert!(!node.mark_dead());
        assert_eq!(node.state, NodeState::Alive);
    }

    #[test]
    fn test_stale_incarnation_rejected() {
        let mut node = test_node();
        assert!(node.mark_alive(5));

        // An older incarnation must not overwrite newer state.
        assert!(!node.mark_alive(4));
        assert_eq!(node.incarnation, 5);

        // Same incarnation is accepted as a refresh.
        assert!(node.mark_alive(5));
        assert_eq!(node.incarnation, 5);
    }

    #[test]
    fn test_leaving_transitions() {
        let mut node = test_node();
        assert!(node.mark_alive(1));

        assert!(node.mark_leaving());
        assert_eq!(node.state, NodeState::Leaving);
        assert!(!node.state.is_reachable());

        // A leaving node cannot be suspected or declared dead directly.
        assert!(!node.mark_suspect());
        assert!(!node.mark_dead());

        // Rejoin requires a strictly higher incarnation.
        assert!(!node.mark_alive(1));
        assert!(node.mark_alive(2));
        assert_eq!(node.state, NodeState::Alive);
    }

    #[test]
    fn test_dead_node_cannot_leave() {
        let mut node = test_node();
        assert!(node.mark_alive(1));
        assert!(node.mark_suspect());
        assert!(node.mark_dead());

        assert!(!node.mark_leaving());
        assert_eq!(node.state, NodeState::Dead);
    }

    #[test]
    fn test_load_score() {
        let mut node = test_node();

        node.partition_leader_count = 2;
        node.partition_replica_count = 4;

        // Leaders weighted 3x
        assert_eq!(node.load_score(), 2 * 3 + 4);
    }

    #[test]
    fn test_node_capabilities() {
        let full = NodeCapabilities::full();
        assert!(full.voter && full.leader_eligible && full.replica_eligible);

        let observer = NodeCapabilities::observer();
        assert!(!observer.voter && !observer.leader_eligible && observer.replica_eligible);
    }

    #[test]
    fn test_node_info_builder() {
        let info = NodeInfo::new(
            "node-1",
            "127.0.0.1:9092".parse().unwrap(),
            "127.0.0.1:9093".parse().unwrap(),
        )
        .with_name("broker-1")
        .with_rack("rack-a")
        .with_tag("zone", "zone-a");

        assert_eq!(info.id, "node-1");
        assert_eq!(info.name.as_deref(), Some("broker-1"));
        assert_eq!(info.rack.as_deref(), Some("rack-a"));
        assert_eq!(info.tags.get("zone").map(String::as_str), Some("zone-a"));
        assert_eq!(info.capabilities, NodeCapabilities::full());
        assert_eq!(info.version, env!("CARGO_PKG_VERSION"));
    }

    #[test]
    fn test_gossip_state_conversion() {
        let info = NodeInfo::new(
            "node-1",
            "127.0.0.1:9092".parse().unwrap(),
            "127.0.0.1:9093".parse().unwrap(),
        )
        .with_rack("rack-a")
        .with_capabilities(NodeCapabilities::observer());
        let mut node = Node::new(info);
        assert!(node.mark_alive(3));

        let gossip = NodeGossipState::from(&node);
        assert_eq!(gossip.id, "node-1");
        assert_eq!(gossip.state, NodeState::Alive);
        assert_eq!(gossip.incarnation, 3);
        assert_eq!(gossip.client_addr, node.client_addr());
        assert_eq!(gossip.cluster_addr, node.cluster_addr());
        assert_eq!(gossip.rack.as_deref(), Some("rack-a"));
        assert_eq!(gossip.capabilities, NodeCapabilities::observer());
    }
}