datacake_chitchat_fork/
lib.rs

1#![allow(clippy::type_complexity)]
2#![allow(clippy::derive_partial_eq_without_eq)]
3
4mod server;
5
6mod configuration;
7mod delta;
8mod digest;
9mod failure_detector;
10mod message;
11pub mod serialize;
12mod state;
13pub mod transport;
14
15use std::collections::HashSet;
16use std::net::SocketAddr;
17
18use delta::Delta;
19use failure_detector::FailureDetector;
20pub use failure_detector::FailureDetectorConfig;
21use serde::{Deserialize, Serialize};
22use tokio::sync::watch;
23use tokio_stream::wrappers::WatchStream;
24use tracing::{debug, error, warn};
25
26pub use self::configuration::ChitchatConfig;
27pub use self::state::{ClusterStateSnapshot, NodeState};
28use crate::digest::Digest;
29use crate::message::syn_ack_serialized_len;
30pub use crate::message::ChitchatMessage;
31pub use crate::server::{spawn_chitchat, ChitchatHandle};
32use crate::state::ClusterState;
33
/// Map key for the heartbeat node value.
pub(crate) const HEARTBEAT_KEY: &str = "heartbeat";

/// Maximum payload size (in bytes) for UDP.
///
/// Note: 60KB typically won't fit in a single UDP frame,
/// so long messages will be sent over several frames.
///
/// We pick a large MTU because at the moment
/// we send the self digest "in full".
/// A frame of 1400B would limit us to 20 nodes
/// or so.
const MTU: usize = 60_000;

/// Version number attached to a key/value pair. See [`VersionedValue`].
pub type Version = u64;
49
50/// [`NodeId`] represents a Chitchat Node identifier.
51///
/// For the lifetime of a cluster, nodes can go down and come back up, or they may
/// permanently die. These are a couple of issues we want to solve with the [`NodeId`] struct:
54/// - We want a fresh local chitchat state for every run of a node.
55/// - We don’t want other nodes to override a newly started node state with an obsolete state.
56/// - We want other running nodes to detect that a newly started node’s state prevails all its
57///   previous state.
58/// - We want a node to advertise its own gossip address.
59/// - We want a node to have an id that is the same across subsequent runs for keeping cache data
60///   around as long as possible.
61///
62/// Our solution to this is:
/// - The `id` attribute, which represents the node's unique identifier in the cluster, should be
///   dynamic on every run. This easily solves our first three requirements. The tradeoff is that
///   starting nodes need to always propagate their fresh state, and old states are never reclaimed.
/// - Having the `gossip_public_address` attribute fulfills our fourth requirement; its value is
///   expected to come from a config item or an environment variable.
68/// - Making part of the `id` attribute static and related to the node solves the last requirement.
69///
70/// Because Chitchat instance is not concerned about caching strategy and what needs to be
71/// cached, We let the client decide what makes up the `id` attribute and how to extract its
72/// components.
73///
74/// One such client is Quickwit where the `id` is made of
75/// `{node_unique_id}/{node_generation}/`.
76/// - node_unique_id: a static unique name for the node.
77/// - node_generation: a monotonically increasing value (timestamp on every run)
78/// More details at https://github.com/quickwit-oss/chitchat/issues/1#issuecomment-1059029051
79///
/// Note: using a timestamp to make the `id` dynamic has the potential of reusing
/// a previously used `id` in cases where the clock is reset to the past. We believe this
/// is very rare and things should just work fine.
83#[derive(Clone, Hash, Eq, PartialEq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
84pub struct NodeId {
85    // The unique identifier of this node in the cluster.
86    pub id: String,
87    // The SocketAddr other peers should use to communicate.
88    pub gossip_public_address: SocketAddr,
89}
90
91impl NodeId {
92    pub fn new(id: String, gossip_public_address: SocketAddr) -> Self {
93        Self {
94            id,
95            gossip_public_address,
96        }
97    }
98
99    pub fn for_test_localhost(port: u16) -> Self {
100        NodeId::new(
101            format!("node-{port}"),
102            ([127u8, 0u8, 0u8, 1u8], port).into(),
103        )
104    }
105
106    /// Returns the gossip public port. Useful for test assert only.
107    #[cfg(test)]
108    pub fn public_port(&self) -> u16 {
109        self.gossip_public_address.port()
110    }
111}
112
113/// A versioned value for a given Key-value pair.
/// A versioned value for a given Key-value pair.
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
pub struct VersionedValue {
    /// The value associated with the key.
    pub value: String,
    /// The version at which this value was set.
    pub version: Version,
}
119
pub struct Chitchat {
    /// Configuration of this node (node id, cluster id, failure detector config,
    /// readiness predicate, ...).
    config: ChitchatConfig,
    /// The local view of the whole cluster's key/value state.
    cluster_state: ClusterState,
    /// Locally incremented heartbeat counter; published into this node's state
    /// under `HEARTBEAT_KEY` by `update_heartbeat`.
    heartbeat: u64,
    /// The failure detector instance.
    failure_detector: FailureDetector,
    /// A notification channel (sender) for sending `ready` nodes change feed.
    ready_nodes_watcher_tx: watch::Sender<HashSet<NodeId>>,
    /// A notification channel (receiver) for receiving `ready` nodes change feed.
    ready_nodes_watcher_rx: watch::Receiver<HashSet<NodeId>>,
}
131
impl Chitchat {
    /// Creates a new `Chitchat` instance.
    ///
    /// `seed_addrs` is a watch channel from which the current set of seed socket
    /// addresses can be read. `initial_key_values` is written into the local node
    /// state right away. The heartbeat key is set immediately so that this node
    /// responds to SYNs from the start.
    pub fn with_node_id_and_seeds(
        config: ChitchatConfig,
        seed_addrs: watch::Receiver<HashSet<SocketAddr>>,
        initial_key_values: Vec<(String, String)>,
    ) -> Self {
        let (ready_nodes_watcher_tx, ready_nodes_watcher_rx) = watch::channel(HashSet::new());
        let failure_detector = FailureDetector::new(config.failure_detector_config.clone());
        let mut chitchat = Chitchat {
            config,
            cluster_state: ClusterState::with_seed_addrs(seed_addrs),
            heartbeat: 0,
            failure_detector,
            ready_nodes_watcher_tx,
            ready_nodes_watcher_rx,
        };

        let self_node_state = chitchat.self_node_state();

        // Immediately mark node as alive to ensure it responds to SYNs.
        self_node_state.set(HEARTBEAT_KEY, 0);

        // Set initial key/value pairs.
        for (key, value) in initial_key_values {
            self_node_state.set(key, value);
        }

        chitchat
    }

    /// Creates the SYN message that initiates a gossip round, carrying this
    /// node's cluster id and state digest.
    pub fn create_syn_message(&mut self) -> ChitchatMessage {
        let digest = self.compute_digest();
        ChitchatMessage::Syn {
            cluster_id: self.config.cluster_id.clone(),
            digest,
        }
    }

    /// Processes an inbound gossip message and returns the reply to send back, if any.
    ///
    /// - `Syn` is answered with a `SynAck` (or `BadCluster` on cluster-id mismatch).
    /// - `SynAck` is answered with an `Ack`.
    /// - `Ack` and `BadCluster` terminate the exchange (no reply).
    pub fn process_message(&mut self, msg: ChitchatMessage) -> Option<ChitchatMessage> {
        match msg {
            ChitchatMessage::Syn { cluster_id, digest } => {
                if cluster_id != self.config.cluster_id {
                    warn!(
                        cluster_id = %cluster_id,
                        "rejecting syn message with mismatching cluster name"
                    );
                    return Some(ChitchatMessage::BadCluster);
                }
                let self_digest = self.compute_digest();
                let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
                // Budget the delta so that the serialized digest + delta stay within MTU.
                let empty_delta = Delta::default();
                let delta_mtu = MTU - syn_ack_serialized_len(&self_digest, &empty_delta);
                let delta = self
                    .cluster_state
                    .compute_delta(&digest, delta_mtu, dead_nodes);
                self.report_to_failure_detector(&delta);
                Some(ChitchatMessage::SynAck {
                    digest: self_digest,
                    delta,
                })
            }
            ChitchatMessage::SynAck { digest, delta } => {
                // Report heartbeats before applying, so freshness is judged against
                // our pre-merge local versions.
                self.report_to_failure_detector(&delta);
                self.cluster_state.apply_delta(delta);
                let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
                let delta = self
                    .cluster_state
                    .compute_delta(&digest, MTU - 1, dead_nodes);
                Some(ChitchatMessage::Ack { delta })
            }
            ChitchatMessage::Ack { delta } => {
                self.report_to_failure_detector(&delta);
                self.cluster_state.apply_delta(delta);
                None
            }
            ChitchatMessage::BadCluster => {
                warn!("message rejected by peer: cluster name mismatch");
                None
            }
        }
    }

    /// Reports a heartbeat to the failure detector for each node whose delta
    /// carries information fresher than our local view (strictly higher max version).
    fn report_to_failure_detector(&mut self, delta: &Delta) {
        for (node_id, node_delta) in &delta.node_deltas {
            let local_max_version = self
                .cluster_state
                .node_states
                .get(node_id)
                .map(|node_state| node_state.max_version)
                .unwrap_or(0);

            let delta_max_version = node_delta.max_version();
            if local_max_version < delta_max_version {
                self.failure_detector.report_heartbeat(node_id);
            }
        }
    }

    /// Checks and marks nodes as dead / live / ready.
    ///
    /// Publishes the new set of ready nodes on the watch channel when it changed,
    /// then removes from the cluster state any node the failure detector
    /// garbage-collected.
    pub fn update_nodes_liveliness(&mut self) {
        let cluster_nodes = self
            .cluster_state
            .nodes()
            .filter(|&node_id| node_id != self.self_node_id())
            .collect::<Vec<_>>();
        for &node_id in &cluster_nodes {
            self.failure_detector.update_node_liveliness(node_id);
        }

        let ready_nodes_before = self.ready_nodes_watcher_rx.borrow().clone();
        let ready_nodes_after = self.ready_nodes().cloned().collect::<HashSet<_>>();

        if ready_nodes_before != ready_nodes_after {
            debug!(current_node = ?self.self_node_id(), live_nodes = ?ready_nodes_after, "nodes status changed");
            if self.ready_nodes_watcher_tx.send(ready_nodes_after).is_err() {
                error!(current_node = ?self.self_node_id(), "error while reporting membership change event.")
            }
        }

        // Perform garbage collection.
        let garbage_collected_nodes = self.failure_detector.garbage_collect();
        for node_id in garbage_collected_nodes.iter() {
            self.cluster_state.remove_node(node_id);
        }
    }

    /// Returns the state of the given node, if known.
    pub fn node_state(&self, node_id: &NodeId) -> Option<&NodeState> {
        self.cluster_state.node_state(node_id)
    }

    /// Returns a mutable reference to this node's own state.
    pub fn self_node_state(&mut self) -> &mut NodeState {
        self.cluster_state.node_state_mut(&self.config.node_id)
    }

    /// Retrieves the list of all live nodes.
    pub fn live_nodes(&self) -> impl Iterator<Item = &NodeId> {
        self.failure_detector.live_nodes()
    }

    /// Retrieves the list of nodes that are ready.
    /// To be ready, a node has to be alive and pass the `is_ready_predicate` as
    /// defined in the Chitchat configuration.
    pub fn ready_nodes(&self) -> impl Iterator<Item = &NodeId> {
        self.live_nodes().filter(|&node_id| {
            let is_ready_pred = if let Some(pred) = self.config.is_ready_predicate.as_ref() {
                pred
            } else {
                // No predicate means that we consider all nodes as ready.
                return true;
            };
            self.node_state(node_id).map(is_ready_pred).unwrap_or(false)
        })
    }

    /// Retrieve the list of all dead nodes.
    pub fn dead_nodes(&self) -> impl Iterator<Item = &NodeId> {
        self.failure_detector.dead_nodes()
    }

    /// Retrieve a list of seed nodes.
    pub fn seed_nodes(&self) -> HashSet<SocketAddr> {
        self.cluster_state.seed_addrs()
    }

    /// Returns this node's identifier.
    pub fn self_node_id(&self) -> &NodeId {
        &self.config.node_id
    }

    /// Returns the identifier of the cluster this node belongs to.
    pub fn cluster_id(&self) -> &str {
        &self.config.cluster_id
    }

    /// Increments the local heartbeat counter and publishes it in this node's
    /// state under `HEARTBEAT_KEY`, so the node's liveliness can propagate
    /// through the cluster.
    pub fn update_heartbeat(&mut self) {
        self.heartbeat += 1;
        let heartbeat = self.heartbeat;
        self.self_node_state().set(HEARTBEAT_KEY, heartbeat);
    }

    /// Computes the digest of the local cluster state, excluding the nodes
    /// currently considered dead.
    ///
    /// NOTE(review): a previous doc here claimed this method also increments the
    /// heartbeat; it does not — the heartbeat is only bumped in `update_heartbeat`.
    fn compute_digest(&mut self) -> Digest {
        let dead_nodes: HashSet<_> = self.dead_nodes().collect();
        self.cluster_state.compute_digest(dead_nodes)
    }

    /// Returns a reference to the full cluster state (crate-internal use).
    pub(crate) fn cluster_state(&self) -> &ClusterState {
        &self.cluster_state
    }

    /// Returns a serializable snapshot of the ClusterState
    pub fn state_snapshot(&self) -> ClusterStateSnapshot {
        ClusterStateSnapshot::from(&self.cluster_state)
    }

    /// Returns a watch stream for monitoring changes on the cluster's `ready` nodes.
    pub fn ready_nodes_watcher(&self) -> WatchStream<HashSet<NodeId>> {
        WatchStream::new(self.ready_nodes_watcher_rx.clone())
    }
}
335
#[cfg(test)]
mod tests {
    use std::ops::{Add, RangeInclusive};
    use std::sync::Arc;
    use std::time::Duration;

    use mock_instant::MockClock;
    use tokio::sync::Mutex;
    use tokio::time;
    use tokio_stream::wrappers::IntervalStream;
    use tokio_stream::StreamExt;

    use super::*;
    use crate::server::{spawn_chitchat, ChitchatHandle};
    use crate::transport::{ChannelTransport, Transport};

    // Grace period used by the failure detector in these tests before dead
    // nodes become eligible for garbage collection.
    const DEAD_NODE_GRACE_PERIOD: Duration = Duration::from_secs(20);

    /// Runs a full SYN -> SYN-ACK -> ACK handshake between the two given nodes.
    fn run_chitchat_handshake(initiating_node: &mut Chitchat, peer_node: &mut Chitchat) {
        let syn_message = initiating_node.create_syn_message();
        let syn_ack_message = peer_node.process_message(syn_message).unwrap();
        let ack_message = initiating_node.process_message(syn_ack_message).unwrap();
        assert!(peer_node.process_message(ack_message).is_none());
    }

    /// Asserts both node states hold the same key/value pairs, ignoring the heartbeat key.
    fn assert_cluster_state_eq(lhs: &NodeState, rhs: &NodeState) {
        assert_eq!(lhs.key_values.len(), rhs.key_values.len());
        for (key, value) in &lhs.key_values {
            if key == HEARTBEAT_KEY {
                // we ignore the heartbeat key
                continue;
            }
            assert_eq!(rhs.key_values.get(key), Some(value));
        }
    }

    /// Asserts all given nodes share the same view of the cluster state.
    fn assert_nodes_sync(nodes: &[&Chitchat]) {
        let first_node_states = &nodes[0].cluster_state.node_states;
        for other_node in nodes.iter().skip(1) {
            let node_states = &other_node.cluster_state.node_states;
            assert_eq!(first_node_states.len(), node_states.len());
            for (key, value) in first_node_states {
                assert_cluster_state_eq(value, node_states.get(key).unwrap());
            }
        }
    }

    /// Spawns a chitchat node listening on its gossip address, with a 100ms
    /// gossip interval and a test-tuned failure detector.
    async fn start_node(
        node_id: NodeId,
        seeds: &[String],
        transport: &dyn Transport,
    ) -> ChitchatHandle {
        let config = ChitchatConfig {
            node_id: node_id.clone(),
            cluster_id: "default-cluster".to_string(),
            gossip_interval: Duration::from_millis(100),
            listen_addr: node_id.gossip_public_address,
            seed_nodes: seeds.to_vec(),
            failure_detector_config: FailureDetectorConfig {
                dead_node_grace_period: DEAD_NODE_GRACE_PERIOD,
                phi_threshold: 5.0,
                initial_interval: Duration::from_millis(100),
                ..Default::default()
            },
            is_ready_predicate: None,
        };
        let initial_kvs: Vec<(String, String)> = Vec::new();
        spawn_chitchat(config, initial_kvs, transport)
            .await
            .unwrap()
    }

    /// Starts one node per port in `port_range`. The first node has no seeds;
    /// every other node is seeded with the addresses of all its peers.
    /// Also spawns a task that continuously advances the mocked clock.
    async fn setup_nodes(
        port_range: RangeInclusive<u16>,
        transport: &dyn Transport,
    ) -> Vec<ChitchatHandle> {
        let node_ids: Vec<NodeId> = port_range.map(NodeId::for_test_localhost).collect();
        let node_without_seed = start_node(node_ids[0].clone(), &[], transport).await;
        let mut chitchat_handlers: Vec<ChitchatHandle> = vec![node_without_seed];
        for node_id in &node_ids[1..] {
            let seeds = node_ids
                .iter()
                .filter(|&peer_id| peer_id != node_id)
                .map(|peer_id| peer_id.gossip_public_address.to_string())
                .collect::<Vec<_>>();
            chitchat_handlers.push(start_node(node_id.clone(), &seeds, transport).await);
        }
        // Make sure the failure detector's fake clock moves forward.
        tokio::spawn(async {
            let mut ticker = IntervalStream::new(time::interval(Duration::from_millis(50)));
            while ticker.next().await.is_some() {
                MockClock::advance(Duration::from_millis(50));
            }
        });
        chitchat_handlers
    }

    /// Shuts down all the given node handles, propagating the first error.
    async fn shutdown_nodes(nodes: Vec<ChitchatHandle>) -> anyhow::Result<()> {
        for node in nodes {
            node.shutdown().await?;
        }
        Ok(())
    }

    /// Waits (up to 50s) until `chitchat` reports exactly `expected_node_count`
    /// ready nodes, then asserts the set matches `expected_nodes`.
    async fn wait_for_chitchat_state(
        chitchat: Arc<Mutex<Chitchat>>,
        expected_node_count: usize,
        expected_nodes: &[NodeId],
    ) {
        let mut ready_nodes_watcher = chitchat
            .lock()
            .await
            .ready_nodes_watcher()
            .skip_while(|live_nodes| live_nodes.len() != expected_node_count);
        tokio::time::timeout(Duration::from_secs(50), async move {
            let live_nodes = ready_nodes_watcher.next().await.unwrap();
            assert_eq!(
                live_nodes,
                expected_nodes.iter().cloned().collect::<HashSet<_>>()
            );
        })
        .await
        .unwrap();
    }

    #[test]
    fn test_chitchat_handshake() {
        let node_config1 = ChitchatConfig::for_test(10_001);
        let empty_seeds = watch::channel(Default::default()).1;
        let mut node1 = Chitchat::with_node_id_and_seeds(
            node_config1,
            empty_seeds.clone(),
            vec![
                ("key1a".to_string(), "1".to_string()),
                ("key2a".to_string(), "2".to_string()),
            ],
        );
        let node_config2 = ChitchatConfig::for_test(10_002);
        let mut node2 = Chitchat::with_node_id_and_seeds(
            node_config2,
            empty_seeds,
            vec![
                ("key1b".to_string(), "1".to_string()),
                ("key2b".to_string(), "2".to_string()),
            ],
        );
        run_chitchat_handshake(&mut node1, &mut node2);
        assert_nodes_sync(&[&node1, &node2]);
        // useless handshake: no state changed in between, nodes must stay in sync.
        run_chitchat_handshake(&mut node1, &mut node2);
        assert_nodes_sync(&[&node1, &node2]);
        {
            // Mutate node1's state: one updated key, one new key.
            let state1 = node1.self_node_state();
            state1.set("key1a", "3");
            state1.set("key1c", "4");
        }
        run_chitchat_handshake(&mut node1, &mut node2);
        assert_nodes_sync(&[&node1, &node2]);
    }

    #[tokio::test]
    async fn test_multiple_nodes() -> anyhow::Result<()> {
        let transport = ChannelTransport::default();
        let nodes = setup_nodes(20001..=20005, &transport).await;

        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 20002);
        wait_for_chitchat_state(
            node.chitchat(),
            4,
            &[
                NodeId::for_test_localhost(20001),
                NodeId::for_test_localhost(20003),
                NodeId::for_test_localhost(20004),
                NodeId::for_test_localhost(20005),
            ],
        )
        .await;

        shutdown_nodes(nodes).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_node_goes_from_live_to_down_to_live() -> anyhow::Result<()> {
        let transport = ChannelTransport::default();
        let mut nodes = setup_nodes(30001..=30006, &transport).await;
        let node = &nodes[1];
        assert_eq!(node.node_id().gossip_public_address.port(), 30002);
        wait_for_chitchat_state(
            node.chitchat(),
            5,
            &[
                NodeId::for_test_localhost(30001),
                NodeId::for_test_localhost(30003),
                NodeId::for_test_localhost(30004),
                NodeId::for_test_localhost(30005),
                NodeId::for_test_localhost(30006),
            ],
        )
        .await;

        // Take down node at localhost:30003
        let node = nodes.remove(2);
        assert_eq!(node.node_id().public_port(), 30003);
        node.shutdown().await.unwrap();

        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 30002);
        wait_for_chitchat_state(
            node.chitchat(),
            4,
            &[
                NodeId::for_test_localhost(30001),
                NodeId::for_test_localhost(30004),
                NodeId::for_test_localhost(30005),
                NodeId::for_test_localhost(30006),
            ],
        )
        .await;

        // Restart node at localhost:30003
        let node_3 = NodeId::for_test_localhost(30003);
        nodes.push(
            start_node(
                node_3,
                &[NodeId::for_test_localhost(30_001)
                    .gossip_public_address
                    .to_string()],
                &transport,
            )
            .await,
        );

        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 30002);
        wait_for_chitchat_state(
            node.chitchat(),
            5,
            &[
                NodeId::for_test_localhost(30001),
                NodeId::for_test_localhost(30003),
                NodeId::for_test_localhost(30004),
                NodeId::for_test_localhost(30005),
                NodeId::for_test_localhost(30006),
            ],
        )
        .await;

        shutdown_nodes(nodes).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_dead_node_should_not_be_gossiped_when_node_joins() -> anyhow::Result<()> {
        let transport = ChannelTransport::default();
        let mut nodes = setup_nodes(40001..=40004, &transport).await;
        {
            let node2 = nodes.get(1).unwrap();
            assert_eq!(node2.node_id().public_port(), 40002);
            wait_for_chitchat_state(
                node2.chitchat(),
                3,
                &[
                    NodeId::for_test_localhost(40001),
                    NodeId::for_test_localhost(40003),
                    NodeId::for_test_localhost(40004),
                ],
            )
            .await;
        }

        // Take down node at localhost:40003
        let node3 = nodes.remove(2);
        assert_eq!(node3.node_id().public_port(), 40003);
        node3.shutdown().await.unwrap();

        {
            let node2 = nodes.get(1).unwrap();
            assert_eq!(node2.node_id().public_port(), 40002);
            wait_for_chitchat_state(
                node2.chitchat(),
                2,
                &[
                    NodeId::for_test_localhost(40_001),
                    NodeId::for_test_localhost(40_004),
                ],
            )
            .await;
        }

        // Restart node at localhost:40003 with new name
        let mut new_config = ChitchatConfig::for_test(40_003);

        new_config.node_id.id = "new_node".to_string();
        let seed = NodeId::for_test_localhost(40_002).gossip_public_address;
        new_config.seed_nodes = vec![seed.to_string()];
        let new_node_chitchat = spawn_chitchat(new_config, Vec::new(), &transport)
            .await
            .unwrap();

        // The new node must converge to the 3 remaining peers and never learn
        // about the dead node's previous incarnation.
        wait_for_chitchat_state(
            new_node_chitchat.chitchat(),
            3,
            &[
                NodeId::for_test_localhost(40_001),
                NodeId::for_test_localhost(40_002),
                NodeId::for_test_localhost(40_004),
            ],
        )
        .await;

        nodes.push(new_node_chitchat);
        shutdown_nodes(nodes).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_network_partition_nodes() -> anyhow::Result<()> {
        // NOTE(review): despite its name, this test does not simulate a
        // partition; it only verifies full-mesh convergence over the
        // channel transport.
        let transport = ChannelTransport::default();
        let port_range = 11_001u16..=11_006;
        let nodes = setup_nodes(port_range.clone(), &transport).await;

        // Check nodes know each other.
        for node in nodes.iter() {
            let expected_peers: Vec<NodeId> = port_range
                .clone()
                .filter(|peer_port| *peer_port != node.node_id().public_port())
                .map(NodeId::for_test_localhost)
                .collect::<Vec<_>>();
            wait_for_chitchat_state(node.chitchat(), 5, &expected_peers).await;
        }

        shutdown_nodes(nodes).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_dead_node_garbage_collection() -> anyhow::Result<()> {
        let transport = ChannelTransport::default();
        let mut nodes = setup_nodes(60001..=60006, &transport).await;
        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 60002);
        wait_for_chitchat_state(
            node.chitchat(),
            5,
            &[
                NodeId::for_test_localhost(60_001),
                NodeId::for_test_localhost(60_003),
                NodeId::for_test_localhost(60_004),
                NodeId::for_test_localhost(60_005),
                NodeId::for_test_localhost(60_006),
            ],
        )
        .await;

        // Take down node at localhost:60003
        let node = nodes.remove(2);
        assert_eq!(node.node_id().public_port(), 60003);
        node.shutdown().await.unwrap();

        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 60002);
        wait_for_chitchat_state(
            node.chitchat(),
            4,
            &[
                NodeId::for_test_localhost(60_001),
                NodeId::for_test_localhost(60_004),
                NodeId::for_test_localhost(60_005),
                NodeId::for_test_localhost(60_006),
            ],
        )
        .await;

        // Dead node should still be known to the cluster.
        let dead_node_id = NodeId::for_test_localhost(60003);
        for node in &nodes {
            assert!(node
                .chitchat()
                .lock()
                .await
                .node_state(&dead_node_id)
                .is_some());
        }

        // Wait a bit more than `dead_node_grace_period` since all nodes will not
        // notice cluster change at the same time.
        let wait_for = DEAD_NODE_GRACE_PERIOD.add(Duration::from_secs(5));
        time::sleep(wait_for).await;

        // Dead node should no longer be known to the cluster.
        for node in &nodes {
            assert!(node
                .chitchat()
                .lock()
                .await
                .node_state(&dead_node_id)
                .is_none());
        }

        shutdown_nodes(nodes).await?;
        Ok(())
    }
}