mco_redis_rs/
cmd.rs

1#[cfg(feature = "aio")]
2use futures_util::{
3    task::{Context, Poll},
4    FutureExt, Stream,
5};
6#[cfg(feature = "aio")]
7use std::pin::Pin;
8use std::{fmt, io};
9
10use crate::connection::ConnectionLike;
11use crate::pipeline::Pipeline;
12use crate::types::{from_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
13
14/// An argument to a redis command
15#[derive(Clone)]
16pub enum Arg<D> {
17    /// A normal argument
18    Simple(D),
19    /// A cursor argument created from `cursor_arg()`
20    Cursor,
21}
22
23/// Represents redis commands.
24#[derive(Clone)]
25pub struct Cmd {
26    data: Vec<u8>,
27    // Arg::Simple contains the offset that marks the end of the argument
28    args: Vec<Arg<usize>>,
29    cursor: Option<u64>,
30}
31
32/// Represents a redis iterator.
33pub struct Iter<'a, T: FromRedisValue> {
34    batch: std::vec::IntoIter<T>,
35    cursor: u64,
36    con: &'a mut (dyn ConnectionLike + 'a),
37    cmd: Cmd,
38}
39
40impl<'a, T: FromRedisValue> Iterator for Iter<'a, T> {
41    type Item = T;
42
43    #[inline]
44    fn next(&mut self) -> Option<T> {
45        // we need to do this in a loop until we produce at least one item
46        // or we find the actual end of the iteration.  This is necessary
47        // because with filtering an iterator it is possible that a whole
48        // chunk is not matching the pattern and thus yielding empty results.
49        loop {
50            if let Some(v) = self.batch.next() {
51                return Some(v);
52            };
53            if self.cursor == 0 {
54                return None;
55            }
56
57            let pcmd = unwrap_or!(
58                self.cmd.get_packed_command_with_cursor(self.cursor),
59                return None
60            );
61            let rv = unwrap_or!(self.con.req_packed_command(&pcmd).ok(), return None);
62            let (cur, batch): (u64, Vec<T>) = unwrap_or!(from_redis_value(&rv).ok(), return None);
63
64            self.cursor = cur;
65            self.batch = batch.into_iter();
66        }
67    }
68}
69
70#[cfg(feature = "aio")]
71use crate::aio::ConnectionLike as AsyncConnection;
72
73/// Represents a redis iterator that can be used with async connections.
74#[cfg(feature = "aio")]
75pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
76    batch: std::vec::IntoIter<T>,
77    con: &'a mut (dyn AsyncConnection + Send + 'a),
78    cmd: Cmd,
79}
80
81#[cfg(feature = "aio")]
82impl<'a, T: FromRedisValue + 'a> AsyncIter<'a, T> {
83    /// ```rust,no_run
84    /// # use redis::AsyncCommands;
85    /// # async fn scan_set() -> redis::RedisResult<()> {
86    /// # let client = redis::Client::open("redis://127.0.0.1/")?;
87    /// # let mut con = client.get_async_connection().await?;
88    /// con.sadd("my_set", 42i32).await?;
89    /// con.sadd("my_set", 43i32).await?;
90    /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
91    /// while let Some(element) = iter.next_item().await {
92    ///     assert!(element == 42 || element == 43);
93    /// }
94    /// # Ok(())
95    /// # }
96    /// ```
97    #[inline]
98    pub async fn next_item(&mut self) -> Option<T> {
99        // we need to do this in a loop until we produce at least one item
100        // or we find the actual end of the iteration.  This is necessary
101        // because with filtering an iterator it is possible that a whole
102        // chunk is not matching the pattern and thus yielding empty results.
103        loop {
104            if let Some(v) = self.batch.next() {
105                return Some(v);
106            };
107            if let Some(cursor) = self.cmd.cursor {
108                if cursor == 0 {
109                    return None;
110                }
111            } else {
112                return None;
113            }
114
115            let rv = unwrap_or!(
116                self.con.req_packed_command(&self.cmd).await.ok(),
117                return None
118            );
119            let (cur, batch): (u64, Vec<T>) = unwrap_or!(from_redis_value(&rv).ok(), return None);
120
121            self.cmd.cursor = Some(cur);
122            self.batch = batch.into_iter();
123        }
124    }
125}
126
127#[cfg(feature = "aio")]
128impl<'a, T: FromRedisValue + Unpin + 'a> Stream for AsyncIter<'a, T> {
129    type Item = T;
130
131    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
132        let this = self.get_mut();
133        let mut future = Box::pin(this.next_item());
134        future.poll_unpin(cx)
135    }
136}
137
138fn countdigits(mut v: usize) -> usize {
139    let mut result = 1;
140    loop {
141        if v < 10 {
142            return result;
143        }
144        if v < 100 {
145            return result + 1;
146        }
147        if v < 1000 {
148            return result + 2;
149        }
150        if v < 10000 {
151            return result + 3;
152        }
153
154        v /= 10000;
155        result += 4;
156    }
157}
158
159#[inline]
160fn bulklen(len: usize) -> usize {
161    1 + countdigits(len) + 2 + len + 2
162}
163
164fn args_len<'a, I>(args: I, cursor: u64) -> usize
165where
166    I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
167{
168    let mut totlen = 1 + countdigits(args.len()) + 2;
169    for item in args {
170        totlen += bulklen(match item {
171            Arg::Cursor => countdigits(cursor as usize),
172            Arg::Simple(val) => val.len(),
173        });
174    }
175    totlen
176}
177
178pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
179    args_len(cmd.args_iter(), cmd.cursor.unwrap_or(0))
180}
181
182fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
183where
184    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
185{
186    let mut cmd = Vec::new();
187    write_command_to_vec(&mut cmd, args, cursor);
188    cmd
189}
190
191fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
192where
193    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
194{
195    let totlen = args_len(args.clone(), cursor);
196
197    cmd.reserve(totlen);
198
199    write_command(cmd, args, cursor).unwrap()
200}
201
202fn write_command<'a, I>(cmd: &mut (impl ?Sized + io::Write), args: I, cursor: u64) -> io::Result<()>
203where
204    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
205{
206    let mut buf = ::itoa::Buffer::new();
207
208    cmd.write_all(b"*")?;
209    let s = buf.format(args.len());
210    cmd.write_all(s.as_bytes())?;
211    cmd.write_all(b"\r\n")?;
212
213    let mut cursor_bytes = itoa::Buffer::new();
214    for item in args {
215        let bytes = match item {
216            Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
217            Arg::Simple(val) => val,
218        };
219
220        cmd.write_all(b"$")?;
221        let s = buf.format(bytes.len());
222        cmd.write_all(s.as_bytes())?;
223        cmd.write_all(b"\r\n")?;
224
225        cmd.write_all(bytes)?;
226        cmd.write_all(b"\r\n")?;
227    }
228    Ok(())
229}
230
231impl RedisWrite for Cmd {
232    fn write_arg(&mut self, arg: &[u8]) {
233        self.data.extend_from_slice(arg);
234        self.args.push(Arg::Simple(self.data.len()));
235    }
236
237    fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
238        use std::io::Write;
239        write!(self.data, "{}", arg).unwrap();
240        self.args.push(Arg::Simple(self.data.len()));
241    }
242}
243
244impl Default for Cmd {
245    fn default() -> Cmd {
246        Cmd::new()
247    }
248}
249
250/// A command acts as a builder interface to creating encoded redis
251/// requests.  This allows you to easiy assemble a packed command
252/// by chaining arguments together.
253///
254/// Basic example:
255///
256/// ```rust
257/// redis::Cmd::new().arg("SET").arg("my_key").arg(42);
258/// ```
259///
260/// There is also a helper function called `cmd` which makes it a
261/// tiny bit shorter:
262///
263/// ```rust
264/// redis::cmd("SET").arg("my_key").arg(42);
265/// ```
266///
267/// Because Rust currently does not have an ideal system
268/// for lifetimes of temporaries, sometimes you need to hold on to
269/// the initially generated command:
270///
271/// ```rust,no_run
272/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
273/// # let mut con = client.get_connection().unwrap();
274/// let mut cmd = redis::cmd("SMEMBERS");
275/// let mut iter : redis::Iter<i32> = cmd.arg("my_set").clone().iter(&mut con).unwrap();
276/// ```
277impl Cmd {
278    /// Creates a new empty command.
279    pub fn new() -> Cmd {
280        Cmd {
281            data: vec![],
282            args: vec![],
283            cursor: None,
284        }
285    }
286
287    /// Appends an argument to the command.  The argument passed must
288    /// be a type that implements `ToRedisArgs`.  Most primitive types as
289    /// well as vectors of primitive types implement it.
290    ///
291    /// For instance all of the following are valid:
292    ///
293    /// ```rust,no_run
294    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
295    /// # let mut con = client.get_connection().unwrap();
296    /// redis::cmd("SET").arg(&["my_key", "my_value"]);
297    /// redis::cmd("SET").arg("my_key").arg(42);
298    /// redis::cmd("SET").arg("my_key").arg(b"my_value");
299    /// ```
300    #[inline]
301    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
302        arg.write_redis_args(self);
303        self
304    }
305
306    /// Works similar to `arg` but adds a cursor argument.  This is always
307    /// an integer and also flips the command implementation to support a
308    /// different mode for the iterators where the iterator will ask for
309    /// another batch of items when the local data is exhausted.
310    ///
311    /// ```rust,no_run
312    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
313    /// # let mut con = client.get_connection().unwrap();
314    /// let mut cmd = redis::cmd("SSCAN");
315    /// let mut iter : redis::Iter<isize> =
316    ///     cmd.arg("my_set").cursor_arg(0).clone().iter(&mut con).unwrap();
317    /// for x in iter {
318    ///     // do something with the item
319    /// }
320    /// ```
321    #[inline]
322    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
323        assert!(!self.in_scan_mode());
324        self.cursor = Some(cursor);
325        self.args.push(Arg::Cursor);
326        self
327    }
328
329    /// Returns the packed command as a byte vector.
330    #[inline]
331    pub fn get_packed_command(&self) -> Vec<u8> {
332        let mut cmd = Vec::new();
333        self.write_packed_command(&mut cmd);
334        cmd
335    }
336
337    pub(crate) fn write_packed_command(&self, cmd: &mut Vec<u8>) {
338        write_command_to_vec(cmd, self.args_iter(), self.cursor.unwrap_or(0))
339    }
340
341    pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
342        write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
343    }
344
345    /// Like `get_packed_command` but replaces the cursor with the
346    /// provided value.  If the command is not in scan mode, `None`
347    /// is returned.
348    #[inline]
349    fn get_packed_command_with_cursor(&self, cursor: u64) -> Option<Vec<u8>> {
350        if !self.in_scan_mode() {
351            None
352        } else {
353            Some(encode_command(self.args_iter(), cursor))
354        }
355    }
356
357    /// Returns true if the command is in scan mode.
358    #[inline]
359    pub fn in_scan_mode(&self) -> bool {
360        self.cursor.is_some()
361    }
362
363    /// Sends the command as query to the connection and converts the
364    /// result to the target redis value.  This is the general way how
365    /// you can retrieve data.
366    #[inline]
367    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
368        match con.req_command(self) {
369            Ok(val) => from_redis_value(&val),
370            Err(e) => Err(e),
371        }
372    }
373
374    /// Async version of `query`.
375    #[inline]
376    #[cfg(feature = "aio")]
377    pub async fn query_async<C, T: FromRedisValue>(&self, con: &mut C) -> RedisResult<T>
378    where
379        C: crate::aio::ConnectionLike,
380    {
381        let val = con.req_packed_command(self).await?;
382        from_redis_value(&val)
383    }
384
385    /// Similar to `query()` but returns an iterator over the items of the
386    /// bulk result or iterator.  In normal mode this is not in any way more
387    /// efficient than just querying into a `Vec<T>` as it's internally
388    /// implemented as buffering into a vector.  This however is useful when
389    /// `cursor_arg` was used in which case the iterator will query for more
390    /// items until the server side cursor is exhausted.
391    ///
392    /// This is useful for commands such as `SSCAN`, `SCAN` and others.
393    ///
394    /// One speciality of this function is that it will check if the response
395    /// looks like a cursor or not and always just looks at the payload.
396    /// This way you can use the function the same for responses in the
397    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
398    /// tuple of cursor and list).
399    #[inline]
400    pub fn iter<T: FromRedisValue>(self, con: &mut dyn ConnectionLike) -> RedisResult<Iter<'_, T>> {
401        let rv = con.req_command(&self)?;
402
403        let (cursor, batch) = if rv.looks_like_cursor() {
404            from_redis_value::<(u64, Vec<T>)>(&rv)?
405        } else {
406            (0, from_redis_value(&rv)?)
407        };
408
409        Ok(Iter {
410            batch: batch.into_iter(),
411            cursor,
412            con,
413            cmd: self,
414        })
415    }
416
417    /// Similar to `iter()` but returns an AsyncIter over the items of the
418    /// bulk result or iterator.  A [futures::Stream](https://docs.rs/futures/0.3.3/futures/stream/trait.Stream.html)
419    /// can be obtained by calling `stream()` on the AsyncIter.  In normal mode this is not in any way more
420    /// efficient than just querying into a `Vec<T>` as it's internally
421    /// implemented as buffering into a vector.  This however is useful when
422    /// `cursor_arg` was used in which case the stream will query for more
423    /// items until the server side cursor is exhausted.
424    ///
425    /// This is useful for commands such as `SSCAN`, `SCAN` and others in async contexts.
426    ///
427    /// One speciality of this function is that it will check if the response
428    /// looks like a cursor or not and always just looks at the payload.
429    /// This way you can use the function the same for responses in the
430    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
431    /// tuple of cursor and list).
432    #[cfg(feature = "aio")]
433    #[inline]
434    pub async fn iter_async<'a, T: FromRedisValue + 'a>(
435        mut self,
436        con: &'a mut (dyn AsyncConnection + Send),
437    ) -> RedisResult<AsyncIter<'a, T>> {
438        let rv = con.req_packed_command(&self).await?;
439
440        let (cursor, batch) = if rv.looks_like_cursor() {
441            from_redis_value::<(u64, Vec<T>)>(&rv)?
442        } else {
443            (0, from_redis_value(&rv)?)
444        };
445        if cursor == 0 {
446            self.cursor = None;
447        } else {
448            self.cursor = Some(cursor);
449        }
450
451        Ok(AsyncIter {
452            batch: batch.into_iter(),
453            con,
454            cmd: self,
455        })
456    }
457
458    /// This is a shortcut to `query()` that does not return a value and
459    /// will fail the task if the query fails because of an error.  This is
460    /// mainly useful in examples and for simple commands like setting
461    /// keys.
462    ///
463    /// This is equivalent to a call of query like this:
464    ///
465    /// ```rust,no_run
466    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
467    /// # let mut con = client.get_connection().unwrap();
468    /// let _ : () = redis::cmd("PING").query(&mut con).unwrap();
469    /// ```
470    #[inline]
471    pub fn execute(&self, con: &mut dyn ConnectionLike) {
472        self.query::<()>(con).unwrap();
473    }
474
475    /// Returns an iterator over the arguments in this command (including the command name itself)
476    pub fn args_iter(&self) -> impl Iterator<Item = Arg<&[u8]>> + Clone + ExactSizeIterator {
477        let mut prev = 0;
478        self.args.iter().map(move |arg| match *arg {
479            Arg::Simple(i) => {
480                let arg = Arg::Simple(&self.data[prev..i]);
481                prev = i;
482                arg
483            }
484
485            Arg::Cursor => Arg::Cursor,
486        })
487    }
488
489    // Get a reference to the argument at `idx`
490    #[cfg(feature = "cluster")]
491    pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
492        if idx >= self.args.len() {
493            return None;
494        }
495
496        let start = if idx == 0 {
497            0
498        } else {
499            match self.args[idx - 1] {
500                Arg::Simple(n) => n,
501                _ => 0,
502            }
503        };
504        let end = match self.args[idx] {
505            Arg::Simple(n) => n,
506            _ => 0,
507        };
508        if start == 0 && end == 0 {
509            return None;
510        }
511        Some(&self.data[start..end])
512    }
513}
514
515/// Shortcut function to creating a command with a single argument.
516///
517/// The first argument of a redis command is always the name of the command
518/// which needs to be a string.  This is the recommended way to start a
519/// command pipe.
520///
521/// ```rust
522/// redis::cmd("PING");
523/// ```
524pub fn cmd(name: &str) -> Cmd {
525    let mut rv = Cmd::new();
526    rv.arg(name);
527    rv
528}
529
530/// Packs a bunch of commands into a request.  This is generally a quite
531/// useless function as this functionality is nicely wrapped through the
532/// `Cmd` object, but in some cases it can be useful.  The return value
533/// of this can then be send to the low level `ConnectionLike` methods.
534///
535/// Example:
536///
537/// ```rust
538/// # use redis::ToRedisArgs;
539/// let mut args = vec![];
540/// args.extend("SET".to_redis_args());
541/// args.extend("my_key".to_redis_args());
542/// args.extend(42.to_redis_args());
543/// let cmd = redis::pack_command(&args);
544/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
545/// ```
546pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
547    encode_command(args.iter().map(|x| Arg::Simple(&x[..])), 0)
548}
549
550/// Shortcut for creating a new pipeline.
551pub fn pipe() -> Pipeline {
552    Pipeline::new()
553}
554
555#[cfg(test)]
556#[cfg(feature = "cluster")]
557mod tests {
558    use super::Cmd;
559
560    #[test]
561    fn test_cmd_arg_idx() {
562        let mut c = Cmd::new();
563        assert_eq!(c.arg_idx(0), None);
564
565        c.arg("SET");
566        assert_eq!(c.arg_idx(0), Some(&b"SET"[..]));
567        assert_eq!(c.arg_idx(1), None);
568
569        c.arg("foo").arg("42");
570        assert_eq!(c.arg_idx(1), Some(&b"foo"[..]));
571        assert_eq!(c.arg_idx(2), Some(&b"42"[..]));
572        assert_eq!(c.arg_idx(3), None);
573        assert_eq!(c.arg_idx(4), None);
574    }
575}