Skip to main content

hydracache_cluster_chitchat/
lib.rs

1//! Chitchat-backed discovery adapter for HydraCache cluster mode.
2//!
3//! This crate is intentionally separate from `hydracache` so local-only users
4//! do not pay for gossip dependencies. `ChitchatDiscovery` implements
5//! [`hydracache::ClusterDiscovery`] and stores HydraCache candidate metadata in
6//! real `chitchat` node state.
7//!
8//! # Example
9//!
10//! ```no_run
11//! use std::net::SocketAddr;
12//! use std::sync::Arc;
13//! use std::time::Duration;
14//!
15//! use hydracache::{ClusterGeneration, HydraCache, InMemoryCluster};
16//! use hydracache_cluster_chitchat::{ChitchatDiscovery, ChitchatDiscoveryConfig};
17//!
18//! # #[tokio::main]
19//! # async fn main() -> hydracache::CacheResult<()> {
20//! let cluster = Arc::new(InMemoryCluster::new("orders"));
21//! let discovery = Arc::new(
22//!     ChitchatDiscovery::spawn_udp(
23//!         ChitchatDiscoveryConfig::new(
24//!             "orders",
25//!             "member-a",
26//!             ClusterGeneration::new(1),
27//!             "127.0.0.1:7000".parse::<SocketAddr>().unwrap(),
28//!         )
29//!         .gossip_interval(Duration::from_millis(200)),
30//!     )
31//!     .await?,
32//! );
33//!
34//! let member = HydraCache::member()
35//!     .shared_cluster(cluster)
36//!     .discovery(discovery)
37//!     .node_id("member-a")
38//!     .generation(ClusterGeneration::new(1))
39//!     .start()
40//!     .await?;
41//!
42//! assert!(member.cluster_discovery_diagnostics().unwrap().has_candidates());
43//! # Ok(())
44//! # }
45//! ```
46
47use std::collections::BTreeMap;
48use std::fmt;
49use std::net::SocketAddr;
50use std::sync::{Arc, Mutex};
51use std::time::Duration;
52
53use chitchat::transport::{Transport, UdpTransport};
54use chitchat::{
55    spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, ChitchatId, FailureDetectorConfig,
56    NodeState,
57};
58use hydracache::{
59    CacheError, CacheResult, ClusterCandidate, ClusterDiscovery, ClusterDiscoveryEvent,
60    ClusterEndpoints, ClusterGeneration, ClusterNodeId, ClusterRole,
61};
62use tokio::sync::Mutex as TokioMutex;
63
// Chitchat node-state keys written by this adapter and read back by
// `candidate_from_node`. Both ends of the gossip use these same constants, so
// changing a value presumably breaks interop with nodes still built with the
// old key — treat them as a wire format.

// Marker set at spawn time and on every announce.
const KEY_ADAPTER: &str = "hydracache.discovery.adapter";
// Candidate role string ("member" / "client" / "local"); nodes without a
// parseable role are not turned into candidates.
const KEY_ROLE: &str = "hydracache.role";
// Decimal generation; readers fall back to the chitchat id generation.
const KEY_GENERATION: &str = "hydracache.generation";
// Optional endpoint strings mirrored from `ClusterEndpoints`.
const KEY_ENDPOINT_CONTROL: &str = "hydracache.endpoint.control";
const KEY_ENDPOINT_INVALIDATION: &str = "hydracache.endpoint.invalidation";
const KEY_ENDPOINT_DIAGNOSTICS: &str = "hydracache.endpoint.diagnostics";
// Holds LIFECYCLE_ACTIVE or LIFECYCLE_LEAVING.
const KEY_LIFECYCLE: &str = "hydracache.lifecycle";
// Written by `mark_leaving`. The generation is also used to reject stale
// leave markers.
const KEY_LEFT_GENERATION: &str = "hydracache.left.generation";
const KEY_LEFT_ROLE: &str = "hydracache.left.role";
// Free-form candidate metadata is stored as `hydracache.metadata.<key>`.
const KEY_METADATA_PREFIX: &str = "hydracache.metadata.";

// Values stored under KEY_LIFECYCLE.
const LIFECYCLE_ACTIVE: &str = "active";
const LIFECYCLE_LEAVING: &str = "leaving";

// Keys inside `ClusterCandidate::metadata` that mirror the lifecycle markers.
const METADATA_LIFECYCLE: &str = "lifecycle";
const METADATA_LEFT_GENERATION: &str = "left.generation";
const METADATA_LEFT_ROLE: &str = "left.role";
81
/// Configuration for a chitchat-backed HydraCache discovery node.
///
/// Build one with [`ChitchatDiscoveryConfig::new`] plus the chained setters.
/// Then hand it to [`ChitchatDiscovery::spawn_udp`] or
/// [`ChitchatDiscovery::spawn_with_transport`].
#[derive(Debug, Clone)]
pub struct ChitchatDiscoveryConfig {
    // Logical cluster id; forwarded as chitchat's `cluster_id`.
    cluster_id: String,
    // HydraCache node id; becomes the chitchat node id.
    node_id: ClusterNodeId,
    // Process generation; becomes the chitchat generation id.
    generation: ClusterGeneration,
    // Local gossip bind address.
    listen_addr: SocketAddr,
    // Address peers use to reach this node; defaults to `listen_addr`.
    gossip_advertise_addr: SocketAddr,
    // Seed addresses, passed to chitchat as strings.
    seed_nodes: Vec<String>,
    // Interval between gossip rounds.
    gossip_interval: Duration,
    // How long chitchat keeps tombstones for deleted node-state keys.
    marked_for_deletion_grace_period: Duration,
    // Failure detector settings forwarded verbatim to chitchat.
    failure_detector_config: FailureDetectorConfig,
}
95
96impl ChitchatDiscoveryConfig {
97    /// Build a config using the same listen and advertised gossip address.
98    pub fn new(
99        cluster_id: impl Into<String>,
100        node_id: impl Into<ClusterNodeId>,
101        generation: ClusterGeneration,
102        listen_addr: SocketAddr,
103    ) -> Self {
104        Self {
105            cluster_id: cluster_id.into(),
106            node_id: node_id.into(),
107            generation,
108            listen_addr,
109            gossip_advertise_addr: listen_addr,
110            seed_nodes: Vec::new(),
111            gossip_interval: Duration::from_millis(250),
112            marked_for_deletion_grace_period: Duration::from_secs(60),
113            failure_detector_config: FailureDetectorConfig::default(),
114        }
115    }
116
117    /// Advertise a different gossip address than the local bind address.
118    pub fn gossip_advertise_addr(mut self, addr: SocketAddr) -> Self {
119        self.gossip_advertise_addr = addr;
120        self
121    }
122
123    /// Add one seed node address.
124    pub fn seed_node(mut self, seed: impl Into<String>) -> Self {
125        self.seed_nodes.push(seed.into());
126        self
127    }
128
129    /// Replace all seed node addresses.
130    pub fn seed_nodes<I, S>(mut self, seeds: I) -> Self
131    where
132        I: IntoIterator<Item = S>,
133        S: Into<String>,
134    {
135        self.seed_nodes = seeds.into_iter().map(Into::into).collect();
136        self
137    }
138
139    /// Set the gossip interval.
140    pub fn gossip_interval(mut self, interval: Duration) -> Self {
141        self.gossip_interval = interval;
142        self
143    }
144
145    /// Set the tombstone grace period for chitchat node-state keys.
146    pub fn marked_for_deletion_grace_period(mut self, period: Duration) -> Self {
147        self.marked_for_deletion_grace_period = period;
148        self
149    }
150
151    /// Set chitchat's failure detector configuration.
152    pub fn failure_detector_config(mut self, config: FailureDetectorConfig) -> Self {
153        self.failure_detector_config = config;
154        self
155    }
156
157    /// Return the logical HydraCache cluster id.
158    pub fn cluster_id(&self) -> &str {
159        &self.cluster_id
160    }
161
162    /// Return the stable HydraCache node id.
163    pub fn node_id(&self) -> &ClusterNodeId {
164        &self.node_id
165    }
166
167    /// Return the process generation advertised in chitchat.
168    pub fn generation(&self) -> ClusterGeneration {
169        self.generation
170    }
171
172    /// Return the UDP bind address.
173    pub fn listen_addr(&self) -> SocketAddr {
174        self.listen_addr
175    }
176
177    /// Return configured seed addresses.
178    pub fn seed_nodes_value(&self) -> &[String] {
179        &self.seed_nodes
180    }
181
182    fn chitchat_id(&self) -> ChitchatId {
183        ChitchatId::new(
184            self.node_id.as_str().to_owned(),
185            self.generation.value(),
186            self.gossip_advertise_addr,
187        )
188    }
189
190    fn into_chitchat_config(self) -> ChitchatConfig {
191        ChitchatConfig {
192            chitchat_id: self.chitchat_id(),
193            cluster_id: self.cluster_id,
194            gossip_interval: self.gossip_interval,
195            listen_addr: self.listen_addr,
196            seed_nodes: self.seed_nodes,
197            failure_detector_config: self.failure_detector_config,
198            marked_for_deletion_grace_period: self.marked_for_deletion_grace_period,
199            catchup_callback: None,
200            extra_liveness_predicate: None,
201        }
202    }
203}
204
/// Local discovery view, shared between the public accessors and the
/// background live-node watcher task.
#[derive(Debug, Default)]
struct DiscoveryState {
    // Latest candidate per node id; a newer observation replaces the old one.
    candidates: BTreeMap<ClusterNodeId, ClusterCandidate>,
    // Discovery event log; this module only ever appends to it.
    events: Vec<ClusterDiscoveryEvent>,
}
210
/// Real chitchat-backed implementation of [`ClusterDiscovery`].
///
/// Owns the spawned chitchat server task, which is aborted on drop. It also
/// keeps a local view of candidates and events. That view is fed by local
/// calls (`announce`, `mark_leaving`, `mark_*`) and by a background task that
/// watches chitchat's live nodes.
pub struct ChitchatDiscovery {
    // Identity this node gossips under (node id, generation, advertised address).
    chitchat_id: ChitchatId,
    // Shared chitchat state; locked to read or write this node's key-values.
    chitchat: Arc<TokioMutex<Chitchat>>,
    // Handle to the spawned chitchat server; aborted in `Drop`.
    handle: ChitchatHandle,
    // Local discovery view, also written by the live-node watcher task.
    state: Arc<Mutex<DiscoveryState>>,
}
218
219impl fmt::Debug for ChitchatDiscovery {
220    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
221        formatter
222            .debug_struct("ChitchatDiscovery")
223            .field("chitchat_id", &self.chitchat_id)
224            .field("candidate_count", &self.candidates().len())
225            .field("event_count", &self.events().len())
226            .finish_non_exhaustive()
227    }
228}
229
impl Drop for ChitchatDiscovery {
    /// Stops the spawned chitchat server task when the discovery is dropped.
    fn drop(&mut self) {
        // `drop` is synchronous and cannot await a graceful shutdown, so the
        // task is aborted outright.
        self.handle.abort();
    }
}
235
236impl ChitchatDiscovery {
237    /// Spawn a discovery node using chitchat's UDP transport.
238    pub async fn spawn_udp(config: ChitchatDiscoveryConfig) -> CacheResult<Self> {
239        Self::spawn_with_transport(config, &UdpTransport).await
240    }
241
242    /// Spawn a discovery node using a caller-provided chitchat transport.
243    ///
244    /// Tests can use `chitchat::transport::ChannelTransport`; production code
245    /// usually uses [`spawn_udp`](Self::spawn_udp).
246    pub async fn spawn_with_transport(
247        config: ChitchatDiscoveryConfig,
248        transport: &dyn Transport,
249    ) -> CacheResult<Self> {
250        let handle = spawn_chitchat(
251            config.into_chitchat_config(),
252            vec![(KEY_ADAPTER.to_owned(), "chitchat".to_owned())],
253            transport,
254        )
255        .await
256        .map_err(to_cache_error)?;
257        let chitchat_id = handle.chitchat_id().clone();
258        let chitchat = handle.chitchat();
259        let state = Arc::new(Mutex::new(DiscoveryState::default()));
260
261        spawn_live_node_watcher(chitchat.clone(), state.clone());
262
263        Ok(Self {
264            chitchat_id,
265            chitchat,
266            handle,
267            state,
268        })
269    }
270
271    /// Return this node's chitchat id.
272    pub fn chitchat_id(&self) -> &ChitchatId {
273        &self.chitchat_id
274    }
275
276    /// Return the latest known candidates.
277    pub fn candidates(&self) -> Vec<ClusterCandidate> {
278        self.state
279            .lock()
280            .expect("chitchat discovery state poisoned")
281            .candidates
282            .values()
283            .cloned()
284            .collect()
285    }
286
287    /// Return observed discovery events.
288    pub fn events(&self) -> Vec<ClusterDiscoveryEvent> {
289        self.state
290            .lock()
291            .expect("chitchat discovery state poisoned")
292            .events
293            .clone()
294    }
295
296    /// Ask this node to gossip with a specific peer immediately.
297    pub fn gossip_once(&self, addr: SocketAddr) -> CacheResult<()> {
298        self.handle.gossip(addr).map_err(to_cache_error)
299    }
300
301    /// Return one local chitchat key-value for diagnostics and tests.
302    pub async fn local_value(&self, key: &str) -> Option<String> {
303        self.chitchat
304            .lock()
305            .await
306            .self_node_state()
307            .get(key)
308            .map(ToOwned::to_owned)
309    }
310
311    /// Publish a generation-safe graceful-leave marker in local chitchat state.
312    ///
313    /// The marker is advisory discovery metadata. Authoritative leave still
314    /// belongs to the configured HydraCache control plane, but remote discovery
315    /// nodes can observe this marker and distinguish an intentional leave from
316    /// ordinary suspect/dead failure detection.
317    ///
318    /// ```no_run
319    /// # use std::net::SocketAddr;
320    /// # use hydracache::{ClusterGeneration, ClusterRole};
321    /// # use hydracache_cluster_chitchat::{ChitchatDiscovery, ChitchatDiscoveryConfig};
322    /// # #[tokio::main]
323    /// # async fn main() -> hydracache::CacheResult<()> {
324    /// let discovery = ChitchatDiscovery::spawn_udp(
325    ///     ChitchatDiscoveryConfig::new(
326    ///         "orders",
327    ///         "member-a",
328    ///         ClusterGeneration::new(7),
329    ///         "127.0.0.1:7000".parse::<SocketAddr>().unwrap(),
330    ///     ),
331    /// )
332    /// .await?;
333    ///
334    /// discovery
335    ///     .mark_leaving("member-a", ClusterGeneration::new(7), ClusterRole::Member)
336    ///     .await?;
337    /// # Ok(())
338    /// # }
339    /// ```
340    pub async fn mark_leaving(
341        &self,
342        node_id: impl Into<ClusterNodeId>,
343        generation: ClusterGeneration,
344        role: ClusterRole,
345    ) -> CacheResult<()> {
346        let node_id = node_id.into();
347        if node_id.as_str() != self.chitchat_id.node_id.as_ref() {
348            return Err(CacheError::Backend(format!(
349                "chitchat leave marker can only be written by local node {}; attempted {}",
350                self.chitchat_id.node_id, node_id
351            )));
352        }
353        if role == ClusterRole::Local {
354            return Err(CacheError::Backend(
355                "local caches do not publish chitchat leave markers".to_owned(),
356            ));
357        }
358
359        let mut chitchat = self.chitchat.lock().await;
360        let node_state = chitchat.self_node_state();
361        reject_stale_leave_generation(node_state, generation)?;
362
363        node_state.set(KEY_ROLE, role_to_str(role));
364        node_state.set(KEY_GENERATION, generation.value().to_string());
365        node_state.set(KEY_LIFECYCLE, LIFECYCLE_LEAVING);
366        node_state.set(KEY_LEFT_GENERATION, generation.value().to_string());
367        node_state.set(KEY_LEFT_ROLE, role_to_str(role));
368
369        record_leave_marker(self.state.clone(), node_id, generation, role);
370        Ok(())
371    }
372
373    async fn announce_candidate(&self, mut candidate: ClusterCandidate) -> CacheResult<()> {
374        candidate
375            .metadata
376            .entry("discovery.adapter".to_owned())
377            .or_insert_with(|| "chitchat".to_owned());
378        candidate
379            .metadata
380            .insert(METADATA_LIFECYCLE.to_owned(), LIFECYCLE_ACTIVE.to_owned());
381        candidate.metadata.remove(METADATA_LEFT_GENERATION);
382        candidate.metadata.remove(METADATA_LEFT_ROLE);
383
384        self.chitchat
385            .lock()
386            .await
387            .self_node_state()
388            .set(KEY_ADAPTER, "chitchat");
389        write_candidate_to_chitchat(self.chitchat.clone(), &candidate).await;
390        record_candidate(self.state.clone(), candidate);
391        Ok(())
392    }
393
394    fn push_event(&self, event: ClusterDiscoveryEvent) {
395        self.state
396            .lock()
397            .expect("chitchat discovery state poisoned")
398            .events
399            .push(event);
400    }
401}
402
403#[async_trait::async_trait]
404impl ClusterDiscovery for ChitchatDiscovery {
405    async fn announce(&self, candidate: ClusterCandidate) -> CacheResult<()> {
406        self.announce_candidate(candidate).await
407    }
408
409    async fn mark_live(&self, node_id: ClusterNodeId) -> CacheResult<()> {
410        self.push_event(ClusterDiscoveryEvent::MemberLive(node_id));
411        Ok(())
412    }
413
414    async fn mark_suspect(&self, node_id: ClusterNodeId) -> CacheResult<()> {
415        self.push_event(ClusterDiscoveryEvent::MemberSuspect(node_id));
416        Ok(())
417    }
418
419    async fn mark_dead(&self, node_id: ClusterNodeId) -> CacheResult<()> {
420        self.push_event(ClusterDiscoveryEvent::MemberDead(node_id));
421        Ok(())
422    }
423
424    fn candidates(&self) -> Vec<ClusterCandidate> {
425        ChitchatDiscovery::candidates(self)
426    }
427
428    fn events(&self) -> Vec<ClusterDiscoveryEvent> {
429        ChitchatDiscovery::events(self)
430    }
431}
432
433async fn write_candidate_to_chitchat(
434    chitchat: Arc<TokioMutex<Chitchat>>,
435    candidate: &ClusterCandidate,
436) {
437    let mut chitchat = chitchat.lock().await;
438    let node_state = chitchat.self_node_state();
439    node_state.set(KEY_ROLE, role_to_str(candidate.role));
440    node_state.set(KEY_GENERATION, candidate.generation.value().to_string());
441    node_state.set(KEY_LIFECYCLE, LIFECYCLE_ACTIVE);
442    set_optional(
443        node_state,
444        KEY_ENDPOINT_CONTROL,
445        candidate.endpoints.control.as_deref(),
446    );
447    set_optional(
448        node_state,
449        KEY_ENDPOINT_INVALIDATION,
450        candidate.endpoints.invalidation.as_deref(),
451    );
452    set_optional(
453        node_state,
454        KEY_ENDPOINT_DIAGNOSTICS,
455        candidate.endpoints.diagnostics.as_deref(),
456    );
457    for (key, value) in &candidate.metadata {
458        node_state.set(format!("{KEY_METADATA_PREFIX}{key}"), value);
459    }
460}
461
462fn reject_stale_leave_generation(
463    node_state: &NodeState,
464    generation: ClusterGeneration,
465) -> CacheResult<()> {
466    if let Some(active_generation) = parse_generation(node_state.get(KEY_GENERATION)) {
467        if generation < active_generation {
468            return Err(CacheError::Backend(format!(
469                "stale chitchat leave marker rejected: marker generation {} is older than active generation {}",
470                generation.value(),
471                active_generation.value()
472            )));
473        }
474    }
475    if let Some(left_generation) = parse_generation(node_state.get(KEY_LEFT_GENERATION)) {
476        if generation < left_generation {
477            return Err(CacheError::Backend(format!(
478                "stale chitchat leave marker rejected: marker generation {} is older than previous leave generation {}",
479                generation.value(),
480                left_generation.value()
481            )));
482        }
483    }
484    Ok(())
485}
486
487fn set_optional(node_state: &mut NodeState, key: &str, value: Option<&str>) {
488    if let Some(value) = value {
489        node_state.set(key, value);
490    }
491}
492
493fn spawn_live_node_watcher(chitchat: Arc<TokioMutex<Chitchat>>, state: Arc<Mutex<DiscoveryState>>) {
494    tokio::spawn(async move {
495        let mut live_nodes = {
496            let chitchat = chitchat.lock().await;
497            chitchat.live_nodes_watcher()
498        };
499
500        while live_nodes.changed().await.is_ok() {
501            let candidates = live_nodes
502                .borrow()
503                .iter()
504                .filter_map(|(chitchat_id, node_state)| {
505                    candidate_from_node(chitchat_id, node_state)
506                })
507                .collect::<Vec<_>>();
508
509            let mut state = state.lock().expect("chitchat discovery state poisoned");
510            for candidate in candidates {
511                state
512                    .events
513                    .push(ClusterDiscoveryEvent::MemberLive(candidate.node_id.clone()));
514                state
515                    .candidates
516                    .insert(candidate.node_id.clone(), candidate);
517            }
518        }
519    });
520}
521
522fn record_candidate(state: Arc<Mutex<DiscoveryState>>, candidate: ClusterCandidate) {
523    let mut state = state.lock().expect("chitchat discovery state poisoned");
524    state
525        .events
526        .push(ClusterDiscoveryEvent::CandidateSeen(candidate.clone()));
527    state
528        .candidates
529        .insert(candidate.node_id.clone(), candidate);
530}
531
532fn record_leave_marker(
533    state: Arc<Mutex<DiscoveryState>>,
534    node_id: ClusterNodeId,
535    generation: ClusterGeneration,
536    role: ClusterRole,
537) {
538    let mut state = state.lock().expect("chitchat discovery state poisoned");
539    {
540        let candidate = state
541            .candidates
542            .entry(node_id.clone())
543            .or_insert_with(|| match role {
544                ClusterRole::Member => ClusterCandidate::member(node_id.clone()),
545                ClusterRole::Client => ClusterCandidate::client(node_id.clone()),
546                ClusterRole::Local => ClusterCandidate::client(node_id.clone()),
547            });
548        candidate.generation = generation;
549        candidate.role = role;
550        candidate
551            .metadata
552            .insert(METADATA_LIFECYCLE.to_owned(), LIFECYCLE_LEAVING.to_owned());
553        candidate.metadata.insert(
554            METADATA_LEFT_GENERATION.to_owned(),
555            generation.value().to_string(),
556        );
557        candidate
558            .metadata
559            .insert(METADATA_LEFT_ROLE.to_owned(), role_to_str(role).to_owned());
560    }
561    state.events.push(ClusterDiscoveryEvent::MemberLeaving {
562        node_id,
563        generation,
564        role,
565    });
566}
567
568fn candidate_from_node(
569    chitchat_id: &ChitchatId,
570    node_state: &NodeState,
571) -> Option<ClusterCandidate> {
572    let role = parse_role(node_state.get(KEY_ROLE)?)?;
573    let generation = parse_generation(node_state.get(KEY_GENERATION))
574        .unwrap_or_else(|| ClusterGeneration::new(chitchat_id.generation_id));
575    let mut candidate = match role {
576        ClusterRole::Member => ClusterCandidate::member(chitchat_id.node_id.to_string()),
577        ClusterRole::Client => ClusterCandidate::client(chitchat_id.node_id.to_string()),
578        ClusterRole::Local => return None,
579    }
580    .generation(generation)
581    .endpoints(ClusterEndpoints {
582        control: node_state.get(KEY_ENDPOINT_CONTROL).map(ToOwned::to_owned),
583        invalidation: node_state
584            .get(KEY_ENDPOINT_INVALIDATION)
585            .map(ToOwned::to_owned),
586        diagnostics: node_state
587            .get(KEY_ENDPOINT_DIAGNOSTICS)
588            .map(ToOwned::to_owned),
589    });
590
591    for (key, value) in node_state.key_values() {
592        if let Some(metadata_key) = key.strip_prefix(KEY_METADATA_PREFIX) {
593            candidate
594                .metadata
595                .insert(metadata_key.to_owned(), value.to_owned());
596        }
597    }
598    if let Some(lifecycle) = node_state.get(KEY_LIFECYCLE) {
599        candidate
600            .metadata
601            .insert(METADATA_LIFECYCLE.to_owned(), lifecycle.to_owned());
602        if lifecycle == LIFECYCLE_LEAVING {
603            copy_node_state_metadata(
604                node_state,
605                &mut candidate,
606                KEY_LEFT_GENERATION,
607                METADATA_LEFT_GENERATION,
608            );
609            copy_node_state_metadata(
610                node_state,
611                &mut candidate,
612                KEY_LEFT_ROLE,
613                METADATA_LEFT_ROLE,
614            );
615        }
616    }
617    candidate
618        .metadata
619        .entry("discovery.adapter".to_owned())
620        .or_insert_with(|| "chitchat".to_owned());
621    Some(candidate)
622}
623
624fn parse_generation(value: Option<&str>) -> Option<ClusterGeneration> {
625    value
626        .and_then(|value| value.parse::<u64>().ok())
627        .map(ClusterGeneration::new)
628}
629
630fn copy_node_state_metadata(
631    node_state: &NodeState,
632    candidate: &mut ClusterCandidate,
633    node_state_key: &str,
634    metadata_key: &str,
635) {
636    if let Some(value) = node_state.get(node_state_key) {
637        candidate
638            .metadata
639            .insert(metadata_key.to_owned(), value.to_owned());
640    }
641}
642
643fn role_to_str(role: ClusterRole) -> &'static str {
644    match role {
645        ClusterRole::Local => "local",
646        ClusterRole::Client => "client",
647        ClusterRole::Member => "member",
648    }
649}
650
651fn parse_role(value: &str) -> Option<ClusterRole> {
652    match value {
653        "client" => Some(ClusterRole::Client),
654        "member" => Some(ClusterRole::Member),
655        "local" => Some(ClusterRole::Local),
656        _ => None,
657    }
658}
659
660fn to_cache_error(error: impl std::fmt::Display) -> CacheError {
661    CacheError::Backend(format!("chitchat discovery failed: {error}"))
662}
663
664#[cfg(test)]
665mod tests {
666    use std::time::Instant;
667
668    use chitchat::transport::ChannelTransport;
669    use hydracache::{ClusterDiscovery, ClusterEndpoints};
670    use tokio::time::{sleep, timeout};
671
672    use super::*;
673
674    fn addr(port: u16) -> SocketAddr {
675        ([127, 0, 0, 1], port).into()
676    }
677
678    fn config(port: u16, node: &str) -> ChitchatDiscoveryConfig {
679        ChitchatDiscoveryConfig::new(
680            "orders",
681            node,
682            ClusterGeneration::new(port as u64),
683            addr(port),
684        )
685        .gossip_interval(Duration::from_millis(20))
686    }
687
688    #[test]
689    fn config_builds_chitchat_identity_with_generation() {
690        let config = ChitchatDiscoveryConfig::new(
691            "orders",
692            "member-a",
693            ClusterGeneration::new(42),
694            addr(47_001),
695        )
696        .seed_node("127.0.0.1:47000");
697
698        let id = config.chitchat_id();
699
700        assert_eq!(config.cluster_id(), "orders");
701        assert_eq!(config.node_id().as_str(), "member-a");
702        assert_eq!(config.generation(), ClusterGeneration::new(42));
703        assert_eq!(config.listen_addr(), addr(47_001));
704        assert_eq!(config.seed_nodes_value(), &["127.0.0.1:47000".to_owned()]);
705        assert_eq!(id.node_id.as_ref(), "member-a");
706        assert_eq!(id.generation_id, 42);
707    }
708
709    #[test]
710    fn config_builder_setters_feed_chitchat_config() {
711        let config = ChitchatDiscoveryConfig::new(
712            "orders",
713            "member-a",
714            ClusterGeneration::new(43),
715            addr(47_002),
716        )
717        .gossip_advertise_addr(addr(48_002))
718        .seed_node("127.0.0.1:47001")
719        .seed_nodes(["127.0.0.1:47002", "127.0.0.1:47003"])
720        .gossip_interval(Duration::from_millis(33))
721        .marked_for_deletion_grace_period(Duration::from_secs(7))
722        .failure_detector_config(FailureDetectorConfig::default());
723
724        let id = config.chitchat_id();
725        assert_eq!(id.node_id.as_ref(), "member-a");
726        assert_eq!(id.generation_id, 43);
727        assert_eq!(id.gossip_advertise_addr, addr(48_002));
728        assert_eq!(
729            config.seed_nodes_value(),
730            &["127.0.0.1:47002".to_owned(), "127.0.0.1:47003".to_owned()]
731        );
732
733        let chitchat_config = config.into_chitchat_config();
734        assert_eq!(chitchat_config.cluster_id, "orders");
735        assert_eq!(chitchat_config.gossip_interval, Duration::from_millis(33));
736        assert_eq!(
737            chitchat_config.marked_for_deletion_grace_period,
738            Duration::from_secs(7)
739        );
740    }
741
742    #[tokio::test]
743    async fn announce_writes_candidate_to_real_chitchat_state() {
744        let transport = ChannelTransport::default();
745        let discovery =
746            ChitchatDiscovery::spawn_with_transport(config(47_011, "member-a"), &transport)
747                .await
748                .unwrap();
749
750        discovery
751            .announce(
752                ClusterCandidate::member("member-a")
753                    .generation(ClusterGeneration::new(47_011))
754                    .endpoints(ClusterEndpoints::new().control("127.0.0.1:7000"))
755                    .metadata("zone", "eu"),
756            )
757            .await
758            .unwrap();
759
760        assert_eq!(
761            discovery.local_value(KEY_ROLE).await.as_deref(),
762            Some("member")
763        );
764        assert_eq!(
765            discovery.local_value(KEY_ENDPOINT_CONTROL).await.as_deref(),
766            Some("127.0.0.1:7000")
767        );
768        assert_eq!(
769            discovery
770                .local_value(&format!("{KEY_METADATA_PREFIX}zone"))
771                .await
772                .as_deref(),
773            Some("eu")
774        );
775        assert_eq!(discovery.candidates().len(), 1);
776        assert!(matches!(
777            discovery.events().first(),
778            Some(ClusterDiscoveryEvent::CandidateSeen(candidate))
779                if candidate.node_id.as_str() == "member-a"
780        ));
781    }
782
783    #[tokio::test]
784    async fn leave_marker_is_written_to_local_chitchat_state() {
785        let transport = ChannelTransport::default();
786        let discovery =
787            ChitchatDiscovery::spawn_with_transport(config(47_012, "member-a"), &transport)
788                .await
789                .unwrap();
790
791        discovery
792            .announce(
793                ClusterCandidate::member("member-a").generation(ClusterGeneration::new(47_012)),
794            )
795            .await
796            .unwrap();
797        discovery
798            .mark_leaving(
799                "member-a",
800                ClusterGeneration::new(47_012),
801                ClusterRole::Member,
802            )
803            .await
804            .unwrap();
805
806        assert_eq!(
807            discovery.local_value(KEY_LIFECYCLE).await.as_deref(),
808            Some(LIFECYCLE_LEAVING)
809        );
810        assert_eq!(
811            discovery.local_value(KEY_LEFT_GENERATION).await.as_deref(),
812            Some("47012")
813        );
814        assert_eq!(
815            discovery.local_value(KEY_LEFT_ROLE).await.as_deref(),
816            Some("member")
817        );
818        let candidate = discovery
819            .candidates()
820            .into_iter()
821            .find(|candidate| candidate.node_id.as_str() == "member-a")
822            .expect("candidate should remain visible after graceful leave");
823        assert_eq!(
824            candidate
825                .metadata
826                .get(METADATA_LIFECYCLE)
827                .map(String::as_str),
828            Some(LIFECYCLE_LEAVING)
829        );
830        assert!(discovery.events().iter().any(|event| {
831            matches!(
832                event,
833                ClusterDiscoveryEvent::MemberLeaving { node_id, generation, role }
834                    if node_id.as_str() == "member-a"
835                        && *generation == ClusterGeneration::new(47_012)
836                        && *role == ClusterRole::Member
837            )
838        }));
839    }
840
841    #[tokio::test]
842    async fn stale_leave_marker_cannot_overwrite_newer_generation() {
843        let transport = ChannelTransport::default();
844        let discovery =
845            ChitchatDiscovery::spawn_with_transport(config(47_013, "member-a"), &transport)
846                .await
847                .unwrap();
848
849        discovery
850            .announce(ClusterCandidate::member("member-a").generation(ClusterGeneration::new(3)))
851            .await
852            .unwrap();
853
854        let error = discovery
855            .mark_leaving("member-a", ClusterGeneration::new(2), ClusterRole::Member)
856            .await
857            .unwrap_err();
858
859        assert!(error
860            .to_string()
861            .contains("stale chitchat leave marker rejected"));
862        assert_eq!(
863            discovery.local_value(KEY_LIFECYCLE).await.as_deref(),
864            Some(LIFECYCLE_ACTIVE)
865        );
866    }
867
868    #[tokio::test]
869    async fn leave_marker_rejects_wrong_node_and_local_role() {
870        let transport = ChannelTransport::default();
871        let discovery =
872            ChitchatDiscovery::spawn_with_transport(config(47_017, "member-a"), &transport)
873                .await
874                .unwrap();
875
876        discovery
877            .announce(
878                ClusterCandidate::member("member-a").generation(ClusterGeneration::new(47_017)),
879            )
880            .await
881            .unwrap();
882
883        let wrong_node = discovery
884            .mark_leaving(
885                "member-b",
886                ClusterGeneration::new(47_017),
887                ClusterRole::Member,
888            )
889            .await
890            .unwrap_err();
891        assert!(wrong_node
892            .to_string()
893            .contains("can only be written by local node"));
894
895        let local_role = discovery
896            .mark_leaving(
897                "member-a",
898                ClusterGeneration::new(47_017),
899                ClusterRole::Local,
900            )
901            .await
902            .unwrap_err();
903        assert!(local_role
904            .to_string()
905            .contains("local caches do not publish"));
906    }
907
908    #[tokio::test]
909    async fn remote_discovery_observes_leave_marker_metadata() {
910        let transport = ChannelTransport::default();
911        let first = ChitchatDiscovery::spawn_with_transport(config(47_014, "member-a"), &transport)
912            .await
913            .unwrap();
914        let second = ChitchatDiscovery::spawn_with_transport(
915            config(47_015, "client-a").seed_node("127.0.0.1:47014"),
916            &transport,
917        )
918        .await
919        .unwrap();
920
921        first
922            .announce(
923                ClusterCandidate::member("member-a").generation(ClusterGeneration::new(47_014)),
924            )
925            .await
926            .unwrap();
927        first
928            .mark_leaving(
929                "member-a",
930                ClusterGeneration::new(47_014),
931                ClusterRole::Member,
932            )
933            .await
934            .unwrap();
935
936        first.gossip_once(addr(47_015)).unwrap();
937        second.gossip_once(addr(47_014)).unwrap();
938
939        wait_until(Duration::from_secs(2), || {
940            second.candidates().iter().any(|candidate| {
941                candidate.node_id.as_str() == "member-a"
942                    && candidate
943                        .metadata
944                        .get(METADATA_LIFECYCLE)
945                        .is_some_and(|value| value == LIFECYCLE_LEAVING)
946            })
947        })
948        .await;
949
950        let remote = second
951            .candidates()
952            .into_iter()
953            .find(|candidate| candidate.node_id.as_str() == "member-a")
954            .expect("remote candidate should be present");
955        assert_eq!(
956            remote
957                .metadata
958                .get(METADATA_LEFT_GENERATION)
959                .map(String::as_str),
960            Some("47014")
961        );
962        assert_eq!(
963            remote.metadata.get(METADATA_LEFT_ROLE).map(String::as_str),
964            Some("member")
965        );
966    }
967
968    #[tokio::test]
969    async fn newer_rejoin_supersedes_leave_marker() {
970        let transport = ChannelTransport::default();
971        let discovery =
972            ChitchatDiscovery::spawn_with_transport(config(47_016, "member-a"), &transport)
973                .await
974                .unwrap();
975
976        discovery
977            .announce(ClusterCandidate::member("member-a").generation(ClusterGeneration::new(2)))
978            .await
979            .unwrap();
980        discovery
981            .mark_leaving("member-a", ClusterGeneration::new(2), ClusterRole::Member)
982            .await
983            .unwrap();
984        discovery
985            .announce(ClusterCandidate::member("member-a").generation(ClusterGeneration::new(3)))
986            .await
987            .unwrap();
988
989        assert_eq!(
990            discovery.local_value(KEY_LIFECYCLE).await.as_deref(),
991            Some(LIFECYCLE_ACTIVE)
992        );
993        assert_eq!(
994            discovery.local_value(KEY_GENERATION).await.as_deref(),
995            Some("3")
996        );
997        let candidate = discovery
998            .candidates()
999            .into_iter()
1000            .find(|candidate| candidate.node_id.as_str() == "member-a")
1001            .expect("candidate should be visible after rejoin");
1002        assert_eq!(candidate.generation, ClusterGeneration::new(3));
1003        assert_eq!(
1004            candidate
1005                .metadata
1006                .get(METADATA_LIFECYCLE)
1007                .map(String::as_str),
1008            Some(LIFECYCLE_ACTIVE)
1009        );
1010        assert!(!candidate.metadata.contains_key(METADATA_LEFT_GENERATION));
1011    }
1012
1013    #[tokio::test]
1014    async fn chitchat_gossip_discovers_remote_candidate() {
1015        let transport = ChannelTransport::default();
1016        let first = ChitchatDiscovery::spawn_with_transport(config(47_021, "member-a"), &transport)
1017            .await
1018            .unwrap();
1019        let second = ChitchatDiscovery::spawn_with_transport(
1020            config(47_022, "client-a").seed_node("127.0.0.1:47021"),
1021            &transport,
1022        )
1023        .await
1024        .unwrap();
1025
1026        first
1027            .announce(
1028                ClusterCandidate::member("member-a").generation(ClusterGeneration::new(47_021)),
1029            )
1030            .await
1031            .unwrap();
1032        second
1033            .announce(
1034                ClusterCandidate::client("client-a").generation(ClusterGeneration::new(47_022)),
1035            )
1036            .await
1037            .unwrap();
1038        second.gossip_once(addr(47_021)).unwrap();
1039
1040        wait_until(Duration::from_secs(2), || {
1041            first
1042                .candidates()
1043                .iter()
1044                .any(|candidate| candidate.node_id.as_str() == "client-a")
1045        })
1046        .await;
1047
1048        let remote = first
1049            .candidates()
1050            .into_iter()
1051            .find(|candidate| candidate.node_id.as_str() == "client-a")
1052            .expect("remote candidate should be present");
1053        assert_eq!(remote.role, ClusterRole::Client);
1054        assert_eq!(remote.generation, ClusterGeneration::new(47_022));
1055        assert_eq!(
1056            remote.metadata.get("discovery.adapter").map(String::as_str),
1057            Some("chitchat")
1058        );
1059        assert!(format!("{first:?}").contains("ChitchatDiscovery"));
1060    }
1061
1062    async fn wait_until(timeout_after: Duration, mut condition: impl FnMut() -> bool) {
1063        timeout(timeout_after, async {
1064            let started = Instant::now();
1065            loop {
1066                if condition() {
1067                    return;
1068                }
1069                assert!(started.elapsed() < timeout_after);
1070                sleep(Duration::from_millis(10)).await;
1071            }
1072        })
1073        .await
1074        .expect("condition should become true");
1075    }
1076}