redis_client/
redis.rs

1extern crate rand;
2
3use commands::RedisCommand;
4use errors::RedisError;
5use reader::Reader;
6use results::RedisResult;
7use self::rand::Rng;
8use std::collections::HashMap;
9use std::fmt;
10use std::io::BufReader;
11use std::io::prelude::*;
12use std::net::TcpStream;
13use std::sync::mpsc::*;
14use std::time::Duration;
15use std::thread;
16use std::u32;
17use types::{PubSubType, SenderType};
18
/// Synchronous connection to a redis server.
///
/// Keeps the host and port it was opened with (both are printed when the client is
/// formatted) and owns the buffered TCP stream used to talk to the server.
pub struct RedisClient {
    /// Host name or address handed to the constructor.
    host: &'static str,
    /// Port handed to the constructor, kept as text.
    port: &'static str,
    /// Buffered reader over the connection; replies are read through it and commands
    /// are written to the stream it wraps.
    buffer: BufReader<TcpStream>,
}
24
25pub struct RedisClientAsync {
26    port: &'static str,
27    host: &'static str,
28    sender: Sender<(SenderType, u32, Vec<u8>)>,
29    callbacks: HashMap<u32, Box<Fn(Result<RedisResult, RedisError>)>>,
30    receiver: Receiver<(u32, Result<RedisResult, RedisError>)>,
31    pipe_callbacks: HashMap<u32, Box<Fn(Result<Vec<RedisResult>, RedisError>)>>,
32    pipe_receiver: Receiver<(u32, Result<Vec<RedisResult>, RedisError>)>
33}
34
35pub struct PubSubArg {
36    pub pubsub_type: PubSubType,
37    pub callback: Option<Box<Fn(RedisResult)>>
38}
39
40pub struct PubSubClientAsync {
41    port: &'static str,
42    host: &'static str,
43    cmd_sender: Sender<(PubSubType, u32, Vec<u8>)>,
44    receiver: Receiver<(u32, Result<RedisResult, RedisError>)>,
45    cmd_callbacks: HashMap<u32, Box<Fn(Result<RedisResult, RedisError>)>>,
46    channel_callbacks: HashMap<String, Box<Fn(RedisResult)>>,
47    pattern_callbacks: HashMap<String, Box<Fn(RedisResult)>>
48}
49
50/// A RedisClient is a structure to send command to redis and receive the response.
51/// All RedisClient's methods are performed synchronously.
52/// 
53/// When creating a RedisClient it will automatically create a connection. Therefore when
54/// it is created it uses the host and the port.
55///
56/// Example:
57///
58/// ```
59/// # fn function() -> Result<(), redis_client::errors::RedisError> {
60/// let mut client = try!(redis_client::RedisClient::new("127.0.0.1", "6379"));
61/// # Ok(())}
62/// ```
63impl RedisClient {
64    pub fn new(host: &'static str, port: &'static str) -> Result<RedisClient, RedisError> {
65        TcpStream::connect(&*format!("{}:{}", host, port))
66            .map(|tcp_stream| {
67                    // TODO better timeout init
68                    let _res_write = tcp_stream.set_write_timeout(Some(Duration::new(5, 0)));
69                    let _res_read = tcp_stream.set_read_timeout(Some(Duration::new(1, 0)));
70                    RedisClient {
71                        port: port,
72                        host: host,
73                        buffer: BufReader::new(tcp_stream),
74                }
75            })
76            .map_err(|err| RedisError::Io(err))
77    }
78
79    /// write a command to the stream
80    fn write_command(&mut self, buf_to_send: &[u8]) -> Result<usize, RedisError> {
81        let mut writer = self.buffer.get_mut() as &mut Write;
82        let size = try!(writer.write(buf_to_send));
83        Ok(size)
84    }
85
86    /// Execute a command received as an array of bytes
87    fn exec_command(&mut self, buf_to_send: &[u8]) -> Result<RedisResult, RedisError> {
88        try!(self.write_command(buf_to_send));
89        
90        Reader::read(&mut self.buffer)
91    }
92
93    /// Execute a pipeline command received as an array of bytes
94    fn exec_pipeline_command(&mut self, buf_to_send: &[u8], cmd_nb: usize) -> Result<Vec<RedisResult>, RedisError> {
95        try!(self.write_command(buf_to_send));
96
97        Reader::read_pipeline(&mut self.buffer, cmd_nb)
98    }
99
100    /// Execute a RedisCommand
101    pub fn exec_redis_command(&mut self, redis_command: &mut RedisCommand) -> Result<RedisResult, RedisError> {
102        self.exec_command(redis_command.into())
103    }
104
105    /// Execute a pipeline of RedisCommand
106    pub fn exec_redis_pipeline_command(&mut self, redis_command: &mut RedisCommand) -> Result<Vec<RedisResult>, RedisError> {
107        let cmd_nb: usize;
108        {
109            cmd_nb = redis_command.get_command_nb();
110        }
111        self.exec_pipeline_command(redis_command.into(), cmd_nb)
112    }
113
114}
115
116impl fmt::Debug for RedisClient {
117    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118        write!(f, "Redis Client - HOST = {} : PORT + {}", self.host, self.port)
119    }
120}
121
122impl fmt::Display for RedisClient {
123    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124        write!(f, "Redis Client - HOST = {} : PORT + {}", self.host, self.port)
125    }
126}
127
128/// A RedisClientAsync is a structure to send command to redis and receive the response asynchronously.
129/// 
130/// When creating a RedisClientAsync it will automatically create a connection. Therefore when
131/// it is created it uses the host and the port.
132///
133/// Example:
134///
135/// ```
136/// # fn function() -> Result<(), redis_client::errors::RedisError> {
137/// let mut client = try!(redis_client::RedisClientAsync::new("127.0.0.1", "6379"));
138/// # Ok(())}
139/// ```
140impl RedisClientAsync {
141    pub fn new(host: &'static str, port: &'static str) -> Result<RedisClientAsync, RedisError> {
142        let (sender_tx, sender_rx) = channel::<(SenderType, u32, Vec<u8>)>();
143        let (init_tx, init_rx) = channel::<Option<RedisError>>();
144        let (receiver_tx, receiver_rx) = channel::<(u32, Result<RedisResult, RedisError>)>();
145        let (pipe_receiver_tx, pipe_receiver_rx) = channel::<(u32, Result<Vec<RedisResult>, RedisError>)>();
146
147        thread::spawn(move || {
148            let _client = RedisClient::new(host, port)
149            .map(|mut redis_client| {
150                init_tx.send(None)
151                .map(|_| {
152                    loop {
153                        match sender_rx.recv() {
154                            Ok(value) => {
155                                match value.0 {
156                                    SenderType::Simple => {
157                                        let _res = receiver_tx.send((value.1, redis_client.exec_command(&value.2[..])));
158                                    },
159                                    SenderType::Pipe(cmd_nb) => {
160                                        let _res = pipe_receiver_tx.send((value.1, redis_client.exec_pipeline_command(&value.2[..], cmd_nb)));
161                                    },
162                                };
163                            },
164                            Err(_) => break,
165                        };
166                    }
167                })
168            })
169            .map_err(|error| {
170                let _res = init_tx.send(Some(error));
171            });
172        });
173
174        match init_rx.recv() {
175            Ok(None) => {
176                Ok(RedisClientAsync {
177                    port: port,
178                    host: host,
179                    sender: sender_tx,
180                    receiver: receiver_rx,
181                    callbacks: HashMap::new(),
182                    pipe_receiver: pipe_receiver_rx,
183                    pipe_callbacks: HashMap::new()
184                })
185            },
186            Ok(Some(err)) =>  Err(err),
187            Err(err) => Err(RedisError::MpscRecv(err)),
188        }
189    }
190
191    /// Execute a redis pipeline command. The callback will be called once the command execution is over and the pump method is called.
192    /// The return value indicates if the command was successfully launched.
193    pub fn exec_redis_pipeline_command_async<F>(&mut self, redis_command: &mut RedisCommand, callback: F) 
194        -> Result<(), RedisError> where F: Fn(Result<Vec<RedisResult>, RedisError>), F: Send + 'static
195    {
196        let mut rng = rand::thread_rng();
197        let key = rng.gen::<u32>();
198        try!(self.sender.send((SenderType::Pipe(redis_command.get_command_nb()), key, redis_command.into())));
199        self.pipe_callbacks.insert(key, Box::new(callback));
200        Ok(())
201    }
202
203    /// Execute a redis command. The callback will be called once the command execution is over and the pump method is called.
204    /// The return value indicates if the command was successfully launched.
205    pub fn exec_redis_command_async<F>(&mut self, redis_command: &mut RedisCommand, callback: F) 
206        -> Result<(), RedisError> where F: Fn(Result<RedisResult, RedisError>), F: Send + 'static
207    {
208        let mut rng = rand::thread_rng();
209        let key = rng.gen::<u32>();
210        try!(self.sender.send((SenderType::Simple, key, redis_command.into())));
211        self.callbacks.insert(key, Box::new(callback));
212        Ok(())
213    }
214
215    /// Pump the result and execute the callbacks with them. If no result are ready this function will return.
216    pub fn pump(&mut self) -> Result<(), RedisError> {
217        loop {
218            match self.receiver.try_recv() {
219                Ok(result) => {
220                    self.callbacks.remove(&result.0) 
221                        .map(|callback| {
222                            if result.1.is_ok() {
223
224                                callback(result.1.clone());
225                            }
226                        });
227                },
228                Err(TryRecvError::Empty) => {
229                    match self.pipe_receiver.try_recv() {
230                        Ok(result) => {
231                            self.pipe_callbacks.remove(&result.0) 
232                                .map(|callback| {
233                                    if result.1.is_ok() {
234
235                                        callback(result.1.clone());
236                                    }
237                                });
238                        },
239                        Err(TryRecvError::Empty) => return Ok(()),
240                        Err(err) => return Err(RedisError::MpscTryRecv(err))
241                    };
242                },
243                Err(err) => return Err(RedisError::MpscTryRecv(err))
244            };
245        }
246    }
247}
248
249impl fmt::Debug for RedisClientAsync {
250    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251        write!(f, "Redis Client Async - HOST = {} : PORT + {}", self.host, self.port)
252    }
253}
254
255impl fmt::Display for RedisClientAsync {
256    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
257        write!(f, "Redis Client Async - HOST = {} : PORT + {}", self.host, self.port)
258    }
259}
260
261
262/// A PubSubClientAsync is a structure to use redis publish/subscribe functionnality.
263/// 
264/// When creating a PubSubClientAsync it will automatically create a connection. Therefore when
265/// it is created it uses the host and the port.
266///
267/// Example:
268///
269/// ```
270/// # fn function() -> Result<(), redis_client::errors::RedisError> {
271/// let mut client = try!(redis_client::PubSubClientAsync::new("127.0.0.1", "6379"));
272/// # Ok(())}
273/// ```
274impl PubSubClientAsync {
275    pub fn new(host: &'static str, port: &'static str) -> Result<PubSubClientAsync, RedisError> {
276        let (init_tx, init_rx) = channel::<Option<RedisError>>();
277        let (sender_tx, sender_rx) = channel::<(PubSubType, u32, Vec<u8>)>();
278        let (receiver_tx, receiver_rx) = channel::<(u32, Result<RedisResult, RedisError>)>();
279
280        thread::spawn(move || {
281            let _client = RedisClient::new(host, port)
282            .map(|mut redis_client| {
283                init_tx.send(None)
284                .map(|_| {
285                    loop {
286                        match sender_rx.try_recv() {
287                            Ok(value) => {
288                                let _res = receiver_tx.send((value.1, redis_client.exec_command(&value.2[..])));
289                            },
290                            Err(_) => {
291                                if let Ok(res) = Reader::read(&mut redis_client.buffer) {
292                                    let _res = receiver_tx.send((0, Ok(res)));
293                                }
294                            }
295                        };
296                    }
297                })
298            })
299            .map_err(|error| {
300                let _res = init_tx.send(Some(error));
301            });
302        });
303
304        match init_rx.recv() {
305            Ok(None) => {
306                Ok(PubSubClientAsync {
307                    port: port,
308                    host: host,
309                    cmd_sender: sender_tx,
310                    receiver: receiver_rx,
311                    cmd_callbacks: HashMap::new(),
312                    channel_callbacks: HashMap::new(),
313                    pattern_callbacks: HashMap::new()
314                })
315            },
316            Ok(Some(err)) =>  Err(err),
317            Err(err) => Err(RedisError::MpscRecv(err)),
318        }
319    }
320
321    /// Execute a redis command. The cmd_callback will be called once the command execution is over and the pump method is called.
322    /// The return value indicates if the command was successfully launched.
323    pub fn exec_redis_command_async<F>(&mut self, redis_command: &mut RedisCommand, cmd_callback: F, pubsub_arg: PubSubArg) 
324        -> Result<(), RedisError> where F: Fn(Result<RedisResult, RedisError>), F: Send + 'static
325    {
326        let mut rng = rand::thread_rng();
327        let key: u32 = rng.gen_range(1, u32::MAX);
328
329        let pubsub_type = pubsub_arg.pubsub_type.clone();
330        try!(self.cmd_sender.send((pubsub_type, key, redis_command.into())));
331        self.cmd_callbacks.insert(key, Box::new(cmd_callback));
332
333        if let Some(callback) = pubsub_arg.callback {
334            if let PubSubType::Channel(value) = pubsub_arg.pubsub_type {
335                self.channel_callbacks.insert(value, callback);
336            }
337            else if let PubSubType::Pattern(value) = pubsub_arg.pubsub_type {
338                self.pattern_callbacks.insert(value, callback);
339            }          
340        }
341
342        Ok(())
343    }
344
345    /// Pump the result and the received value and execute the callbacks with them. If no result or received value are ready this function will return.
346    pub fn pump(&mut self) -> Result<(), RedisError> {
347        loop {
348            match self.receiver.try_recv() {
349                Ok(result) => {
350                    if result.0 == 0 {
351                        if let Ok(res) = result.1 {
352                            let array = res.convert::<Vec<String>>();
353                            if !array.is_empty() {
354                                if array[0] == "message" && array.len() == 3 && self.channel_callbacks.contains_key(&array[1]) {
355                                    self.channel_callbacks[&array[1]](RedisResult::String(array[2].clone()));
356                                } else if array[0] == "pmessage" && array.len() == 4 && self.pattern_callbacks.contains_key(&array[1]) {
357                                    self.pattern_callbacks[&array[1]](RedisResult::String(array[3].clone()));
358                                }
359                            }
360                        } 
361                    } else {
362                        self.cmd_callbacks.remove(&result.0) 
363                            .map(|callback| {
364                                if result.1.is_ok() {
365
366                                    callback(result.1.clone());
367                                }
368                            });
369                    }
370                },
371                Err(TryRecvError::Empty) => return Ok(()),
372                Err(err) => return Err(RedisError::MpscTryRecv(err))
373            };
374        }
375    }
376}
377
378impl fmt::Debug for PubSubClientAsync {
379    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
380        write!(f, "Redis Client Async - HOST = {} : PORT + {}", self.host, self.port)
381    }
382}
383
384impl fmt::Display for PubSubClientAsync {
385    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
386        write!(f, "Redis Client Async - HOST = {} : PORT + {}", self.host, self.port)
387    }
388}