kotoba_db_cluster/membership.rs

//! # Cluster Membership Management
//!
//! Manages cluster membership, node discovery, and cluster configuration.
//! Handles node joins, leaves, failures, and configuration updates.
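//!
//! ## Example
//!
//! A minimal usage sketch, assuming `ClusterState`, `NodeId`, and `NodeInfo`
//! from the crate root and a prepared `node_id`/`node_info` pair (marked
//! `ignore` because it needs an async context):
//!
//! ```ignore
//! let cluster_state = Arc::new(ClusterState::new(NodeId("node-0".to_string())));
//! let mut manager = MembershipManager::new(cluster_state, MembershipConfig::default());
//! manager.start().await?;
//!
//! manager.add_node(node_id, node_info).await?;
//! assert!(manager.get_cluster_stats().await.has_quorum());
//! ```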

use crate::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio::time::{self, MissedTickBehavior};

/// Cluster membership manager
pub struct MembershipManager {
    /// Cluster state
    cluster_state: Arc<ClusterState>,
    /// Membership configuration
    config: MembershipConfig,
    /// Node heartbeat tracking
    heartbeats: Arc<RwLock<HashMap<NodeId, HeartbeatInfo>>>,
    /// Membership change notifications
    membership_tx: broadcast::Sender<MembershipEvent>,
    /// Held so the event channel stays open even with no external subscribers
    membership_rx: broadcast::Receiver<MembershipEvent>,
    /// Command channel
    command_tx: mpsc::Sender<MembershipCommand>,
    command_rx: mpsc::Receiver<MembershipCommand>,
}

impl MembershipManager {
    /// Create a new membership manager
    pub fn new(cluster_state: Arc<ClusterState>, config: MembershipConfig) -> Self {
        let (command_tx, command_rx) = mpsc::channel(100);
        let (membership_tx, membership_rx) = broadcast::channel(100);

        Self {
            cluster_state,
            config,
            heartbeats: Arc::new(RwLock::new(HashMap::new())),
            membership_tx,
            membership_rx,
            command_tx,
            command_rx,
        }
    }

    /// Start membership management processes
    pub async fn start(&mut self) -> Result<(), MembershipError> {
        // Spawn the background monitors; the command processor is still a TODO
        self.start_heartbeat_monitor(self.membership_tx.clone()).await?;
        self.start_failure_detector(self.membership_tx.clone()).await?;
        Ok(())
    }

    /// Add a node to the cluster
    pub async fn add_node(&self, node_id: NodeId, node_info: NodeInfo) -> Result<(), MembershipError> {
        let mut config = self.cluster_state.config.write().await;

        // Check if node already exists
        if config.nodes.contains_key(&node_id) {
            return Err(MembershipError::NodeAlreadyExists(node_id.0));
        }

        // Add node to configuration
        config.nodes.insert(node_id.clone(), node_info);

        // Initialize heartbeat tracking
        let mut heartbeats = self.heartbeats.write().await;
        heartbeats.insert(node_id.clone(), HeartbeatInfo::new());

        // Notify listeners (send only fails if there are no subscribers)
        let _ = self.membership_tx.send(MembershipEvent::NodeJoined(node_id.clone()));

        println!("Node {} joined the cluster", node_id.0);
        Ok(())
    }

    /// Remove a node from the cluster
    pub async fn remove_node(&self, node_id: &NodeId) -> Result<(), MembershipError> {
        let mut config = self.cluster_state.config.write().await;

        // Check if node exists
        if !config.nodes.contains_key(node_id) {
            return Err(MembershipError::NodeNotFound(node_id.0.clone()));
        }

        // Remove node from configuration
        config.nodes.remove(node_id);

        // Remove heartbeat tracking
        let mut heartbeats = self.heartbeats.write().await;
        heartbeats.remove(node_id);

        // Notify listeners
        let _ = self.membership_tx.send(MembershipEvent::NodeLeft(node_id.clone()));

        println!("Node {} left the cluster", node_id.0);
        Ok(())
    }

    /// Update node information
    pub async fn update_node(&self, node_id: &NodeId, node_info: NodeInfo) -> Result<(), MembershipError> {
        let mut config = self.cluster_state.config.write().await;

        // Check if node exists
        if !config.nodes.contains_key(node_id) {
            return Err(MembershipError::NodeNotFound(node_id.0.clone()));
        }

        // Update node information
        config.nodes.insert(node_id.clone(), node_info);

        // Notify listeners of the configuration change
        let _ = self.membership_tx.send(MembershipEvent::ConfigChanged);

        println!("Node {} information updated", node_id.0);
        Ok(())
    }

    /// Record a heartbeat from a node
    pub async fn record_heartbeat(&self, node_id: &NodeId) -> Result<(), MembershipError> {
        let mut heartbeats = self.heartbeats.write().await;

        if let Some(info) = heartbeats.get_mut(node_id) {
            info.last_heartbeat = Instant::now();
            info.missed_heartbeats = 0;
            // A heartbeat from a previously suspected node means it recovered
            if info.was_suspected {
                info.was_suspected = false;
                let _ = self.membership_tx.send(MembershipEvent::NodeRecovered(node_id.clone()));
            }
            Ok(())
        } else {
            Err(MembershipError::NodeNotFound(node_id.0.clone()))
        }
    }

    /// Get current cluster configuration
    pub async fn get_cluster_config(&self) -> ClusterConfig {
        self.cluster_state.config.read().await.clone()
    }

    /// Get list of active nodes
    pub async fn get_active_nodes(&self) -> Vec<NodeId> {
        // Lock order: config before heartbeats, matching the write paths,
        // so the two locks are never taken in opposite orders
        let config = self.cluster_state.config.read().await;
        let heartbeats = self.heartbeats.read().await;

        config.nodes.keys()
            .filter(|node_id| {
                heartbeats.get(node_id)
                    .map(|info| info.is_alive())
                    .unwrap_or(false)
            })
            .cloned()
            .collect()
    }

    /// Check if a node is active
    pub async fn is_node_active(&self, node_id: &NodeId) -> bool {
        let heartbeats = self.heartbeats.read().await;
        heartbeats.get(node_id)
            .map(|info| info.is_alive())
            .unwrap_or(false)
    }

    /// Get cluster statistics
    pub async fn get_cluster_stats(&self) -> ClusterStats {
        let config = self.cluster_state.config.read().await;
        let heartbeats = self.heartbeats.read().await;

        let total_nodes = config.nodes.len();
        let active_nodes = heartbeats.values()
            .filter(|info| info.is_alive())
            .count();

        let suspected_nodes = heartbeats.values()
            .filter(|info| info.is_suspected())
            .count();

        // Count nodes that actually failed, rather than total - active,
        // which would lump suspected-but-not-failed nodes in with failures
        let failed_nodes = heartbeats.values()
            .filter(|info| info.is_failed())
            .count();

        ClusterStats {
            total_nodes,
            active_nodes,
            suspected_nodes,
            failed_nodes,
            replication_factor: config.replication_factor,
            partition_count: config.partition_count,
        }
    }

    /// Subscribe to membership events
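    ///
    /// Each call returns an independent receiver. A minimal sketch of
    /// draining events, including the lag case, assuming a running `manager`:
    ///
    /// ```ignore
    /// use tokio::sync::broadcast::error::RecvError;
    ///
    /// let mut rx = manager.subscribe_events();
    /// loop {
    ///     match rx.recv().await {
    ///         Ok(event) => println!("membership event: {:?}", event),
    ///         Err(RecvError::Lagged(n)) => eprintln!("skipped {} events", n),
    ///         Err(RecvError::Closed) => break,
    ///     }
    /// }
    /// ```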
    pub fn subscribe_events(&self) -> broadcast::Receiver<MembershipEvent> {
        self.membership_tx.subscribe()
    }

    // Internal methods

    async fn start_heartbeat_monitor(&self, membership_tx: broadcast::Sender<MembershipEvent>) -> Result<(), MembershipError> {
        let heartbeats = Arc::clone(&self.heartbeats);
        let config = self.config.clone();

        tokio::spawn(async move {
            let mut interval = time::interval(config.heartbeat_interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

            loop {
                interval.tick().await;

                let mut heartbeats = heartbeats.write().await;
                let mut suspected_nodes = Vec::new();

                // Each monitor tick counts as a missed heartbeat;
                // record_heartbeat resets the counter
                for (node_id, info) in heartbeats.iter_mut() {
                    info.missed_heartbeats += 1;

                    if info.is_suspected() && !info.was_suspected {
                        info.was_suspected = true;
                        suspected_nodes.push(node_id.clone());
                    }
                }

                // Notify about newly suspected nodes
                for node_id in suspected_nodes {
                    let _ = membership_tx.send(MembershipEvent::NodeSuspected(node_id));
                }
            }
        });

        Ok(())
    }

    async fn start_failure_detector(&self, membership_tx: broadcast::Sender<MembershipEvent>) -> Result<(), MembershipError> {
        let heartbeats = Arc::clone(&self.heartbeats);
        let config = self.config.clone();

        tokio::spawn(async move {
            let mut interval = time::interval(config.failure_detection_interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

            loop {
                interval.tick().await;

                let heartbeats = heartbeats.read().await;
                let mut failed_nodes = Vec::new();

                // Check for failed nodes
                for (node_id, info) in heartbeats.iter() {
                    if info.is_failed() {
                        failed_nodes.push(node_id.clone());
                    }
                }

                // Notify about failed nodes; until cleanup is implemented,
                // a failed node is re-reported on every tick
                for node_id in failed_nodes {
                    let _ = membership_tx.send(MembershipEvent::NodeFailed(node_id));
                }

                // Clean up old failed nodes after some time
                // TODO: Implement cleanup logic
            }
        });

        Ok(())
    }

    async fn start_command_processor(&self, _cluster_state: Arc<ClusterState>, _command_tx: mpsc::Sender<MembershipCommand>, _command_rx: &mut mpsc::Receiver<MembershipCommand>) -> Result<(), MembershipError> {
        // TODO: Implement command processor
        // For now, just return Ok to allow compilation
        Ok(())
    }

    /// Get command sender for external commands
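    ///
    /// A hedged sketch of the intended request/response flow once the command
    /// processor is implemented (it is still a TODO); `manager`, `node_id`,
    /// and `node_info` are assumed to exist:
    ///
    /// ```ignore
    /// let (response_tx, response_rx) = tokio::sync::oneshot::channel();
    /// manager
    ///     .command_sender()
    ///     .send(MembershipCommand::AddNode { node_id, node_info, response_tx })
    ///     .await
    ///     .expect("membership manager stopped");
    /// response_rx.await.expect("command processor dropped the response")?;
    /// ```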
    pub fn command_sender(&self) -> mpsc::Sender<MembershipCommand> {
        self.command_tx.clone()
    }
}

/// Membership configuration
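///
/// Individual fields can be overridden from the defaults with struct-update
/// syntax, e.g. a faster heartbeat (the value here is illustrative):
///
/// ```ignore
/// let config = MembershipConfig {
///     heartbeat_interval: Duration::from_millis(500),
///     ..MembershipConfig::default()
/// };
/// ```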
#[derive(Debug, Clone)]
pub struct MembershipConfig {
    /// Heartbeat interval
    pub heartbeat_interval: Duration,
    /// Failure detection interval
    pub failure_detection_interval: Duration,
    /// Maximum missed heartbeats before suspecting failure
    pub max_missed_heartbeats: u32,
    /// Maximum suspected time before marking as failed
    pub failure_timeout: Duration,
    /// Gossip interval for membership propagation
    pub gossip_interval: Duration,
}

impl Default for MembershipConfig {
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(1),
            failure_detection_interval: Duration::from_secs(5),
            max_missed_heartbeats: 3,
            failure_timeout: Duration::from_secs(15),
            gossip_interval: Duration::from_secs(2),
        }
    }
}

/// Heartbeat information for a node
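///
/// A node moves through three informal states: alive (heartbeats arriving),
/// suspected (missed-heartbeat threshold reached), and failed (no heartbeat
/// within the failure window). `MembershipManager::record_heartbeat` resets
/// the miss counter and clears suspicion.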
#[derive(Debug, Clone)]
pub struct HeartbeatInfo {
    pub last_heartbeat: Instant,
    pub missed_heartbeats: u32,
    pub was_suspected: bool,
}

impl HeartbeatInfo {
    pub fn new() -> Self {
        Self {
            last_heartbeat: Instant::now(),
            missed_heartbeats: 0,
            was_suspected: false,
        }
    }

    /// Check if node is considered alive
    pub fn is_alive(&self) -> bool {
        !self.is_suspected() && !self.is_failed()
    }

    /// Check if node is suspected of failure
    pub fn is_suspected(&self) -> bool {
        // Simplified: hardcodes the MembershipConfig::default()
        // max_missed_heartbeats; should eventually come from config
        self.missed_heartbeats >= 3
    }

    /// Check if node has failed
    pub fn is_failed(&self) -> bool {
        // Simplified fixed window; should eventually come from
        // MembershipConfig::failure_timeout
        self.last_heartbeat.elapsed() > Duration::from_secs(30)
    }
}

/// Membership events
#[derive(Debug, Clone)]
pub enum MembershipEvent {
    NodeJoined(NodeId),
    NodeLeft(NodeId),
    NodeSuspected(NodeId),
    NodeFailed(NodeId),
    NodeRecovered(NodeId),
    ConfigChanged,
}

/// Membership commands
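///
/// Each variant carries a `oneshot::Sender` so the (not yet implemented)
/// command processor can report the outcome back to the caller.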
#[derive(Debug)]
pub enum MembershipCommand {
    AddNode {
        node_id: NodeId,
        node_info: NodeInfo,
        response_tx: tokio::sync::oneshot::Sender<Result<(), MembershipError>>,
    },
    RemoveNode {
        node_id: NodeId,
        response_tx: tokio::sync::oneshot::Sender<Result<(), MembershipError>>,
    },
    UpdateNode {
        node_id: NodeId,
        node_info: NodeInfo,
        response_tx: tokio::sync::oneshot::Sender<Result<(), MembershipError>>,
    },
}

/// Cluster statistics
#[derive(Debug, Clone)]
pub struct ClusterStats {
    pub total_nodes: usize,
    pub active_nodes: usize,
    pub suspected_nodes: usize,
    pub failed_nodes: usize,
    pub replication_factor: usize,
    pub partition_count: usize,
}

impl ClusterStats {
    /// Calculate cluster health score (0.0 to 1.0)
    pub fn health_score(&self) -> f64 {
        if self.total_nodes == 0 {
            return 1.0; // Empty cluster is "healthy"
        }

        self.active_nodes as f64 / self.total_nodes as f64
    }

    /// Check if cluster has quorum
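    ///
    /// Quorum is a strict majority: with `total_nodes = 5`,
    /// `quorum_size = (5 / 2) + 1 = 3`, so at least 3 nodes must be active.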
    pub fn has_quorum(&self) -> bool {
        let quorum_size = (self.total_nodes / 2) + 1;
        self.active_nodes >= quorum_size
    }

    /// Check if the cluster can lose `max_failures` active nodes and still
    /// have at least one active node left
    pub fn can_tolerate_failures(&self, max_failures: usize) -> bool {
        self.active_nodes > max_failures
    }
}

/// Membership-related errors
#[derive(Debug, thiserror::Error)]
pub enum MembershipError {
    #[error("Node already exists: {0}")]
    NodeAlreadyExists(String),

    #[error("Node not found: {0}")]
    NodeNotFound(String),

    #[error("Invalid node configuration: {0}")]
    InvalidNodeConfig(String),

    #[error("Membership operation timed out")]
    Timeout,

    #[error("Network communication error: {0}")]
    NetworkError(String),

    #[error("Configuration error: {0}")]
    ConfigError(String),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_heartbeat_info() {
        let info = HeartbeatInfo::new();

        assert!(info.is_alive());
        assert!(!info.is_suspected());
        assert!(!info.is_failed());
    }

    #[test]
    fn test_cluster_stats() {
        let stats = ClusterStats {
            total_nodes: 5,
            active_nodes: 4,
            suspected_nodes: 1,
            failed_nodes: 0,
            replication_factor: 3,
            partition_count: 64,
        };

        assert_eq!(stats.health_score(), 0.8);
        assert!(stats.has_quorum()); // 4 >= 3
        assert!(stats.can_tolerate_failures(1)); // 4 > 1
        assert!(!stats.can_tolerate_failures(4)); // 4 > 4 is false
    }

    #[tokio::test]
    async fn test_membership_manager_creation() {
        let cluster_state = Arc::new(ClusterState::new(NodeId("test".to_string())));
        let config = MembershipConfig::default();

        let manager = MembershipManager::new(cluster_state, config);

        let stats = manager.get_cluster_stats().await;
        assert_eq!(stats.total_nodes, 0); // No nodes added yet
        assert_eq!(stats.active_nodes, 0);
    }

    #[tokio::test]
    async fn test_add_remove_node() {
        let cluster_state = Arc::new(ClusterState::new(NodeId("test".to_string())));
        let config = MembershipConfig::default();

        let manager = MembershipManager::new(cluster_state, config);

        let node_id = NodeId("node-1".to_string());
        let node_info = NodeInfo {
            id: node_id.clone(),
            address: "127.0.0.1".to_string(),
            port: 8080,
            role: NodeRole::Follower,
            partitions: vec![],
        };

        // Add node
        manager.add_node(node_id.clone(), node_info.clone()).await.unwrap();

        let stats = manager.get_cluster_stats().await;
        assert_eq!(stats.total_nodes, 1);

        // Remove node
        manager.remove_node(&node_id).await.unwrap();

        let stats = manager.get_cluster_stats().await;
        assert_eq!(stats.total_nodes, 0);
    }
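
    // A sketch exercising heartbeat recording; assumes the same NodeInfo
    // shape as test_add_remove_node above.
    #[tokio::test]
    async fn test_record_heartbeat() {
        let cluster_state = Arc::new(ClusterState::new(NodeId("test".to_string())));
        let manager = MembershipManager::new(cluster_state, MembershipConfig::default());

        let node_id = NodeId("node-1".to_string());
        let node_info = NodeInfo {
            id: node_id.clone(),
            address: "127.0.0.1".to_string(),
            port: 8080,
            role: NodeRole::Follower,
            partitions: vec![],
        };
        manager.add_node(node_id.clone(), node_info).await.unwrap();

        // Known node: heartbeat is accepted and the node reads as active
        manager.record_heartbeat(&node_id).await.unwrap();
        assert!(manager.is_node_active(&node_id).await);

        // Unknown node: heartbeat is rejected
        let unknown = NodeId("node-2".to_string());
        assert!(manager.record_heartbeat(&unknown).await.is_err());
    }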
}