1#![allow(clippy::type_complexity)]
2#![allow(clippy::derive_partial_eq_without_eq)]
3
4mod server;
5
6mod configuration;
7mod delta;
8mod digest;
9mod failure_detector;
10mod message;
11pub mod serialize;
12mod state;
13pub mod transport;
14
15use std::collections::HashSet;
16use std::net::SocketAddr;
17
18use delta::Delta;
19use failure_detector::FailureDetector;
20pub use failure_detector::FailureDetectorConfig;
21use serde::{Deserialize, Serialize};
22use tokio::sync::watch;
23use tokio_stream::wrappers::WatchStream;
24use tracing::{debug, error, warn};
25
26pub use self::configuration::ChitchatConfig;
27pub use self::state::{ClusterStateSnapshot, NodeState};
28use crate::digest::Digest;
29use crate::message::syn_ack_serialized_len;
30pub use crate::message::ChitchatMessage;
31pub use crate::server::{spawn_chitchat, ChitchatHandle};
32use crate::state::ClusterState;
33
/// Version number attached to the values stored in a node's state
/// (see `VersionedValue::version`).
pub type Version = u64;

/// Size budget, in bytes, for a gossip message: the delta carried by a
/// `SynAck`/`Ack` is sized so the whole message stays under this bound.
const MTU: usize = 60_000;

/// Key under which each node stores its heartbeat counter in its own state.
pub(crate) const HEARTBEAT_KEY: &str = "heartbeat";
49
50#[derive(Clone, Hash, Eq, PartialEq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
84pub struct NodeId {
85 pub id: String,
87 pub gossip_public_address: SocketAddr,
89}
90
91impl NodeId {
92 pub fn new(id: String, gossip_public_address: SocketAddr) -> Self {
93 Self {
94 id,
95 gossip_public_address,
96 }
97 }
98
99 pub fn for_test_localhost(port: u16) -> Self {
100 NodeId::new(
101 format!("node-{port}"),
102 ([127u8, 0u8, 0u8, 1u8], port).into(),
103 )
104 }
105
106 #[cfg(test)]
108 pub fn public_port(&self) -> u16 {
109 self.gossip_public_address.port()
110 }
111}
112
113#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
115pub struct VersionedValue {
116 pub value: String,
117 pub version: Version,
118}
119
/// Gossip-protocol state of the local node.
///
/// Bundles the node configuration, the cluster state exchanged with peers,
/// the local heartbeat counter, the failure detector that classifies peers as
/// live or dead, and a watch channel publishing the set of ready nodes.
pub struct Chitchat {
    config: ChitchatConfig,
    cluster_state: ClusterState,
    // Local heartbeat counter; bumped by `update_heartbeat` and written under
    // `HEARTBEAT_KEY` in the local node state.
    heartbeat: u64,
    failure_detector: FailureDetector,
    // Publishing side of the ready-nodes channel; `update_nodes_liveliness`
    // sends a new set whenever the ready nodes change.
    ready_nodes_watcher_tx: watch::Sender<HashSet<NodeId>>,
    // Receiving side kept on the struct: it is read to detect changes and is
    // cloned for every subscriber returned by `ready_nodes_watcher`.
    ready_nodes_watcher_rx: watch::Receiver<HashSet<NodeId>>,
}
131
132impl Chitchat {
133 pub fn with_node_id_and_seeds(
134 config: ChitchatConfig,
135 seed_addrs: watch::Receiver<HashSet<SocketAddr>>,
136 initial_key_values: Vec<(String, String)>,
137 ) -> Self {
138 let (ready_nodes_watcher_tx, ready_nodes_watcher_rx) = watch::channel(HashSet::new());
139 let failure_detector = FailureDetector::new(config.failure_detector_config.clone());
140 let mut chitchat = Chitchat {
141 config,
142 cluster_state: ClusterState::with_seed_addrs(seed_addrs),
143 heartbeat: 0,
144 failure_detector,
145 ready_nodes_watcher_tx,
146 ready_nodes_watcher_rx,
147 };
148
149 let self_node_state = chitchat.self_node_state();
150
151 self_node_state.set(HEARTBEAT_KEY, 0);
153
154 for (key, value) in initial_key_values {
156 self_node_state.set(key, value);
157 }
158
159 chitchat
160 }
161
162 pub fn create_syn_message(&mut self) -> ChitchatMessage {
163 let digest = self.compute_digest();
164 ChitchatMessage::Syn {
165 cluster_id: self.config.cluster_id.clone(),
166 digest,
167 }
168 }
169
170 pub fn process_message(&mut self, msg: ChitchatMessage) -> Option<ChitchatMessage> {
171 match msg {
172 ChitchatMessage::Syn { cluster_id, digest } => {
173 if cluster_id != self.config.cluster_id {
174 warn!(
175 cluster_id = %cluster_id,
176 "rejecting syn message with mismatching cluster name"
177 );
178 return Some(ChitchatMessage::BadCluster);
179 }
180 let self_digest = self.compute_digest();
181 let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
182 let empty_delta = Delta::default();
183 let delta_mtu = MTU - syn_ack_serialized_len(&self_digest, &empty_delta);
184 let delta = self
185 .cluster_state
186 .compute_delta(&digest, delta_mtu, dead_nodes);
187 self.report_to_failure_detector(&delta);
188 Some(ChitchatMessage::SynAck {
189 digest: self_digest,
190 delta,
191 })
192 }
193 ChitchatMessage::SynAck { digest, delta } => {
194 self.report_to_failure_detector(&delta);
195 self.cluster_state.apply_delta(delta);
196 let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
197 let delta = self
198 .cluster_state
199 .compute_delta(&digest, MTU - 1, dead_nodes);
200 Some(ChitchatMessage::Ack { delta })
201 }
202 ChitchatMessage::Ack { delta } => {
203 self.report_to_failure_detector(&delta);
204 self.cluster_state.apply_delta(delta);
205 None
206 }
207 ChitchatMessage::BadCluster => {
208 warn!("message rejected by peer: cluster name mismatch");
209 None
210 }
211 }
212 }
213
214 fn report_to_failure_detector(&mut self, delta: &Delta) {
215 for (node_id, node_delta) in &delta.node_deltas {
216 let local_max_version = self
217 .cluster_state
218 .node_states
219 .get(node_id)
220 .map(|node_state| node_state.max_version)
221 .unwrap_or(0);
222
223 let delta_max_version = node_delta.max_version();
224 if local_max_version < delta_max_version {
225 self.failure_detector.report_heartbeat(node_id);
226 }
227 }
228 }
229
230 pub fn update_nodes_liveliness(&mut self) {
232 let cluster_nodes = self
233 .cluster_state
234 .nodes()
235 .filter(|&node_id| node_id != self.self_node_id())
236 .collect::<Vec<_>>();
237 for &node_id in &cluster_nodes {
238 self.failure_detector.update_node_liveliness(node_id);
239 }
240
241 let ready_nodes_before = self.ready_nodes_watcher_rx.borrow().clone();
242 let ready_nodes_after = self.ready_nodes().cloned().collect::<HashSet<_>>();
243
244 if ready_nodes_before != ready_nodes_after {
245 debug!(current_node = ?self.self_node_id(), live_nodes = ?ready_nodes_after, "nodes status changed");
246 if self.ready_nodes_watcher_tx.send(ready_nodes_after).is_err() {
247 error!(current_node = ?self.self_node_id(), "error while reporting membership change event.")
248 }
249 }
250
251 let garbage_collected_nodes = self.failure_detector.garbage_collect();
253 for node_id in garbage_collected_nodes.iter() {
254 self.cluster_state.remove_node(node_id);
255 }
256 }
257
258 pub fn node_state(&self, node_id: &NodeId) -> Option<&NodeState> {
259 self.cluster_state.node_state(node_id)
260 }
261
262 pub fn self_node_state(&mut self) -> &mut NodeState {
263 self.cluster_state.node_state_mut(&self.config.node_id)
264 }
265
266 pub fn live_nodes(&self) -> impl Iterator<Item = &NodeId> {
268 self.failure_detector.live_nodes()
269 }
270
271 pub fn ready_nodes(&self) -> impl Iterator<Item = &NodeId> {
275 self.live_nodes().filter(|&node_id| {
276 let is_ready_pred = if let Some(pred) = self.config.is_ready_predicate.as_ref() {
277 pred
278 } else {
279 return true;
281 };
282 self.node_state(node_id).map(is_ready_pred).unwrap_or(false)
283 })
284 }
285
286 pub fn dead_nodes(&self) -> impl Iterator<Item = &NodeId> {
288 self.failure_detector.dead_nodes()
289 }
290
291 pub fn seed_nodes(&self) -> HashSet<SocketAddr> {
293 self.cluster_state.seed_addrs()
294 }
295
296 pub fn self_node_id(&self) -> &NodeId {
297 &self.config.node_id
298 }
299
300 pub fn cluster_id(&self) -> &str {
301 &self.config.cluster_id
302 }
303
304 pub fn update_heartbeat(&mut self) {
305 self.heartbeat += 1;
306 let heartbeat = self.heartbeat;
307 self.self_node_state().set(HEARTBEAT_KEY, heartbeat);
308 }
309
310 fn compute_digest(&mut self) -> Digest {
316 let dead_nodes: HashSet<_> = self.dead_nodes().collect();
318 self.cluster_state.compute_digest(dead_nodes)
319 }
320
321 pub(crate) fn cluster_state(&self) -> &ClusterState {
322 &self.cluster_state
323 }
324
325 pub fn state_snapshot(&self) -> ClusterStateSnapshot {
327 ClusterStateSnapshot::from(&self.cluster_state)
328 }
329
330 pub fn ready_nodes_watcher(&self) -> WatchStream<HashSet<NodeId>> {
332 WatchStream::new(self.ready_nodes_watcher_rx.clone())
333 }
334}
335
// Tests for `Chitchat`: an in-memory handshake test, plus integration tests
// that spawn several nodes over an in-process `ChannelTransport` and wait for
// their ready-node sets to reach the expected membership.
#[cfg(test)]
mod tests {
    use std::ops::{Add, RangeInclusive};
    use std::sync::Arc;
    use std::time::Duration;

    use mock_instant::MockClock;
    use tokio::sync::Mutex;
    use tokio::time;
    use tokio_stream::wrappers::IntervalStream;
    use tokio_stream::StreamExt;

    use super::*;
    use crate::server::{spawn_chitchat, ChitchatHandle};
    use crate::transport::{ChannelTransport, Transport};

    // Grace period given to every test node's failure detector. The garbage
    // collection test sleeps a little longer than this before checking that a
    // dead node's state has been dropped.
    const DEAD_NODE_GRACE_PERIOD: Duration = Duration::from_secs(20);

    // Runs one full Syn -> SynAck -> Ack exchange between two in-memory nodes
    // and asserts that the Ack gets no reply, i.e. the round terminates.
    fn run_chitchat_handshake(initiating_node: &mut Chitchat, peer_node: &mut Chitchat) {
        let syn_message = initiating_node.create_syn_message();
        let syn_ack_message = peer_node.process_message(syn_message).unwrap();
        let ack_message = initiating_node.process_message(syn_ack_message).unwrap();
        assert!(peer_node.process_message(ack_message).is_none());
    }

    // Asserts both node states have the same number of entries and that every
    // entry of `lhs` except the heartbeat is present with an equal value in `rhs`.
    fn assert_cluster_state_eq(lhs: &NodeState, rhs: &NodeState) {
        assert_eq!(lhs.key_values.len(), rhs.key_values.len());
        for (key, value) in &lhs.key_values {
            if key == HEARTBEAT_KEY {
                // Heartbeat values are not compared.
                continue;
            }
            assert_eq!(rhs.key_values.get(key), Some(value));
        }
    }

    // Asserts every node knows as many nodes as the first one, and agrees with
    // the first node on the state of each of them. Panics if a node known to the
    // first node is missing elsewhere.
    fn assert_nodes_sync(nodes: &[&Chitchat]) {
        let first_node_states = &nodes[0].cluster_state.node_states;
        for other_node in nodes.iter().skip(1) {
            let node_states = &other_node.cluster_state.node_states;
            assert_eq!(first_node_states.len(), node_states.len());
            for (key, value) in first_node_states {
                assert_cluster_state_eq(value, node_states.get(key).unwrap());
            }
        }
    }

    // Spawns a node in cluster "default-cluster", listening on its own gossip
    // address, with a 100ms gossip interval and test failure-detector settings.
    async fn start_node(
        node_id: NodeId,
        seeds: &[String],
        transport: &dyn Transport,
    ) -> ChitchatHandle {
        let config = ChitchatConfig {
            node_id: node_id.clone(),
            cluster_id: "default-cluster".to_string(),
            gossip_interval: Duration::from_millis(100),
            listen_addr: node_id.gossip_public_address,
            seed_nodes: seeds.to_vec(),
            failure_detector_config: FailureDetectorConfig {
                dead_node_grace_period: DEAD_NODE_GRACE_PERIOD,
                phi_threshold: 5.0,
                initial_interval: Duration::from_millis(100),
                ..Default::default()
            },
            is_ready_predicate: None,
        };
        let initial_kvs: Vec<(String, String)> = Vec::new();
        spawn_chitchat(config, initial_kvs, transport)
            .await
            .unwrap()
    }

    // Starts one localhost node per port of `port_range`. The first node has no
    // seeds; every other node is seeded with all the other nodes' addresses.
    // Also spawns a background task that advances the mocked clock by 50ms every
    // 50ms of real time.
    async fn setup_nodes(
        port_range: RangeInclusive<u16>,
        transport: &dyn Transport,
    ) -> Vec<ChitchatHandle> {
        let node_ids: Vec<NodeId> = port_range.map(NodeId::for_test_localhost).collect();
        let node_without_seed = start_node(node_ids[0].clone(), &[], transport).await;
        let mut chitchat_handlers: Vec<ChitchatHandle> = vec![node_without_seed];
        for node_id in &node_ids[1..] {
            let seeds = node_ids
                .iter()
                .filter(|&peer_id| peer_id != node_id)
                .map(|peer_id| peer_id.gossip_public_address.to_string())
                .collect::<Vec<_>>();
            chitchat_handlers.push(start_node(node_id.clone(), &seeds, transport).await);
        }
        // NOTE(review): presumably the failure detector reads time from
        // `mock_instant`, so the mocked clock must move for liveliness to
        // change. The task's JoinHandle is discarded; it runs until the test's
        // runtime shuts down.
        tokio::spawn(async {
            let mut ticker = IntervalStream::new(time::interval(Duration::from_millis(50)));
            while ticker.next().await.is_some() {
                MockClock::advance(Duration::from_millis(50));
            }
        });
        chitchat_handlers
    }

    // Shuts the nodes down in order, stopping at the first error.
    async fn shutdown_nodes(nodes: Vec<ChitchatHandle>) -> anyhow::Result<()> {
        for node in nodes {
            node.shutdown().await?;
        }
        Ok(())
    }

    // Waits (up to 50s) for the node's ready-node set to contain exactly
    // `expected_node_count` nodes, then asserts that set equals `expected_nodes`.
    // The mutex guard is a temporary of the `let` statement, so the lock is
    // released as soon as the watcher stream is created.
    async fn wait_for_chitchat_state(
        chitchat: Arc<Mutex<Chitchat>>,
        expected_node_count: usize,
        expected_nodes: &[NodeId],
    ) {
        let mut ready_nodes_watcher = chitchat
            .lock()
            .await
            .ready_nodes_watcher()
            .skip_while(|live_nodes| live_nodes.len() != expected_node_count);
        tokio::time::timeout(Duration::from_secs(50), async move {
            let live_nodes = ready_nodes_watcher.next().await.unwrap();
            assert_eq!(
                live_nodes,
                expected_nodes.iter().cloned().collect::<HashSet<_>>()
            );
        })
        .await
        .unwrap();
    }

    // Two in-memory nodes converge after a handshake, stay in sync on a second
    // handshake, and propagate new local updates on a third.
    #[test]
    fn test_chitchat_handshake() {
        let node_config1 = ChitchatConfig::for_test(10_001);
        // The sender half is dropped right away: neither node gets seeds.
        let empty_seeds = watch::channel(Default::default()).1;
        let mut node1 = Chitchat::with_node_id_and_seeds(
            node_config1,
            empty_seeds.clone(),
            vec![
                ("key1a".to_string(), "1".to_string()),
                ("key2a".to_string(), "2".to_string()),
            ],
        );
        let node_config2 = ChitchatConfig::for_test(10_002);
        let mut node2 = Chitchat::with_node_id_and_seeds(
            node_config2,
            empty_seeds,
            vec![
                ("key1b".to_string(), "1".to_string()),
                ("key2b".to_string(), "2".to_string()),
            ],
        );
        run_chitchat_handshake(&mut node1, &mut node2);
        assert_nodes_sync(&[&node1, &node2]);
        run_chitchat_handshake(&mut node1, &mut node2);
        // A handshake with nothing new must leave the nodes in sync.
        assert_nodes_sync(&[&node1, &node2]);
        {
            // Overwrite one key and add another on node1.
            let state1 = node1.self_node_state();
            state1.set("key1a", "3");
            state1.set("key1c", "4");
        }
        run_chitchat_handshake(&mut node1, &mut node2);
        assert_nodes_sync(&[&node1, &node2]);
    }

    // Five gossiping nodes: node 20002 eventually sees the four others as ready.
    #[tokio::test]
    async fn test_multiple_nodes() -> anyhow::Result<()> {
        let transport = ChannelTransport::default();
        let nodes = setup_nodes(20001..=20005, &transport).await;

        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 20002);
        wait_for_chitchat_state(
            node.chitchat(),
            4,
            &[
                NodeId::for_test_localhost(20001),
                NodeId::for_test_localhost(20003),
                NodeId::for_test_localhost(20004),
                NodeId::for_test_localhost(20005),
            ],
        )
        .await;

        shutdown_nodes(nodes).await?;
        Ok(())
    }

    // A node that is shut down drops out of its peers' ready sets, and
    // reappears once restarted with the same id.
    #[tokio::test]
    async fn test_node_goes_from_live_to_down_to_live() -> anyhow::Result<()> {
        let transport = ChannelTransport::default();
        let mut nodes = setup_nodes(30001..=30006, &transport).await;
        let node = &nodes[1];
        assert_eq!(node.node_id().gossip_public_address.port(), 30002);
        wait_for_chitchat_state(
            node.chitchat(),
            5,
            &[
                NodeId::for_test_localhost(30001),
                NodeId::for_test_localhost(30003),
                NodeId::for_test_localhost(30004),
                NodeId::for_test_localhost(30005),
                NodeId::for_test_localhost(30006),
            ],
        )
        .await;

        // Take node 30003 down.
        let node = nodes.remove(2);
        assert_eq!(node.node_id().public_port(), 30003);
        node.shutdown().await.unwrap();

        // Node 30002 must stop reporting 30003 as ready.
        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 30002);
        wait_for_chitchat_state(
            node.chitchat(),
            4,
            &[
                NodeId::for_test_localhost(30001),
                NodeId::for_test_localhost(30004),
                NodeId::for_test_localhost(30005),
                NodeId::for_test_localhost(30006),
            ],
        )
        .await;

        // Restart 30003 with the same NodeId, seeded only with 30001.
        let node_3 = NodeId::for_test_localhost(30003);
        nodes.push(
            start_node(
                node_3,
                &[NodeId::for_test_localhost(30_001)
                    .gossip_public_address
                    .to_string()],
                &transport,
            )
            .await,
        );

        // Node 30002 must see 30003 as ready again.
        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 30002);
        wait_for_chitchat_state(
            node.chitchat(),
            5,
            &[
                NodeId::for_test_localhost(30001),
                NodeId::for_test_localhost(30003),
                NodeId::for_test_localhost(30004),
                NodeId::for_test_localhost(30005),
                NodeId::for_test_localhost(30006),
            ],
        )
        .await;

        shutdown_nodes(nodes).await?;
        Ok(())
    }

    // A node joining after another one died must not be told about the dead node.
    #[tokio::test]
    async fn test_dead_node_should_not_be_gossiped_when_node_joins() -> anyhow::Result<()> {
        let transport = ChannelTransport::default();
        let mut nodes = setup_nodes(40001..=40004, &transport).await;
        {
            let node2 = nodes.get(1).unwrap();
            assert_eq!(node2.node_id().public_port(), 40002);
            wait_for_chitchat_state(
                node2.chitchat(),
                3,
                &[
                    NodeId::for_test_localhost(40001),
                    NodeId::for_test_localhost(40003),
                    NodeId::for_test_localhost(40004),
                ],
            )
            .await;
        }

        // Kill node 40003.
        let node3 = nodes.remove(2);
        assert_eq!(node3.node_id().public_port(), 40003);
        node3.shutdown().await.unwrap();

        {
            let node2 = nodes.get(1).unwrap();
            assert_eq!(node2.node_id().public_port(), 40002);
            wait_for_chitchat_state(
                node2.chitchat(),
                2,
                &[
                    NodeId::for_test_localhost(40_001),
                    NodeId::for_test_localhost(40_004),
                ],
            )
            .await;
        }

        // The new node reuses 40003's address under a different id, so its
        // NodeId differs from the dead node's.
        let mut new_config = ChitchatConfig::for_test(40_003);

        new_config.node_id.id = "new_node".to_string();
        let seed = NodeId::for_test_localhost(40_002).gossip_public_address;
        new_config.seed_nodes = vec![seed.to_string()];
        let new_node_chitchat = spawn_chitchat(new_config, Vec::new(), &transport)
            .await
            .unwrap();

        // The newcomer must see only the surviving nodes as ready.
        wait_for_chitchat_state(
            new_node_chitchat.chitchat(),
            3,
            &[
                NodeId::for_test_localhost(40_001),
                NodeId::for_test_localhost(40_002),
                NodeId::for_test_localhost(40_004),
            ],
        )
        .await;

        nodes.push(new_node_chitchat);
        shutdown_nodes(nodes).await?;
        Ok(())
    }

    // Every node of a six-node cluster eventually sees the five others as ready.
    // NOTE(review): despite its name, this test does not partition the network.
    #[tokio::test]
    async fn test_network_partition_nodes() -> anyhow::Result<()> {
        let transport = ChannelTransport::default();
        let port_range = 11_001u16..=11_006;
        let nodes = setup_nodes(port_range.clone(), &transport).await;

        for node in nodes.iter() {
            // Expected peers: every port of the range except the node's own.
            let expected_peers: Vec<NodeId> = port_range
                .clone()
                .filter(|peer_port| *peer_port != node.node_id().public_port())
                .map(NodeId::for_test_localhost)
                .collect::<Vec<_>>();
            wait_for_chitchat_state(node.chitchat(), 5, &expected_peers).await;
        }

        shutdown_nodes(nodes).await?;
        Ok(())
    }

    // A dead node's state is kept during the grace period and removed after it.
    #[tokio::test]
    async fn test_dead_node_garbage_collection() -> anyhow::Result<()> {
        let transport = ChannelTransport::default();
        let mut nodes = setup_nodes(60001..=60006, &transport).await;
        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 60002);
        wait_for_chitchat_state(
            node.chitchat(),
            5,
            &[
                NodeId::for_test_localhost(60_001),
                NodeId::for_test_localhost(60_003),
                NodeId::for_test_localhost(60_004),
                NodeId::for_test_localhost(60_005),
                NodeId::for_test_localhost(60_006),
            ],
        )
        .await;

        // Kill node 60003.
        let node = nodes.remove(2);
        assert_eq!(node.node_id().public_port(), 60003);
        node.shutdown().await.unwrap();

        let node = nodes.get(1).unwrap();
        assert_eq!(node.node_id().public_port(), 60002);
        wait_for_chitchat_state(
            node.chitchat(),
            4,
            &[
                NodeId::for_test_localhost(60_001),
                NodeId::for_test_localhost(60_004),
                NodeId::for_test_localhost(60_005),
                NodeId::for_test_localhost(60_006),
            ],
        )
        .await;

        // Once declared dead, the node's state is still held by every peer.
        let dead_node_id = NodeId::for_test_localhost(60003);
        for node in &nodes {
            assert!(node
                .chitchat()
                .lock()
                .await
                .node_state(&dead_node_id)
                .is_some());
        }

        // Sleep (real time) past the grace period; the ticker task spawned by
        // `setup_nodes` keeps the mocked clock advancing meanwhile.
        let wait_for = DEAD_NODE_GRACE_PERIOD.add(Duration::from_secs(5));
        time::sleep(wait_for).await;

        for node in &nodes {
            // The dead node's state must have been garbage collected everywhere.
            assert!(node
                .chitchat()
                .lock()
                .await
                .node_state(&dead_node_id)
                .is_none());
        }

        shutdown_nodes(nodes).await?;
        Ok(())
    }
}