1use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use std::net::SocketAddr;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::net::UdpSocket;
19use tokio::sync::{mpsc, RwLock};
20use tracing::{debug, error, info, warn};
21
22use super::{NodeHealth, NodeInfo, NodeRole, NodeStatus};
23
/// Tunable parameters for the gossip membership protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipConfig {
    /// Length of one protocol round in milliseconds; the protocol loop sleeps
    /// this long at the start of every round. Also feeds the suspicion timeout.
    pub protocol_period_ms: u64,
    /// Gossip fanout. NOTE(review): no code visible in this file reads it.
    pub fanout: usize,
    /// Time in milliseconds the protocol loop waits after a direct ping
    /// before treating unanswered pings as timed out.
    pub ping_timeout_ms: u64,
    /// Maximum number of alive peers asked to probe an unresponsive target.
    pub indirect_probes: usize,
    /// Time in milliseconds to wait after sending indirect probe requests
    /// before deciding whether the target becomes suspect.
    pub indirect_ping_timeout_ms: u64,
    /// Multiplier used when computing the suspicion timeout.
    pub suspicion_mult: u32,
    /// Lower bound, in milliseconds, applied to the computed suspicion timeout.
    pub min_suspicion_timeout_ms: u64,
    /// Every this many rounds the node checks whether it should re-send
    /// `Join` to the seed nodes; `0` disables the check.
    pub rejoin_interval_rounds: u64,
    /// Age in milliseconds (since `last_updated_ms`) after which dead/left
    /// members are removed from the member list; `0` disables removal.
    pub dead_node_gc_timeout_ms: u64,
    /// Maximum number of queued state updates attached to a single message.
    pub max_gossip_messages: usize,
    /// UDP port the protocol binds on `0.0.0.0`.
    pub gossip_port: u16,
    /// Size in bytes of the buffer used to receive datagrams.
    pub max_packet_size: usize,
    /// Seed node addresses: either a socket-address literal or a string
    /// that `tokio::net::lookup_host` can resolve.
    pub seed_nodes: Vec<String>,
}
54
55impl Default for GossipConfig {
56 fn default() -> Self {
57 Self {
58 protocol_period_ms: 1000,
59 fanout: 3,
60 ping_timeout_ms: 500,
61 indirect_probes: 3,
62 indirect_ping_timeout_ms: 1000,
63 suspicion_mult: 6,
64 min_suspicion_timeout_ms: 30_000, rejoin_interval_rounds: 30, dead_node_gc_timeout_ms: 300_000, max_gossip_messages: 10,
68 gossip_port: 7947,
69 max_packet_size: 65507, seed_nodes: Vec::new(),
71 }
72 }
73}
74
/// Liveness state of a cluster member.
///
/// The declaration order is significant: `apply_updates` casts states to
/// `u8` and, at equal incarnation, lets the numerically larger state win.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemberState {
    /// The member is considered reachable.
    Alive,
    /// The member failed to answer probes and may be down.
    Suspect,
    /// The member stayed suspect past the suspicion timeout.
    Dead,
    /// The member announced a graceful departure.
    Left,
}
87
/// A cluster member as tracked by the gossip protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipMember {
    /// Identifier used as the key in the member table.
    pub node_id: String,
    /// Address gossip datagrams for this member are sent to.
    pub address: SocketAddr,
    /// Address handed to `NodeInfo::new` when converting to a `NodeInfo`.
    pub api_address: String,
    /// Role of the node within the cluster.
    pub role: NodeRole,
    /// Current liveness state.
    pub state: MemberState,
    /// Version counter compared when deciding whether an update supersedes
    /// the stored record.
    pub incarnation: u64,
    /// Wall-clock time (ms since the Unix epoch) of the last local update
    /// to this record.
    pub last_updated_ms: u64,
    /// Wall-clock time (ms since the Unix epoch) at which the member was
    /// marked suspect, if any.
    pub suspect_time_ms: Option<u64>,
    /// Free-form key/value metadata attached to the member.
    pub metadata: HashMap<String, String>,
}
110
111impl GossipMember {
112 pub fn new(node_id: String, address: SocketAddr, api_address: String, role: NodeRole) -> Self {
114 Self {
115 node_id,
116 address,
117 api_address,
118 role,
119 state: MemberState::Alive,
120 incarnation: 0,
121 last_updated_ms: current_time_ms(),
122 suspect_time_ms: None,
123 metadata: HashMap::new(),
124 }
125 }
126
127 pub fn to_node_info(&self) -> NodeInfo {
129 let mut info = NodeInfo::new(self.node_id.clone(), self.api_address.clone(), self.role);
130 info.health = NodeHealth {
131 status: match self.state {
132 MemberState::Alive => NodeStatus::Healthy,
133 MemberState::Suspect => NodeStatus::Suspect,
134 MemberState::Dead | MemberState::Left => NodeStatus::Offline,
135 },
136 last_healthy_ms: self.last_updated_ms,
137 ..Default::default()
138 };
139 info.metadata = self.metadata.clone();
140 info
141 }
142}
143
/// Messages exchanged between gossip nodes over UDP (serialized as JSON).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipMessage {
    /// Direct liveness probe; the receiver answers with an `Ack` echoing
    /// `seq_no`.
    Ping {
        seq_no: u64,
        from: String,
        /// Piggybacked membership updates.
        updates: Vec<MemberStateUpdate>,
    },
    /// Reply to a `Ping`; clears the pending ping registered under `seq_no`.
    Ack {
        seq_no: u64,
        from: String,
        updates: Vec<MemberStateUpdate>,
    },
    /// Asks the receiver to ping `target_addr` on the sender's behalf.
    PingReq {
        seq_no: u64,
        from: String,
        target: String,
        target_addr: SocketAddr,
        updates: Vec<MemberStateUpdate>,
    },
    /// Clears the pending ping registered under `seq_no` on the receiver.
    /// NOTE(review): no code visible in this file sends this variant.
    IndirectAck {
        seq_no: u64,
        from: String,
        target: String,
        updates: Vec<MemberStateUpdate>,
    },
    /// Announces `member` to the receiver, which replies with `JoinAck`.
    Join { member: GossipMember },
    /// Reply to `Join` carrying the responder's full member list.
    JoinAck { members: Vec<GossipMember> },
    /// Graceful departure announcement for `node_id`.
    Leave { node_id: String, incarnation: u64 },
    /// Requests the receiver's full member list.
    /// NOTE(review): no code visible in this file sends this variant.
    SyncRequest { from: String },
    /// Full member list sent in reply to `SyncRequest`.
    SyncResponse { members: Vec<GossipMember> },
}
186
/// A membership change piggybacked on protocol messages.
///
/// The optional fields overwrite the stored values only when present.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemberStateUpdate {
    /// Member the update is about.
    pub node_id: String,
    /// State being reported for the member.
    pub state: MemberState,
    /// Incarnation the reported state belongs to.
    pub incarnation: u64,
    /// Gossip address of the member, if known to the sender.
    pub address: Option<SocketAddr>,
    /// API address of the member, if known to the sender.
    pub api_address: Option<String>,
    /// Role of the member, if known to the sender.
    pub role: Option<NodeRole>,
}
197
/// Membership events published on the protocol's event channel.
#[derive(Debug, Clone)]
pub enum GossipEvent {
    /// A previously unknown member was added to the member list.
    NodeJoined(GossipMember),
    /// The member with this id left gracefully.
    NodeLeft(String),
    /// The member with this id was declared dead.
    NodeFailed(String),
    /// The member with this id became alive again after being suspect,
    /// dead or (on re-join) left.
    NodeRecovered(String),
    /// NOTE(review): no code visible in this file emits this variant.
    NodeUpdated(GossipMember),
}
212
/// Handle to a gossip protocol instance and its shared state.
pub struct GossipProtocol {
    /// Protocol configuration.
    config: GossipConfig,
    /// Record describing this node as it was supplied at construction.
    local_member: GossipMember,
    /// Member table keyed by node id, shared with the background tasks.
    members: Arc<RwLock<HashMap<String, GossipMember>>>,
    /// NOTE(review): initialized to 0 and not read by any code visible here.
    _seq_no: AtomicU64,
    /// Direct pings awaiting an ack, keyed by sequence number.
    pending_pings: Arc<RwLock<HashMap<u64, PendingPing>>>,
    /// Queue of state updates waiting to be piggybacked on messages.
    update_queue: Arc<RwLock<VecDeque<MemberStateUpdate>>>,
    /// Rotating index used by the protocol loop to choose a probe target.
    probe_index: Arc<RwLock<usize>>,
    /// Set while the protocol is started; cleared by `stop`.
    running: Arc<AtomicBool>,
    /// Channel on which membership events are published.
    event_tx: mpsc::Sender<GossipEvent>,
}
234
/// Bundle of shared state and configuration handed to each background task.
struct GossipContext {
    /// Member table keyed by node id.
    members: Arc<RwLock<HashMap<String, GossipMember>>>,
    /// Direct pings awaiting an ack, keyed by sequence number.
    pending_pings: Arc<RwLock<HashMap<u64, PendingPing>>>,
    /// Queue of state updates waiting to be piggybacked on messages.
    update_queue: Arc<RwLock<VecDeque<MemberStateUpdate>>>,
    /// Copy of the protocol configuration.
    config: GossipConfig,
    /// Copy of the local node's record taken when the task was created.
    local_member: GossipMember,
    /// Channel on which membership events are published.
    event_tx: mpsc::Sender<GossipEvent>,
}
244
/// Bookkeeping for a ping that has not been acknowledged yet.
struct PendingPing {
    /// Node id of the member that was pinged.
    target: String,
    /// Monotonic timestamp taken when the ping was registered.
    sent_at: Instant,
    /// Written as `false` by the protocol loop; not read in the visible code.
    _is_indirect: bool,
}
251
252impl GossipProtocol {
253 pub fn new(
255 config: GossipConfig,
256 local_member: GossipMember,
257 event_tx: mpsc::Sender<GossipEvent>,
258 ) -> Self {
259 Self {
260 config,
261 local_member,
262 members: Arc::new(RwLock::new(HashMap::new())),
263 _seq_no: AtomicU64::new(0),
264 pending_pings: Arc::new(RwLock::new(HashMap::new())),
265 update_queue: Arc::new(RwLock::new(VecDeque::new())),
266 probe_index: Arc::new(RwLock::new(0)),
267 running: Arc::new(AtomicBool::new(false)),
268 event_tx,
269 }
270 }
271
272 pub async fn start(&self) -> Result<(), GossipError> {
274 if self.running.swap(true, Ordering::SeqCst) {
275 return Err(GossipError::AlreadyRunning);
276 }
277
278 let bind_addr = format!("0.0.0.0:{}", self.config.gossip_port);
280 let socket = Arc::new(
281 UdpSocket::bind(&bind_addr)
282 .await
283 .map_err(|e| GossipError::BindError(e.to_string()))?,
284 );
285
286 info!(
287 node_id = %self.local_member.node_id,
288 address = %bind_addr,
289 "Gossip protocol started"
290 );
291
292 {
294 let mut members = self.members.write().await;
295 members.insert(self.local_member.node_id.clone(), self.local_member.clone());
296 }
297
298 self.join_seeds(&socket).await;
300
301 let socket_clone = socket.clone();
303 let running = self.running.clone();
304 let members = self.members.clone();
305 let pending_pings = self.pending_pings.clone();
306 let update_queue = self.update_queue.clone();
307 let config = self.config.clone();
308 let local_member = self.local_member.clone();
309 let event_tx = self.event_tx.clone();
310 let probe_index = self.probe_index.clone();
311
312 let recv_ctx = GossipContext {
314 members: members.clone(),
315 pending_pings: pending_pings.clone(),
316 update_queue: update_queue.clone(),
317 config: config.clone(),
318 local_member: local_member.clone(),
319 event_tx: event_tx.clone(),
320 };
321
322 let socket_recv = socket.clone();
323 tokio::spawn(async move {
324 Self::receiver_loop(socket_recv, recv_ctx).await;
325 });
326
327 let loop_ctx = GossipContext {
329 members,
330 pending_pings,
331 update_queue,
332 config,
333 local_member,
334 event_tx,
335 };
336
337 tokio::spawn(async move {
338 Self::protocol_loop(socket_clone, running, loop_ctx, probe_index).await;
339 });
340
341 Ok(())
342 }
343
344 pub fn stop(&self) {
346 self.running.store(false, Ordering::SeqCst);
347 info!(
348 node_id = %self.local_member.node_id,
349 "Gossip protocol stopped"
350 );
351 }
352
353 pub async fn get_members(&self) -> Vec<GossipMember> {
355 let members = self.members.read().await;
356 members.values().cloned().collect()
357 }
358
359 pub async fn get_alive_members(&self) -> Vec<GossipMember> {
361 let members = self.members.read().await;
362 members
363 .values()
364 .filter(|m| m.state == MemberState::Alive)
365 .cloned()
366 .collect()
367 }
368
369 pub async fn get_member(&self, node_id: &str) -> Option<GossipMember> {
371 let members = self.members.read().await;
372 members.get(node_id).cloned()
373 }
374
375 pub async fn update_metadata(&self, key: String, value: String) {
377 let mut members = self.members.write().await;
378 if let Some(member) = members.get_mut(&self.local_member.node_id) {
379 member.metadata.insert(key, value);
380 member.incarnation += 1;
381 member.last_updated_ms = current_time_ms();
382 }
383 }
384
385 pub async fn leave(&self) -> Result<(), GossipError> {
387 let members = self.members.read().await;
388 let local = members
389 .get(&self.local_member.node_id)
390 .ok_or(GossipError::NotFound)?;
391
392 let leave_msg = GossipMessage::Leave {
393 node_id: self.local_member.node_id.clone(),
394 incarnation: local.incarnation + 1,
395 };
396
397 let msg_bytes = serialize_message(&leave_msg)?;
399 let socket = UdpSocket::bind("0.0.0.0:0")
400 .await
401 .map_err(|e| GossipError::BindError(e.to_string()))?;
402
403 for member in members.values() {
404 if member.node_id != self.local_member.node_id {
405 let _ = socket.send_to(&msg_bytes, member.address).await;
406 }
407 }
408
409 self.stop();
410 Ok(())
411 }
412
413 async fn join_seeds(&self, socket: &UdpSocket) {
415 for seed in &self.config.seed_nodes {
416 let resolved = if let Ok(addr) = seed.parse::<SocketAddr>() {
418 Some(addr)
419 } else if let Ok(mut addrs) = tokio::net::lookup_host(seed.as_str()).await {
420 addrs.next()
421 } else {
422 warn!(seed = %seed, "Failed to resolve seed node address");
423 None
424 };
425
426 if let Some(addr) = resolved {
427 let join_msg = GossipMessage::Join {
428 member: self.local_member.clone(),
429 };
430
431 if let Ok(bytes) = serialize_message(&join_msg) {
432 info!(seed = %seed, resolved = %addr, "Sending join request to seed node");
433 let _ = socket.send_to(&bytes, addr).await;
434 }
435 }
436 }
437 }
438
439 async fn receiver_loop(socket: Arc<UdpSocket>, ctx: GossipContext) {
441 let mut buf = vec![0u8; ctx.config.max_packet_size];
442
443 loop {
444 match socket.recv_from(&mut buf).await {
445 Ok((len, src)) => {
446 if let Ok(msg) = deserialize_message(&buf[..len]) {
447 Self::handle_message(&socket, msg, src, &ctx).await;
448 }
449 }
450 Err(e) => {
451 error!(error = %e, "Error receiving gossip message");
452 }
453 }
454 }
455 }
456
457 async fn handle_message(
459 socket: &UdpSocket,
460 msg: GossipMessage,
461 src: SocketAddr,
462 ctx: &GossipContext,
463 ) {
464 match msg {
465 GossipMessage::Ping {
466 seq_no,
467 from,
468 updates,
469 } => {
470 Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
472
473 {
475 let mut members_guard = ctx.members.write().await;
476 if let Some(member) = members_guard.get_mut(&from) {
477 member.last_updated_ms = current_time_ms();
478 }
479 }
480
481 let reply_updates =
483 Self::get_updates(&ctx.update_queue, ctx.config.max_gossip_messages).await;
484
485 let ack = GossipMessage::Ack {
487 seq_no,
488 from: ctx.local_member.node_id.clone(),
489 updates: reply_updates,
490 };
491
492 if let Ok(bytes) = serialize_message(&ack) {
493 let _ = socket.send_to(&bytes, src).await;
494 }
495
496 debug!(from = %from, seq = seq_no, "Received ping, sent ack");
497 }
498
499 GossipMessage::Ack {
500 seq_no,
501 from,
502 updates,
503 } => {
504 Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
506
507 {
509 let mut members_guard = ctx.members.write().await;
510 if let Some(member) = members_guard.get_mut(&from) {
511 member.last_updated_ms = current_time_ms();
512 }
513 }
514
515 let mut pending = ctx.pending_pings.write().await;
517 if pending.remove(&seq_no).is_some() {
518 debug!(from = %from, seq = seq_no, "Received ack");
519 }
520 }
521
522 GossipMessage::PingReq {
523 seq_no,
524 from,
525 target,
526 target_addr,
527 updates,
528 } => {
529 Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
531
532 let ping = GossipMessage::Ping {
534 seq_no,
535 from: ctx.local_member.node_id.clone(),
536 updates: Vec::new(),
537 };
538
539 if let Ok(bytes) = serialize_message(&ping) {
540 let _ = socket.send_to(&bytes, target_addr).await;
541 }
542
543 debug!(from = %from, target = %target, "Forwarding indirect ping");
544 }
545
546 GossipMessage::IndirectAck {
547 seq_no,
548 from,
549 target,
550 updates,
551 } => {
552 Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
554
555 let mut pending = ctx.pending_pings.write().await;
557 pending.remove(&seq_no);
558
559 debug!(from = %from, target = %target, "Received indirect ack");
560 }
561
562 GossipMessage::Join { member } => {
563 info!(node_id = %member.node_id, address = %member.address, "Node joining cluster");
564
565 let mut members_guard = ctx.members.write().await;
567 let was_dead_or_left = members_guard
568 .get(&member.node_id)
569 .map(|m| matches!(m.state, MemberState::Dead | MemberState::Left))
570 .unwrap_or(false);
571 let is_new = !members_guard.contains_key(&member.node_id);
572 members_guard.insert(member.node_id.clone(), member.clone());
573
574 let all_members: Vec<GossipMember> = members_guard.values().cloned().collect();
576 drop(members_guard);
577
578 let join_ack = GossipMessage::JoinAck {
579 members: all_members,
580 };
581 if let Ok(bytes) = serialize_message(&join_ack) {
582 let _ = socket.send_to(&bytes, src).await;
583 }
584
585 if is_new {
586 let _ = ctx.event_tx.send(GossipEvent::NodeJoined(member)).await;
587 } else if was_dead_or_left {
588 info!(node_id = %member.node_id, "Dead/left node rejoined cluster");
589 let _ = ctx
590 .event_tx
591 .send(GossipEvent::NodeRecovered(member.node_id.clone()))
592 .await;
593 }
594 }
595
596 GossipMessage::JoinAck {
597 members: new_members,
598 } => {
599 let mut members_guard = ctx.members.write().await;
600 for member in new_members {
601 if !members_guard.contains_key(&member.node_id) {
602 members_guard.insert(member.node_id.clone(), member.clone());
603 let _ = ctx.event_tx.send(GossipEvent::NodeJoined(member)).await;
604 }
605 }
606 info!(count = members_guard.len(), "Received cluster membership");
607 }
608
609 GossipMessage::Leave {
610 node_id,
611 incarnation,
612 } => {
613 let mut members_guard = ctx.members.write().await;
614 if let Some(member) = members_guard.get_mut(&node_id) {
615 if incarnation >= member.incarnation {
616 member.state = MemberState::Left;
617 member.incarnation = incarnation;
618 info!(node_id = %node_id, "Node left cluster gracefully");
619 let _ = ctx.event_tx.send(GossipEvent::NodeLeft(node_id)).await;
620 }
621 }
622 }
623
624 GossipMessage::SyncRequest { from } => {
625 let members_guard = ctx.members.read().await;
626 let all_members: Vec<GossipMember> = members_guard.values().cloned().collect();
627
628 let sync_response = GossipMessage::SyncResponse {
629 members: all_members,
630 };
631 if let Ok(bytes) = serialize_message(&sync_response) {
632 let _ = socket.send_to(&bytes, src).await;
633 }
634
635 debug!(from = %from, "Handled sync request");
636 }
637
638 GossipMessage::SyncResponse {
639 members: new_members,
640 } => {
641 let mut members_guard = ctx.members.write().await;
642 for member in new_members {
643 members_guard
644 .entry(member.node_id.clone())
645 .and_modify(|existing| {
646 if member.incarnation > existing.incarnation {
647 *existing = member.clone();
648 }
649 })
650 .or_insert(member);
651 }
652 }
653 }
654 }
655
656 async fn protocol_loop(
658 socket: Arc<UdpSocket>,
659 running: Arc<AtomicBool>,
660 ctx: GossipContext,
661 probe_index: Arc<RwLock<usize>>,
662 ) {
663 let protocol_period = Duration::from_millis(ctx.config.protocol_period_ms);
664 let mut seq_counter = 0u64;
665 let mut round_counter = 0u64;
666
667 while running.load(Ordering::SeqCst) {
668 tokio::time::sleep(protocol_period).await;
669 round_counter += 1;
670
671 if ctx.config.rejoin_interval_rounds > 0
673 && round_counter.is_multiple_of(ctx.config.rejoin_interval_rounds)
674 {
675 let alive_count = {
676 let members_guard = ctx.members.read().await;
677 members_guard
678 .values()
679 .filter(|m| {
680 m.node_id != ctx.local_member.node_id && m.state == MemberState::Alive
681 })
682 .count()
683 };
684 if alive_count < ctx.config.seed_nodes.len() {
686 for seed in &ctx.config.seed_nodes {
687 let resolved = if let Ok(addr) = seed.parse::<SocketAddr>() {
688 Some(addr)
689 } else if let Ok(mut addrs) = tokio::net::lookup_host(seed.as_str()).await {
690 addrs.next()
691 } else {
692 None
693 };
694 if let Some(addr) = resolved {
695 let join_msg = GossipMessage::Join {
696 member: ctx.local_member.clone(),
697 };
698 if let Ok(bytes) = serialize_message(&join_msg) {
699 debug!(seed = %seed, "Periodic seed re-join attempt");
700 let _ = socket.send_to(&bytes, addr).await;
701 }
702 }
703 }
704 }
705 }
706
707 if ctx.config.dead_node_gc_timeout_ms > 0 && round_counter.is_multiple_of(60) {
709 let now = current_time_ms();
710 let mut members_guard = ctx.members.write().await;
711 let gc_candidates: Vec<String> = members_guard
712 .values()
713 .filter(|m| {
714 matches!(m.state, MemberState::Dead | MemberState::Left)
715 && now.saturating_sub(m.last_updated_ms)
716 > ctx.config.dead_node_gc_timeout_ms
717 })
718 .map(|m| m.node_id.clone())
719 .collect();
720 for node_id in &gc_candidates {
721 members_guard.remove(node_id);
722 info!(node_id = %node_id, "Garbage collected dead/left node from member list");
723 }
724 drop(members_guard);
725 }
726
727 let target = {
729 let members_guard = ctx.members.read().await;
730 let other_members: Vec<_> = members_guard
731 .values()
732 .filter(|m| {
733 m.node_id != ctx.local_member.node_id
734 && matches!(m.state, MemberState::Alive | MemberState::Suspect)
735 })
736 .collect();
737
738 if other_members.is_empty() {
739 continue;
740 }
741
742 let mut idx = probe_index.write().await;
743 *idx = (*idx + 1) % other_members.len();
744 other_members[*idx].clone()
745 };
746
747 seq_counter += 1;
749 let updates =
750 Self::get_updates(&ctx.update_queue, ctx.config.max_gossip_messages).await;
751 let ping = GossipMessage::Ping {
752 seq_no: seq_counter,
753 from: ctx.local_member.node_id.clone(),
754 updates,
755 };
756
757 if let Ok(bytes) = serialize_message(&ping) {
758 let _ = socket.send_to(&bytes, target.address).await;
759
760 let mut pending = ctx.pending_pings.write().await;
762 pending.insert(
763 seq_counter,
764 PendingPing {
765 target: target.node_id.clone(),
766 sent_at: Instant::now(),
767 _is_indirect: false,
768 },
769 );
770 }
771
772 tokio::time::sleep(Duration::from_millis(ctx.config.ping_timeout_ms)).await;
774
775 let timed_out = {
776 let pending = ctx.pending_pings.read().await;
777 pending
778 .iter()
779 .filter(|(_, p)| {
780 p.sent_at.elapsed() > Duration::from_millis(ctx.config.ping_timeout_ms)
781 })
782 .map(|(seq, p)| (*seq, p.target.clone()))
783 .collect::<Vec<_>>()
784 };
785
786 for (seq, target_id) in timed_out {
787 Self::try_indirect_probes(&socket, &ctx, &target_id, seq).await;
789 }
790
791 Self::check_suspicions(&ctx.members, &ctx.config, &ctx.event_tx).await;
793
794 {
797 let mut members_guard = ctx.members.write().await;
798 if let Some(local) = members_guard.get_mut(&ctx.local_member.node_id) {
799 if matches!(local.state, MemberState::Suspect | MemberState::Dead) {
800 let old_state = local.state;
801 local.incarnation += 1;
802 local.state = MemberState::Alive;
803 local.suspect_time_ms = None;
804 local.last_updated_ms = current_time_ms();
805 warn!(
806 old_state = ?old_state,
807 new_incarnation = local.incarnation,
808 "Self-refutation: local node was marked {:?}, reasserting Alive", old_state
809 );
810
811 let mut queue = ctx.update_queue.write().await;
813 queue.push_back(MemberStateUpdate {
814 node_id: ctx.local_member.node_id.clone(),
815 state: MemberState::Alive,
816 incarnation: local.incarnation,
817 address: Some(local.address),
818 api_address: Some(local.api_address.clone()),
819 role: Some(local.role),
820 });
821 }
822 }
823 }
824 }
825 }
826
827 async fn try_indirect_probes(
829 socket: &UdpSocket,
830 ctx: &GossipContext,
831 target_id: &str,
832 _seq: u64,
833 ) {
834 let members_guard = ctx.members.read().await;
835
836 let target = match members_guard.get(target_id) {
837 Some(t) => t.clone(),
838 None => return,
839 };
840
841 let indirect_nodes: Vec<_> = members_guard
843 .values()
844 .filter(|m| {
845 m.node_id != ctx.local_member.node_id
846 && m.node_id != target_id
847 && m.state == MemberState::Alive
848 })
849 .take(ctx.config.indirect_probes)
850 .cloned()
851 .collect();
852
853 drop(members_guard);
854
855 for node in indirect_nodes {
857 let updates =
858 Self::get_updates(&ctx.update_queue, ctx.config.max_gossip_messages).await;
859 let ping_req = GossipMessage::PingReq {
860 seq_no: rand::random(),
861 from: ctx.local_member.node_id.clone(),
862 target: target_id.to_string(),
863 target_addr: target.address,
864 updates,
865 };
866
867 if let Ok(bytes) = serialize_message(&ping_req) {
868 let _ = socket.send_to(&bytes, node.address).await;
869 }
870 }
871
872 tokio::time::sleep(Duration::from_millis(ctx.config.indirect_ping_timeout_ms)).await;
874
875 let still_pending = {
877 let pending = ctx.pending_pings.read().await;
878 pending.values().any(|p| p.target == target_id)
879 };
880
881 if still_pending {
882 let mut members_guard = ctx.members.write().await;
883 if let Some(member) = members_guard.get_mut(target_id) {
884 if member.state == MemberState::Alive {
885 member.state = MemberState::Suspect;
886 member.suspect_time_ms = Some(current_time_ms());
887 warn!(node_id = %target_id, "Node marked as suspect");
888 }
889 }
890 }
891
892 let mut pending = ctx.pending_pings.write().await;
894 pending.retain(|_, p| p.target != target_id);
895 }
896
897 async fn check_suspicions(
899 members: &RwLock<HashMap<String, GossipMember>>,
900 config: &GossipConfig,
901 event_tx: &mpsc::Sender<GossipEvent>,
902 ) {
903 let now = current_time_ms();
904 let members_guard = members.read().await;
905 let member_count = members_guard.len().max(1);
906
907 let calculated_timeout = (config.suspicion_mult as u64)
910 * config.protocol_period_ms
911 * ((member_count as f64 + 1.0).ln().ceil() as u64).max(1);
912 let suspicion_timeout_ms = calculated_timeout.max(config.min_suspicion_timeout_ms);
913
914 let suspects: Vec<_> = members_guard
915 .values()
916 .filter(|m| {
917 m.state == MemberState::Suspect
918 && m.suspect_time_ms
919 .is_some_and(|t| now - t > suspicion_timeout_ms)
920 })
921 .map(|m| m.node_id.clone())
922 .collect();
923
924 drop(members_guard);
925
926 for node_id in suspects {
928 let mut members_guard = members.write().await;
929 if let Some(member) = members_guard.get_mut(&node_id) {
930 member.state = MemberState::Dead;
931 error!(node_id = %node_id, "Node marked as dead");
932 let _ = event_tx.send(GossipEvent::NodeFailed(node_id)).await;
933 }
934 }
935 }
936
937 async fn apply_updates(
939 members: &RwLock<HashMap<String, GossipMember>>,
940 updates: &[MemberStateUpdate],
941 _config: &GossipConfig,
942 event_tx: &mpsc::Sender<GossipEvent>,
943 ) {
944 let mut members_guard = members.write().await;
945
946 for update in updates {
947 if let Some(member) = members_guard.get_mut(&update.node_id) {
948 if update.incarnation > member.incarnation
960 || (update.incarnation == member.incarnation
961 && update.state as u8 > member.state as u8)
962 {
963 let old_state = member.state;
964 member.state = update.state;
965 member.incarnation = update.incarnation;
966 member.last_updated_ms = current_time_ms();
967
968 if update.state == MemberState::Suspect {
969 member.suspect_time_ms = Some(current_time_ms());
970 }
971
972 if let Some(addr) = update.address {
973 member.address = addr;
974 }
975 if let Some(ref api_addr) = update.api_address {
976 member.api_address = api_addr.clone();
977 }
978 if let Some(role) = update.role {
979 member.role = role;
980 }
981
982 if old_state != update.state {
984 match update.state {
985 MemberState::Dead => {
986 let _ = event_tx
987 .send(GossipEvent::NodeFailed(update.node_id.clone()))
988 .await;
989 }
990 MemberState::Left => {
991 let _ = event_tx
992 .send(GossipEvent::NodeLeft(update.node_id.clone()))
993 .await;
994 }
995 MemberState::Alive
996 if matches!(
997 old_state,
998 MemberState::Suspect | MemberState::Dead
999 ) =>
1000 {
1001 let _ = event_tx
1002 .send(GossipEvent::NodeRecovered(update.node_id.clone()))
1003 .await;
1004 }
1005 _ => {}
1006 }
1007 }
1008 }
1009 } else if update.state == MemberState::Alive {
1010 let member = GossipMember {
1012 node_id: update.node_id.clone(),
1013 address: update.address.unwrap_or_else(|| {
1014 "0.0.0.0:0"
1015 .parse()
1016 .unwrap_or_else(|_| std::net::SocketAddr::from(([0, 0, 0, 0], 0)))
1017 }),
1018 api_address: update.api_address.clone().unwrap_or_default(),
1019 role: update.role.unwrap_or(NodeRole::Replica),
1020 state: MemberState::Alive,
1021 incarnation: update.incarnation,
1022 last_updated_ms: current_time_ms(),
1023 suspect_time_ms: None,
1024 metadata: HashMap::new(),
1025 };
1026 members_guard.insert(update.node_id.clone(), member.clone());
1027 let _ = event_tx.send(GossipEvent::NodeJoined(member)).await;
1028 }
1029 }
1030 }
1031
1032 async fn get_updates(
1034 queue: &RwLock<VecDeque<MemberStateUpdate>>,
1035 max_count: usize,
1036 ) -> Vec<MemberStateUpdate> {
1037 let mut queue_guard = queue.write().await;
1038 let mut updates = Vec::with_capacity(max_count);
1039
1040 for _ in 0..max_count {
1041 if let Some(update) = queue_guard.pop_front() {
1042 updates.push(update);
1043 } else {
1044 break;
1045 }
1046 }
1047
1048 updates
1049 }
1050}
1051
/// Errors returned by the gossip protocol API.
#[derive(Debug, thiserror::Error)]
pub enum GossipError {
    /// `start` was called while the protocol was already running.
    #[error("Gossip protocol already running")]
    AlreadyRunning,
    /// A UDP socket could not be bound; carries the I/O error text.
    #[error("Failed to bind socket: {0}")]
    BindError(String),
    /// A message could not be encoded or decoded; carries the serde error text.
    #[error("Serialization error: {0}")]
    SerializationError(String),
    /// The requested member is not in the member table.
    #[error("Not found")]
    NotFound,
}
1064
/// Returns the wall-clock time in milliseconds since the Unix epoch, or `0`
/// when the system clock reports a time before the epoch.
fn current_time_ms() -> u64 {
    let since_epoch: Duration = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
1073
1074fn serialize_message(msg: &GossipMessage) -> Result<Vec<u8>, GossipError> {
1075 serde_json::to_vec(msg).map_err(|e| GossipError::SerializationError(e.to_string()))
1076}
1077
1078fn deserialize_message(data: &[u8]) -> Result<GossipMessage, GossipError> {
1079 serde_json::from_slice(data).map_err(|e| GossipError::SerializationError(e.to_string()))
1080}
1081
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created member is alive with incarnation 0.
    #[test]
    fn test_gossip_member_creation() {
        let member = GossipMember::new(
            "node-1".to_string(),
            "127.0.0.1:7947".parse().unwrap(),
            "127.0.0.1:3000".to_string(),
            NodeRole::Primary,
        );

        assert_eq!(member.node_id, "node-1");
        assert_eq!(member.state, MemberState::Alive);
        assert_eq!(member.incarnation, 0);
    }

    /// Converting an alive member yields a healthy `NodeInfo` that uses the
    /// API address.
    #[test]
    fn test_member_to_node_info() {
        let member = GossipMember::new(
            "node-1".to_string(),
            "127.0.0.1:7947".parse().unwrap(),
            "127.0.0.1:3000".to_string(),
            NodeRole::Primary,
        );

        let node_info = member.to_node_info();
        assert_eq!(node_info.node_id, "node-1");
        assert_eq!(node_info.address, "127.0.0.1:3000");
        assert_eq!(node_info.role, NodeRole::Primary);
        assert_eq!(node_info.health.status, NodeStatus::Healthy);
    }

    /// A `Ping` survives a serialize/deserialize round trip.
    #[test]
    fn test_message_serialization() {
        let msg = GossipMessage::Ping {
            seq_no: 1,
            from: "node-1".to_string(),
            updates: vec![],
        };

        let bytes = serialize_message(&msg).unwrap();
        let deserialized: GossipMessage = deserialize_message(&bytes).unwrap();

        match deserialized {
            GossipMessage::Ping { seq_no, from, .. } => {
                assert_eq!(seq_no, 1);
                assert_eq!(from, "node-1");
            }
            _ => panic!("Wrong message type"),
        }
    }

    /// Spot-checks the default configuration values.
    #[test]
    fn test_gossip_config_defaults() {
        let config = GossipConfig::default();

        assert_eq!(config.protocol_period_ms, 1000);
        assert_eq!(config.fanout, 3);
        assert_eq!(config.ping_timeout_ms, 500);
        assert_eq!(config.indirect_probes, 3);
        assert_eq!(config.gossip_port, 7947);
    }

    /// A hand-built update keeps the values it was given.
    #[test]
    fn test_member_state_update() {
        let update = MemberStateUpdate {
            node_id: "node-1".to_string(),
            state: MemberState::Suspect,
            incarnation: 5,
            address: Some("127.0.0.1:7947".parse().unwrap()),
            api_address: Some("127.0.0.1:3000".to_string()),
            role: Some(NodeRole::Replica),
        };

        assert_eq!(update.node_id, "node-1");
        assert_eq!(update.state, MemberState::Suspect);
        assert_eq!(update.incarnation, 5);
    }

    /// A protocol that has not been started has an empty member table.
    #[tokio::test]
    async fn test_gossip_protocol_creation() {
        let config = GossipConfig::default();
        let member = GossipMember::new(
            "node-1".to_string(),
            "127.0.0.1:7947".parse().unwrap(),
            "127.0.0.1:3000".to_string(),
            NodeRole::Primary,
        );

        let (tx, _rx) = mpsc::channel(100);
        let protocol = GossipProtocol::new(config, member, tx);

        assert!(protocol.get_members().await.is_empty());
    }

    /// Each error variant renders its message (and payload, if any).
    #[test]
    fn test_gossip_error_display() {
        let err = GossipError::AlreadyRunning;
        assert!(err.to_string().contains("already running"));

        let err = GossipError::BindError("address in use".to_string());
        assert!(err.to_string().contains("address in use"));

        let err = GossipError::SerializationError("invalid json".to_string());
        assert!(err.to_string().contains("invalid json"));

        let err = GossipError::NotFound;
        assert!(err.to_string().contains("Not found"));
    }

    /// Pins the discriminant order that `apply_updates` relies on when
    /// breaking ties between updates of equal incarnation.
    #[test]
    fn test_member_state_transitions() {
        assert_eq!(MemberState::Alive as u8, 0);
        assert_eq!(MemberState::Suspect as u8, 1);
        assert_eq!(MemberState::Dead as u8, 2);
        assert_eq!(MemberState::Left as u8, 3);
    }

    /// `Ping`, `Ack` and `PingReq` each survive a round trip.
    #[test]
    fn test_gossip_message_variants() {
        let ping = GossipMessage::Ping {
            seq_no: 42,
            from: "node-1".to_string(),
            updates: vec![],
        };
        let bytes = serialize_message(&ping).unwrap();
        let deserialized: GossipMessage = deserialize_message(&bytes).unwrap();
        match deserialized {
            GossipMessage::Ping { seq_no, from, .. } => {
                assert_eq!(seq_no, 42);
                assert_eq!(from, "node-1");
            }
            _ => panic!("Expected Ping"),
        }

        let ack = GossipMessage::Ack {
            seq_no: 42,
            from: "node-2".to_string(),
            updates: vec![],
        };
        let bytes = serialize_message(&ack).unwrap();
        let deserialized: GossipMessage = deserialize_message(&bytes).unwrap();
        match deserialized {
            GossipMessage::Ack { seq_no, from, .. } => {
                assert_eq!(seq_no, 42);
                assert_eq!(from, "node-2");
            }
            _ => panic!("Expected Ack"),
        }

        let ping_req = GossipMessage::PingReq {
            seq_no: 100,
            from: "node-1".to_string(),
            target: "node-3".to_string(),
            target_addr: "127.0.0.1:7949".parse().unwrap(),
            updates: vec![],
        };
        let bytes = serialize_message(&ping_req).unwrap();
        let deserialized: GossipMessage = deserialize_message(&bytes).unwrap();
        match deserialized {
            GossipMessage::PingReq {
                seq_no,
                from,
                target,
                ..
            } => {
                assert_eq!(seq_no, 100);
                assert_eq!(from, "node-1");
                assert_eq!(target, "node-3");
            }
            _ => panic!("Expected PingReq"),
        }
    }

    /// Piggybacked updates are preserved across a round trip.
    #[test]
    fn test_gossip_message_with_updates() {
        let update = MemberStateUpdate {
            node_id: "node-2".to_string(),
            state: MemberState::Alive,
            incarnation: 1,
            address: Some("127.0.0.1:7948".parse().unwrap()),
            api_address: Some("127.0.0.1:3001".to_string()),
            role: Some(NodeRole::Replica),
        };

        let ping = GossipMessage::Ping {
            seq_no: 1,
            from: "node-1".to_string(),
            updates: vec![update],
        };

        let bytes = serialize_message(&ping).unwrap();
        let deserialized: GossipMessage = deserialize_message(&bytes).unwrap();

        match deserialized {
            GossipMessage::Ping { updates, .. } => {
                assert_eq!(updates.len(), 1);
                assert_eq!(updates[0].node_id, "node-2");
                assert_eq!(updates[0].state, MemberState::Alive);
                assert_eq!(updates[0].incarnation, 1);
            }
            _ => panic!("Expected Ping"),
        }
    }

    /// A fully customized configuration keeps every value it was given.
    #[test]
    fn test_gossip_config_custom() {
        let config = GossipConfig {
            protocol_period_ms: 500,
            fanout: 5,
            ping_timeout_ms: 250,
            indirect_probes: 2,
            indirect_ping_timeout_ms: 750,
            suspicion_mult: 3,
            min_suspicion_timeout_ms: 15_000,
            rejoin_interval_rounds: 20,
            dead_node_gc_timeout_ms: 120_000,
            max_gossip_messages: 15,
            gossip_port: 8000,
            max_packet_size: 32768,
            seed_nodes: vec!["127.0.0.1:8001".to_string(), "127.0.0.1:8002".to_string()],
        };

        assert_eq!(config.protocol_period_ms, 500);
        assert_eq!(config.fanout, 5);
        assert_eq!(config.ping_timeout_ms, 250);
        assert_eq!(config.indirect_probes, 2);
        assert_eq!(config.indirect_ping_timeout_ms, 750);
        assert_eq!(config.suspicion_mult, 3);
        assert_eq!(config.max_gossip_messages, 15);
        assert_eq!(config.gossip_port, 8000);
        assert_eq!(config.max_packet_size, 32768);
        assert_eq!(config.seed_nodes.len(), 2);
    }

    /// `update_metadata` must not panic before `start`; the local member is
    /// only inserted into the table by `start`, so the table stays empty.
    #[tokio::test]
    async fn test_gossip_protocol_metadata() {
        let config = GossipConfig::default();
        let member = GossipMember::new(
            "node-1".to_string(),
            "127.0.0.1:7947".parse().unwrap(),
            "127.0.0.1:3000".to_string(),
            NodeRole::Primary,
        );

        let (tx, _rx) = mpsc::channel(100);
        let protocol = GossipProtocol::new(config, member, tx);

        protocol
            .update_metadata("key1".to_string(), "value1".to_string())
            .await;
        protocol
            .update_metadata("key2".to_string(), "value2".to_string())
            .await;

        assert!(protocol.get_member("node-1").await.is_none());
        assert!(protocol.get_members().await.is_empty());
    }

    /// Cloning a member copies every compared field.
    #[test]
    fn test_gossip_member_clone() {
        let member = GossipMember::new(
            "node-1".to_string(),
            "127.0.0.1:7947".parse().unwrap(),
            "127.0.0.1:3000".to_string(),
            NodeRole::Primary,
        );

        let cloned = member.clone();
        assert_eq!(cloned.node_id, member.node_id);
        assert_eq!(cloned.address, member.address);
        assert_eq!(cloned.api_address, member.api_address);
        assert_eq!(cloned.role, member.role);
        assert_eq!(cloned.state, member.state);
        assert_eq!(cloned.incarnation, member.incarnation);
    }

    /// Each payload-carrying event variant exposes the value it was built with.
    #[test]
    fn test_gossip_event_variants() {
        let member = GossipMember::new(
            "node-1".to_string(),
            "127.0.0.1:7947".parse().unwrap(),
            "127.0.0.1:3000".to_string(),
            NodeRole::Primary,
        );
        let event = GossipEvent::NodeJoined(member);
        match event {
            GossipEvent::NodeJoined(m) => assert_eq!(m.node_id, "node-1"),
            _ => panic!("Expected NodeJoined"),
        }

        let event = GossipEvent::NodeLeft("node-2".to_string());
        match event {
            GossipEvent::NodeLeft(id) => assert_eq!(id, "node-2"),
            _ => panic!("Expected NodeLeft"),
        }

        let event = GossipEvent::NodeFailed("node-3".to_string());
        match event {
            GossipEvent::NodeFailed(id) => assert_eq!(id, "node-3"),
            _ => panic!("Expected NodeFailed"),
        }

        let event = GossipEvent::NodeRecovered("node-4".to_string());
        match event {
            GossipEvent::NodeRecovered(id) => assert_eq!(id, "node-4"),
            _ => panic!("Expected NodeRecovered"),
        }
    }
}