1use std::collections::BTreeMap;
48use std::fmt;
49use std::net::SocketAddr;
50use std::sync::{Arc, Mutex};
51use std::time::Duration;
52
53use chitchat::transport::{Transport, UdpTransport};
54use chitchat::{
55 spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, ChitchatId, FailureDetectorConfig,
56 NodeState,
57};
58use hydracache::{
59 CacheError, CacheResult, ClusterCandidate, ClusterDiscovery, ClusterDiscoveryEvent,
60 ClusterEndpoints, ClusterGeneration, ClusterNodeId, ClusterRole,
61};
62use tokio::sync::Mutex as TokioMutex;
63
// Keys written into (and read back from) a node's chitchat key/value state.
// Identifies which discovery adapter published the state; always "chitchat" here.
const KEY_ADAPTER: &str = "hydracache.discovery.adapter";
// Role of the node, encoded by `role_to_str` and decoded by `parse_role`.
const KEY_ROLE: &str = "hydracache.role";
// Decimal string form of the node's cluster generation.
const KEY_GENERATION: &str = "hydracache.generation";
// Optional endpoints; only written when the candidate provides them.
const KEY_ENDPOINT_CONTROL: &str = "hydracache.endpoint.control";
const KEY_ENDPOINT_INVALIDATION: &str = "hydracache.endpoint.invalidation";
const KEY_ENDPOINT_DIAGNOSTICS: &str = "hydracache.endpoint.diagnostics";
// Holds one of the `LIFECYCLE_*` values below.
const KEY_LIFECYCLE: &str = "hydracache.lifecycle";
// Generation and role recorded when the node publishes a leave marker.
const KEY_LEFT_GENERATION: &str = "hydracache.left.generation";
const KEY_LEFT_ROLE: &str = "hydracache.left.role";
// Prefix prepended to every candidate metadata key when it is published.
const KEY_METADATA_PREFIX: &str = "hydracache.metadata.";

// Values stored under `KEY_LIFECYCLE`.
const LIFECYCLE_ACTIVE: &str = "active";
const LIFECYCLE_LEAVING: &str = "leaving";

// Keys used inside `ClusterCandidate::metadata` (no chitchat prefix).
const METADATA_LIFECYCLE: &str = "lifecycle";
const METADATA_LEFT_GENERATION: &str = "left.generation";
const METADATA_LEFT_ROLE: &str = "left.role";
81
/// Settings used to start a chitchat-backed discovery adapter.
///
/// Built with [`ChitchatDiscoveryConfig::new`] and refined through the
/// consuming builder-style setters on the type.
#[derive(Debug, Clone)]
pub struct ChitchatDiscoveryConfig {
    /// Identifier of the gossip cluster, forwarded to chitchat unchanged.
    cluster_id: String,
    /// Node identifier; becomes the node id of the chitchat identity.
    node_id: ClusterNodeId,
    /// Generation of this node; becomes the chitchat generation id.
    generation: ClusterGeneration,
    /// Address handed to chitchat as its listen address.
    listen_addr: SocketAddr,
    /// Address placed in the chitchat identity as the gossip advertise address.
    gossip_advertise_addr: SocketAddr,
    /// Seed node addresses handed to chitchat.
    seed_nodes: Vec<String>,
    /// Gossip interval handed to chitchat.
    gossip_interval: Duration,
    /// Grace period for keys marked for deletion, handed to chitchat.
    marked_for_deletion_grace_period: Duration,
    /// Failure detector settings handed to chitchat.
    failure_detector_config: FailureDetectorConfig,
}
95
96impl ChitchatDiscoveryConfig {
97 pub fn new(
99 cluster_id: impl Into<String>,
100 node_id: impl Into<ClusterNodeId>,
101 generation: ClusterGeneration,
102 listen_addr: SocketAddr,
103 ) -> Self {
104 Self {
105 cluster_id: cluster_id.into(),
106 node_id: node_id.into(),
107 generation,
108 listen_addr,
109 gossip_advertise_addr: listen_addr,
110 seed_nodes: Vec::new(),
111 gossip_interval: Duration::from_millis(250),
112 marked_for_deletion_grace_period: Duration::from_secs(60),
113 failure_detector_config: FailureDetectorConfig::default(),
114 }
115 }
116
117 pub fn gossip_advertise_addr(mut self, addr: SocketAddr) -> Self {
119 self.gossip_advertise_addr = addr;
120 self
121 }
122
123 pub fn seed_node(mut self, seed: impl Into<String>) -> Self {
125 self.seed_nodes.push(seed.into());
126 self
127 }
128
129 pub fn seed_nodes<I, S>(mut self, seeds: I) -> Self
131 where
132 I: IntoIterator<Item = S>,
133 S: Into<String>,
134 {
135 self.seed_nodes = seeds.into_iter().map(Into::into).collect();
136 self
137 }
138
139 pub fn gossip_interval(mut self, interval: Duration) -> Self {
141 self.gossip_interval = interval;
142 self
143 }
144
145 pub fn marked_for_deletion_grace_period(mut self, period: Duration) -> Self {
147 self.marked_for_deletion_grace_period = period;
148 self
149 }
150
151 pub fn failure_detector_config(mut self, config: FailureDetectorConfig) -> Self {
153 self.failure_detector_config = config;
154 self
155 }
156
157 pub fn cluster_id(&self) -> &str {
159 &self.cluster_id
160 }
161
162 pub fn node_id(&self) -> &ClusterNodeId {
164 &self.node_id
165 }
166
167 pub fn generation(&self) -> ClusterGeneration {
169 self.generation
170 }
171
172 pub fn listen_addr(&self) -> SocketAddr {
174 self.listen_addr
175 }
176
177 pub fn seed_nodes_value(&self) -> &[String] {
179 &self.seed_nodes
180 }
181
182 fn chitchat_id(&self) -> ChitchatId {
183 ChitchatId::new(
184 self.node_id.as_str().to_owned(),
185 self.generation.value(),
186 self.gossip_advertise_addr,
187 )
188 }
189
190 fn into_chitchat_config(self) -> ChitchatConfig {
191 ChitchatConfig {
192 chitchat_id: self.chitchat_id(),
193 cluster_id: self.cluster_id,
194 gossip_interval: self.gossip_interval,
195 listen_addr: self.listen_addr,
196 seed_nodes: self.seed_nodes,
197 failure_detector_config: self.failure_detector_config,
198 marked_for_deletion_grace_period: self.marked_for_deletion_grace_period,
199 catchup_callback: None,
200 extra_liveness_predicate: None,
201 }
202 }
203}
204
/// In-memory view kept by the adapter, shared between the public API and the
/// background live-node watcher task.
#[derive(Debug, Default)]
struct DiscoveryState {
    /// Latest known candidate per node id.
    candidates: BTreeMap<ClusterNodeId, ClusterCandidate>,
    /// Append-only log of discovery events, in the order they were recorded.
    events: Vec<ClusterDiscoveryEvent>,
}
210
/// Discovery adapter backed by a running chitchat instance.
pub struct ChitchatDiscovery {
    /// Identity of the local chitchat node, copied from the handle at spawn time.
    chitchat_id: ChitchatId,
    /// Shared access to the chitchat state (guarded by an async mutex).
    chitchat: Arc<TokioMutex<Chitchat>>,
    /// Handle of the spawned chitchat server; aborted when the adapter is dropped.
    handle: ChitchatHandle,
    /// Candidates and events recorded by this adapter and its watcher task.
    state: Arc<Mutex<DiscoveryState>>,
}
218
219impl fmt::Debug for ChitchatDiscovery {
220 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
221 formatter
222 .debug_struct("ChitchatDiscovery")
223 .field("chitchat_id", &self.chitchat_id)
224 .field("candidate_count", &self.candidates().len())
225 .field("event_count", &self.events().len())
226 .finish_non_exhaustive()
227 }
228}
229
impl Drop for ChitchatDiscovery {
    /// Aborts the chitchat server through its handle when the adapter goes away.
    fn drop(&mut self) {
        self.handle.abort();
    }
}
235
236impl ChitchatDiscovery {
237 pub async fn spawn_udp(config: ChitchatDiscoveryConfig) -> CacheResult<Self> {
239 Self::spawn_with_transport(config, &UdpTransport).await
240 }
241
242 pub async fn spawn_with_transport(
247 config: ChitchatDiscoveryConfig,
248 transport: &dyn Transport,
249 ) -> CacheResult<Self> {
250 let handle = spawn_chitchat(
251 config.into_chitchat_config(),
252 vec![(KEY_ADAPTER.to_owned(), "chitchat".to_owned())],
253 transport,
254 )
255 .await
256 .map_err(to_cache_error)?;
257 let chitchat_id = handle.chitchat_id().clone();
258 let chitchat = handle.chitchat();
259 let state = Arc::new(Mutex::new(DiscoveryState::default()));
260
261 spawn_live_node_watcher(chitchat.clone(), state.clone());
262
263 Ok(Self {
264 chitchat_id,
265 chitchat,
266 handle,
267 state,
268 })
269 }
270
271 pub fn chitchat_id(&self) -> &ChitchatId {
273 &self.chitchat_id
274 }
275
276 pub fn candidates(&self) -> Vec<ClusterCandidate> {
278 self.state
279 .lock()
280 .expect("chitchat discovery state poisoned")
281 .candidates
282 .values()
283 .cloned()
284 .collect()
285 }
286
287 pub fn events(&self) -> Vec<ClusterDiscoveryEvent> {
289 self.state
290 .lock()
291 .expect("chitchat discovery state poisoned")
292 .events
293 .clone()
294 }
295
296 pub fn gossip_once(&self, addr: SocketAddr) -> CacheResult<()> {
298 self.handle.gossip(addr).map_err(to_cache_error)
299 }
300
301 pub async fn local_value(&self, key: &str) -> Option<String> {
303 self.chitchat
304 .lock()
305 .await
306 .self_node_state()
307 .get(key)
308 .map(ToOwned::to_owned)
309 }
310
311 pub async fn mark_leaving(
341 &self,
342 node_id: impl Into<ClusterNodeId>,
343 generation: ClusterGeneration,
344 role: ClusterRole,
345 ) -> CacheResult<()> {
346 let node_id = node_id.into();
347 if node_id.as_str() != self.chitchat_id.node_id.as_ref() {
348 return Err(CacheError::Backend(format!(
349 "chitchat leave marker can only be written by local node {}; attempted {}",
350 self.chitchat_id.node_id, node_id
351 )));
352 }
353 if role == ClusterRole::Local {
354 return Err(CacheError::Backend(
355 "local caches do not publish chitchat leave markers".to_owned(),
356 ));
357 }
358
359 let mut chitchat = self.chitchat.lock().await;
360 let node_state = chitchat.self_node_state();
361 reject_stale_leave_generation(node_state, generation)?;
362
363 node_state.set(KEY_ROLE, role_to_str(role));
364 node_state.set(KEY_GENERATION, generation.value().to_string());
365 node_state.set(KEY_LIFECYCLE, LIFECYCLE_LEAVING);
366 node_state.set(KEY_LEFT_GENERATION, generation.value().to_string());
367 node_state.set(KEY_LEFT_ROLE, role_to_str(role));
368
369 record_leave_marker(self.state.clone(), node_id, generation, role);
370 Ok(())
371 }
372
373 async fn announce_candidate(&self, mut candidate: ClusterCandidate) -> CacheResult<()> {
374 candidate
375 .metadata
376 .entry("discovery.adapter".to_owned())
377 .or_insert_with(|| "chitchat".to_owned());
378 candidate
379 .metadata
380 .insert(METADATA_LIFECYCLE.to_owned(), LIFECYCLE_ACTIVE.to_owned());
381 candidate.metadata.remove(METADATA_LEFT_GENERATION);
382 candidate.metadata.remove(METADATA_LEFT_ROLE);
383
384 self.chitchat
385 .lock()
386 .await
387 .self_node_state()
388 .set(KEY_ADAPTER, "chitchat");
389 write_candidate_to_chitchat(self.chitchat.clone(), &candidate).await;
390 record_candidate(self.state.clone(), candidate);
391 Ok(())
392 }
393
394 fn push_event(&self, event: ClusterDiscoveryEvent) {
395 self.state
396 .lock()
397 .expect("chitchat discovery state poisoned")
398 .events
399 .push(event);
400 }
401}
402
403#[async_trait::async_trait]
404impl ClusterDiscovery for ChitchatDiscovery {
405 async fn announce(&self, candidate: ClusterCandidate) -> CacheResult<()> {
406 self.announce_candidate(candidate).await
407 }
408
409 async fn mark_live(&self, node_id: ClusterNodeId) -> CacheResult<()> {
410 self.push_event(ClusterDiscoveryEvent::MemberLive(node_id));
411 Ok(())
412 }
413
414 async fn mark_suspect(&self, node_id: ClusterNodeId) -> CacheResult<()> {
415 self.push_event(ClusterDiscoveryEvent::MemberSuspect(node_id));
416 Ok(())
417 }
418
419 async fn mark_dead(&self, node_id: ClusterNodeId) -> CacheResult<()> {
420 self.push_event(ClusterDiscoveryEvent::MemberDead(node_id));
421 Ok(())
422 }
423
424 fn candidates(&self) -> Vec<ClusterCandidate> {
425 ChitchatDiscovery::candidates(self)
426 }
427
428 fn events(&self) -> Vec<ClusterDiscoveryEvent> {
429 ChitchatDiscovery::events(self)
430 }
431}
432
433async fn write_candidate_to_chitchat(
434 chitchat: Arc<TokioMutex<Chitchat>>,
435 candidate: &ClusterCandidate,
436) {
437 let mut chitchat = chitchat.lock().await;
438 let node_state = chitchat.self_node_state();
439 node_state.set(KEY_ROLE, role_to_str(candidate.role));
440 node_state.set(KEY_GENERATION, candidate.generation.value().to_string());
441 node_state.set(KEY_LIFECYCLE, LIFECYCLE_ACTIVE);
442 set_optional(
443 node_state,
444 KEY_ENDPOINT_CONTROL,
445 candidate.endpoints.control.as_deref(),
446 );
447 set_optional(
448 node_state,
449 KEY_ENDPOINT_INVALIDATION,
450 candidate.endpoints.invalidation.as_deref(),
451 );
452 set_optional(
453 node_state,
454 KEY_ENDPOINT_DIAGNOSTICS,
455 candidate.endpoints.diagnostics.as_deref(),
456 );
457 for (key, value) in &candidate.metadata {
458 node_state.set(format!("{KEY_METADATA_PREFIX}{key}"), value);
459 }
460}
461
462fn reject_stale_leave_generation(
463 node_state: &NodeState,
464 generation: ClusterGeneration,
465) -> CacheResult<()> {
466 if let Some(active_generation) = parse_generation(node_state.get(KEY_GENERATION)) {
467 if generation < active_generation {
468 return Err(CacheError::Backend(format!(
469 "stale chitchat leave marker rejected: marker generation {} is older than active generation {}",
470 generation.value(),
471 active_generation.value()
472 )));
473 }
474 }
475 if let Some(left_generation) = parse_generation(node_state.get(KEY_LEFT_GENERATION)) {
476 if generation < left_generation {
477 return Err(CacheError::Backend(format!(
478 "stale chitchat leave marker rejected: marker generation {} is older than previous leave generation {}",
479 generation.value(),
480 left_generation.value()
481 )));
482 }
483 }
484 Ok(())
485}
486
487fn set_optional(node_state: &mut NodeState, key: &str, value: Option<&str>) {
488 if let Some(value) = value {
489 node_state.set(key, value);
490 }
491}
492
493fn spawn_live_node_watcher(chitchat: Arc<TokioMutex<Chitchat>>, state: Arc<Mutex<DiscoveryState>>) {
494 tokio::spawn(async move {
495 let mut live_nodes = {
496 let chitchat = chitchat.lock().await;
497 chitchat.live_nodes_watcher()
498 };
499
500 while live_nodes.changed().await.is_ok() {
501 let candidates = live_nodes
502 .borrow()
503 .iter()
504 .filter_map(|(chitchat_id, node_state)| {
505 candidate_from_node(chitchat_id, node_state)
506 })
507 .collect::<Vec<_>>();
508
509 let mut state = state.lock().expect("chitchat discovery state poisoned");
510 for candidate in candidates {
511 state
512 .events
513 .push(ClusterDiscoveryEvent::MemberLive(candidate.node_id.clone()));
514 state
515 .candidates
516 .insert(candidate.node_id.clone(), candidate);
517 }
518 }
519 });
520}
521
522fn record_candidate(state: Arc<Mutex<DiscoveryState>>, candidate: ClusterCandidate) {
523 let mut state = state.lock().expect("chitchat discovery state poisoned");
524 state
525 .events
526 .push(ClusterDiscoveryEvent::CandidateSeen(candidate.clone()));
527 state
528 .candidates
529 .insert(candidate.node_id.clone(), candidate);
530}
531
532fn record_leave_marker(
533 state: Arc<Mutex<DiscoveryState>>,
534 node_id: ClusterNodeId,
535 generation: ClusterGeneration,
536 role: ClusterRole,
537) {
538 let mut state = state.lock().expect("chitchat discovery state poisoned");
539 {
540 let candidate = state
541 .candidates
542 .entry(node_id.clone())
543 .or_insert_with(|| match role {
544 ClusterRole::Member => ClusterCandidate::member(node_id.clone()),
545 ClusterRole::Client => ClusterCandidate::client(node_id.clone()),
546 ClusterRole::Local => ClusterCandidate::client(node_id.clone()),
547 });
548 candidate.generation = generation;
549 candidate.role = role;
550 candidate
551 .metadata
552 .insert(METADATA_LIFECYCLE.to_owned(), LIFECYCLE_LEAVING.to_owned());
553 candidate.metadata.insert(
554 METADATA_LEFT_GENERATION.to_owned(),
555 generation.value().to_string(),
556 );
557 candidate
558 .metadata
559 .insert(METADATA_LEFT_ROLE.to_owned(), role_to_str(role).to_owned());
560 }
561 state.events.push(ClusterDiscoveryEvent::MemberLeaving {
562 node_id,
563 generation,
564 role,
565 });
566}
567
568fn candidate_from_node(
569 chitchat_id: &ChitchatId,
570 node_state: &NodeState,
571) -> Option<ClusterCandidate> {
572 let role = parse_role(node_state.get(KEY_ROLE)?)?;
573 let generation = parse_generation(node_state.get(KEY_GENERATION))
574 .unwrap_or_else(|| ClusterGeneration::new(chitchat_id.generation_id));
575 let mut candidate = match role {
576 ClusterRole::Member => ClusterCandidate::member(chitchat_id.node_id.to_string()),
577 ClusterRole::Client => ClusterCandidate::client(chitchat_id.node_id.to_string()),
578 ClusterRole::Local => return None,
579 }
580 .generation(generation)
581 .endpoints(ClusterEndpoints {
582 control: node_state.get(KEY_ENDPOINT_CONTROL).map(ToOwned::to_owned),
583 invalidation: node_state
584 .get(KEY_ENDPOINT_INVALIDATION)
585 .map(ToOwned::to_owned),
586 diagnostics: node_state
587 .get(KEY_ENDPOINT_DIAGNOSTICS)
588 .map(ToOwned::to_owned),
589 });
590
591 for (key, value) in node_state.key_values() {
592 if let Some(metadata_key) = key.strip_prefix(KEY_METADATA_PREFIX) {
593 candidate
594 .metadata
595 .insert(metadata_key.to_owned(), value.to_owned());
596 }
597 }
598 if let Some(lifecycle) = node_state.get(KEY_LIFECYCLE) {
599 candidate
600 .metadata
601 .insert(METADATA_LIFECYCLE.to_owned(), lifecycle.to_owned());
602 if lifecycle == LIFECYCLE_LEAVING {
603 copy_node_state_metadata(
604 node_state,
605 &mut candidate,
606 KEY_LEFT_GENERATION,
607 METADATA_LEFT_GENERATION,
608 );
609 copy_node_state_metadata(
610 node_state,
611 &mut candidate,
612 KEY_LEFT_ROLE,
613 METADATA_LEFT_ROLE,
614 );
615 }
616 }
617 candidate
618 .metadata
619 .entry("discovery.adapter".to_owned())
620 .or_insert_with(|| "chitchat".to_owned());
621 Some(candidate)
622}
623
624fn parse_generation(value: Option<&str>) -> Option<ClusterGeneration> {
625 value
626 .and_then(|value| value.parse::<u64>().ok())
627 .map(ClusterGeneration::new)
628}
629
630fn copy_node_state_metadata(
631 node_state: &NodeState,
632 candidate: &mut ClusterCandidate,
633 node_state_key: &str,
634 metadata_key: &str,
635) {
636 if let Some(value) = node_state.get(node_state_key) {
637 candidate
638 .metadata
639 .insert(metadata_key.to_owned(), value.to_owned());
640 }
641}
642
643fn role_to_str(role: ClusterRole) -> &'static str {
644 match role {
645 ClusterRole::Local => "local",
646 ClusterRole::Client => "client",
647 ClusterRole::Member => "member",
648 }
649}
650
651fn parse_role(value: &str) -> Option<ClusterRole> {
652 match value {
653 "client" => Some(ClusterRole::Client),
654 "member" => Some(ClusterRole::Member),
655 "local" => Some(ClusterRole::Local),
656 _ => None,
657 }
658}
659
660fn to_cache_error(error: impl std::fmt::Display) -> CacheError {
661 CacheError::Backend(format!("chitchat discovery failed: {error}"))
662}
663
664#[cfg(test)]
665mod tests {
666 use std::time::Instant;
667
668 use chitchat::transport::ChannelTransport;
669 use hydracache::{ClusterDiscovery, ClusterEndpoints};
670 use tokio::time::{sleep, timeout};
671
672 use super::*;
673
674 fn addr(port: u16) -> SocketAddr {
675 ([127, 0, 0, 1], port).into()
676 }
677
678 fn config(port: u16, node: &str) -> ChitchatDiscoveryConfig {
679 ChitchatDiscoveryConfig::new(
680 "orders",
681 node,
682 ClusterGeneration::new(port as u64),
683 addr(port),
684 )
685 .gossip_interval(Duration::from_millis(20))
686 }
687
688 #[test]
689 fn config_builds_chitchat_identity_with_generation() {
690 let config = ChitchatDiscoveryConfig::new(
691 "orders",
692 "member-a",
693 ClusterGeneration::new(42),
694 addr(47_001),
695 )
696 .seed_node("127.0.0.1:47000");
697
698 let id = config.chitchat_id();
699
700 assert_eq!(config.cluster_id(), "orders");
701 assert_eq!(config.node_id().as_str(), "member-a");
702 assert_eq!(config.generation(), ClusterGeneration::new(42));
703 assert_eq!(config.listen_addr(), addr(47_001));
704 assert_eq!(config.seed_nodes_value(), &["127.0.0.1:47000".to_owned()]);
705 assert_eq!(id.node_id.as_ref(), "member-a");
706 assert_eq!(id.generation_id, 42);
707 }
708
709 #[test]
710 fn config_builder_setters_feed_chitchat_config() {
711 let config = ChitchatDiscoveryConfig::new(
712 "orders",
713 "member-a",
714 ClusterGeneration::new(43),
715 addr(47_002),
716 )
717 .gossip_advertise_addr(addr(48_002))
718 .seed_node("127.0.0.1:47001")
719 .seed_nodes(["127.0.0.1:47002", "127.0.0.1:47003"])
720 .gossip_interval(Duration::from_millis(33))
721 .marked_for_deletion_grace_period(Duration::from_secs(7))
722 .failure_detector_config(FailureDetectorConfig::default());
723
724 let id = config.chitchat_id();
725 assert_eq!(id.node_id.as_ref(), "member-a");
726 assert_eq!(id.generation_id, 43);
727 assert_eq!(id.gossip_advertise_addr, addr(48_002));
728 assert_eq!(
729 config.seed_nodes_value(),
730 &["127.0.0.1:47002".to_owned(), "127.0.0.1:47003".to_owned()]
731 );
732
733 let chitchat_config = config.into_chitchat_config();
734 assert_eq!(chitchat_config.cluster_id, "orders");
735 assert_eq!(chitchat_config.gossip_interval, Duration::from_millis(33));
736 assert_eq!(
737 chitchat_config.marked_for_deletion_grace_period,
738 Duration::from_secs(7)
739 );
740 }
741
742 #[tokio::test]
743 async fn announce_writes_candidate_to_real_chitchat_state() {
744 let transport = ChannelTransport::default();
745 let discovery =
746 ChitchatDiscovery::spawn_with_transport(config(47_011, "member-a"), &transport)
747 .await
748 .unwrap();
749
750 discovery
751 .announce(
752 ClusterCandidate::member("member-a")
753 .generation(ClusterGeneration::new(47_011))
754 .endpoints(ClusterEndpoints::new().control("127.0.0.1:7000"))
755 .metadata("zone", "eu"),
756 )
757 .await
758 .unwrap();
759
760 assert_eq!(
761 discovery.local_value(KEY_ROLE).await.as_deref(),
762 Some("member")
763 );
764 assert_eq!(
765 discovery.local_value(KEY_ENDPOINT_CONTROL).await.as_deref(),
766 Some("127.0.0.1:7000")
767 );
768 assert_eq!(
769 discovery
770 .local_value(&format!("{KEY_METADATA_PREFIX}zone"))
771 .await
772 .as_deref(),
773 Some("eu")
774 );
775 assert_eq!(discovery.candidates().len(), 1);
776 assert!(matches!(
777 discovery.events().first(),
778 Some(ClusterDiscoveryEvent::CandidateSeen(candidate))
779 if candidate.node_id.as_str() == "member-a"
780 ));
781 }
782
783 #[tokio::test]
784 async fn leave_marker_is_written_to_local_chitchat_state() {
785 let transport = ChannelTransport::default();
786 let discovery =
787 ChitchatDiscovery::spawn_with_transport(config(47_012, "member-a"), &transport)
788 .await
789 .unwrap();
790
791 discovery
792 .announce(
793 ClusterCandidate::member("member-a").generation(ClusterGeneration::new(47_012)),
794 )
795 .await
796 .unwrap();
797 discovery
798 .mark_leaving(
799 "member-a",
800 ClusterGeneration::new(47_012),
801 ClusterRole::Member,
802 )
803 .await
804 .unwrap();
805
806 assert_eq!(
807 discovery.local_value(KEY_LIFECYCLE).await.as_deref(),
808 Some(LIFECYCLE_LEAVING)
809 );
810 assert_eq!(
811 discovery.local_value(KEY_LEFT_GENERATION).await.as_deref(),
812 Some("47012")
813 );
814 assert_eq!(
815 discovery.local_value(KEY_LEFT_ROLE).await.as_deref(),
816 Some("member")
817 );
818 let candidate = discovery
819 .candidates()
820 .into_iter()
821 .find(|candidate| candidate.node_id.as_str() == "member-a")
822 .expect("candidate should remain visible after graceful leave");
823 assert_eq!(
824 candidate
825 .metadata
826 .get(METADATA_LIFECYCLE)
827 .map(String::as_str),
828 Some(LIFECYCLE_LEAVING)
829 );
830 assert!(discovery.events().iter().any(|event| {
831 matches!(
832 event,
833 ClusterDiscoveryEvent::MemberLeaving { node_id, generation, role }
834 if node_id.as_str() == "member-a"
835 && *generation == ClusterGeneration::new(47_012)
836 && *role == ClusterRole::Member
837 )
838 }));
839 }
840
841 #[tokio::test]
842 async fn stale_leave_marker_cannot_overwrite_newer_generation() {
843 let transport = ChannelTransport::default();
844 let discovery =
845 ChitchatDiscovery::spawn_with_transport(config(47_013, "member-a"), &transport)
846 .await
847 .unwrap();
848
849 discovery
850 .announce(ClusterCandidate::member("member-a").generation(ClusterGeneration::new(3)))
851 .await
852 .unwrap();
853
854 let error = discovery
855 .mark_leaving("member-a", ClusterGeneration::new(2), ClusterRole::Member)
856 .await
857 .unwrap_err();
858
859 assert!(error
860 .to_string()
861 .contains("stale chitchat leave marker rejected"));
862 assert_eq!(
863 discovery.local_value(KEY_LIFECYCLE).await.as_deref(),
864 Some(LIFECYCLE_ACTIVE)
865 );
866 }
867
868 #[tokio::test]
869 async fn leave_marker_rejects_wrong_node_and_local_role() {
870 let transport = ChannelTransport::default();
871 let discovery =
872 ChitchatDiscovery::spawn_with_transport(config(47_017, "member-a"), &transport)
873 .await
874 .unwrap();
875
876 discovery
877 .announce(
878 ClusterCandidate::member("member-a").generation(ClusterGeneration::new(47_017)),
879 )
880 .await
881 .unwrap();
882
883 let wrong_node = discovery
884 .mark_leaving(
885 "member-b",
886 ClusterGeneration::new(47_017),
887 ClusterRole::Member,
888 )
889 .await
890 .unwrap_err();
891 assert!(wrong_node
892 .to_string()
893 .contains("can only be written by local node"));
894
895 let local_role = discovery
896 .mark_leaving(
897 "member-a",
898 ClusterGeneration::new(47_017),
899 ClusterRole::Local,
900 )
901 .await
902 .unwrap_err();
903 assert!(local_role
904 .to_string()
905 .contains("local caches do not publish"));
906 }
907
908 #[tokio::test]
909 async fn remote_discovery_observes_leave_marker_metadata() {
910 let transport = ChannelTransport::default();
911 let first = ChitchatDiscovery::spawn_with_transport(config(47_014, "member-a"), &transport)
912 .await
913 .unwrap();
914 let second = ChitchatDiscovery::spawn_with_transport(
915 config(47_015, "client-a").seed_node("127.0.0.1:47014"),
916 &transport,
917 )
918 .await
919 .unwrap();
920
921 first
922 .announce(
923 ClusterCandidate::member("member-a").generation(ClusterGeneration::new(47_014)),
924 )
925 .await
926 .unwrap();
927 first
928 .mark_leaving(
929 "member-a",
930 ClusterGeneration::new(47_014),
931 ClusterRole::Member,
932 )
933 .await
934 .unwrap();
935
936 first.gossip_once(addr(47_015)).unwrap();
937 second.gossip_once(addr(47_014)).unwrap();
938
939 wait_until(Duration::from_secs(2), || {
940 second.candidates().iter().any(|candidate| {
941 candidate.node_id.as_str() == "member-a"
942 && candidate
943 .metadata
944 .get(METADATA_LIFECYCLE)
945 .is_some_and(|value| value == LIFECYCLE_LEAVING)
946 })
947 })
948 .await;
949
950 let remote = second
951 .candidates()
952 .into_iter()
953 .find(|candidate| candidate.node_id.as_str() == "member-a")
954 .expect("remote candidate should be present");
955 assert_eq!(
956 remote
957 .metadata
958 .get(METADATA_LEFT_GENERATION)
959 .map(String::as_str),
960 Some("47014")
961 );
962 assert_eq!(
963 remote.metadata.get(METADATA_LEFT_ROLE).map(String::as_str),
964 Some("member")
965 );
966 }
967
968 #[tokio::test]
969 async fn newer_rejoin_supersedes_leave_marker() {
970 let transport = ChannelTransport::default();
971 let discovery =
972 ChitchatDiscovery::spawn_with_transport(config(47_016, "member-a"), &transport)
973 .await
974 .unwrap();
975
976 discovery
977 .announce(ClusterCandidate::member("member-a").generation(ClusterGeneration::new(2)))
978 .await
979 .unwrap();
980 discovery
981 .mark_leaving("member-a", ClusterGeneration::new(2), ClusterRole::Member)
982 .await
983 .unwrap();
984 discovery
985 .announce(ClusterCandidate::member("member-a").generation(ClusterGeneration::new(3)))
986 .await
987 .unwrap();
988
989 assert_eq!(
990 discovery.local_value(KEY_LIFECYCLE).await.as_deref(),
991 Some(LIFECYCLE_ACTIVE)
992 );
993 assert_eq!(
994 discovery.local_value(KEY_GENERATION).await.as_deref(),
995 Some("3")
996 );
997 let candidate = discovery
998 .candidates()
999 .into_iter()
1000 .find(|candidate| candidate.node_id.as_str() == "member-a")
1001 .expect("candidate should be visible after rejoin");
1002 assert_eq!(candidate.generation, ClusterGeneration::new(3));
1003 assert_eq!(
1004 candidate
1005 .metadata
1006 .get(METADATA_LIFECYCLE)
1007 .map(String::as_str),
1008 Some(LIFECYCLE_ACTIVE)
1009 );
1010 assert!(!candidate.metadata.contains_key(METADATA_LEFT_GENERATION));
1011 }
1012
1013 #[tokio::test]
1014 async fn chitchat_gossip_discovers_remote_candidate() {
1015 let transport = ChannelTransport::default();
1016 let first = ChitchatDiscovery::spawn_with_transport(config(47_021, "member-a"), &transport)
1017 .await
1018 .unwrap();
1019 let second = ChitchatDiscovery::spawn_with_transport(
1020 config(47_022, "client-a").seed_node("127.0.0.1:47021"),
1021 &transport,
1022 )
1023 .await
1024 .unwrap();
1025
1026 first
1027 .announce(
1028 ClusterCandidate::member("member-a").generation(ClusterGeneration::new(47_021)),
1029 )
1030 .await
1031 .unwrap();
1032 second
1033 .announce(
1034 ClusterCandidate::client("client-a").generation(ClusterGeneration::new(47_022)),
1035 )
1036 .await
1037 .unwrap();
1038 second.gossip_once(addr(47_021)).unwrap();
1039
1040 wait_until(Duration::from_secs(2), || {
1041 first
1042 .candidates()
1043 .iter()
1044 .any(|candidate| candidate.node_id.as_str() == "client-a")
1045 })
1046 .await;
1047
1048 let remote = first
1049 .candidates()
1050 .into_iter()
1051 .find(|candidate| candidate.node_id.as_str() == "client-a")
1052 .expect("remote candidate should be present");
1053 assert_eq!(remote.role, ClusterRole::Client);
1054 assert_eq!(remote.generation, ClusterGeneration::new(47_022));
1055 assert_eq!(
1056 remote.metadata.get("discovery.adapter").map(String::as_str),
1057 Some("chitchat")
1058 );
1059 assert!(format!("{first:?}").contains("ChitchatDiscovery"));
1060 }
1061
1062 async fn wait_until(timeout_after: Duration, mut condition: impl FnMut() -> bool) {
1063 timeout(timeout_after, async {
1064 let started = Instant::now();
1065 loop {
1066 if condition() {
1067 return;
1068 }
1069 assert!(started.elapsed() < timeout_after);
1070 sleep(Duration::from_millis(10)).await;
1071 }
1072 })
1073 .await
1074 .expect("condition should become true");
1075 }
1076}