1use std::collections::{HashMap, VecDeque};
16use std::net::SocketAddr;
17use std::time::{Duration, Instant};
18
19use rand::prelude::IndexedRandom;
20use tokio::sync::mpsc;
21use tracing::{debug, info, trace, warn};
22
23use crate::message::{GossipMessage, MemberInfo, NodeUpdate};
24use crate::{NodeId, SlotRange};
25
/// Largest incarnation number accepted from the network. Updates that carry
/// a greater value are rejected (and logged) by `apply_updates`.
const MAX_INCARNATION: u64 = u64::MAX >> 1;

/// Largest forward step an `Alive` update may take over the incarnation
/// already recorded for a known member; bigger jumps are rejected.
const MAX_INCARNATION_JUMP: u64 = 1_000;
36
/// Tunable parameters of the gossip engine.
#[derive(Debug, Clone)]
pub struct GossipConfig {
    /// Base protocol interval. Multiplied by `suspicion_mult` to obtain the
    /// time a member may stay suspected before it is declared dead.
    pub protocol_period: Duration,
    /// How long a probe (and a relayed probe) may remain unanswered before
    /// it is treated as timed out.
    pub probe_timeout: Duration,
    /// Multiplier applied to `protocol_period` for the suspicion timeout.
    pub suspicion_mult: u32,
    /// Maximum number of relay nodes asked to probe a target indirectly.
    pub indirect_probes: usize,
    /// Maximum number of queued updates attached to one outgoing message;
    /// the pending queue itself is capped at twice this value.
    pub max_piggyback: usize,
    /// Port offset for gossip traffic.
    // NOTE(review): not read anywhere in this file — presumably added to the
    // data port by the transport layer; confirm against the caller.
    pub gossip_port_offset: u16,
}

impl Default for GossipConfig {
    /// Defaults: 1 s protocol period, 500 ms probe timeout, suspicion
    /// multiplier 5, 3 indirect probes, 10 piggybacked updates, port
    /// offset 10000.
    fn default() -> Self {
        GossipConfig {
            protocol_period: Duration::from_millis(1_000),
            probe_timeout: Duration::from_millis(500),
            suspicion_mult: 5,
            indirect_probes: 3,
            max_piggyback: 10,
            gossip_port_offset: 10_000,
        }
    }
}
66
67#[derive(Debug, Clone)]
69pub struct MemberState {
70 pub id: NodeId,
71 pub addr: SocketAddr,
72 pub incarnation: u64,
73 pub state: MemberStatus,
74 pub state_change: Instant,
75 pub is_primary: bool,
76 pub replicates: Option<NodeId>,
78 pub slots: Vec<SlotRange>,
79}
80
/// Liveness status tracked for every known member.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemberStatus {
    /// Believed reachable; the status given to every newly added member.
    Alive,
    /// A probe failed or a suspicion was gossiped; awaiting refutation.
    Suspect,
    /// Declared failed, by suspicion timeout or by a gossiped death notice.
    Dead,
    /// Departed according to a gossiped `Left` update.
    Left,
}
89
90#[derive(Debug, Clone)]
92pub enum GossipEvent {
93 MemberJoined(NodeId, SocketAddr, Vec<SlotRange>),
95 MemberSuspected(NodeId),
97 MemberFailed(NodeId),
99 MemberLeft(NodeId),
101 MemberAlive(NodeId),
103 SlotsChanged(NodeId, Vec<SlotRange>),
105 RoleChanged(NodeId, bool, Option<NodeId>),
107 VoteRequested {
109 candidate: NodeId,
110 epoch: u64,
111 offset: u64,
113 },
114 VoteGranted {
116 from: NodeId,
117 candidate: NodeId,
118 epoch: u64,
119 },
120}
121
122pub struct GossipEngine {
124 local_id: NodeId,
126 local_addr: SocketAddr,
128 incarnation: u64,
130 config: GossipConfig,
132 members: HashMap<NodeId, MemberState>,
134 pending_updates: VecDeque<NodeUpdate>,
138 next_seq: u64,
140 pending_probes: HashMap<u64, PendingProbe>,
142 event_tx: mpsc::Sender<GossipEvent>,
144 local_slots: Vec<SlotRange>,
146 relay_pending: HashMap<u64, RelayEntry>,
148}
149
150struct PendingProbe {
151 target: NodeId,
152 sent_at: Instant,
153 indirect: bool,
154}
155
/// Bookkeeping for a ping sent to a target on behalf of another node.
struct RelayEntry {
    /// Address the ping-req came from; the relayed ack is sent back there.
    requester: SocketAddr,
    /// Sequence number of the requester's ping-req, echoed in the ack.
    original_seq: u64,
    /// When the relayed ping was sent; stale entries are pruned on tick.
    sent_at: Instant,
}
165
166impl GossipEngine {
167 pub fn new(
169 local_id: NodeId,
170 local_addr: SocketAddr,
171 config: GossipConfig,
172 event_tx: mpsc::Sender<GossipEvent>,
173 ) -> Self {
174 Self {
175 local_id,
176 local_addr,
177 incarnation: 1,
178 config,
179 members: HashMap::new(),
180 pending_updates: VecDeque::new(),
181 next_seq: 1,
182 pending_probes: HashMap::new(),
183 event_tx,
184 local_slots: Vec::new(),
185 relay_pending: HashMap::new(),
186 }
187 }
188
189 pub fn local_id(&self) -> NodeId {
191 self.local_id
192 }
193
194 pub fn local_incarnation(&self) -> u64 {
196 self.incarnation
197 }
198
199 pub fn set_incarnation(&mut self, n: u64) {
204 self.incarnation = n;
205 }
206
207 pub fn members(&self) -> impl Iterator<Item = &MemberState> {
209 self.members.values()
210 }
211
212 pub fn alive_member_addrs(&self) -> Vec<std::net::SocketAddr> {
214 self.members
215 .values()
216 .filter(|m| m.state == MemberStatus::Alive || m.state == MemberStatus::Suspect)
217 .map(|m| m.addr)
218 .collect()
219 }
220
221 pub fn alive_count(&self) -> usize {
223 self.members
224 .values()
225 .filter(|m| m.state == MemberStatus::Alive)
226 .count()
227 }
228
229 pub fn set_local_slots(&mut self, slots: Vec<SlotRange>) {
235 self.local_slots = slots;
236 }
237
238 pub fn queue_slots_update(&mut self, node: NodeId, incarnation: u64, slots: Vec<SlotRange>) {
243 self.queue_update(NodeUpdate::SlotsChanged {
244 node,
245 incarnation,
246 slots,
247 });
248 }
249
250 pub fn queue_role_update(
255 &mut self,
256 node: NodeId,
257 incarnation: u64,
258 is_primary: bool,
259 replicates: Option<NodeId>,
260 ) {
261 self.queue_update(NodeUpdate::RoleChanged {
262 node,
263 incarnation,
264 is_primary,
265 replicates,
266 });
267 }
268
269 pub fn queue_vote_request(&mut self, candidate: NodeId, epoch: u64, offset: u64) {
274 self.queue_update(NodeUpdate::VoteRequest {
275 candidate,
276 epoch,
277 offset,
278 });
279 }
280
281 pub fn queue_vote_granted(&mut self, from: NodeId, candidate: NodeId, epoch: u64) {
286 self.queue_update(NodeUpdate::VoteGranted {
287 from,
288 candidate,
289 epoch,
290 });
291 }
292
293 pub fn add_seed(&mut self, id: NodeId, addr: SocketAddr) {
295 if id == self.local_id {
296 return;
297 }
298 self.members.entry(id).or_insert_with(|| MemberState {
299 id,
300 addr,
301 incarnation: 0,
302 state: MemberStatus::Alive,
303 state_change: Instant::now(),
304 is_primary: false,
305 replicates: None,
306 slots: Vec::new(),
307 });
308 }
309
310 pub async fn handle_message(
317 &mut self,
318 msg: GossipMessage,
319 from: SocketAddr,
320 ) -> Vec<(SocketAddr, GossipMessage)> {
321 match msg {
322 GossipMessage::Ping {
323 seq,
324 sender,
325 updates,
326 } => {
327 trace!("received ping seq={} from {}", seq, sender);
328 self.apply_updates(&updates).await;
329 self.ensure_member(sender, from);
330
331 let response_updates = self.collect_updates();
333 vec![(
334 from,
335 GossipMessage::Ack {
336 seq,
337 sender: self.local_id,
338 updates: response_updates,
339 },
340 )]
341 }
342
343 GossipMessage::PingReq {
344 seq,
345 sender,
346 target,
347 target_addr,
348 } => {
349 trace!(
350 "received ping-req seq={} from {} for {}",
351 seq,
352 sender,
353 target
354 );
355 self.ensure_member(sender, from);
356
357 let relay_seq = self.next_seq;
359 self.next_seq += 1;
360
361 self.relay_pending.insert(
362 relay_seq,
363 RelayEntry {
364 requester: from,
365 original_seq: seq,
366 sent_at: Instant::now(),
367 },
368 );
369
370 vec![(
371 target_addr,
372 GossipMessage::Ping {
373 seq: relay_seq,
374 sender: self.local_id,
375 updates: vec![],
376 },
377 )]
378 }
379
380 GossipMessage::Ack {
381 seq,
382 sender,
383 updates,
384 } => {
385 trace!("received ack seq={} from {}", seq, sender);
386 self.apply_updates(&updates).await;
387 self.ensure_member(sender, from);
388
389 let mut outgoing = Vec::new();
390
391 if let Some(probe) = self.pending_probes.remove(&seq) {
393 if self.members.get(&probe.target).map(|m| m.state)
394 == Some(MemberStatus::Suspect)
395 {
396 self.mark_alive(probe.target).await;
398 }
399 }
400
401 if let Some(relay) = self.relay_pending.remove(&seq) {
403 outgoing.push((
404 relay.requester,
405 GossipMessage::Ack {
406 seq: relay.original_seq,
407 sender: self.local_id,
408 updates: vec![],
409 },
410 ));
411 }
412
413 outgoing
414 }
415
416 GossipMessage::Join {
417 sender,
418 sender_addr,
419 } => {
420 info!("node {} joining from {}", sender, sender_addr);
421 let sender_is_new = !self.members.contains_key(&sender);
422 self.ensure_member(sender, sender_addr);
423 if sender_is_new {
424 self.emit(GossipEvent::MemberJoined(sender, sender_addr, Vec::new()))
425 .await;
426 }
427
428 self.queue_update(NodeUpdate::Alive {
430 node: sender,
431 addr: sender_addr,
432 incarnation: 1,
433 });
434
435 let mut members: Vec<MemberInfo> = self
437 .members
438 .values()
439 .filter(|m| m.state == MemberStatus::Alive)
440 .map(|m| MemberInfo {
441 id: m.id,
442 addr: m.addr,
443 incarnation: m.incarnation,
444 is_primary: m.is_primary,
445 slots: m.slots.clone(),
446 })
447 .collect();
448
449 members.push(MemberInfo {
451 id: self.local_id,
452 addr: self.local_addr,
453 incarnation: self.incarnation,
454 is_primary: true,
455 slots: self.local_slots.clone(),
456 });
457
458 vec![(
459 from,
460 GossipMessage::Welcome {
461 sender: self.local_id,
462 members,
463 },
464 )]
465 }
466
467 GossipMessage::Welcome { sender, members } => {
468 info!(
469 "received welcome from {} with {} members",
470 sender,
471 members.len()
472 );
473 let sender_is_new = !self.members.contains_key(&sender);
474 self.ensure_member(sender, from);
475 if sender_is_new {
476 let sender_slots = self
477 .members
478 .get(&sender)
479 .map(|m| m.slots.clone())
480 .unwrap_or_default();
481 self.emit(GossipEvent::MemberJoined(sender, from, sender_slots))
482 .await;
483 }
484
485 for member in members {
486 if member.id == self.local_id {
487 continue;
488 }
489 if let std::collections::hash_map::Entry::Vacant(e) =
490 self.members.entry(member.id)
491 {
492 let slots = member.slots.clone();
493 e.insert(MemberState {
494 id: member.id,
495 addr: member.addr,
496 incarnation: member.incarnation,
497 state: MemberStatus::Alive,
498 state_change: Instant::now(),
499 is_primary: member.is_primary,
500 replicates: None,
501 slots: slots.clone(),
502 });
503 self.emit(GossipEvent::MemberJoined(member.id, member.addr, slots))
504 .await;
505 }
506 }
507 vec![]
508 }
509
510 GossipMessage::SlotsAnnounce {
511 sender,
512 incarnation,
513 slots,
514 } => {
515 self.ensure_member(sender, from);
517 self.apply_updates(&[NodeUpdate::SlotsChanged {
518 node: sender,
519 incarnation,
520 slots,
521 }])
522 .await;
523 vec![]
524 }
525 }
526 }
527
528 pub fn tick(&mut self) -> Vec<(SocketAddr, GossipMessage)> {
533 let mut outgoing = Vec::new();
534
535 outgoing.extend(self.check_probe_timeouts());
537
538 self.check_suspicion_timeouts();
540
541 self.cleanup_stale_relays();
543
544 let target_info = {
546 let alive_members: Vec<_> = self
547 .members
548 .values()
549 .filter(|m| m.state == MemberStatus::Alive || m.state == MemberStatus::Suspect)
550 .map(|m| (m.id, m.addr))
551 .collect();
552
553 if alive_members.is_empty() {
554 return outgoing;
555 }
556
557 match alive_members.choose(&mut rand::rng()) {
558 Some(info) => *info,
559 None => return outgoing,
560 }
561 };
562
563 let (target_id, target_addr) = target_info;
564 let seq = self.next_seq;
565 self.next_seq += 1;
566
567 let mut updates = self.collect_updates();
568
569 if !self.local_slots.is_empty() {
572 updates.push(NodeUpdate::SlotsChanged {
573 node: self.local_id,
574 incarnation: self.incarnation,
575 slots: self.local_slots.clone(),
576 });
577 }
578
579 let msg = GossipMessage::Ping {
580 seq,
581 sender: self.local_id,
582 updates,
583 };
584
585 self.pending_probes.insert(
586 seq,
587 PendingProbe {
588 target: target_id,
589 sent_at: Instant::now(),
590 indirect: false,
591 },
592 );
593
594 outgoing.push((target_addr, msg));
595 outgoing
596 }
597
598 pub fn create_join_message(&self) -> GossipMessage {
600 GossipMessage::Join {
601 sender: self.local_id,
602 sender_addr: self.local_addr,
603 }
604 }
605
606 fn ensure_member(&mut self, id: NodeId, addr: SocketAddr) {
607 if id == self.local_id {
608 return;
609 }
610 self.members.entry(id).or_insert_with(|| MemberState {
611 id,
612 addr,
613 incarnation: 0,
614 state: MemberStatus::Alive,
615 state_change: Instant::now(),
616 is_primary: false,
617 replicates: None,
618 slots: Vec::new(),
619 });
620 }
621
622 async fn emit(&self, event: GossipEvent) {
628 if self.event_tx.send(event).await.is_err() {
629 warn!("gossip event channel closed, dropping event");
630 }
631 }
632
633 async fn apply_updates(&mut self, updates: &[NodeUpdate]) {
634 for update in updates {
635 match update {
636 NodeUpdate::Alive {
637 node,
638 addr,
639 incarnation,
640 } => {
641 if *incarnation > MAX_INCARNATION {
642 warn!(
643 "rejecting alive update for {} with excessive incarnation {}",
644 node, incarnation
645 );
646 continue;
647 }
648 if *node == self.local_id {
649 continue;
651 }
652 if let Some(member) = self.members.get_mut(node) {
653 if *incarnation > member.incarnation {
654 let jump = incarnation - member.incarnation;
655 if jump > MAX_INCARNATION_JUMP {
656 warn!(
657 "rejecting alive update for {}: incarnation jump {} exceeds limit",
658 node, jump
659 );
660 continue;
661 }
662 member.incarnation = *incarnation;
663 member.addr = *addr;
664 if member.state != MemberStatus::Alive {
665 member.state = MemberStatus::Alive;
666 member.state_change = Instant::now();
667 self.emit(GossipEvent::MemberAlive(*node)).await;
668 }
669 }
670 } else {
671 self.members.insert(
672 *node,
673 MemberState {
674 id: *node,
675 addr: *addr,
676 incarnation: *incarnation,
677 state: MemberStatus::Alive,
678 state_change: Instant::now(),
679 is_primary: false,
680 replicates: None,
681 slots: Vec::new(),
682 },
683 );
684 self.emit(GossipEvent::MemberJoined(*node, *addr, Vec::new()))
685 .await;
686 }
687 }
688
689 NodeUpdate::Suspect { node, incarnation } => {
690 if *incarnation > MAX_INCARNATION {
691 warn!(
692 "rejecting suspect update for {} with excessive incarnation {}",
693 node, incarnation
694 );
695 continue;
696 }
697 if *node == self.local_id {
698 if *incarnation >= self.incarnation {
700 self.incarnation = incarnation.saturating_add(1);
701 self.queue_update(NodeUpdate::Alive {
702 node: self.local_id,
703 addr: self.local_addr,
704 incarnation: self.incarnation,
705 });
706 }
707 continue;
708 }
709 if let Some(member) = self.members.get_mut(node) {
710 if *incarnation >= member.incarnation && member.state == MemberStatus::Alive
711 {
712 member.state = MemberStatus::Suspect;
713 member.state_change = Instant::now();
714 self.emit(GossipEvent::MemberSuspected(*node)).await;
715 }
716 }
717 }
718
719 NodeUpdate::Dead { node, incarnation } => {
720 if *incarnation > MAX_INCARNATION {
721 warn!(
722 "rejecting dead update for {} with excessive incarnation {}",
723 node, incarnation
724 );
725 continue;
726 }
727 if *node == self.local_id {
728 self.incarnation = incarnation.saturating_add(1);
730 self.queue_update(NodeUpdate::Alive {
731 node: self.local_id,
732 addr: self.local_addr,
733 incarnation: self.incarnation,
734 });
735 continue;
736 }
737 if let Some(member) = self.members.get_mut(node) {
738 if *incarnation >= member.incarnation && member.state != MemberStatus::Dead
739 {
740 member.state = MemberStatus::Dead;
741 member.state_change = Instant::now();
742 self.emit(GossipEvent::MemberFailed(*node)).await;
743 }
744 }
745 }
746
747 NodeUpdate::Left { node } => {
748 if *node == self.local_id {
749 continue;
750 }
751 if let Some(member) = self.members.get_mut(node) {
752 if member.state != MemberStatus::Left {
753 member.state = MemberStatus::Left;
754 member.state_change = Instant::now();
755 self.emit(GossipEvent::MemberLeft(*node)).await;
756 }
757 }
758 }
759
760 NodeUpdate::SlotsChanged {
761 node,
762 incarnation,
763 slots,
764 } => {
765 if *incarnation > MAX_INCARNATION {
766 warn!(
767 "rejecting slots update for {} with excessive incarnation {}",
768 node, incarnation
769 );
770 continue;
771 }
772 if *node == self.local_id {
773 continue;
774 }
775 let owned = slots.clone();
778 let should_emit = if let Some(member) = self.members.get_mut(node) {
779 if *incarnation >= member.incarnation {
780 member.slots = owned.clone();
781 true
782 } else {
783 false
784 }
785 } else {
786 false
787 };
788 if should_emit {
789 self.emit(GossipEvent::SlotsChanged(*node, owned)).await;
790 }
791 }
792
793 NodeUpdate::RoleChanged {
794 node,
795 incarnation,
796 is_primary,
797 replicates,
798 } => {
799 if *incarnation > MAX_INCARNATION {
800 warn!(
801 "rejecting role update for {} with excessive incarnation {}",
802 node, incarnation
803 );
804 continue;
805 }
806 if *node == self.local_id {
807 continue;
809 }
810 if let Some(member) = self.members.get_mut(node) {
811 if *incarnation > member.incarnation {
812 member.incarnation = *incarnation;
813 member.is_primary = *is_primary;
814 member.replicates = *replicates;
815 self.emit(GossipEvent::RoleChanged(*node, *is_primary, *replicates))
816 .await;
817 }
818 }
819 }
820
821 NodeUpdate::VoteRequest {
822 candidate,
823 epoch,
824 offset,
825 } => {
826 if *candidate != self.local_id {
829 self.emit(GossipEvent::VoteRequested {
830 candidate: *candidate,
831 epoch: *epoch,
832 offset: *offset,
833 })
834 .await;
835 }
836 }
837
838 NodeUpdate::VoteGranted {
839 from,
840 candidate,
841 epoch,
842 } => {
843 self.emit(GossipEvent::VoteGranted {
844 from: *from,
845 candidate: *candidate,
846 epoch: *epoch,
847 })
848 .await;
849 }
850 }
851 }
852 }
853
854 async fn mark_alive(&mut self, node: NodeId) {
855 if let Some(member) = self.members.get_mut(&node) {
856 if member.state == MemberStatus::Suspect {
857 member.state = MemberStatus::Alive;
858 member.state_change = Instant::now();
859 self.emit(GossipEvent::MemberAlive(node)).await;
860 }
861 }
862 }
863
864 fn check_probe_timeouts(&mut self) -> Vec<(SocketAddr, GossipMessage)> {
872 let timeout = self.config.probe_timeout;
873 let now = Instant::now();
874 let mut outgoing = Vec::new();
875
876 let mut timed_out_indirect: Vec<(u64, NodeId)> = Vec::new();
878 let mut timed_out_direct: Vec<(u64, NodeId)> = Vec::new();
879 self.pending_probes.retain(|seq, probe| {
880 if now.duration_since(probe.sent_at) <= timeout {
881 return true;
882 }
883 if probe.indirect {
884 timed_out_indirect.push((*seq, probe.target));
885 } else {
886 timed_out_direct.push((*seq, probe.target));
887 }
888 false
889 });
890
891 for (_seq, target) in timed_out_indirect {
893 let incarnation = self
894 .members
895 .get(&target)
896 .filter(|m| m.state == MemberStatus::Alive)
897 .map(|m| m.incarnation);
898
899 if let Some(inc) = incarnation {
900 if let Some(member) = self.members.get_mut(&target) {
901 debug!("node {} failed indirect probe, marking suspect", target);
902 member.state = MemberStatus::Suspect;
903 member.state_change = Instant::now();
904 }
905 self.queue_update(NodeUpdate::Suspect {
906 node: target,
907 incarnation: inc,
908 });
909 }
910 }
911
912 for (_seq, target) in timed_out_direct {
914 let target_addr = match self.members.get(&target) {
915 Some(m) if m.state == MemberStatus::Alive => m.addr,
916 _ => continue,
917 };
918
919 let relay_nodes: Vec<(NodeId, SocketAddr)> = self
921 .members
922 .values()
923 .filter(|m| m.state == MemberStatus::Alive && m.id != target)
924 .map(|m| (m.id, m.addr))
925 .collect();
926
927 if relay_nodes.is_empty() {
928 let incarnation = self
930 .members
931 .get(&target)
932 .map(|m| m.incarnation)
933 .unwrap_or(0);
934
935 if let Some(member) = self.members.get_mut(&target) {
936 debug!("node {} timed out with no relays, marking suspect", target);
937 member.state = MemberStatus::Suspect;
938 member.state_change = Instant::now();
939 }
940 self.queue_update(NodeUpdate::Suspect {
941 node: target,
942 incarnation,
943 });
944 continue;
945 }
946
947 let k = self.config.indirect_probes.min(relay_nodes.len());
948 let chosen: Vec<_> = relay_nodes
949 .choose_multiple(&mut rand::rng(), k)
950 .copied()
951 .collect();
952
953 debug!(
954 "node {} direct ping timed out, sending PingReq to {} relays",
955 target,
956 chosen.len()
957 );
958
959 let indirect_seq = self.next_seq;
961 self.next_seq += 1;
962 self.pending_probes.insert(
963 indirect_seq,
964 PendingProbe {
965 target,
966 sent_at: Instant::now(),
967 indirect: true,
968 },
969 );
970
971 for (_, relay_addr) in chosen {
972 outgoing.push((
973 relay_addr,
974 GossipMessage::PingReq {
975 seq: indirect_seq,
976 sender: self.local_id,
977 target,
978 target_addr,
979 },
980 ));
981 }
982 }
983
984 outgoing
985 }
986
987 fn cleanup_stale_relays(&mut self) {
992 let timeout = self.config.probe_timeout;
993 let now = Instant::now();
994 self.relay_pending
995 .retain(|_, entry| now.duration_since(entry.sent_at) <= timeout);
996 }
997
998 fn check_suspicion_timeouts(&mut self) {
999 let suspicion_timeout = self.config.protocol_period * self.config.suspicion_mult;
1000 let now = Instant::now();
1001 let mut to_mark_dead = Vec::new();
1002
1003 for member in self.members.values() {
1004 if member.state == MemberStatus::Suspect
1005 && now.duration_since(member.state_change) > suspicion_timeout
1006 {
1007 to_mark_dead.push((member.id, member.incarnation));
1008 }
1009 }
1010
1011 for (id, incarnation) in to_mark_dead {
1012 if let Some(member) = self.members.get_mut(&id) {
1013 warn!("node {} confirmed dead after suspicion timeout", id);
1014 member.state = MemberStatus::Dead;
1015 member.state_change = Instant::now();
1016 self.queue_update(NodeUpdate::Dead {
1017 node: id,
1018 incarnation,
1019 });
1020 }
1021 }
1022 }
1023
1024 fn queue_update(&mut self, update: NodeUpdate) {
1025 self.pending_updates.push_back(update);
1026 if self.pending_updates.len() > self.config.max_piggyback * 2 {
1031 let to_drop = self.pending_updates.len() - self.config.max_piggyback * 2;
1032 self.pending_updates.drain(..to_drop);
1033 }
1034 }
1035
1036 fn collect_updates(&mut self) -> Vec<NodeUpdate> {
1037 let count = self.pending_updates.len().min(self.config.max_piggyback);
1038 self.pending_updates.drain(..count).collect()
1039 }
1040}
1041
1042#[cfg(test)]
1043mod tests {
1044 use super::*;
1045 use std::net::Ipv4Addr;
1046
1047 fn test_addr(port: u16) -> SocketAddr {
1048 SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), port))
1049 }
1050
1051 #[tokio::test]
1052 async fn engine_creation() {
1053 let (tx, _rx) = mpsc::channel(16);
1054 let engine = GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1055 assert_eq!(engine.alive_count(), 0);
1056 }
1057
1058 #[tokio::test]
1059 async fn add_seed() {
1060 let (tx, _rx) = mpsc::channel(16);
1061 let mut engine =
1062 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1063
1064 let seed_id = NodeId::new();
1065 engine.add_seed(seed_id, test_addr(6380));
1066 assert_eq!(engine.alive_count(), 1);
1067 }
1068
1069 #[tokio::test]
1070 async fn handle_ping() {
1071 let (tx, _rx) = mpsc::channel(16);
1072 let mut engine =
1073 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1074
1075 let sender = NodeId::new();
1076 let msg = GossipMessage::Ping {
1077 seq: 1,
1078 sender,
1079 updates: vec![],
1080 };
1081
1082 let responses = engine.handle_message(msg, test_addr(6380)).await;
1083 assert_eq!(responses.len(), 1);
1084 assert!(matches!(responses[0].1, GossipMessage::Ack { .. }));
1085 assert_eq!(responses[0].0.port(), 6380);
1086 assert_eq!(engine.alive_count(), 1);
1087 }
1088
1089 #[tokio::test]
1090 async fn handle_join() {
1091 let (tx, _rx) = mpsc::channel(16);
1092 let mut engine =
1093 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1094
1095 let joiner = NodeId::new();
1096 let msg = GossipMessage::Join {
1097 sender: joiner,
1098 sender_addr: test_addr(6380),
1099 };
1100
1101 let responses = engine.handle_message(msg, test_addr(6380)).await;
1102 assert_eq!(responses.len(), 1);
1103 assert!(matches!(responses[0].1, GossipMessage::Welcome { .. }));
1104 assert_eq!(engine.alive_count(), 1);
1105 }
1106
1107 #[tokio::test]
1108 async fn tick_with_no_members() {
1109 let (tx, _rx) = mpsc::channel(16);
1110 let mut engine =
1111 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1112
1113 let messages = engine.tick();
1114 assert!(messages.is_empty());
1115 }
1116
1117 #[tokio::test]
1118 async fn tick_with_members() {
1119 let (tx, _rx) = mpsc::channel(16);
1120 let mut engine =
1121 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1122
1123 engine.add_seed(NodeId::new(), test_addr(6380));
1124 let messages = engine.tick();
1125 assert_eq!(messages.len(), 1);
1126
1127 let (addr, msg) = &messages[0];
1128 assert_eq!(addr.port(), 6380);
1129 assert!(matches!(msg, GossipMessage::Ping { .. }));
1130 }
1131
1132 #[tokio::test]
1133 async fn create_join_message() {
1134 let (tx, _rx) = mpsc::channel(16);
1135 let id = NodeId::new();
1136 let addr = test_addr(6379);
1137 let engine = GossipEngine::new(id, addr, GossipConfig::default(), tx);
1138
1139 let msg = engine.create_join_message();
1140 match msg {
1141 GossipMessage::Join {
1142 sender,
1143 sender_addr,
1144 } => {
1145 assert_eq!(sender, id);
1146 assert_eq!(sender_addr, addr);
1147 }
1148 _ => panic!("expected Join message"),
1149 }
1150 }
1151
1152 #[tokio::test]
1153 async fn apply_slots_changed_updates_member() {
1154 let (tx, mut rx) = mpsc::channel(16);
1155 let mut engine =
1156 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1157
1158 let remote = NodeId::new();
1159 engine.add_seed(remote, test_addr(6380));
1160
1161 let slots = vec![SlotRange::new(0, 5460)];
1162 let updates = vec![NodeUpdate::SlotsChanged {
1163 node: remote,
1164 incarnation: 1,
1165 slots: slots.clone(),
1166 }];
1167
1168 let msg = GossipMessage::Ping {
1169 seq: 1,
1170 sender: remote,
1171 updates,
1172 };
1173 engine.handle_message(msg, test_addr(6380)).await;
1174
1175 let member = engine.members.get(&remote).unwrap();
1177 assert_eq!(member.slots, slots);
1178
1179 let event = rx.try_recv().unwrap();
1181 assert!(matches!(event, GossipEvent::SlotsChanged(id, _) if id == remote));
1182 }
1183
1184 #[tokio::test]
1185 async fn stale_slots_changed_ignored() {
1186 let (tx, mut rx) = mpsc::channel(16);
1187 let mut engine =
1188 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1189
1190 let remote = NodeId::new();
1191 engine.members.insert(
1193 remote,
1194 MemberState {
1195 id: remote,
1196 addr: test_addr(6380),
1197 incarnation: 5,
1198 state: MemberStatus::Alive,
1199 state_change: Instant::now(),
1200 is_primary: true,
1201 replicates: None,
1202 slots: vec![SlotRange::new(0, 5460)],
1203 },
1204 );
1205
1206 let msg = GossipMessage::Ping {
1208 seq: 1,
1209 sender: remote,
1210 updates: vec![NodeUpdate::SlotsChanged {
1211 node: remote,
1212 incarnation: 3, slots: vec![],
1214 }],
1215 };
1216 engine.handle_message(msg, test_addr(6380)).await;
1217
1218 let member = engine.members.get(&remote).unwrap();
1220 assert_eq!(member.slots.len(), 1);
1221
1222 assert!(rx.try_recv().is_err());
1224 }
1225
1226 #[tokio::test]
1227 async fn welcome_includes_local_slots() {
1228 let (tx, _rx) = mpsc::channel(16);
1229 let local_id = NodeId::new();
1230 let mut engine = GossipEngine::new(local_id, test_addr(6379), GossipConfig::default(), tx);
1231
1232 engine.set_local_slots(vec![SlotRange::new(0, 16383)]);
1233
1234 let joiner = NodeId::new();
1235 let msg = GossipMessage::Join {
1236 sender: joiner,
1237 sender_addr: test_addr(6380),
1238 };
1239
1240 let responses = engine.handle_message(msg, test_addr(6380)).await;
1241 assert_eq!(responses.len(), 1);
1242 match &responses[0].1 {
1243 GossipMessage::Welcome { members, .. } => {
1244 let local_member = members.iter().find(|m| m.id == local_id);
1245 assert!(local_member.is_some(), "welcome should include local node");
1246 assert_eq!(local_member.unwrap().slots, vec![SlotRange::new(0, 16383)]);
1247 }
1248 other => panic!("expected Welcome, got {other:?}"),
1249 }
1250 }
1251
1252 #[tokio::test]
1253 async fn welcome_propagates_member_slots() {
1254 let (tx, mut rx) = mpsc::channel(16);
1255 let mut engine =
1256 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1257
1258 let sender = NodeId::new();
1259 let member_id = NodeId::new();
1260 let slots = vec![SlotRange::new(0, 5460)];
1261
1262 let msg = GossipMessage::Welcome {
1263 sender,
1264 members: vec![MemberInfo {
1265 id: member_id,
1266 addr: test_addr(6381),
1267 incarnation: 1,
1268 is_primary: true,
1269 slots: slots.clone(),
1270 }],
1271 };
1272
1273 engine.handle_message(msg, test_addr(6380)).await;
1274
1275 let member = engine.members.get(&member_id).unwrap();
1277 assert_eq!(member.slots, slots);
1278
1279 let mut found = false;
1283 while let Ok(event) = rx.try_recv() {
1284 if let GossipEvent::MemberJoined(id, _, s) = event {
1285 if id == member_id {
1286 assert_eq!(s, slots);
1287 found = true;
1288 break;
1289 }
1290 }
1291 }
1292 assert!(found, "expected MemberJoined for member_id with slots");
1293 }
1294
1295 #[tokio::test]
1296 async fn direct_ping_timeout_sends_pingreq() {
1297 let (tx, _rx) = mpsc::channel(16);
1298 let config = GossipConfig {
1299 probe_timeout: Duration::from_millis(0), ..GossipConfig::default()
1301 };
1302 let mut engine = GossipEngine::new(NodeId::new(), test_addr(6379), config, tx);
1303
1304 let target = NodeId::new();
1305 let relay = NodeId::new();
1306 engine.add_seed(target, test_addr(6380));
1307 engine.add_seed(relay, test_addr(6381));
1308
1309 let messages = engine.tick();
1311 assert!(!messages.is_empty());
1313
1314 let messages = engine.tick();
1317
1318 let pingreqs: Vec<_> = messages
1320 .iter()
1321 .filter(|(_, msg)| matches!(msg, GossipMessage::PingReq { .. }))
1322 .collect();
1323
1324 assert!(
1327 !pingreqs.is_empty(),
1328 "expected PingReq after direct probe timeout"
1329 );
1330 }
1331
1332 #[tokio::test]
1333 async fn pingreq_handler_forwards_to_target() {
1334 let (tx, _rx) = mpsc::channel(16);
1335 let mut engine =
1336 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1337
1338 let requester = NodeId::new();
1339 let target = NodeId::new();
1340 let target_addr = test_addr(6381);
1341
1342 let msg = GossipMessage::PingReq {
1343 seq: 42,
1344 sender: requester,
1345 target,
1346 target_addr,
1347 };
1348
1349 let responses = engine.handle_message(msg, test_addr(6380)).await;
1350
1351 assert_eq!(responses.len(), 1);
1353 assert_eq!(responses[0].0, target_addr);
1354 assert!(matches!(responses[0].1, GossipMessage::Ping { .. }));
1355
1356 assert_eq!(engine.relay_pending.len(), 1);
1358 }
1359
1360 #[tokio::test]
1361 async fn relayed_ack_forwarded_to_requester() {
1362 let (tx, _rx) = mpsc::channel(16);
1363 let mut engine =
1364 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1365
1366 let requester = NodeId::new();
1367 let requester_addr = test_addr(6380);
1368 let target = NodeId::new();
1369 let target_addr = test_addr(6381);
1370
1371 let msg = GossipMessage::PingReq {
1373 seq: 42,
1374 sender: requester,
1375 target,
1376 target_addr,
1377 };
1378 let responses = engine.handle_message(msg, requester_addr).await;
1379 let relay_seq = match &responses[0].1 {
1380 GossipMessage::Ping { seq, .. } => *seq,
1381 other => panic!("expected Ping, got {other:?}"),
1382 };
1383
1384 let target_sender = NodeId::new();
1386 let ack = GossipMessage::Ack {
1387 seq: relay_seq,
1388 sender: target_sender,
1389 updates: vec![],
1390 };
1391 let responses = engine.handle_message(ack, target_addr).await;
1392
1393 assert_eq!(responses.len(), 1);
1395 assert_eq!(responses[0].0, requester_addr);
1396 match &responses[0].1 {
1397 GossipMessage::Ack { seq, .. } => assert_eq!(*seq, 42),
1398 other => panic!("expected Ack, got {other:?}"),
1399 }
1400
1401 assert!(engine.relay_pending.is_empty());
1403 }
1404
1405 #[tokio::test]
1406 async fn indirect_probe_timeout_marks_suspect() {
1407 let (tx, _rx) = mpsc::channel(16);
1408 let config = GossipConfig {
1409 probe_timeout: Duration::from_millis(0), indirect_probes: 1,
1411 ..GossipConfig::default()
1412 };
1413 let mut engine = GossipEngine::new(NodeId::new(), test_addr(6379), config, tx);
1414
1415 let target = NodeId::new();
1416 let relay = NodeId::new();
1417 engine.add_seed(target, test_addr(6380));
1418 engine.add_seed(relay, test_addr(6381));
1419
1420 engine.tick();
1422
1423 engine.tick();
1425
1426 engine.tick();
1428
1429 let suspect_count = engine
1431 .members
1432 .values()
1433 .filter(|m| m.state == MemberStatus::Suspect)
1434 .count();
1435 assert!(suspect_count > 0, "expected at least one Suspect member");
1436 }
1437
1438 #[tokio::test]
1439 async fn stale_relay_entries_cleaned_up() {
1440 let (tx, _rx) = mpsc::channel(16);
1441 let config = GossipConfig {
1442 probe_timeout: Duration::from_millis(0), ..GossipConfig::default()
1444 };
1445 let mut engine = GossipEngine::new(NodeId::new(), test_addr(6379), config, tx);
1446
1447 engine.relay_pending.insert(
1449 999,
1450 RelayEntry {
1451 requester: test_addr(6380),
1452 original_seq: 1,
1453 sent_at: Instant::now() - Duration::from_secs(10),
1454 },
1455 );
1456
1457 engine.tick();
1458
1459 assert!(engine.relay_pending.is_empty());
1461 }
1462
1463 #[tokio::test]
1464 async fn apply_role_changed_updates_member() {
1465 let (tx, mut rx) = mpsc::channel(16);
1466 let mut engine =
1467 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1468
1469 let remote = NodeId::new();
1470 let primary = NodeId::new();
1471 engine.add_seed(remote, test_addr(6380));
1472
1473 let msg = GossipMessage::Ping {
1475 seq: 1,
1476 sender: remote,
1477 updates: vec![NodeUpdate::RoleChanged {
1478 node: remote,
1479 incarnation: 2,
1480 is_primary: false,
1481 replicates: Some(primary),
1482 }],
1483 };
1484 engine.handle_message(msg, test_addr(6380)).await;
1485
1486 let member = engine.members.get(&remote).unwrap();
1487 assert!(!member.is_primary);
1488 assert_eq!(member.replicates, Some(primary));
1489 assert_eq!(member.incarnation, 2);
1490
1491 let mut found = false;
1493 while let Ok(event) = rx.try_recv() {
1494 if let GossipEvent::RoleChanged(id, is_primary, replicates) = event {
1495 if id == remote {
1496 assert!(!is_primary);
1497 assert_eq!(replicates, Some(primary));
1498 found = true;
1499 break;
1500 }
1501 }
1502 }
1503 assert!(found, "expected RoleChanged event for remote");
1504 }
1505
1506 #[tokio::test]
1507 async fn stale_role_changed_ignored() {
1508 let (tx, mut rx) = mpsc::channel(16);
1509 let mut engine =
1510 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1511
1512 let remote = NodeId::new();
1513 engine.members.insert(
1514 remote,
1515 MemberState {
1516 id: remote,
1517 addr: test_addr(6380),
1518 incarnation: 10,
1519 state: MemberStatus::Alive,
1520 state_change: Instant::now(),
1521 is_primary: true,
1522 replicates: None,
1523 slots: vec![],
1524 },
1525 );
1526
1527 let msg = GossipMessage::Ping {
1529 seq: 1,
1530 sender: remote,
1531 updates: vec![NodeUpdate::RoleChanged {
1532 node: remote,
1533 incarnation: 5, is_primary: false,
1535 replicates: None,
1536 }],
1537 };
1538 engine.handle_message(msg, test_addr(6380)).await;
1539
1540 let member = engine.members.get(&remote).unwrap();
1542 assert!(member.is_primary, "stale update should not change role");
1543
1544 let mut role_changed = false;
1546 while let Ok(event) = rx.try_recv() {
1547 if matches!(event, GossipEvent::RoleChanged(..)) {
1548 role_changed = true;
1549 }
1550 }
1551 assert!(
1552 !role_changed,
1553 "stale role update should not emit RoleChanged"
1554 );
1555 }
1556
1557 #[tokio::test]
1558 async fn excessive_incarnation_jump_rejected() {
1559 let (tx, _rx) = mpsc::channel(16);
1560 let mut engine =
1561 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1562
1563 let remote = NodeId::new();
1564 engine.add_seed(remote, test_addr(6380));
1565 let msg = GossipMessage::Ping {
1569 seq: 1,
1570 sender: remote,
1571 updates: vec![NodeUpdate::Alive {
1572 node: remote,
1573 addr: test_addr(6380),
1574 incarnation: MAX_INCARNATION_JUMP + 1,
1575 }],
1576 };
1577 engine.handle_message(msg, test_addr(6380)).await;
1578
1579 let member = engine.members.get(&remote).unwrap();
1581 assert_eq!(
1582 member.incarnation, 0,
1583 "incarnation should not change after excessive jump"
1584 );
1585 }
1586
1587 #[tokio::test]
1588 async fn valid_incarnation_jump_accepted() {
1589 let (tx, _rx) = mpsc::channel(16);
1590 let mut engine =
1591 GossipEngine::new(NodeId::new(), test_addr(6379), GossipConfig::default(), tx);
1592
1593 let remote = NodeId::new();
1594 engine.add_seed(remote, test_addr(6380));
1595
1596 let msg = GossipMessage::Ping {
1598 seq: 1,
1599 sender: remote,
1600 updates: vec![NodeUpdate::Alive {
1601 node: remote,
1602 addr: test_addr(6380),
1603 incarnation: MAX_INCARNATION_JUMP,
1604 }],
1605 };
1606 engine.handle_message(msg, test_addr(6380)).await;
1607
1608 let member = engine.members.get(&remote).unwrap();
1609 assert_eq!(
1610 member.incarnation, MAX_INCARNATION_JUMP,
1611 "valid incarnation jump should be accepted"
1612 );
1613 }
1614}