darkredis/
connection.rs

1use crate::{Command, CommandList, DataType, Error, Result, Value};
2use futures::{future::BoxFuture, FutureExt};
3
4#[cfg(feature = "runtime_async_std")]
5use async_std::{
6    io,
7    net::{TcpStream, ToSocketAddrs},
8    sync::Mutex,
9};
10#[cfg(feature = "runtime_async_std")]
11use futures::{AsyncReadExt, AsyncWriteExt};
12
13#[cfg(feature = "runtime_tokio")]
14use tokio::{
15    io::{self, AsyncReadExt, AsyncWriteExt},
16    net::{TcpStream, ToSocketAddrs},
17    sync::Mutex,
18};
19
20use std::sync::Arc;
21
22pub mod builder;
23pub mod scan;
24pub mod stream;
25pub use scan::{HScanBuilder, HScanStream, ScanBuilder, ScanStream};
26pub use stream::{Message, MessageStream, PMessage, PMessageStream, ResponseStream};
27
28use builder::MSetBuilder;
29
30#[cfg(test)]
31mod test;
32
33macro_rules! check_slice_not_empty {
34    ($slice:ident) => {
35        if $slice.is_empty() {
36            return Err(Error::EmptySlice);
37        }
38    };
39}
40
41async fn read_until(r: &mut TcpStream, byte: u8) -> io::Result<Vec<u8>> {
42    let mut buffer = Vec::new();
43    let mut single = [0; 1];
44    loop {
45        r.read(&mut single).await?;
46        buffer.push(single[0]);
47        if single[0] == byte {
48            return Ok(buffer);
49        }
50    }
51}
52
53///A connection to Redis. Copying is cheap as the inner type is a simple, futures-aware, `Arc<Mutex>`, and will
54///not create a new connection. Use a [`ConnectionPool`](struct.ConnectionPool.html) if you want to use pooled connections.
55///Alternatively, there's the `deadpool-darkredis` crate.
56///Every convenience function can work with any kind of data as long as it can be converted into bytes.
57///Check the [Redis command reference](https://redis.io/commands) for in-depth explanations of each command.
58#[derive(Clone, Debug)]
59pub struct Connection {
60    pub(crate) stream: Arc<Mutex<TcpStream>>,
61}
62
63impl Connection {
64    ///Connect to a Redis instance running at `address`. If you wish to name this connection, run the [`CLIENT SETNAME`](https://redis.io/commands/client-setname) command.
65    pub async fn connect<A>(address: A) -> Result<Self>
66    where
67        A: ToSocketAddrs,
68    {
69        let stream = Arc::new(Mutex::new(
70            TcpStream::connect(address)
71                .await
72                .map_err(Error::ConnectionFailed)?,
73        ));
74
75        Ok(Self { stream })
76    }
77
78    ///Connect to a Redis instance running at `address`, and authenticate using `password`.
79    pub async fn connect_and_auth<A, P>(address: A, password: P) -> Result<Self>
80    where
81        A: ToSocketAddrs,
82        P: AsRef<[u8]>,
83    {
84        let mut out = Self::connect(address).await?;
85        out.run_command(Command::new("AUTH").arg(&password)).await?;
86
87        Ok(out)
88    }
89
90    async fn parse_simple_value(buf: &[u8]) -> Result<Value> {
91        match buf[0] {
92            b'+' => {
93                if buf == b"+OK\r\n" {
94                    Ok(Value::Ok)
95                } else {
96                    Ok(Value::String(buf[1..].into()))
97                }
98            }
99            b'-' => Err(Error::RedisError(
100                String::from_utf8_lossy(&buf[1..]).to_string(),
101            )),
102            b':' => {
103                let string = String::from_utf8_lossy(&buf[1..]);
104                let num = string.trim().parse::<isize>().unwrap();
105                Ok(Value::Integer(num))
106            }
107            _ => Err(Error::UnexpectedResponse(
108                String::from_utf8_lossy(buf).to_string(),
109            )),
110        }
111    }
112
113    async fn parse_string(start: &[u8], stream: &mut TcpStream) -> Result<Value> {
114        if start == b"$-1\r\n" {
115            Ok(Value::Nil)
116        } else {
117            let num = String::from_utf8_lossy(&start[1..])
118                .trim()
119                .parse::<usize>()
120                .unwrap();
121            let mut buf = vec![0u8; num + 2]; // add two to catch the final \r\n from Redis
122            stream.read_exact(&mut buf).await?;
123
124            buf.pop(); //Discard the last \r\n
125            buf.pop();
126            Ok(Value::String(buf))
127        }
128    }
129
130    fn parse_array<'a>(start: &'a [u8], stream: &'a mut TcpStream) -> BoxFuture<'a, Result<Value>> {
131        async move {
132            let num_parsed = String::from_utf8_lossy(&start[1..])
133                .trim()
134                .parse::<i32>()
135                .unwrap();
136
137            // result can be negative (blpop/brpop return '-1' on timeout)
138            if num_parsed < 0 {
139                return Ok(Value::Nil);
140            }
141
142            let num = num_parsed as usize;
143            let mut values = Vec::with_capacity(num);
144
145            for _ in 0..num {
146                let buf = read_until(stream, b'\n').await?;
147                match buf[0] {
148                    b'+' | b'-' | b':' => values.push(Self::parse_simple_value(&buf).await?),
149                    b'$' => values.push(Self::parse_string(&buf, stream).await?),
150                    b'*' => values.push(Self::parse_array(&buf, stream).await?),
151                    _ => {
152                        return Err(Error::UnexpectedResponse(
153                            String::from_utf8_lossy(&buf).to_string(),
154                        ))
155                    }
156                }
157            }
158
159            Ok(Value::Array(values))
160        }
161        .boxed()
162    }
163
164    //Read a value from the connection.
165    pub(crate) async fn read_value(mut stream: &mut TcpStream) -> Result<Value> {
166        let buf = read_until(&mut stream, b'\n').await?;
167        match buf[0] {
168            b'+' | b'-' | b':' => Self::parse_simple_value(&buf).await,
169            b'$' => Self::parse_string(&buf, &mut stream).await,
170            b'*' => Self::parse_array(&buf, &mut stream).await,
171            _ => Err(Error::UnexpectedResponse(
172                String::from_utf8_lossy(&buf).to_string(),
173            )),
174        }
175    }
176
177    ///Run a single command on this connection.
178    #[inline]
179    pub async fn run_command(&mut self, command: Command<'_>) -> Result<Value> {
180        let mut buffer = Vec::new();
181
182        self.run_command_with_buffer(command, &mut buffer).await
183    }
184
185    ///Run a single command on this connection, using `buffer` for serialization.
186    ///See [`run_commands_with_buffer`](struct.Connection.html#method.run_commands_with_buffer) for more details.
187    pub async fn run_command_with_buffer(
188        &mut self,
189        command: Command<'_>,
190        buffer: &mut Vec<u8>,
191    ) -> Result<Value> {
192        let mut stream = self.stream.lock().await;
193        command.serialize(buffer);
194        stream.write_all(&buffer).await?;
195
196        Ok(Self::read_value(&mut stream).await?)
197    }
198
199    ///Run a series of commands on this connection, returning a stream of the results.
200    #[inline]
201    pub async fn run_commands(&mut self, command: CommandList<'_>) -> Result<ResponseStream> {
202        let mut buffer = Vec::new();
203        self.run_commands_with_buffer(command, &mut buffer).await
204    }
205
206    ///Run a series of commands on this connection, using `buffer` for serialization.
207    ///This prevents allocations as long as `buffer` is large enough from before. The
208    ///buffer will be empty when this function returns.
209    pub async fn run_commands_with_buffer(
210        &mut self,
211        command: CommandList<'_>,
212        buf: &mut Vec<u8>,
213    ) -> Result<ResponseStream> {
214        buf.clear();
215        let mut lock = self.stream.lock().await;
216        let command_count = command.command_count();
217        command.serialize(buf);
218        lock.write_all(&buf).await?;
219        buf.clear();
220
221        Ok(ResponseStream::new(command_count, self.stream.clone()))
222    }
223
224    ///Delete `field` from the hash set stored at `key`.
225    ///# Return value
226    ///`true` when the field was deleted, `false` if it didn't exist
227    pub async fn hdel<K, F>(&mut self, key: K, field: F) -> Result<bool>
228    where
229        K: AsRef<[u8]>,
230        F: AsRef<[u8]>,
231    {
232        self.run_command(Command::new("HDEL").arg(&key).arg(&field))
233            .await
234            .map(|v| v.unwrap_bool())
235    }
236
237    ///Delete every field in `fields` from the hash set stored at `key`.
238    ///# Return value
239    ///The number of deleted fields.
240    pub async fn hdel_slice<K, F>(&mut self, key: K, fields: &[F]) -> Result<isize>
241    where
242        K: AsRef<[u8]>,
243        F: AsRef<[u8]>,
244    {
245        check_slice_not_empty!(fields);
246
247        self.run_command(Command::new("HDEL").arg(&key).args(&fields))
248            .await
249            .map(|v| v.unwrap_integer())
250    }
251
252    ///Check if `field` exists in the hash set `key`.
253    pub async fn hexists<K, F>(&mut self, key: K, field: F) -> Result<bool>
254    where
255        K: AsRef<[u8]>,
256        F: AsRef<[u8]>,
257    {
258        self.run_command(Command::new("HEXISTS").arg(&key).arg(&field))
259            .await
260            .map(|v| v.unwrap_bool())
261    }
262
263    ///Get the value of `field` in the hash set at `key`.
264    pub async fn hget<K, F>(&mut self, key: K, field: F) -> Result<Option<Vec<u8>>>
265    where
266        K: AsRef<[u8]>,
267        F: AsRef<[u8]>,
268    {
269        self.run_command(Command::new("HGET").arg(&key).arg(&field))
270            .await
271            .map(|v| v.optional_string())
272    }
273
274    ///Set the value of `field` in the hash set stored at `key` to `Value`.
275    ///# Return value
276    ///The number of added fields(will be 1 if `field` was created, 0 if it already existed).
277    pub async fn hset<K, F, V>(&mut self, key: K, field: F, value: V) -> Result<isize>
278    where
279        K: AsRef<[u8]>,
280        F: AsRef<[u8]>,
281        V: AsRef<[u8]>,
282    {
283        self.run_command(Command::new("HSET").arg(&key).arg(&field).arg(&value))
284            .await
285            .map(|v| v.unwrap_integer())
286    }
287
288    ///Set the value of `field` in the hash set stored at `key` to `Value`. If `field`
289    ///already exists, do nothing.
290    ///# Return value
291    ///`true` if `field` was set, `false` otherwise.
292    pub async fn hsetnx<K, F, V>(&mut self, key: K, field: F, value: V) -> Result<bool>
293    where
294        K: AsRef<[u8]>,
295        F: AsRef<[u8]>,
296        V: AsRef<[u8]>,
297    {
298        self.run_command(Command::new("HSETNX").arg(&key).arg(&field).arg(&value))
299            .await
300            .map(|v| v.unwrap_bool())
301    }
302
303    ///Set fields in the hash set at `key` to their values in `builder`. See
304    ///[`Connection::mset`](struct.Connection.html#method.mset) for more information.
305    ///# Return value
306    ///The number of added fields.
307    pub async fn hset_many<K>(&mut self, key: K, builder: MSetBuilder<'_>) -> Result<isize>
308    where
309        K: AsRef<[u8]>,
310    {
311        let mut command = Command::new("HSET").arg(&key);
312        command.append_msetbuilder(&builder);
313
314        self.run_command(command).await.map(|v| v.unwrap_integer())
315    }
316
317    ///Increment `field` in the hash set `key` by `val`.
318    ///# Return value
319    ///The field value after the increment.
320    pub async fn hincrby<K, F>(&mut self, key: K, field: F, val: isize) -> Result<isize>
321    where
322        K: AsRef<[u8]>,
323        F: AsRef<[u8]>,
324    {
325        let val = val.to_string();
326        self.run_command(Command::new("HINCRBY").arg(&key).arg(&field).arg(&val))
327            .await
328            .map(|v| v.unwrap_integer())
329    }
330
331    ///Increment `field` in the hash set `key` by `val`, floating point version.
332    ///# Return value
333    ///The field value after the increment.
334    pub async fn hincrbyfloat<K, F>(&mut self, key: K, field: F, val: f64) -> Result<f64>
335    where
336        K: AsRef<[u8]>,
337        F: AsRef<[u8]>,
338    {
339        let val = val.to_string();
340        let command = Command::new("HINCRBYFLOAT").arg(&key).arg(&field).arg(&val);
341        let result = self.run_command(command).await?.unwrap_string();
342        Ok(String::from_utf8_lossy(&result).parse::<f64>().unwrap())
343    }
344
345    ///Get the name of each hash field stored at `key`.
346    pub async fn hkeys<K>(&mut self, key: K) -> Result<Vec<Vec<u8>>>
347    where
348        K: AsRef<[u8]>,
349    {
350        self.run_command(Command::new("HKEYS").arg(&key))
351            .await
352            .map(|v| v.unwrap_string_array())
353    }
354
355    ///Get the number of fields in the hash stored at `key`.
356    pub async fn hlen<K>(&mut self, key: K) -> Result<isize>
357    where
358        K: AsRef<[u8]>,
359    {
360        self.run_command(Command::new("HLEN").arg(&key))
361            .await
362            .map(|v| v.unwrap_integer())
363    }
364
365    ///Get the number of bytes in `field` in the hash set `key`
366    pub async fn hstrlen<K, F>(&mut self, key: K, field: F) -> Result<isize>
367    where
368        K: AsRef<[u8]>,
369        F: AsRef<[u8]>,
370    {
371        self.run_command(Command::new("HSTRLEN").arg(&key).arg(&field))
372            .await
373            .map(|v| v.unwrap_integer())
374    }
375
376    ///Get the value of each field in the hash field stored at `key`.
377    pub async fn hvals<K>(&mut self, key: K) -> Result<Vec<Value>>
378    where
379        K: AsRef<[u8]>,
380    {
381        self.run_command(Command::new("HVALS").arg(&key))
382            .await
383            .map(|v| v.unwrap_array())
384    }
385
386    ///Send a `PING` to the server, returning Ok(()) on success.
387    pub async fn ping(&mut self) -> Result<()> {
388        self.run_command(Command::new("PING")).await.map(|_| ())
389    }
390
391    ///Consume `self`, and subscribe to `channels`, returning a stream of [`Message's`](struct.Message.html). As of now, there's no way to get the connection back, nor change the subscribed topics.
392    pub async fn subscribe<K>(mut self, channels: &[K]) -> Result<stream::MessageStream>
393    where
394        K: AsRef<[u8]>,
395    {
396        let command = Command::new("SUBSCRIBE").args(channels);
397
398        //TODO: Find out if we care about the values given here
399        let _ = self.run_command(command).await?;
400        {
401            let mut stream = self.stream.lock().await;
402            for _ in 0..channels.len() - 1 {
403                let response = Self::read_value(&mut stream).await?;
404                assert_eq!(
405                    response.unwrap_array()[0],
406                    Value::String("subscribe".into())
407                );
408            }
409        }
410
411        Ok(stream::MessageStream::new(self))
412    }
413
414    ///Exactly like [`subscribe`](struct.Connection.html#method.subscribe), but subscribe to channels
415    ///matching patterns instead.
416    pub async fn psubscribe<K>(mut self, patterns: &[K]) -> Result<stream::PMessageStream>
417    where
418        K: AsRef<[u8]>,
419    {
420        let command = Command::new("PSUBSCRIBE").args(patterns);
421
422        //TODO: Find out if we care about the values given here
423        let _ = self.run_command(command).await?;
424        {
425            let mut stream = self.stream.lock().await;
426            for _ in 0..patterns.len() - 1 {
427                let response = Self::read_value(&mut stream).await?;
428                assert_eq!(
429                    response.unwrap_array()[0],
430                    Value::String("psubscribe".into())
431                );
432            }
433        }
434
435        Ok(stream::PMessageStream::new(self))
436    }
437
438    ///Publish `message` to `channel`.
439    ///# Return Value
440    ///Returns how many clients received the message.
441    pub async fn publish<C, M>(&mut self, channel: C, message: M) -> Result<isize>
442    where
443        C: AsRef<[u8]>,
444        M: AsRef<[u8]>,
445    {
446        let command = Command::new("PUBLISH").arg(&channel).arg(&message);
447        self.run_command(command).await.map(|i| i.unwrap_integer())
448    }
449
450    ///Set `key` to `value`.
451    pub async fn set<K, D>(&mut self, key: K, value: D) -> Result<()>
452    where
453        K: AsRef<[u8]>,
454        D: AsRef<[u8]>,
455    {
456        let command = Command::new("SET").arg(&key).arg(&value);
457
458        self.run_command(command).await.map(|_| ())
459    }
460
461    ///Set the key `key` to `data`, and set it to expire after `seconds` seconds.
462    pub async fn set_and_expire_seconds<K, D>(
463        &mut self,
464        key: K,
465        data: D,
466        seconds: u32,
467    ) -> Result<()>
468    where
469        K: AsRef<[u8]>,
470        D: AsRef<[u8]>,
471    {
472        let seconds = seconds.to_string();
473        let command = Command::new("SET")
474            .arg(&key)
475            .arg(&data)
476            .arg(b"EX")
477            .arg(&seconds);
478
479        self.run_command(command).await.map(|_| ())
480    }
481
482    ///Set the key `key` to `data`, and set it to expire after `milliseconds` ms.
483    pub async fn set_and_expire_ms<K, D>(
484        &mut self,
485        key: K,
486        data: D,
487        milliseconds: u32,
488    ) -> Result<()>
489    where
490        K: AsRef<[u8]>,
491        D: AsRef<[u8]>,
492    {
493        let milliseconds = milliseconds.to_string();
494        let command = Command::new("SET")
495            .arg(&key)
496            .arg(&data)
497            .arg(b"PX")
498            .arg(&milliseconds);
499
500        self.run_command(command).await.map(|_| ())
501    }
502
503    ///Set `key` to expire `seconds` seconds from now.
504    pub async fn expire_seconds<K>(&mut self, key: K, seconds: u32) -> Result<isize>
505    where
506        K: AsRef<[u8]>,
507    {
508        let seconds = seconds.to_string();
509        let command = Command::new("EXPIRE").arg(&key).arg(&seconds);
510
511        self.run_command(command).await.map(|i| i.unwrap_integer())
512    }
513
514    ///Set `key` to expire `milliseconds` ms from now.
515    pub async fn expire_ms<K>(&mut self, key: K, seconds: u32) -> Result<isize>
516    where
517        K: AsRef<[u8]>,
518    {
519        let seconds = seconds.to_string();
520        let command = Command::new("PEXPIRE").arg(&key).arg(&seconds);
521
522        self.run_command(command).await.map(|i| i.unwrap_integer())
523    }
524
525    ///Set `key` to expire at Unix timestamp `timestamp`, measured in seconds.
526    pub async fn expire_at_seconds<K>(&mut self, key: K, timestamp: u64) -> Result<isize>
527    where
528        K: AsRef<[u8]>,
529    {
530        let timestamp = timestamp.to_string();
531        let command = Command::new("EXPIREAT").arg(&key).arg(&timestamp);
532
533        self.run_command(command).await.map(|i| i.unwrap_integer())
534    }
535
536    ///Set `key` to expire at Unix timestamp `timestamp`, measured in milliseconds.
537    pub async fn expire_at_ms<K>(&mut self, key: K, timestamp: u64) -> Result<isize>
538    where
539        K: AsRef<[u8]>,
540    {
541        let timestamp = timestamp.to_string();
542        let command = Command::new("PEXPIREAT").arg(&key).arg(&timestamp);
543
544        self.run_command(command).await.map(|i| i.unwrap_integer())
545    }
546
547    ///Delete `key`.
548    ///# Return value
549    ///The number of deleted keys.
550    pub async fn del<K>(&mut self, key: K) -> Result<bool>
551    where
552        K: AsRef<[u8]>,
553    {
554        let command = Command::new("DEL").arg(&key);
555        self.run_command(command).await.map(|i| i.unwrap_bool())
556    }
557
558    ///Delete every element in `keys`.
559    ///# Return value
560    ///The number of deleted keys.
561    pub async fn del_slice<K>(&mut self, keys: &[K]) -> Result<isize>
562    where
563        K: AsRef<[u8]>,
564    {
565        check_slice_not_empty!(keys);
566        let command = Command::new("DEL").args(&keys);
567        self.run_command(command).await.map(|i| i.unwrap_integer())
568    }
569
570    ///Get the value of `key`.
571    pub async fn get<K>(&mut self, key: K) -> Result<Option<Vec<u8>>>
572    where
573        K: AsRef<[u8]>,
574    {
575        let command = Command::new("GET").arg(&key);
576
577        Ok(self.run_command(command).await?.optional_string())
578    }
579
580    ///Push a value to `list` from the left.
581    ///# Return value
582    ///The number of elements in `list`
583    pub async fn lpush<K, V>(&mut self, list: K, value: V) -> Result<isize>
584    where
585        K: AsRef<[u8]>,
586        V: AsRef<[u8]>,
587    {
588        let command = Command::new("LPUSH").arg(&list).arg(&value);
589
590        Ok(self.run_command(command).await?.unwrap_integer())
591    }
592
593    ///Like [`lpush`](struct.Connection.html#method.lpush), but push multiple values.
594    pub async fn lpush_slice<K, V>(&mut self, key: K, values: &[V]) -> Result<isize>
595    where
596        K: AsRef<[u8]>,
597        V: AsRef<[u8]>,
598    {
599        check_slice_not_empty!(values);
600        let command = Command::new("LPUSH").arg(&key).args(values);
601
602        Ok(self.run_command(command).await?.unwrap_integer())
603    }
604
605    ///Push a value to `list` from the right.
606    ///# Return value
607    ///The number of elements in `list`
608    pub async fn rpush<K, V>(&mut self, list: K, value: V) -> Result<isize>
609    where
610        K: AsRef<[u8]>,
611        V: AsRef<[u8]>,
612    {
613        let command = Command::new("RPUSH").arg(&list).arg(&value);
614
615        Ok(self.run_command(command).await?.unwrap_integer())
616    }
617
618    ///Like [`rpush`](struct.Connection.html#method.rpush), but push multiple values through a slice.
619    pub async fn rpush_slice<K, V>(&mut self, key: K, values: &[V]) -> Result<isize>
620    where
621        K: AsRef<[u8]>,
622        V: AsRef<[u8]>,
623    {
624        check_slice_not_empty!(values);
625        let command = Command::new("RPUSH").arg(&key).args(values);
626
627        Ok(self.run_command(command).await?.unwrap_integer())
628    }
629
630    ///Pop a value from a list from the left side.
631    ///# Return value
632    ///The value popped from `list`
633    pub async fn lpop<K>(&mut self, list: K) -> Result<Option<Vec<u8>>>
634    where
635        K: AsRef<[u8]>,
636    {
637        let command = Command::new("LPOP").arg(&list);
638
639        Ok(self.run_command(command).await?.optional_string())
640    }
641
642    ///Pop a value from a list from the right side.
643    ///# Return value
644    ///The value popped from `list`
645    pub async fn rpop<K>(&mut self, list: K) -> Result<Option<Vec<u8>>>
646    where
647        K: AsRef<[u8]>,
648    {
649        let command = Command::new("RPOP").arg(&list);
650
651        Ok(self.run_command(command).await?.optional_string())
652    }
653
654    ///Pop a value from one of the lists from the left side.
655    ///Block timeout seconds when there are no values to pop. A zero-value with block infinitely.
656    ///# Return value
657    ///* `Ok(Some((list,value)))`: name of the list and corresponding value
658    ///* `Ok(None)`: timeout (no values)
659    ///* `Err(err)`: there was an error
660    pub async fn blpop<K>(
661        &mut self,
662        lists: &[K],
663        timeout: u32,
664    ) -> Result<Option<(Vec<u8>, Vec<u8>)>>
665    where
666        K: AsRef<[u8]>,
667    {
668        self.blpop_brpop(lists, timeout, "BLPOP").await
669    }
670
671    ///Pop a value from one of the lists from the right side.
672    ///Block timeout seconds when there are no values to pop. A zero-value will block infinitely.
673    ///# Return value
674    ///* `Ok(Some((list,value)))`: name of the list and corresponding value
675    ///* `Ok(None)`: timeout (no values)
676    ///* `Err(err)`: there was an error
677    pub async fn brpop<K>(
678        &mut self,
679        lists: &[K],
680        timeout: u32,
681    ) -> Result<Option<(Vec<u8>, Vec<u8>)>>
682    where
683        K: AsRef<[u8]>,
684    {
685        self.blpop_brpop(lists, timeout, "BRPOP").await
686    }
687
688    ///blpop and brpop common code
689    async fn blpop_brpop<K>(
690        &mut self,
691        lists: &[K],
692        timeout: u32,
693        redis_cmd: &str,
694    ) -> Result<Option<(Vec<u8>, Vec<u8>)>>
695    where
696        K: AsRef<[u8]>,
697    {
698        let timeout = timeout.to_string();
699        let command = Command::new(redis_cmd).args(&lists).arg(&timeout);
700        match self.run_command(command).await? {
701            Value::Array(values) => {
702                let vlen = values.len();
703                if vlen == 2 {
704                    let mut v = values.into_iter().map(|s| s.unwrap_string());
705                    return Ok(Some(
706                        (v.next().unwrap(), v.next().unwrap()), // values.into_iter().map(|s| s.unwrap_string()).collect(),
707                    ));
708                }
709                Err(Error::UnexpectedResponse(format!(
710                    "{}: wrong number of elements received: {}",
711                    redis_cmd, vlen
712                )))
713            }
714            Value::Nil => Ok(None),
715            other => Err(Error::UnexpectedResponse(format!(
716                "{}: {:?}",
717                redis_cmd, other
718            ))),
719        }
720    }
721
722    ///Get a series of elements from `list`, from index `from` to `to`. If they are negative, take the
723    ///index from the right side of the list.
724    pub async fn lrange<K>(&mut self, list: K, from: isize, to: isize) -> Result<Vec<Vec<u8>>>
725    where
726        K: AsRef<[u8]>,
727    {
728        let from = from.to_string();
729        let to = to.to_string();
730        let command = Command::new("LRANGE").arg(&list).arg(&from).arg(&to);
731
732        Ok(self
733            .run_command(command)
734            .await?
735            .unwrap_array()
736            .into_iter()
737            .map(|s| s.unwrap_string())
738            .collect())
739    }
740
741    ///Get the number of elements in `list`, or `None` if the list doesn't exist.
742    pub async fn llen<K>(&mut self, list: K) -> Result<Option<isize>>
743    where
744        K: AsRef<[u8]>,
745    {
746        let command = Command::new("LLEN").arg(&list);
747        Ok(self.run_command(command).await?.optional_integer())
748    }
749
750    ///Set the value of the element at `index` in `list` to `value`.
751    pub async fn lset<K, V>(&mut self, list: K, index: usize, value: V) -> Result<()>
752    where
753        K: AsRef<[u8]>,
754        V: AsRef<[u8]>,
755    {
756        let index = index.to_string();
757        let command = Command::new("LSET").arg(&list).arg(&index).arg(&value);
758
759        self.run_command(command).await?;
760        Ok(())
761    }
762
763    ///Trim `list` from `start` to `stop`.
764    pub async fn ltrim<K>(&mut self, list: K, start: usize, stop: usize) -> Result<()>
765    where
766        K: AsRef<[u8]>,
767    {
768        let start = start.to_string();
769        let stop = stop.to_string();
770        let command = Command::new("LTRIM").arg(&list).arg(&start).arg(&stop);
771        self.run_command(command).await?;
772
773        Ok(())
774    }
775
776    ///Increment `key` by one.
777    ///# Return value
778    ///The new value of `key`.
779    pub async fn incr<K>(&mut self, key: K) -> Result<isize>
780    where
781        K: AsRef<[u8]>,
782    {
783        let command = Command::new("INCR").arg(&key);
784        Ok(self.run_command(command).await?.unwrap_integer())
785    }
786
787    ///Increment `key` by `val`.
788    ///# Return value
789    ///The new value of `key`
790    pub async fn incrby<K>(&mut self, key: K, val: isize) -> Result<isize>
791    where
792        K: AsRef<[u8]>,
793    {
794        let val = val.to_string();
795        let command = Command::new("INCRBY").arg(&key).arg(&val);
796        Ok(self.run_command(command).await?.unwrap_integer())
797    }
798
799    ///Increment `key` by a floating point value `val`.
800    ///# Return value
801    ///The new value of `key`
802    pub async fn incrbyfloat<K>(&mut self, key: K, val: f64) -> Result<f64>
803    where
804        K: AsRef<[u8]>,
805    {
806        let val = val.to_string();
807        let command = Command::new("INCRBYFLOAT").arg(&key).arg(&val);
808        let result = self.run_command(command).await?.unwrap_string();
809        Ok(String::from_utf8_lossy(&result).parse::<f64>().unwrap())
810    }
811
812    ///Decrement `key` by a floating point value `val`.
813    ///# Return value
814    ///The new value of `key`
815    pub async fn decr<K>(&mut self, key: K) -> Result<isize>
816    where
817        K: AsRef<[u8]>,
818    {
819        let command = Command::new("DECR").arg(&key);
820        Ok(self.run_command(command).await?.unwrap_integer())
821    }
822
823    ///Decrement `key` by `val`.
824    ///# Return value
825    ///The new value of `key`
826    pub async fn decrby<K>(&mut self, key: K, val: isize) -> Result<isize>
827    where
828        K: AsRef<[u8]>,
829    {
830        let val = val.to_string();
831        let command = Command::new("DECRBY").arg(&key).arg(&val);
832        Ok(self.run_command(command).await?.unwrap_integer())
833    }
834
835    ///Append a string `val` to `key`.
836    ///# Return value
837    ///The new size of `key`
838    pub async fn append<K, V>(&mut self, key: K, val: V) -> Result<isize>
839    where
840        K: AsRef<[u8]>,
841        V: AsRef<[u8]>,
842    {
843        let command = Command::new("APPEND").arg(&key).arg(&val);
844        Ok(self.run_command(command).await?.unwrap_integer())
845    }
846
847    ///Get the string value for every `key`, or `None`` if it doesn't exist
848    pub async fn mget<K>(&mut self, keys: &[K]) -> Result<Vec<Option<Vec<u8>>>>
849    where
850        K: AsRef<[u8]>,
851    {
852        let command = Command::new("MGET").args(&keys);
853        let result = self.run_command(command).await?.unwrap_array();
854        let output: Vec<Option<Vec<u8>>> =
855            result.into_iter().map(|r| r.optional_string()).collect();
856        Ok(output)
857    }
858
859    ///Set every key in `builder` to their respective values.
860    ///# Example
861    ///```
862    /// use darkredis::{Connection, MSetBuilder};
863    ///# #[cfg_attr(feature = "runtime_tokio", tokio::main)]
864    ///# #[cfg_attr(feature = "runtime_async_std", async_std::main)]
865    /// # async fn main() {
866    /// let mut connection = Connection::connect("127.0.0.1:6379").await.unwrap();
867    /// let builder = MSetBuilder::new()
868    ///     .set(b"multi-key-1", b"foo")
869    ///     .set(b"multi-key-2", b"bar")
870    ///     .set(b"multi-key-3", b"baz");
871    /// connection.mset(builder).await.unwrap();
872    /// let keys = &[b"multi-key-1", b"multi-key-2", b"multi-key-3"];
873    /// let results = connection.mget(keys).await.unwrap();
874    /// assert_eq!(
875    ///     results,
876    ///     vec![
877    ///         Some(b"foo".to_vec()),
878    ///         Some(b"bar".to_vec()),
879    ///         Some(b"baz".to_vec())
880    ///     ]
881    /// );
882    /// # connection.del_slice(keys).await.unwrap();
883    /// # }
884    ///```
885
886    pub async fn mset(&mut self, builder: MSetBuilder<'_>) -> Result<()>
887where {
888        let mut command = Command::new("MSET");
889        command.append_msetbuilder(&builder);
890        self.run_command(command).await?;
891        Ok(())
892    }
893
894    ///Returns true if a key has been previously set.
895    pub async fn exists<K>(&mut self, key: K) -> Result<bool>
896    where
897        K: AsRef<[u8]>,
898    {
899        let command = Command::new("EXISTS").arg(&key);
900        Ok(self.run_command(command).await? == Value::Integer(1))
901    }
902
903    ///Adds new `value` to set specified by `key`.
904    pub async fn sadd<K, V>(&mut self, key: K, value: V) -> Result<bool>
905    where
906        K: AsRef<[u8]>,
907        V: AsRef<[u8]>,
908    {
909        let command = Command::new("SADD").arg(&key).arg(&value);
910
911        Ok(self.run_command(command).await?.unwrap_bool())
912    }
913
914    ///Like [`sadd`](struct.Connection.html#method.sadd), but push multiple values.
915    pub async fn sadd_slice<K, V>(&mut self, key: K, values: &[V]) -> Result<isize>
916    where
917        K: AsRef<[u8]>,
918        V: AsRef<[u8]>,
919    {
920        let command = Command::new("SADD").arg(&key).args(&values);
921
922        Ok(self.run_command(command).await?.unwrap_integer())
923    }
924
925    /// Return the members of a set specified by `key`.
926    pub async fn smembers<K>(&mut self, key: K) -> Result<Vec<Vec<u8>>>
927    where
928        K: AsRef<[u8]>,
929    {
930        let command = Command::new("SMEMBERS").arg(&key);
931
932        Ok(self
933            .run_command(command)
934            .await?
935            .unwrap_array()
936            .into_iter()
937            .map(|s| s.unwrap_string())
938            .collect())
939    }
940
941    /// Returns `true` if `value` belongs to a set specified by `key`.
942    pub async fn sismember<K, V>(&mut self, key: K, value: V) -> Result<bool>
943    where
944        K: AsRef<[u8]>,
945        V: AsRef<[u8]>,
946    {
947        let command = Command::new("SISMEMBER").arg(&key).arg(&value);
948
949        Ok(self.run_command(command).await?.unwrap_bool())
950    }
951
952    ///Scan for elements in a set.
953    ///# Return value
954    ///Returns a list of the elements which matched in the set.
955    ///# Example
956    ///```
957    /// use darkredis::Connection;
958    /// use futures::StreamExt;
959    ///# #[cfg_attr(feature = "runtime_tokio", tokio::main)]
960    ///# #[cfg_attr(feature = "runtime_async_std", async_std::main)]
961    /// # async fn main() {
962    ///
963    /// let mut connection = Connection::connect("127.0.0.1:6379").await.unwrap();
964    /// let key = b"test-set".to_vec();
965    /// # connection.del(&key).await.unwrap();
966    /// connection.sadd(&key, "foo").await.unwrap();
967    /// connection.sadd(&key, "bar").await.unwrap();
968    ///
969    /// let results = connection.sscan(&key).run()
970    ///   .collect::<Vec<Vec<u8>>>().await;
971    ///
972    /// assert!(results.contains(&b"foo".to_vec()));
973    /// assert!(results.contains(&b"bar".to_vec()));
974    /// # connection.del(&key).await.unwrap();
975    /// # }
976    ///```
977    pub fn sscan<'a, K>(&'a mut self, key: &'a K) -> ScanBuilder
978    where
979        K: AsRef<[u8]>,
980    {
981        ScanBuilder::new("SSCAN", Some(key.as_ref()), self)
982    }
983
984    ///Scan for keys in the database.
985    ///# Return Value
986    /// A stream of the matching keys.
987    ///# Example
988    ///```no_run
989    /// use darkredis::Connection;
990    /// use futures::StreamExt;
991    ///# #[cfg_attr(feature = "runtime_tokio", tokio::main)]
992    ///# #[cfg_attr(feature = "runtime_async_std", async_std::main)]
993    /// # async fn main() {
994    ///
995    /// let mut connection = Connection::connect("127.0.0.1:6379").await.unwrap();
996    /// let key = b"locate-me".to_vec();
997    /// connection.set(&key, "dummy-value").await.unwrap();
998    /// let results = connection.scan().pattern(b"locate*").run()
999    ///   .collect::<Vec<Vec<u8>>>().await;
1000    ///
1001    /// assert!(results.contains(&key));
1002    /// # connection.del(&key).await.unwrap();
1003    /// # }
1004    ///```
1005    pub fn scan(&mut self) -> ScanBuilder {
1006        ScanBuilder::new("SCAN", None, self)
1007    }
1008
1009    ///Scan for fields in the hash set at `key`.
1010    ///# Example
1011    ///```
1012    /// use darkredis::Connection;
1013    /// use futures::StreamExt;
1014    ///# #[cfg_attr(feature = "runtime_tokio", tokio::main)]
1015    ///# #[cfg_attr(feature = "runtime_async_std", async_std::main)]
1016    /// # async fn main() {
1017    ///
1018    /// let mut connection = Connection::connect("127.0.0.1:6379").await.unwrap();
1019    /// let key = b"hscan_test".to_vec();
1020    /// connection.hset(&key, "one", "1").await.unwrap();
1021    /// connection.hset(&key, "two", "2").await.unwrap();
1022    /// connection.hset(&key, "three", "3").await.unwrap();
1023    /// let results = connection.hscan(&key).run()
1024    ///   .collect::<Vec<(Vec<u8>, Vec<u8>)>>().await;
1025    ///
1026    /// assert_eq!(results.len(), 3);
1027    /// assert!(results.contains(&(b"one".to_vec(), b"1".to_vec())));
1028    /// assert!(results.contains(&(b"two".to_vec(), b"2".to_vec())));
1029    /// assert!(results.contains(&(b"three".to_vec(), b"3".to_vec())));
1030    /// # connection.del(&key).await.unwrap();
1031    /// # }
1032    pub fn hscan<'a, K>(&'a mut self, key: &'a K) -> HScanBuilder<'a>
1033    where
1034        K: AsRef<[u8]>,
1035    {
1036        HScanBuilder::new(key.as_ref(), self)
1037    }
1038
1039    ///Get the Type of `key` using the `TYPE` command.
1040    pub async fn key_type<K>(&mut self, key: K) -> Result<Option<DataType>>
1041    where
1042        K: AsRef<[u8]>,
1043    {
1044        let command = Command::new("TYPE").arg(&key);
1045        let result = self.run_command(command).await?.unwrap_string();
1046        match result.as_slice() {
1047            b"string\r\n" => Ok(Some(DataType::String)),
1048            b"list\r\n" => Ok(Some(DataType::List)),
1049            b"set\r\n" => Ok(Some(DataType::Set)),
1050            b"zset\r\n" => Ok(Some(DataType::ZSet)),
1051            b"hash\r\n" => Ok(Some(DataType::Hash)),
1052            b"stream\r\n" => Ok(Some(DataType::Stream)),
1053            b"none\r\n" => Ok(None),
1054            _ => Err(Error::UnexpectedResponse(
1055                String::from_utf8_lossy(&result).to_string(),
1056            )),
1057        }
1058    }
1059
1060    ///Get the number of members in the set at `key`.
1061    pub async fn scard<K>(&mut self, key: &K) -> Result<isize>
1062    where
1063        K: AsRef<[u8]>,
1064    {
1065        let command = Command::new("SCARD").arg(&key);
1066        Ok(self.run_command(command).await?.unwrap_integer())
1067    }
1068
1069    ///Move set member `member` from `source` to `destination`.
1070    ///# Return value
1071    ///`true` if the member was moved.
1072    pub async fn smove<S, D, M>(&mut self, source: S, destination: D, member: M) -> Result<bool>
1073    where
1074        S: AsRef<[u8]>,
1075        M: AsRef<[u8]>,
1076        D: AsRef<[u8]>,
1077    {
1078        let command = Command::new("SMOVE")
1079            .arg(&source)
1080            .arg(&destination)
1081            .arg(&member);
1082        Ok(self.run_command(command).await?.unwrap_bool())
1083    }
1084
1085    ///Remove set member `member`, from the set `key`.
1086    ///# Return value
1087    ///`true` if the member was removed.
1088    pub async fn srem<K, M>(&mut self, key: K, member: M) -> Result<bool>
1089    where
1090        K: AsRef<[u8]>,
1091        M: AsRef<[u8]>,
1092    {
1093        let command = Command::new("SREM").arg(&key).arg(&member);
1094        Ok(self.run_command(command).await?.unwrap_bool())
1095    }
1096
1097    ///Remove every member in `members` from the set at `key`.
1098    ///# Return value
1099    ///The number of members which were removed.
1100    pub async fn srem_slice<K, M>(&mut self, key: K, members: &[M]) -> Result<isize>
1101    where
1102        K: AsRef<[u8]>,
1103        M: AsRef<[u8]>,
1104    {
1105        check_slice_not_empty!(members);
1106        let command = Command::new("SREM").arg(&key).args(members);
1107        Ok(self.run_command(command).await?.unwrap_integer())
1108    }
1109
1110    ///Return the difference in members between the first set and all the other sets.
1111    pub async fn sdiff<S>(&mut self, sets: &[S]) -> Result<Vec<Vec<u8>>>
1112    where
1113        S: AsRef<[u8]>,
1114    {
1115        check_slice_not_empty!(sets);
1116        let command = Command::new("SDIFF").args(sets);
1117        Ok(self.run_command(command).await?.unwrap_string_array())
1118    }
1119
1120    ///Place the difference in members between the sets into `destination`.
1121    ///# Return value
1122    ///The number of elements in `destination` after the operation.
1123    pub async fn sdiffstore<D, S>(&mut self, destination: D, sets: &[S]) -> Result<isize>
1124    where
1125        D: AsRef<[u8]>,
1126        S: AsRef<[u8]>,
1127    {
1128        check_slice_not_empty!(sets);
1129        let command = Command::new("SDIFFSTORE").arg(&destination).args(sets);
1130        Ok(self.run_command(command).await?.unwrap_integer())
1131    }
1132
1133    ///Return the members which are in every set.
1134    pub async fn sinter<S>(&mut self, sets: &[S]) -> Result<Vec<Vec<u8>>>
1135    where
1136        S: AsRef<[u8]>,
1137    {
1138        check_slice_not_empty!(sets);
1139        let command = Command::new("SINTER").args(sets);
1140        Ok(self.run_command(command).await?.unwrap_string_array())
1141    }
1142
1143    ///Create a new set at `destination` containing the members which are part of all sets.
1144    ///# Return value
1145    ///The number of elements in `destination` after the operation.
1146    pub async fn sinterstore<D, S>(&mut self, destination: D, sets: &[S]) -> Result<isize>
1147    where
1148        D: AsRef<[u8]>,
1149        S: AsRef<[u8]>,
1150    {
1151        check_slice_not_empty!(sets);
1152        let command = Command::new("SINTERSTORE").arg(&destination).args(sets);
1153        Ok(self.run_command(command).await?.unwrap_integer())
1154    }
1155
1156    ///Return a `count` random members of `set`. If `count` is negative, the same element can show up
1157    ///multiple times. See the [Redis documentation](https://redis.io/commands/srandmember) for more info.
1158    pub async fn srandmember<S>(&mut self, set: S, count: isize) -> Result<Vec<Vec<u8>>>
1159    where
1160        S: AsRef<[u8]>,
1161    {
1162        let count = count.to_string();
1163        let command = Command::new("SRANDMEMBER").arg(&set).arg(&count);
1164        Ok(self.run_command(command).await?.unwrap_string_array())
1165    }
1166
1167    ///Pop `count` random elements out of `set`.
1168    pub async fn spop<S>(&mut self, set: S, count: isize) -> Result<Vec<Vec<u8>>>
1169    where
1170        S: AsRef<[u8]>,
1171    {
1172        let count = count.to_string();
1173        let command = Command::new("SPOP").arg(&set).arg(&count);
1174        Ok(self.run_command(command).await?.unwrap_string_array())
1175    }
1176
1177    ///Return the union of every set in `sets`.
1178    pub async fn sunion<S>(&mut self, sets: &[S]) -> Result<Vec<Vec<u8>>>
1179    where
1180        S: AsRef<[u8]>,
1181    {
1182        check_slice_not_empty!(sets);
1183        let command = Command::new("SUNION").args(sets);
1184        Ok(self.run_command(command).await?.unwrap_string_array())
1185    }
1186
1187    ///Store the union of `sets` in `destination`.
1188    ///# Return value
1189    ///The number of elements in `destination` after the operation.
1190    pub async fn sunionstore<D, S>(&mut self, destination: D, sets: &[S]) -> Result<isize>
1191    where
1192        D: AsRef<[u8]>,
1193        S: AsRef<[u8]>,
1194    {
1195        check_slice_not_empty!(sets);
1196        let command = Command::new("SUNIONSTORE").arg(&destination).args(sets);
1197        Ok(self.run_command(command).await?.unwrap_integer())
1198    }
1199}