1extern crate rand;
2extern crate redis;
3
4mod cmd;
5mod crc16;
6mod slots;
7
8pub use cmd::{ClusterCmd, slot_for_packed_command};
9use slots::get_slots;
10use std::collections::HashMap;
11use rand::{thread_rng, sample};
12use redis::{Connection, RedisResult, FromRedisValue, Client, ConnectionLike, Commands, Value, Cmd,
13 ErrorKind};
14
/// Maximum number of attempts `Cluster::send_command` makes for one command
/// before it gives up.
const TTL: usize = 16;
16
17fn connect(info: &str) -> Connection {
18 let client = Client::open(info).unwrap();
19 client.get_connection().unwrap()
20}
21
22fn check_connection(conn: &Connection) -> bool {
23 let mut cmd = Cmd::new();
24 cmd.arg("PING");
25 match cmd.query::<String>(conn) {
26 Ok(_) => true,
27 Err(_) => false,
28 }
29}
30
31pub struct Cluster {
32 startup_nodes: Vec<String>,
33 conns: HashMap<String, Connection>,
34 slots: HashMap<u16, String>,
35 needs_refresh: bool,
36}
37
38impl Cluster {
39 pub fn new(startup_nodes: Vec<&str>) -> Cluster {
40 let mut conns = HashMap::new();
41 let nodes = startup_nodes.iter().map(|s| s.to_string()).collect();
42 for info in startup_nodes {
43 let conn = connect(info);
44 conns.insert(info.to_string(), conn);
45 }
46
47 let mut clus = Cluster {
48 conns: conns,
49 slots: HashMap::new(),
50 needs_refresh: false,
51 startup_nodes: nodes,
52 };
53 clus.refresh_slots();
54 clus
55 }
56
57 fn refresh_slots(&mut self) {
59 for conn in self.conns.values() {
60 for slot_data in get_slots(&conn) {
61 for (slot, addr) in slot_data.nodes() {
62 self.slots.insert(slot, addr);
63 }
64 }
65 break;
67 }
68 self.refresh_conns();
69 self.needs_refresh = false;
70 }
71
72 fn refresh_conns(&mut self) {
74 for addr in self.slots.values() {
75 if self.conns.contains_key(addr) {
76 let ok = {
77 let conn = self.conns.get(addr).unwrap();
78 check_connection(conn)
79 };
80 if !ok {
81 self.conns.remove(addr);
82 }
83 } else {
84 let conn = connect(addr);
85 self.conns.insert(addr.to_string(), conn);
86 }
87 }
88 }
89
90 fn get_connection_by_slot(&self, slot: u16) -> Option<&Connection> {
91 let addr = self.slots.get(&slot).map_or(None, |e| Some(e.clone()));
92 match addr {
93 Some(ref addr) => {
94 if self.conns.contains_key(addr) {
95 Some(self.conns.get(addr).unwrap())
96 } else {
97 None
98 }
99 }
100
101 None => Some(self.get_random_connection()),
103 }
104 }
105
106 fn get_or_create_connection_by_slot(&mut self, slot: u16) -> &Connection {
107 let addr = self.slots.get(&slot).map_or(None, |e| Some(e.clone()));
108 match addr {
109 Some(ref addr) => {
110 if self.conns.contains_key(addr) {
111 self.conns.get(addr).unwrap()
112 } else {
113 let conn = connect(addr);
115 self.conns.insert(addr.to_string(), conn);
116 self.conns.get(addr).unwrap()
117 }
118 }
119
120 None => self.get_random_connection(),
122 }
123 }
124
125 fn get_random_connection(&self) -> &Connection {
126 let mut rng = thread_rng();
127 sample(&mut rng, self.conns.values(), 1).first().unwrap()
128 }
129
130 pub fn send_command<T: FromRedisValue>(&mut self, cmd: &ClusterCmd) -> RedisResult<T> {
131 if self.needs_refresh {
132 self.refresh_slots();
133 }
134 let mut try_random_node = false;
135 for _ in 0..TTL {
136 let slot = match cmd.slot() {
137 Some(slot) => slot,
138 None => panic!("No way to dispatch this command to Redis Cluster"),
139 };
140 let res = {
141 let conn = if try_random_node {
142 try_random_node = false;
143 self.get_random_connection()
144 } else {
145 self.get_or_create_connection_by_slot(slot)
146 };
147 cmd.query(conn)
148 };
149 match res {
150 Ok(res) => return Ok(res),
151 Err(err) => {
152 if err.kind() == ErrorKind::ExtensionError &&
153 err.extension_error_code().unwrap() == "MOVED" {
154 self.needs_refresh = true;
155 }
156 try_random_node = true;
157 }
158 }
159 }
160 panic!("Too many redirections");
161 }
162}
163
164impl ConnectionLike for Cluster {
165 fn req_packed_command(&self, cmd: &[u8]) -> RedisResult<Value> {
166 let slot = slot_for_packed_command(cmd).unwrap();
168 let conn = self.get_connection_by_slot(slot).unwrap();
169 conn.req_packed_command(cmd)
170 }
171
172 fn req_packed_commands(&self,
173 cmd: &[u8],
174 offset: usize,
175 count: usize)
176 -> RedisResult<Vec<Value>> {
177 let slot = slot_for_packed_command(cmd).unwrap();
179 let conn = self.get_connection_by_slot(slot).unwrap();
180 conn.req_packed_commands(cmd, offset, count)
181 }
182
183 fn get_db(&self) -> i64 {
184 0
185 }
186}
187
188impl Commands for Cluster {}
189
190impl Clone for Cluster {
191 fn clone(&self) -> Cluster {
192 let startup_nodes = self.startup_nodes.iter().map(|s| s.as_ref()).collect();
193 Cluster::new(startup_nodes)
194 }
195}