1use crate::*;
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::{RwLock, mpsc, broadcast};
11use tokio::time::{self, MissedTickBehavior};
12
/// Tracks cluster membership: which nodes are registered, their heartbeat
/// freshness, and broadcasts `MembershipEvent`s to interested subscribers.
pub struct MembershipManager {
    /// Shared cluster state; the node table lives in `cluster_state.config`.
    cluster_state: Arc<ClusterState>,
    /// Timing/threshold knobs for heartbeating and failure detection.
    config: MembershipConfig,
    /// Per-node heartbeat bookkeeping, keyed by node id.
    heartbeats: Arc<RwLock<HashMap<NodeId, HeartbeatInfo>>>,
    /// Broadcast side of the membership-event channel; `subscribe_events`
    /// hands out receivers from this sender.
    membership_tx: broadcast::Sender<MembershipEvent>,
    /// Receiver kept so the broadcast channel always has one live
    /// subscriber; never read in this chunk — NOTE(review): confirm needed.
    membership_rx: broadcast::Receiver<MembershipEvent>,
    /// Sender cloned out to callers via `command_sender()`.
    command_tx: mpsc::Sender<MembershipCommand>,
    /// Receiving end for membership commands; not consumed in this chunk
    /// (the command processor is a stub).
    command_rx: mpsc::Receiver<MembershipCommand>,
}
28
29impl MembershipManager {
30 pub fn new(cluster_state: Arc<ClusterState>, config: MembershipConfig) -> Self {
32 let (command_tx, command_rx) = mpsc::channel(100);
33 let (membership_tx, membership_rx) = broadcast::channel(100);
34
35 Self {
36 cluster_state,
37 config,
38 heartbeats: Arc::new(RwLock::new(HashMap::new())),
39 membership_tx,
40 membership_rx,
41 command_tx,
42 command_rx,
43 }
44 }
45
46 pub async fn start(&mut self) -> Result<(), MembershipError> {
48 Ok(())
51 }
52
53 pub async fn add_node(&self, node_id: NodeId, node_info: NodeInfo) -> Result<(), MembershipError> {
55 let mut config = self.cluster_state.config.write().await;
56
57 if config.nodes.contains_key(&node_id) {
59 return Err(MembershipError::NodeAlreadyExists(node_id.0));
60 }
61
62 config.nodes.insert(node_id.clone(), node_info);
64
65 let mut heartbeats = self.heartbeats.write().await;
67 heartbeats.insert(node_id.clone(), HeartbeatInfo::new());
68
69 let node_id_str = node_id.0.clone();
71 let _ = self.membership_tx.send(MembershipEvent::NodeJoined(node_id));
72
73 println!("Node {} joined the cluster", node_id_str);
74 Ok(())
75 }
76
77 pub async fn remove_node(&self, node_id: &NodeId) -> Result<(), MembershipError> {
79 let mut config = self.cluster_state.config.write().await;
80
81 if !config.nodes.contains_key(node_id) {
83 return Err(MembershipError::NodeNotFound(node_id.0.clone()));
84 }
85
86 config.nodes.remove(node_id);
88
89 let mut heartbeats = self.heartbeats.write().await;
91 heartbeats.remove(node_id);
92
93 let _ = self.membership_tx.send(MembershipEvent::NodeLeft(node_id.clone()));
95
96 println!("Node {} left the cluster", node_id.0);
97 Ok(())
98 }
99
100 pub async fn update_node(&self, node_id: &NodeId, node_info: NodeInfo) -> Result<(), MembershipError> {
102 let mut config = self.cluster_state.config.write().await;
103
104 if !config.nodes.contains_key(node_id) {
106 return Err(MembershipError::NodeNotFound(node_id.0.clone()));
107 }
108
109 config.nodes.insert(node_id.clone(), node_info);
111
112 println!("Node {} information updated", node_id.0);
113 Ok(())
114 }
115
116 pub async fn record_heartbeat(&self, node_id: &NodeId) -> Result<(), MembershipError> {
118 let mut heartbeats = self.heartbeats.write().await;
119
120 if let Some(info) = heartbeats.get_mut(node_id) {
121 info.last_heartbeat = Instant::now();
122 info.missed_heartbeats = 0;
123 Ok(())
124 } else {
125 Err(MembershipError::NodeNotFound(node_id.0.clone()))
126 }
127 }
128
129 pub async fn get_cluster_config(&self) -> ClusterConfig {
131 self.cluster_state.config.read().await.clone()
132 }
133
134 pub async fn get_active_nodes(&self) -> Vec<NodeId> {
136 let heartbeats = self.heartbeats.read().await;
137 let config = self.cluster_state.config.read().await;
138
139 config.nodes.keys()
140 .filter(|node_id| {
141 heartbeats.get(node_id)
142 .map(|info| info.is_alive())
143 .unwrap_or(false)
144 })
145 .cloned()
146 .collect()
147 }
148
149 pub async fn is_node_active(&self, node_id: &NodeId) -> bool {
151 let heartbeats = self.heartbeats.read().await;
152 heartbeats.get(node_id)
153 .map(|info| info.is_alive())
154 .unwrap_or(false)
155 }
156
157 pub async fn get_cluster_stats(&self) -> ClusterStats {
159 let config = self.cluster_state.config.read().await;
160 let heartbeats = self.heartbeats.read().await;
161
162 let total_nodes = config.nodes.len();
163 let active_nodes = heartbeats.values()
164 .filter(|info| info.is_alive())
165 .count();
166
167 let suspected_nodes = heartbeats.values()
168 .filter(|info| info.is_suspected())
169 .count();
170
171 let failed_nodes = total_nodes - active_nodes;
172
173 ClusterStats {
174 total_nodes,
175 active_nodes,
176 suspected_nodes,
177 failed_nodes,
178 replication_factor: config.replication_factor,
179 partition_count: config.partition_count,
180 }
181 }
182
183 pub fn subscribe_events(&self) -> broadcast::Receiver<MembershipEvent> {
185 self.membership_tx.subscribe()
186 }
187
188 async fn start_heartbeat_monitor(&self, cluster_state: Arc<ClusterState>, membership_tx: tokio::sync::broadcast::Sender<MembershipEvent>) -> Result<(), MembershipError> {
191 let heartbeats = Arc::clone(&self.heartbeats);
192 let config = self.config.clone();
193
194 tokio::spawn(async move {
195 let mut interval = time::interval(config.heartbeat_interval);
196 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
197
198 loop {
199 interval.tick().await;
200
201 let mut heartbeats = heartbeats.write().await;
202 let mut suspected_nodes = Vec::new();
203
204 for (node_id, info) in heartbeats.iter_mut() {
206 info.missed_heartbeats += 1;
207
208 if info.is_suspected() && !info.was_suspected {
209 info.was_suspected = true;
210 suspected_nodes.push(node_id.clone());
211 }
212 }
213
214 for node_id in suspected_nodes {
216 let _ = membership_tx.send(MembershipEvent::NodeSuspected(node_id));
217 }
218 }
219 });
220
221 Ok(())
222 }
223
224 async fn start_failure_detector(&self, cluster_state: Arc<ClusterState>, membership_tx: tokio::sync::broadcast::Sender<MembershipEvent>) -> Result<(), MembershipError> {
225 let heartbeats = Arc::clone(&self.heartbeats);
226 let config = self.config.clone();
227
228 tokio::spawn(async move {
229 let mut interval = time::interval(config.failure_detection_interval);
230 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
231
232 loop {
233 interval.tick().await;
234
235 let heartbeats = heartbeats.read().await;
236 let mut failed_nodes = Vec::new();
237
238 for (node_id, info) in heartbeats.iter() {
240 if info.is_failed() {
241 failed_nodes.push(node_id.clone());
242 }
243 }
244
245 for node_id in failed_nodes {
247 let _ = membership_tx.send(MembershipEvent::NodeFailed(node_id));
248 }
249
250 }
253 });
254
255 Ok(())
256 }
257
258 async fn start_command_processor(&self, _cluster_state: Arc<ClusterState>, _command_tx: tokio::sync::mpsc::Sender<MembershipCommand>, _command_rx: &mut tokio::sync::mpsc::Receiver<MembershipCommand>) -> Result<(), MembershipError> {
259 Ok(())
262 }
263
264 pub fn command_sender(&self) -> mpsc::Sender<MembershipCommand> {
266 self.command_tx.clone()
267 }
268}
269
/// Tuning knobs for membership heartbeating and failure detection.
#[derive(Debug, Clone)]
pub struct MembershipConfig {
    /// How often the heartbeat monitor ticks (and bumps missed counts).
    pub heartbeat_interval: Duration,
    /// How often the failure detector sweeps for timed-out nodes.
    pub failure_detection_interval: Duration,
    /// Missed-heartbeat budget before a node is suspected.
    /// NOTE(review): unused in this chunk — `HeartbeatInfo::is_suspected`
    /// hard-codes its own threshold; confirm intended wiring.
    pub max_missed_heartbeats: u32,
    /// Silence duration before a node counts as failed.
    /// NOTE(review): unused in this chunk — `HeartbeatInfo::is_failed`
    /// hard-codes 30s; confirm intended wiring.
    pub failure_timeout: Duration,
    /// Gossip dissemination period.
    /// NOTE(review): no gossip task exists in this chunk yet.
    pub gossip_interval: Duration,
}
284
285impl Default for MembershipConfig {
286 fn default() -> Self {
287 Self {
288 heartbeat_interval: Duration::from_secs(1),
289 failure_detection_interval: Duration::from_secs(5),
290 max_missed_heartbeats: 3,
291 failure_timeout: Duration::from_secs(15),
292 gossip_interval: Duration::from_secs(2),
293 }
294 }
295}
296
/// Per-node heartbeat bookkeeping used for suspicion/failure detection.
#[derive(Debug, Clone)]
pub struct HeartbeatInfo {
    /// When the last heartbeat for this node was recorded.
    pub last_heartbeat: Instant,
    /// Consecutive monitor ticks without a heartbeat; reset to 0 by
    /// `record_heartbeat`.
    pub missed_heartbeats: u32,
    /// Latch ensuring `NodeSuspected` is emitted once per episode.
    pub was_suspected: bool,
}
304
305impl HeartbeatInfo {
306 pub fn new() -> Self {
307 Self {
308 last_heartbeat: Instant::now(),
309 missed_heartbeats: 0,
310 was_suspected: false,
311 }
312 }
313
314 pub fn is_alive(&self) -> bool {
316 !self.is_suspected() && !self.is_failed()
317 }
318
319 pub fn is_suspected(&self) -> bool {
321 self.missed_heartbeats >= 1 }
323
324 pub fn is_failed(&self) -> bool {
326 self.last_heartbeat.elapsed() > Duration::from_secs(30) }
328}
329
/// Notifications broadcast to subscribers when cluster membership changes.
#[derive(Debug, Clone)]
pub enum MembershipEvent {
    /// A node was added to the cluster.
    NodeJoined(NodeId),
    /// A node was removed from the cluster.
    NodeLeft(NodeId),
    /// A node has missed enough heartbeats to be considered suspect.
    NodeSuspected(NodeId),
    /// A node's heartbeat has been silent past the failure timeout.
    NodeFailed(NodeId),
    /// A previously suspected/failed node is considered healthy again.
    NodeRecovered(NodeId),
    /// The cluster configuration changed.
    /// NOTE(review): no code path in this chunk emits this variant.
    ConfigChanged,
}
340
/// Requests for the membership command processor. Each variant carries a
/// oneshot channel on which the outcome is reported back to the caller.
/// NOTE(review): the processor (`start_command_processor`) is a stub in
/// this chunk, so these commands are not consumed yet.
#[derive(Debug)]
pub enum MembershipCommand {
    /// Add `node_id` with the given info to the cluster.
    AddNode {
        node_id: NodeId,
        node_info: NodeInfo,
        /// Delivery channel for the operation's result.
        response_tx: tokio::sync::oneshot::Sender<Result<(), MembershipError>>,
    },
    /// Remove `node_id` from the cluster.
    RemoveNode {
        node_id: NodeId,
        /// Delivery channel for the operation's result.
        response_tx: tokio::sync::oneshot::Sender<Result<(), MembershipError>>,
    },
    /// Replace the stored info for `node_id`.
    UpdateNode {
        node_id: NodeId,
        node_info: NodeInfo,
        /// Delivery channel for the operation's result.
        response_tx: tokio::sync::oneshot::Sender<Result<(), MembershipError>>,
    },
}
359
/// Point-in-time aggregate counters describing cluster health,
/// as produced by `MembershipManager::get_cluster_stats`.
#[derive(Debug, Clone)]
pub struct ClusterStats {
    /// Number of nodes in the configuration's node table.
    pub total_nodes: usize,
    /// Nodes whose heartbeat records read as alive.
    pub active_nodes: usize,
    /// Nodes whose heartbeat records read as suspected.
    pub suspected_nodes: usize,
    /// Computed as total minus active at snapshot time.
    pub failed_nodes: usize,
    /// Copied from the cluster configuration.
    pub replication_factor: usize,
    /// Copied from the cluster configuration.
    pub partition_count: usize,
}
370
371impl ClusterStats {
372 pub fn health_score(&self) -> f64 {
374 if self.total_nodes == 0 {
375 return 1.0; }
377
378 let healthy_nodes = self.active_nodes as f64;
379 let total_nodes = self.total_nodes as f64;
380
381 healthy_nodes / total_nodes
382 }
383
384 pub fn has_quorum(&self) -> bool {
386 let quorum_size = (self.total_nodes / 2) + 1;
387 self.active_nodes >= quorum_size
388 }
389
390 pub fn can_tolerate_failures(&self, max_failures: usize) -> bool {
392 self.active_nodes > max_failures
393 }
394}
395
/// Errors produced by membership operations.
#[derive(Debug, thiserror::Error)]
pub enum MembershipError {
    /// A node with this id is already registered.
    #[error("Node already exists: {0}")]
    NodeAlreadyExists(String),

    /// No node with this id is registered/tracked.
    #[error("Node not found: {0}")]
    NodeNotFound(String),

    /// The supplied node configuration is invalid.
    #[error("Invalid node configuration: {0}")]
    InvalidNodeConfig(String),

    /// The operation did not complete in time.
    #[error("Membership operation timed out")]
    Timeout,

    /// A network-level failure while talking to another node.
    #[error("Network communication error: {0}")]
    NetworkError(String),

    /// A cluster-configuration-level failure.
    #[error("Configuration error: {0}")]
    ConfigError(String),
}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created heartbeat record is alive and unflagged.
    #[test]
    fn test_heartbeat_info() {
        let fresh = HeartbeatInfo::new();
        assert!(fresh.is_alive());
        assert!(!fresh.is_suspected());
        assert!(!fresh.is_failed());
    }

    /// Health/quorum math on a 5-node cluster with 4 active members.
    #[test]
    fn test_cluster_stats() {
        let stats = ClusterStats {
            total_nodes: 5,
            active_nodes: 4,
            suspected_nodes: 1,
            failed_nodes: 0,
            replication_factor: 3,
            partition_count: 64,
        };

        assert_eq!(stats.health_score(), 0.8);
        assert!(stats.has_quorum());
        assert!(stats.can_tolerate_failures(1));
        assert!(!stats.can_tolerate_failures(4));
    }

    /// A brand-new manager reports an empty cluster.
    #[tokio::test]
    async fn test_membership_manager_creation() {
        let state = Arc::new(ClusterState::new(NodeId("test".to_string())));
        let manager = MembershipManager::new(state, MembershipConfig::default());

        let stats = manager.get_cluster_stats().await;
        assert_eq!(stats.total_nodes, 0);
        assert_eq!(stats.active_nodes, 0);
    }

    /// Adding then removing a node round-trips the node count.
    #[tokio::test]
    async fn test_add_remove_node() {
        let state = Arc::new(ClusterState::new(NodeId("test".to_string())));
        let manager = MembershipManager::new(state, MembershipConfig::default());

        let node_id = NodeId("node-1".to_string());
        let node_info = NodeInfo {
            id: node_id.clone(),
            address: "127.0.0.1".to_string(),
            port: 8080,
            role: NodeRole::Follower,
            partitions: vec![],
        };

        manager
            .add_node(node_id.clone(), node_info.clone())
            .await
            .unwrap();
        assert_eq!(manager.get_cluster_stats().await.total_nodes, 1);

        manager.remove_node(&node_id).await.unwrap();
        assert_eq!(manager.get_cluster_stats().await.total_nodes, 0);
    }
}