1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
use crate::transforms::cassandra::sink_cluster::CassandraNode;
use cassandra_protocol::token::Murmur3Token;
use uuid::Uuid;
use super::node_pool::{KeyspaceMetadata, ReplicationStrategy};
/// A sorted view of the Cassandra token ring, used to map a partition token to the
/// nodes that own it.
///
/// The ring is stored as two parallel vectors instead of one `Vec<(Murmur3Token, Uuid)>`:
/// `ring_in[i]` is a token and `ring_out[i]` is the host id of the node owning that token.
/// Keeping the tokens contiguous makes the binary search over them cheaper, and the
/// matching host id is then read out by index.
#[derive(Debug, Clone)]
pub struct TokenRing {
    /// Every token of every node, sorted in ascending order.
    ring_in: Vec<Murmur3Token>,
    /// `ring_out[i]` is the host id of the node that owns `ring_in[i]`.
    ring_out: Vec<Uuid>,
}

impl TokenRing {
    /// Builds the token ring from all tokens owned by `nodes`.
    ///
    /// A node that owns several tokens appears in the ring once per token.
    pub fn new(nodes: &[CassandraNode]) -> Self {
        let mut ring: Vec<(Murmur3Token, Uuid)> = nodes
            .iter()
            .flat_map(|node| node.tokens.iter().map(|token| (*token, node.host_id)))
            .collect();
        // Stable sort, so entries that happen to share a token keep their input order.
        ring.sort_by_key(|(token, _)| *token);

        // Split the ring into tokens and owners: searching over the tokens alone is
        // faster, and the owner is then fetched by the same index.
        let (ring_in, ring_out): (Vec<_>, Vec<_>) = ring.into_iter().unzip();
        TokenRing { ring_in, ring_out }
    }

    /// Provides an iterator over the ring members starting at the given token.
    /// The iterator traverses the whole ring in the direction of increasing tokens.
    /// After reaching the maximum token it wraps around and continues from the lowest one.
    /// The iterator visits each member once, it doesn't have infinite length.
    ///
    /// The first member yielded owns the smallest ring token that is `>= token`; if
    /// `token` is greater than every ring token, the walk starts at the lowest token.
    pub fn ring_range(&self, token: Murmur3Token) -> impl Iterator<Item = Uuid> + '_ {
        // Index of the first ring token >= `token`. It equals `len()` when `token` lies
        // past the last ring token, in which case the first slice below is empty and the
        // walk effectively starts from the wrap-around.
        let start = self
            .ring_in
            .partition_point(|ring_token| *ring_token < token);

        self.ring_out[start..]
            .iter()
            .chain(&self.ring_out[..start])
            .copied()
    }

    /// Walk the token ring to figure out which nodes are acting as replicas for the given query.
    /// The way we do this depends on the replication strategy used:
    /// * ReplicationStrategy::SimpleStrategy - Each cassandra node has many tokens assigned to it to form the token ring.
    ///   Take the hashed key (token) and walk along the token ring starting at the first token in the ring that is
    ///   greater than or equal to the token from our key.
    ///   Return the nodes belonging to the first replication_factor number of tokens encountered.
    ///   Note that a node owning several of those tokens is returned once per token, so the
    ///   same host id can appear more than once.
    ///   NOTE(review): Cassandra itself places replicas on distinct nodes; confirm whether
    ///   callers rely on the duplicates before changing this.
    /// * ReplicationStrategy::NetworkTopologyStrategy - The same as the simple strategy but also:
    ///   When we walk the token ring we need to skip tokens belonging to nodes in a rack we have already encountered in this walk.
    ///
    /// At most `keyspace.replication_factor` host ids are yielded; fewer are yielded when the
    /// ring is exhausted first (e.g. fewer distinct racks than the replication factor).
    ///
    /// # Panics
    ///
    /// Under `NetworkTopologyStrategy`, panics if a host id found in the ring has no
    /// matching entry in `nodes`, because its rack cannot be determined.
    ///
    /// For more info: https://docs.datastax.com/en/cassandra-oss/3.0/cassandra/architecture/archDataDistributeReplication.html
    pub fn iter_replica_nodes<'a>(
        &'a self,
        nodes: &'a [CassandraNode],
        token_from_key: Murmur3Token,
        keyspace: &'a KeyspaceMetadata,
    ) -> impl Iterator<Item = Uuid> + 'a {
        // The strategy is fixed for the whole walk, so decide once rather than per token.
        let rack_aware = matches!(
            keyspace.replication_strategy,
            ReplicationStrategy::NetworkTopologyStrategy
        );
        let mut racks_used = vec![];

        self.ring_range(token_from_key)
            .filter(move |host_id| {
                if !rack_aware {
                    return true;
                }
                let rack = &nodes
                    .iter()
                    .find(|node| node.host_id == *host_id)
                    .expect("every host id in the token ring must be present in `nodes`")
                    .rack;
                if racks_used.contains(&rack) {
                    false
                } else {
                    racks_used.push(rack);
                    true
                }
            })
            .take(keyspace.replication_factor)
    }
}
#[cfg(test)]
mod test_token_map {
    use super::*;
    use hex_literal::hex;
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use uuid::Uuid;

    static NODE_1: Uuid = Uuid::from_bytes(hex!("2DD022D62937475489D602D2933A8F71"));
    static NODE_2: Uuid = Uuid::from_bytes(hex!("2DD022D62937475489D602D2933A8F72"));
    static NODE_3: Uuid = Uuid::from_bytes(hex!("2DD022D62937475489D602D2933A8F73"));

    /// Three nodes, all in `rack1`. Sorted, the ring is:
    /// -2 N1, -1 N1, 0 N1, 1 N3, 2 N3, 10 N3, 20 N2.
    fn prepare_nodes() -> Vec<CassandraNode> {
        vec![
            CassandraNode::new(
                "127.0.0.1:9042".parse().unwrap(),
                "rack1".into(),
                vec![
                    Murmur3Token::new(-2),
                    Murmur3Token::new(-1),
                    Murmur3Token::new(0),
                ],
                NODE_1,
            ),
            CassandraNode::new(
                "127.0.0.1:9043".parse().unwrap(),
                "rack1".into(),
                vec![Murmur3Token::new(20)],
                NODE_2,
            ),
            CassandraNode::new(
                "127.0.0.1:9044".parse().unwrap(),
                "rack1".into(),
                vec![
                    Murmur3Token::new(2),
                    Murmur3Token::new(1),
                    Murmur3Token::new(10),
                ],
                NODE_3,
            ),
        ]
    }

    /// Builds a ring from `nodes` and collects the replicas chosen for `token`.
    fn replicas(
        nodes: &[CassandraNode],
        token: Murmur3Token,
        replication_factor: usize,
        replication_strategy: ReplicationStrategy,
    ) -> Vec<Uuid> {
        let ring = TokenRing::new(nodes);
        let keyspace = KeyspaceMetadata {
            replication_factor,
            replication_strategy,
        };
        ring.iter_replica_nodes(nodes, token, &keyspace).collect()
    }

    #[test]
    fn should_return_replicas_in_order() {
        verify_tokens(
            &[NODE_1, NODE_3, NODE_3, NODE_3, NODE_2],
            Murmur3Token::new(0),
        );
    }

    #[test]
    fn should_return_replicas_in_order_for_non_primary_token() {
        verify_tokens(&[NODE_3, NODE_2], Murmur3Token::new(3));
    }

    #[test]
    fn should_return_replicas_in_a_ring() {
        verify_tokens(
            &[NODE_2, NODE_1, NODE_1, NODE_1, NODE_3],
            Murmur3Token::new(20),
        );
    }

    #[test]
    fn should_wrap_around_when_token_is_past_the_highest_ring_token() {
        verify_tokens(&[NODE_1, NODE_1, NODE_1, NODE_3], Murmur3Token::new(21));
    }

    #[test]
    fn ring_range_visits_every_ring_entry_once() {
        let ring = TokenRing::new(&prepare_nodes());
        assert_eq!(
            ring.ring_range(Murmur3Token::new(5)).collect_vec(),
            vec![NODE_3, NODE_2, NODE_1, NODE_1, NODE_1, NODE_3, NODE_3]
        );
    }

    #[test]
    fn empty_ring_returns_no_replicas() {
        let ring = TokenRing::new(&[]);
        assert_eq!(ring.ring_range(Murmur3Token::new(0)).count(), 0);
        assert!(replicas(
            &[],
            Murmur3Token::new(0),
            3,
            ReplicationStrategy::SimpleStrategy
        )
        .is_empty());
    }

    #[test]
    fn network_topology_strategy_returns_one_node_per_rack() {
        // Every node in `prepare_nodes` is in `rack1`, so only the first replica survives.
        assert_eq!(
            replicas(
                &prepare_nodes(),
                Murmur3Token::new(0),
                3,
                ReplicationStrategy::NetworkTopologyStrategy
            ),
            vec![NODE_1]
        );
    }

    #[test]
    fn network_topology_strategy_skips_nodes_in_used_racks() {
        let nodes = vec![
            CassandraNode::new(
                "127.0.0.1:9042".parse().unwrap(),
                "rack1".into(),
                vec![Murmur3Token::new(0)],
                NODE_1,
            ),
            CassandraNode::new(
                "127.0.0.1:9043".parse().unwrap(),
                "rack1".into(),
                vec![Murmur3Token::new(10)],
                NODE_2,
            ),
            CassandraNode::new(
                "127.0.0.1:9044".parse().unwrap(),
                "rack2".into(),
                vec![Murmur3Token::new(20)],
                NODE_3,
            ),
        ];
        // NODE_2 shares rack1 with NODE_1, so the walk skips it and picks NODE_3.
        assert_eq!(
            replicas(
                &nodes,
                Murmur3Token::new(0),
                2,
                ReplicationStrategy::NetworkTopologyStrategy
            ),
            vec![NODE_1, NODE_3]
        );
    }

    /// Asserts that a `SimpleStrategy` walk from `token` over `prepare_nodes()` yields
    /// exactly `node_host_ids`, using `node_host_ids.len()` as the replication factor.
    fn verify_tokens(node_host_ids: &[Uuid], token: Murmur3Token) {
        let nodes = prepare_nodes();
        let found = replicas(
            &nodes,
            token,
            node_host_ids.len(),
            ReplicationStrategy::SimpleStrategy,
        );
        assert_eq!(found, node_host_ids);
    }
}