1extern crate rand;
2
3use commands::RedisCommand;
4use errors::RedisError;
5use reader::Reader;
6use results::RedisResult;
7use self::rand::Rng;
8use std::collections::HashMap;
9use std::fmt;
10use std::io::BufReader;
11use std::io::prelude::*;
12use std::net::TcpStream;
13use std::sync::mpsc::*;
14use std::time::Duration;
15use std::thread;
16use std::u32;
17use types::{PubSubType, SenderType};
18
19pub struct RedisClient {
20 port: &'static str,
21 host: &'static str,
22 buffer: BufReader<TcpStream>,
23}
24
25pub struct RedisClientAsync {
26 port: &'static str,
27 host: &'static str,
28 sender: Sender<(SenderType, u32, Vec<u8>)>,
29 callbacks: HashMap<u32, Box<Fn(Result<RedisResult, RedisError>)>>,
30 receiver: Receiver<(u32, Result<RedisResult, RedisError>)>,
31 pipe_callbacks: HashMap<u32, Box<Fn(Result<Vec<RedisResult>, RedisError>)>>,
32 pipe_receiver: Receiver<(u32, Result<Vec<RedisResult>, RedisError>)>
33}
34
35pub struct PubSubArg {
36 pub pubsub_type: PubSubType,
37 pub callback: Option<Box<Fn(RedisResult)>>
38}
39
40pub struct PubSubClientAsync {
41 port: &'static str,
42 host: &'static str,
43 cmd_sender: Sender<(PubSubType, u32, Vec<u8>)>,
44 receiver: Receiver<(u32, Result<RedisResult, RedisError>)>,
45 cmd_callbacks: HashMap<u32, Box<Fn(Result<RedisResult, RedisError>)>>,
46 channel_callbacks: HashMap<String, Box<Fn(RedisResult)>>,
47 pattern_callbacks: HashMap<String, Box<Fn(RedisResult)>>
48}
49
50impl RedisClient {
64 pub fn new(host: &'static str, port: &'static str) -> Result<RedisClient, RedisError> {
65 TcpStream::connect(&*format!("{}:{}", host, port))
66 .map(|tcp_stream| {
67 let _res_write = tcp_stream.set_write_timeout(Some(Duration::new(5, 0)));
69 let _res_read = tcp_stream.set_read_timeout(Some(Duration::new(1, 0)));
70 RedisClient {
71 port: port,
72 host: host,
73 buffer: BufReader::new(tcp_stream),
74 }
75 })
76 .map_err(|err| RedisError::Io(err))
77 }
78
79 fn write_command(&mut self, buf_to_send: &[u8]) -> Result<usize, RedisError> {
81 let mut writer = self.buffer.get_mut() as &mut Write;
82 let size = try!(writer.write(buf_to_send));
83 Ok(size)
84 }
85
86 fn exec_command(&mut self, buf_to_send: &[u8]) -> Result<RedisResult, RedisError> {
88 try!(self.write_command(buf_to_send));
89
90 Reader::read(&mut self.buffer)
91 }
92
93 fn exec_pipeline_command(&mut self, buf_to_send: &[u8], cmd_nb: usize) -> Result<Vec<RedisResult>, RedisError> {
95 try!(self.write_command(buf_to_send));
96
97 Reader::read_pipeline(&mut self.buffer, cmd_nb)
98 }
99
100 pub fn exec_redis_command(&mut self, redis_command: &mut RedisCommand) -> Result<RedisResult, RedisError> {
102 self.exec_command(redis_command.into())
103 }
104
105 pub fn exec_redis_pipeline_command(&mut self, redis_command: &mut RedisCommand) -> Result<Vec<RedisResult>, RedisError> {
107 let cmd_nb: usize;
108 {
109 cmd_nb = redis_command.get_command_nb();
110 }
111 self.exec_pipeline_command(redis_command.into(), cmd_nb)
112 }
113
114}
115
116impl fmt::Debug for RedisClient {
117 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118 write!(f, "Redis Client - HOST = {} : PORT + {}", self.host, self.port)
119 }
120}
121
122impl fmt::Display for RedisClient {
123 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124 write!(f, "Redis Client - HOST = {} : PORT + {}", self.host, self.port)
125 }
126}
127
128impl RedisClientAsync {
141 pub fn new(host: &'static str, port: &'static str) -> Result<RedisClientAsync, RedisError> {
142 let (sender_tx, sender_rx) = channel::<(SenderType, u32, Vec<u8>)>();
143 let (init_tx, init_rx) = channel::<Option<RedisError>>();
144 let (receiver_tx, receiver_rx) = channel::<(u32, Result<RedisResult, RedisError>)>();
145 let (pipe_receiver_tx, pipe_receiver_rx) = channel::<(u32, Result<Vec<RedisResult>, RedisError>)>();
146
147 thread::spawn(move || {
148 let _client = RedisClient::new(host, port)
149 .map(|mut redis_client| {
150 init_tx.send(None)
151 .map(|_| {
152 loop {
153 match sender_rx.recv() {
154 Ok(value) => {
155 match value.0 {
156 SenderType::Simple => {
157 let _res = receiver_tx.send((value.1, redis_client.exec_command(&value.2[..])));
158 },
159 SenderType::Pipe(cmd_nb) => {
160 let _res = pipe_receiver_tx.send((value.1, redis_client.exec_pipeline_command(&value.2[..], cmd_nb)));
161 },
162 };
163 },
164 Err(_) => break,
165 };
166 }
167 })
168 })
169 .map_err(|error| {
170 let _res = init_tx.send(Some(error));
171 });
172 });
173
174 match init_rx.recv() {
175 Ok(None) => {
176 Ok(RedisClientAsync {
177 port: port,
178 host: host,
179 sender: sender_tx,
180 receiver: receiver_rx,
181 callbacks: HashMap::new(),
182 pipe_receiver: pipe_receiver_rx,
183 pipe_callbacks: HashMap::new()
184 })
185 },
186 Ok(Some(err)) => Err(err),
187 Err(err) => Err(RedisError::MpscRecv(err)),
188 }
189 }
190
191 pub fn exec_redis_pipeline_command_async<F>(&mut self, redis_command: &mut RedisCommand, callback: F)
194 -> Result<(), RedisError> where F: Fn(Result<Vec<RedisResult>, RedisError>), F: Send + 'static
195 {
196 let mut rng = rand::thread_rng();
197 let key = rng.gen::<u32>();
198 try!(self.sender.send((SenderType::Pipe(redis_command.get_command_nb()), key, redis_command.into())));
199 self.pipe_callbacks.insert(key, Box::new(callback));
200 Ok(())
201 }
202
203 pub fn exec_redis_command_async<F>(&mut self, redis_command: &mut RedisCommand, callback: F)
206 -> Result<(), RedisError> where F: Fn(Result<RedisResult, RedisError>), F: Send + 'static
207 {
208 let mut rng = rand::thread_rng();
209 let key = rng.gen::<u32>();
210 try!(self.sender.send((SenderType::Simple, key, redis_command.into())));
211 self.callbacks.insert(key, Box::new(callback));
212 Ok(())
213 }
214
215 pub fn pump(&mut self) -> Result<(), RedisError> {
217 loop {
218 match self.receiver.try_recv() {
219 Ok(result) => {
220 self.callbacks.remove(&result.0)
221 .map(|callback| {
222 if result.1.is_ok() {
223
224 callback(result.1.clone());
225 }
226 });
227 },
228 Err(TryRecvError::Empty) => {
229 match self.pipe_receiver.try_recv() {
230 Ok(result) => {
231 self.pipe_callbacks.remove(&result.0)
232 .map(|callback| {
233 if result.1.is_ok() {
234
235 callback(result.1.clone());
236 }
237 });
238 },
239 Err(TryRecvError::Empty) => return Ok(()),
240 Err(err) => return Err(RedisError::MpscTryRecv(err))
241 };
242 },
243 Err(err) => return Err(RedisError::MpscTryRecv(err))
244 };
245 }
246 }
247}
248
249impl fmt::Debug for RedisClientAsync {
250 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251 write!(f, "Redis Client Async - HOST = {} : PORT + {}", self.host, self.port)
252 }
253}
254
255impl fmt::Display for RedisClientAsync {
256 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
257 write!(f, "Redis Client Async - HOST = {} : PORT + {}", self.host, self.port)
258 }
259}
260
261
262impl PubSubClientAsync {
275 pub fn new(host: &'static str, port: &'static str) -> Result<PubSubClientAsync, RedisError> {
276 let (init_tx, init_rx) = channel::<Option<RedisError>>();
277 let (sender_tx, sender_rx) = channel::<(PubSubType, u32, Vec<u8>)>();
278 let (receiver_tx, receiver_rx) = channel::<(u32, Result<RedisResult, RedisError>)>();
279
280 thread::spawn(move || {
281 let _client = RedisClient::new(host, port)
282 .map(|mut redis_client| {
283 init_tx.send(None)
284 .map(|_| {
285 loop {
286 match sender_rx.try_recv() {
287 Ok(value) => {
288 let _res = receiver_tx.send((value.1, redis_client.exec_command(&value.2[..])));
289 },
290 Err(_) => {
291 if let Ok(res) = Reader::read(&mut redis_client.buffer) {
292 let _res = receiver_tx.send((0, Ok(res)));
293 }
294 }
295 };
296 }
297 })
298 })
299 .map_err(|error| {
300 let _res = init_tx.send(Some(error));
301 });
302 });
303
304 match init_rx.recv() {
305 Ok(None) => {
306 Ok(PubSubClientAsync {
307 port: port,
308 host: host,
309 cmd_sender: sender_tx,
310 receiver: receiver_rx,
311 cmd_callbacks: HashMap::new(),
312 channel_callbacks: HashMap::new(),
313 pattern_callbacks: HashMap::new()
314 })
315 },
316 Ok(Some(err)) => Err(err),
317 Err(err) => Err(RedisError::MpscRecv(err)),
318 }
319 }
320
321 pub fn exec_redis_command_async<F>(&mut self, redis_command: &mut RedisCommand, cmd_callback: F, pubsub_arg: PubSubArg)
324 -> Result<(), RedisError> where F: Fn(Result<RedisResult, RedisError>), F: Send + 'static
325 {
326 let mut rng = rand::thread_rng();
327 let key: u32 = rng.gen_range(1, u32::MAX);
328
329 let pubsub_type = pubsub_arg.pubsub_type.clone();
330 try!(self.cmd_sender.send((pubsub_type, key, redis_command.into())));
331 self.cmd_callbacks.insert(key, Box::new(cmd_callback));
332
333 if let Some(callback) = pubsub_arg.callback {
334 if let PubSubType::Channel(value) = pubsub_arg.pubsub_type {
335 self.channel_callbacks.insert(value, callback);
336 }
337 else if let PubSubType::Pattern(value) = pubsub_arg.pubsub_type {
338 self.pattern_callbacks.insert(value, callback);
339 }
340 }
341
342 Ok(())
343 }
344
345 pub fn pump(&mut self) -> Result<(), RedisError> {
347 loop {
348 match self.receiver.try_recv() {
349 Ok(result) => {
350 if result.0 == 0 {
351 if let Ok(res) = result.1 {
352 let array = res.convert::<Vec<String>>();
353 if !array.is_empty() {
354 if array[0] == "message" && array.len() == 3 && self.channel_callbacks.contains_key(&array[1]) {
355 self.channel_callbacks[&array[1]](RedisResult::String(array[2].clone()));
356 } else if array[0] == "pmessage" && array.len() == 4 && self.pattern_callbacks.contains_key(&array[1]) {
357 self.pattern_callbacks[&array[1]](RedisResult::String(array[3].clone()));
358 }
359 }
360 }
361 } else {
362 self.cmd_callbacks.remove(&result.0)
363 .map(|callback| {
364 if result.1.is_ok() {
365
366 callback(result.1.clone());
367 }
368 });
369 }
370 },
371 Err(TryRecvError::Empty) => return Ok(()),
372 Err(err) => return Err(RedisError::MpscTryRecv(err))
373 };
374 }
375 }
376}
377
378impl fmt::Debug for PubSubClientAsync {
379 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
380 write!(f, "Redis Client Async - HOST = {} : PORT + {}", self.host, self.port)
381 }
382}
383
384impl fmt::Display for PubSubClientAsync {
385 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
386 write!(f, "Redis Client Async - HOST = {} : PORT + {}", self.host, self.port)
387 }
388}