redis_cluster/
lib.rs

1extern crate rand;
2extern crate redis;
3
4mod cmd;
5mod crc16;
6mod slots;
7
8pub use cmd::{ClusterCmd, slot_for_packed_command};
9use slots::get_slots;
10use std::collections::HashMap;
11use rand::{thread_rng, sample};
12use redis::{Connection, RedisResult, FromRedisValue, Client, ConnectionLike, Commands, Value, Cmd,
13            ErrorKind};
14
/// Upper bound on the number of attempts `Cluster::send_command` makes for a
/// single command before it stops retrying.
const TTL: usize = 16;
16
17fn connect(info: &str) -> Connection {
18    let client = Client::open(info).unwrap();
19    client.get_connection().unwrap()
20}
21
22fn check_connection(conn: &Connection) -> bool {
23    let mut cmd = Cmd::new();
24    cmd.arg("PING");
25    match cmd.query::<String>(conn) {
26        Ok(_) => true,
27        Err(_) => false,
28    }
29}
30
/// Client-side view of a Redis Cluster: the open node connections plus the
/// slot table used to pick a node for each command.
pub struct Cluster {
    /// Addresses passed to `Cluster::new`; kept so that `clone` can build a
    /// fresh cluster from the same nodes.
    startup_nodes: Vec<String>,
    /// Open connections, keyed by the node address string.
    conns: HashMap<String, Connection>,
    /// Hash slot -> address of the node recorded for that slot by
    /// `refresh_slots`.
    slots: HashMap<u16, String>,
    /// Set when a node answers with a `MOVED` error; `send_command` reloads
    /// the slot table when it finds this flag set.
    needs_refresh: bool,
}
37
38impl Cluster {
39    pub fn new(startup_nodes: Vec<&str>) -> Cluster {
40        let mut conns = HashMap::new();
41        let nodes = startup_nodes.iter().map(|s| s.to_string()).collect();
42        for info in startup_nodes {
43            let conn = connect(info);
44            conns.insert(info.to_string(), conn);
45        }
46
47        let mut clus = Cluster {
48            conns: conns,
49            slots: HashMap::new(),
50            needs_refresh: false,
51            startup_nodes: nodes,
52        };
53        clus.refresh_slots();
54        clus
55    }
56
57    /// Query a node to discover slot-> master mappings.
58    fn refresh_slots(&mut self) {
59        for conn in self.conns.values() {
60            for slot_data in get_slots(&conn) {
61                for (slot, addr) in slot_data.nodes() {
62                    self.slots.insert(slot, addr);
63                }
64            }
65            // this loop can terminate if the first node replies
66            break;
67        }
68        self.refresh_conns();
69        self.needs_refresh = false;
70    }
71
72    /// Remove dead connections and connect to new nodes if necessary
73    fn refresh_conns(&mut self) {
74        for addr in self.slots.values() {
75            if self.conns.contains_key(addr) {
76                let ok = {
77                    let conn = self.conns.get(addr).unwrap();
78                    check_connection(conn)
79                };
80                if !ok {
81                    self.conns.remove(addr);
82                }
83            } else {
84                let conn = connect(addr);
85                self.conns.insert(addr.to_string(), conn);
86            }
87        }
88    }
89
90    fn get_connection_by_slot(&self, slot: u16) -> Option<&Connection> {
91        let addr = self.slots.get(&slot).map_or(None, |e| Some(e.clone()));
92        match addr {
93            Some(ref addr) => {
94                if self.conns.contains_key(addr) {
95                    Some(self.conns.get(addr).unwrap())
96                } else {
97                    None
98                }
99            }
100
101            // just return a random connection
102            None => Some(self.get_random_connection()),
103        }
104    }
105
106    fn get_or_create_connection_by_slot(&mut self, slot: u16) -> &Connection {
107        let addr = self.slots.get(&slot).map_or(None, |e| Some(e.clone()));
108        match addr {
109            Some(ref addr) => {
110                if self.conns.contains_key(addr) {
111                    self.conns.get(addr).unwrap()
112                } else {
113                    // create the connection
114                    let conn = connect(addr);
115                    self.conns.insert(addr.to_string(), conn);
116                    self.conns.get(addr).unwrap()
117                }
118            }
119
120            // just return a random connection
121            None => self.get_random_connection(),
122        }
123    }
124
125    fn get_random_connection(&self) -> &Connection {
126        let mut rng = thread_rng();
127        sample(&mut rng, self.conns.values(), 1).first().unwrap()
128    }
129
130    pub fn send_command<T: FromRedisValue>(&mut self, cmd: &ClusterCmd) -> RedisResult<T> {
131        if self.needs_refresh {
132            self.refresh_slots();
133        }
134        let mut try_random_node = false;
135        for _ in 0..TTL {
136            let slot = match cmd.slot() {
137                Some(slot) => slot,
138                None => panic!("No way to dispatch this command to Redis Cluster"),
139            };
140            let res = {
141                let conn = if try_random_node {
142                    try_random_node = false;
143                    self.get_random_connection()
144                } else {
145                    self.get_or_create_connection_by_slot(slot)
146                };
147                cmd.query(conn)
148            };
149            match res {
150                Ok(res) => return Ok(res),
151                Err(err) => {
152                    if err.kind() == ErrorKind::ExtensionError &&
153                       err.extension_error_code().unwrap() == "MOVED" {
154                        self.needs_refresh = true;
155                    }
156                    try_random_node = true;
157                }
158            }
159        }
160        panic!("Too many redirections");
161    }
162}
163
164impl ConnectionLike for Cluster {
165    fn req_packed_command(&self, cmd: &[u8]) -> RedisResult<Value> {
166        // TODO we dont have mutable access to self so we can't get_or_create_connection_by_slot...
167        let slot = slot_for_packed_command(cmd).unwrap();
168        let conn = self.get_connection_by_slot(slot).unwrap();
169        conn.req_packed_command(cmd)
170    }
171
172    fn req_packed_commands(&self,
173                           cmd: &[u8],
174                           offset: usize,
175                           count: usize)
176                           -> RedisResult<Vec<Value>> {
177        // TODO we dont have mutable access to self so we can't get_or_create_connection_by_slot...
178        let slot = slot_for_packed_command(cmd).unwrap();
179        let conn = self.get_connection_by_slot(slot).unwrap();
180        conn.req_packed_commands(cmd, offset, count)
181    }
182
183    fn get_db(&self) -> i64 {
184        0
185    }
186}
187
/// Empty impl: `Cluster` takes every `Commands` method from the trait's
/// default implementations.
impl Commands for Cluster {}
189
190impl Clone for Cluster {
191    fn clone(&self) -> Cluster {
192        let startup_nodes = self.startup_nodes.iter().map(|s| s.as_ref()).collect();
193        Cluster::new(startup_nodes)
194    }
195}