Skip to main content

engine/distributed/
gossip.rs

1//! Gossip Protocol for Distributed Node Discovery
2//!
3//! Implements a SWIM-style gossip protocol for automatic node discovery,
4//! failure detection, and cluster state dissemination.
5//!
6//! Features:
7//! - Automatic node discovery through gossip
8//! - Failure detection with configurable timeouts
9//! - Efficient state propagation using infection-style dissemination
10//! - Suspicion mechanism to reduce false positives
11
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use std::net::SocketAddr;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::net::UdpSocket;
19use tokio::sync::{mpsc, RwLock};
20use tracing::{debug, error, info, warn};
21
22use super::{NodeHealth, NodeInfo, NodeRole, NodeStatus};
23
/// Configuration for the gossip protocol
///
/// All `*_ms` fields are durations in milliseconds. [`Default`] supplies a
/// ready-to-use configuration with no seed nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipConfig {
    /// Interval between protocol rounds in milliseconds
    pub protocol_period_ms: u64,
    /// Number of nodes to probe per round
    // NOTE(review): the protocol loop visible in this file probes a single
    // target per round and does not read this field; confirm where it is used.
    pub fanout: usize,
    /// Timeout for ping responses in milliseconds
    pub ping_timeout_ms: u64,
    /// Number of indirect probes when direct ping fails
    pub indirect_probes: usize,
    /// Timeout for indirect ping responses in milliseconds
    pub indirect_ping_timeout_ms: u64,
    /// Suspicion timeout multiplier (suspicion_timeout = suspicion_mult * protocol_period * log(n+1))
    pub suspicion_mult: u32,
    /// Minimum suspicion timeout in milliseconds (floor to prevent premature death in small clusters)
    pub min_suspicion_timeout_ms: u64,
    /// Interval (in protocol rounds) between periodic seed re-join attempts.
    /// A value of `0` disables periodic re-joins.
    pub rejoin_interval_rounds: u64,
    /// Maximum time a Dead node stays in the member list before garbage collection (ms).
    /// A value of `0` disables garbage collection.
    pub dead_node_gc_timeout_ms: u64,
    /// Maximum number of messages to piggyback per packet
    pub max_gossip_messages: usize,
    /// UDP port for gossip communication (bound on `0.0.0.0`)
    pub gossip_port: u16,
    /// Maximum packet size in bytes; also the size of the receive buffer
    pub max_packet_size: usize,
    /// Seed nodes for initial cluster discovery.
    /// Each entry is either a literal `ip:port` or a `host:port` resolved via DNS.
    pub seed_nodes: Vec<String>,
}
54
55impl Default for GossipConfig {
56    fn default() -> Self {
57        Self {
58            protocol_period_ms: 1000,
59            fanout: 3,
60            ping_timeout_ms: 500,
61            indirect_probes: 3,
62            indirect_ping_timeout_ms: 1000,
63            suspicion_mult: 6,
64            min_suspicion_timeout_ms: 30_000, // 30s floor — small clusters need generous timeouts
65            rejoin_interval_rounds: 30,       // Re-try seeds every 30 protocol rounds
66            dead_node_gc_timeout_ms: 300_000, // GC dead nodes after 5 minutes
67            max_gossip_messages: 10,
68            gossip_port: 7947,
69            max_packet_size: 65507, // Max UDP packet size
70            seed_nodes: Vec::new(),
71        }
72    }
73}
74
/// Member state in the gossip protocol
///
/// `Alive` and `Suspect` members are eligible probe targets; `Dead` and `Left`
/// members are reported as offline and are candidates for garbage collection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemberState {
    /// Node is alive and responding
    Alive,
    /// Node is suspected to be dead
    Suspect,
    /// Node is confirmed dead
    Dead,
    /// Node has left the cluster gracefully
    Left,
}
87
/// Information about a cluster member
///
/// This is both the in-memory record kept in the member table and the payload
/// carried by `Join`, `JoinAck` and `SyncResponse` messages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipMember {
    /// Unique node identifier (key of the member table)
    pub node_id: String,
    /// Node's gossip address (destination for UDP gossip packets)
    pub address: SocketAddr,
    /// API address for client connections
    pub api_address: String,
    /// Node role
    pub role: NodeRole,
    /// Current state
    pub state: MemberState,
    /// State incarnation number (for conflict resolution)
    pub incarnation: u64,
    /// Timestamp (ms) when state was last updated; also refreshed whenever a
    /// ping or ack from this node is received
    pub last_updated_ms: u64,
    /// Timestamp when suspicion started (if suspect)
    pub suspect_time_ms: Option<u64>,
    /// Custom metadata
    pub metadata: HashMap<String, String>,
}
110
111impl GossipMember {
112    /// Create a new member
113    pub fn new(node_id: String, address: SocketAddr, api_address: String, role: NodeRole) -> Self {
114        Self {
115            node_id,
116            address,
117            api_address,
118            role,
119            state: MemberState::Alive,
120            incarnation: 0,
121            last_updated_ms: current_time_ms(),
122            suspect_time_ms: None,
123            metadata: HashMap::new(),
124        }
125    }
126
127    /// Convert to NodeInfo for cluster coordinator
128    pub fn to_node_info(&self) -> NodeInfo {
129        let mut info = NodeInfo::new(self.node_id.clone(), self.api_address.clone(), self.role);
130        info.health = NodeHealth {
131            status: match self.state {
132                MemberState::Alive => NodeStatus::Healthy,
133                MemberState::Suspect => NodeStatus::Suspect,
134                MemberState::Dead | MemberState::Left => NodeStatus::Offline,
135            },
136            last_healthy_ms: self.last_updated_ms,
137            ..Default::default()
138        };
139        info.metadata = self.metadata.clone();
140        info
141    }
142}
143
/// Types of gossip messages
///
/// Every probe-related variant carries a `seq_no` used to match replies to
/// pending pings, the sender's node id in `from`, and a batch of piggybacked
/// membership updates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipMessage {
    /// Ping request for failure detection
    Ping {
        /// Sequence number echoed back in the matching `Ack`
        seq_no: u64,
        /// Node id of the sender
        from: String,
        /// Piggybacked state updates
        updates: Vec<MemberStateUpdate>,
    },
    /// Ping acknowledgement
    Ack {
        /// Sequence number copied from the `Ping` being answered
        seq_no: u64,
        /// Node id of the sender
        from: String,
        /// Piggybacked state updates
        updates: Vec<MemberStateUpdate>,
    },
    /// Request to ping another node on behalf of requester
    PingReq {
        /// Sequence number reused for the forwarded `Ping`
        seq_no: u64,
        /// Node id of the requester
        from: String,
        /// Node id of the node to be probed
        target: String,
        /// Gossip address of the node to be probed
        target_addr: SocketAddr,
        /// Piggybacked state updates
        updates: Vec<MemberStateUpdate>,
    },
    /// Indirect ack (forwarded from target)
    IndirectAck {
        /// Sequence number of the pending ping this ack clears
        seq_no: u64,
        /// Node id of the sender
        from: String,
        /// Node id of the node that was probed
        target: String,
        /// Piggybacked state updates
        updates: Vec<MemberStateUpdate>,
    },
    /// Join request from new node
    Join { member: GossipMember },
    /// Join response with current membership
    JoinAck { members: Vec<GossipMember> },
    /// Graceful leave notification
    Leave { node_id: String, incarnation: u64 },
    /// Full state sync request
    SyncRequest { from: String },
    /// Full state sync response
    SyncResponse { members: Vec<GossipMember> },
}
186
/// State update for a member (piggybacked on messages)
///
/// `node_id`, `state` and `incarnation` are always present; the address, API
/// address and role are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemberStateUpdate {
    /// Id of the member this update describes
    pub node_id: String,
    /// State being announced for that member
    pub state: MemberState,
    /// Incarnation number the announced state belongs to
    pub incarnation: u64,
    /// Gossip address of the member, when known
    pub address: Option<SocketAddr>,
    /// API address of the member, when known
    pub api_address: Option<String>,
    /// Role of the member, when known
    pub role: Option<NodeRole>,
}
197
/// Event emitted by the gossip protocol
///
/// Events are delivered through the `mpsc::Sender<GossipEvent>` handed to
/// `GossipProtocol::new`.
#[derive(Debug, Clone)]
pub enum GossipEvent {
    /// A new node joined the cluster
    NodeJoined(GossipMember),
    /// A node left the cluster gracefully (payload: node id)
    NodeLeft(String),
    /// A node was detected as failed (payload: node id)
    NodeFailed(String),
    /// A node recovered from suspected state, or rejoined after being
    /// dead/left (payload: node id)
    NodeRecovered(String),
    /// A node's state was updated
    NodeUpdated(GossipMember),
}
212
/// Gossip protocol instance
///
/// Owns the configuration and the shared state handed to the background
/// receiver and protocol tasks spawned by `start`.
pub struct GossipProtocol {
    /// Configuration
    config: GossipConfig,
    /// Local node information (snapshot taken at construction time)
    local_member: GossipMember,
    /// All known members, keyed by node id (includes the local node once started)
    members: Arc<RwLock<HashMap<String, GossipMember>>>,
    /// Sequence number for ping/ack matching (reserved for protocol implementation)
    // NOTE(review): the protocol loop visible in this file keeps its own local
    // sequence counter and does not read this field.
    _seq_no: AtomicU64,
    /// Pending pings awaiting acks, keyed by sequence number
    pending_pings: Arc<RwLock<HashMap<u64, PendingPing>>>,
    /// Queue of state updates to disseminate
    update_queue: Arc<RwLock<VecDeque<MemberStateUpdate>>>,
    /// Nodes to probe in round-robin order
    probe_index: Arc<RwLock<usize>>,
    /// Running flag; cleared by `stop` to end the protocol loop
    running: Arc<AtomicBool>,
    /// Event sender
    event_tx: mpsc::Sender<GossipEvent>,
}
234
/// Shared gossip protocol state used across protocol tasks
///
/// Each spawned task receives its own `GossipContext`; the `Arc` fields point
/// at the same underlying state as the owning `GossipProtocol`, while `config`
/// and `local_member` are independent clones.
struct GossipContext {
    /// Member table shared with the owning protocol instance
    members: Arc<RwLock<HashMap<String, GossipMember>>>,
    /// Pending pings keyed by sequence number
    pending_pings: Arc<RwLock<HashMap<u64, PendingPing>>>,
    /// Queue of state updates waiting to be piggybacked
    update_queue: Arc<RwLock<VecDeque<MemberStateUpdate>>>,
    /// Copy of the protocol configuration
    config: GossipConfig,
    /// Copy of the local member record taken when the task was spawned
    local_member: GossipMember,
    /// Channel on which gossip events are published
    event_tx: mpsc::Sender<GossipEvent>,
}
244
/// Pending ping awaiting acknowledgement
struct PendingPing {
    /// Node id of the member that was pinged
    target: String,
    /// When the ping was sent; compared against `ping_timeout_ms`
    sent_at: Instant,
    /// Whether the ping was an indirect probe (currently unused; the protocol
    /// loop records direct pings with `false`)
    _is_indirect: bool,
}
251
252impl GossipProtocol {
253    /// Create a new gossip protocol instance
254    pub fn new(
255        config: GossipConfig,
256        local_member: GossipMember,
257        event_tx: mpsc::Sender<GossipEvent>,
258    ) -> Self {
259        Self {
260            config,
261            local_member,
262            members: Arc::new(RwLock::new(HashMap::new())),
263            _seq_no: AtomicU64::new(0),
264            pending_pings: Arc::new(RwLock::new(HashMap::new())),
265            update_queue: Arc::new(RwLock::new(VecDeque::new())),
266            probe_index: Arc::new(RwLock::new(0)),
267            running: Arc::new(AtomicBool::new(false)),
268            event_tx,
269        }
270    }
271
272    /// Start the gossip protocol
273    pub async fn start(&self) -> Result<(), GossipError> {
274        if self.running.swap(true, Ordering::SeqCst) {
275            return Err(GossipError::AlreadyRunning);
276        }
277
278        // Bind UDP socket
279        let bind_addr = format!("0.0.0.0:{}", self.config.gossip_port);
280        let socket = Arc::new(
281            UdpSocket::bind(&bind_addr)
282                .await
283                .map_err(|e| GossipError::BindError(e.to_string()))?,
284        );
285
286        info!(
287            node_id = %self.local_member.node_id,
288            address = %bind_addr,
289            "Gossip protocol started"
290        );
291
292        // Add self to members
293        {
294            let mut members = self.members.write().await;
295            members.insert(self.local_member.node_id.clone(), self.local_member.clone());
296        }
297
298        // Join seed nodes
299        self.join_seeds(&socket).await;
300
301        // Spawn protocol loop
302        let socket_clone = socket.clone();
303        let running = self.running.clone();
304        let members = self.members.clone();
305        let pending_pings = self.pending_pings.clone();
306        let update_queue = self.update_queue.clone();
307        let config = self.config.clone();
308        let local_member = self.local_member.clone();
309        let event_tx = self.event_tx.clone();
310        let probe_index = self.probe_index.clone();
311
312        // Spawn receiver task
313        let recv_ctx = GossipContext {
314            members: members.clone(),
315            pending_pings: pending_pings.clone(),
316            update_queue: update_queue.clone(),
317            config: config.clone(),
318            local_member: local_member.clone(),
319            event_tx: event_tx.clone(),
320        };
321
322        let socket_recv = socket.clone();
323        tokio::spawn(async move {
324            Self::receiver_loop(socket_recv, recv_ctx).await;
325        });
326
327        // Spawn protocol loop
328        let loop_ctx = GossipContext {
329            members,
330            pending_pings,
331            update_queue,
332            config,
333            local_member,
334            event_tx,
335        };
336
337        tokio::spawn(async move {
338            Self::protocol_loop(socket_clone, running, loop_ctx, probe_index).await;
339        });
340
341        Ok(())
342    }
343
344    /// Stop the gossip protocol
345    pub fn stop(&self) {
346        self.running.store(false, Ordering::SeqCst);
347        info!(
348            node_id = %self.local_member.node_id,
349            "Gossip protocol stopped"
350        );
351    }
352
353    /// Get all known members
354    pub async fn get_members(&self) -> Vec<GossipMember> {
355        let members = self.members.read().await;
356        members.values().cloned().collect()
357    }
358
359    /// Get alive members only
360    pub async fn get_alive_members(&self) -> Vec<GossipMember> {
361        let members = self.members.read().await;
362        members
363            .values()
364            .filter(|m| m.state == MemberState::Alive)
365            .cloned()
366            .collect()
367    }
368
369    /// Get member by ID
370    pub async fn get_member(&self, node_id: &str) -> Option<GossipMember> {
371        let members = self.members.read().await;
372        members.get(node_id).cloned()
373    }
374
375    /// Update local member metadata
376    pub async fn update_metadata(&self, key: String, value: String) {
377        let mut members = self.members.write().await;
378        if let Some(member) = members.get_mut(&self.local_member.node_id) {
379            member.metadata.insert(key, value);
380            member.incarnation += 1;
381            member.last_updated_ms = current_time_ms();
382        }
383    }
384
385    /// Gracefully leave the cluster
386    pub async fn leave(&self) -> Result<(), GossipError> {
387        let members = self.members.read().await;
388        let local = members
389            .get(&self.local_member.node_id)
390            .ok_or(GossipError::NotFound)?;
391
392        let leave_msg = GossipMessage::Leave {
393            node_id: self.local_member.node_id.clone(),
394            incarnation: local.incarnation + 1,
395        };
396
397        // Broadcast leave to all members
398        let msg_bytes = serialize_message(&leave_msg)?;
399        let socket = UdpSocket::bind("0.0.0.0:0")
400            .await
401            .map_err(|e| GossipError::BindError(e.to_string()))?;
402
403        for member in members.values() {
404            if member.node_id != self.local_member.node_id {
405                let _ = socket.send_to(&msg_bytes, member.address).await;
406            }
407        }
408
409        self.stop();
410        Ok(())
411    }
412
413    // Internal: Join seed nodes
414    async fn join_seeds(&self, socket: &UdpSocket) {
415        for seed in &self.config.seed_nodes {
416            // Try direct parse first (numeric IP:port), then DNS resolution (hostname:port)
417            let resolved = if let Ok(addr) = seed.parse::<SocketAddr>() {
418                Some(addr)
419            } else if let Ok(mut addrs) = tokio::net::lookup_host(seed.as_str()).await {
420                addrs.next()
421            } else {
422                warn!(seed = %seed, "Failed to resolve seed node address");
423                None
424            };
425
426            if let Some(addr) = resolved {
427                let join_msg = GossipMessage::Join {
428                    member: self.local_member.clone(),
429                };
430
431                if let Ok(bytes) = serialize_message(&join_msg) {
432                    info!(seed = %seed, resolved = %addr, "Sending join request to seed node");
433                    let _ = socket.send_to(&bytes, addr).await;
434                }
435            }
436        }
437    }
438
439    // Internal: Main receiver loop
440    async fn receiver_loop(socket: Arc<UdpSocket>, ctx: GossipContext) {
441        let mut buf = vec![0u8; ctx.config.max_packet_size];
442
443        loop {
444            match socket.recv_from(&mut buf).await {
445                Ok((len, src)) => {
446                    if let Ok(msg) = deserialize_message(&buf[..len]) {
447                        Self::handle_message(&socket, msg, src, &ctx).await;
448                    }
449                }
450                Err(e) => {
451                    error!(error = %e, "Error receiving gossip message");
452                }
453            }
454        }
455    }
456
457    // Internal: Handle incoming message
458    async fn handle_message(
459        socket: &UdpSocket,
460        msg: GossipMessage,
461        src: SocketAddr,
462        ctx: &GossipContext,
463    ) {
464        match msg {
465            GossipMessage::Ping {
466                seq_no,
467                from,
468                updates,
469            } => {
470                // Apply piggybacked updates
471                Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
472
473                // Sender is alive — refresh their heartbeat timestamp
474                {
475                    let mut members_guard = ctx.members.write().await;
476                    if let Some(member) = members_guard.get_mut(&from) {
477                        member.last_updated_ms = current_time_ms();
478                    }
479                }
480
481                // Get updates to send back
482                let reply_updates =
483                    Self::get_updates(&ctx.update_queue, ctx.config.max_gossip_messages).await;
484
485                // Send ack
486                let ack = GossipMessage::Ack {
487                    seq_no,
488                    from: ctx.local_member.node_id.clone(),
489                    updates: reply_updates,
490                };
491
492                if let Ok(bytes) = serialize_message(&ack) {
493                    let _ = socket.send_to(&bytes, src).await;
494                }
495
496                debug!(from = %from, seq = seq_no, "Received ping, sent ack");
497            }
498
499            GossipMessage::Ack {
500                seq_no,
501                from,
502                updates,
503            } => {
504                // Apply updates
505                Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
506
507                // Sender is alive — refresh their heartbeat timestamp
508                {
509                    let mut members_guard = ctx.members.write().await;
510                    if let Some(member) = members_guard.get_mut(&from) {
511                        member.last_updated_ms = current_time_ms();
512                    }
513                }
514
515                // Clear pending ping
516                let mut pending = ctx.pending_pings.write().await;
517                if pending.remove(&seq_no).is_some() {
518                    debug!(from = %from, seq = seq_no, "Received ack");
519                }
520            }
521
522            GossipMessage::PingReq {
523                seq_no,
524                from,
525                target,
526                target_addr,
527                updates,
528            } => {
529                // Apply updates
530                Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
531
532                // Forward ping to target
533                let ping = GossipMessage::Ping {
534                    seq_no,
535                    from: ctx.local_member.node_id.clone(),
536                    updates: Vec::new(),
537                };
538
539                if let Ok(bytes) = serialize_message(&ping) {
540                    let _ = socket.send_to(&bytes, target_addr).await;
541                }
542
543                debug!(from = %from, target = %target, "Forwarding indirect ping");
544            }
545
546            GossipMessage::IndirectAck {
547                seq_no,
548                from,
549                target,
550                updates,
551            } => {
552                // Apply updates
553                Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
554
555                // Clear pending ping for target
556                let mut pending = ctx.pending_pings.write().await;
557                pending.remove(&seq_no);
558
559                debug!(from = %from, target = %target, "Received indirect ack");
560            }
561
562            GossipMessage::Join { member } => {
563                info!(node_id = %member.node_id, address = %member.address, "Node joining cluster");
564
565                // Add or re-add member (allow rejoins from dead/left nodes)
566                let mut members_guard = ctx.members.write().await;
567                let was_dead_or_left = members_guard
568                    .get(&member.node_id)
569                    .map(|m| matches!(m.state, MemberState::Dead | MemberState::Left))
570                    .unwrap_or(false);
571                let is_new = !members_guard.contains_key(&member.node_id);
572                members_guard.insert(member.node_id.clone(), member.clone());
573
574                // Send current membership
575                let all_members: Vec<GossipMember> = members_guard.values().cloned().collect();
576                drop(members_guard);
577
578                let join_ack = GossipMessage::JoinAck {
579                    members: all_members,
580                };
581                if let Ok(bytes) = serialize_message(&join_ack) {
582                    let _ = socket.send_to(&bytes, src).await;
583                }
584
585                if is_new {
586                    let _ = ctx.event_tx.send(GossipEvent::NodeJoined(member)).await;
587                } else if was_dead_or_left {
588                    info!(node_id = %member.node_id, "Dead/left node rejoined cluster");
589                    let _ = ctx
590                        .event_tx
591                        .send(GossipEvent::NodeRecovered(member.node_id.clone()))
592                        .await;
593                }
594            }
595
596            GossipMessage::JoinAck {
597                members: new_members,
598            } => {
599                let mut members_guard = ctx.members.write().await;
600                for member in new_members {
601                    if !members_guard.contains_key(&member.node_id) {
602                        members_guard.insert(member.node_id.clone(), member.clone());
603                        let _ = ctx.event_tx.send(GossipEvent::NodeJoined(member)).await;
604                    }
605                }
606                info!(count = members_guard.len(), "Received cluster membership");
607            }
608
609            GossipMessage::Leave {
610                node_id,
611                incarnation,
612            } => {
613                let mut members_guard = ctx.members.write().await;
614                if let Some(member) = members_guard.get_mut(&node_id) {
615                    if incarnation >= member.incarnation {
616                        member.state = MemberState::Left;
617                        member.incarnation = incarnation;
618                        info!(node_id = %node_id, "Node left cluster gracefully");
619                        let _ = ctx.event_tx.send(GossipEvent::NodeLeft(node_id)).await;
620                    }
621                }
622            }
623
624            GossipMessage::SyncRequest { from } => {
625                let members_guard = ctx.members.read().await;
626                let all_members: Vec<GossipMember> = members_guard.values().cloned().collect();
627
628                let sync_response = GossipMessage::SyncResponse {
629                    members: all_members,
630                };
631                if let Ok(bytes) = serialize_message(&sync_response) {
632                    let _ = socket.send_to(&bytes, src).await;
633                }
634
635                debug!(from = %from, "Handled sync request");
636            }
637
638            GossipMessage::SyncResponse {
639                members: new_members,
640            } => {
641                let mut members_guard = ctx.members.write().await;
642                for member in new_members {
643                    members_guard
644                        .entry(member.node_id.clone())
645                        .and_modify(|existing| {
646                            if member.incarnation > existing.incarnation {
647                                *existing = member.clone();
648                            }
649                        })
650                        .or_insert(member);
651                }
652            }
653        }
654    }
655
    // Internal: Main protocol loop
    //
    // Runs until `running` is cleared. Each round:
    //   1. sleeps for `protocol_period_ms`;
    //   2. every `rejoin_interval_rounds` rounds, re-sends `Join` to the seeds
    //      if fewer peers are alive than seeds are configured;
    //   3. every 60 rounds, garbage-collects Dead/Left members whose
    //      `last_updated_ms` is older than `dead_node_gc_timeout_ms`;
    //   4. picks one Alive/Suspect peer round-robin and pings it;
    //   5. sleeps for `ping_timeout_ms`, then starts indirect probes for every
    //      pending ping older than that timeout;
    //   6. checks suspicion timeouts;
    //   7. refutes a Suspect/Dead mark on the local node.
    //
    // Because both sleeps are inside the loop body, a full round takes
    // `protocol_period_ms + ping_timeout_ms` whenever a ping is sent.
    async fn protocol_loop(
        socket: Arc<UdpSocket>,
        running: Arc<AtomicBool>,
        ctx: GossipContext,
        probe_index: Arc<RwLock<usize>>,
    ) {
        let protocol_period = Duration::from_millis(ctx.config.protocol_period_ms);
        // Sequence numbers are local to this loop and start at 1.
        let mut seq_counter = 0u64;
        let mut round_counter = 0u64;

        while running.load(Ordering::SeqCst) {
            tokio::time::sleep(protocol_period).await;
            round_counter += 1;

            // Periodic seed re-join: recover from network partitions
            // (disabled when `rejoin_interval_rounds` is 0)
            if ctx.config.rejoin_interval_rounds > 0
                && round_counter.is_multiple_of(ctx.config.rejoin_interval_rounds)
            {
                // Count alive peers, excluding the local node.
                let alive_count = {
                    let members_guard = ctx.members.read().await;
                    members_guard
                        .values()
                        .filter(|m| {
                            m.node_id != ctx.local_member.node_id && m.state == MemberState::Alive
                        })
                        .count()
                };
                // Only re-join seeds if we have fewer alive peers than seeds configured
                if alive_count < ctx.config.seed_nodes.len() {
                    for seed in &ctx.config.seed_nodes {
                        // Literal `ip:port` first, then DNS; unresolved seeds
                        // are skipped silently here.
                        let resolved = if let Ok(addr) = seed.parse::<SocketAddr>() {
                            Some(addr)
                        } else if let Ok(mut addrs) = tokio::net::lookup_host(seed.as_str()).await {
                            addrs.next()
                        } else {
                            None
                        };
                        if let Some(addr) = resolved {
                            let join_msg = GossipMessage::Join {
                                member: ctx.local_member.clone(),
                            };
                            if let Ok(bytes) = serialize_message(&join_msg) {
                                debug!(seed = %seed, "Periodic seed re-join attempt");
                                let _ = socket.send_to(&bytes, addr).await;
                            }
                        }
                    }
                }
            }

            // Dead node garbage collection: remove nodes dead longer than GC timeout.
            // Runs every 60 rounds (hard-coded) and is disabled when the timeout is 0.
            if ctx.config.dead_node_gc_timeout_ms > 0 && round_counter.is_multiple_of(60) {
                let now = current_time_ms();
                let mut members_guard = ctx.members.write().await;
                // Collect ids first so the map is not mutated while iterating.
                let gc_candidates: Vec<String> = members_guard
                    .values()
                    .filter(|m| {
                        matches!(m.state, MemberState::Dead | MemberState::Left)
                            && now.saturating_sub(m.last_updated_ms)
                                > ctx.config.dead_node_gc_timeout_ms
                    })
                    .map(|m| m.node_id.clone())
                    .collect();
                for node_id in &gc_candidates {
                    members_guard.remove(node_id);
                    info!(node_id = %node_id, "Garbage collected dead/left node from member list");
                }
                drop(members_guard);
            }

            // Get target to probe: only Alive and Suspect peers are candidates
            // (Dead and Left members are never probed here).
            let target = {
                let members_guard = ctx.members.read().await;
                let other_members: Vec<_> = members_guard
                    .values()
                    .filter(|m| {
                        m.node_id != ctx.local_member.node_id
                            && matches!(m.state, MemberState::Alive | MemberState::Suspect)
                    })
                    .collect();

                // No peers to probe: skip the rest of this round, including the
                // timeout, suspicion and self-refutation steps below.
                if other_members.is_empty() {
                    continue;
                }

                // Advance the shared index before use and wrap it to the
                // current candidate count.
                let mut idx = probe_index.write().await;
                *idx = (*idx + 1) % other_members.len();
                other_members[*idx].clone()
            };

            // Send ping
            seq_counter += 1;
            let updates =
                Self::get_updates(&ctx.update_queue, ctx.config.max_gossip_messages).await;
            let ping = GossipMessage::Ping {
                seq_no: seq_counter,
                from: ctx.local_member.node_id.clone(),
                updates,
            };

            if let Ok(bytes) = serialize_message(&ping) {
                // Send errors are ignored; the pending entry is recorded either way.
                let _ = socket.send_to(&bytes, target.address).await;

                // Record pending ping
                let mut pending = ctx.pending_pings.write().await;
                pending.insert(
                    seq_counter,
                    PendingPing {
                        target: target.node_id.clone(),
                        sent_at: Instant::now(),
                        _is_indirect: false,
                    },
                );
            }

            // Check for ping timeouts: wait out the timeout, then collect every
            // pending ping that is older than it.
            tokio::time::sleep(Duration::from_millis(ctx.config.ping_timeout_ms)).await;

            // NOTE(review): timed-out entries are not removed from
            // `pending_pings` in this function — confirm that
            // `try_indirect_probes` (or another path) clears them, otherwise
            // they are re-probed every round.
            let timed_out = {
                let pending = ctx.pending_pings.read().await;
                pending
                    .iter()
                    .filter(|(_, p)| {
                        p.sent_at.elapsed() > Duration::from_millis(ctx.config.ping_timeout_ms)
                    })
                    .map(|(seq, p)| (*seq, p.target.clone()))
                    .collect::<Vec<_>>()
            };

            for (seq, target_id) in timed_out {
                // Try indirect probes
                Self::try_indirect_probes(&socket, &ctx, &target_id, seq).await;
            }

            // Check for suspicion timeouts and mark dead
            Self::check_suspicions(&ctx.members, &ctx.config, &ctx.event_tx).await;

            // Self-refutation: if other nodes think we're Suspect or Dead,
            // bump our incarnation and reassert Alive to correct the cluster view
            {
                let mut members_guard = ctx.members.write().await;
                if let Some(local) = members_guard.get_mut(&ctx.local_member.node_id) {
                    if matches!(local.state, MemberState::Suspect | MemberState::Dead) {
                        let old_state = local.state;
                        local.incarnation += 1;
                        local.state = MemberState::Alive;
                        local.suspect_time_ms = None;
                        local.last_updated_ms = current_time_ms();
                        warn!(
                            old_state = ?old_state,
                            new_incarnation = local.incarnation,
                            "Self-refutation: local node was marked {:?}, reasserting Alive", old_state
                        );

                        // Queue an update so peers learn about our refutation.
                        // The update-queue lock is taken while the member-table
                        // write lock is still held.
                        let mut queue = ctx.update_queue.write().await;
                        queue.push_back(MemberStateUpdate {
                            node_id: ctx.local_member.node_id.clone(),
                            state: MemberState::Alive,
                            incarnation: local.incarnation,
                            address: Some(local.address),
                            api_address: Some(local.api_address.clone()),
                            role: Some(local.role),
                        });
                    }
                }
            }
        }
    }
826
827    // Internal: Try indirect probes
828    async fn try_indirect_probes(
829        socket: &UdpSocket,
830        ctx: &GossipContext,
831        target_id: &str,
832        _seq: u64,
833    ) {
834        let members_guard = ctx.members.read().await;
835
836        let target = match members_guard.get(target_id) {
837            Some(t) => t.clone(),
838            None => return,
839        };
840
841        // Select indirect probe nodes
842        let indirect_nodes: Vec<_> = members_guard
843            .values()
844            .filter(|m| {
845                m.node_id != ctx.local_member.node_id
846                    && m.node_id != target_id
847                    && m.state == MemberState::Alive
848            })
849            .take(ctx.config.indirect_probes)
850            .cloned()
851            .collect();
852
853        drop(members_guard);
854
855        // Send indirect pings
856        for node in indirect_nodes {
857            let updates =
858                Self::get_updates(&ctx.update_queue, ctx.config.max_gossip_messages).await;
859            let ping_req = GossipMessage::PingReq {
860                seq_no: rand::random(),
861                from: ctx.local_member.node_id.clone(),
862                target: target_id.to_string(),
863                target_addr: target.address,
864                updates,
865            };
866
867            if let Ok(bytes) = serialize_message(&ping_req) {
868                let _ = socket.send_to(&bytes, node.address).await;
869            }
870        }
871
872        // Wait for indirect responses
873        tokio::time::sleep(Duration::from_millis(ctx.config.indirect_ping_timeout_ms)).await;
874
875        // Check if still pending - if so, mark suspect
876        let still_pending = {
877            let pending = ctx.pending_pings.read().await;
878            pending.values().any(|p| p.target == target_id)
879        };
880
881        if still_pending {
882            let mut members_guard = ctx.members.write().await;
883            if let Some(member) = members_guard.get_mut(target_id) {
884                if member.state == MemberState::Alive {
885                    member.state = MemberState::Suspect;
886                    member.suspect_time_ms = Some(current_time_ms());
887                    warn!(node_id = %target_id, "Node marked as suspect");
888                }
889            }
890        }
891
892        // Remove pending ping
893        let mut pending = ctx.pending_pings.write().await;
894        pending.retain(|_, p| p.target != target_id);
895    }
896
897    // Internal: Check suspicion timeouts
898    async fn check_suspicions(
899        members: &RwLock<HashMap<String, GossipMember>>,
900        config: &GossipConfig,
901        event_tx: &mpsc::Sender<GossipEvent>,
902    ) {
903        let now = current_time_ms();
904        let members_guard = members.read().await;
905        let member_count = members_guard.len().max(1);
906
907        // Calculate suspicion timeout: suspicion_mult * protocol_period * log(n+1)
908        // with a minimum floor to prevent premature death in small clusters
909        let calculated_timeout = (config.suspicion_mult as u64)
910            * config.protocol_period_ms
911            * ((member_count as f64 + 1.0).ln().ceil() as u64).max(1);
912        let suspicion_timeout_ms = calculated_timeout.max(config.min_suspicion_timeout_ms);
913
914        let suspects: Vec<_> = members_guard
915            .values()
916            .filter(|m| {
917                m.state == MemberState::Suspect
918                    && m.suspect_time_ms
919                        .is_some_and(|t| now - t > suspicion_timeout_ms)
920            })
921            .map(|m| m.node_id.clone())
922            .collect();
923
924        drop(members_guard);
925
926        // Mark dead
927        for node_id in suspects {
928            let mut members_guard = members.write().await;
929            if let Some(member) = members_guard.get_mut(&node_id) {
930                member.state = MemberState::Dead;
931                error!(node_id = %node_id, "Node marked as dead");
932                let _ = event_tx.send(GossipEvent::NodeFailed(node_id)).await;
933            }
934        }
935    }
936
937    // Internal: Apply state updates with self-refutation support
938    async fn apply_updates(
939        members: &RwLock<HashMap<String, GossipMember>>,
940        updates: &[MemberStateUpdate],
941        _config: &GossipConfig,
942        event_tx: &mpsc::Sender<GossipEvent>,
943    ) {
944        let mut members_guard = members.write().await;
945
946        for update in updates {
947            if let Some(member) = members_guard.get_mut(&update.node_id) {
948                // Self-refutation: if someone says WE are Dead/Suspect, bump our
949                // incarnation and reassert Alive to refute the claim.
950                // (The local_member check happens via node_id match — we only
951                // have the members map here, so we check if the member being
952                // updated is marked with incarnation 0 metadata indicating local.)
953                // We handle this by checking if the target node is updating itself
954                // to a worse state — if so, and the update is about the local node,
955                // we skip applying it. The actual self-refutation is done by the
956                // protocol loop detecting its own entry as non-Alive.
957
958                // Only apply if incarnation is higher or same incarnation with worse state
959                if update.incarnation > member.incarnation
960                    || (update.incarnation == member.incarnation
961                        && update.state as u8 > member.state as u8)
962                {
963                    let old_state = member.state;
964                    member.state = update.state;
965                    member.incarnation = update.incarnation;
966                    member.last_updated_ms = current_time_ms();
967
968                    if update.state == MemberState::Suspect {
969                        member.suspect_time_ms = Some(current_time_ms());
970                    }
971
972                    if let Some(addr) = update.address {
973                        member.address = addr;
974                    }
975                    if let Some(ref api_addr) = update.api_address {
976                        member.api_address = api_addr.clone();
977                    }
978                    if let Some(role) = update.role {
979                        member.role = role;
980                    }
981
982                    // Emit events
983                    if old_state != update.state {
984                        match update.state {
985                            MemberState::Dead => {
986                                let _ = event_tx
987                                    .send(GossipEvent::NodeFailed(update.node_id.clone()))
988                                    .await;
989                            }
990                            MemberState::Left => {
991                                let _ = event_tx
992                                    .send(GossipEvent::NodeLeft(update.node_id.clone()))
993                                    .await;
994                            }
995                            MemberState::Alive
996                                if matches!(
997                                    old_state,
998                                    MemberState::Suspect | MemberState::Dead
999                                ) =>
1000                            {
1001                                let _ = event_tx
1002                                    .send(GossipEvent::NodeRecovered(update.node_id.clone()))
1003                                    .await;
1004                            }
1005                            _ => {}
1006                        }
1007                    }
1008                }
1009            } else if update.state == MemberState::Alive {
1010                // New member
1011                let member = GossipMember {
1012                    node_id: update.node_id.clone(),
1013                    address: update.address.unwrap_or_else(|| {
1014                        "0.0.0.0:0"
1015                            .parse()
1016                            .unwrap_or_else(|_| std::net::SocketAddr::from(([0, 0, 0, 0], 0)))
1017                    }),
1018                    api_address: update.api_address.clone().unwrap_or_default(),
1019                    role: update.role.unwrap_or(NodeRole::Replica),
1020                    state: MemberState::Alive,
1021                    incarnation: update.incarnation,
1022                    last_updated_ms: current_time_ms(),
1023                    suspect_time_ms: None,
1024                    metadata: HashMap::new(),
1025                };
1026                members_guard.insert(update.node_id.clone(), member.clone());
1027                let _ = event_tx.send(GossipEvent::NodeJoined(member)).await;
1028            }
1029        }
1030    }
1031
1032    // Internal: Get updates from queue
1033    async fn get_updates(
1034        queue: &RwLock<VecDeque<MemberStateUpdate>>,
1035        max_count: usize,
1036    ) -> Vec<MemberStateUpdate> {
1037        let mut queue_guard = queue.write().await;
1038        let mut updates = Vec::with_capacity(max_count);
1039
1040        for _ in 0..max_count {
1041            if let Some(update) = queue_guard.pop_front() {
1042                updates.push(update);
1043            } else {
1044                break;
1045            }
1046        }
1047
1048        updates
1049    }
1050}
1051
/// Errors from the gossip protocol
///
/// `Display` text for each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum GossipError {
    /// The protocol is already running.
    /// NOTE(review): presumably returned when it is started a second time — confirm against the caller.
    #[error("Gossip protocol already running")]
    AlreadyRunning,
    /// The socket could not be bound; the payload is a description of the failure.
    #[error("Failed to bind socket: {0}")]
    BindError(String),
    /// A gossip message could not be encoded or decoded; the payload is the
    /// underlying serializer's error text.
    #[error("Serialization error: {0}")]
    SerializationError(String),
    /// A requested item was not found.
    /// NOTE(review): the item kind is not visible from here — confirm against the caller.
    #[error("Not found")]
    NotFound,
}
1064
1065// Helper functions
1066
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time earlier than the epoch.
fn current_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    // `Duration::default()` is the zero duration, used when the clock is pre-epoch.
    let since_epoch: Duration = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    since_epoch.as_millis() as u64
}
1073
1074fn serialize_message(msg: &GossipMessage) -> Result<Vec<u8>, GossipError> {
1075    serde_json::to_vec(msg).map_err(|e| GossipError::SerializationError(e.to_string()))
1076}
1077
1078fn deserialize_message(data: &[u8]) -> Result<GossipMessage, GossipError> {
1079    serde_json::from_slice(data).map_err(|e| GossipError::SerializationError(e.to_string()))
1080}
1081
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `node-1` primary member used as a fixture by several tests.
    fn sample_member() -> GossipMember {
        GossipMember::new(
            "node-1".to_string(),
            "127.0.0.1:7947".parse().unwrap(),
            "127.0.0.1:3000".to_string(),
            NodeRole::Primary,
        )
    }

    /// Serializes `msg` and immediately deserializes the resulting bytes.
    fn roundtrip(msg: &GossipMessage) -> GossipMessage {
        let bytes = serialize_message(msg).unwrap();
        deserialize_message(&bytes).unwrap()
    }

    #[test]
    fn test_gossip_member_creation() {
        let member = sample_member();

        assert_eq!(member.node_id, "node-1");
        assert_eq!(member.state, MemberState::Alive);
        assert_eq!(member.incarnation, 0);
    }

    #[test]
    fn test_member_to_node_info() {
        let node_info = sample_member().to_node_info();

        assert_eq!(node_info.node_id, "node-1");
        assert_eq!(node_info.address, "127.0.0.1:3000");
        assert_eq!(node_info.role, NodeRole::Primary);
        assert_eq!(node_info.health.status, NodeStatus::Healthy);
    }

    #[test]
    fn test_message_serialization() {
        let original = GossipMessage::Ping {
            seq_no: 1,
            from: "node-1".to_string(),
            updates: Vec::new(),
        };

        let GossipMessage::Ping { seq_no, from, .. } = roundtrip(&original) else {
            panic!("Wrong message type");
        };
        assert_eq!(seq_no, 1);
        assert_eq!(from, "node-1");
    }

    #[test]
    fn test_gossip_config_defaults() {
        let GossipConfig {
            protocol_period_ms,
            fanout,
            ping_timeout_ms,
            indirect_probes,
            gossip_port,
            ..
        } = GossipConfig::default();

        assert_eq!(protocol_period_ms, 1000);
        assert_eq!(fanout, 3);
        assert_eq!(ping_timeout_ms, 500);
        assert_eq!(indirect_probes, 3);
        assert_eq!(gossip_port, 7947);
    }

    #[test]
    fn test_member_state_update() {
        let MemberStateUpdate {
            node_id,
            state,
            incarnation,
            ..
        } = MemberStateUpdate {
            node_id: "node-1".to_string(),
            state: MemberState::Suspect,
            incarnation: 5,
            address: Some("127.0.0.1:7947".parse().unwrap()),
            api_address: Some("127.0.0.1:3000".to_string()),
            role: Some(NodeRole::Replica),
        };

        assert_eq!(node_id, "node-1");
        assert_eq!(state, MemberState::Suspect);
        assert_eq!(incarnation, 5);
    }

    #[tokio::test]
    async fn test_gossip_protocol_creation() {
        let (tx, _rx) = mpsc::channel(100);
        let protocol = GossipProtocol::new(GossipConfig::default(), sample_member(), tx);

        assert!(protocol.get_members().await.is_empty());
    }

    #[test]
    fn test_gossip_error_display() {
        // Each error paired with a fragment its Display output must contain.
        let cases = [
            (GossipError::AlreadyRunning, "already running"),
            (
                GossipError::BindError("address in use".to_string()),
                "address in use",
            ),
            (
                GossipError::SerializationError("invalid json".to_string()),
                "invalid json",
            ),
            (GossipError::NotFound, "Not found"),
        ];

        for (err, fragment) in cases {
            assert!(err.to_string().contains(fragment));
        }
    }

    #[test]
    fn test_member_state_transitions() {
        // Each state paired with its expected numeric representation.
        let expected = [
            (MemberState::Alive, 0u8),
            (MemberState::Suspect, 1),
            (MemberState::Dead, 2),
            (MemberState::Left, 3),
        ];

        for (state, discriminant) in expected {
            assert_eq!(state as u8, discriminant);
        }
    }

    #[test]
    fn test_gossip_message_variants() {
        // Ping survives a round trip.
        let ping = GossipMessage::Ping {
            seq_no: 42,
            from: "node-1".to_string(),
            updates: Vec::new(),
        };
        let GossipMessage::Ping { seq_no, from, .. } = roundtrip(&ping) else {
            panic!("Expected Ping");
        };
        assert_eq!(seq_no, 42);
        assert_eq!(from, "node-1");

        // Ack survives a round trip.
        let ack = GossipMessage::Ack {
            seq_no: 42,
            from: "node-2".to_string(),
            updates: Vec::new(),
        };
        let GossipMessage::Ack { seq_no, from, .. } = roundtrip(&ack) else {
            panic!("Expected Ack");
        };
        assert_eq!(seq_no, 42);
        assert_eq!(from, "node-2");

        // PingReq survives a round trip.
        let ping_req = GossipMessage::PingReq {
            seq_no: 100,
            from: "node-1".to_string(),
            target: "node-3".to_string(),
            target_addr: "127.0.0.1:7949".parse().unwrap(),
            updates: Vec::new(),
        };
        let GossipMessage::PingReq {
            seq_no,
            from,
            target,
            ..
        } = roundtrip(&ping_req)
        else {
            panic!("Expected PingReq");
        };
        assert_eq!(seq_no, 100);
        assert_eq!(from, "node-1");
        assert_eq!(target, "node-3");
    }

    #[test]
    fn test_gossip_message_with_updates() {
        let ping = GossipMessage::Ping {
            seq_no: 1,
            from: "node-1".to_string(),
            updates: vec![MemberStateUpdate {
                node_id: "node-2".to_string(),
                state: MemberState::Alive,
                incarnation: 1,
                address: Some("127.0.0.1:7948".parse().unwrap()),
                api_address: Some("127.0.0.1:3001".to_string()),
                role: Some(NodeRole::Replica),
            }],
        };

        let GossipMessage::Ping { updates, .. } = roundtrip(&ping) else {
            panic!("Expected Ping");
        };

        assert_eq!(updates.len(), 1);
        let carried = &updates[0];
        assert_eq!(carried.node_id, "node-2");
        assert_eq!(carried.state, MemberState::Alive);
        assert_eq!(carried.incarnation, 1);
    }

    #[test]
    fn test_gossip_config_custom() {
        let seeds = vec!["127.0.0.1:8001".to_string(), "127.0.0.1:8002".to_string()];
        let config = GossipConfig {
            protocol_period_ms: 500,
            fanout: 5,
            ping_timeout_ms: 250,
            indirect_probes: 2,
            indirect_ping_timeout_ms: 750,
            suspicion_mult: 3,
            min_suspicion_timeout_ms: 15_000,
            rejoin_interval_rounds: 20,
            dead_node_gc_timeout_ms: 120_000,
            max_gossip_messages: 15,
            gossip_port: 8000,
            max_packet_size: 32768,
            seed_nodes: seeds,
        };

        assert_eq!(config.protocol_period_ms, 500);
        assert_eq!(config.fanout, 5);
        assert_eq!(config.ping_timeout_ms, 250);
        assert_eq!(config.indirect_probes, 2);
        assert_eq!(config.indirect_ping_timeout_ms, 750);
        assert_eq!(config.suspicion_mult, 3);
        assert_eq!(config.max_gossip_messages, 15);
        assert_eq!(config.gossip_port, 8000);
        assert_eq!(config.max_packet_size, 32768);
        assert_eq!(config.seed_nodes.len(), 2);
    }

    #[tokio::test]
    async fn test_gossip_protocol_metadata() {
        let (tx, _rx) = mpsc::channel(100);
        let protocol = GossipProtocol::new(GossipConfig::default(), sample_member(), tx);

        // Smoke test only: the calls must complete without error. Internal
        // state is not checked because no accessor for it is used here.
        for (key, value) in [("key1", "value1"), ("key2", "value2")] {
            protocol
                .update_metadata(key.to_string(), value.to_string())
                .await;
        }
    }

    #[test]
    fn test_gossip_member_clone() {
        let member = sample_member();
        let cloned = member.clone();

        assert_eq!(cloned.node_id, member.node_id);
        assert_eq!(cloned.address, member.address);
        assert_eq!(cloned.api_address, member.api_address);
        assert_eq!(cloned.role, member.role);
        assert_eq!(cloned.state, member.state);
        assert_eq!(cloned.incarnation, member.incarnation);
    }

    #[test]
    fn test_gossip_event_variants() {
        // NodeJoined carries the member.
        let GossipEvent::NodeJoined(joined) = GossipEvent::NodeJoined(sample_member()) else {
            panic!("Expected NodeJoined");
        };
        assert_eq!(joined.node_id, "node-1");

        // NodeLeft carries the node id.
        let GossipEvent::NodeLeft(left_id) = GossipEvent::NodeLeft("node-2".to_string()) else {
            panic!("Expected NodeLeft");
        };
        assert_eq!(left_id, "node-2");

        // NodeFailed carries the node id.
        let GossipEvent::NodeFailed(failed_id) = GossipEvent::NodeFailed("node-3".to_string())
        else {
            panic!("Expected NodeFailed");
        };
        assert_eq!(failed_id, "node-3");

        // NodeRecovered carries the node id.
        let GossipEvent::NodeRecovered(recovered_id) =
            GossipEvent::NodeRecovered("node-4".to_string())
        else {
            panic!("Expected NodeRecovered");
        };
        assert_eq!(recovered_id, "node-4");
    }
}