redis/
cmd.rs

1#[cfg(feature = "aio")]
2use futures_util::{
3    future::BoxFuture,
4    task::{Context, Poll},
5    Stream, StreamExt,
6};
7#[cfg(feature = "aio")]
8use std::pin::Pin;
9#[cfg(feature = "cache-aio")]
10use std::time::Duration;
11use std::{fmt, io};
12
13use crate::connection::ConnectionLike;
14use crate::pipeline::Pipeline;
15use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
16
/// An argument to a redis command.
///
/// `D` is the payload type of a normal argument. `Cmd` stores its arguments
/// as `Arg<usize>` (end offsets into its data buffer), while
/// `Cmd::args_iter` yields `Arg<&[u8]>` (the argument bytes themselves).
#[derive(Clone)]
pub enum Arg<D> {
    /// A normal argument
    Simple(D),
    /// A cursor argument created from `cursor_arg()`
    Cursor,
}
25
/// CommandCacheConfig is used to define caching behaviour of individual commands.
/// # Example
/// ```rust
/// use std::time::Duration;
/// use redis::{CommandCacheConfig, Cmd};
///
/// let ttl = Duration::from_secs(120); // 2 minutes TTL
/// let config = CommandCacheConfig::new()
///     .set_enable_cache(true)
///     .set_client_side_ttl(ttl);
/// let command = Cmd::new().arg("GET").arg("key").set_cache_config(config);
/// ```
#[cfg(feature = "cache-aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
#[derive(Clone)]
pub struct CommandCacheConfig {
    // Whether caching is enabled for the command this config is attached to.
    pub(crate) enable_cache: bool,
    // Custom client side TTL; `None` until `set_client_side_ttl` is called.
    pub(crate) client_side_ttl: Option<Duration>,
}
45
#[cfg(feature = "cache-aio")]
impl CommandCacheConfig {
    /// Builds a config with caching enabled and no custom client side TTL.
    pub fn new() -> Self {
        CommandCacheConfig {
            client_side_ttl: None,
            enable_cache: true,
        }
    }

    /// Enables or disables caching for the command.
    /// Disabling cache for specific command when using [crate::caching::CacheMode::All] will not work.
    pub fn set_enable_cache(self, enable_cache: bool) -> Self {
        Self {
            enable_cache,
            ..self
        }
    }

    /// Overrides the client side time to live (TTL) for the command.
    pub fn set_client_side_ttl(self, client_side_ttl: Duration) -> Self {
        Self {
            client_side_ttl: Some(client_side_ttl),
            ..self
        }
    }
}
#[cfg(feature = "cache-aio")]
impl Default for CommandCacheConfig {
    /// Same as [`CommandCacheConfig::new`].
    fn default() -> Self {
        CommandCacheConfig::new()
    }
}
75
/// Represents redis commands.
///
/// The bytes of all normal arguments are stored back to back in one buffer;
/// a separate list records where each argument ends.
#[derive(Clone)]
pub struct Cmd {
    // Concatenated bytes of every `Arg::Simple` argument, in insertion order.
    pub(crate) data: Vec<u8>,
    // Arg::Simple contains the offset that marks the end of the argument
    args: Vec<Arg<usize>>,
    // Cursor value substituted for `Arg::Cursor`; `Some` once `cursor_arg` was called.
    cursor: Option<u64>,
    // If it's true command's response won't be read from socket. Useful for Pub/Sub.
    no_response: bool,
    // Per-command cache settings; `None` until `set_cache_config` is called.
    #[cfg(feature = "cache-aio")]
    cache: Option<CommandCacheConfig>,
}
88
#[cfg_attr(
    not(feature = "safe_iterators"),
    deprecated(
        note = "Deprecated due to the fact that this implementation silently stops at the first value that can't be converted to T. Enable the feature `safe_iterators` for a safe version."
    )
)]
/// Represents a redis iterator.
///
/// With the `safe_iterators` feature the items are `RedisResult<T>`;
/// without it the items are plain `T` and iteration ends silently on the
/// first error.
pub struct Iter<'a, T: FromRedisValue> {
    // The checked iterator that does the actual work.
    iter: CheckedIter<'a, T>,
}
99
100#[cfg(not(feature = "safe_iterators"))]
101impl<T: FromRedisValue> Iterator for Iter<'_, T> {
102    type Item = T;
103
104    #[inline]
105    fn next(&mut self) -> Option<T> {
106        // use the checked iterator, but keep the behavior of the deprecated
107        // iterator.  This will return silently `None` if an error occurs.
108        self.iter.next()?.ok()
109    }
110}
111
#[cfg(feature = "safe_iterators")]
impl<T: FromRedisValue> Iterator for Iter<'_, T> {
    type Item = RedisResult<T>;

    /// Forwards to the wrapped checked iterator, errors included.
    #[inline]
    fn next(&mut self) -> Option<RedisResult<T>> {
        let inner = &mut self.iter;
        inner.next()
    }
}
121
/// Represents a safe(r) redis iterator.
///
/// Every item is a `RedisResult<T>`, so conversion and request errors are
/// surfaced to the caller instead of being dropped.
struct CheckedIter<'a, T: FromRedisValue> {
    // Items of the most recently fetched batch that were not yielded yet.
    batch: std::vec::IntoIter<RedisResult<T>>,
    // Connection used to request further batches.
    con: &'a mut (dyn ConnectionLike + 'a),
    // The command being iterated; its cursor is updated after each batch.
    cmd: Cmd,
}
128
129impl<T: FromRedisValue> Iterator for CheckedIter<'_, T> {
130    type Item = RedisResult<T>;
131
132    #[inline]
133    fn next(&mut self) -> Option<RedisResult<T>> {
134        // we need to do this in a loop until we produce at least one item
135        // or we find the actual end of the iteration.  This is necessary
136        // because with filtering an iterator it is possible that a whole
137        // chunk is not matching the pattern and thus yielding empty results.
138        loop {
139            if let Some(value) = self.batch.next() {
140                return Some(value);
141            };
142
143            if self.cmd.cursor? == 0 {
144                return None;
145            }
146
147            let (cursor, batch) = match self
148                .con
149                .req_packed_command(&self.cmd.get_packed_command())
150                .and_then(from_owned_redis_value::<(u64, _)>)
151            {
152                Ok((cursor, values)) => (cursor, T::from_each_owned_redis_values(values)),
153                Err(e) => return Some(Err(e)),
154            };
155
156            self.cmd.cursor = Some(cursor);
157            self.batch = batch.into_iter();
158        }
159    }
160}
161
162#[cfg(feature = "aio")]
163use crate::aio::ConnectionLike as AsyncConnection;
164
/// The inner future of AsyncIter
///
/// Holds everything needed to produce the next item: the buffered batch,
/// the connection and the command with its current cursor.
#[cfg(feature = "aio")]
struct AsyncIterInner<'a, T: FromRedisValue + 'a> {
    // Items of the most recently fetched batch that were not yielded yet.
    batch: std::vec::IntoIter<RedisResult<T>>,
    // Async connection used to request further batches.
    con: &'a mut (dyn AsyncConnection + Send + 'a),
    // The command being iterated; its cursor is updated after each batch.
    cmd: Cmd,
}
172
/// Represents the state of AsyncIter
#[cfg(feature = "aio")]
enum IterOrFuture<'a, T: FromRedisValue + 'a> {
    // Idle: no request for the next item is in flight.
    Iter(AsyncIterInner<'a, T>),
    // A boxed future computing the next item; it hands the inner state back
    // together with the item once it completes.
    Future(BoxFuture<'a, (AsyncIterInner<'a, T>, Option<RedisResult<T>>)>),
    // Placeholder left behind while `poll_next` has moved the state out.
    Empty,
}
180
/// Represents a redis iterator that can be used with async connections.
///
/// Items can be pulled with `next_item` or through the `Stream`
/// implementation.
#[cfg(feature = "aio")]
#[cfg_attr(
    all(feature = "aio", not(feature = "safe_iterators")),
    deprecated(
        note = "Deprecated due to the fact that this implementation silently stops at the first value that can't be converted to T. Enable the feature `safe_iterators` for a safe version."
    )
)]
pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
    // Current state: either idle or waiting on the future for the next item.
    inner: IterOrFuture<'a, T>,
}
192
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a> AsyncIterInner<'a, T> {
    /// Returns the next item, requesting further batches from the server
    /// while the cursor is non-zero. Returns `None` once the buffered batch
    /// is empty and the cursor is unset or zero.
    async fn next_item(&mut self) -> Option<RedisResult<T>> {
        // Keep fetching batches until one yields an item or the iteration is
        // really over: when a filter is applied, a whole batch may match
        // nothing and come back empty even though more batches follow.
        loop {
            let buffered = self.batch.next();
            if buffered.is_some() {
                return buffered;
            }

            // No cursor at all, or a cursor of zero: nothing more to fetch.
            match self.cmd.cursor {
                None | Some(0) => return None,
                Some(_) => {}
            }

            let reply = self
                .con
                .req_packed_command(&self.cmd)
                .await
                .and_then(from_owned_redis_value::<(u64, _)>);

            match reply {
                Ok((next_cursor, items)) => {
                    self.cmd.cursor = Some(next_cursor);
                    self.batch = T::from_each_owned_redis_values(items).into_iter();
                }
                Err(err) => return Some(Err(err)),
            }
        }
    }
}
224
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a + Unpin + Send> AsyncIter<'a, T> {
    /// Returns the next item of the iteration, or `None` when it is finished.
    ///
    /// Errors are reported as `Some(Err(_))`.
    ///
    /// ```rust,no_run
    /// # use redis::AsyncCommands;
    /// # async fn scan_set() -> redis::RedisResult<()> {
    /// # let client = redis::Client::open("redis://127.0.0.1/")?;
    /// # let mut con = client.get_multiplexed_async_connection().await?;
    /// let _: () = con.sadd("my_set", 42i32).await?;
    /// let _: () = con.sadd("my_set", 43i32).await?;
    /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
    /// while let Some(element) = iter.next_item().await {
    ///     let element = element?;
    ///     assert!(element == 42 || element == 43);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "safe_iterators")]
    #[inline]
    pub async fn next_item(&mut self) -> Option<RedisResult<T>> {
        self.next().await
    }

    /// Returns the next item of the iteration, or `None` when it is finished.
    ///
    /// An error also yields `None`, so a failure cannot be told apart from
    /// the end of the iteration.
    ///
    /// ```rust,no_run
    /// # use redis::AsyncCommands;
    /// # async fn scan_set() -> redis::RedisResult<()> {
    /// # let client = redis::Client::open("redis://127.0.0.1/")?;
    /// # let mut con = client.get_multiplexed_async_connection().await?;
    /// let _: () = con.sadd("my_set", 42i32).await?;
    /// let _: () = con.sadd("my_set", 43i32).await?;
    /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
    /// while let Some(element) = iter.next_item().await {
    ///     assert!(element == 42 || element == 43);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(not(feature = "safe_iterators"))]
    #[inline]
    pub async fn next_item(&mut self) -> Option<T> {
        self.next().await
    }
}
268
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + Unpin + Send + 'a> Stream for AsyncIter<'a, T> {
    #[cfg(feature = "safe_iterators")]
    type Item = RedisResult<T>;
    #[cfg(not(feature = "safe_iterators"))]
    type Item = T;

    /// Drives the state machine: an idle iterator is turned into a boxed
    /// future for the next item, which is then polled. When the future
    /// completes, the iterator goes back to the idle state.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            // The state is moved out and always written back before returning.
            match std::mem::replace(&mut this.inner, IterOrFuture::Empty) {
                IterOrFuture::Iter(mut idle) => {
                    let pending = async move {
                        let item = idle.next_item().await;
                        (idle, item)
                    };
                    this.inner = IterOrFuture::Future(Box::pin(pending));
                    // Loop around to poll the future that was just created.
                }
                IterOrFuture::Future(mut pending) => {
                    return match pending.as_mut().poll(cx) {
                        Poll::Pending => {
                            this.inner = IterOrFuture::Future(pending);
                            Poll::Pending
                        }
                        Poll::Ready((idle, item)) => {
                            this.inner = IterOrFuture::Iter(idle);

                            #[cfg(feature = "safe_iterators")]
                            let yielded = item;
                            // Deprecated behaviour: an error ends the stream.
                            #[cfg(not(feature = "safe_iterators"))]
                            let yielded = item.and_then(Result::ok);

                            Poll::Ready(yielded)
                        }
                    };
                }
                IterOrFuture::Empty => unreachable!(),
            }
        }
    }
}
306
/// Returns the number of decimal digits needed to print `v` (`0` needs one).
fn countdigits(v: usize) -> usize {
    // `checked_ilog10` is `None` only for zero, which still prints as "0".
    v.checked_ilog10().map_or(1, |log| log as usize + 1)
}
327
328#[inline]
329fn bulklen(len: usize) -> usize {
330    1 + countdigits(len) + 2 + len + 2
331}
332
333fn args_len<'a, I>(args: I, cursor: u64) -> usize
334where
335    I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
336{
337    let mut totlen = 1 + countdigits(args.len()) + 2;
338    for item in args {
339        totlen += bulklen(match item {
340            Arg::Cursor => countdigits(cursor as usize),
341            Arg::Simple(val) => val.len(),
342        });
343    }
344    totlen
345}
346
347pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
348    args_len(cmd.args_iter(), cmd.cursor.unwrap_or(0))
349}
350
351fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
352where
353    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
354{
355    let mut cmd = Vec::new();
356    write_command_to_vec(&mut cmd, args, cursor);
357    cmd
358}
359
360fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
361where
362    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
363{
364    let totlen = args_len(args.clone(), cursor);
365
366    cmd.reserve(totlen);
367
368    write_command(cmd, args, cursor).unwrap()
369}
370
371fn write_command<'a, I>(cmd: &mut (impl ?Sized + io::Write), args: I, cursor: u64) -> io::Result<()>
372where
373    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
374{
375    let mut buf = ::itoa::Buffer::new();
376
377    cmd.write_all(b"*")?;
378    let s = buf.format(args.len());
379    cmd.write_all(s.as_bytes())?;
380    cmd.write_all(b"\r\n")?;
381
382    let mut cursor_bytes = itoa::Buffer::new();
383    for item in args {
384        let bytes = match item {
385            Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
386            Arg::Simple(val) => val,
387        };
388
389        cmd.write_all(b"$")?;
390        let s = buf.format(bytes.len());
391        cmd.write_all(s.as_bytes())?;
392        cmd.write_all(b"\r\n")?;
393
394        cmd.write_all(bytes)?;
395        cmd.write_all(b"\r\n")?;
396    }
397    Ok(())
398}
399
400impl RedisWrite for Cmd {
401    fn write_arg(&mut self, arg: &[u8]) {
402        self.data.extend_from_slice(arg);
403        self.args.push(Arg::Simple(self.data.len()));
404    }
405
406    fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
407        use std::io::Write;
408        write!(self.data, "{arg}").unwrap();
409        self.args.push(Arg::Simple(self.data.len()));
410    }
411
    /// Returns a writer whose output becomes the next argument.
    ///
    /// Everything written is appended to the data buffer. The argument is
    /// recorded when the writer is dropped, so several writes form a single
    /// argument.
    fn writer_for_next_arg(&mut self) -> impl std::io::Write + '_ {
        // Guard that closes the argument on drop.
        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
        impl Drop for CmdBufferedArgGuard<'_> {
            fn drop(&mut self) {
                // Record the end offset of everything written through the guard.
                self.0.args.push(Arg::Simple(self.0.data.len()));
            }
        }
        impl std::io::Write for CmdBufferedArgGuard<'_> {
            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                // The whole buffer is always accepted.
                self.0.data.extend_from_slice(buf);
                Ok(buf.len())
            }

            fn flush(&mut self) -> std::io::Result<()> {
                // Nothing is buffered outside `data`, so there is nothing to flush.
                Ok(())
            }
        }

        CmdBufferedArgGuard(self)
    }
432
433    fn reserve_space_for_args(&mut self, additional: impl IntoIterator<Item = usize>) {
434        let mut capacity = 0;
435        let mut args = 0;
436        for add in additional {
437            capacity += add;
438            args += 1;
439        }
440        self.data.reserve(capacity);
441        self.args.reserve(args);
442    }
443
    /// Returns a `bytes::BufMut` whose contents become the next argument.
    ///
    /// `capacity` bytes are reserved in the data buffer up front. The
    /// argument is recorded when the returned value is dropped.
    #[cfg(feature = "bytes")]
    fn bufmut_for_next_arg(&mut self, capacity: usize) -> impl bytes::BufMut + '_ {
        self.data.reserve(capacity);
        // Guard that closes the argument on drop.
        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
        impl Drop for CmdBufferedArgGuard<'_> {
            fn drop(&mut self) {
                // Record the end offset of everything written through the guard.
                self.0.args.push(Arg::Simple(self.0.data.len()));
            }
        }
        // Every method forwards to the `BufMut` implementation of the
        // underlying `Vec<u8>` without adding any logic of its own.
        unsafe impl bytes::BufMut for CmdBufferedArgGuard<'_> {
            fn remaining_mut(&self) -> usize {
                self.0.data.remaining_mut()
            }

            unsafe fn advance_mut(&mut self, cnt: usize) {
                self.0.data.advance_mut(cnt);
            }

            fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
                self.0.data.chunk_mut()
            }

            // Vec specializes these methods, so we do too
            fn put<T: bytes::buf::Buf>(&mut self, src: T)
            where
                Self: Sized,
            {
                self.0.data.put(src);
            }

            fn put_slice(&mut self, src: &[u8]) {
                self.0.data.put_slice(src);
            }

            fn put_bytes(&mut self, val: u8, cnt: usize) {
                self.0.data.put_bytes(val, cnt);
            }
        }

        CmdBufferedArgGuard(self)
    }
485}
486
487impl Default for Cmd {
488    fn default() -> Cmd {
489        Cmd::new()
490    }
491}
492
493/// A command acts as a builder interface to creating encoded redis
494/// requests.  This allows you to easily assemble a packed command
495/// by chaining arguments together.
496///
497/// Basic example:
498///
499/// ```rust
500/// redis::Cmd::new().arg("SET").arg("my_key").arg(42);
501/// ```
502///
503/// There is also a helper function called `cmd` which makes it a
504/// tiny bit shorter:
505///
506/// ```rust
507/// redis::cmd("SET").arg("my_key").arg(42);
508/// ```
509///
510/// Because Rust currently does not have an ideal system
511/// for lifetimes of temporaries, sometimes you need to hold on to
512/// the initially generated command:
513///
514/// ```rust,no_run
515/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
516/// # let mut con = client.get_connection().unwrap();
517/// let mut cmd = redis::cmd("SMEMBERS");
518/// let mut iter : redis::Iter<i32> = cmd.arg("my_set").clone().iter(&mut con).unwrap();
519/// ```
520impl Cmd {
521    /// Creates a new empty command.
522    pub fn new() -> Cmd {
523        Cmd {
524            data: vec![],
525            args: vec![],
526            cursor: None,
527            no_response: false,
528            #[cfg(feature = "cache-aio")]
529            cache: None,
530        }
531    }
532
533    /// Creates a new empty command, with at least the requested capacity.
534    pub fn with_capacity(arg_count: usize, size_of_data: usize) -> Cmd {
535        Cmd {
536            data: Vec::with_capacity(size_of_data),
537            args: Vec::with_capacity(arg_count),
538            cursor: None,
539            no_response: false,
540            #[cfg(feature = "cache-aio")]
541            cache: None,
542        }
543    }
544
545    /// Get the capacities for the internal buffers.
546    #[cfg(test)]
547    #[allow(dead_code)]
548    pub(crate) fn capacity(&self) -> (usize, usize) {
549        (self.args.capacity(), self.data.capacity())
550    }
551
552    /// Clears the command, resetting it completely.
553    ///
554    /// This is equivalent to [`Cmd::new`], except the buffer capacity is kept.
555    ///
556    /// # Examples
557    ///
558    /// ```rust,no_run
559    /// # use redis::{Client, Cmd};
560    /// # let client = Client::open("redis://127.0.0.1/").unwrap();
561    /// # let mut con = client.get_connection().expect("Failed to connect to Redis");
562    /// let mut cmd = Cmd::new();
563    /// cmd.arg("SET").arg("foo").arg("42");
564    /// cmd.query::<()>(&mut con).expect("Query failed");
565    /// cmd.clear();
566    /// // This reuses the allocations of the previous command
567    /// cmd.arg("SET").arg("bar").arg("42");
568    /// cmd.query::<()>(&mut con).expect("Query failed");
569    /// ```
570    pub fn clear(&mut self) {
571        self.data.clear();
572        self.args.clear();
573        self.cursor = None;
574        self.no_response = false;
575        #[cfg(feature = "cache-aio")]
576        {
577            self.cache = None;
578        }
579    }
580
581    /// Appends an argument to the command.  The argument passed must
582    /// be a type that implements `ToRedisArgs`.  Most primitive types as
583    /// well as vectors of primitive types implement it.
584    ///
585    /// For instance all of the following are valid:
586    ///
587    /// ```rust,no_run
588    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
589    /// # let mut con = client.get_connection().unwrap();
590    /// redis::cmd("SET").arg(&["my_key", "my_value"]);
591    /// redis::cmd("SET").arg("my_key").arg(42);
592    /// redis::cmd("SET").arg("my_key").arg(b"my_value");
593    /// ```
594    #[inline]
595    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
596        arg.write_redis_args(self);
597        self
598    }
599
600    /// Works similar to `arg` but adds a cursor argument.
601    ///
602    /// This is always an integer and also flips the command implementation to support a
603    /// different mode for the iterators where the iterator will ask for
604    /// another batch of items when the local data is exhausted.
605    /// Calling this function more than once will overwrite the previous cursor with the latest set value.
606    ///
607    /// ```rust,no_run
608    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
609    /// # let mut con = client.get_connection().unwrap();
610    /// let mut cmd = redis::cmd("SSCAN");
611    /// let mut iter : redis::Iter<isize> =
612    ///     cmd.arg("my_set").cursor_arg(0).clone().iter(&mut con).unwrap();
613    /// for x in iter {
614    ///     // do something with the item
615    /// }
616    /// ```
617    #[inline]
618    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
619        self.cursor = Some(cursor);
620        self.args.push(Arg::Cursor);
621        self
622    }
623
624    /// Returns the packed command as a byte vector.
625    ///
626    /// This is a wrapper around [`write_packed_command`] that creates a [`Vec`] to write to.
627    ///
628    /// [`write_packed_command`]: Self::write_packed_command
629    #[inline]
630    pub fn get_packed_command(&self) -> Vec<u8> {
631        let mut cmd = Vec::new();
632        self.write_packed_command(&mut cmd);
633        cmd
634    }
635
636    /// Writes the packed command to `dst`.
637    ///
638    /// This will *append* the packed command.
639    ///
640    /// See also [`get_packed_command`].
641    ///
642    /// [`get_packed_command`]: Self::get_packed_command.
643    #[inline]
644    pub fn write_packed_command(&self, dst: &mut Vec<u8>) {
645        write_command_to_vec(dst, self.args_iter(), self.cursor.unwrap_or(0))
646    }
647
648    pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
649        write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
650    }
651
    /// Returns true if the command is in scan mode.
    ///
    /// A command is in scan mode once a cursor has been set on it, which
    /// `cursor_arg` does.
    #[inline]
    pub fn in_scan_mode(&self) -> bool {
        self.cursor.is_some()
    }
657
658    /// Sends the command as query to the connection and converts the
659    /// result to the target redis value.  This is the general way how
660    /// you can retrieve data.
661    #[inline]
662    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
663        match con.req_command(self) {
664            Ok(val) => from_owned_redis_value(val.extract_error()?),
665            Err(e) => Err(e),
666        }
667    }
668
669    /// Async version of `query`.
670    #[inline]
671    #[cfg(feature = "aio")]
672    pub async fn query_async<T: FromRedisValue>(
673        &self,
674        con: &mut impl crate::aio::ConnectionLike,
675    ) -> RedisResult<T> {
676        let val = con.req_packed_command(self).await?;
677        from_owned_redis_value(val.extract_error()?)
678    }
679
680    /// Sets the cursor and converts the passed value to a batch used by the
681    /// iterators.
682    fn set_cursor_and_get_batch<T: FromRedisValue>(
683        &mut self,
684        value: crate::Value,
685    ) -> RedisResult<Vec<RedisResult<T>>> {
686        let (cursor, values) = if value.looks_like_cursor() {
687            let (cursor, values) = from_owned_redis_value::<(u64, _)>(value)?;
688            (cursor, values)
689        } else {
690            (0, from_owned_redis_value(value)?)
691        };
692
693        self.cursor = Some(cursor);
694
695        Ok(T::from_each_owned_redis_values(values))
696    }
697
698    /// Similar to `query()` but returns an iterator over the items of the
699    /// bulk result or iterator.  In normal mode this is not in any way more
700    /// efficient than just querying into a `Vec<T>` as it's internally
701    /// implemented as buffering into a vector.  This however is useful when
702    /// `cursor_arg` was used in which case the iterator will query for more
703    /// items until the server side cursor is exhausted.
704    ///
705    /// This is useful for commands such as `SSCAN`, `SCAN` and others.
706    ///
707    /// One speciality of this function is that it will check if the response
708    /// looks like a cursor or not and always just looks at the payload.
709    /// This way you can use the function the same for responses in the
710    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
711    /// tuple of cursor and list).
712    #[inline]
713    pub fn iter<T: FromRedisValue>(
714        mut self,
715        con: &mut dyn ConnectionLike,
716    ) -> RedisResult<Iter<'_, T>> {
717        let rv = con.req_command(&self)?;
718
719        let batch = self.set_cursor_and_get_batch(rv)?;
720
721        Ok(Iter {
722            iter: CheckedIter {
723                batch: batch.into_iter(),
724                con,
725                cmd: self,
726            },
727        })
728    }
729
730    /// Similar to `iter()` but returns an AsyncIter over the items of the
731    /// bulk result or iterator.  A [futures::Stream](https://docs.rs/futures/0.3.3/futures/stream/trait.Stream.html)
732    /// is implemented on AsyncIter. In normal mode this is not in any way more
733    /// efficient than just querying into a `Vec<T>` as it's internally
734    /// implemented as buffering into a vector.  This however is useful when
735    /// `cursor_arg` was used in which case the stream will query for more
736    /// items until the server side cursor is exhausted.
737    ///
738    /// This is useful for commands such as `SSCAN`, `SCAN` and others in async contexts.
739    ///
740    /// One speciality of this function is that it will check if the response
741    /// looks like a cursor or not and always just looks at the payload.
742    /// This way you can use the function the same for responses in the
743    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
744    /// tuple of cursor and list).
745    #[cfg(feature = "aio")]
746    #[inline]
747    pub async fn iter_async<'a, T: FromRedisValue + 'a>(
748        mut self,
749        con: &'a mut (dyn AsyncConnection + Send),
750    ) -> RedisResult<AsyncIter<'a, T>> {
751        let rv = con.req_packed_command(&self).await?;
752
753        let batch = self.set_cursor_and_get_batch(rv)?;
754
755        Ok(AsyncIter {
756            inner: IterOrFuture::Iter(AsyncIterInner {
757                batch: batch.into_iter(),
758                con,
759                cmd: self,
760            }),
761        })
762    }
763
764    /// This is a shortcut to `query()` that does not return a value and
765    /// will fail the task if the query fails because of an error.  This is
766    /// mainly useful in examples and for simple commands like setting
767    /// keys.
768    ///
769    /// This is equivalent to a call of query like this:
770    ///
771    /// ```rust,no_run
772    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
773    /// # let mut con = client.get_connection().unwrap();
774    /// redis::cmd("PING").query::<()>(&mut con).unwrap();
775    /// ```
776    #[inline]
777    #[deprecated(note = "Use Cmd::exec + unwrap, instead")]
778    pub fn execute(&self, con: &mut dyn ConnectionLike) {
779        self.exec(con).unwrap();
780    }
781
782    /// This is an alternative to `query`` that can be used if you want to be able to handle a
783    /// command's success or failure but don't care about the command's response. For example,
784    /// this is useful for "SET" commands for which the response's content is not important.
785    /// It avoids the need to define generic bounds for ().
786    #[inline]
787    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
788        self.query::<()>(con)
789    }
790
791    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
792    /// command's success or failure but don't care about the command's response. For example,
793    /// this is useful for "SET" commands for which the response's content is not important.
794    /// It avoids the need to define generic bounds for ().
795    #[cfg(feature = "aio")]
796    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
797        self.query_async::<()>(con).await
798    }
799
800    /// Returns an iterator over the arguments in this command (including the command name itself)
801    pub fn args_iter(&self) -> impl Clone + ExactSizeIterator<Item = Arg<&[u8]>> {
802        let mut prev = 0;
803        self.args.iter().map(move |arg| match *arg {
804            Arg::Simple(i) => {
805                let arg = Arg::Simple(&self.data[prev..i]);
806                prev = i;
807                arg
808            }
809
810            Arg::Cursor => Arg::Cursor,
811        })
812    }
813
814    // Get a reference to the argument at `idx`
815    #[cfg(any(feature = "cluster", feature = "cache-aio"))]
816    pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
817        if idx >= self.args.len() {
818            return None;
819        }
820
821        let start = if idx == 0 {
822            0
823        } else {
824            match self.args[idx - 1] {
825                Arg::Simple(n) => n,
826                _ => 0,
827            }
828        };
829        let end = match self.args[idx] {
830            Arg::Simple(n) => n,
831            _ => 0,
832        };
833        if start == 0 && end == 0 {
834            return None;
835        }
836        Some(&self.data[start..end])
837    }
838
    /// Client won't read and wait for results. Currently only used for Pub/Sub commands in RESP3.
    ///
    /// Passing `true` marks the command as having no response; passing
    /// `false` restores the default. Returns the command for chaining.
    #[inline]
    pub fn set_no_response(&mut self, nr: bool) -> &mut Cmd {
        self.no_response = nr;
        self
    }
845
    /// Check whether command's result will be waited for.
    ///
    /// Returns `true` when the command was marked with `set_no_response(true)`,
    /// i.e. when its response will *not* be read.
    #[inline]
    pub fn is_no_response(&self) -> bool {
        self.no_response
    }
851
    /// Changes caching behaviour for this specific command.
    ///
    /// Replaces any cache configuration set earlier and returns the command
    /// for chaining.
    #[cfg(feature = "cache-aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Cmd {
        self.cache = Some(command_cache_config);
        self
    }
859
    /// Returns the cache configuration of this command, or `None` if none
    /// was set.
    #[cfg(feature = "cache-aio")]
    #[inline]
    pub(crate) fn get_cache_config(&self) -> &Option<CommandCacheConfig> {
        &self.cache
    }
865}
866
867/// Shortcut function to creating a command with a single argument.
868///
869/// The first argument of a redis command is always the name of the command
870/// which needs to be a string.  This is the recommended way to start a
871/// command pipe.
872///
873/// ```rust
874/// redis::cmd("PING");
875/// ```
876pub fn cmd(name: &str) -> Cmd {
877    let mut rv = Cmd::new();
878    rv.arg(name);
879    rv
880}
881
882/// Packs a bunch of commands into a request.
883///
884/// This is generally a quite useless function as this functionality is
885/// nicely wrapped through the `Cmd` object, but in some cases it can be
886/// useful.  The return value of this can then be send to the low level
887/// `ConnectionLike` methods.
888///
889/// Example:
890///
891/// ```rust
892/// # use redis::ToRedisArgs;
893/// let mut args = vec![];
894/// args.extend("SET".to_redis_args());
895/// args.extend("my_key".to_redis_args());
896/// args.extend(42.to_redis_args());
897/// let cmd = redis::pack_command(&args);
898/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
899/// ```
900pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
901    encode_command(args.iter().map(|x| Arg::Simple(&x[..])), 0)
902}
903
904/// Shortcut for creating a new pipeline.
905pub fn pipe() -> Pipeline {
906    Pipeline::new()
907}
908
909#[cfg(test)]
910mod tests {
911    use super::Cmd;
912    #[cfg(feature = "bytes")]
913    use bytes::BufMut;
914
915    use crate::RedisWrite;
916    use std::io::Write;
917
918    #[test]
919    fn test_cmd_clean() {
920        let mut cmd = Cmd::new();
921        cmd.arg("key").arg("value");
922        cmd.set_no_response(true);
923        cmd.cursor_arg(24);
924        cmd.clear();
925
926        // Everything should be reset, but the capacity should still be there
927        assert!(cmd.data.is_empty());
928        assert!(cmd.data.capacity() > 0);
929        assert!(cmd.args.is_empty());
930        assert!(cmd.args.capacity() > 0);
931        assert_eq!(cmd.cursor, None);
932        assert!(!cmd.no_response);
933    }
934
935    #[test]
936    #[cfg(feature = "cache-aio")]
937    fn test_cmd_clean_cache_aio() {
938        let mut cmd = Cmd::new();
939        cmd.arg("key").arg("value");
940        cmd.set_no_response(true);
941        cmd.cursor_arg(24);
942        cmd.set_cache_config(crate::CommandCacheConfig::default());
943        cmd.clear();
944
945        // Everything should be reset, but the capacity should still be there
946        assert!(cmd.data.is_empty());
947        assert!(cmd.data.capacity() > 0);
948        assert!(cmd.args.is_empty());
949        assert!(cmd.args.capacity() > 0);
950        assert_eq!(cmd.cursor, None);
951        assert!(!cmd.no_response);
952        assert!(cmd.cache.is_none());
953    }
954
955    #[test]
956    fn test_cmd_writer_for_next_arg() {
957        // Test that a write split across multiple calls to `write` produces the
958        // same result as a single call to `write_arg`
959        let mut c1 = Cmd::new();
960        {
961            let mut c1_writer = c1.writer_for_next_arg();
962            c1_writer.write_all(b"foo").unwrap();
963            c1_writer.write_all(b"bar").unwrap();
964            c1_writer.flush().unwrap();
965        }
966        let v1 = c1.get_packed_command();
967
968        let mut c2 = Cmd::new();
969        c2.write_arg(b"foobar");
970        let v2 = c2.get_packed_command();
971
972        assert_eq!(v1, v2);
973    }
974
975    // Test that multiple writers to the same command produce the same
976    // result as the same multiple calls to `write_arg`
977    #[test]
978    fn test_cmd_writer_for_next_arg_multiple() {
979        let mut c1 = Cmd::new();
980        {
981            let mut c1_writer = c1.writer_for_next_arg();
982            c1_writer.write_all(b"foo").unwrap();
983            c1_writer.write_all(b"bar").unwrap();
984            c1_writer.flush().unwrap();
985        }
986        {
987            let mut c1_writer = c1.writer_for_next_arg();
988            c1_writer.write_all(b"baz").unwrap();
989            c1_writer.write_all(b"qux").unwrap();
990            c1_writer.flush().unwrap();
991        }
992        let v1 = c1.get_packed_command();
993
994        let mut c2 = Cmd::new();
995        c2.write_arg(b"foobar");
996        c2.write_arg(b"bazqux");
997        let v2 = c2.get_packed_command();
998
999        assert_eq!(v1, v2);
1000    }
1001
1002    // Test that an "empty" write produces the equivalent to `write_arg(b"")`
1003    #[test]
1004    fn test_cmd_writer_for_next_arg_empty() {
1005        let mut c1 = Cmd::new();
1006        {
1007            let mut c1_writer = c1.writer_for_next_arg();
1008            c1_writer.flush().unwrap();
1009        }
1010        let v1 = c1.get_packed_command();
1011
1012        let mut c2 = Cmd::new();
1013        c2.write_arg(b"");
1014        let v2 = c2.get_packed_command();
1015
1016        assert_eq!(v1, v2);
1017    }
1018
1019    #[cfg(feature = "bytes")]
1020    /// Test that a write split across multiple calls to `write` produces the
1021    /// same result as a single call to `write_arg`
1022    #[test]
1023    fn test_cmd_bufmut_for_next_arg() {
1024        let mut c1 = Cmd::new();
1025        {
1026            let mut c1_writer = c1.bufmut_for_next_arg(6);
1027            c1_writer.put_slice(b"foo");
1028            c1_writer.put_slice(b"bar");
1029        }
1030        let v1 = c1.get_packed_command();
1031
1032        let mut c2 = Cmd::new();
1033        c2.write_arg(b"foobar");
1034        let v2 = c2.get_packed_command();
1035
1036        assert_eq!(v1, v2);
1037    }
1038
1039    #[cfg(feature = "bytes")]
1040    /// Test that multiple writers to the same command produce the same
1041    /// result as the same multiple calls to `write_arg`
1042    #[test]
1043    fn test_cmd_bufmut_for_next_arg_multiple() {
1044        let mut c1 = Cmd::new();
1045        {
1046            let mut c1_writer = c1.bufmut_for_next_arg(6);
1047            c1_writer.put_slice(b"foo");
1048            c1_writer.put_slice(b"bar");
1049        }
1050        {
1051            let mut c1_writer = c1.bufmut_for_next_arg(6);
1052            c1_writer.put_slice(b"baz");
1053            c1_writer.put_slice(b"qux");
1054        }
1055        let v1 = c1.get_packed_command();
1056
1057        let mut c2 = Cmd::new();
1058        c2.write_arg(b"foobar");
1059        c2.write_arg(b"bazqux");
1060        let v2 = c2.get_packed_command();
1061
1062        assert_eq!(v1, v2);
1063    }
1064
1065    #[cfg(feature = "bytes")]
1066    /// Test that an "empty" write produces the equivalent to `write_arg(b"")`
1067    #[test]
1068    fn test_cmd_bufmut_for_next_arg_empty() {
1069        let mut c1 = Cmd::new();
1070        {
1071            let _c1_writer = c1.bufmut_for_next_arg(0);
1072        }
1073        let v1 = c1.get_packed_command();
1074
1075        let mut c2 = Cmd::new();
1076        c2.write_arg(b"");
1077        let v2 = c2.get_packed_command();
1078
1079        assert_eq!(v1, v2);
1080    }
1081
1082    #[test]
1083    #[cfg(feature = "cluster")]
1084    fn test_cmd_arg_idx() {
1085        let mut c = Cmd::new();
1086        assert_eq!(c.arg_idx(0), None);
1087
1088        c.arg("SET");
1089        assert_eq!(c.arg_idx(0), Some(&b"SET"[..]));
1090        assert_eq!(c.arg_idx(1), None);
1091
1092        c.arg("foo").arg("42");
1093        assert_eq!(c.arg_idx(1), Some(&b"foo"[..]));
1094        assert_eq!(c.arg_idx(2), Some(&b"42"[..]));
1095        assert_eq!(c.arg_idx(3), None);
1096        assert_eq!(c.arg_idx(4), None);
1097    }
1098}