Skip to main content

ember_cluster/
gossip.rs

1//! SWIM gossip protocol implementation.
2//!
3//! Implements the Scalable Weakly-consistent Infection-style Membership
4//! protocol for failure detection and cluster membership management.
5//!
6//! # Protocol Overview
7//!
8//! Each protocol period:
9//! 1. Pick a random node to probe with PING
10//! 2. If no ACK within timeout, send PING-REQ to k random nodes
11//! 3. If still no ACK, mark node as SUSPECT
12//! 4. After suspicion timeout, mark as DEAD
13//! 5. Piggyback state updates on all messages
14
15use std::collections::{HashMap, VecDeque};
16use std::net::SocketAddr;
17use std::time::{Duration, Instant};
18
19use rand::prelude::IndexedRandom;
20use tokio::sync::mpsc;
21use tracing::{debug, info, trace, warn};
22
23use crate::message::{GossipMessage, MemberInfo, NodeUpdate};
24use crate::{NodeId, SlotRange};
25
/// Upper bound on any incarnation number accepted from gossip.
///
/// Updates carrying a larger incarnation are rejected. Without this cap a
/// malicious node could announce `u64::MAX`, after which no strictly higher
/// incarnation would exist for the target to refute a suspicion with. Half of
/// the `u64` range leaves far more headroom than legitimate increments need.
const MAX_INCARNATION: u64 = u64::MAX >> 1;
30
/// Largest single-step incarnation increase accepted from an Alive update.
///
/// A node raises its incarnation by one when it refutes a suspicion, so a far
/// larger jump points either to a bug or to an attempt to burn through the
/// incarnation space. Such updates are rejected.
const MAX_INCARNATION_JUMP: u64 = 1_000;
36
/// Tunables for the SWIM gossip protocol.
///
/// Start from [`GossipConfig::default`] and override individual fields as
/// needed.
#[derive(Clone, Debug)]
pub struct GossipConfig {
    /// Interval between protocol periods; each period probes one random member.
    pub protocol_period: Duration,
    /// How long to wait for a probe response before treating it as timed out.
    pub probe_timeout: Duration,
    /// Suspicion timeout as a multiple of the protocol period
    /// (timeout = protocol_period * suspicion_mult).
    pub suspicion_mult: u32,
    /// How many members are asked to probe a target indirectly (PingReq).
    pub indirect_probes: usize,
    /// Upper bound on the number of updates piggybacked onto one message.
    pub max_piggyback: usize,
    /// Added to the data port to obtain the gossip port
    /// (gossip port = data_port + gossip_port_offset).
    pub gossip_port_offset: u16,
}

impl Default for GossipConfig {
    /// The default settings are:
    /// - a one-second protocol period;
    /// - a 500 ms probe timeout;
    /// - a suspicion multiplier of 5;
    /// - 3 indirect probes;
    /// - at most 10 piggybacked updates;
    /// - a gossip port offset of 10000.
    fn default() -> Self {
        let protocol_period = Duration::from_millis(1_000);
        let probe_timeout = Duration::from_millis(500);
        Self {
            protocol_period,
            probe_timeout,
            suspicion_mult: 5,
            indirect_probes: 3,
            max_piggyback: 10,
            gossip_port_offset: 10_000,
        }
    }
}
66
67/// Internal state of a cluster member as tracked by gossip.
68#[derive(Debug, Clone)]
69pub struct MemberState {
70    pub id: NodeId,
71    pub addr: SocketAddr,
72    pub incarnation: u64,
73    pub state: MemberStatus,
74    pub state_change: Instant,
75    pub is_primary: bool,
76    /// The primary this member replicates from, if it is a replica.
77    pub replicates: Option<NodeId>,
78    pub slots: Vec<SlotRange>,
79}
80
/// Liveness of a member from this node's point of view.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemberStatus {
    /// Responding normally.
    Alive,
    /// Missed probes; awaiting refutation or the suspicion timeout.
    Suspect,
    /// Declared failed.
    Dead,
    /// Departed the cluster gracefully.
    Left,
}
89
90/// Events emitted by the gossip engine.
91#[derive(Debug, Clone)]
92pub enum GossipEvent {
93    /// A new node joined the cluster.
94    MemberJoined(NodeId, SocketAddr, Vec<SlotRange>),
95    /// A node is suspected to be failing.
96    MemberSuspected(NodeId),
97    /// A node has been confirmed dead.
98    MemberFailed(NodeId),
99    /// A node left gracefully.
100    MemberLeft(NodeId),
101    /// A node that was suspected is now alive.
102    MemberAlive(NodeId),
103    /// A node's slot ownership changed.
104    SlotsChanged(NodeId, Vec<SlotRange>),
105    /// A node's role changed. Fields: node ID, is_primary, replicates.
106    RoleChanged(NodeId, bool, Option<NodeId>),
107    /// A replica requested votes for a failover election.
108    VoteRequested {
109        candidate: NodeId,
110        epoch: u64,
111        /// Candidate's replication offset at the time of the request.
112        offset: u64,
113    },
114    /// A primary granted its vote to a candidate.
115    VoteGranted {
116        from: NodeId,
117        candidate: NodeId,
118        epoch: u64,
119    },
120}
121
122/// The gossip engine manages cluster membership and failure detection.
123pub struct GossipEngine {
124    /// Our node's identity.
125    local_id: NodeId,
126    /// Our advertised address.
127    local_addr: SocketAddr,
128    /// Our incarnation number (incremented to refute suspicion).
129    incarnation: u64,
130    /// Protocol configuration.
131    config: GossipConfig,
132    /// Known cluster members.
133    members: HashMap<NodeId, MemberState>,
134    /// Pending updates to piggyback on outgoing messages.
135    /// VecDeque gives O(1) front drain instead of the O(n) shift that
136    /// Vec::drain(0..n) requires.
137    pending_updates: VecDeque<NodeUpdate>,
138    /// Sequence number for protocol messages.
139    next_seq: u64,
140    /// Pending probes awaiting acknowledgment.
141    pending_probes: HashMap<u64, PendingProbe>,
142    /// Channel for emitting events.
143    event_tx: mpsc::Sender<GossipEvent>,
144    /// Slot ranges owned by the local node, included in Welcome replies.
145    local_slots: Vec<SlotRange>,
146    /// Active PingReq relays waiting for an Ack from the target.
147    relay_pending: HashMap<u64, RelayEntry>,
148}
149
150struct PendingProbe {
151    target: NodeId,
152    sent_at: Instant,
153    indirect: bool,
154}
155
/// Bookkeeping for a Ping we forwarded on another node's behalf.
///
/// On receiving a PingReq we ping the target ourselves and keep this entry so
/// that the target's Ack can be passed back to whoever asked.
struct RelayEntry {
    /// Address of the node that sent the PingReq.
    requester: SocketAddr,
    /// Sequence number the requester used; echoed in the relayed Ack.
    original_seq: u64,
    /// When the forwarded Ping went out; used to expire stale relays.
    sent_at: Instant,
}
165
166impl GossipEngine {
167    /// Creates a new gossip engine.
168    pub fn new(
169        local_id: NodeId,
170        local_addr: SocketAddr,
171        config: GossipConfig,
172        event_tx: mpsc::Sender<GossipEvent>,
173    ) -> Self {
174        Self {
175            local_id,
176            local_addr,
177            incarnation: 1,
178            config,
179            members: HashMap::new(),
180            pending_updates: VecDeque::new(),
181            next_seq: 1,
182            pending_probes: HashMap::new(),
183            event_tx,
184            local_slots: Vec::new(),
185            relay_pending: HashMap::new(),
186        }
187    }
188
189    /// Returns the local node ID.
190    pub fn local_id(&self) -> NodeId {
191        self.local_id
192    }
193
194    /// Returns the local node's incarnation number.
195    pub fn local_incarnation(&self) -> u64 {
196        self.incarnation
197    }
198
199    /// Restores the incarnation number from a previous session.
200    ///
201    /// Used when loading persisted config so the node doesn't regress
202    /// to a lower incarnation, which would make it lose suspicion refutations.
203    pub fn set_incarnation(&mut self, n: u64) {
204        self.incarnation = n;
205    }
206
207    /// Returns all known members.
208    pub fn members(&self) -> impl Iterator<Item = &MemberState> {
209        self.members.values()
210    }
211
212    /// Returns gossip addresses for all currently alive members.
213    pub fn alive_member_addrs(&self) -> Vec<std::net::SocketAddr> {
214        self.members
215            .values()
216            .filter(|m| m.state == MemberStatus::Alive || m.state == MemberStatus::Suspect)
217            .map(|m| m.addr)
218            .collect()
219    }
220
221    /// Returns the number of alive members (excluding self).
222    pub fn alive_count(&self) -> usize {
223        self.members
224            .values()
225            .filter(|m| m.state == MemberStatus::Alive)
226            .count()
227    }
228
229    /// Updates the local node's slot ownership.
230    ///
231    /// Called after ADDSLOTS/DELSLOTS/SETSLOT NODE to keep the gossip
232    /// engine's view in sync. The updated slots are included in Welcome
233    /// replies so joining nodes learn the full slot map.
234    pub fn set_local_slots(&mut self, slots: Vec<SlotRange>) {
235        self.local_slots = slots;
236    }
237
238    /// Queues a slot ownership update for gossip propagation.
239    ///
240    /// The update will be piggybacked on the next outgoing Ping or Ack
241    /// message, spreading to the cluster via epidemic dissemination.
242    pub fn queue_slots_update(&mut self, node: NodeId, incarnation: u64, slots: Vec<SlotRange>) {
243        self.queue_update(NodeUpdate::SlotsChanged {
244            node,
245            incarnation,
246            slots,
247        });
248    }
249
250    /// Queues a role change for gossip propagation.
251    ///
252    /// Called after this node changes from primary to replica (or vice versa).
253    /// The update will be piggybacked on the next outgoing Ping or Ack.
254    pub fn queue_role_update(
255        &mut self,
256        node: NodeId,
257        incarnation: u64,
258        is_primary: bool,
259        replicates: Option<NodeId>,
260    ) {
261        self.queue_update(NodeUpdate::RoleChanged {
262            node,
263            incarnation,
264            is_primary,
265            replicates,
266        });
267    }
268
269    /// Queues a vote request for gossip propagation.
270    ///
271    /// Called by a replica that is starting an automatic failover election.
272    /// The update will be piggybacked on the next outgoing Ping or Ack.
273    pub fn queue_vote_request(&mut self, candidate: NodeId, epoch: u64, offset: u64) {
274        self.queue_update(NodeUpdate::VoteRequest {
275            candidate,
276            epoch,
277            offset,
278        });
279    }
280
281    /// Queues a vote grant for gossip propagation.
282    ///
283    /// Called by a primary that has decided to vote for the given candidate.
284    /// The update will be piggybacked on the next outgoing Ping or Ack.
285    pub fn queue_vote_granted(&mut self, from: NodeId, candidate: NodeId, epoch: u64) {
286        self.queue_update(NodeUpdate::VoteGranted {
287            from,
288            candidate,
289            epoch,
290        });
291    }
292
293    /// Adds a seed node to bootstrap cluster discovery.
294    pub fn add_seed(&mut self, id: NodeId, addr: SocketAddr) {
295        if id == self.local_id {
296            return;
297        }
298        self.members.entry(id).or_insert_with(|| MemberState {
299            id,
300            addr,
301            incarnation: 0,
302            state: MemberStatus::Alive,
303            state_change: Instant::now(),
304            is_primary: false,
305            replicates: None,
306            slots: Vec::new(),
307        });
308    }
309
310    /// Handles an incoming gossip message.
311    ///
312    /// Returns a list of `(address, message)` pairs to send. Most messages
313    /// produce a single reply back to `from`, but PingReq forwards a Ping
314    /// to a different host, and relayed Acks route back to the original
315    /// requester.
316    pub async fn handle_message(
317        &mut self,
318        msg: GossipMessage,
319        from: SocketAddr,
320    ) -> Vec<(SocketAddr, GossipMessage)> {
321        match msg {
322            GossipMessage::Ping {
323                seq,
324                sender,
325                updates,
326            } => {
327                trace!("received ping seq={} from {}", seq, sender);
328                self.apply_updates(&updates).await;
329                self.ensure_member(sender, from);
330
331                // Reply with ACK
332                let response_updates = self.collect_updates();
333                vec![(
334                    from,
335                    GossipMessage::Ack {
336                        seq,
337                        sender: self.local_id,
338                        updates: response_updates,
339                    },
340                )]
341            }
342
343            GossipMessage::PingReq {
344                seq,
345                sender,
346                target,
347                target_addr,
348            } => {
349                trace!(
350                    "received ping-req seq={} from {} for {}",
351                    seq,
352                    sender,
353                    target
354                );
355                self.ensure_member(sender, from);
356
357                // forward a fresh Ping to the target on behalf of the requester
358                let relay_seq = self.next_seq;
359                self.next_seq += 1;
360
361                self.relay_pending.insert(
362                    relay_seq,
363                    RelayEntry {
364                        requester: from,
365                        original_seq: seq,
366                        sent_at: Instant::now(),
367                    },
368                );
369
370                vec![(
371                    target_addr,
372                    GossipMessage::Ping {
373                        seq: relay_seq,
374                        sender: self.local_id,
375                        updates: vec![],
376                    },
377                )]
378            }
379
380            GossipMessage::Ack {
381                seq,
382                sender,
383                updates,
384            } => {
385                trace!("received ack seq={} from {}", seq, sender);
386                self.apply_updates(&updates).await;
387                self.ensure_member(sender, from);
388
389                let mut outgoing = Vec::new();
390
391                // Clear pending probe
392                if let Some(probe) = self.pending_probes.remove(&seq) {
393                    if self.members.get(&probe.target).map(|m| m.state)
394                        == Some(MemberStatus::Suspect)
395                    {
396                        // Node recovered from suspicion
397                        self.mark_alive(probe.target).await;
398                    }
399                }
400
401                // Check if this is a relayed Ack — forward it back to the requester
402                if let Some(relay) = self.relay_pending.remove(&seq) {
403                    outgoing.push((
404                        relay.requester,
405                        GossipMessage::Ack {
406                            seq: relay.original_seq,
407                            sender: self.local_id,
408                            updates: vec![],
409                        },
410                    ));
411                }
412
413                outgoing
414            }
415
416            GossipMessage::Join {
417                sender,
418                sender_addr,
419            } => {
420                info!("node {} joining from {}", sender, sender_addr);
421                let sender_is_new = !self.members.contains_key(&sender);
422                self.ensure_member(sender, sender_addr);
423                if sender_is_new {
424                    self.emit(GossipEvent::MemberJoined(sender, sender_addr, Vec::new()))
425                        .await;
426                }
427
428                // Broadcast alive update
429                self.queue_update(NodeUpdate::Alive {
430                    node: sender,
431                    addr: sender_addr,
432                    incarnation: 1,
433                });
434
435                // send welcome with current members, including ourselves
436                let mut members: Vec<MemberInfo> = self
437                    .members
438                    .values()
439                    .filter(|m| m.state == MemberStatus::Alive)
440                    .map(|m| MemberInfo {
441                        id: m.id,
442                        addr: m.addr,
443                        incarnation: m.incarnation,
444                        is_primary: m.is_primary,
445                        slots: m.slots.clone(),
446                    })
447                    .collect();
448
449                // include our own slot info so the joiner learns the full map
450                members.push(MemberInfo {
451                    id: self.local_id,
452                    addr: self.local_addr,
453                    incarnation: self.incarnation,
454                    is_primary: true,
455                    slots: self.local_slots.clone(),
456                });
457
458                vec![(
459                    from,
460                    GossipMessage::Welcome {
461                        sender: self.local_id,
462                        members,
463                    },
464                )]
465            }
466
467            GossipMessage::Welcome { sender, members } => {
468                info!(
469                    "received welcome from {} with {} members",
470                    sender,
471                    members.len()
472                );
473                let sender_is_new = !self.members.contains_key(&sender);
474                self.ensure_member(sender, from);
475                if sender_is_new {
476                    let sender_slots = self
477                        .members
478                        .get(&sender)
479                        .map(|m| m.slots.clone())
480                        .unwrap_or_default();
481                    self.emit(GossipEvent::MemberJoined(sender, from, sender_slots))
482                        .await;
483                }
484
485                for member in members {
486                    if member.id == self.local_id {
487                        continue;
488                    }
489                    if let std::collections::hash_map::Entry::Vacant(e) =
490                        self.members.entry(member.id)
491                    {
492                        let slots = member.slots.clone();
493                        e.insert(MemberState {
494                            id: member.id,
495                            addr: member.addr,
496                            incarnation: member.incarnation,
497                            state: MemberStatus::Alive,
498                            state_change: Instant::now(),
499                            is_primary: member.is_primary,
500                            replicates: None,
501                            slots: slots.clone(),
502                        });
503                        self.emit(GossipEvent::MemberJoined(member.id, member.addr, slots))
504                            .await;
505                    }
506                }
507                vec![]
508            }
509
510            GossipMessage::SlotsAnnounce {
511                sender,
512                incarnation,
513                slots,
514            } => {
515                // Treat as a piggybacked SlotsChanged update from the sender.
516                self.ensure_member(sender, from);
517                self.apply_updates(&[NodeUpdate::SlotsChanged {
518                    node: sender,
519                    incarnation,
520                    slots,
521                }])
522                .await;
523                vec![]
524            }
525        }
526    }
527
528    /// Runs one protocol period: probe a random node.
529    ///
530    /// Returns all messages to send this tick: the direct probe plus any
531    /// PingReq messages generated by timed-out direct probes.
532    pub fn tick(&mut self) -> Vec<(SocketAddr, GossipMessage)> {
533        let mut outgoing = Vec::new();
534
535        // Check for timed-out probes (may generate PingReq messages)
536        outgoing.extend(self.check_probe_timeouts());
537
538        // Check for expired suspicions
539        self.check_suspicion_timeouts();
540
541        // Clean up stale relay entries
542        self.cleanup_stale_relays();
543
544        // Select a random alive member to probe
545        let target_info = {
546            let alive_members: Vec<_> = self
547                .members
548                .values()
549                .filter(|m| m.state == MemberStatus::Alive || m.state == MemberStatus::Suspect)
550                .map(|m| (m.id, m.addr))
551                .collect();
552
553            if alive_members.is_empty() {
554                return outgoing;
555            }
556
557            match alive_members.choose(&mut rand::rng()) {
558                Some(info) => *info,
559                None => return outgoing,
560            }
561        };
562
563        let (target_id, target_addr) = target_info;
564        let seq = self.next_seq;
565        self.next_seq += 1;
566
567        let mut updates = self.collect_updates();
568
569        // Always piggyback local slot state so peers converge even if they
570        // missed the one-shot SlotsChanged queued by broadcast_local_slots.
571        if !self.local_slots.is_empty() {
572            updates.push(NodeUpdate::SlotsChanged {
573                node: self.local_id,
574                incarnation: self.incarnation,
575                slots: self.local_slots.clone(),
576            });
577        }
578
579        let msg = GossipMessage::Ping {
580            seq,
581            sender: self.local_id,
582            updates,
583        };
584
585        self.pending_probes.insert(
586            seq,
587            PendingProbe {
588                target: target_id,
589                sent_at: Instant::now(),
590                indirect: false,
591            },
592        );
593
594        outgoing.push((target_addr, msg));
595        outgoing
596    }
597
598    /// Creates a join message to send to a seed node.
599    pub fn create_join_message(&self) -> GossipMessage {
600        GossipMessage::Join {
601            sender: self.local_id,
602            sender_addr: self.local_addr,
603        }
604    }
605
606    fn ensure_member(&mut self, id: NodeId, addr: SocketAddr) {
607        if id == self.local_id {
608            return;
609        }
610        self.members.entry(id).or_insert_with(|| MemberState {
611            id,
612            addr,
613            incarnation: 0,
614            state: MemberStatus::Alive,
615            state_change: Instant::now(),
616            is_primary: false,
617            replicates: None,
618            slots: Vec::new(),
619        });
620    }
621
622    /// Sends a gossip event to the external event channel.
623    ///
624    /// Logs a warning when the channel is closed (receiver dropped). This
625    /// normally only happens during shutdown, so seeing the message in steady
626    /// state indicates a bug in the event consumer.
627    async fn emit(&self, event: GossipEvent) {
628        if self.event_tx.send(event).await.is_err() {
629            warn!("gossip event channel closed, dropping event");
630        }
631    }
632
633    async fn apply_updates(&mut self, updates: &[NodeUpdate]) {
634        for update in updates {
635            match update {
636                NodeUpdate::Alive {
637                    node,
638                    addr,
639                    incarnation,
640                } => {
641                    if *incarnation > MAX_INCARNATION {
642                        warn!(
643                            "rejecting alive update for {} with excessive incarnation {}",
644                            node, incarnation
645                        );
646                        continue;
647                    }
648                    if *node == self.local_id {
649                        // Someone thinks we're alive, good
650                        continue;
651                    }
652                    if let Some(member) = self.members.get_mut(node) {
653                        if *incarnation > member.incarnation {
654                            let jump = incarnation - member.incarnation;
655                            if jump > MAX_INCARNATION_JUMP {
656                                warn!(
657                                    "rejecting alive update for {}: incarnation jump {} exceeds limit",
658                                    node, jump
659                                );
660                                continue;
661                            }
662                            member.incarnation = *incarnation;
663                            member.addr = *addr;
664                            if member.state != MemberStatus::Alive {
665                                member.state = MemberStatus::Alive;
666                                member.state_change = Instant::now();
667                                self.emit(GossipEvent::MemberAlive(*node)).await;
668                            }
669                        }
670                    } else {
671                        self.members.insert(
672                            *node,
673                            MemberState {
674                                id: *node,
675                                addr: *addr,
676                                incarnation: *incarnation,
677                                state: MemberStatus::Alive,
678                                state_change: Instant::now(),
679                                is_primary: false,
680                                replicates: None,
681                                slots: Vec::new(),
682                            },
683                        );
684                        self.emit(GossipEvent::MemberJoined(*node, *addr, Vec::new()))
685                            .await;
686                    }
687                }
688
689                NodeUpdate::Suspect { node, incarnation } => {
690                    if *incarnation > MAX_INCARNATION {
691                        warn!(
692                            "rejecting suspect update for {} with excessive incarnation {}",
693                            node, incarnation
694                        );
695                        continue;
696                    }
697                    if *node == self.local_id {
698                        // Refute suspicion by incrementing our incarnation
699                        if *incarnation >= self.incarnation {
700                            self.incarnation = incarnation.saturating_add(1);
701                            self.queue_update(NodeUpdate::Alive {
702                                node: self.local_id,
703                                addr: self.local_addr,
704                                incarnation: self.incarnation,
705                            });
706                        }
707                        continue;
708                    }
709                    if let Some(member) = self.members.get_mut(node) {
710                        if *incarnation >= member.incarnation && member.state == MemberStatus::Alive
711                        {
712                            member.state = MemberStatus::Suspect;
713                            member.state_change = Instant::now();
714                            self.emit(GossipEvent::MemberSuspected(*node)).await;
715                        }
716                    }
717                }
718
719                NodeUpdate::Dead { node, incarnation } => {
720                    if *incarnation > MAX_INCARNATION {
721                        warn!(
722                            "rejecting dead update for {} with excessive incarnation {}",
723                            node, incarnation
724                        );
725                        continue;
726                    }
727                    if *node == self.local_id {
728                        // Refute death claim
729                        self.incarnation = incarnation.saturating_add(1);
730                        self.queue_update(NodeUpdate::Alive {
731                            node: self.local_id,
732                            addr: self.local_addr,
733                            incarnation: self.incarnation,
734                        });
735                        continue;
736                    }
737                    if let Some(member) = self.members.get_mut(node) {
738                        if *incarnation >= member.incarnation && member.state != MemberStatus::Dead
739                        {
740                            member.state = MemberStatus::Dead;
741                            member.state_change = Instant::now();
742                            self.emit(GossipEvent::MemberFailed(*node)).await;
743                        }
744                    }
745                }
746
747                NodeUpdate::Left { node } => {
748                    if *node == self.local_id {
749                        continue;
750                    }
751                    if let Some(member) = self.members.get_mut(node) {
752                        if member.state != MemberStatus::Left {
753                            member.state = MemberStatus::Left;
754                            member.state_change = Instant::now();
755                            self.emit(GossipEvent::MemberLeft(*node)).await;
756                        }
757                    }
758                }
759
760                NodeUpdate::SlotsChanged {
761                    node,
762                    incarnation,
763                    slots,
764                } => {
765                    if *incarnation > MAX_INCARNATION {
766                        warn!(
767                            "rejecting slots update for {} with excessive incarnation {}",
768                            node, incarnation
769                        );
770                        continue;
771                    }
772                    if *node == self.local_id {
773                        continue;
774                    }
775                    // clone the slot list once before the mutable borrow so we
776                    // can move it into the event after the borrow is released
777                    let owned = slots.clone();
778                    let should_emit = if let Some(member) = self.members.get_mut(node) {
779                        if *incarnation >= member.incarnation {
780                            member.slots = owned.clone();
781                            true
782                        } else {
783                            false
784                        }
785                    } else {
786                        false
787                    };
788                    if should_emit {
789                        self.emit(GossipEvent::SlotsChanged(*node, owned)).await;
790                    }
791                }
792
793                NodeUpdate::RoleChanged {
794                    node,
795                    incarnation,
796                    is_primary,
797                    replicates,
798                } => {
799                    if *incarnation > MAX_INCARNATION {
800                        warn!(
801                            "rejecting role update for {} with excessive incarnation {}",
802                            node, incarnation
803                        );
804                        continue;
805                    }
806                    if *node == self.local_id {
807                        // we know our own role
808                        continue;
809                    }
810                    if let Some(member) = self.members.get_mut(node) {
811                        if *incarnation > member.incarnation {
812                            member.incarnation = *incarnation;
813                            member.is_primary = *is_primary;
814                            member.replicates = *replicates;
815                            self.emit(GossipEvent::RoleChanged(*node, *is_primary, *replicates))
816                                .await;
817                        }
818                    }
819                }
820
821                NodeUpdate::VoteRequest {
822                    candidate,
823                    epoch,
824                    offset,
825                } => {
826                    // Relay to the server layer to decide whether to grant.
827                    // No incarnation check needed: epoch ordering is handled upstream.
828                    if *candidate != self.local_id {
829                        self.emit(GossipEvent::VoteRequested {
830                            candidate: *candidate,
831                            epoch: *epoch,
832                            offset: *offset,
833                        })
834                        .await;
835                    }
836                }
837
838                NodeUpdate::VoteGranted {
839                    from,
840                    candidate,
841                    epoch,
842                } => {
843                    self.emit(GossipEvent::VoteGranted {
844                        from: *from,
845                        candidate: *candidate,
846                        epoch: *epoch,
847                    })
848                    .await;
849                }
850            }
851        }
852    }
853
854    async fn mark_alive(&mut self, node: NodeId) {
855        if let Some(member) = self.members.get_mut(&node) {
856            if member.state == MemberStatus::Suspect {
857                member.state = MemberStatus::Alive;
858                member.state_change = Instant::now();
859                self.emit(GossipEvent::MemberAlive(node)).await;
860            }
861        }
862    }
863
864    /// Checks for timed-out probes and implements two-phase failure detection.
865    ///
866    /// Phase 1: direct ping timeout → send PingReq to `indirect_probes` random
867    ///          alive members, and register an indirect probe for the target.
868    /// Phase 2: indirect probe timeout → mark the target Suspect.
869    ///
870    /// Returns PingReq messages to send.
871    fn check_probe_timeouts(&mut self) -> Vec<(SocketAddr, GossipMessage)> {
872        let timeout = self.config.probe_timeout;
873        let now = Instant::now();
874        let mut outgoing = Vec::new();
875
876        // Single pass: collect timed-out entries by kind and remove them.
877        let mut timed_out_indirect: Vec<(u64, NodeId)> = Vec::new();
878        let mut timed_out_direct: Vec<(u64, NodeId)> = Vec::new();
879        self.pending_probes.retain(|seq, probe| {
880            if now.duration_since(probe.sent_at) <= timeout {
881                return true;
882            }
883            if probe.indirect {
884                timed_out_indirect.push((*seq, probe.target));
885            } else {
886                timed_out_direct.push((*seq, probe.target));
887            }
888            false
889        });
890
891        // Phase 2: indirect probe timeouts → mark Suspect
892        for (_seq, target) in timed_out_indirect {
893            let incarnation = self
894                .members
895                .get(&target)
896                .filter(|m| m.state == MemberStatus::Alive)
897                .map(|m| m.incarnation);
898
899            if let Some(inc) = incarnation {
900                if let Some(member) = self.members.get_mut(&target) {
901                    debug!("node {} failed indirect probe, marking suspect", target);
902                    member.state = MemberStatus::Suspect;
903                    member.state_change = Instant::now();
904                }
905                self.queue_update(NodeUpdate::Suspect {
906                    node: target,
907                    incarnation: inc,
908                });
909            }
910        }
911
912        // Phase 1: direct ping timeouts → send PingReq
913        for (_seq, target) in timed_out_direct {
914            let target_addr = match self.members.get(&target) {
915                Some(m) if m.state == MemberStatus::Alive => m.addr,
916                _ => continue,
917            };
918
919            // pick random alive members (excluding target) to relay through
920            let relay_nodes: Vec<(NodeId, SocketAddr)> = self
921                .members
922                .values()
923                .filter(|m| m.state == MemberStatus::Alive && m.id != target)
924                .map(|m| (m.id, m.addr))
925                .collect();
926
927            if relay_nodes.is_empty() {
928                // no relays available — fall back to immediate suspect
929                let incarnation = self
930                    .members
931                    .get(&target)
932                    .map(|m| m.incarnation)
933                    .unwrap_or(0);
934
935                if let Some(member) = self.members.get_mut(&target) {
936                    debug!("node {} timed out with no relays, marking suspect", target);
937                    member.state = MemberStatus::Suspect;
938                    member.state_change = Instant::now();
939                }
940                self.queue_update(NodeUpdate::Suspect {
941                    node: target,
942                    incarnation,
943                });
944                continue;
945            }
946
947            let k = self.config.indirect_probes.min(relay_nodes.len());
948            let chosen: Vec<_> = relay_nodes
949                .choose_multiple(&mut rand::rng(), k)
950                .copied()
951                .collect();
952
953            debug!(
954                "node {} direct ping timed out, sending PingReq to {} relays",
955                target,
956                chosen.len()
957            );
958
959            // register an indirect probe — if this times out, we mark Suspect
960            let indirect_seq = self.next_seq;
961            self.next_seq += 1;
962            self.pending_probes.insert(
963                indirect_seq,
964                PendingProbe {
965                    target,
966                    sent_at: Instant::now(),
967                    indirect: true,
968                },
969            );
970
971            for (_, relay_addr) in chosen {
972                outgoing.push((
973                    relay_addr,
974                    GossipMessage::PingReq {
975                        seq: indirect_seq,
976                        sender: self.local_id,
977                        target,
978                        target_addr,
979                    },
980                ));
981            }
982        }
983
984        outgoing
985    }
986
987    /// Removes stale relay entries that have timed out.
988    ///
989    /// If the target never responds, the relay entry just sits there.
990    /// The original prober handles its own timeout via the indirect probe.
991    fn cleanup_stale_relays(&mut self) {
992        let timeout = self.config.probe_timeout;
993        let now = Instant::now();
994        self.relay_pending
995            .retain(|_, entry| now.duration_since(entry.sent_at) <= timeout);
996    }
997
998    fn check_suspicion_timeouts(&mut self) {
999        let suspicion_timeout = self.config.protocol_period * self.config.suspicion_mult;
1000        let now = Instant::now();
1001        let mut to_mark_dead = Vec::new();
1002
1003        for member in self.members.values() {
1004            if member.state == MemberStatus::Suspect
1005                && now.duration_since(member.state_change) > suspicion_timeout
1006            {
1007                to_mark_dead.push((member.id, member.incarnation));
1008            }
1009        }
1010
1011        for (id, incarnation) in to_mark_dead {
1012            if let Some(member) = self.members.get_mut(&id) {
1013                warn!("node {} confirmed dead after suspicion timeout", id);
1014                member.state = MemberStatus::Dead;
1015                member.state_change = Instant::now();
1016                self.queue_update(NodeUpdate::Dead {
1017                    node: id,
1018                    incarnation,
1019                });
1020            }
1021        }
1022    }
1023
1024    fn queue_update(&mut self, update: NodeUpdate) {
1025        self.pending_updates.push_back(update);
1026        // When the queue overflows, drop the oldest pending updates.
1027        // This is safe: gossip convergence doesn't require every update to be
1028        // delivered. Members re-gossip their state on each protocol period, so
1029        // a dropped update will be re-sent in the next round.
1030        if self.pending_updates.len() > self.config.max_piggyback * 2 {
1031            let to_drop = self.pending_updates.len() - self.config.max_piggyback * 2;
1032            self.pending_updates.drain(..to_drop);
1033        }
1034    }
1035
1036    fn collect_updates(&mut self) -> Vec<NodeUpdate> {
1037        let count = self.pending_updates.len().min(self.config.max_piggyback);
1038        self.pending_updates.drain(..count).collect()
1039    }
1040}
1041
// Unit tests for `GossipEngine`. Each test builds an engine bound to
// 127.0.0.1:6379 and drives it directly through `handle_message` / `tick`,
// then inspects private state (`members`, `relay_pending`) and/or the event
// channel receiver.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Loopback socket address on `port`; distinct ports stand in for distinct nodes.
    fn test_addr(port: u16) -> SocketAddr {
        SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), port))
    }

    /// A freshly constructed engine knows no alive members.
    #[tokio::test]
    async fn engine_creation() {
        let (tx, _rx) = mpsc::channel(16);
        let engine = GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
        assert_eq!(engine.alive_count(), 0);
    }

    /// `add_seed` registers the seed as an alive member.
    #[tokio::test]
    async fn add_seed() {
        let (tx, _rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let seed_id = NodeId::new();
        engine.add_seed(seed_id, test_addr(6380));
        assert_eq!(engine.alive_count(), 1);
    }

    /// A Ping from an unknown sender gets a single Ack back to the source
    /// address, and the sender becomes an alive member.
    #[tokio::test]
    async fn handle_ping() {
        let (tx, _rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let sender = NodeId::new();
        let msg = GossipMessage::Ping {
            seq: 1,
            sender,
            updates: vec![],
        };

        let responses = engine.handle_message(msg, test_addr(6380)).await;
        assert_eq!(responses.len(), 1);
        assert!(matches!(responses[0].1, GossipMessage::Ack { .. }));
        assert_eq!(responses[0].0.port(), 6380);
        assert_eq!(engine.alive_count(), 1);
    }

    /// A Join gets exactly one Welcome in reply and adds the joiner as alive.
    #[tokio::test]
    async fn handle_join() {
        let (tx, _rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let joiner = NodeId::new();
        let msg = GossipMessage::Join {
            sender: joiner,
            sender_addr: test_addr(6380),
        };

        let responses = engine.handle_message(msg, test_addr(6380)).await;
        assert_eq!(responses.len(), 1);
        assert!(matches!(responses[0].1, GossipMessage::Welcome { .. }));
        assert_eq!(engine.alive_count(), 1);
    }

    /// With nobody to probe, `tick` produces no outgoing messages.
    #[tokio::test]
    async fn tick_with_no_members() {
        let (tx, _rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let messages = engine.tick();
        assert!(messages.is_empty());
    }

    /// With a single member, `tick` sends exactly one Ping, addressed to it.
    #[tokio::test]
    async fn tick_with_members() {
        let (tx, _rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        engine.add_seed(NodeId::new(), test_addr(6380));
        let messages = engine.tick();
        assert_eq!(messages.len(), 1);

        let (addr, msg) = &messages[0];
        assert_eq!(addr.port(), 6380);
        assert!(matches!(msg, GossipMessage::Ping { .. }));
    }

    /// The Join message carries the local node's id and bind address.
    #[tokio::test]
    async fn create_join_message() {
        let (tx, _rx) = mpsc::channel(16);
        let id = NodeId::new();
        let addr = test_addr(6379);
        let engine = GossipEngine::new(id, addr, GossipConfig::default(), tx);

        let msg = engine.create_join_message();
        match msg {
            GossipMessage::Join {
                sender,
                sender_addr,
            } => {
                assert_eq!(sender, id);
                assert_eq!(sender_addr, addr);
            }
            _ => panic!("expected Join message"),
        }
    }

    /// A SlotsChanged update at an incarnation newer than the member's
    /// replaces its slots, and SlotsChanged is the first event emitted.
    #[tokio::test]
    async fn apply_slots_changed_updates_member() {
        let (tx, mut rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let remote = NodeId::new();
        engine.add_seed(remote, test_addr(6380));

        let slots = vec![SlotRange::new(0, 5460)];
        let updates = vec![NodeUpdate::SlotsChanged {
            node: remote,
            incarnation: 1,
            slots: slots.clone(),
        }];

        let msg = GossipMessage::Ping {
            seq: 1,
            sender: remote,
            updates,
        };
        engine.handle_message(msg, test_addr(6380)).await;

        // member should have updated slots
        let member = engine.members.get(&remote).unwrap();
        assert_eq!(member.slots, slots);

        // should have emitted a SlotsChanged event
        let event = rx.try_recv().unwrap();
        assert!(matches!(event, GossipEvent::SlotsChanged(id, _) if id == remote));
    }

    /// A SlotsChanged update with an incarnation older than the member's
    /// leaves its slots untouched and emits no event.
    #[tokio::test]
    async fn stale_slots_changed_ignored() {
        let (tx, mut rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let remote = NodeId::new();
        // add member with incarnation 5
        engine.members.insert(
            remote,
            MemberState {
                id: remote,
                addr: test_addr(6380),
                incarnation: 5,
                state: MemberStatus::Alive,
                state_change: Instant::now(),
                is_primary: true,
                replicates: None,
                slots: vec![SlotRange::new(0, 5460)],
            },
        );

        // send slots update with stale incarnation (lower)
        let msg = GossipMessage::Ping {
            seq: 1,
            sender: remote,
            updates: vec![NodeUpdate::SlotsChanged {
                node: remote,
                incarnation: 3, // stale
                slots: vec![],
            }],
        };
        engine.handle_message(msg, test_addr(6380)).await;

        // slots should NOT have been cleared
        let member = engine.members.get(&remote).unwrap();
        assert_eq!(member.slots.len(), 1);

        // no SlotsChanged event should have been emitted
        assert!(rx.try_recv().is_err());
    }

    /// The Welcome reply to a Join lists the local node with its own slots.
    #[tokio::test]
    async fn welcome_includes_local_slots() {
        let (tx, _rx) = mpsc::channel(16);
        let local_id = NodeId::new();
        let mut engine = GossipEngine::new(local_id, test_addr(6379), GossipConfig::default(), tx);

        engine.set_local_slots(vec![SlotRange::new(0, 16383)]);

        let joiner = NodeId::new();
        let msg = GossipMessage::Join {
            sender: joiner,
            sender_addr: test_addr(6380),
        };

        let responses = engine.handle_message(msg, test_addr(6380)).await;
        assert_eq!(responses.len(), 1);
        match &responses[0].1 {
            GossipMessage::Welcome { members, .. } => {
                let local_member = members.iter().find(|m| m.id == local_id);
                assert!(local_member.is_some(), "welcome should include local node");
                assert_eq!(local_member.unwrap().slots, vec![SlotRange::new(0, 16383)]);
            }
            other => panic!("expected Welcome, got {other:?}"),
        }
    }

    /// Members learned from a received Welcome keep their advertised slots,
    /// and a MemberJoined event carries those slots.
    #[tokio::test]
    async fn welcome_propagates_member_slots() {
        let (tx, mut rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let sender = NodeId::new();
        let member_id = NodeId::new();
        let slots = vec![SlotRange::new(0, 5460)];

        let msg = GossipMessage::Welcome {
            sender,
            members: vec![MemberInfo {
                id: member_id,
                addr: test_addr(6381),
                incarnation: 1,
                is_primary: true,
                slots: slots.clone(),
            }],
        };

        engine.handle_message(msg, test_addr(6380)).await;

        // member should be added with slots
        let member = engine.members.get(&member_id).unwrap();
        assert_eq!(member.slots, slots);

        // The handler emits MemberJoined for the Welcome's sender first and
        // then for each new entry in `members`, so drain events until the
        // one for member_id shows up.
        let mut found = false;
        while let Ok(event) = rx.try_recv() {
            if let GossipEvent::MemberJoined(id, _, s) = event {
                if id == member_id {
                    assert_eq!(s, slots);
                    found = true;
                    break;
                }
            }
        }
        assert!(found, "expected MemberJoined for member_id with slots");
    }

    /// With a zero probe timeout, the direct probe from the first tick has
    /// expired by the second tick, which must then emit PingReq(s).
    #[tokio::test]
    async fn direct_ping_timeout_sends_pingreq() {
        let (tx, _rx) = mpsc::channel(16);
        let config = GossipConfig {
            probe_timeout: Duration::from_millis(0), // expire immediately
            ..GossipConfig::default()
        };
        let mut engine = GossipEngine::new(NodeId::new(), test_addr(6379), config, tx);

        let target = NodeId::new();
        let relay = NodeId::new();
        engine.add_seed(target, test_addr(6380));
        engine.add_seed(relay, test_addr(6381));

        // send a direct probe to target
        let messages = engine.tick();
        // tick sends the direct ping plus checks timeouts (but probe just started)
        assert!(!messages.is_empty());

        // the direct probe is pending — now on next tick it should time out
        // and generate PingReq messages to the relay
        let messages = engine.tick();

        // should have at least one PingReq in the outgoing messages
        let pingreqs: Vec<_> = messages
            .iter()
            .filter(|(_, msg)| matches!(msg, GossipMessage::PingReq { .. }))
            .collect();

        // the first tick's probe should have timed out (0ms timeout)
        // and generated PingReq(s) to the relay node
        assert!(
            !pingreqs.is_empty(),
            "expected PingReq after direct probe timeout"
        );
    }

    /// Receiving a PingReq makes this node ping the named target address and
    /// record a relay entry for the requester.
    #[tokio::test]
    async fn pingreq_handler_forwards_to_target() {
        let (tx, _rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let requester = NodeId::new();
        let target = NodeId::new();
        let target_addr = test_addr(6381);

        let msg = GossipMessage::PingReq {
            seq: 42,
            sender: requester,
            target,
            target_addr,
        };

        let responses = engine.handle_message(msg, test_addr(6380)).await;

        // should forward a Ping to the target address
        assert_eq!(responses.len(), 1);
        assert_eq!(responses[0].0, target_addr);
        assert!(matches!(responses[0].1, GossipMessage::Ping { .. }));

        // relay entry should be registered
        assert_eq!(engine.relay_pending.len(), 1);
    }

    /// The target's Ack to a relayed ping goes back to the original requester
    /// under the requester's sequence number, and the relay entry is removed.
    #[tokio::test]
    async fn relayed_ack_forwarded_to_requester() {
        let (tx, _rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let requester = NodeId::new();
        let requester_addr = test_addr(6380);
        let target = NodeId::new();
        let target_addr = test_addr(6381);

        // step 1: receive PingReq
        let msg = GossipMessage::PingReq {
            seq: 42,
            sender: requester,
            target,
            target_addr,
        };
        let responses = engine.handle_message(msg, requester_addr).await;
        let relay_seq = match &responses[0].1 {
            GossipMessage::Ping { seq, .. } => *seq,
            other => panic!("expected Ping, got {other:?}"),
        };

        // step 2: target responds with Ack for the relay_seq
        let target_sender = NodeId::new();
        let ack = GossipMessage::Ack {
            seq: relay_seq,
            sender: target_sender,
            updates: vec![],
        };
        let responses = engine.handle_message(ack, target_addr).await;

        // should forward an Ack with the original seq back to the requester
        assert_eq!(responses.len(), 1);
        assert_eq!(responses[0].0, requester_addr);
        match &responses[0].1 {
            GossipMessage::Ack { seq, .. } => assert_eq!(*seq, 42),
            other => panic!("expected Ack, got {other:?}"),
        }

        // relay entry should be cleaned up
        assert!(engine.relay_pending.is_empty());
    }

    /// With a zero probe timeout, three ticks walk a member through
    /// direct timeout -> PingReq -> indirect timeout -> Suspect.
    #[tokio::test]
    async fn indirect_probe_timeout_marks_suspect() {
        let (tx, _rx) = mpsc::channel(16);
        let config = GossipConfig {
            probe_timeout: Duration::from_millis(0), // expire immediately
            indirect_probes: 1,
            ..GossipConfig::default()
        };
        let mut engine = GossipEngine::new(NodeId::new(), test_addr(6379), config, tx);

        let target = NodeId::new();
        let relay = NodeId::new();
        engine.add_seed(target, test_addr(6380));
        engine.add_seed(relay, test_addr(6381));

        // tick 1: send direct ping
        engine.tick();

        // tick 2: direct probe timed out → PingReq sent, indirect probe registered
        engine.tick();

        // tick 3: indirect probe also timed out → should mark Suspect
        engine.tick();

        // verify at least one member is now Suspect
        let suspect_count = engine
            .members
            .values()
            .filter(|m| m.state == MemberStatus::Suspect)
            .count();
        assert!(suspect_count > 0, "expected at least one Suspect member");
    }

    /// A relay entry older than the probe timeout is purged by `tick`.
    #[tokio::test]
    async fn stale_relay_entries_cleaned_up() {
        let (tx, _rx) = mpsc::channel(16);
        let config = GossipConfig {
            probe_timeout: Duration::from_millis(0), // expire immediately
            ..GossipConfig::default()
        };
        let mut engine = GossipEngine::new(NodeId::new(), test_addr(6379), config, tx);

        // manually insert a relay entry
        engine.relay_pending.insert(
            999,
            RelayEntry {
                requester: test_addr(6380),
                original_seq: 1,
                sent_at: Instant::now() - Duration::from_secs(10),
            },
        );

        engine.tick();

        // stale entry should be cleaned up
        assert!(engine.relay_pending.is_empty());
    }

    /// A RoleChanged update with a newer incarnation updates role, replication
    /// target and incarnation, and emits a matching RoleChanged event.
    #[tokio::test]
    async fn apply_role_changed_updates_member() {
        let (tx, mut rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let remote = NodeId::new();
        let primary = NodeId::new();
        engine.add_seed(remote, test_addr(6380));

        // send a role change: remote becomes a replica of primary
        let msg = GossipMessage::Ping {
            seq: 1,
            sender: remote,
            updates: vec![NodeUpdate::RoleChanged {
                node: remote,
                incarnation: 2,
                is_primary: false,
                replicates: Some(primary),
            }],
        };
        engine.handle_message(msg, test_addr(6380)).await;

        let member = engine.members.get(&remote).unwrap();
        assert!(!member.is_primary);
        assert_eq!(member.replicates, Some(primary));
        assert_eq!(member.incarnation, 2);

        // should have emitted RoleChanged
        let mut found = false;
        while let Ok(event) = rx.try_recv() {
            if let GossipEvent::RoleChanged(id, is_primary, replicates) = event {
                if id == remote {
                    assert!(!is_primary);
                    assert_eq!(replicates, Some(primary));
                    found = true;
                    break;
                }
            }
        }
        assert!(found, "expected RoleChanged event for remote");
    }

    /// A RoleChanged update with an older incarnation is ignored: the role is
    /// unchanged and no RoleChanged event is emitted.
    #[tokio::test]
    async fn stale_role_changed_ignored() {
        let (tx, mut rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let remote = NodeId::new();
        engine.members.insert(
            remote,
            MemberState {
                id: remote,
                addr: test_addr(6380),
                incarnation: 10,
                state: MemberStatus::Alive,
                state_change: Instant::now(),
                is_primary: true,
                replicates: None,
                slots: vec![],
            },
        );

        // send role change with old incarnation
        let msg = GossipMessage::Ping {
            seq: 1,
            sender: remote,
            updates: vec![NodeUpdate::RoleChanged {
                node: remote,
                incarnation: 5, // stale
                is_primary: false,
                replicates: None,
            }],
        };
        engine.handle_message(msg, test_addr(6380)).await;

        // member should still be primary
        let member = engine.members.get(&remote).unwrap();
        assert!(member.is_primary, "stale update should not change role");

        // drain events (MemberAlive from the Ping sender, but no RoleChanged)
        let mut role_changed = false;
        while let Ok(event) = rx.try_recv() {
            if matches!(event, GossipEvent::RoleChanged(..)) {
                role_changed = true;
            }
        }
        assert!(
            !role_changed,
            "stale role update should not emit RoleChanged"
        );
    }

    /// An Alive update whose incarnation jumps by more than
    /// MAX_INCARNATION_JUMP from the known value is rejected.
    #[tokio::test]
    async fn excessive_incarnation_jump_rejected() {
        let (tx, _rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let remote = NodeId::new();
        engine.add_seed(remote, test_addr(6380));
        // seed starts at incarnation 0

        // a jump larger than MAX_INCARNATION_JUMP should be rejected
        let msg = GossipMessage::Ping {
            seq: 1,
            sender: remote,
            updates: vec![NodeUpdate::Alive {
                node: remote,
                addr: test_addr(6380),
                incarnation: MAX_INCARNATION_JUMP + 1,
            }],
        };
        engine.handle_message(msg, test_addr(6380)).await;

        // incarnation should NOT have changed
        let member = engine.members.get(&remote).unwrap();
        assert_eq!(
            member.incarnation, 0,
            "incarnation should not change after excessive jump"
        );
    }

    /// A jump of exactly MAX_INCARNATION_JUMP is still accepted (the bound is inclusive).
    #[tokio::test]
    async fn valid_incarnation_jump_accepted() {
        let (tx, _rx) = mpsc::channel(16);
        let mut engine =
            GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);

        let remote = NodeId::new();
        engine.add_seed(remote, test_addr(6380));

        // a jump within the limit should be accepted
        let msg = GossipMessage::Ping {
            seq: 1,
            sender: remote,
            updates: vec![NodeUpdate::Alive {
                node: remote,
                addr: test_addr(6380),
                incarnation: MAX_INCARNATION_JUMP,
            }],
        };
        engine.handle_message(msg, test_addr(6380)).await;

        let member = engine.members.get(&remote).unwrap();
        assert_eq!(
            member.incarnation, MAX_INCARNATION_JUMP,
            "valid incarnation jump should be accepted"
        );
    }
}