td_rredis/cmd.rs
1use types::{ToRedisArgs, FromRedisValue, Value, RedisResult, ErrorKind, from_redis_value};
2use connection::ConnectionLike;
3use cluster::Cluster;
4use slot::key_hash_slot;
5
/// One positional argument of a command, kept in the order in which it
/// will be written out by `encode_command`.
#[derive(Clone)]
enum Arg {
    /// Raw argument bytes. This is the variant `Cmd::arg` pushes.
    Simple(Vec<u8>),
    /// Placeholder that is replaced by the decimal cursor value at
    /// encoding time. Pushed by `Cmd::cursor_arg`.
    Cursor,
    /// Raw argument bytes. This is the variant `pack_command` wraps its
    /// inputs in; despite the name the bytes are owned by the variant.
    Borrowed(Vec<u8>),
}
12
13
14/// Represents redis commands.
15#[derive(Clone)]
16pub struct Cmd {
17 args: Vec<Arg>,
18 cursor: Option<u64>,
19 is_ignored: bool,
20}
21
22/// Represents a redis command pipeline.
23pub struct Pipeline {
24 commands: Vec<Cmd>,
25 transaction_mode: bool,
26}
27
28/// Represents a redis iterator.
29pub struct Iter<'a, T: FromRedisValue> {
30 batch: Vec<T>,
31 cursor: u64,
32 con: &'a (ConnectionLike + 'a),
33 cmd: Cmd,
34}
35
36impl<'a, T: FromRedisValue> Iterator for Iter<'a, T> {
37 type Item = T;
38
39 #[inline]
40 fn next(&mut self) -> Option<T> {
41 // we need to do this in a loop until we produce at least one item
42 // or we find the actual end of the iteration. This is necessary
43 // because with filtering an iterator it is possible that a whole
44 // chunk is not matching the pattern and thus yielding empty results.
45 loop {
46 match self.batch.pop() {
47 Some(v) => {
48 return Some(v);
49 }
50 None => {}
51 };
52 if self.cursor == 0 {
53 return None;
54 }
55
56 let pcmd = unwrap_or!(self.cmd.get_packed_command_with_cursor(self.cursor),
57 return None);
58 let rv = unwrap_or!(self.con
59 .req_packed_command(&pcmd)
60 .ok(),
61 return None);
62 let (cur, mut batch): (u64, Vec<T>) = unwrap_or!(from_redis_value(&rv).ok(),
63 return None);
64 batch.reverse();
65
66 self.cursor = cur;
67 self.batch = batch;
68 }
69 }
70}
71
72fn encode_command(args: &Vec<Arg>, cursor: u64) -> Vec<u8> {
73 let mut cmd = vec![];
74 cmd.extend(format!("*{}\r\n", args.len()).as_bytes().iter().cloned());
75
76 {
77 let mut encode = |item: &[u8]| {
78 cmd.extend(format!("${}\r\n", item.len()).as_bytes().iter().cloned());
79 cmd.extend(item.iter().cloned());
80 cmd.extend(b"\r\n".iter().cloned());
81 };
82
83 for item in args.iter() {
84 match *item {
85 Arg::Cursor => encode(cursor.to_string().as_bytes()),
86 Arg::Simple(ref val) => encode(val),
87 Arg::Borrowed(ref ptr) => encode(ptr),
88 }
89 }
90 }
91 cmd
92}
93
94fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
95 let mut rv = vec![];
96 if atomic {
97 rv.extend(cmd("MULTI").get_packed_command().into_iter());
98 }
99 for cmd in cmds.iter() {
100 rv.extend(cmd.get_packed_command().into_iter());
101 }
102 if atomic {
103 rv.extend(cmd("EXEC").get_packed_command().into_iter());
104 }
105 rv
106}
107
108/// A command acts as a builder interface to creating encoded redis
109/// requests. This allows you to easiy assemble a packed command
110/// by chaining arguments together.
111///
112/// Basic example:
113///
114/// ```rust
115/// td_rredis::Cmd::new().arg("SET").arg("my_key").arg(42);
116/// ```
117///
118/// There is also a helper function called `cmd` which makes it a
119/// tiny bit shorter:
120///
121/// ```rust
122/// td_rredis::cmd("SET").arg("my_key").arg(42);
123/// ```
124///
125/// Because currently rust's currently does not have an ideal system
126/// for lifetimes of temporaries, sometimes you need to hold on to
127/// the initially generated command:
128
129impl Cmd {
130 /// Creates a new empty command.
131 pub fn new() -> Cmd {
132 Cmd {
133 args: vec![],
134 cursor: None,
135 is_ignored: false,
136 }
137 }
138
139 /// Appends an argument to the command. The argument passed must
140 /// be a type that implements `ToRedisArgs`. Most primitive types as
141 /// well as vectors of primitive types implement it.
142 ///
143 /// For instance all of the following are valid:
144 ///
145 /// ```rust,no_run
146 /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
147 /// # let con = client.get_connection().unwrap();
148 /// td_rredis::cmd("SET").arg(&["my_key", "my_value"]);
149 /// td_rredis::cmd("SET").arg("my_key").arg(42);
150 /// td_rredis::cmd("SET").arg("my_key").arg(b"my_value");
151 /// ```
152 #[inline]
153 pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
154 for item in arg.to_redis_args().into_iter() {
155 self.args.push(Arg::Simple(item));
156 }
157 self
158 }
159
160 /// Returns the packed command as a byte vector.
161 #[inline]
162 pub fn get_packed_command(&self) -> Vec<u8> {
163 encode_command(&self.args, self.cursor.unwrap_or(0))
164 }
165
166 /// Sends the command as query to the connection and converts the
167 /// result to the target redis value. This is the general way how
168 /// you can retrieve data.
169 #[inline]
170 pub fn query<T: FromRedisValue>(&self, con: &ConnectionLike) -> RedisResult<T> {
171 let pcmd = self.get_packed_command();
172 match con.req_packed_command(&pcmd) {
173 Ok(val) => from_redis_value(&val),
174 Err(e) => Err(e),
175 }
176 }
177
178 #[inline]
179 pub fn query_cluster<T: FromRedisValue>(&self, cluster: &mut Cluster) -> RedisResult<T> {
180 cluster.query_cmd(self)
181 }
182
183 /// Works similar to `arg` but adds a cursor argument. This is always
184 /// an integer and also flips the command implementation to support a
185 /// different mode for the iterators where the iterator will ask for
186 /// another batch of items when the local data is exhausted.
187 ///
188 /// ```rust,no_run
189 /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
190 /// # let con = client.get_connection().unwrap();
191 /// let mut cmd = td_rredis::cmd("SSCAN");
192 /// let mut iter : td_rredis::Iter<isize> = cmd.arg("my_set").cursor_arg(0).iter(&con).unwrap();
193 /// for x in iter {
194 /// // do something with the item
195 /// }
196 /// ```
197 #[inline]
198 pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
199 assert!(!self.in_scan_mode());
200 self.cursor = Some(cursor);
201 self.args.push(Arg::Cursor);
202 self
203 }
204
205 /// Like `get_packed_command` but replaces the cursor with the
206 /// provided value. If the command is not in scan mode, `None`
207 /// is returned.
208 #[inline]
209 fn get_packed_command_with_cursor(&self, cursor: u64) -> Option<Vec<u8>> {
210 if !self.in_scan_mode() {
211 None
212 } else {
213 Some(encode_command(&self.args, cursor))
214 }
215 }
216
217 /// Returns true if the command is in scan mode.
218 #[inline]
219 pub fn in_scan_mode(&self) -> bool {
220 self.cursor.is_some()
221 }
222
223 /// Similar to `query()` but returns an iterator over the items of the
224 /// bulk result or iterator. In normal mode this is not in any way more
225 /// efficient than just querying into a `Vec<T>` as it's internally
226 /// implemented as buffering into a vector. This however is useful when
227 /// `cursor_arg` was used in which case the iterator will query for more
228 /// items until the server side cursor is exhausted.
229 ///
230 /// This is useful for commands such as `SSCAN`, `SCAN` and others.
231 ///
232 /// One speciality of this function is that it will check if the response
233 /// looks like a cursor or not and always just looks at the payload.
234 /// This way you can use the function the same for responses in the
235 /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
236 /// tuple of cursor and list).
237 #[inline]
238 pub fn iter<'a, T: FromRedisValue>(&self, con: &'a ConnectionLike) -> RedisResult<Iter<'a, T>> {
239 let pcmd = self.get_packed_command();
240 let rv = try!(con.req_packed_command(&pcmd));
241 let mut batch: Vec<T>;
242 let mut cursor = 0;
243
244 if rv.looks_like_cursor() {
245 let (next, b): (u64, Vec<T>) = try!(from_redis_value(&rv));
246 batch = b;
247 cursor = next;
248 } else {
249 batch = try!(from_redis_value(&rv));
250 }
251
252 batch.reverse();
253 Ok(Iter {
254 batch: batch,
255 cursor: cursor,
256 con: con,
257 cmd: self.clone(),
258 })
259 }
260
261 /// This is a shortcut to `query()` that does not return a value and
262 /// will fail the task if the query fails because of an error. This is
263 /// mainly useful in examples and for simple commands like setting
264 /// keys.
265 ///
266 /// This is equivalent to a call of query like this:
267 ///
268 /// ```rust,no_run
269 /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
270 /// # let con = client.get_connection().unwrap();
271 /// let _ : () = td_rredis::cmd("PING").query(&con).unwrap();
272 /// ```
273 #[inline]
274 pub fn execute(&self, con: &ConnectionLike) {
275 let _: () = self.query(con).unwrap();
276 }
277
278 pub fn get_slot(&self) -> u16 {
279 if self.args.len() > 1 {
280 return match self.args[1] {
281 Arg::Cursor => 0,
282 Arg::Simple(ref val) => key_hash_slot(val),
283 Arg::Borrowed(ref ptr) => key_hash_slot(ptr),
284 };
285 }
286 0
287 }
288}
289
290
291/// A pipeline allows you to send multiple commands in one go to the
292/// redis server. API wise it's very similar to just using a command
293/// but it allows multiple commands to be chained and some features such
294/// as iteration are not available.
295///
296/// Basic example:
297///
298/// ```rust,no_run
299/// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
300/// # let con = client.get_connection().unwrap();
301/// let ((k1, k2),) : ((i32, i32),) = td_rredis::pipe()
302/// .cmd("SET").arg("key_1").arg(42).ignore()
303/// .cmd("SET").arg("key_2").arg(43).ignore()
304/// .cmd("MGET").arg(&["key_1", "key_2"]).query(&con).unwrap();
305/// ```
306///
307/// As you can see with `cmd` you can start a new command. By default
308/// each command produces a value but for some you can ignore them by
309/// calling `ignore` on the command. That way it will be skipped in the
310/// return value which is useful for `SET` commands and others, which
311/// do not have a useful return value.
312impl Pipeline {
313 /// Creates an empty pipeline. For consistency with the `cmd`
314 /// api a `pipe` function is provided as alias.
315 pub fn new() -> Pipeline {
316 Pipeline {
317 commands: vec![],
318 transaction_mode: false,
319 }
320 }
321
322 /// Starts a new command. Functions such as `arg` then become
323 /// available to add more arguments to that command.
324 #[inline]
325 pub fn cmd(&mut self, name: &str) -> &mut Pipeline {
326 self.commands.push(cmd(name));
327 self
328 }
329
330 /// Adds a command to the pipeline.
331 #[inline]
332 pub fn add_command(&mut self, cmd: &Cmd) -> &mut Pipeline {
333 self.commands.push(cmd.clone());
334 self
335 }
336
337 #[inline]
338 fn get_last_command(&mut self) -> &mut Cmd {
339 let idx = match self.commands.len() {
340 0 => panic!("No command on stack"),
341 x => x - 1,
342 };
343 &mut self.commands[idx]
344 }
345
346 /// Adds an argument to the last started command. This works similar
347 /// to the `arg` method of the `Cmd` object.
348 ///
349 /// Note that this function fails the task if executed on an empty pipeline.
350 #[inline]
351 pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Pipeline {
352 {
353 let cmd = self.get_last_command();
354 cmd.arg(arg);
355 }
356 self
357 }
358
359 /// Instructs the pipeline to ignore the return value of this command.
360 /// It will still be ensured that it is not an error, but any successful
361 /// result is just thrown away. This makes result processing through
362 /// tuples much easier because you do not need to handle all the items
363 /// you do not care about.
364 ///
365 /// Note that this function fails the task if executed on an empty pipeline.
366 #[inline]
367 pub fn ignore(&mut self) -> &mut Pipeline {
368 {
369 let cmd = self.get_last_command();
370 cmd.is_ignored = true;
371 }
372 self
373 }
374
375 /// This enables atomic mode. In atomic mode the whole pipeline is
376 /// enclosed in `MULTI`/`EXEC`. From the user's point of view nothing
377 /// changes however. This is easier than using `MULTI`/`EXEC` yourself
378 /// as the format does not change.
379 ///
380 /// ```rust,no_run
381 /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
382 /// # let con = client.get_connection().unwrap();
383 /// let (k1, k2) : (i32, i32) = td_rredis::pipe()
384 /// .atomic()
385 /// .cmd("GET").arg("key_1")
386 /// .cmd("GET").arg("key_2").query(&con).unwrap();
387 /// ```
388 #[inline]
389 pub fn atomic(&mut self) -> &mut Pipeline {
390 self.transaction_mode = true;
391 self
392 }
393
394 fn make_pipeline_results(&self, resp: Vec<Value>) -> Value {
395 let mut rv = vec![];
396 for (idx, result) in resp.into_iter().enumerate() {
397 if !self.commands[idx].is_ignored {
398 rv.push(result);
399 }
400 }
401 Value::Bulk(rv)
402 }
403
404 fn execute_pipelined(&self, con: &ConnectionLike) -> RedisResult<Value> {
405 Ok(self.make_pipeline_results(try!(con.req_packed_commands(
406 &encode_pipeline(&self.commands, false),
407 0, self.commands.len()))))
408 }
409
410 fn execute_transaction(&self, con: &ConnectionLike) -> RedisResult<Value> {
411 let mut resp = try!(con.req_packed_commands(&encode_pipeline(&self.commands, true),
412 self.commands.len() + 1,
413 1));
414 match resp.pop() {
415 Some(Value::Nil) => Ok(Value::Nil),
416 Some(Value::Bulk(items)) => Ok(self.make_pipeline_results(items)),
417 _ => fail!((ErrorKind::ResponseError, "Invalid response when parsing multi response")),
418 }
419 }
420
421 /// Executes the pipeline and fetches the return values. Since most
422 /// pipelines return different types it's recommended to use tuple
423 /// matching to process the results:
424 ///
425 /// ```rust,no_run
426 /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
427 /// # let con = client.get_connection().unwrap();
428 /// let (k1, k2) : (i32, i32) = td_rredis::pipe()
429 /// .cmd("SET").arg("key_1").arg(42).ignore()
430 /// .cmd("SET").arg("key_2").arg(43).ignore()
431 /// .cmd("GET").arg("key_1")
432 /// .cmd("GET").arg("key_2").query(&con).unwrap();
433 /// ```
434 #[inline]
435 pub fn query<T: FromRedisValue>(&self, con: &ConnectionLike) -> RedisResult<T> {
436 from_redis_value(&(if self.commands.len() == 0 {
437 Value::Bulk(vec![])
438 } else if self.transaction_mode {
439 try!(self.execute_transaction(con))
440 } else {
441 try!(self.execute_pipelined(con))
442 }))
443 }
444
445 #[inline]
446 pub fn query_cluster<T: FromRedisValue>(&self, cluster: &mut Cluster) -> RedisResult<T> {
447 cluster.query_pipe(self)
448 }
449
450 /// This is a shortcut to `query()` that does not return a value and
451 /// will fail the task if the query of the pipeline fails.
452 ///
453 /// This is equivalent to a call of query like this:
454 ///
455 /// ```rust,no_run
456 /// # let client = td_rredis::Client::open("redis://127.0.0.1/").unwrap();
457 /// # let con = client.get_connection().unwrap();
458 /// let _ : () = td_rredis::pipe().cmd("PING").query(&con).unwrap();
459 /// ```
460 #[inline]
461 pub fn execute(&self, con: &ConnectionLike) {
462 let _: () = self.query(con).unwrap();
463 }
464
465 pub fn get_slot(&self) -> u16 {
466 if self.commands.len() > 0 {
467 return self.commands[1].get_slot();
468 }
469 0
470 }
471}
472
473/// Shortcut function to creating a command with a single argument.
474///
475/// The first argument of a redis command is always the name of the command
476/// which needs to be a string. This is the recommended way to start a
477/// command pipe.
478///
479/// ```rust
480/// td_rredis::cmd("PING");
481/// ```
482pub fn cmd<'a>(name: &'a str) -> Cmd {
483 let mut rv = Cmd::new();
484 rv.arg(name);
485 rv
486}
487
488/// Packs a bunch of commands into a request. This is generally a quite
489/// useless function as this functionality is nicely wrapped through the
490/// `Cmd` object, but in some cases it can be useful. The return value
491/// of this can then be send to the low level `ConnectionLike` methods.
492///
493/// Example:
494///
495/// ```rust,ignore
496/// # this is ignore because it uses unstable APIs.
497/// # use redis::ToRedisArgs;
498/// let mut args = vec![];
499/// args.push_all(&"SET".to_redis_args());
500/// args.push_all(&"my_key".to_redis_args());
501/// args.push_all(&42.to_redis_args());
502/// let cmd = td_rredis::pack_command(&args);
503/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
504/// ```
505pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
506 encode_command(&args.iter().map(|x| Arg::Borrowed(x.clone())).collect(), 0)
507}
508
509/// Shortcut for creating a new pipeline.
510pub fn pipe() -> Pipeline {
511 Pipeline::new()
512}