1use crate::{GraphError, Result};
10use chrono::{DateTime, Duration as ChronoDuration, Utc};
11use dashmap::DashMap;
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, HashSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17use tracing::{debug, info, warn};
18use uuid::Uuid;
19
/// Identifier of a node in the cluster; a plain alias for `String`.
pub type NodeId = String;
22
/// Messages exchanged between nodes by the gossip protocol.
///
/// Incoming messages are dispatched by `GossipMembership::handle_message`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipMessage {
    /// Liveness probe sent to a peer.
    Ping {
        /// Node that sent the ping.
        from: NodeId,
        /// Sender-assigned sequence number, echoed back in the matching `Ack`.
        sequence: u64,
        /// Time at which the sender built the message.
        timestamp: DateTime<Utc>,
    },
    /// Reply to a `Ping`.
    Ack {
        /// Node that sends the acknowledgement.
        from: NodeId,
        /// Node that sent the original ping.
        to: NodeId,
        /// Sequence number copied from the ping being acknowledged.
        sequence: u64,
        /// Time at which the acknowledgement was built.
        timestamp: DateTime<Utc>,
    },
    /// Request to probe `target` through another node.
    ///
    /// NOTE(review): this variant is neither constructed nor handled in this
    /// file; the field meanings below are inferred from their names — confirm.
    IndirectPing {
        /// Node asking for the indirect probe (presumably).
        from: NodeId,
        /// Node whose liveness is in question (presumably).
        target: NodeId,
        /// Node asked to relay the probe (presumably).
        intermediary: NodeId,
        /// Sequence number of the probe.
        sequence: u64,
    },
    /// Batch of membership changes disseminated to peers.
    MembershipUpdate {
        /// Node that sent the update.
        from: NodeId,
        /// The membership events carried by this message.
        updates: Vec<MembershipEvent>,
        /// Version number attached by the sender.
        version: u64,
    },
    /// Announcement that a node wants to join the cluster.
    Join {
        /// Identifier of the joining node.
        node_id: NodeId,
        /// Address the joining node can be reached at.
        address: SocketAddr,
        /// Arbitrary key/value metadata supplied by the joining node.
        metadata: HashMap<String, String>,
    },
    /// Announcement that `node_id` is leaving the cluster.
    Leave { node_id: NodeId },
}
61
/// A change in cluster membership.
///
/// Events are carried inside `GossipMessage::MembershipUpdate` and are also
/// the payload type accepted by listeners registered on `GossipMembership`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MembershipEvent {
    /// A node joined the cluster.
    Join {
        /// Identifier of the node that joined.
        node_id: NodeId,
        /// Address of the node that joined.
        address: SocketAddr,
        /// When the event was created.
        timestamp: DateTime<Utc>,
    },
    /// A node left the cluster.
    Leave {
        /// Identifier of the node that left.
        node_id: NodeId,
        /// When the event was created.
        timestamp: DateTime<Utc>,
    },
    /// A node is suspected of having failed.
    Suspect {
        /// Identifier of the suspected node.
        node_id: NodeId,
        /// When the event was created.
        timestamp: DateTime<Utc>,
    },
    /// A node is reported alive.
    Alive {
        /// Identifier of the node reported alive.
        node_id: NodeId,
        /// When the event was created.
        timestamp: DateTime<Utc>,
    },
    /// A node has been declared dead.
    Dead {
        /// Identifier of the node declared dead.
        node_id: NodeId,
        /// When the event was created.
        timestamp: DateTime<Utc>,
    },
}
92
/// Health state of a cluster member as tracked in the local membership table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeHealth {
    /// The node is considered reachable; the only state `Member::is_healthy` accepts.
    Alive,
    /// The node is suspected of having failed; the failure detector promotes it
    /// to `Dead` once the suspicion timeout has elapsed since it was last seen.
    Suspect,
    /// The node has been declared dead.
    Dead,
    /// The node left the cluster; `Member::mark_seen` does not revive this state.
    Left,
}
105
/// One entry of the membership table: everything known locally about a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Member {
    /// Identifier of the node.
    pub node_id: NodeId,
    /// Network address of the node.
    pub address: SocketAddr,
    /// Current health state of the node.
    pub health: NodeHealth,
    /// Last time the node was observed; set at creation and by `mark_seen`.
    pub last_seen: DateTime<Utc>,
    /// Incarnation number; initialised to 0 by `Member::new`.
    /// NOTE(review): nothing in this file updates it — confirm intended use.
    pub incarnation: u64,
    /// Arbitrary key/value metadata attached to the node.
    pub metadata: HashMap<String, String>,
    /// Failure counter; bumped by `increment_failures`, reset by `mark_seen`.
    pub failure_count: u32,
}
124
125impl Member {
126 pub fn new(node_id: NodeId, address: SocketAddr) -> Self {
128 Self {
129 node_id,
130 address,
131 health: NodeHealth::Alive,
132 last_seen: Utc::now(),
133 incarnation: 0,
134 metadata: HashMap::new(),
135 failure_count: 0,
136 }
137 }
138
139 pub fn is_healthy(&self) -> bool {
141 matches!(self.health, NodeHealth::Alive)
142 }
143
144 pub fn mark_seen(&mut self) {
146 self.last_seen = Utc::now();
147 self.failure_count = 0;
148 if self.health != NodeHealth::Left {
149 self.health = NodeHealth::Alive;
150 }
151 }
152
153 pub fn increment_failures(&mut self) {
155 self.failure_count += 1;
156 }
157}
158
/// Tunable parameters of the gossip protocol.
///
/// See the `Default` implementation for the default value of each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipConfig {
    /// Pause between gossip rounds, in milliseconds.
    pub gossip_interval_ms: u64,
    /// Maximum number of peers pinged in a single gossip round.
    pub gossip_fanout: usize,
    /// Ping acknowledgement timeout, in milliseconds.
    /// NOTE(review): meaning inferred from the name — confirm against callers.
    pub ping_timeout_ms: u64,
    /// Failure count at which a node should presumably be marked suspect.
    /// NOTE(review): meaning inferred from the name — confirm against callers.
    pub suspect_threshold: u32,
    /// Number of nodes presumably used for indirect pings.
    /// NOTE(review): meaning inferred from the name — confirm against callers.
    pub indirect_ping_nodes: usize,
    /// How long, in seconds, a suspect node may go unseen before the failure
    /// detector declares it dead.
    pub suspicion_timeout_seconds: u64,
}
175
176impl Default for GossipConfig {
177 fn default() -> Self {
178 Self {
179 gossip_interval_ms: 1000,
180 gossip_fanout: 3,
181 ping_timeout_ms: 500,
182 suspect_threshold: 3,
183 indirect_ping_nodes: 3,
184 suspicion_timeout_seconds: 30,
185 }
186 }
187}
188
/// Gossip-based cluster membership tracker for one local node.
///
/// All mutable state lives behind `Arc`s, so clones of this value share the
/// same membership table, counters and listeners.
pub struct GossipMembership {
    /// Identifier of the local node.
    local_node_id: NodeId,
    /// Address of the local node.
    local_address: SocketAddr,
    /// Protocol parameters.
    config: GossipConfig,
    /// Membership table keyed by node id; always contains the local node.
    members: Arc<DashMap<NodeId, Member>>,
    /// Membership version counter, bumped whenever an event is broadcast.
    version: Arc<RwLock<u64>>,
    /// Outstanding pings keyed by their sequence number.
    pending_acks: Arc<DashMap<u64, PendingAck>>,
    /// Counter that supplies ping sequence numbers.
    sequence: Arc<RwLock<u64>>,
    /// Callbacks registered through `add_listener`.
    event_listeners: Arc<RwLock<Vec<Box<dyn Fn(MembershipEvent) + Send + Sync>>>>,
}
208
/// Bookkeeping for a ping that has been issued but not yet acknowledged.
struct PendingAck {
    /// Node the ping was addressed to.
    target: NodeId,
    /// Time at which the ping was recorded.
    sent_at: DateTime<Utc>,
}
214
215impl GossipMembership {
216 pub fn new(node_id: NodeId, address: SocketAddr, config: GossipConfig) -> Self {
218 let members = Arc::new(DashMap::new());
219
220 let local_member = Member::new(node_id.clone(), address);
222 members.insert(node_id.clone(), local_member);
223
224 Self {
225 local_node_id: node_id,
226 local_address: address,
227 config,
228 members,
229 version: Arc::new(RwLock::new(0)),
230 pending_acks: Arc::new(DashMap::new()),
231 sequence: Arc::new(RwLock::new(0)),
232 event_listeners: Arc::new(RwLock::new(Vec::new())),
233 }
234 }
235
236 pub async fn start(&self) -> Result<()> {
238 info!("Starting gossip protocol for node: {}", self.local_node_id);
239
240 let gossip_self = self.clone();
242 tokio::spawn(async move {
243 gossip_self.run_gossip_loop().await;
244 });
245
246 let detection_self = self.clone();
248 tokio::spawn(async move {
249 detection_self.run_failure_detection().await;
250 });
251
252 Ok(())
253 }
254
255 pub async fn join(&self, seed_address: SocketAddr) -> Result<()> {
257 info!("Joining cluster via seed: {}", seed_address);
258
259 let join_msg = GossipMessage::Join {
261 node_id: self.local_node_id.clone(),
262 address: self.local_address,
263 metadata: HashMap::new(),
264 };
265
266 debug!("Would send join message to {}", seed_address);
269
270 Ok(())
271 }
272
273 pub async fn leave(&self) -> Result<()> {
275 info!("Leaving cluster: {}", self.local_node_id);
276
277 if let Some(mut member) = self.members.get_mut(&self.local_node_id) {
279 member.health = NodeHealth::Left;
280 }
281
282 let leave_msg = GossipMessage::Leave {
284 node_id: self.local_node_id.clone(),
285 };
286
287 self.broadcast_event(MembershipEvent::Leave {
288 node_id: self.local_node_id.clone(),
289 timestamp: Utc::now(),
290 })
291 .await;
292
293 Ok(())
294 }
295
296 pub fn get_members(&self) -> Vec<Member> {
298 self.members.iter().map(|e| e.value().clone()).collect()
299 }
300
301 pub fn get_healthy_members(&self) -> Vec<Member> {
303 self.members
304 .iter()
305 .filter(|e| e.value().is_healthy())
306 .map(|e| e.value().clone())
307 .collect()
308 }
309
310 pub fn get_member(&self, node_id: &NodeId) -> Option<Member> {
312 self.members.get(node_id).map(|m| m.value().clone())
313 }
314
315 pub async fn handle_message(&self, message: GossipMessage) -> Result<()> {
317 match message {
318 GossipMessage::Ping { from, sequence, .. } => self.handle_ping(from, sequence).await,
319 GossipMessage::Ack { from, sequence, .. } => self.handle_ack(from, sequence).await,
320 GossipMessage::MembershipUpdate { updates, .. } => {
321 self.handle_membership_update(updates).await
322 }
323 GossipMessage::Join {
324 node_id,
325 address,
326 metadata,
327 } => self.handle_join(node_id, address, metadata).await,
328 GossipMessage::Leave { node_id } => self.handle_leave(node_id).await,
329 _ => Ok(()),
330 }
331 }
332
333 async fn run_gossip_loop(&self) {
335 let interval = std::time::Duration::from_millis(self.config.gossip_interval_ms);
336
337 loop {
338 tokio::time::sleep(interval).await;
339
340 let members = self.get_healthy_members();
342 let targets: Vec<_> = members
343 .into_iter()
344 .filter(|m| m.node_id != self.local_node_id)
345 .take(self.config.gossip_fanout)
346 .collect();
347
348 for target in targets {
349 self.send_ping(target.node_id).await;
350 }
351 }
352 }
353
354 async fn run_failure_detection(&self) {
356 let interval = std::time::Duration::from_secs(5);
357
358 loop {
359 tokio::time::sleep(interval).await;
360
361 let now = Utc::now();
362 let timeout = ChronoDuration::seconds(self.config.suspicion_timeout_seconds as i64);
363
364 for mut entry in self.members.iter_mut() {
365 let member = entry.value_mut();
366
367 if member.node_id == self.local_node_id {
368 continue;
369 }
370
371 if member.health == NodeHealth::Suspect {
373 let elapsed = now.signed_duration_since(member.last_seen);
374 if elapsed > timeout {
375 debug!("Marking node as dead: {}", member.node_id);
376 member.health = NodeHealth::Dead;
377
378 let event = MembershipEvent::Dead {
379 node_id: member.node_id.clone(),
380 timestamp: now,
381 };
382
383 self.emit_event(event);
384 }
385 }
386 }
387 }
388 }
389
390 async fn send_ping(&self, target: NodeId) {
392 let mut seq = self.sequence.write().await;
393 *seq += 1;
394 let sequence = *seq;
395 drop(seq);
396
397 let ping = GossipMessage::Ping {
398 from: self.local_node_id.clone(),
399 sequence,
400 timestamp: Utc::now(),
401 };
402
403 self.pending_acks.insert(
405 sequence,
406 PendingAck {
407 target: target.clone(),
408 sent_at: Utc::now(),
409 },
410 );
411
412 debug!("Sending ping to {}", target);
413 }
415
416 async fn handle_ping(&self, from: NodeId, sequence: u64) -> Result<()> {
418 debug!("Received ping from {}", from);
419
420 if let Some(mut member) = self.members.get_mut(&from) {
422 member.mark_seen();
423 }
424
425 let ack = GossipMessage::Ack {
427 from: self.local_node_id.clone(),
428 to: from,
429 sequence,
430 timestamp: Utc::now(),
431 };
432
433 Ok(())
435 }
436
437 async fn handle_ack(&self, from: NodeId, sequence: u64) -> Result<()> {
439 debug!("Received ack from {}", from);
440
441 self.pending_acks.remove(&sequence);
443
444 if let Some(mut member) = self.members.get_mut(&from) {
446 member.mark_seen();
447 }
448
449 Ok(())
450 }
451
452 async fn handle_membership_update(&self, updates: Vec<MembershipEvent>) -> Result<()> {
454 for event in updates {
455 match &event {
456 MembershipEvent::Join {
457 node_id, address, ..
458 } => {
459 if !self.members.contains_key(node_id) {
460 let member = Member::new(node_id.clone(), *address);
461 self.members.insert(node_id.clone(), member);
462 }
463 }
464 MembershipEvent::Suspect { node_id, .. } => {
465 if let Some(mut member) = self.members.get_mut(node_id) {
466 member.health = NodeHealth::Suspect;
467 }
468 }
469 MembershipEvent::Dead { node_id, .. } => {
470 if let Some(mut member) = self.members.get_mut(node_id) {
471 member.health = NodeHealth::Dead;
472 }
473 }
474 _ => {}
475 }
476
477 self.emit_event(event);
478 }
479
480 Ok(())
481 }
482
483 async fn handle_join(
485 &self,
486 node_id: NodeId,
487 address: SocketAddr,
488 metadata: HashMap<String, String>,
489 ) -> Result<()> {
490 info!("Node joining: {}", node_id);
491
492 let mut member = Member::new(node_id.clone(), address);
493 member.metadata = metadata;
494
495 self.members.insert(node_id.clone(), member);
496
497 let event = MembershipEvent::Join {
498 node_id,
499 address,
500 timestamp: Utc::now(),
501 };
502
503 self.broadcast_event(event).await;
504
505 Ok(())
506 }
507
508 async fn handle_leave(&self, node_id: NodeId) -> Result<()> {
510 info!("Node leaving: {}", node_id);
511
512 if let Some(mut member) = self.members.get_mut(&node_id) {
513 member.health = NodeHealth::Left;
514 }
515
516 let event = MembershipEvent::Leave {
517 node_id,
518 timestamp: Utc::now(),
519 };
520
521 self.emit_event(event);
522
523 Ok(())
524 }
525
526 async fn broadcast_event(&self, event: MembershipEvent) {
528 let mut version = self.version.write().await;
529 *version += 1;
530 drop(version);
531
532 self.emit_event(event);
533 }
534
535 fn emit_event(&self, event: MembershipEvent) {
537 debug!("Membership event: {:?}", event);
539 }
540
541 pub async fn add_listener<F>(&self, listener: F)
543 where
544 F: Fn(MembershipEvent) + Send + Sync + 'static,
545 {
546 let mut listeners = self.event_listeners.write().await;
547 listeners.push(Box::new(listener));
548 }
549
550 pub async fn get_version(&self) -> u64 {
552 *self.version.read().await
553 }
554}
555
556impl Clone for GossipMembership {
557 fn clone(&self) -> Self {
558 Self {
559 local_node_id: self.local_node_id.clone(),
560 local_address: self.local_address,
561 config: self.config.clone(),
562 members: Arc::clone(&self.members),
563 version: Arc::clone(&self.version),
564 pending_acks: Arc::clone(&self.pending_acks),
565 sequence: Arc::clone(&self.sequence),
566 event_listeners: Arc::clone(&self.event_listeners),
567 }
568 }
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback socket address on the given port.
    fn create_test_address(port: u16) -> SocketAddr {
        let loopback = IpAddr::V4(Ipv4Addr::LOCALHOST);
        SocketAddr::new(loopback, port)
    }

    /// A freshly created tracker knows exactly one member: the local node.
    #[tokio::test]
    async fn test_gossip_membership() {
        let gossip = GossipMembership::new(
            "node-1".to_string(),
            create_test_address(8000),
            GossipConfig::default(),
        );

        let known = gossip.get_members();
        assert_eq!(known.len(), 1);
    }

    /// A join adds the peer to the table; a leave keeps it but marks it `Left`.
    #[tokio::test]
    async fn test_join_leave() {
        let gossip = GossipMembership::new(
            "node-1".to_string(),
            create_test_address(8000),
            GossipConfig::default(),
        );
        let peer_id: NodeId = "node-2".to_string();

        let joined = gossip
            .handle_join(peer_id.clone(), create_test_address(8001), HashMap::new())
            .await;
        joined.unwrap();
        assert_eq!(gossip.get_members().len(), 2);

        let left = gossip.handle_leave(peer_id.clone()).await;
        left.unwrap();

        let peer = gossip.get_member(&peer_id).unwrap();
        assert_eq!(peer.health, NodeHealth::Left);
    }

    /// Health transitions: new members are healthy, suspects are not, and
    /// being seen again restores a suspect to healthy.
    #[test]
    fn test_member() {
        let mut subject = Member::new("test".to_string(), create_test_address(8000));
        assert!(subject.is_healthy());

        subject.health = NodeHealth::Suspect;
        assert!(!subject.is_healthy());

        subject.mark_seen();
        assert!(subject.is_healthy());
    }
}