1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#![allow(clippy::derive_partial_eq_without_eq)]
use std::net::SocketAddr;
use std::time::Duration;
use crate::{ChitchatId, FailureDetectorConfig, NodeState};
/// A boxed, user-defined callback that is executed when the self node is lagging behind.
///
/// The closure takes no argument, returns nothing, and must be `Send`. The `'static` bound is
/// the default object bound of a `Box<dyn ...>`; it is only spelled out here.
pub type CatchupCallback = Box<dyn Fn() + Send + 'static>;
/// A boxed, user-defined liveness predicate applied on top of the output of the failure
/// detector.
///
/// The closure receives a reference to a [`NodeState`] and returns a `bool`; it must be `Send`.
/// The `'static` bound is the default object bound of a `Box<dyn ...>`; it is only spelled out
/// here.
pub type ExtraLivenessPredicate = Box<dyn Fn(&NodeState) -> bool + Send + 'static>;
/// Configuration of a Chitchat instance.
pub struct ChitchatConfig {
    pub chitchat_id: ChitchatId,
    pub cluster_id: String,
    pub gossip_interval: Duration,
    pub listen_addr: SocketAddr,
    pub seed_nodes: Vec<String>,
    pub failure_detector_config: FailureDetectorConfig,
    /// Grace period granted to keys marked for deletion, expressed as a [`Duration`].
    ///
    /// Chitchat ensures that a key marked for deletion is eventually deleted through three
    /// mechanisms:
    /// - Garbage collection: on each heartbeat, keys marked for deletion for which
    ///   `now > instant + marked_for_deletion_grace_period` are deleted.
    /// - Compute delta: for a given node digest, if
    ///   `node_digest.heartbeat + marked_for_deletion_grace_period < node_state.heartbeat`,
    ///   the node is flagged "to be reset" and the delta is populated with all keys and values.
    /// - Apply delta: for a node flagged "to be reset", Chitchat removes the node state and
    ///   populates a fresh node state with the keys and values present in the delta.
    //
    // NOTE(review): this field is a `Duration`, yet the "compute delta" formula above adds it
    // to heartbeat values, and `instant` is presumably the time at which the key was marked
    // for deletion. Confirm both against the garbage collection and delta computation code.
    pub marked_for_deletion_grace_period: Duration,
    /// Optional callback executed when the self node is lagging behind.
    ///
    /// It is meant to wire up an external mechanism capable of catching up the Chitchat state
    /// faster than the Chitchat protocol itself. That external mechanism is expected to call
    /// [`crate::Chitchat::reset_node_state`] to communicate the results back to Chitchat. For
    /// instance, in Quickwit, a gRPC API is used to fetch the states from the other nodes of
    /// the cluster.
    pub catchup_callback: Option<CatchupCallback>,
    /// Optional extra liveness predicate that can be used to refine what a node being "live"
    /// means.
    ///
    /// It can be used, for instance, to only surface the nodes that are alive according to the
    /// failure detector and that also have a given set of required keys.
    pub extra_liveness_predicate: Option<ExtraLivenessPredicate>,
}
impl ChitchatConfig {
    /// Builds a configuration meant for tests (only compiled under `cfg(test)`).
    ///
    /// The node id is obtained from `ChitchatId::for_local_test(port)` and the listen address
    /// is that id's `gossip_advertise_addr`. The cluster id is `"default-cluster"`, the gossip
    /// interval is 50 ms, the grace period for keys marked for deletion is 10 000 s, there are
    /// no seed nodes, the failure detector uses its default configuration, and neither the
    /// catch-up callback nor the extra liveness predicate is set.
    #[cfg(test)]
    pub fn for_test(port: u16) -> Self {
        let node_id = ChitchatId::for_local_test(port);
        // Read the advertised gossip address before `node_id` is moved into the struct.
        let gossip_addr = node_id.gossip_advertise_addr;
        Self {
            chitchat_id: node_id,
            cluster_id: String::from("default-cluster"),
            gossip_interval: Duration::from_millis(50),
            listen_addr: gossip_addr,
            seed_nodes: Vec::new(),
            failure_detector_config: Default::default(),
            marked_for_deletion_grace_period: Duration::from_secs(10_000),
            catchup_callback: None,
            extra_liveness_predicate: None,
        }
    }
}
#[cfg(test)]
impl Default for ChitchatConfig {
    /// Returns the default configuration; this impl is only compiled under `cfg(test)`.
    ///
    /// The node id is obtained from `ChitchatId::for_local_test(10_000)` and the listen address
    /// is that id's `gossip_advertise_addr`. The cluster id is `"default-cluster"`, the gossip
    /// interval is one second, the grace period for keys marked for deletion is two hours,
    /// there are no seed nodes, the failure detector uses its default configuration, and
    /// neither the catch-up callback nor the extra liveness predicate is set.
    fn default() -> Self {
        let node_id = ChitchatId::for_local_test(10_000);
        // Read the advertised gossip address before `node_id` is moved into the struct.
        let gossip_addr = node_id.gossip_advertise_addr;
        Self {
            chitchat_id: node_id,
            cluster_id: String::from("default-cluster"),
            gossip_interval: Duration::from_secs(1),
            listen_addr: gossip_addr,
            seed_nodes: Vec::new(),
            failure_detector_config: Default::default(),
            marked_for_deletion_grace_period: Duration::from_secs(2 * 60 * 60), // 2h
            catchup_callback: None,
            extra_liveness_predicate: None,
        }
    }
}