1use std::collections::HashMap;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use crate::cluster::failure_detector::DEFAULT_THRESHOLD;
36use crate::cluster::peer::PeerState;
37use crate::cluster::pool::ServerPool;
38use crate::events::{ClusterEvent, EventManager};
39use crate::hashkit::{token::parse_token, DynToken};
40
/// Default gossip interval in milliseconds.
///
/// Used as the default for [`GossipConfig::interval`] and as the initial
/// interval of a freshly built [`GossipHandler`].
pub const DEFAULT_GOSSIP_INTERVAL_MS: u64 = 1000;

/// Default seeds-check interval in milliseconds.
///
/// Used as the default for [`GossipConfig::seeds_check_interval`].
pub const DEFAULT_SEEDS_CHECK_INTERVAL_MS: u64 = 30 * 1000;
47
/// Tunables for the gossip subsystem.
///
/// The [`Default`] implementation yields a disabled configuration with the
/// module-level default intervals.
#[derive(Debug, Clone)]
pub struct GossipConfig {
    /// Gossip on/off flag. This module only stores it.
    // NOTE(review): where the flag is consumed is not visible in this file.
    pub enabled: bool,
    /// Gossip interval.
    pub interval: Duration,
    /// Seeds-check interval.
    pub seeds_check_interval: Duration,
}
59
60impl Default for GossipConfig {
61 fn default() -> Self {
62 Self {
63 enabled: false,
64 interval: Duration::from_millis(DEFAULT_GOSSIP_INTERVAL_MS),
65 seeds_check_interval: Duration::from_millis(DEFAULT_SEEDS_CHECK_INTERVAL_MS),
66 }
67 }
68}
69
70#[derive(Clone, Debug, Eq, PartialEq)]
72pub struct SeedRecord {
73 pub host: String,
75 pub port: u16,
77 pub rack: String,
79 pub dc: String,
81 pub tokens: Vec<DynToken>,
83}
84
85#[derive(Clone, Debug)]
90pub struct GossipNode {
91 pub dc: String,
93 pub rack: String,
95 pub host: String,
97 pub port: u16,
99 pub tokens: Vec<DynToken>,
101 pub state: PeerState,
103 pub ts_secs: u64,
105 pub is_local: bool,
107}
108
109#[derive(Clone, Debug, Default)]
117pub struct GossipState {
118 by_token: HashMap<(String, String, String), GossipNode>,
119 by_name: HashMap<(String, String, String), GossipNode>,
120 node_count: usize,
121}
122
123impl GossipState {
124 #[must_use]
134 pub fn new() -> Self {
135 Self::default()
136 }
137
138 #[must_use]
140 pub fn node_count(&self) -> usize {
141 self.node_count
142 }
143
144 fn token_key(node: &GossipNode) -> (String, String, String) {
146 let primary = node
147 .tokens
148 .first()
149 .map(|t| format!("{}", t.get_int()))
150 .unwrap_or_default();
151 (node.dc.clone(), node.rack.clone(), primary)
152 }
153
154 fn name_key(node: &GossipNode) -> (String, String, String) {
155 (node.dc.clone(), node.rack.clone(), node.host.clone())
156 }
157
158 pub fn add_or_update(&mut self, node: GossipNode) -> GossipStep {
187 let token_key = Self::token_key(&node);
188 let name_key = Self::name_key(&node);
189 if let Some(existing) = self.by_token.get_mut(&token_key) {
190 if existing.host == node.host {
191 if node.ts_secs > existing.ts_secs {
192 let changed = existing.state != node.state;
193 existing.state = node.state;
194 existing.ts_secs = node.ts_secs;
195 if changed {
196 return GossipStep::StateChanged;
197 }
198 return GossipStep::TimestampUpdated;
199 }
200 GossipStep::Unchanged
201 } else {
202 let old_name_key = Self::name_key(existing);
204 self.by_name.remove(&old_name_key);
205 *existing = node.clone();
206 self.by_name.insert(name_key, node);
207 GossipStep::Replaced
208 }
209 } else {
210 self.by_token.insert(token_key, node.clone());
211 self.by_name.insert(name_key, node);
212 self.node_count += 1;
213 GossipStep::Added
214 }
215 }
216
217 pub fn nodes(&self) -> impl Iterator<Item = &GossipNode> + '_ {
219 self.by_token.values()
220 }
221
222 pub fn run_failure_detector(&mut self, now_secs: u64, interval_ms: u64) {
244 let delta_secs = (interval_ms / 1000).saturating_mul(40);
245 for node in self.by_token.values_mut() {
246 if node.is_local {
247 continue;
248 }
249 if now_secs.saturating_sub(node.ts_secs) > delta_secs {
250 node.state = PeerState::Down;
251 }
252 }
253 for node in self.by_name.values_mut() {
255 if node.is_local {
256 continue;
257 }
258 if now_secs.saturating_sub(node.ts_secs) > delta_secs {
259 node.state = PeerState::Down;
260 }
261 }
262 }
263}
264
/// Outcome of merging one node into a [`GossipState`] via `add_or_update`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum GossipStep {
    /// No node was stored under the token key; the node was inserted.
    Added,
    /// Same host, newer timestamp, and the stored state differed.
    StateChanged,
    /// Same host, newer timestamp, state already matched.
    TimestampUpdated,
    /// A different host was stored under the same token key and was replaced.
    Replaced,
    /// Same host, but the incoming timestamp was not newer; nothing changed.
    Unchanged,
}
279
280pub fn parse_seed_node(raw: &str) -> Result<SeedRecord, String> {
293 let parts: Vec<&str> = raw.splitn(5, ':').collect();
294 if parts.len() != 5 {
295 return Err(format!("malformed seed entry '{raw}'"));
296 }
297 let mut iter = raw.rsplitn(5, ':');
302 let tokens_str = iter.next().ok_or("missing tokens")?;
303 let dc = iter.next().ok_or("missing dc")?;
304 let rack = iter.next().ok_or("missing rack")?;
305 let port_str = iter.next().ok_or("missing port")?;
306 let host = iter.next().ok_or("missing host")?;
307 if host.is_empty() {
308 return Err(format!("empty host in '{raw}'"));
309 }
310 if rack.is_empty() {
311 return Err(format!("empty rack in '{raw}'"));
312 }
313 if dc.is_empty() {
314 return Err(format!("empty dc in '{raw}'"));
315 }
316 let port: u16 = port_str
317 .parse()
318 .map_err(|e| format!("bad port '{port_str}': {e}"))?;
319 if port == 0 {
320 return Err(format!("zero port in '{raw}'"));
321 }
322 if tokens_str.is_empty() {
323 return Err(format!("empty tokens in '{raw}'"));
324 }
325 let mut tokens = Vec::new();
326 for t in tokens_str.split(',') {
327 let parsed = parse_token(t.as_bytes()).map_err(|e| format!("bad token '{t}': {e}"))?;
328 tokens.push(parsed);
329 }
330 Ok(SeedRecord {
331 host: host.to_string(),
332 port,
333 rack: rack.to_string(),
334 dc: dc.to_string(),
335 tokens,
336 })
337}
338
339pub fn parse_seed_blob(raw: &str) -> Result<Vec<SeedRecord>, String> {
349 let mut out = Vec::new();
350 for piece in raw.split('|') {
351 if piece.is_empty() {
352 continue;
353 }
354 out.push(parse_seed_node(piece)?);
355 }
356 Ok(out)
357}
358
359#[derive(Debug)]
396pub struct GossipHandler {
397 pool: Arc<ServerPool>,
398 threshold: f64,
399 interval: Duration,
400 failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
406 events: Option<Arc<EventManager>>,
413}
414
415impl GossipHandler {
416 #[must_use]
419 pub fn new(pool: Arc<ServerPool>) -> Self {
420 Self {
421 pool,
422 threshold: DEFAULT_THRESHOLD,
423 interval: Duration::from_millis(DEFAULT_GOSSIP_INTERVAL_MS),
424 failure_metrics: None,
425 events: None,
426 }
427 }
428
429 #[must_use]
439 pub fn with_failure_metrics(mut self, metrics: Arc<crate::stats::FailureMetrics>) -> Self {
440 self.failure_metrics = Some(metrics);
441 self
442 }
443
444 #[must_use]
452 pub fn with_events(mut self, events: Arc<EventManager>) -> Self {
453 self.events = Some(events);
454 self
455 }
456
457 #[must_use]
459 pub fn events(&self) -> Option<&Arc<EventManager>> {
460 self.events.as_ref()
461 }
462
463 #[must_use]
465 pub fn with_threshold(mut self, threshold: f64) -> Self {
466 self.threshold = threshold;
467 self
468 }
469
470 #[must_use]
474 pub fn with_interval(mut self, interval: Duration) -> Self {
475 self.interval = interval;
476 self
477 }
478
479 #[must_use]
481 pub fn threshold(&self) -> f64 {
482 self.threshold
483 }
484
485 #[must_use]
487 pub fn interval(&self) -> Duration {
488 self.interval
489 }
490
491 #[must_use]
493 pub fn pool(&self) -> &Arc<ServerPool> {
494 &self.pool
495 }
496
497 pub fn record_heartbeat_pname(&self, pname: &str, now: Instant) {
509 let mut peers = self.pool.peers().write();
510 for p in peers.iter_mut() {
511 if p.is_local() {
512 continue;
513 }
514 if p.endpoint().pname() == pname {
515 p.failure_detector_mut().record_heartbeat(now);
516 if p.failure_detector().phi(now) <= self.threshold && p.state() != PeerState::Normal
517 {
518 let prev = p.state();
519 p.set_state(PeerState::Normal, now_secs_wall());
520 if let Some(m) = self.failure_metrics.as_ref() {
521 m.record_peer_state_transition(
522 p.idx(),
523 p.dc(),
524 p.rack(),
525 prev,
526 PeerState::Normal,
527 );
528 }
529 if let Some(ev) = self.events.as_ref() {
530 ev.publish(ClusterEvent::PeerUp {
531 peer_id: p.idx(),
532 dc: p.dc().to_string(),
533 ts: std::time::SystemTime::now(),
534 });
535 }
536 }
537 return;
538 }
539 }
540 }
541
542 pub fn record_heartbeat_idx(&self, peer_idx: u32, now: Instant) {
546 let mut peers = self.pool.peers().write();
547 if let Some(p) = peers.iter_mut().find(|p| p.idx() == peer_idx) {
548 if p.is_local() {
549 return;
550 }
551 p.failure_detector_mut().record_heartbeat(now);
552 if p.failure_detector().phi(now) <= self.threshold && p.state() != PeerState::Normal {
553 let prev = p.state();
554 p.set_state(PeerState::Normal, now_secs_wall());
555 if let Some(m) = self.failure_metrics.as_ref() {
556 m.record_peer_state_transition(
557 p.idx(),
558 p.dc(),
559 p.rack(),
560 prev,
561 PeerState::Normal,
562 );
563 }
564 if let Some(ev) = self.events.as_ref() {
565 ev.publish(ClusterEvent::PeerUp {
566 peer_id: p.idx(),
567 dc: p.dc().to_string(),
568 ts: std::time::SystemTime::now(),
569 });
570 }
571 }
572 }
573 }
574
575 pub fn evaluate(&self, now: Instant) -> Vec<(u32, PeerState)> {
584 let mut peers = self.pool.peers().write();
585 let mut transitions = Vec::new();
586 for p in peers.iter_mut() {
587 if p.is_local() {
588 continue;
589 }
590 let phi = p.failure_detector().phi(now);
591 if let Some(m) = self.failure_metrics.as_ref() {
592 m.observe_phi(p.idx(), p.dc(), p.rack(), phi);
593 }
594 let target = if p.failure_detector().last_heartbeat().is_some() && phi <= self.threshold
595 {
596 PeerState::Normal
597 } else {
598 PeerState::Down
599 };
600 let prev = p.state();
601 if prev != target {
602 p.set_state(target, now_secs_wall());
603 transitions.push((p.idx(), target));
604 if let Some(m) = self.failure_metrics.as_ref() {
605 m.record_peer_state_transition(p.idx(), p.dc(), p.rack(), prev, target);
606 }
607 if let Some(ev) = self.events.as_ref() {
608 let ts = std::time::SystemTime::now();
609 match target {
610 PeerState::Normal => ev.publish(ClusterEvent::PeerUp {
611 peer_id: p.idx(),
612 dc: p.dc().to_string(),
613 ts,
614 }),
615 PeerState::Down => ev.publish(ClusterEvent::PeerDown {
616 peer_id: p.idx(),
617 dc: p.dc().to_string(),
618 phi,
619 ts,
620 }),
621 _ => {}
622 }
623 }
624 } else if let Some(m) = self.failure_metrics.as_ref() {
625 m.observe_peer_state(p.idx(), p.dc(), p.rack(), target);
626 }
627 }
628 transitions
629 }
630
631 pub fn mark_down_pname(&self, pname: &str) {
636 let mut peers = self.pool.peers().write();
637 for p in peers.iter_mut() {
638 if p.is_local() {
639 continue;
640 }
641 if p.endpoint().pname() == pname && p.state() != PeerState::Down {
642 let prev = p.state();
643 p.set_state(PeerState::Down, now_secs_wall());
644 if let Some(m) = self.failure_metrics.as_ref() {
645 m.record_peer_state_transition(
646 p.idx(),
647 p.dc(),
648 p.rack(),
649 prev,
650 PeerState::Down,
651 );
652 }
653 if let Some(ev) = self.events.as_ref() {
654 ev.publish(ClusterEvent::PeerDown {
655 peer_id: p.idx(),
656 dc: p.dc().to_string(),
657 phi: p.failure_detector().phi(Instant::now()),
658 ts: std::time::SystemTime::now(),
659 });
660 }
661 return;
662 }
663 }
664 }
665
666 pub fn reset_detector(&self, peer_idx: u32) {
670 let mut peers = self.pool.peers().write();
671 if let Some(p) = peers.iter_mut().find(|p| p.idx() == peer_idx) {
672 p.failure_detector_mut().reset();
673 }
674 }
675}
676
/// Returns the current wall-clock time as whole seconds since the Unix epoch,
/// or `0` if the system clock reports a time before the epoch.
fn now_secs_wall() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
682
#[cfg(test)]
mod tests {
    use super::*;

    /// `pname` under which the remote fixture peer from
    /// `handler_helpers::pool` is addressed in the handler tests.
    const REMOTE_PNAME: &str = "127.0.0.1:8102";

    /// Builds a non-local `Normal` node on port 8101 with a single token.
    fn node(dc: &str, rack: &str, host: &str, tok: u32, ts: u64) -> GossipNode {
        GossipNode {
            dc: dc.to_owned(),
            rack: rack.to_owned(),
            host: host.to_owned(),
            port: 8101,
            tokens: vec![DynToken::from_u32(tok)],
            state: PeerState::Normal,
            ts_secs: ts,
            is_local: false,
        }
    }

    /// A newer update with a different state is reported as `StateChanged`.
    #[test]
    fn add_then_update_state() {
        let mut view = GossipState::new();
        let first = view.add_or_update(node("d", "r", "h", 7, 1));
        assert_eq!(first, GossipStep::Added);

        let newer_down = GossipNode {
            state: PeerState::Down,
            ..node("d", "r", "h", 7, 2)
        };
        assert_eq!(view.add_or_update(newer_down), GossipStep::StateChanged);
    }

    /// A different host under the same token replaces the stored node.
    #[test]
    fn ip_replacement() {
        let mut view = GossipState::new();
        view.add_or_update(node("d", "r", "h1", 7, 1));
        let step = view.add_or_update(node("d", "r", "h2", 7, 2));
        assert_eq!(step, GossipStep::Replaced);
    }

    /// An update with an older timestamp leaves the view untouched.
    #[test]
    fn stale_update_ignored() {
        let mut view = GossipState::new();
        view.add_or_update(node("d", "r", "h", 7, 5));
        let step = view.add_or_update(node("d", "r", "h", 7, 1));
        assert_eq!(step, GossipStep::Unchanged);
    }

    #[test]
    fn parse_one_seed() {
        let seed = parse_seed_node("10.0.0.1:8101:rA:dc1:1383429731").unwrap();
        assert_eq!(seed.host, "10.0.0.1");
        assert_eq!(seed.port, 8101);
        assert_eq!(seed.rack, "rA");
        assert_eq!(seed.dc, "dc1");
    }

    #[test]
    fn parse_multi_token_seed() {
        let seed = parse_seed_node("h:1:r:d:1,2,3").unwrap();
        assert_eq!(seed.tokens.len(), 3);
    }

    #[test]
    fn parse_blob_with_pipe() {
        let seeds = parse_seed_blob("h1:1:r:d:1|h2:2:r:d:2").unwrap();
        assert_eq!(seeds.len(), 2);
    }

    #[test]
    fn parse_seed_rejects_short() {
        let outcome = parse_seed_node("h:1:r:d");
        assert!(outcome.is_err());
    }

    /// A node whose timestamp is far behind `now` is aged to `Down`.
    #[test]
    fn failure_detector_ages_node_to_down() {
        let mut view = GossipState::new();
        view.add_or_update(node("d", "r", "h", 7, 0));
        view.run_failure_detector(1000, 1000);
        let aged = view.nodes().next().unwrap();
        assert_eq!(aged.state, PeerState::Down);
    }

    /// Fixtures shared by the `GossipHandler` tests.
    mod handler_helpers {
        use std::sync::Arc;

        use crate::cluster::peer::{Peer, PeerEndpoint};
        use crate::cluster::pool::{PoolConfig, ServerPool};
        use crate::hashkit::DynToken;

        /// Builds a two-peer pool in `dc1`/`r1`: peer 0 on port 8101 (local)
        /// and peer 1 on port 8102 (remote).
        pub fn pool() -> Arc<ServerPool> {
            let local = Peer::new(
                0,
                PeerEndpoint::tcp("127.0.0.1".into(), 8101),
                "r1".into(),
                "dc1".into(),
                vec![DynToken::from_u32(0)],
                true,
                true,
                false,
            );
            let remote = Peer::new(
                1,
                PeerEndpoint::tcp("127.0.0.1".into(), 8102),
                "r1".into(),
                "dc1".into(),
                vec![DynToken::from_u32(2_147_483_648)],
                false,
                true,
                false,
            );
            let cfg = PoolConfig {
                dc: "dc1".into(),
                rack: "r1".into(),
                enable_gossip: true,
                ..PoolConfig::default()
            };
            Arc::new(ServerPool::new(cfg, vec![local, remote]))
        }
    }

    /// State of the first non-local peer, or `Unknown` if there is none.
    fn remote_state(pool: &ServerPool) -> PeerState {
        let peers = pool.peers().read();
        peers
            .iter()
            .find(|p| !p.is_local())
            .map_or(PeerState::Unknown, |p| p.state())
    }

    #[test]
    fn handler_first_heartbeat_promotes_to_normal() {
        let pool = handler_helpers::pool();
        let handler = GossipHandler::new(Arc::clone(&pool));
        let t0 = Instant::now();
        assert_eq!(remote_state(&pool), PeerState::Down);
        handler.record_heartbeat_pname(REMOTE_PNAME, t0);
        assert_eq!(remote_state(&pool), PeerState::Normal);
    }

    #[test]
    fn handler_steady_heartbeats_keep_peer_normal() {
        let pool = handler_helpers::pool();
        let handler = GossipHandler::new(Arc::clone(&pool));
        let t0 = Instant::now();
        // One heartbeat per second for 100 seconds, evaluating after each.
        for offset in (0..100).map(Duration::from_secs) {
            let now = t0 + offset;
            handler.record_heartbeat_pname(REMOTE_PNAME, now);
            handler.evaluate(now);
        }
        // 10 ms after the last heartbeat (which was at t0 + 99 s).
        let after_last = t0 + Duration::from_secs(99) + Duration::from_millis(10);
        let phi = pool
            .peers()
            .read()
            .iter()
            .find(|p| !p.is_local())
            .map_or(0.0, |p| p.failure_detector().phi(after_last));
        assert!(
            phi < 1.0,
            "phi should be < 1.0 right after a heartbeat, got {phi}"
        );
        assert_eq!(remote_state(&pool), PeerState::Normal);
    }

    #[test]
    fn handler_silence_transitions_peer_to_down() {
        let pool = handler_helpers::pool();
        let handler = GossipHandler::new(Arc::clone(&pool));
        let t0 = Instant::now();
        for offset in (0..100).map(Duration::from_secs) {
            handler.record_heartbeat_pname(REMOTE_PNAME, t0 + offset);
        }
        // 60 seconds of silence after the last heartbeat at t0 + 99 s.
        let later = t0 + Duration::from_secs(159);
        let transitions = handler.evaluate(later);
        assert_eq!(transitions, vec![(1, PeerState::Down)]);
        assert_eq!(remote_state(&pool), PeerState::Down);
    }

    #[test]
    fn handler_evaluate_no_data_keeps_peer_down() {
        let pool = handler_helpers::pool();
        let handler = GossipHandler::new(Arc::clone(&pool));
        let transitions = handler.evaluate(Instant::now());
        assert!(transitions.is_empty());
        assert_eq!(remote_state(&pool), PeerState::Down);
    }

    #[test]
    fn handler_unknown_pname_is_silent() {
        let pool = handler_helpers::pool();
        let handler = GossipHandler::new(Arc::clone(&pool));
        handler.record_heartbeat_pname("10.0.0.99:9999", Instant::now());
        assert_eq!(remote_state(&pool), PeerState::Down);
    }

    #[test]
    fn handler_mark_down_overrides_normal() {
        let pool = handler_helpers::pool();
        let handler = GossipHandler::new(Arc::clone(&pool));
        handler.record_heartbeat_pname(REMOTE_PNAME, Instant::now());
        assert_eq!(remote_state(&pool), PeerState::Normal);
        handler.mark_down_pname(REMOTE_PNAME);
        assert_eq!(remote_state(&pool), PeerState::Down);
    }

    /// Steady heartbeats produce exactly one promotion to `Normal`; a later
    /// silence is recorded as a single `Normal -> Down` transition, and the
    /// current-state and phi gauges are populated for the remote peer.
    #[test]
    fn handler_evaluate_records_normal_to_down_transition() {
        let pool = handler_helpers::pool();
        let metrics = Arc::new(crate::stats::FailureMetrics::new());
        let handler =
            GossipHandler::new(Arc::clone(&pool)).with_failure_metrics(Arc::clone(&metrics));
        let t0 = Instant::now();
        for offset in (0..100).map(Duration::from_secs) {
            let now = t0 + offset;
            handler.record_heartbeat_pname(REMOTE_PNAME, now);
            handler.evaluate(now);
        }

        let mid_snap = metrics.snapshot();
        let normal_count: u64 = mid_snap
            .peer_state_transitions
            .iter()
            .filter(|t| t.to == PeerState::Normal)
            .map(|t| t.count)
            .sum();
        assert_eq!(
            normal_count, 1,
            "got transitions: {:?}",
            mid_snap.peer_state_transitions
        );

        let later = t0 + Duration::from_secs(159);
        let transitions = handler.evaluate(later);
        assert_eq!(transitions, vec![(1, PeerState::Down)]);

        let snap = metrics.snapshot();

        let down_entry = snap
            .peer_state_transitions
            .iter()
            .find(|t| t.from == PeerState::Normal && t.to == PeerState::Down)
            .expect("normal->down transition should be recorded");
        assert_eq!(down_entry.count, 1);
        assert_eq!(down_entry.peer_idx, 1);

        let current = snap
            .peer_state_current
            .iter()
            .find(|c| c.peer_idx == 1)
            .expect("peer_state_current entry should be present");
        assert_eq!(current.state, PeerState::Down);
        assert_eq!(current.dc, "dc1");
        assert_eq!(current.rack, "r1");

        let phi_entry = snap
            .peer_phi
            .iter()
            .find(|p| p.peer_idx == 1)
            .expect("gossip_phi_score gauge should be populated");
        assert!(
            phi_entry.phi >= 0.0,
            "phi should be non-negative; got {}",
            phi_entry.phi
        );
    }
}