ruvector_graph/distributed/
gossip.rs

1//! Gossip protocol for cluster membership and health monitoring
2//!
3//! Implements SWIM (Scalable Weakly-consistent Infection-style Membership) protocol:
4//! - Fast failure detection
5//! - Efficient membership propagation
6//! - Low network overhead
7//! - Automatic node discovery
8
9use crate::{GraphError, Result};
10use chrono::{DateTime, Duration as ChronoDuration, Utc};
11use dashmap::DashMap;
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, HashSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17use tracing::{debug, info, warn};
18use uuid::Uuid;
19
/// Identifier of a node in the cluster.
///
/// A plain alias for an owned `String`; any string value can serve as a node id.
pub type NodeId = String;
22
/// Gossip message types exchanged between peers.
///
/// This enum derives `Serialize`/`Deserialize`. Changing variant or field order
/// can change the encoded form in non-self-describing serde formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipMessage {
    /// Ping message for health check (a direct liveness probe).
    Ping {
        /// Node that sent the probe.
        from: NodeId,
        /// Sender-local sequence number; the matching `Ack` echoes it back so the
        /// sender can clear its pending-ack entry for this probe.
        sequence: u64,
        /// Time the probe was created.
        timestamp: DateTime<Utc>,
    },
    /// Ack response to ping
    Ack {
        /// Node that is acknowledging.
        from: NodeId,
        /// Node the ack is addressed to (the original pinger).
        to: NodeId,
        /// Sequence number copied from the `Ping` being answered.
        sequence: u64,
        /// Time the ack was created.
        timestamp: DateTime<Utc>,
    },
    /// Indirect ping through intermediary.
    ///
    /// NOTE(review): presumably asks `intermediary` to probe `target` on behalf
    /// of `from` (SWIM indirect probing). The receiver does not currently act on
    /// this message.
    IndirectPing {
        from: NodeId,
        target: NodeId,
        intermediary: NodeId,
        sequence: u64,
    },
    /// Membership update: a batch of membership events gossiped by `from`.
    MembershipUpdate {
        from: NodeId,
        updates: Vec<MembershipEvent>,
        /// Presumably the sender's membership version; not read by the receiver
        /// in this module.
        version: u64,
    },
    /// Join request from a node that wants to enter the cluster.
    Join {
        node_id: NodeId,
        /// Address at which the joining node can be reached.
        address: SocketAddr,
        /// Free-form metadata stored on the new member by the receiver.
        metadata: HashMap<String, String>,
    },
    /// Leave notification for a node departing gracefully.
    Leave { node_id: NodeId },
}
61
/// Membership event types.
///
/// Events are gossiped in `GossipMessage::MembershipUpdate` batches and passed
/// to the local event-emission path. Every variant carries the affected node and
/// the time the event was produced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MembershipEvent {
    /// Node joined the cluster
    Join {
        node_id: NodeId,
        /// Address of the joining node, used to create its member entry.
        address: SocketAddr,
        timestamp: DateTime<Utc>,
    },
    /// Node left the cluster
    Leave {
        node_id: NodeId,
        timestamp: DateTime<Utc>,
    },
    /// Node suspected to be failed
    Suspect {
        node_id: NodeId,
        timestamp: DateTime<Utc>,
    },
    /// Node confirmed alive
    Alive {
        node_id: NodeId,
        timestamp: DateTime<Utc>,
    },
    /// Node confirmed dead
    Dead {
        node_id: NodeId,
        timestamp: DateTime<Utc>,
    },
}
92
/// Node health status as seen by the local node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeHealth {
    /// Node is healthy and responsive. This is the only state for which
    /// `Member::is_healthy` returns `true`.
    Alive,
    /// Node is suspected to be failed
    Suspect,
    /// Node is confirmed dead
    Dead,
    /// Node explicitly left. `Member::mark_seen` never moves a member out of
    /// this state.
    Left,
}
105
/// Member information in the gossip protocol.
///
/// One entry of the membership table, as tracked by the local node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Member {
    /// Node identifier (also used as the key of the membership table)
    pub node_id: NodeId,
    /// Network address the node was registered with
    pub address: SocketAddr,
    /// Current health status
    pub health: NodeHealth,
    /// Last time we heard from this node (refreshed by `Member::mark_seen`)
    pub last_seen: DateTime<Utc>,
    /// Incarnation number (for conflict resolution).
    /// NOTE(review): starts at 0 and is never incremented in this module —
    /// confirm whether refutation logic is expected elsewhere.
    pub incarnation: u64,
    /// Node metadata (free-form key/value pairs, e.g. from a `Join` message)
    pub metadata: HashMap<String, String>,
    /// Number of consecutive ping failures; reset to 0 by `Member::mark_seen`
    pub failure_count: u32,
}
124
125impl Member {
126    /// Create a new member
127    pub fn new(node_id: NodeId, address: SocketAddr) -> Self {
128        Self {
129            node_id,
130            address,
131            health: NodeHealth::Alive,
132            last_seen: Utc::now(),
133            incarnation: 0,
134            metadata: HashMap::new(),
135            failure_count: 0,
136        }
137    }
138
139    /// Check if member is healthy
140    pub fn is_healthy(&self) -> bool {
141        matches!(self.health, NodeHealth::Alive)
142    }
143
144    /// Mark as seen
145    pub fn mark_seen(&mut self) {
146        self.last_seen = Utc::now();
147        self.failure_count = 0;
148        if self.health != NodeHealth::Left {
149            self.health = NodeHealth::Alive;
150        }
151    }
152
153    /// Increment failure count
154    pub fn increment_failures(&mut self) {
155        self.failure_count += 1;
156    }
157}
158
/// Gossip configuration: timing, fan-out and failure-detection thresholds.
///
/// See the `Default` impl for the baseline values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipConfig {
    /// Gossip interval in milliseconds (delay between gossip rounds)
    pub gossip_interval_ms: u64,
    /// Number of nodes to gossip with per interval
    pub gossip_fanout: usize,
    /// Ping timeout in milliseconds
    pub ping_timeout_ms: u64,
    /// Number of ping failures before suspecting node
    pub suspect_threshold: u32,
    /// Number of indirect ping nodes
    pub indirect_ping_nodes: usize,
    /// Suspicion timeout in seconds: how long a `Suspect` member may go unseen
    /// before it is declared `Dead`
    pub suspicion_timeout_seconds: u64,
}
175
176impl Default for GossipConfig {
177    fn default() -> Self {
178        Self {
179            gossip_interval_ms: 1000,
180            gossip_fanout: 3,
181            ping_timeout_ms: 500,
182            suspect_threshold: 3,
183            indirect_ping_nodes: 3,
184            suspicion_timeout_seconds: 30,
185        }
186    }
187}
188
/// Gossip-based membership protocol state for one local node.
///
/// All mutable state lives behind `Arc`, and the hand-written `Clone` shares
/// those `Arc`s. Clones are therefore handles onto the same membership state;
/// `start` relies on this to hand copies to its background tasks.
pub struct GossipMembership {
    /// Local node ID (also inserted into `members` on construction)
    local_node_id: NodeId,
    /// Local node address (advertised in `Join` messages)
    local_address: SocketAddr,
    /// Configuration
    config: GossipConfig,
    /// Cluster members keyed by node ID, including the local node
    members: Arc<DashMap<NodeId, Member>>,
    /// Membership version (incremented by `broadcast_event`)
    version: Arc<RwLock<u64>>,
    /// Pending acks: outstanding pings keyed by their sequence number
    pending_acks: Arc<DashMap<u64, PendingAck>>,
    /// Sequence number for messages (last value handed out to a ping)
    sequence: Arc<RwLock<u64>>,
    /// Event listeners registered through `add_listener`
    event_listeners: Arc<RwLock<Vec<Box<dyn Fn(MembershipEvent) + Send + Sync>>>>,
}
208
/// Pending acknowledgment: bookkeeping for a ping that was sent but not yet acked.
struct PendingAck {
    /// Node the ping was sent to
    target: NodeId,
    /// When the ping was recorded as sent
    sent_at: DateTime<Utc>,
}
214
215impl GossipMembership {
216    /// Create a new gossip membership
217    pub fn new(node_id: NodeId, address: SocketAddr, config: GossipConfig) -> Self {
218        let members = Arc::new(DashMap::new());
219
220        // Add self to members
221        let local_member = Member::new(node_id.clone(), address);
222        members.insert(node_id.clone(), local_member);
223
224        Self {
225            local_node_id: node_id,
226            local_address: address,
227            config,
228            members,
229            version: Arc::new(RwLock::new(0)),
230            pending_acks: Arc::new(DashMap::new()),
231            sequence: Arc::new(RwLock::new(0)),
232            event_listeners: Arc::new(RwLock::new(Vec::new())),
233        }
234    }
235
236    /// Start the gossip protocol
237    pub async fn start(&self) -> Result<()> {
238        info!("Starting gossip protocol for node: {}", self.local_node_id);
239
240        // Start periodic gossip
241        let gossip_self = self.clone();
242        tokio::spawn(async move {
243            gossip_self.run_gossip_loop().await;
244        });
245
246        // Start failure detection
247        let detection_self = self.clone();
248        tokio::spawn(async move {
249            detection_self.run_failure_detection().await;
250        });
251
252        Ok(())
253    }
254
255    /// Add a seed node to join cluster
256    pub async fn join(&self, seed_address: SocketAddr) -> Result<()> {
257        info!("Joining cluster via seed: {}", seed_address);
258
259        // Send join message
260        let join_msg = GossipMessage::Join {
261            node_id: self.local_node_id.clone(),
262            address: self.local_address,
263            metadata: HashMap::new(),
264        };
265
266        // In production, send actual network message
267        // For now, just log
268        debug!("Would send join message to {}", seed_address);
269
270        Ok(())
271    }
272
273    /// Leave the cluster gracefully
274    pub async fn leave(&self) -> Result<()> {
275        info!("Leaving cluster: {}", self.local_node_id);
276
277        // Update own status
278        if let Some(mut member) = self.members.get_mut(&self.local_node_id) {
279            member.health = NodeHealth::Left;
280        }
281
282        // Broadcast leave message
283        let leave_msg = GossipMessage::Leave {
284            node_id: self.local_node_id.clone(),
285        };
286
287        self.broadcast_event(MembershipEvent::Leave {
288            node_id: self.local_node_id.clone(),
289            timestamp: Utc::now(),
290        })
291        .await;
292
293        Ok(())
294    }
295
296    /// Get all cluster members
297    pub fn get_members(&self) -> Vec<Member> {
298        self.members.iter().map(|e| e.value().clone()).collect()
299    }
300
301    /// Get healthy members only
302    pub fn get_healthy_members(&self) -> Vec<Member> {
303        self.members
304            .iter()
305            .filter(|e| e.value().is_healthy())
306            .map(|e| e.value().clone())
307            .collect()
308    }
309
310    /// Get a specific member
311    pub fn get_member(&self, node_id: &NodeId) -> Option<Member> {
312        self.members.get(node_id).map(|m| m.value().clone())
313    }
314
315    /// Handle incoming gossip message
316    pub async fn handle_message(&self, message: GossipMessage) -> Result<()> {
317        match message {
318            GossipMessage::Ping { from, sequence, .. } => self.handle_ping(from, sequence).await,
319            GossipMessage::Ack { from, sequence, .. } => self.handle_ack(from, sequence).await,
320            GossipMessage::MembershipUpdate { updates, .. } => {
321                self.handle_membership_update(updates).await
322            }
323            GossipMessage::Join {
324                node_id,
325                address,
326                metadata,
327            } => self.handle_join(node_id, address, metadata).await,
328            GossipMessage::Leave { node_id } => self.handle_leave(node_id).await,
329            _ => Ok(()),
330        }
331    }
332
333    /// Run the gossip loop
334    async fn run_gossip_loop(&self) {
335        let interval = std::time::Duration::from_millis(self.config.gossip_interval_ms);
336
337        loop {
338            tokio::time::sleep(interval).await;
339
340            // Select random members to gossip with
341            let members = self.get_healthy_members();
342            let targets: Vec<_> = members
343                .into_iter()
344                .filter(|m| m.node_id != self.local_node_id)
345                .take(self.config.gossip_fanout)
346                .collect();
347
348            for target in targets {
349                self.send_ping(target.node_id).await;
350            }
351        }
352    }
353
354    /// Run failure detection
355    async fn run_failure_detection(&self) {
356        let interval = std::time::Duration::from_secs(5);
357
358        loop {
359            tokio::time::sleep(interval).await;
360
361            let now = Utc::now();
362            let timeout = ChronoDuration::seconds(self.config.suspicion_timeout_seconds as i64);
363
364            for mut entry in self.members.iter_mut() {
365                let member = entry.value_mut();
366
367                if member.node_id == self.local_node_id {
368                    continue;
369                }
370
371                // Check if node has timed out
372                if member.health == NodeHealth::Suspect {
373                    let elapsed = now.signed_duration_since(member.last_seen);
374                    if elapsed > timeout {
375                        debug!("Marking node as dead: {}", member.node_id);
376                        member.health = NodeHealth::Dead;
377
378                        let event = MembershipEvent::Dead {
379                            node_id: member.node_id.clone(),
380                            timestamp: now,
381                        };
382
383                        self.emit_event(event);
384                    }
385                }
386            }
387        }
388    }
389
390    /// Send ping to a node
391    async fn send_ping(&self, target: NodeId) {
392        let mut seq = self.sequence.write().await;
393        *seq += 1;
394        let sequence = *seq;
395        drop(seq);
396
397        let ping = GossipMessage::Ping {
398            from: self.local_node_id.clone(),
399            sequence,
400            timestamp: Utc::now(),
401        };
402
403        // Track pending ack
404        self.pending_acks.insert(
405            sequence,
406            PendingAck {
407                target: target.clone(),
408                sent_at: Utc::now(),
409            },
410        );
411
412        debug!("Sending ping to {}", target);
413        // In production, send actual network message
414    }
415
416    /// Handle ping message
417    async fn handle_ping(&self, from: NodeId, sequence: u64) -> Result<()> {
418        debug!("Received ping from {}", from);
419
420        // Update member status
421        if let Some(mut member) = self.members.get_mut(&from) {
422            member.mark_seen();
423        }
424
425        // Send ack
426        let ack = GossipMessage::Ack {
427            from: self.local_node_id.clone(),
428            to: from,
429            sequence,
430            timestamp: Utc::now(),
431        };
432
433        // In production, send actual network message
434        Ok(())
435    }
436
437    /// Handle ack message
438    async fn handle_ack(&self, from: NodeId, sequence: u64) -> Result<()> {
439        debug!("Received ack from {}", from);
440
441        // Remove from pending
442        self.pending_acks.remove(&sequence);
443
444        // Update member status
445        if let Some(mut member) = self.members.get_mut(&from) {
446            member.mark_seen();
447        }
448
449        Ok(())
450    }
451
452    /// Handle membership update
453    async fn handle_membership_update(&self, updates: Vec<MembershipEvent>) -> Result<()> {
454        for event in updates {
455            match &event {
456                MembershipEvent::Join {
457                    node_id, address, ..
458                } => {
459                    if !self.members.contains_key(node_id) {
460                        let member = Member::new(node_id.clone(), *address);
461                        self.members.insert(node_id.clone(), member);
462                    }
463                }
464                MembershipEvent::Suspect { node_id, .. } => {
465                    if let Some(mut member) = self.members.get_mut(node_id) {
466                        member.health = NodeHealth::Suspect;
467                    }
468                }
469                MembershipEvent::Dead { node_id, .. } => {
470                    if let Some(mut member) = self.members.get_mut(node_id) {
471                        member.health = NodeHealth::Dead;
472                    }
473                }
474                _ => {}
475            }
476
477            self.emit_event(event);
478        }
479
480        Ok(())
481    }
482
483    /// Handle join request
484    async fn handle_join(
485        &self,
486        node_id: NodeId,
487        address: SocketAddr,
488        metadata: HashMap<String, String>,
489    ) -> Result<()> {
490        info!("Node joining: {}", node_id);
491
492        let mut member = Member::new(node_id.clone(), address);
493        member.metadata = metadata;
494
495        self.members.insert(node_id.clone(), member);
496
497        let event = MembershipEvent::Join {
498            node_id,
499            address,
500            timestamp: Utc::now(),
501        };
502
503        self.broadcast_event(event).await;
504
505        Ok(())
506    }
507
508    /// Handle leave notification
509    async fn handle_leave(&self, node_id: NodeId) -> Result<()> {
510        info!("Node leaving: {}", node_id);
511
512        if let Some(mut member) = self.members.get_mut(&node_id) {
513            member.health = NodeHealth::Left;
514        }
515
516        let event = MembershipEvent::Leave {
517            node_id,
518            timestamp: Utc::now(),
519        };
520
521        self.emit_event(event);
522
523        Ok(())
524    }
525
526    /// Broadcast event to all members
527    async fn broadcast_event(&self, event: MembershipEvent) {
528        let mut version = self.version.write().await;
529        *version += 1;
530        drop(version);
531
532        self.emit_event(event);
533    }
534
535    /// Emit event to listeners
536    fn emit_event(&self, event: MembershipEvent) {
537        // In production, call event listeners
538        debug!("Membership event: {:?}", event);
539    }
540
541    /// Add event listener
542    pub async fn add_listener<F>(&self, listener: F)
543    where
544        F: Fn(MembershipEvent) + Send + Sync + 'static,
545    {
546        let mut listeners = self.event_listeners.write().await;
547        listeners.push(Box::new(listener));
548    }
549
550    /// Get membership version
551    pub async fn get_version(&self) -> u64 {
552        *self.version.read().await
553    }
554}
555
556impl Clone for GossipMembership {
557    fn clone(&self) -> Self {
558        Self {
559            local_node_id: self.local_node_id.clone(),
560            local_address: self.local_address,
561            config: self.config.clone(),
562            members: Arc::clone(&self.members),
563            version: Arc::clone(&self.version),
564            pending_acks: Arc::clone(&self.pending_acks),
565            sequence: Arc::clone(&self.sequence),
566            event_listeners: Arc::clone(&self.event_listeners),
567        }
568    }
569}
570
#[cfg(test)]
mod tests {
    use super::*;

    /// Loopback (127.0.0.1) socket address on `port`.
    fn create_test_address(port: u16) -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], port))
    }

    /// A freshly built membership knows exactly one node: itself.
    #[tokio::test]
    async fn test_gossip_membership() {
        let gossip = GossipMembership::new(
            String::from("node-1"),
            create_test_address(8000),
            GossipConfig::default(),
        );

        let members = gossip.get_members();
        assert_eq!(members.len(), 1);
    }

    /// A join adds a member; a later leave keeps it but marks it `Left`.
    #[tokio::test]
    async fn test_join_leave() {
        let gossip = GossipMembership::new(
            String::from("node-1"),
            create_test_address(8000),
            GossipConfig::default(),
        );
        let joiner = String::from("node-2");

        gossip
            .handle_join(joiner.clone(), create_test_address(8001), HashMap::new())
            .await
            .unwrap();
        assert_eq!(gossip.get_members().len(), 2);

        gossip.handle_leave(joiner.clone()).await.unwrap();

        let departed = gossip
            .get_member(&joiner)
            .expect("node-2 should still be tracked after leaving");
        assert_eq!(departed.health, NodeHealth::Left);
    }

    /// Health follows the status field, and `mark_seen` restores `Alive`.
    #[test]
    fn test_member() {
        let mut member = Member::new(String::from("test"), create_test_address(8000));
        assert!(member.is_healthy());

        member.health = NodeHealth::Suspect;
        assert!(!member.is_healthy());

        member.mark_seen();
        assert!(member.is_healthy());
    }
}