redis/
cmd.rs

1#[cfg(feature = "aio")]
2use futures_util::{
3    future::BoxFuture,
4    task::{Context, Poll},
5    Stream, StreamExt,
6};
7#[cfg(feature = "aio")]
8use std::pin::Pin;
9#[cfg(feature = "cache-aio")]
10use std::time::Duration;
11use std::{fmt, io};
12
13use crate::connection::ConnectionLike;
14use crate::pipeline::Pipeline;
15use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
16
17/// An argument to a redis command
18#[derive(Clone)]
19pub enum Arg<D> {
20    /// A normal argument
21    Simple(D),
22    /// A cursor argument created from `cursor_arg()`
23    Cursor,
24}
25
26/// CommandCacheConfig is used to define caching behaviour of individual commands.
27/// # Example
28/// ```rust
29/// use std::time::Duration;
30/// use redis::{CommandCacheConfig, Cmd};
31///
32/// let ttl = Duration::from_secs(120); // 2 minutes TTL
33/// let config = CommandCacheConfig::new()
34///     .set_enable_cache(true)
35///     .set_client_side_ttl(ttl);
36/// let command = Cmd::new().arg("GET").arg("key").set_cache_config(config);
37/// ```
38#[cfg(feature = "cache-aio")]
39#[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
40#[derive(Clone)]
41pub struct CommandCacheConfig {
42    pub(crate) enable_cache: bool,
43    pub(crate) client_side_ttl: Option<Duration>,
44}
45
46#[cfg(feature = "cache-aio")]
47impl CommandCacheConfig {
48    /// Creates new CommandCacheConfig with enable_cache as true and without client_side_ttl.
49    pub fn new() -> Self {
50        Self {
51            enable_cache: true,
52            client_side_ttl: None,
53        }
54    }
55
56    /// Sets whether the cache should be enabled or not.
57    /// Disabling cache for specific command when using [crate::caching::CacheMode::All] will not work.
58    pub fn set_enable_cache(mut self, enable_cache: bool) -> Self {
59        self.enable_cache = enable_cache;
60        self
61    }
62
63    /// Sets custom client side time to live (TTL).
64    pub fn set_client_side_ttl(mut self, client_side_ttl: Duration) -> Self {
65        self.client_side_ttl = Some(client_side_ttl);
66        self
67    }
68}
69#[cfg(feature = "cache-aio")]
70impl Default for CommandCacheConfig {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76/// Represents redis commands.
77#[derive(Clone)]
78pub struct Cmd {
79    pub(crate) data: Vec<u8>,
80    // Arg::Simple contains the offset that marks the end of the argument
81    args: Vec<Arg<usize>>,
82    cursor: Option<u64>,
83    // If it's true command's response won't be read from socket. Useful for Pub/Sub.
84    no_response: bool,
85    #[cfg(feature = "cache-aio")]
86    cache: Option<CommandCacheConfig>,
87}
88
89/// Represents a redis iterator.
90pub struct Iter<'a, T: FromRedisValue> {
91    batch: std::vec::IntoIter<T>,
92    cursor: u64,
93    con: &'a mut (dyn ConnectionLike + 'a),
94    cmd: Cmd,
95}
96
97impl<T: FromRedisValue> Iterator for Iter<'_, T> {
98    type Item = T;
99
100    #[inline]
101    fn next(&mut self) -> Option<T> {
102        // we need to do this in a loop until we produce at least one item
103        // or we find the actual end of the iteration.  This is necessary
104        // because with filtering an iterator it is possible that a whole
105        // chunk is not matching the pattern and thus yielding empty results.
106        loop {
107            if let Some(v) = self.batch.next() {
108                return Some(v);
109            };
110            if self.cursor == 0 {
111                return None;
112            }
113
114            let pcmd = self.cmd.get_packed_command_with_cursor(self.cursor)?;
115            let rv = self.con.req_packed_command(&pcmd).ok()?;
116            let (cur, batch): (u64, Vec<T>) = from_owned_redis_value(rv).ok()?;
117
118            self.cursor = cur;
119            self.batch = batch.into_iter();
120        }
121    }
122}
123
124#[cfg(feature = "aio")]
125use crate::aio::ConnectionLike as AsyncConnection;
126
127/// The inner future of AsyncIter
128#[cfg(feature = "aio")]
129struct AsyncIterInner<'a, T: FromRedisValue + 'a> {
130    batch: std::vec::IntoIter<T>,
131    con: &'a mut (dyn AsyncConnection + Send + 'a),
132    cmd: Cmd,
133}
134
135/// Represents the state of AsyncIter
136#[cfg(feature = "aio")]
137enum IterOrFuture<'a, T: FromRedisValue + 'a> {
138    Iter(AsyncIterInner<'a, T>),
139    Future(BoxFuture<'a, (AsyncIterInner<'a, T>, Option<T>)>),
140    Empty,
141}
142
143/// Represents a redis iterator that can be used with async connections.
144#[cfg(feature = "aio")]
145pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
146    inner: IterOrFuture<'a, T>,
147}
148
149#[cfg(feature = "aio")]
150impl<'a, T: FromRedisValue + 'a> AsyncIterInner<'a, T> {
151    #[inline]
152    pub async fn next_item(&mut self) -> Option<T> {
153        // we need to do this in a loop until we produce at least one item
154        // or we find the actual end of the iteration.  This is necessary
155        // because with filtering an iterator it is possible that a whole
156        // chunk is not matching the pattern and thus yielding empty results.
157        loop {
158            if let Some(v) = self.batch.next() {
159                return Some(v);
160            };
161            if let Some(cursor) = self.cmd.cursor {
162                if cursor == 0 {
163                    return None;
164                }
165            } else {
166                return None;
167            }
168
169            let rv = self.con.req_packed_command(&self.cmd).await.ok()?;
170            let (cur, batch): (u64, Vec<T>) = from_owned_redis_value(rv).ok()?;
171
172            self.cmd.cursor = Some(cur);
173            self.batch = batch.into_iter();
174        }
175    }
176}
177
178#[cfg(feature = "aio")]
179impl<'a, T: FromRedisValue + 'a + Unpin + Send> AsyncIter<'a, T> {
180    /// ```rust,no_run
181    /// # use redis::AsyncCommands;
182    /// # async fn scan_set() -> redis::RedisResult<()> {
183    /// # let client = redis::Client::open("redis://127.0.0.1/")?;
184    /// # let mut con = client.get_multiplexed_async_connection().await?;
185    /// let _: () = con.sadd("my_set", 42i32).await?;
186    /// let _: () = con.sadd("my_set", 43i32).await?;
187    /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
188    /// while let Some(element) = iter.next_item().await {
189    ///     assert!(element == 42 || element == 43);
190    /// }
191    /// # Ok(())
192    /// # }
193    /// ```
194    #[inline]
195    pub async fn next_item(&mut self) -> Option<T> {
196        StreamExt::next(self).await
197    }
198}
199
200#[cfg(feature = "aio")]
201impl<'a, T: FromRedisValue + Unpin + Send + 'a> Stream for AsyncIter<'a, T> {
202    type Item = T;
203
204    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
205        let this = self.get_mut();
206        let inner = std::mem::replace(&mut this.inner, IterOrFuture::Empty);
207        match inner {
208            IterOrFuture::Iter(mut iter) => {
209                let fut = async move {
210                    let next_item = iter.next_item().await;
211                    (iter, next_item)
212                };
213                this.inner = IterOrFuture::Future(Box::pin(fut));
214                Pin::new(this).poll_next(cx)
215            }
216            IterOrFuture::Future(mut fut) => match fut.as_mut().poll(cx) {
217                Poll::Pending => {
218                    this.inner = IterOrFuture::Future(fut);
219                    Poll::Pending
220                }
221                Poll::Ready((iter, value)) => {
222                    this.inner = IterOrFuture::Iter(iter);
223                    Poll::Ready(value)
224                }
225            },
226            IterOrFuture::Empty => unreachable!(),
227        }
228    }
229}
230
231fn countdigits(mut v: usize) -> usize {
232    let mut result = 1;
233    loop {
234        if v < 10 {
235            return result;
236        }
237        if v < 100 {
238            return result + 1;
239        }
240        if v < 1000 {
241            return result + 2;
242        }
243        if v < 10000 {
244            return result + 3;
245        }
246
247        v /= 10000;
248        result += 4;
249    }
250}
251
252#[inline]
253fn bulklen(len: usize) -> usize {
254    1 + countdigits(len) + 2 + len + 2
255}
256
257fn args_len<'a, I>(args: I, cursor: u64) -> usize
258where
259    I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
260{
261    let mut totlen = 1 + countdigits(args.len()) + 2;
262    for item in args {
263        totlen += bulklen(match item {
264            Arg::Cursor => countdigits(cursor as usize),
265            Arg::Simple(val) => val.len(),
266        });
267    }
268    totlen
269}
270
271pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
272    args_len(cmd.args_iter(), cmd.cursor.unwrap_or(0))
273}
274
275fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
276where
277    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
278{
279    let mut cmd = Vec::new();
280    write_command_to_vec(&mut cmd, args, cursor);
281    cmd
282}
283
284fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
285where
286    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
287{
288    let totlen = args_len(args.clone(), cursor);
289
290    cmd.reserve(totlen);
291
292    write_command(cmd, args, cursor).unwrap()
293}
294
295fn write_command<'a, I>(cmd: &mut (impl ?Sized + io::Write), args: I, cursor: u64) -> io::Result<()>
296where
297    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
298{
299    let mut buf = ::itoa::Buffer::new();
300
301    cmd.write_all(b"*")?;
302    let s = buf.format(args.len());
303    cmd.write_all(s.as_bytes())?;
304    cmd.write_all(b"\r\n")?;
305
306    let mut cursor_bytes = itoa::Buffer::new();
307    for item in args {
308        let bytes = match item {
309            Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
310            Arg::Simple(val) => val,
311        };
312
313        cmd.write_all(b"$")?;
314        let s = buf.format(bytes.len());
315        cmd.write_all(s.as_bytes())?;
316        cmd.write_all(b"\r\n")?;
317
318        cmd.write_all(bytes)?;
319        cmd.write_all(b"\r\n")?;
320    }
321    Ok(())
322}
323
324impl RedisWrite for Cmd {
325    fn write_arg(&mut self, arg: &[u8]) {
326        self.data.extend_from_slice(arg);
327        self.args.push(Arg::Simple(self.data.len()));
328    }
329
330    fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
331        use std::io::Write;
332        write!(self.data, "{arg}").unwrap();
333        self.args.push(Arg::Simple(self.data.len()));
334    }
335
336    fn writer_for_next_arg(&mut self) -> impl std::io::Write + '_ {
337        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
338        impl Drop for CmdBufferedArgGuard<'_> {
339            fn drop(&mut self) {
340                self.0.args.push(Arg::Simple(self.0.data.len()));
341            }
342        }
343        impl std::io::Write for CmdBufferedArgGuard<'_> {
344            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
345                self.0.data.extend_from_slice(buf);
346                Ok(buf.len())
347            }
348
349            fn flush(&mut self) -> std::io::Result<()> {
350                Ok(())
351            }
352        }
353
354        CmdBufferedArgGuard(self)
355    }
356
357    fn reserve_space_for_args(&mut self, additional: impl IntoIterator<Item = usize>) {
358        let mut capacity = 0;
359        let mut args = 0;
360        for add in additional {
361            capacity += add;
362            args += 1;
363        }
364        self.data.reserve(capacity);
365        self.args.reserve(args);
366    }
367
368    #[cfg(feature = "bytes")]
369    fn bufmut_for_next_arg(&mut self, capacity: usize) -> impl bytes::BufMut + '_ {
370        self.data.reserve(capacity);
371        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
372        impl Drop for CmdBufferedArgGuard<'_> {
373            fn drop(&mut self) {
374                self.0.args.push(Arg::Simple(self.0.data.len()));
375            }
376        }
377        unsafe impl bytes::BufMut for CmdBufferedArgGuard<'_> {
378            fn remaining_mut(&self) -> usize {
379                self.0.data.remaining_mut()
380            }
381
382            unsafe fn advance_mut(&mut self, cnt: usize) {
383                self.0.data.advance_mut(cnt);
384            }
385
386            fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
387                self.0.data.chunk_mut()
388            }
389
390            // Vec specializes these methods, so we do too
391            fn put<T: bytes::buf::Buf>(&mut self, src: T)
392            where
393                Self: Sized,
394            {
395                self.0.data.put(src);
396            }
397
398            fn put_slice(&mut self, src: &[u8]) {
399                self.0.data.put_slice(src);
400            }
401
402            fn put_bytes(&mut self, val: u8, cnt: usize) {
403                self.0.data.put_bytes(val, cnt);
404            }
405        }
406
407        CmdBufferedArgGuard(self)
408    }
409}
410
411impl Default for Cmd {
412    fn default() -> Cmd {
413        Cmd::new()
414    }
415}
416
417/// A command acts as a builder interface to creating encoded redis
418/// requests.  This allows you to easily assemble a packed command
419/// by chaining arguments together.
420///
421/// Basic example:
422///
423/// ```rust
424/// redis::Cmd::new().arg("SET").arg("my_key").arg(42);
425/// ```
426///
427/// There is also a helper function called `cmd` which makes it a
428/// tiny bit shorter:
429///
430/// ```rust
431/// redis::cmd("SET").arg("my_key").arg(42);
432/// ```
433///
434/// Because Rust currently does not have an ideal system
435/// for lifetimes of temporaries, sometimes you need to hold on to
436/// the initially generated command:
437///
438/// ```rust,no_run
439/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
440/// # let mut con = client.get_connection().unwrap();
441/// let mut cmd = redis::cmd("SMEMBERS");
442/// let mut iter : redis::Iter<i32> = cmd.arg("my_set").clone().iter(&mut con).unwrap();
443/// ```
444impl Cmd {
445    /// Creates a new empty command.
446    pub fn new() -> Cmd {
447        Cmd {
448            data: vec![],
449            args: vec![],
450            cursor: None,
451            no_response: false,
452            #[cfg(feature = "cache-aio")]
453            cache: None,
454        }
455    }
456
457    /// Creates a new empty command, with at least the requested capacity.
458    pub fn with_capacity(arg_count: usize, size_of_data: usize) -> Cmd {
459        Cmd {
460            data: Vec::with_capacity(size_of_data),
461            args: Vec::with_capacity(arg_count),
462            cursor: None,
463            no_response: false,
464            #[cfg(feature = "cache-aio")]
465            cache: None,
466        }
467    }
468
469    /// Get the capacities for the internal buffers.
470    #[cfg(test)]
471    #[allow(dead_code)]
472    pub(crate) fn capacity(&self) -> (usize, usize) {
473        (self.args.capacity(), self.data.capacity())
474    }
475
476    /// Appends an argument to the command.  The argument passed must
477    /// be a type that implements `ToRedisArgs`.  Most primitive types as
478    /// well as vectors of primitive types implement it.
479    ///
480    /// For instance all of the following are valid:
481    ///
482    /// ```rust,no_run
483    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
484    /// # let mut con = client.get_connection().unwrap();
485    /// redis::cmd("SET").arg(&["my_key", "my_value"]);
486    /// redis::cmd("SET").arg("my_key").arg(42);
487    /// redis::cmd("SET").arg("my_key").arg(b"my_value");
488    /// ```
489    #[inline]
490    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
491        arg.write_redis_args(self);
492        self
493    }
494
495    /// Works similar to `arg` but adds a cursor argument.  This is always
496    /// an integer and also flips the command implementation to support a
497    /// different mode for the iterators where the iterator will ask for
498    /// another batch of items when the local data is exhausted.
499    ///
500    /// ```rust,no_run
501    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
502    /// # let mut con = client.get_connection().unwrap();
503    /// let mut cmd = redis::cmd("SSCAN");
504    /// let mut iter : redis::Iter<isize> =
505    ///     cmd.arg("my_set").cursor_arg(0).clone().iter(&mut con).unwrap();
506    /// for x in iter {
507    ///     // do something with the item
508    /// }
509    /// ```
510    #[inline]
511    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
512        assert!(!self.in_scan_mode());
513        self.cursor = Some(cursor);
514        self.args.push(Arg::Cursor);
515        self
516    }
517
518    /// Returns the packed command as a byte vector.
519    #[inline]
520    pub fn get_packed_command(&self) -> Vec<u8> {
521        let mut cmd = Vec::new();
522        self.write_packed_command(&mut cmd);
523        cmd
524    }
525
526    pub(crate) fn write_packed_command(&self, cmd: &mut Vec<u8>) {
527        write_command_to_vec(cmd, self.args_iter(), self.cursor.unwrap_or(0))
528    }
529
530    pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
531        write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
532    }
533
534    /// Like `get_packed_command` but replaces the cursor with the
535    /// provided value.  If the command is not in scan mode, `None`
536    /// is returned.
537    #[inline]
538    fn get_packed_command_with_cursor(&self, cursor: u64) -> Option<Vec<u8>> {
539        if !self.in_scan_mode() {
540            None
541        } else {
542            Some(encode_command(self.args_iter(), cursor))
543        }
544    }
545
546    /// Returns true if the command is in scan mode.
547    #[inline]
548    pub fn in_scan_mode(&self) -> bool {
549        self.cursor.is_some()
550    }
551
552    /// Sends the command as query to the connection and converts the
553    /// result to the target redis value.  This is the general way how
554    /// you can retrieve data.
555    #[inline]
556    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
557        match con.req_command(self) {
558            Ok(val) => from_owned_redis_value(val.extract_error()?),
559            Err(e) => Err(e),
560        }
561    }
562
563    /// Async version of `query`.
564    #[inline]
565    #[cfg(feature = "aio")]
566    pub async fn query_async<T: FromRedisValue>(
567        &self,
568        con: &mut impl crate::aio::ConnectionLike,
569    ) -> RedisResult<T> {
570        let val = con.req_packed_command(self).await?;
571        from_owned_redis_value(val.extract_error()?)
572    }
573
574    /// Similar to `query()` but returns an iterator over the items of the
575    /// bulk result or iterator.  In normal mode this is not in any way more
576    /// efficient than just querying into a `Vec<T>` as it's internally
577    /// implemented as buffering into a vector.  This however is useful when
578    /// `cursor_arg` was used in which case the iterator will query for more
579    /// items until the server side cursor is exhausted.
580    ///
581    /// This is useful for commands such as `SSCAN`, `SCAN` and others.
582    ///
583    /// One speciality of this function is that it will check if the response
584    /// looks like a cursor or not and always just looks at the payload.
585    /// This way you can use the function the same for responses in the
586    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
587    /// tuple of cursor and list).
588    #[inline]
589    pub fn iter<T: FromRedisValue>(self, con: &mut dyn ConnectionLike) -> RedisResult<Iter<'_, T>> {
590        let rv = con.req_command(&self)?;
591
592        let (cursor, batch) = if rv.looks_like_cursor() {
593            from_owned_redis_value::<(u64, Vec<T>)>(rv)?
594        } else {
595            (0, from_owned_redis_value(rv)?)
596        };
597
598        Ok(Iter {
599            batch: batch.into_iter(),
600            cursor,
601            con,
602            cmd: self,
603        })
604    }
605
606    /// Similar to `iter()` but returns an AsyncIter over the items of the
607    /// bulk result or iterator.  A [futures::Stream](https://docs.rs/futures/0.3.3/futures/stream/trait.Stream.html)
608    /// is implemented on AsyncIter. In normal mode this is not in any way more
609    /// efficient than just querying into a `Vec<T>` as it's internally
610    /// implemented as buffering into a vector.  This however is useful when
611    /// `cursor_arg` was used in which case the stream will query for more
612    /// items until the server side cursor is exhausted.
613    ///
614    /// This is useful for commands such as `SSCAN`, `SCAN` and others in async contexts.
615    ///
616    /// One speciality of this function is that it will check if the response
617    /// looks like a cursor or not and always just looks at the payload.
618    /// This way you can use the function the same for responses in the
619    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
620    /// tuple of cursor and list).
621    #[cfg(feature = "aio")]
622    #[inline]
623    pub async fn iter_async<'a, T: FromRedisValue + 'a>(
624        mut self,
625        con: &'a mut (dyn AsyncConnection + Send),
626    ) -> RedisResult<AsyncIter<'a, T>> {
627        let rv = con.req_packed_command(&self).await?;
628
629        let (cursor, batch) = if rv.looks_like_cursor() {
630            from_owned_redis_value::<(u64, Vec<T>)>(rv)?
631        } else {
632            (0, from_owned_redis_value(rv)?)
633        };
634        if cursor == 0 {
635            self.cursor = None;
636        } else {
637            self.cursor = Some(cursor);
638        }
639
640        Ok(AsyncIter {
641            inner: IterOrFuture::Iter(AsyncIterInner {
642                batch: batch.into_iter(),
643                con,
644                cmd: self,
645            }),
646        })
647    }
648
649    /// This is a shortcut to `query()` that does not return a value and
650    /// will fail the task if the query fails because of an error.  This is
651    /// mainly useful in examples and for simple commands like setting
652    /// keys.
653    ///
654    /// This is equivalent to a call of query like this:
655    ///
656    /// ```rust,no_run
657    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
658    /// # let mut con = client.get_connection().unwrap();
659    /// redis::cmd("PING").query::<()>(&mut con).unwrap();
660    /// ```
661    #[inline]
662    #[deprecated(note = "Use Cmd::exec + unwrap, instead")]
663    pub fn execute(&self, con: &mut dyn ConnectionLike) {
664        self.exec(con).unwrap();
665    }
666
667    /// This is an alternative to `query`` that can be used if you want to be able to handle a
668    /// command's success or failure but don't care about the command's response. For example,
669    /// this is useful for "SET" commands for which the response's content is not important.
670    /// It avoids the need to define generic bounds for ().
671    #[inline]
672    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
673        self.query::<()>(con)
674    }
675
676    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
677    /// command's success or failure but don't care about the command's response. For example,
678    /// this is useful for "SET" commands for which the response's content is not important.
679    /// It avoids the need to define generic bounds for ().
680    #[cfg(feature = "aio")]
681    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
682        self.query_async::<()>(con).await
683    }
684
685    /// Returns an iterator over the arguments in this command (including the command name itself)
686    pub fn args_iter(&self) -> impl Clone + ExactSizeIterator<Item = Arg<&[u8]>> {
687        let mut prev = 0;
688        self.args.iter().map(move |arg| match *arg {
689            Arg::Simple(i) => {
690                let arg = Arg::Simple(&self.data[prev..i]);
691                prev = i;
692                arg
693            }
694
695            Arg::Cursor => Arg::Cursor,
696        })
697    }
698
699    // Get a reference to the argument at `idx`
700    #[cfg(any(feature = "cluster", feature = "cache-aio"))]
701    pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
702        if idx >= self.args.len() {
703            return None;
704        }
705
706        let start = if idx == 0 {
707            0
708        } else {
709            match self.args[idx - 1] {
710                Arg::Simple(n) => n,
711                _ => 0,
712            }
713        };
714        let end = match self.args[idx] {
715            Arg::Simple(n) => n,
716            _ => 0,
717        };
718        if start == 0 && end == 0 {
719            return None;
720        }
721        Some(&self.data[start..end])
722    }
723
724    /// Client won't read and wait for results. Currently only used for Pub/Sub commands in RESP3.
725    #[inline]
726    pub fn set_no_response(&mut self, nr: bool) -> &mut Cmd {
727        self.no_response = nr;
728        self
729    }
730
731    /// Check whether command's result will be waited for.
732    #[inline]
733    pub fn is_no_response(&self) -> bool {
734        self.no_response
735    }
736
737    /// Changes caching behaviour for this specific command.
738    #[cfg(feature = "cache-aio")]
739    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
740    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Cmd {
741        self.cache = Some(command_cache_config);
742        self
743    }
744
745    #[cfg(feature = "cache-aio")]
746    #[inline]
747    pub(crate) fn get_cache_config(&self) -> &Option<CommandCacheConfig> {
748        &self.cache
749    }
750}
751
752/// Shortcut function to creating a command with a single argument.
753///
754/// The first argument of a redis command is always the name of the command
755/// which needs to be a string.  This is the recommended way to start a
756/// command pipe.
757///
758/// ```rust
759/// redis::cmd("PING");
760/// ```
761pub fn cmd(name: &str) -> Cmd {
762    let mut rv = Cmd::new();
763    rv.arg(name);
764    rv
765}
766
767/// Packs a bunch of commands into a request.
768///
769/// This is generally a quite useless function as this functionality is
770/// nicely wrapped through the `Cmd` object, but in some cases it can be
771/// useful.  The return value of this can then be send to the low level
772/// `ConnectionLike` methods.
773///
774/// Example:
775///
776/// ```rust
777/// # use redis::ToRedisArgs;
778/// let mut args = vec![];
779/// args.extend("SET".to_redis_args());
780/// args.extend("my_key".to_redis_args());
781/// args.extend(42.to_redis_args());
782/// let cmd = redis::pack_command(&args);
783/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
784/// ```
785pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
786    encode_command(args.iter().map(|x| Arg::Simple(&x[..])), 0)
787}
788
789/// Shortcut for creating a new pipeline.
790pub fn pipe() -> Pipeline {
791    Pipeline::new()
792}
793
794#[cfg(test)]
795mod tests {
796    use super::Cmd;
797    #[cfg(feature = "bytes")]
798    use bytes::BufMut;
799
800    use crate::RedisWrite;
801    use std::io::Write;
802
803    #[test]
804    fn test_cmd_writer_for_next_arg() {
805        // Test that a write split across multiple calls to `write` produces the
806        // same result as a single call to `write_arg`
807        let mut c1 = Cmd::new();
808        {
809            let mut c1_writer = c1.writer_for_next_arg();
810            c1_writer.write_all(b"foo").unwrap();
811            c1_writer.write_all(b"bar").unwrap();
812            c1_writer.flush().unwrap();
813        }
814        let v1 = c1.get_packed_command();
815
816        let mut c2 = Cmd::new();
817        c2.write_arg(b"foobar");
818        let v2 = c2.get_packed_command();
819
820        assert_eq!(v1, v2);
821    }
822
823    // Test that multiple writers to the same command produce the same
824    // result as the same multiple calls to `write_arg`
825    #[test]
826    fn test_cmd_writer_for_next_arg_multiple() {
827        let mut c1 = Cmd::new();
828        {
829            let mut c1_writer = c1.writer_for_next_arg();
830            c1_writer.write_all(b"foo").unwrap();
831            c1_writer.write_all(b"bar").unwrap();
832            c1_writer.flush().unwrap();
833        }
834        {
835            let mut c1_writer = c1.writer_for_next_arg();
836            c1_writer.write_all(b"baz").unwrap();
837            c1_writer.write_all(b"qux").unwrap();
838            c1_writer.flush().unwrap();
839        }
840        let v1 = c1.get_packed_command();
841
842        let mut c2 = Cmd::new();
843        c2.write_arg(b"foobar");
844        c2.write_arg(b"bazqux");
845        let v2 = c2.get_packed_command();
846
847        assert_eq!(v1, v2);
848    }
849
850    // Test that an "empty" write produces the equivalent to `write_arg(b"")`
851    #[test]
852    fn test_cmd_writer_for_next_arg_empty() {
853        let mut c1 = Cmd::new();
854        {
855            let mut c1_writer = c1.writer_for_next_arg();
856            c1_writer.flush().unwrap();
857        }
858        let v1 = c1.get_packed_command();
859
860        let mut c2 = Cmd::new();
861        c2.write_arg(b"");
862        let v2 = c2.get_packed_command();
863
864        assert_eq!(v1, v2);
865    }
866
867    #[cfg(feature = "bytes")]
868    /// Test that a write split across multiple calls to `write` produces the
869    /// same result as a single call to `write_arg`
870    #[test]
871    fn test_cmd_bufmut_for_next_arg() {
872        let mut c1 = Cmd::new();
873        {
874            let mut c1_writer = c1.bufmut_for_next_arg(6);
875            c1_writer.put_slice(b"foo");
876            c1_writer.put_slice(b"bar");
877        }
878        let v1 = c1.get_packed_command();
879
880        let mut c2 = Cmd::new();
881        c2.write_arg(b"foobar");
882        let v2 = c2.get_packed_command();
883
884        assert_eq!(v1, v2);
885    }
886
887    #[cfg(feature = "bytes")]
888    /// Test that multiple writers to the same command produce the same
889    /// result as the same multiple calls to `write_arg`
890    #[test]
891    fn test_cmd_bufmut_for_next_arg_multiple() {
892        let mut c1 = Cmd::new();
893        {
894            let mut c1_writer = c1.bufmut_for_next_arg(6);
895            c1_writer.put_slice(b"foo");
896            c1_writer.put_slice(b"bar");
897        }
898        {
899            let mut c1_writer = c1.bufmut_for_next_arg(6);
900            c1_writer.put_slice(b"baz");
901            c1_writer.put_slice(b"qux");
902        }
903        let v1 = c1.get_packed_command();
904
905        let mut c2 = Cmd::new();
906        c2.write_arg(b"foobar");
907        c2.write_arg(b"bazqux");
908        let v2 = c2.get_packed_command();
909
910        assert_eq!(v1, v2);
911    }
912
913    #[cfg(feature = "bytes")]
914    /// Test that an "empty" write produces the equivalent to `write_arg(b"")`
915    #[test]
916    fn test_cmd_bufmut_for_next_arg_empty() {
917        let mut c1 = Cmd::new();
918        {
919            let _c1_writer = c1.bufmut_for_next_arg(0);
920        }
921        let v1 = c1.get_packed_command();
922
923        let mut c2 = Cmd::new();
924        c2.write_arg(b"");
925        let v2 = c2.get_packed_command();
926
927        assert_eq!(v1, v2);
928    }
929
930    #[test]
931    #[cfg(feature = "cluster")]
932    fn test_cmd_arg_idx() {
933        let mut c = Cmd::new();
934        assert_eq!(c.arg_idx(0), None);
935
936        c.arg("SET");
937        assert_eq!(c.arg_idx(0), Some(&b"SET"[..]));
938        assert_eq!(c.arg_idx(1), None);
939
940        c.arg("foo").arg("42");
941        assert_eq!(c.arg_idx(1), Some(&b"foo"[..]));
942        assert_eq!(c.arg_idx(2), Some(&b"42"[..]));
943        assert_eq!(c.arg_idx(3), None);
944        assert_eq!(c.arg_idx(4), None);
945    }
946}