1use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::time::{Duration, Instant};
18
19use rand::prelude::IndexedRandom;
20use tokio::sync::mpsc;
21use tracing::{debug, info, trace, warn};
22
23use crate::message::{GossipMessage, MemberInfo, NodeUpdate};
24use crate::{NodeId, SlotRange};
25
/// Tunable parameters for the gossip engine.
#[derive(Clone, Debug)]
pub struct GossipConfig {
    /// Base protocol interval. The suspicion timeout is this value multiplied
    /// by `suspicion_mult`.
    pub protocol_period: Duration,
    /// How long a direct probe may stay unanswered before its target is
    /// marked suspect.
    pub probe_timeout: Duration,
    /// Multiplier applied to `protocol_period` to obtain how long a member may
    /// remain suspect before it is declared dead.
    pub suspicion_mult: u32,
    /// Number of indirect probes to request.
    /// NOTE(review): not read anywhere in this file — confirm where it is used.
    pub indirect_probes: usize,
    /// Maximum number of queued updates attached to one outgoing message.
    pub max_piggyback: usize,
    /// NOTE(review): presumably the offset added to the data port to derive
    /// the gossip port; not read in this file — confirm against the caller.
    pub gossip_port_offset: u16,
}
42
43impl Default for GossipConfig {
44 fn default() -> Self {
45 Self {
46 protocol_period: Duration::from_secs(1),
47 probe_timeout: Duration::from_millis(500),
48 suspicion_mult: 5,
49 indirect_probes: 3,
50 max_piggyback: 10,
51 gossip_port_offset: 10000,
52 }
53 }
54}
55
56#[derive(Debug, Clone)]
58pub struct MemberState {
59 pub id: NodeId,
60 pub addr: SocketAddr,
61 pub incarnation: u64,
62 pub state: MemberStatus,
63 pub state_change: Instant,
64 pub is_primary: bool,
65 pub slots: Vec<SlotRange>,
66}
67
/// Membership status of a node as tracked by the gossip engine.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemberStatus {
    /// The member is considered reachable.
    Alive,
    /// The member missed a probe or was reported suspect by a peer.
    Suspect,
    /// The member stayed suspect past the suspicion timeout or was reported
    /// dead by a peer.
    Dead,
    /// A `Left` update was received for the member.
    Left,
}
76
77#[derive(Debug, Clone)]
79pub enum GossipEvent {
80 MemberJoined(NodeId, SocketAddr),
82 MemberSuspected(NodeId),
84 MemberFailed(NodeId),
86 MemberLeft(NodeId),
88 MemberAlive(NodeId),
90}
91
92pub struct GossipEngine {
94 local_id: NodeId,
96 local_addr: SocketAddr,
98 incarnation: u64,
100 config: GossipConfig,
102 members: HashMap<NodeId, MemberState>,
104 pending_updates: Vec<NodeUpdate>,
106 next_seq: u64,
108 pending_probes: HashMap<u64, PendingProbe>,
110 event_tx: mpsc::Sender<GossipEvent>,
112}
113
114struct PendingProbe {
115 target: NodeId,
116 sent_at: Instant,
117 indirect: bool,
118}
119
120impl GossipEngine {
121 pub fn new(
123 local_id: NodeId,
124 local_addr: SocketAddr,
125 config: GossipConfig,
126 event_tx: mpsc::Sender<GossipEvent>,
127 ) -> Self {
128 Self {
129 local_id,
130 local_addr,
131 incarnation: 1,
132 config,
133 members: HashMap::new(),
134 pending_updates: Vec::new(),
135 next_seq: 1,
136 pending_probes: HashMap::new(),
137 event_tx,
138 }
139 }
140
141 pub fn local_id(&self) -> NodeId {
143 self.local_id
144 }
145
146 pub fn members(&self) -> impl Iterator<Item = &MemberState> {
148 self.members.values()
149 }
150
151 pub fn alive_count(&self) -> usize {
153 self.members
154 .values()
155 .filter(|m| m.state == MemberStatus::Alive)
156 .count()
157 }
158
159 pub fn add_seed(&mut self, id: NodeId, addr: SocketAddr) {
161 if id == self.local_id {
162 return;
163 }
164 self.members.entry(id).or_insert_with(|| MemberState {
165 id,
166 addr,
167 incarnation: 0,
168 state: MemberStatus::Alive,
169 state_change: Instant::now(),
170 is_primary: false,
171 slots: Vec::new(),
172 });
173 }
174
175 pub async fn handle_message(
177 &mut self,
178 msg: GossipMessage,
179 from: SocketAddr,
180 ) -> Option<GossipMessage> {
181 match msg {
182 GossipMessage::Ping {
183 seq,
184 sender,
185 updates,
186 } => {
187 trace!("received ping seq={} from {}", seq, sender);
188 self.apply_updates(&updates).await;
189 self.ensure_member(sender, from);
190
191 let response_updates = self.collect_updates();
193 Some(GossipMessage::Ack {
194 seq,
195 sender: self.local_id,
196 updates: response_updates,
197 })
198 }
199
200 GossipMessage::PingReq {
201 seq,
202 sender,
203 target,
204 target_addr: _,
205 } => {
206 trace!(
207 "received ping-req seq={} from {} for {}",
208 seq,
209 sender,
210 target
211 );
212 self.ensure_member(sender, from);
213
214 None
217 }
218
219 GossipMessage::Ack {
220 seq,
221 sender,
222 updates,
223 } => {
224 trace!("received ack seq={} from {}", seq, sender);
225 self.apply_updates(&updates).await;
226 self.ensure_member(sender, from);
227
228 if let Some(probe) = self.pending_probes.remove(&seq) {
230 if self.members.get(&probe.target).map(|m| m.state)
231 == Some(MemberStatus::Suspect)
232 {
233 self.mark_alive(probe.target).await;
235 }
236 }
237 None
238 }
239
240 GossipMessage::Join {
241 sender,
242 sender_addr,
243 } => {
244 info!("node {} joining from {}", sender, sender_addr);
245 self.ensure_member(sender, sender_addr);
246
247 self.queue_update(NodeUpdate::Alive {
249 node: sender,
250 addr: sender_addr,
251 incarnation: 1,
252 });
253
254 let members: Vec<MemberInfo> = self
256 .members
257 .values()
258 .filter(|m| m.state == MemberStatus::Alive)
259 .map(|m| MemberInfo {
260 id: m.id,
261 addr: m.addr,
262 incarnation: m.incarnation,
263 is_primary: m.is_primary,
264 slots: m.slots.clone(),
265 })
266 .collect();
267
268 Some(GossipMessage::Welcome {
269 sender: self.local_id,
270 members,
271 })
272 }
273
274 GossipMessage::Welcome { sender, members } => {
275 info!(
276 "received welcome from {} with {} members",
277 sender,
278 members.len()
279 );
280 self.ensure_member(sender, from);
281
282 for member in members {
283 if member.id != self.local_id {
284 self.members
285 .entry(member.id)
286 .or_insert_with(|| MemberState {
287 id: member.id,
288 addr: member.addr,
289 incarnation: member.incarnation,
290 state: MemberStatus::Alive,
291 state_change: Instant::now(),
292 is_primary: member.is_primary,
293 slots: member.slots,
294 });
295 }
296 }
297 None
298 }
299 }
300 }
301
302 pub fn tick(&mut self) -> Option<(SocketAddr, GossipMessage)> {
304 self.check_probe_timeouts();
306
307 self.check_suspicion_timeouts();
309
310 let target_info = {
312 let alive_members: Vec<_> = self
313 .members
314 .values()
315 .filter(|m| m.state == MemberStatus::Alive || m.state == MemberStatus::Suspect)
316 .map(|m| (m.id, m.addr))
317 .collect();
318
319 if alive_members.is_empty() {
320 return None;
321 }
322
323 *alive_members.choose(&mut rand::rng())?
324 };
325
326 let (target_id, target_addr) = target_info;
327 let seq = self.next_seq;
328 self.next_seq += 1;
329
330 let updates = self.collect_updates();
331 let msg = GossipMessage::Ping {
332 seq,
333 sender: self.local_id,
334 updates,
335 };
336
337 self.pending_probes.insert(
338 seq,
339 PendingProbe {
340 target: target_id,
341 sent_at: Instant::now(),
342 indirect: false,
343 },
344 );
345
346 Some((target_addr, msg))
347 }
348
349 pub fn create_join_message(&self) -> GossipMessage {
351 GossipMessage::Join {
352 sender: self.local_id,
353 sender_addr: self.local_addr,
354 }
355 }
356
357 fn ensure_member(&mut self, id: NodeId, addr: SocketAddr) {
358 if id == self.local_id {
359 return;
360 }
361 self.members.entry(id).or_insert_with(|| MemberState {
362 id,
363 addr,
364 incarnation: 0,
365 state: MemberStatus::Alive,
366 state_change: Instant::now(),
367 is_primary: false,
368 slots: Vec::new(),
369 });
370 }
371
372 async fn apply_updates(&mut self, updates: &[NodeUpdate]) {
373 for update in updates {
374 match update {
375 NodeUpdate::Alive {
376 node,
377 addr,
378 incarnation,
379 } => {
380 if *node == self.local_id {
381 continue;
383 }
384 if let Some(member) = self.members.get_mut(node) {
385 if *incarnation > member.incarnation {
386 member.incarnation = *incarnation;
387 member.addr = *addr;
388 if member.state != MemberStatus::Alive {
389 member.state = MemberStatus::Alive;
390 member.state_change = Instant::now();
391 if self
392 .event_tx
393 .send(GossipEvent::MemberAlive(*node))
394 .await
395 .is_err()
396 {
397 warn!("event channel closed, cannot send MemberAlive event");
398 }
399 }
400 }
401 } else {
402 self.members.insert(
403 *node,
404 MemberState {
405 id: *node,
406 addr: *addr,
407 incarnation: *incarnation,
408 state: MemberStatus::Alive,
409 state_change: Instant::now(),
410 is_primary: false,
411 slots: Vec::new(),
412 },
413 );
414 let _ = self
415 .event_tx
416 .send(GossipEvent::MemberJoined(*node, *addr))
417 .await;
418 }
419 }
420
421 NodeUpdate::Suspect { node, incarnation } => {
422 if *node == self.local_id {
423 if *incarnation >= self.incarnation {
425 self.incarnation = incarnation + 1;
426 self.queue_update(NodeUpdate::Alive {
427 node: self.local_id,
428 addr: self.local_addr,
429 incarnation: self.incarnation,
430 });
431 }
432 continue;
433 }
434 if let Some(member) = self.members.get_mut(node) {
435 if *incarnation >= member.incarnation && member.state == MemberStatus::Alive
436 {
437 member.state = MemberStatus::Suspect;
438 member.state_change = Instant::now();
439 let _ = self
440 .event_tx
441 .send(GossipEvent::MemberSuspected(*node))
442 .await;
443 }
444 }
445 }
446
447 NodeUpdate::Dead { node, incarnation } => {
448 if *node == self.local_id {
449 self.incarnation = incarnation + 1;
451 self.queue_update(NodeUpdate::Alive {
452 node: self.local_id,
453 addr: self.local_addr,
454 incarnation: self.incarnation,
455 });
456 continue;
457 }
458 if let Some(member) = self.members.get_mut(node) {
459 if *incarnation >= member.incarnation && member.state != MemberStatus::Dead
460 {
461 member.state = MemberStatus::Dead;
462 member.state_change = Instant::now();
463 if self
464 .event_tx
465 .send(GossipEvent::MemberFailed(*node))
466 .await
467 .is_err()
468 {
469 warn!("event channel closed, cannot send MemberFailed event");
470 }
471 }
472 }
473 }
474
475 NodeUpdate::Left { node } => {
476 if *node == self.local_id {
477 continue;
478 }
479 if let Some(member) = self.members.get_mut(node) {
480 if member.state != MemberStatus::Left {
481 member.state = MemberStatus::Left;
482 member.state_change = Instant::now();
483 if self
484 .event_tx
485 .send(GossipEvent::MemberLeft(*node))
486 .await
487 .is_err()
488 {
489 warn!("event channel closed, cannot send MemberLeft event");
490 }
491 }
492 }
493 }
494 }
495 }
496 }
497
498 async fn mark_alive(&mut self, node: NodeId) {
499 if let Some(member) = self.members.get_mut(&node) {
500 if member.state == MemberStatus::Suspect {
501 member.state = MemberStatus::Alive;
502 member.state_change = Instant::now();
503 if self
504 .event_tx
505 .send(GossipEvent::MemberAlive(node))
506 .await
507 .is_err()
508 {
509 warn!("event channel closed, cannot send MemberAlive event");
510 }
511 }
512 }
513 }
514
515 fn check_probe_timeouts(&mut self) {
516 let timeout = self.config.probe_timeout;
517 let now = Instant::now();
518
519 let timed_out: Vec<_> = self
521 .pending_probes
522 .iter()
523 .filter(|(_, probe)| now.duration_since(probe.sent_at) > timeout && !probe.indirect)
524 .map(|(seq, probe)| (*seq, probe.target))
525 .collect();
526
527 for (seq, target) in timed_out {
529 self.pending_probes.remove(&seq);
530
531 let incarnation = self
533 .members
534 .get(&target)
535 .filter(|m| m.state == MemberStatus::Alive)
536 .map(|m| m.incarnation);
537
538 if let Some(inc) = incarnation {
539 if let Some(member) = self.members.get_mut(&target) {
540 debug!("node {} failed to respond, marking suspect", target);
541 member.state = MemberStatus::Suspect;
542 member.state_change = Instant::now();
543 }
544 self.queue_update(NodeUpdate::Suspect {
545 node: target,
546 incarnation: inc,
547 });
548 }
549 }
550 }
551
552 fn check_suspicion_timeouts(&mut self) {
553 let suspicion_timeout = self.config.protocol_period * self.config.suspicion_mult;
554 let now = Instant::now();
555 let mut to_mark_dead = Vec::new();
556
557 for member in self.members.values() {
558 if member.state == MemberStatus::Suspect
559 && now.duration_since(member.state_change) > suspicion_timeout
560 {
561 to_mark_dead.push((member.id, member.incarnation));
562 }
563 }
564
565 for (id, incarnation) in to_mark_dead {
566 if let Some(member) = self.members.get_mut(&id) {
567 warn!("node {} confirmed dead after suspicion timeout", id);
568 member.state = MemberStatus::Dead;
569 member.state_change = Instant::now();
570 self.queue_update(NodeUpdate::Dead {
571 node: id,
572 incarnation,
573 });
574 }
575 }
576 }
577
578 fn queue_update(&mut self, update: NodeUpdate) {
579 self.pending_updates.push(update);
580 if self.pending_updates.len() > self.config.max_piggyback * 2 {
582 self.pending_updates.drain(0..self.config.max_piggyback);
583 }
584 }
585
586 fn collect_updates(&mut self) -> Vec<NodeUpdate> {
587 let count = self.pending_updates.len().min(self.config.max_piggyback);
588 self.pending_updates.drain(0..count).collect()
589 }
590}
591
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Loopback socket address on the given port.
    fn test_addr(port: u16) -> SocketAddr {
        SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), port))
    }

    /// Builds an engine for `id` on port 6379 with the default configuration.
    ///
    /// The receiver is handed back so the event channel stays open for as
    /// long as the test keeps it alive.
    fn new_engine(id: NodeId) -> (GossipEngine, mpsc::Receiver<GossipEvent>) {
        let (tx, rx) = mpsc::channel(16);
        let engine = GossipEngine::new(id, test_addr(6379), GossipConfig::default(), tx);
        (engine, rx)
    }

    #[tokio::test]
    async fn engine_creation() {
        let (engine, _rx) = new_engine(NodeId::new());
        assert_eq!(engine.alive_count(), 0);
    }

    #[tokio::test]
    async fn add_seed() {
        let (mut engine, _rx) = new_engine(NodeId::new());

        let seed_id = NodeId::new();
        engine.add_seed(seed_id, test_addr(6380));
        assert_eq!(engine.alive_count(), 1);
    }

    #[tokio::test]
    async fn handle_ping() {
        let (mut engine, _rx) = new_engine(NodeId::new());

        let ping = GossipMessage::Ping {
            seq: 1,
            sender: NodeId::new(),
            updates: vec![],
        };

        let response = engine.handle_message(ping, test_addr(6380)).await;
        assert!(matches!(response, Some(GossipMessage::Ack { .. })));
        assert_eq!(engine.alive_count(), 1);
    }

    #[tokio::test]
    async fn handle_join() {
        let (mut engine, _rx) = new_engine(NodeId::new());

        let join = GossipMessage::Join {
            sender: NodeId::new(),
            sender_addr: test_addr(6380),
        };

        let response = engine.handle_message(join, test_addr(6380)).await;
        assert!(matches!(response, Some(GossipMessage::Welcome { .. })));
        assert_eq!(engine.alive_count(), 1);
    }

    #[tokio::test]
    async fn tick_with_no_members() {
        let (mut engine, _rx) = new_engine(NodeId::new());

        assert!(engine.tick().is_none());
    }

    #[tokio::test]
    async fn tick_with_members() {
        let (mut engine, _rx) = new_engine(NodeId::new());

        engine.add_seed(NodeId::new(), test_addr(6380));
        let probe = engine.tick();
        assert!(probe.is_some());

        let (addr, msg) = probe.unwrap();
        assert_eq!(addr.port(), 6380);
        assert!(matches!(msg, GossipMessage::Ping { .. }));
    }

    #[tokio::test]
    async fn create_join_message() {
        let id = NodeId::new();
        let addr = test_addr(6379);
        let (engine, _rx) = new_engine(id);

        match engine.create_join_message() {
            GossipMessage::Join {
                sender,
                sender_addr,
            } => {
                assert_eq!(sender, id);
                assert_eq!(sender_addr, addr);
            }
            _ => panic!("expected Join message"),
        }
    }
}