Skip to main content

ember_cluster/
gossip.rs

1//! SWIM gossip protocol implementation.
2//!
3//! Implements the Scalable Weakly-consistent Infection-style Membership
4//! protocol for failure detection and cluster membership management.
5//!
6//! # Protocol Overview
7//!
8//! Each protocol period:
9//! 1. Pick a random node to probe with PING
10//! 2. If no ACK within timeout, send PING-REQ to k random nodes
11//! 3. If still no ACK, mark node as SUSPECT
12//! 4. After suspicion timeout, mark as DEAD
13//! 5. Piggyback state updates on all messages
14
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::time::{Duration, Instant};
18
19use rand::prelude::IndexedRandom;
20use tokio::sync::mpsc;
21use tracing::{debug, info, trace, warn};
22
23use crate::message::{GossipMessage, MemberInfo, NodeUpdate};
24use crate::{NodeId, SlotRange};
25
/// Tunable parameters for the gossip protocol.
#[derive(Clone, Debug)]
pub struct GossipConfig {
    /// Interval between protocol periods; one random member is probed per period.
    pub protocol_period: Duration,
    /// How long to wait for an ACK to a direct probe.
    pub probe_timeout: Duration,
    /// Suspicion timeout expressed as a multiple of `protocol_period`.
    pub suspicion_mult: u32,
    /// Number of peers asked to probe a target on our behalf.
    pub indirect_probes: usize,
    /// Upper bound on state updates piggybacked onto a single message.
    pub max_piggyback: usize,
    /// Offset added to the data port to obtain the gossip port.
    pub gossip_port_offset: u16,
}

impl Default for GossipConfig {
    /// One-second protocol period, 500 ms probe timeout, suspicion lasting
    /// five periods, three indirect probers, at most ten piggybacked updates,
    /// and a gossip port of `data_port + 10000`.
    fn default() -> Self {
        let protocol_period = Duration::from_millis(1_000);
        let probe_timeout = Duration::from_millis(500);
        GossipConfig {
            protocol_period,
            probe_timeout,
            suspicion_mult: 5,
            indirect_probes: 3,
            max_piggyback: 10,
            gossip_port_offset: 10_000,
        }
    }
}
55
56/// Internal state of a cluster member as tracked by gossip.
57#[derive(Debug, Clone)]
58pub struct MemberState {
59    pub id: NodeId,
60    pub addr: SocketAddr,
61    pub incarnation: u64,
62    pub state: MemberStatus,
63    pub state_change: Instant,
64    pub is_primary: bool,
65    pub slots: Vec<SlotRange>,
66}
67
/// Health of a member as seen by the failure detector.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemberStatus {
    /// Responding normally.
    Alive,
    /// Missed a probe; declared dead unless it recovers in time.
    Suspect,
    /// Declared failed.
    Dead,
    /// Announced a graceful departure.
    Left,
}
76
77/// Events emitted by the gossip engine.
78#[derive(Debug, Clone)]
79pub enum GossipEvent {
80    /// A new node joined the cluster.
81    MemberJoined(NodeId, SocketAddr),
82    /// A node is suspected to be failing.
83    MemberSuspected(NodeId),
84    /// A node has been confirmed dead.
85    MemberFailed(NodeId),
86    /// A node left gracefully.
87    MemberLeft(NodeId),
88    /// A node that was suspected is now alive.
89    MemberAlive(NodeId),
90}
91
92/// The gossip engine manages cluster membership and failure detection.
93pub struct GossipEngine {
94    /// Our node's identity.
95    local_id: NodeId,
96    /// Our advertised address.
97    local_addr: SocketAddr,
98    /// Our incarnation number (incremented to refute suspicion).
99    incarnation: u64,
100    /// Protocol configuration.
101    config: GossipConfig,
102    /// Known cluster members.
103    members: HashMap<NodeId, MemberState>,
104    /// Pending updates to piggyback on outgoing messages.
105    pending_updates: Vec<NodeUpdate>,
106    /// Sequence number for protocol messages.
107    next_seq: u64,
108    /// Pending probes awaiting acknowledgment.
109    pending_probes: HashMap<u64, PendingProbe>,
110    /// Channel for emitting events.
111    event_tx: mpsc::Sender<GossipEvent>,
112}
113
114struct PendingProbe {
115    target: NodeId,
116    sent_at: Instant,
117    indirect: bool,
118}
119
120impl GossipEngine {
121    /// Creates a new gossip engine.
122    pub fn new(
123        local_id: NodeId,
124        local_addr: SocketAddr,
125        config: GossipConfig,
126        event_tx: mpsc::Sender<GossipEvent>,
127    ) -> Self {
128        Self {
129            local_id,
130            local_addr,
131            incarnation: 1,
132            config,
133            members: HashMap::new(),
134            pending_updates: Vec::new(),
135            next_seq: 1,
136            pending_probes: HashMap::new(),
137            event_tx,
138        }
139    }
140
141    /// Returns the local node ID.
142    pub fn local_id(&self) -> NodeId {
143        self.local_id
144    }
145
146    /// Returns all known members.
147    pub fn members(&self) -> impl Iterator<Item = &MemberState> {
148        self.members.values()
149    }
150
151    /// Returns the number of alive members (excluding self).
152    pub fn alive_count(&self) -> usize {
153        self.members
154            .values()
155            .filter(|m| m.state == MemberStatus::Alive)
156            .count()
157    }
158
159    /// Adds a seed node to bootstrap cluster discovery.
160    pub fn add_seed(&mut self, id: NodeId, addr: SocketAddr) {
161        if id == self.local_id {
162            return;
163        }
164        self.members.entry(id).or_insert_with(|| MemberState {
165            id,
166            addr,
167            incarnation: 0,
168            state: MemberStatus::Alive,
169            state_change: Instant::now(),
170            is_primary: false,
171            slots: Vec::new(),
172        });
173    }
174
175    /// Handles an incoming gossip message.
176    pub async fn handle_message(
177        &mut self,
178        msg: GossipMessage,
179        from: SocketAddr,
180    ) -> Option<GossipMessage> {
181        match msg {
182            GossipMessage::Ping {
183                seq,
184                sender,
185                updates,
186            } => {
187                trace!("received ping seq={} from {}", seq, sender);
188                self.apply_updates(&updates).await;
189                self.ensure_member(sender, from);
190
191                // Reply with ACK
192                let response_updates = self.collect_updates();
193                Some(GossipMessage::Ack {
194                    seq,
195                    sender: self.local_id,
196                    updates: response_updates,
197                })
198            }
199
200            GossipMessage::PingReq {
201                seq,
202                sender,
203                target,
204                target_addr: _,
205            } => {
206                trace!(
207                    "received ping-req seq={} from {} for {}",
208                    seq,
209                    sender,
210                    target
211                );
212                self.ensure_member(sender, from);
213
214                // Forward ping to target (handled externally)
215                // For now, we just record that we might need to relay
216                None
217            }
218
219            GossipMessage::Ack {
220                seq,
221                sender,
222                updates,
223            } => {
224                trace!("received ack seq={} from {}", seq, sender);
225                self.apply_updates(&updates).await;
226                self.ensure_member(sender, from);
227
228                // Clear pending probe
229                if let Some(probe) = self.pending_probes.remove(&seq) {
230                    if self.members.get(&probe.target).map(|m| m.state)
231                        == Some(MemberStatus::Suspect)
232                    {
233                        // Node recovered from suspicion
234                        self.mark_alive(probe.target).await;
235                    }
236                }
237                None
238            }
239
240            GossipMessage::Join {
241                sender,
242                sender_addr,
243            } => {
244                info!("node {} joining from {}", sender, sender_addr);
245                self.ensure_member(sender, sender_addr);
246
247                // Broadcast alive update
248                self.queue_update(NodeUpdate::Alive {
249                    node: sender,
250                    addr: sender_addr,
251                    incarnation: 1,
252                });
253
254                // Send welcome with current members
255                let members: Vec<MemberInfo> = self
256                    .members
257                    .values()
258                    .filter(|m| m.state == MemberStatus::Alive)
259                    .map(|m| MemberInfo {
260                        id: m.id,
261                        addr: m.addr,
262                        incarnation: m.incarnation,
263                        is_primary: m.is_primary,
264                        slots: m.slots.clone(),
265                    })
266                    .collect();
267
268                Some(GossipMessage::Welcome {
269                    sender: self.local_id,
270                    members,
271                })
272            }
273
274            GossipMessage::Welcome { sender, members } => {
275                info!(
276                    "received welcome from {} with {} members",
277                    sender,
278                    members.len()
279                );
280                self.ensure_member(sender, from);
281
282                for member in members {
283                    if member.id != self.local_id {
284                        self.members
285                            .entry(member.id)
286                            .or_insert_with(|| MemberState {
287                                id: member.id,
288                                addr: member.addr,
289                                incarnation: member.incarnation,
290                                state: MemberStatus::Alive,
291                                state_change: Instant::now(),
292                                is_primary: member.is_primary,
293                                slots: member.slots,
294                            });
295                    }
296                }
297                None
298            }
299        }
300    }
301
302    /// Runs one protocol period: probe a random node.
303    pub fn tick(&mut self) -> Option<(SocketAddr, GossipMessage)> {
304        // Check for timed-out probes
305        self.check_probe_timeouts();
306
307        // Check for expired suspicions
308        self.check_suspicion_timeouts();
309
310        // Select a random alive member to probe
311        let target_info = {
312            let alive_members: Vec<_> = self
313                .members
314                .values()
315                .filter(|m| m.state == MemberStatus::Alive || m.state == MemberStatus::Suspect)
316                .map(|m| (m.id, m.addr))
317                .collect();
318
319            if alive_members.is_empty() {
320                return None;
321            }
322
323            *alive_members.choose(&mut rand::rng())?
324        };
325
326        let (target_id, target_addr) = target_info;
327        let seq = self.next_seq;
328        self.next_seq += 1;
329
330        let updates = self.collect_updates();
331        let msg = GossipMessage::Ping {
332            seq,
333            sender: self.local_id,
334            updates,
335        };
336
337        self.pending_probes.insert(
338            seq,
339            PendingProbe {
340                target: target_id,
341                sent_at: Instant::now(),
342                indirect: false,
343            },
344        );
345
346        Some((target_addr, msg))
347    }
348
349    /// Creates a join message to send to a seed node.
350    pub fn create_join_message(&self) -> GossipMessage {
351        GossipMessage::Join {
352            sender: self.local_id,
353            sender_addr: self.local_addr,
354        }
355    }
356
357    fn ensure_member(&mut self, id: NodeId, addr: SocketAddr) {
358        if id == self.local_id {
359            return;
360        }
361        self.members.entry(id).or_insert_with(|| MemberState {
362            id,
363            addr,
364            incarnation: 0,
365            state: MemberStatus::Alive,
366            state_change: Instant::now(),
367            is_primary: false,
368            slots: Vec::new(),
369        });
370    }
371
372    async fn apply_updates(&mut self, updates: &[NodeUpdate]) {
373        for update in updates {
374            match update {
375                NodeUpdate::Alive {
376                    node,
377                    addr,
378                    incarnation,
379                } => {
380                    if *node == self.local_id {
381                        // Someone thinks we're alive, good
382                        continue;
383                    }
384                    if let Some(member) = self.members.get_mut(node) {
385                        if *incarnation > member.incarnation {
386                            member.incarnation = *incarnation;
387                            member.addr = *addr;
388                            if member.state != MemberStatus::Alive {
389                                member.state = MemberStatus::Alive;
390                                member.state_change = Instant::now();
391                                if self
392                                    .event_tx
393                                    .send(GossipEvent::MemberAlive(*node))
394                                    .await
395                                    .is_err()
396                                {
397                                    warn!("event channel closed, cannot send MemberAlive event");
398                                }
399                            }
400                        }
401                    } else {
402                        self.members.insert(
403                            *node,
404                            MemberState {
405                                id: *node,
406                                addr: *addr,
407                                incarnation: *incarnation,
408                                state: MemberStatus::Alive,
409                                state_change: Instant::now(),
410                                is_primary: false,
411                                slots: Vec::new(),
412                            },
413                        );
414                        let _ = self
415                            .event_tx
416                            .send(GossipEvent::MemberJoined(*node, *addr))
417                            .await;
418                    }
419                }
420
421                NodeUpdate::Suspect { node, incarnation } => {
422                    if *node == self.local_id {
423                        // Refute suspicion by incrementing our incarnation
424                        if *incarnation >= self.incarnation {
425                            self.incarnation = incarnation + 1;
426                            self.queue_update(NodeUpdate::Alive {
427                                node: self.local_id,
428                                addr: self.local_addr,
429                                incarnation: self.incarnation,
430                            });
431                        }
432                        continue;
433                    }
434                    if let Some(member) = self.members.get_mut(node) {
435                        if *incarnation >= member.incarnation && member.state == MemberStatus::Alive
436                        {
437                            member.state = MemberStatus::Suspect;
438                            member.state_change = Instant::now();
439                            let _ = self
440                                .event_tx
441                                .send(GossipEvent::MemberSuspected(*node))
442                                .await;
443                        }
444                    }
445                }
446
447                NodeUpdate::Dead { node, incarnation } => {
448                    if *node == self.local_id {
449                        // Refute death claim
450                        self.incarnation = incarnation + 1;
451                        self.queue_update(NodeUpdate::Alive {
452                            node: self.local_id,
453                            addr: self.local_addr,
454                            incarnation: self.incarnation,
455                        });
456                        continue;
457                    }
458                    if let Some(member) = self.members.get_mut(node) {
459                        if *incarnation >= member.incarnation && member.state != MemberStatus::Dead
460                        {
461                            member.state = MemberStatus::Dead;
462                            member.state_change = Instant::now();
463                            if self
464                                .event_tx
465                                .send(GossipEvent::MemberFailed(*node))
466                                .await
467                                .is_err()
468                            {
469                                warn!("event channel closed, cannot send MemberFailed event");
470                            }
471                        }
472                    }
473                }
474
475                NodeUpdate::Left { node } => {
476                    if *node == self.local_id {
477                        continue;
478                    }
479                    if let Some(member) = self.members.get_mut(node) {
480                        if member.state != MemberStatus::Left {
481                            member.state = MemberStatus::Left;
482                            member.state_change = Instant::now();
483                            if self
484                                .event_tx
485                                .send(GossipEvent::MemberLeft(*node))
486                                .await
487                                .is_err()
488                            {
489                                warn!("event channel closed, cannot send MemberLeft event");
490                            }
491                        }
492                    }
493                }
494            }
495        }
496    }
497
498    async fn mark_alive(&mut self, node: NodeId) {
499        if let Some(member) = self.members.get_mut(&node) {
500            if member.state == MemberStatus::Suspect {
501                member.state = MemberStatus::Alive;
502                member.state_change = Instant::now();
503                if self
504                    .event_tx
505                    .send(GossipEvent::MemberAlive(node))
506                    .await
507                    .is_err()
508                {
509                    warn!("event channel closed, cannot send MemberAlive event");
510                }
511            }
512        }
513    }
514
515    fn check_probe_timeouts(&mut self) {
516        let timeout = self.config.probe_timeout;
517        let now = Instant::now();
518
519        // Collect timed out probes first
520        let timed_out: Vec<_> = self
521            .pending_probes
522            .iter()
523            .filter(|(_, probe)| now.duration_since(probe.sent_at) > timeout && !probe.indirect)
524            .map(|(seq, probe)| (*seq, probe.target))
525            .collect();
526
527        // Now process them
528        for (seq, target) in timed_out {
529            self.pending_probes.remove(&seq);
530
531            // Get incarnation before mutating
532            let incarnation = self
533                .members
534                .get(&target)
535                .filter(|m| m.state == MemberStatus::Alive)
536                .map(|m| m.incarnation);
537
538            if let Some(inc) = incarnation {
539                if let Some(member) = self.members.get_mut(&target) {
540                    debug!("node {} failed to respond, marking suspect", target);
541                    member.state = MemberStatus::Suspect;
542                    member.state_change = Instant::now();
543                }
544                self.queue_update(NodeUpdate::Suspect {
545                    node: target,
546                    incarnation: inc,
547                });
548            }
549        }
550    }
551
552    fn check_suspicion_timeouts(&mut self) {
553        let suspicion_timeout = self.config.protocol_period * self.config.suspicion_mult;
554        let now = Instant::now();
555        let mut to_mark_dead = Vec::new();
556
557        for member in self.members.values() {
558            if member.state == MemberStatus::Suspect
559                && now.duration_since(member.state_change) > suspicion_timeout
560            {
561                to_mark_dead.push((member.id, member.incarnation));
562            }
563        }
564
565        for (id, incarnation) in to_mark_dead {
566            if let Some(member) = self.members.get_mut(&id) {
567                warn!("node {} confirmed dead after suspicion timeout", id);
568                member.state = MemberStatus::Dead;
569                member.state_change = Instant::now();
570                self.queue_update(NodeUpdate::Dead {
571                    node: id,
572                    incarnation,
573                });
574            }
575        }
576    }
577
578    fn queue_update(&mut self, update: NodeUpdate) {
579        self.pending_updates.push(update);
580        // Keep bounded
581        if self.pending_updates.len() > self.config.max_piggyback * 2 {
582            self.pending_updates.drain(0..self.config.max_piggyback);
583        }
584    }
585
586    fn collect_updates(&mut self) -> Vec<NodeUpdate> {
587        let count = self.pending_updates.len().min(self.config.max_piggyback);
588        self.pending_updates.drain(0..count).collect()
589    }
590}
591
592#[cfg(test)]
593mod tests {
594    use super::*;
595    use std::net::Ipv4Addr;
596
597    fn test_addr(port: u16) -> SocketAddr {
598        SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), port))
599    }
600
601    #[tokio::test]
602    async fn engine_creation() {
603        let (tx, _rx) = mpsc::channel(16);
604        let engine = GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
605        assert_eq!(engine.alive_count(), 0);
606    }
607
608    #[tokio::test]
609    async fn add_seed() {
610        let (tx, _rx) = mpsc::channel(16);
611        let mut engine =
612            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
613
614        let seed_id = NodeId::new();
615        engine.add_seed(seed_id, test_addr(6380));
616        assert_eq!(engine.alive_count(), 1);
617    }
618
619    #[tokio::test]
620    async fn handle_ping() {
621        let (tx, _rx) = mpsc::channel(16);
622        let mut engine =
623            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
624
625        let sender = NodeId::new();
626        let msg = GossipMessage::Ping {
627            seq: 1,
628            sender,
629            updates: vec![],
630        };
631
632        let response = engine.handle_message(msg, test_addr(6380)).await;
633        assert!(matches!(response, Some(GossipMessage::Ack { .. })));
634        assert_eq!(engine.alive_count(), 1);
635    }
636
637    #[tokio::test]
638    async fn handle_join() {
639        let (tx, _rx) = mpsc::channel(16);
640        let mut engine =
641            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
642
643        let joiner = NodeId::new();
644        let msg = GossipMessage::Join {
645            sender: joiner,
646            sender_addr: test_addr(6380),
647        };
648
649        let response = engine.handle_message(msg, test_addr(6380)).await;
650        assert!(matches!(response, Some(GossipMessage::Welcome { .. })));
651        assert_eq!(engine.alive_count(), 1);
652    }
653
654    #[tokio::test]
655    async fn tick_with_no_members() {
656        let (tx, _rx) = mpsc::channel(16);
657        let mut engine =
658            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
659
660        let probe = engine.tick();
661        assert!(probe.is_none());
662    }
663
664    #[tokio::test]
665    async fn tick_with_members() {
666        let (tx, _rx) = mpsc::channel(16);
667        let mut engine =
668            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
669
670        engine.add_seed(NodeId::new(), test_addr(6380));
671        let probe = engine.tick();
672        assert!(probe.is_some());
673
674        let (addr, msg) = probe.unwrap();
675        assert_eq!(addr.port(), 6380);
676        assert!(matches!(msg, GossipMessage::Ping { .. }));
677    }
678
679    #[tokio::test]
680    async fn create_join_message() {
681        let (tx, _rx) = mpsc::channel(16);
682        let id = NodeId::new();
683        let addr = test_addr(6379);
684        let engine = GossipEngine::new(id, addr, GossipConfig::default(), tx);
685
686        let msg = engine.create_join_message();
687        match msg {
688            GossipMessage::Join {
689                sender,
690                sender_addr,
691            } => {
692                assert_eq!(sender, id);
693                assert_eq!(sender_addr, addr);
694            }
695            _ => panic!("expected Join message"),
696        }
697    }
698}