1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//! This module provides the functionality to refresh and calculate the cluster topology for Redis Cluster.
use crate::cluster::get_connection_addr;
use crate::cluster_routing::Slot;
use crate::{cluster::TlsMode, RedisResult, Value};
/// Parses the raw reply of `CLUSTER SLOTS` into a list of [`Slot`]s.
///
/// The reply is expected to be an array of entries shaped like
/// `[start, end, master_node, replica_node...]`, where every node is itself an array whose
/// first two elements are the hostname and the port.
///
/// Parsing is lenient rather than failing on malformed data:
/// * a reply that is not an array yields an empty list;
/// * parsing stops at the first top-level element that is not an array;
/// * an entry with fewer than three elements, or whose slot bounds are not integers that fit
///   in a `u16`, is skipped;
/// * a node is dropped when it is not an array of at least two elements, when its hostname is
///   `"?"`, when the resolved hostname is empty, or when its port is not an integer that fits
///   in a `u16`;
/// * an entry left without any usable node is skipped.
///
/// The first node that survives this filtering is used as the master of the slot range and
/// the remaining ones as its replicas.
///
/// `tls` is forwarded unchanged to `get_connection_addr` when building each node address.
/// `addr_of_answering_node` replaces a hostname that is an empty string or nil.
///
/// As written, this function always returns `Ok`.
pub(crate) fn parse_slots(
    raw_slot_resp: Value,
    tls: Option<TlsMode>,
    // The DNS address of the node from which `raw_slot_resp` was received.
    addr_of_answering_node: &str,
) -> RedisResult<Vec<Slot>> {
    let mut slots = Vec::with_capacity(2);
    if let Value::Array(items) = raw_slot_resp {
        for entry in items {
            // Anything other than an array at the top level ends parsing.
            let item = match entry {
                Value::Array(item) => item,
                _ => break,
            };
            if item.len() < 3 {
                continue;
            }

            // Slot bounds are converted with a range check; a plain `as u16` cast would wrap
            // negative or oversized values into a valid-looking but wrong slot number.
            let (start, end) = match (value_to_u16(&item[0]), value_to_u16(&item[1])) {
                (Some(start), Some(end)) => (start, end),
                _ => continue,
            };

            let mut nodes = item.into_iter().skip(2).filter_map(|node| {
                let node = match node {
                    Value::Array(fields) => fields,
                    _ => return None,
                };
                if node.len() < 2 {
                    return None;
                }
                // According to the CLUSTER SLOTS documentation:
                // If the received hostname is an empty string or NULL, clients should utilize
                // the hostname of the responding node.
                // However, if the received hostname is "?", it should be regarded as an
                // indication of an unknown node.
                let hostname = match node[0] {
                    Value::BulkString(ref ip) => {
                        let hostname = String::from_utf8_lossy(ip);
                        if hostname.is_empty() {
                            addr_of_answering_node.into()
                        } else if hostname == "?" {
                            return None;
                        } else {
                            hostname
                        }
                    }
                    Value::Nil => addr_of_answering_node.into(),
                    _ => return None,
                };
                // Still empty here means the fallback address itself was empty.
                if hostname.is_empty() {
                    return None;
                }
                // An out-of-range port is treated like a non-integer one: the node is dropped.
                let port = value_to_u16(&node[1])?;
                Some(get_connection_addr(hostname.into_owned(), port, tls, None).to_string())
            });

            // The first usable node is the master; an entry without one is skipped.
            let master = match nodes.next() {
                Some(master) => master,
                None => continue,
            };
            let replicas: Vec<String> = nodes.collect();
            slots.push(Slot::new(start, end, master, replicas));
        }
    }
    Ok(slots)
}

/// Extracts a `u16` from a `Value::Int`.
///
/// Returns `None` when `value` is not an integer or when the integer is negative or larger
/// than `u16::MAX`.
fn value_to_u16(value: &Value) -> Option<u16> {
    match *value {
        Value::Int(raw) => <u16 as std::convert::TryFrom<i64>>::try_from(raw).ok(),
        _ => None,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single `CLUSTER SLOTS` entry: the `start` and `end` slot bounds followed by
    /// one `[hostname, port]` array per element of `nodes`, in the given order.
    fn slot_value_with_replicas(start: u16, end: u16, nodes: Vec<(&str, u16)>) -> Value {
        let mut entry = Vec::with_capacity(nodes.len() + 2);
        entry.push(Value::Int(i64::from(start)));
        entry.push(Value::Int(i64::from(end)));
        entry.extend(nodes.into_iter().map(|(host, port)| {
            Value::Array(vec![
                Value::BulkString(host.as_bytes().to_owned()),
                Value::Int(i64::from(port)),
            ])
        }));
        Value::Array(entry)
    }

    /// Builds a `CLUSTER SLOTS` entry that contains exactly one node.
    fn slot_value(start: u16, end: u16, node: &str, port: u16) -> Value {
        let only_node = vec![(node, port)];
        slot_value_with_replicas(start, end, only_node)
    }

    /// An empty hostname in the reply must be replaced by the address of the answering node.
    #[test]
    fn parse_slots_returns_slots_with_host_name_if_missing() {
        let response = Value::Array(vec![slot_value(0, 4000, "", 6379)]);
        let parsed = parse_slots(response, None, "node").unwrap();
        assert_eq!(parsed[0].master, "node:6379");
    }
}