td_rredis/
cmd.rs

1use types::{ToRedisArgs, FromRedisValue, Value, RedisResult, ErrorKind, from_redis_value};
2use connection::ConnectionLike;
3use cluster::Cluster;
4use slot::key_hash_slot;
5
/// A single argument slot of a command, as stored inside `Cmd`.
#[derive(Clone)]
enum Arg {
    /// Raw argument bytes appended through `Cmd::arg`.
    Simple(Vec<u8>),
    /// Raw argument bytes wrapped by `pack_command`; encoded exactly
    /// like `Simple`.
    Borrowed(Vec<u8>),
    /// Placeholder that is replaced with the decimal cursor value when
    /// the command is encoded.
    Cursor,
}
12
13
14/// Represents redis commands.
15#[derive(Clone)]
16pub struct Cmd {
17    args: Vec<Arg>,
18    cursor: Option<u64>,
19    is_ignored: bool,
20}
21
22/// Represents a redis command pipeline.
23pub struct Pipeline {
24    commands: Vec<Cmd>,
25    transaction_mode: bool,
26}
27
28/// Represents a redis iterator.
29pub struct Iter<'a, T: FromRedisValue> {
30    batch: Vec<T>,
31    cursor: u64,
32    con: &'a (ConnectionLike + 'a),
33    cmd: Cmd,
34}
35
36impl<'a, T: FromRedisValue> Iterator for Iter<'a, T> {
37    type Item = T;
38
39    #[inline]
40    fn next(&mut self) -> Option<T> {
41        // we need to do this in a loop until we produce at least one item
42        // or we find the actual end of the iteration.  This is necessary
43        // because with filtering an iterator it is possible that a whole
44        // chunk is not matching the pattern and thus yielding empty results.
45        loop {
46            match self.batch.pop() {
47                Some(v) => {
48                    return Some(v);
49                }
50                None => {}
51            };
52            if self.cursor == 0 {
53                return None;
54            }
55
56            let pcmd = unwrap_or!(self.cmd.get_packed_command_with_cursor(self.cursor),
57                                  return None);
58            let rv = unwrap_or!(self.con
59                                    .req_packed_command(&pcmd)
60                                    .ok(),
61                                return None);
62            let (cur, mut batch): (u64, Vec<T>) = unwrap_or!(from_redis_value(&rv).ok(),
63                                                             return None);
64            batch.reverse();
65
66            self.cursor = cur;
67            self.batch = batch;
68        }
69    }
70}
71
72fn encode_command(args: &Vec<Arg>, cursor: u64) -> Vec<u8> {
73    let mut cmd = vec![];
74    cmd.extend(format!("*{}\r\n", args.len()).as_bytes().iter().cloned());
75
76    {
77        let mut encode = |item: &[u8]| {
78            cmd.extend(format!("${}\r\n", item.len()).as_bytes().iter().cloned());
79            cmd.extend(item.iter().cloned());
80            cmd.extend(b"\r\n".iter().cloned());
81        };
82
83        for item in args.iter() {
84            match *item {
85                Arg::Cursor => encode(cursor.to_string().as_bytes()),
86                Arg::Simple(ref val) => encode(val),
87                Arg::Borrowed(ref ptr) => encode(ptr),
88            }
89        }
90    }
91    cmd
92}
93
94fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
95    let mut rv = vec![];
96    if atomic {
97        rv.extend(cmd("MULTI").get_packed_command().into_iter());
98    }
99    for cmd in cmds.iter() {
100        rv.extend(cmd.get_packed_command().into_iter());
101    }
102    if atomic {
103        rv.extend(cmd("EXEC").get_packed_command().into_iter());
104    }
105    rv
106}
107
108/// A command acts as a builder interface to creating encoded redis
/// requests.  This allows you to easily assemble a packed command
110/// by chaining arguments together.
111///
112/// Basic example:
113///
114/// ```rust
115/// td_rredis::Cmd::new().arg("SET").arg("my_key").arg(42);
116/// ```
117///
118/// There is also a helper function called `cmd` which makes it a
119/// tiny bit shorter:
120///
121/// ```rust
122/// td_rredis::cmd("SET").arg("my_key").arg(42);
123/// ```
124///
/// Because Rust currently does not have an ideal system for lifetimes
/// of temporaries, sometimes you need to hold on to the initially
/// generated command by binding it to a variable before chaining
/// further calls (see the example on `cursor_arg`).
128
129impl Cmd {
130    /// Creates a new empty command.
131    pub fn new() -> Cmd {
132        Cmd {
133            args: vec![],
134            cursor: None,
135            is_ignored: false,
136        }
137    }
138
139    /// Appends an argument to the command.  The argument passed must
140    /// be a type that implements `ToRedisArgs`.  Most primitive types as
141    /// well as vectors of primitive types implement it.
142    ///
143    /// For instance all of the following are valid:
144    ///
145    /// ```rust,no_run
146    /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
147    /// # let con = client.get_connection().unwrap();
148    /// td_rredis::cmd("SET").arg(&["my_key", "my_value"]);
149    /// td_rredis::cmd("SET").arg("my_key").arg(42);
150    /// td_rredis::cmd("SET").arg("my_key").arg(b"my_value");
151    /// ```
152    #[inline]
153    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
154        for item in arg.to_redis_args().into_iter() {
155            self.args.push(Arg::Simple(item));
156        }
157        self
158    }
159
160    /// Returns the packed command as a byte vector.
161    #[inline]
162    pub fn get_packed_command(&self) -> Vec<u8> {
163        encode_command(&self.args, self.cursor.unwrap_or(0))
164    }
165
166    /// Sends the command as query to the connection and converts the
167    /// result to the target redis value.  This is the general way how
168    /// you can retrieve data.
169    #[inline]
170    pub fn query<T: FromRedisValue>(&self, con: &ConnectionLike) -> RedisResult<T> {
171        let pcmd = self.get_packed_command();
172        match con.req_packed_command(&pcmd) {
173            Ok(val) => from_redis_value(&val),
174            Err(e) => Err(e),
175        }
176    }
177
178    #[inline]
179    pub fn query_cluster<T: FromRedisValue>(&self, cluster: &mut Cluster) -> RedisResult<T> {
180        cluster.query_cmd(self)
181    }
182
183    /// Works similar to `arg` but adds a cursor argument.  This is always
184    /// an integer and also flips the command implementation to support a
185    /// different mode for the iterators where the iterator will ask for
186    /// another batch of items when the local data is exhausted.
187    ///
188    /// ```rust,no_run
189    /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
190    /// # let con = client.get_connection().unwrap();
191    /// let mut cmd = td_rredis::cmd("SSCAN");
192    /// let mut iter : td_rredis::Iter<isize> = cmd.arg("my_set").cursor_arg(0).iter(&con).unwrap();
193    /// for x in iter {
194    ///     // do something with the item
195    /// }
196    /// ```
197    #[inline]
198    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
199        assert!(!self.in_scan_mode());
200        self.cursor = Some(cursor);
201        self.args.push(Arg::Cursor);
202        self
203    }
204
205    /// Like `get_packed_command` but replaces the cursor with the
206    /// provided value.  If the command is not in scan mode, `None`
207    /// is returned.
208    #[inline]
209    fn get_packed_command_with_cursor(&self, cursor: u64) -> Option<Vec<u8>> {
210        if !self.in_scan_mode() {
211            None
212        } else {
213            Some(encode_command(&self.args, cursor))
214        }
215    }
216
217    /// Returns true if the command is in scan mode.
218    #[inline]
219    pub fn in_scan_mode(&self) -> bool {
220        self.cursor.is_some()
221    }
222
223    /// Similar to `query()` but returns an iterator over the items of the
224    /// bulk result or iterator.  In normal mode this is not in any way more
225    /// efficient than just querying into a `Vec<T>` as it's internally
226    /// implemented as buffering into a vector.  This however is useful when
227    /// `cursor_arg` was used in which case the iterator will query for more
228    /// items until the server side cursor is exhausted.
229    ///
230    /// This is useful for commands such as `SSCAN`, `SCAN` and others.
231    ///
232    /// One speciality of this function is that it will check if the response
233    /// looks like a cursor or not and always just looks at the payload.
234    /// This way you can use the function the same for responses in the
235    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
236    /// tuple of cursor and list).
237    #[inline]
238    pub fn iter<'a, T: FromRedisValue>(&self, con: &'a ConnectionLike) -> RedisResult<Iter<'a, T>> {
239        let pcmd = self.get_packed_command();
240        let rv = try!(con.req_packed_command(&pcmd));
241        let mut batch: Vec<T>;
242        let mut cursor = 0;
243
244        if rv.looks_like_cursor() {
245            let (next, b): (u64, Vec<T>) = try!(from_redis_value(&rv));
246            batch = b;
247            cursor = next;
248        } else {
249            batch = try!(from_redis_value(&rv));
250        }
251
252        batch.reverse();
253        Ok(Iter {
254            batch: batch,
255            cursor: cursor,
256            con: con,
257            cmd: self.clone(),
258        })
259    }
260
261    /// This is a shortcut to `query()` that does not return a value and
262    /// will fail the task if the query fails because of an error.  This is
263    /// mainly useful in examples and for simple commands like setting
264    /// keys.
265    ///
266    /// This is equivalent to a call of query like this:
267    ///
268    /// ```rust,no_run
269    /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
270    /// # let con = client.get_connection().unwrap();
271    /// let _ : () = td_rredis::cmd("PING").query(&con).unwrap();
272    /// ```
273    #[inline]
274    pub fn execute(&self, con: &ConnectionLike) {
275        let _: () = self.query(con).unwrap();
276    }
277
278    pub fn get_slot(&self) -> u16 {
279        if self.args.len() > 1 {
280            return match self.args[1] {
281                Arg::Cursor => 0,
282                Arg::Simple(ref val) => key_hash_slot(val),
283                Arg::Borrowed(ref ptr) => key_hash_slot(ptr),
284            };
285        }
286        0
287    }
288}
289
290
291/// A pipeline allows you to send multiple commands in one go to the
292/// redis server.  API wise it's very similar to just using a command
293/// but it allows multiple commands to be chained and some features such
294/// as iteration are not available.
295///
296/// Basic example:
297///
298/// ```rust,no_run
299/// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
300/// # let con = client.get_connection().unwrap();
301/// let ((k1, k2),) : ((i32, i32),) = td_rredis::pipe()
302///     .cmd("SET").arg("key_1").arg(42).ignore()
303///     .cmd("SET").arg("key_2").arg(43).ignore()
304///     .cmd("MGET").arg(&["key_1", "key_2"]).query(&con).unwrap();
305/// ```
306///
307/// As you can see with `cmd` you can start a new command.  By default
308/// each command produces a value but for some you can ignore them by
309/// calling `ignore` on the command.  That way it will be skipped in the
310/// return value which is useful for `SET` commands and others, which
311/// do not have a useful return value.
312impl Pipeline {
313    /// Creates an empty pipeline.  For consistency with the `cmd`
314    /// api a `pipe` function is provided as alias.
315    pub fn new() -> Pipeline {
316        Pipeline {
317            commands: vec![],
318            transaction_mode: false,
319        }
320    }
321
322    /// Starts a new command.  Functions such as `arg` then become
323    /// available to add more arguments to that command.
324    #[inline]
325    pub fn cmd(&mut self, name: &str) -> &mut Pipeline {
326        self.commands.push(cmd(name));
327        self
328    }
329
330    /// Adds a command to the pipeline.
331    #[inline]
332    pub fn add_command(&mut self, cmd: &Cmd) -> &mut Pipeline {
333        self.commands.push(cmd.clone());
334        self
335    }
336
337    #[inline]
338    fn get_last_command(&mut self) -> &mut Cmd {
339        let idx = match self.commands.len() {
340            0 => panic!("No command on stack"),
341            x => x - 1,
342        };
343        &mut self.commands[idx]
344    }
345
346    /// Adds an argument to the last started command.  This works similar
347    /// to the `arg` method of the `Cmd` object.
348    ///
349    /// Note that this function fails the task if executed on an empty pipeline.
350    #[inline]
351    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Pipeline {
352        {
353            let cmd = self.get_last_command();
354            cmd.arg(arg);
355        }
356        self
357    }
358
359    /// Instructs the pipeline to ignore the return value of this command.
360    /// It will still be ensured that it is not an error, but any successful
361    /// result is just thrown away.  This makes result processing through
362    /// tuples much easier because you do not need to handle all the items
363    /// you do not care about.
364    ///
365    /// Note that this function fails the task if executed on an empty pipeline.
366    #[inline]
367    pub fn ignore(&mut self) -> &mut Pipeline {
368        {
369            let cmd = self.get_last_command();
370            cmd.is_ignored = true;
371        }
372        self
373    }
374
375    /// This enables atomic mode.  In atomic mode the whole pipeline is
376    /// enclosed in `MULTI`/`EXEC`.  From the user's point of view nothing
377    /// changes however.  This is easier than using `MULTI`/`EXEC` yourself
378    /// as the format does not change.
379    ///
380    /// ```rust,no_run
381    /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
382    /// # let con = client.get_connection().unwrap();
383    /// let (k1, k2) : (i32, i32) = td_rredis::pipe()
384    ///     .atomic()
385    ///     .cmd("GET").arg("key_1")
386    ///     .cmd("GET").arg("key_2").query(&con).unwrap();
387    /// ```
388    #[inline]
389    pub fn atomic(&mut self) -> &mut Pipeline {
390        self.transaction_mode = true;
391        self
392    }
393
394    fn make_pipeline_results(&self, resp: Vec<Value>) -> Value {
395        let mut rv = vec![];
396        for (idx, result) in resp.into_iter().enumerate() {
397            if !self.commands[idx].is_ignored {
398                rv.push(result);
399            }
400        }
401        Value::Bulk(rv)
402    }
403
404    fn execute_pipelined(&self, con: &ConnectionLike) -> RedisResult<Value> {
405        Ok(self.make_pipeline_results(try!(con.req_packed_commands(
406            &encode_pipeline(&self.commands, false),
407            0, self.commands.len()))))
408    }
409
410    fn execute_transaction(&self, con: &ConnectionLike) -> RedisResult<Value> {
411        let mut resp = try!(con.req_packed_commands(&encode_pipeline(&self.commands, true),
412                                                    self.commands.len() + 1,
413                                                    1));
414        match resp.pop() {
415            Some(Value::Nil) => Ok(Value::Nil),
416            Some(Value::Bulk(items)) => Ok(self.make_pipeline_results(items)),
417            _ => fail!((ErrorKind::ResponseError, "Invalid response when parsing multi response")),
418        }
419    }
420
421    /// Executes the pipeline and fetches the return values.  Since most
422    /// pipelines return different types it's recommended to use tuple
423    /// matching to process the results:
424    ///
425    /// ```rust,no_run
426    /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
427    /// # let con = client.get_connection().unwrap();
428    /// let (k1, k2) : (i32, i32) = td_rredis::pipe()
429    ///     .cmd("SET").arg("key_1").arg(42).ignore()
430    ///     .cmd("SET").arg("key_2").arg(43).ignore()
431    ///     .cmd("GET").arg("key_1")
432    ///     .cmd("GET").arg("key_2").query(&con).unwrap();
433    /// ```
434    #[inline]
435    pub fn query<T: FromRedisValue>(&self, con: &ConnectionLike) -> RedisResult<T> {
436        from_redis_value(&(if self.commands.len() == 0 {
437            Value::Bulk(vec![])
438        } else if self.transaction_mode {
439            try!(self.execute_transaction(con))
440        } else {
441            try!(self.execute_pipelined(con))
442        }))
443    }
444
445    #[inline]
446    pub fn query_cluster<T: FromRedisValue>(&self, cluster: &mut Cluster) -> RedisResult<T> {
447        cluster.query_pipe(self)
448    }
449
450    /// This is a shortcut to `query()` that does not return a value and
451    /// will fail the task if the query of the pipeline fails.
452    ///
453    /// This is equivalent to a call of query like this:
454    ///
455    /// ```rust,no_run
456    /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
457    /// # let con = client.get_connection().unwrap();
458    /// let _ : () = td_rredis::pipe().cmd("PING").query(&con).unwrap();
459    /// ```
460    #[inline]
461    pub fn execute(&self, con: &ConnectionLike) {
462        let _: () = self.query(con).unwrap();
463    }
464
465    pub fn get_slot(&self) -> u16 {
466        if self.commands.len() > 0 {
467            return self.commands[1].get_slot();
468        }
469        0
470    }
471}
472
473/// Shortcut function to creating a command with a single argument.
474///
475/// The first argument of a redis command is always the name of the command
476/// which needs to be a string.  This is the recommended way to start a
477/// command pipe.
478///
479/// ```rust
480/// td_rredis::cmd("PING");
481/// ```
482pub fn cmd<'a>(name: &'a str) -> Cmd {
483    let mut rv = Cmd::new();
484    rv.arg(name);
485    rv
486}
487
488/// Packs a bunch of commands into a request.  This is generally a quite
489/// useless function as this functionality is nicely wrapped through the
490/// `Cmd` object, but in some cases it can be useful.  The return value
491/// of this can then be send to the low level `ConnectionLike` methods.
492///
493/// Example:
494///
495/// ```rust,ignore
496/// # this is ignore because it uses unstable APIs.
497/// # use redis::ToRedisArgs;
498/// let mut args = vec![];
499/// args.push_all(&"SET".to_redis_args());
500/// args.push_all(&"my_key".to_redis_args());
501/// args.push_all(&42.to_redis_args());
502/// let cmd = td_rredis::pack_command(&args);
503/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
504/// ```
505pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
506    encode_command(&args.iter().map(|x| Arg::Borrowed(x.clone())).collect(), 0)
507}
508
509/// Shortcut for creating a new pipeline.
510pub fn pipe() -> Pipeline {
511    Pipeline::new()
512}