redis_asyncx/
client.rs

1//! Redis client implementation.
2//!
3//! The clients default to RESP2 unless HELLO 3 is explicitly sent to switch to RESP3.
4//! The client is a simple wrapper around the Connection struct.
5//! It provides simple APIs to send commands to the Redis server and get the response.
6//! The client is designed to be used in an async context, using the tokio runtime.
7
8use crate::Connection;
9use crate::Frame;
10use crate::RedisError;
11use crate::Result;
12use crate::cmd::*;
13use anyhow::{Context, anyhow};
14use std::collections::HashMap;
15use std::str::from_utf8;
16use tokio::net::{TcpStream, ToSocketAddrs};
17
18#[derive(Debug)]
19pub enum Response {
20    Simple(Vec<u8>),
21    Array(Vec<Vec<u8>>),
22    Map(HashMap<String, Vec<u8>>),
23    Null,
24    Error(RedisError),
25}
26
27/// Redis client implementation.
28pub struct Client {
29    // todo: modify it to use a connection pool shared across multiple clients
30    // spawn a new connection for each client is inefficient when the number of clients is large
31    conn: Connection,
32}
33
34impl Client {
35    /// Establish a connection to the Redis server.
36    ///
37    /// # Examples
38    ///
39    /// ```ignore
40    /// use async_redis::Client;
41    ///
42    /// #[tokio::main]
43    /// async fn main() {
44    ///     let mut c = Client::connect("127.0.0.1:6379").await.unwrap();
45    /// }
46    /// ```
47    pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
48        let stream = TcpStream::connect(addr)
49            .await
50            .with_context(|| "failed to connect to Redis server")?;
51
52        let conn = Connection::new(stream);
53
54        Ok(Client { conn })
55    }
56
57    /// Sends a HELLO command to the Redis server.
58    ///
59    /// # Arguments
60    ///
61    /// * `proto` - An optional protocol version to use
62    ///
63    /// # Returns
64    ///
65    /// * `Ok(HashMap<String, Vec<u8>>)` if the HELLO command is successful
66    /// * `Err(RedisError)` if an error occurs
67    pub async fn hello(&mut self, proto: Option<u8>) -> Result<HashMap<String, Vec<u8>>> {
68        let frame: Frame = Hello::new(proto).into_stream();
69
70        self.conn
71            .write_frame(&frame)
72            .await
73            .with_context(|| "failed to write frame for HELLO command")?;
74
75        match self
76            .read_response()
77            .await
78            .with_context(|| "failed to read response for HELLO command")?
79        {
80            Response::Array(data) => {
81                let map = data
82                    .chunks(2)
83                    .filter_map(|chunk| {
84                        if chunk.len() == 2 {
85                            let key = from_utf8(&chunk[0]).ok()?.to_string();
86                            let value = chunk[1].to_vec();
87                            Some((key, value))
88                        } else {
89                            None
90                        }
91                    })
92                    .collect();
93
94                Ok(map)
95            }
96            Response::Map(data) => Ok(data),
97            Response::Error(err) => Err(err),
98            _ => Err(RedisError::UnexpectedResponseType),
99        }
100    }
101
102    /// Sends a PING command to the Redis server, optionally with a message.
103    ///
104    /// # Arguments
105    ///
106    /// * `msg` - An optional message to send to the server
107    ///
108    /// # Returns
109    ///
110    /// * `Ok(String)` if the PING command is successful
111    /// * `Err(RedisError)` if an error occurs
112    ///     
113    /// # Examples
114    ///
115    /// ```ignore
116    /// use async_redis::Client;
117    ///
118    /// #[tokio::main]
119    /// async fn main() {
120    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
121    ///     let resp = client.ping(Some("Hello Redis".to_string())).await.unwrap();
122    /// }
123    /// ```
124    pub async fn ping(&mut self, msg: Option<&[u8]>) -> Result<Vec<u8>> {
125        let frame: Frame = Ping::new(msg).into_stream();
126
127        self.conn
128            .write_frame(&frame)
129            .await
130            .with_context(|| "failed to write frame for PING command")?;
131
132        match self
133            .read_response()
134            .await
135            .with_context(|| "failed to read response for PING command")?
136        {
137            Response::Simple(data) => Ok(data),
138            Response::Error(err) => Err(err),
139            _ => Err(RedisError::UnexpectedResponseType),
140        }
141    }
142
143    /// Sends a GET command to the Redis server.
144    ///
145    /// # Description
146    ///
147    /// The GET command retrieves the value of a key stored on the Redis server.
148    ///
149    /// # Arguments
150    ///
151    /// * `key` - A required key to send to the server
152    ///
153    /// # Returns
154    ///
155    /// * `Ok(Some(String))` if the key to GET exists
156    /// * `Ok(None)` if the key to GET does not exist
157    /// * `Err(RedisError)` if an error occurs
158    ///     
159    /// # Examples
160    ///
161    /// ```ignore
162    /// use async_redis::Client;
163    ///
164    /// #[tokio::main]
165    /// async fn main() {
166    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
167    ///     let resp = client.get("mykey").await?;
168    /// }
169    /// ```
170    pub async fn get(&mut self, key: &str) -> Result<Option<Vec<u8>>> {
171        let frame: Frame = Get::new(key).into_stream();
172
173        self.conn
174            .write_frame(&frame)
175            .await
176            .with_context(|| "failed to write frame for GET command")?;
177
178        match self
179            .read_response()
180            .await
181            .with_context(|| "failed to read response for GET command")?
182        {
183            Response::Simple(data) => Ok(Some(data)),
184            Response::Null => Ok(None),
185            Response::Error(err) => Err(err),
186            _ => Err(RedisError::UnexpectedResponseType),
187        }
188    }
189
190    /// Sends a GETEX command to the Redis server.
191    #[allow(unused_variables)]
192    pub async fn get_ex(&mut self, key: &str, seconds: i64) -> Result<Option<Vec<u8>>> {
193        todo!("GETEX command is not implemented yet");
194        // let frame: Frame = GetEx::new(key, seconds).into_stream();
195
196        // self.conn.write_frame(&frame).await?;
197
198        // match self.read_response().await? {
199        //     Response::Simple(data) => Ok(Some(data)),
200        //     Response::Null => Ok(None),
201        //     Response::Error(err) => Err(err),
202        //     _ => Err(RedisError::UnexpectedResponseType),
203        // }
204    }
205
206    /// Sends a MGET command to the Redis server.
207    #[allow(unused_variables)]
208    pub async fn mget(&mut self, keys: Vec<&str>) -> Result<Option<Vec<Vec<u8>>>> {
209        todo!("MGET command is not implemented yet");
210        // let frame: Frame = MGet::new(keys).into_stream();
211
212        // self.conn.write_frame(&frame).await?;
213
214        // match self.read_response().await? {
215        //     Response::Array(data) => Ok(Some(data)),
216        //     Response::Null => Ok(None),
217        //     Response::Error(err) => Err(err),
218        //     _ => Err(RedisError::UnexpectedResponseType),
219        // }
220    }
221
222    // todo: the real SET command has some other options like EX, PX, NX, XX
223    // we need to add these options to the SET command. Possibly with option pattern
224    /// Sends a SET command to the Redis server.
225    ///
226    /// # Description
227    ///
228    /// The SET command sets the value of a key in the Redis server.
229    ///
230    /// # Arguments
231    ///
232    /// * `key` - A required key to set
233    /// * `val` - A required value to set
234    ///
235    /// # Returns
236    ///
237    /// * `Ok(Some(String))` if the key is set successfully
238    /// * `Ok(None)` if the key is not set
239    ///
240    /// # Examples
241    ///
242    /// ```ignore
243    /// use async_redis::Client;
244    ///
245    /// #[tokio::main]
246    /// async fn main() {
247    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
248    ///     let resp = client.set("mykey", "myvalue").await?;
249    /// }
250    pub async fn set(&mut self, key: &str, val: &[u8]) -> Result<Option<Vec<u8>>> {
251        let frame: Frame = Set::new(key, val).into_stream();
252
253        self.conn
254            .write_frame(&frame)
255            .await
256            .with_context(|| "failed to write frame for SET command")?;
257
258        match self
259            .read_response()
260            .await
261            .with_context(|| "failed to read response for SET command")?
262        {
263            Response::Simple(data) => Ok(Some(data)),
264            Response::Null => Ok(None),
265            Response::Error(err) => Err(err),
266            _ => Err(RedisError::UnexpectedResponseType),
267        }
268    }
269
270    /// Sends a SETEX command to the Redis server.
271    #[allow(unused_variables)]
272    pub async fn set_ex(&mut self, key: &str, val: &[u8], seconds: i64) -> Result<Option<Vec<u8>>> {
273        todo!("SETEX command is not implemented yet");
274        // let frame: Frame = SetEx::new(key, val, seconds).into_stream();
275
276        // self.conn.write_frame(&frame).await?;
277
278        // match self.read_response().await? {
279        //     Response::Simple(data) => Ok(Some(data)),
280        //     Response::Null => Ok(None),
281        //     Response::Error(err) => Err(err),
282        //     _ => Err(RedisError::UnexpectedResponseType),
283        // }
284    }
285
286    /// Sends a SETNX command to the Redis server.
287    #[allow(unused_variables)]
288    pub async fn set_nx(&mut self, key: &str, val: &[u8]) -> Result<Option<Vec<u8>>> {
289        todo!("SETNX command is not implemented yet");
290        // let frame: Frame = SetNx::new(key, val).into_stream();
291
292        // self.conn.write_frame(&frame).await?;
293
294        // match self.read_response().await? {
295        //     Response::Simple(data) => Ok(Some(data)),
296        //     Response::Null => Ok(None),
297        //     Response::Error(err) => Err(err),
298        //     _ => Err(RedisError::UnexpectedResponseType),
299        // }
300    }
301
302    /// Sends a DEL command to the Redis server.
303    ///
304    /// # Description
305    ///
306    /// The DEL command deletes a key from the Redis server.
307    ///
308    /// # Arguments
309    ///
310    /// * `keys` - A required vector of keys to delete
311    ///
312    /// # Returns
313    ///
314    /// * `Ok(u64)` the number of keys deleted
315    ///
316    /// # Examples
317    ///
318    /// ```ignore
319    ///
320    /// use async_redis::Client;
321    ///
322    /// #[tokio::main]
323    /// async fn main() {
324    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
325    ///     let resp = client.del(vec!["foo", "bar", "baz"]).await?;
326    /// }
327    pub async fn del(&mut self, keys: Vec<&str>) -> Result<u64> {
328        let frame: Frame = Del::new(keys).into_stream();
329
330        self.conn
331            .write_frame(&frame)
332            .await
333            .with_context(|| "failed to write frame for DEL command")?;
334
335        match self
336            .read_response()
337            .await
338            .with_context(|| "failed to read response for DEL command")?
339        {
340            Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
341            Response::Error(err) => Err(err),
342            _ => Err(RedisError::UnexpectedResponseType),
343        }
344    }
345
346    /// Sends an EXISTS command to the Redis server.
347    ///
348    /// # Description
349    ///
350    /// The EXISTS command checks if a key exists in the Redis server.
351    ///
352    /// # Arguments
353    ///
354    /// * `keys` - A required vector of keys to check
355    ///
356    /// # Returns
357    ///
358    /// * `Ok(u64)` the number of keys that exist
359    ///
360    /// # Examples
361    ///
362    /// ```ignore
363    /// #[tokio::main]
364    /// async fn main() {
365    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
366    ///     let resp = client.exists(vec!["foo", "bar", "baz"]).await?;
367    /// }
368    pub async fn exists(&mut self, keys: Vec<&str>) -> Result<u64> {
369        let frame: Frame = Exists::new(keys).into_stream();
370
371        self.conn
372            .write_frame(&frame)
373            .await
374            .with_context(|| "failed to write frame for EXISTS command")?;
375
376        match self
377            .read_response()
378            .await
379            .with_context(|| "failed to read response for EXISTS command")?
380        {
381            Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
382            Response::Error(err) => Err(err),
383            _ => Err(RedisError::UnexpectedResponseType),
384        }
385    }
386
387    // todo: add EXAT, PXAT, NX, XX options
388    /// Sends an EXPIRE command to the Redis server.
389    ///
390    /// # Description
391    ///
392    /// The EXPIRE command sets a timeout on a key. After the timeout has expired, the key will be deleted.
393    ///
394    /// # Arguments
395    ///
396    /// * `key` - A required key to set the timeout
397    /// * `seconds` - A required number of seconds to set the timeout
398    ///
399    /// # Returns
400    ///
401    /// * `Ok(1)` if the key is set successfully
402    /// * `Ok(0)` if the key is not set
403    ///
404    /// # Examples
405    ///
406    /// ```ignore
407    /// #[tokio::main]
408    /// async fn main() {
409    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
410    ///     let resp = client.expire("mykey", 1).await?;
411    /// }
412    pub async fn expire(&mut self, key: &str, seconds: i64) -> Result<u64> {
413        let frame: Frame = Expire::new(key, seconds).into_stream();
414
415        self.conn
416            .write_frame(&frame)
417            .await
418            .with_context(|| "failed to write frame for EXPIRE command")?;
419
420        match self
421            .read_response()
422            .await
423            .with_context(|| "failed to read response for EXPIRE command")?
424        {
425            Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
426            Response::Error(err) => Err(err),
427            _ => Err(RedisError::UnexpectedResponseType),
428        }
429    }
430
431    /// Sends a TTL command to the Redis server.
432    ///
433    /// # Description
434    ///
435    /// The TTL command returns the remaining time to live of a key that has an expire set.
436    ///
437    /// # Arguments
438    ///
439    /// * `key` - A required key to check ttl
440    ///
441    /// # Returns
442    ///
443    /// * `Ok(-2)` if the key does not exist
444    /// * `Ok(-1)` if the key exists but has no expire set
445    /// * `Ok(other)` if the key exists and has an expire set
446    ///
447    /// # Examples
448    ///
449    /// ```ignore
450    /// #[tokio::main]
451    /// async fn main() {
452    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
453    ///     let resp = client.ttl("mykey").await?;
454    /// }
455    pub async fn ttl(&mut self, key: &str) -> Result<i64> {
456        let frame: Frame = Ttl::new(key).into_stream();
457
458        self.conn
459            .write_frame(&frame)
460            .await
461            .with_context(|| "failed to write frame for TTL command")?;
462
463        match self
464            .read_response()
465            .await
466            .with_context(|| "failed to read response for TTL command")?
467        {
468            Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
469            Response::Error(err) => Err(err),
470            _ => Err(RedisError::UnexpectedResponseType),
471        }
472    }
473
474    /// Sends an INCR command to the Redis server.
475    ///
476    /// # Description
477    ///
478    /// The INCR command increments the integer value of a key by one.
479    ///
480    /// # Arguments
481    ///
482    /// * `key` - A required key to increment
483    ///
484    /// # Returns
485    ///
486    /// * `Ok(i64)` the new value of the key after increment
487    /// * `Err(RedisError)` if an error occurs
488    ///
489    /// # Examples
490    ///
491    /// ```ignore
492    /// #[tokio::main]
493    /// async fn main() {
494    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
495    ///     let resp = client.incr("mykey").await?;
496    /// }
497    pub async fn incr(&mut self, key: &str) -> Result<i64> {
498        let frame: Frame = Incr::new(key).into_stream();
499
500        self.conn
501            .write_frame(&frame)
502            .await
503            .with_context(|| "failed to write frame for INCR command")?;
504
505        match self
506            .read_response()
507            .await
508            .with_context(|| "failed to read response for INCR command")?
509        {
510            Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
511            Response::Error(err) => Err(err),
512            _ => Err(RedisError::UnexpectedResponseType),
513        }
514    }
515
516    /// Sends an INCRBY command to the Redis server.
517    #[allow(unused_variables)]
518    pub async fn incr_by(&mut self, key: &str, increment: i64) -> Result<i64> {
519        todo!("INCRBY command is not implemented yet");
520        // let frame: Frame = IncrBy::new(key, increment).into_stream();
521
522        // self.conn.write_frame(&frame).await?;
523
524        // match self.read_response().await? {
525        //     Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
526        //     Response::Error(err) => Err(err),
527        //     _ => Err(RedisError::UnexpectedResponseType),
528        // }
529    }
530
531    /// Sends an INCRBYFLOAT command to the Redis server.
532    #[allow(unused_variables)]
533    pub async fn incr_by_float(&mut self, key: &str, increment: f64) -> Result<f64> {
534        todo!("INCRBYFLOAT command is not implemented yet");
535        // let frame: Frame = IncrByFloat::new(key, increment).into_stream();
536
537        // self.conn.write_frame(&frame).await?;
538
539        // match self.read_response().await? {
540        //     Response::Simple(data) => Ok(from_utf8(&data)?.parse::<f64>()?),
541        //     Response::Error(err) => Err(err),
542        //     _ => Err(RedisError::UnexpectedResponseType),
543        // }
544    }
545
546    /// Sends a DECR command to the Redis server.
547    ///
548    /// # Description
549    ///
550    /// The DECR command decrements the integer value of a key by one.
551    ///
552    /// # Arguments
553    ///
554    /// * `key` - A required key to decrement
555    ///
556    /// # Returns
557    ///
558    /// * `Ok(i64)` the new value of the key after decrement
559    /// * `Err(RedisError)` if an error occurs
560    ///
561    /// # Examples
562    ///
563    /// ```ignore
564    /// #[tokio::main]
565    /// async fn main() {
566    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
567    ///     let resp = client.decr("mykey").await?;
568    /// }
569    pub async fn decr(&mut self, key: &str) -> Result<i64> {
570        let frame: Frame = Decr::new(key).into_stream();
571
572        self.conn
573            .write_frame(&frame)
574            .await
575            .with_context(|| "failed to write frame for DECR command")?;
576
577        match self
578            .read_response()
579            .await
580            .with_context(|| "failed to read response for DECR command")?
581        {
582            Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
583            Response::Error(err) => Err(err),
584            _ => Err(RedisError::UnexpectedResponseType),
585        }
586    }
587
588    /// Sends a DECRBY command to the Redis server.
589    #[allow(unused_variables)]
590    pub async fn decr_by(&mut self, key: &str, decrement: i64) -> Result<i64> {
591        todo!("DECRBY command is not implemented yet");
592        // let frame: Frame = DecrBy::new(key, decrement).into_stream();
593
594        // self.conn.write_frame(&frame).await?;
595
596        // match self.read_response().await? {
597        //     Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
598        //     Response::Error(err) => Err(err),
599        //     _ => Err(RedisError::UnexpectedResponseType),
600        // }
601    }
602
603    /// Sends a DECRBYFLOAT command to the Redis server.
604    #[allow(unused_variables)]
605    pub async fn decr_by_float(&mut self, key: &str, decrement: f64) -> Result<f64> {
606        todo!("DECRBYFLOAT command is not implemented yet");
607        // let frame: Frame = DecrByFloat::new(key, decrement).into_stream();
608
609        // self.conn.write_frame(&frame).await?;
610
611        // match self.read_response().await? {
612        //     Response::Simple(data) => Ok(from_utf8(&data)?.parse::<f64>()?),
613        //     Response::Error(err) => Err(err),
614        //     _ => Err(RedisError::UnexpectedResponseType),
615        // }
616    }
617
618    /// Sends an LPUSH command to the Redis server.
619    ///
620    /// # Description
621    ///
622    /// The LPUSH command inserts all the specified values at the head of the list stored at key.
623    ///
624    /// # Arguments
625    ///
626    /// * `key` - A required key to insert values
627    /// * `values` - A required vector of values to insert
628    ///
629    /// # Returns
630    ///
631    /// * `Ok(u64)` the length of the list after the push operation
632    /// * `Err(RedisError)` if an error occurs
633    ///
634    /// # Examples
635    ///
636    /// ```ignore
637    /// #[tokio::main]
638    /// async fn main() {
639    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
640    ///     let resp = client.lpush("mykey", vec!["foo", "bar", "baz"]).await?;
641    /// }
642    pub async fn lpush(&mut self, key: &str, values: Vec<&[u8]>) -> Result<u64> {
643        let frame: Frame = LPush::new(key, values).into_stream();
644
645        self.conn
646            .write_frame(&frame)
647            .await
648            .with_context(|| "failed to write frame for LPUSH command")?;
649
650        match self
651            .read_response()
652            .await
653            .with_context(|| "failed to read response for LPUSH command")?
654        {
655            Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
656            Response::Error(err) => Err(err),
657            _ => Err(RedisError::UnexpectedResponseType),
658        }
659    }
660
661    /// Sends an RPUSH command to the Redis server.
662    ///
663    /// # Description
664    ///
665    /// The RPUSH command inserts all the specified values at the tail of the list stored at key.
666    ///
667    /// # Arguments
668    ///
669    /// * `key` - A required key to insert values
670    /// * `values` - A required vector of values to insert
671    ///
672    /// # Returns
673    ///
674    /// * `Ok(u64)` the length of the list after the push operation
675    ///
676    /// # Examples
677    ///
678    /// ```ignore
679    /// #[tokio::main]
680    /// async fn main() {
681    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
682    ///     let resp = client.rpush("mykey", vec!["foo", "bar", "baz"]).await?;
683    /// }
684    pub async fn rpush(&mut self, key: &str, values: Vec<&[u8]>) -> Result<u64> {
685        let frame: Frame = RPush::new(key, values).into_stream();
686
687        self.conn
688            .write_frame(&frame)
689            .await
690            .with_context(|| "failed to write frame for RPUSH command")?;
691
692        match self
693            .read_response()
694            .await
695            .with_context(|| "failed to read response for RPUSH command")?
696        {
697            Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
698            Response::Error(err) => Err(err),
699            _ => Err(RedisError::UnexpectedResponseType),
700        }
701    }
702
703    /// Sends an LPOP command to the Redis server.
704    ///
705    /// # Description
706    ///
707    /// The LPOP command removes and returns the removed elements from the head of the list stored at key.
708    ///
709    /// # Arguments
710    ///
711    /// * `key` - A required key to remove values
712    /// * `count` - An optional number of elements to remove
713    ///
714    /// # Returns
715    ///
716    /// * `Ok(Some(String))` if the key exists and the elements are removed
717    /// * `Ok(None)` if the key does not exist
718    /// * `Err(RedisError)` if an error occurs
719    ///
720    /// # Examples
721    ///
722    /// ```ignore
723    /// #[tokio::main]
724    /// async fn main() {
725    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
726    ///     let resp = client.lpop("mykey", 1).await?;
727    /// }
728    pub async fn lpop(&mut self, key: &str) -> Result<Option<Vec<u8>>> {
729        let frame: Frame = LPop::new(key, None).into_stream();
730
731        self.conn
732            .write_frame(&frame)
733            .await
734            .with_context(|| "failed to write frame for LPOP command")?;
735
736        match self
737            .read_response()
738            .await
739            .with_context(|| "failed to read response for LPOP command")?
740        {
741            Response::Simple(data) => Ok(Some(data)),
742            Response::Null => Ok(None),
743            Response::Error(err) => Err(err),
744            _ => Err(RedisError::UnexpectedResponseType),
745        }
746    }
747
748    pub async fn lpop_n(&mut self, key: &str, count: u64) -> Result<Option<Vec<Vec<u8>>>> {
749        let frame: Frame = LPop::new(key, Some(count)).into_stream();
750
751        self.conn
752            .write_frame(&frame)
753            .await
754            .with_context(|| "failed to write frame for LPOP command")?;
755
756        match self
757            .read_response()
758            .await
759            .with_context(|| "failed to read response for LPOP command")?
760        {
761            Response::Array(data) => Ok(Some(data)),
762            Response::Null => Ok(None),
763            Response::Error(err) => Err(err),
764            _ => Err(RedisError::UnexpectedResponseType),
765        }
766    }
767
768    /// Sends an RPOP command to the Redis server.
769    ///
770    /// # Description
771    ///
772    /// The RPOP command removes and returns the removed elements from the tail of the list stored at key.
773    ///
774    /// # Arguments
775    ///
776    /// * `key` - A required key to remove values
777    /// * `count` - An optional number of elements to remove
778    ///
779    /// # Returns
780    ///
781    /// * `Ok(Some(String))` if the key exists and the elements are removed
782    /// * `Ok(None)` if the key does not exist
783    /// * `Err(RedisError)` if an error occurs
784    ///
785    /// # Examples
786    ///
787    /// ```ignore
788    /// #[tokio::main]
789    /// async fn main() {
790    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
791    ///     let resp = client.rpop("mykey", 1).await?;
792    /// }
793    pub async fn rpop(&mut self, key: &str) -> Result<Option<Vec<u8>>> {
794        let frame: Frame = RPop::new(key, None).into_stream();
795
796        self.conn
797            .write_frame(&frame)
798            .await
799            .with_context(|| "failed to write frame for RPOP command")?;
800
801        match self
802            .read_response()
803            .await
804            .with_context(|| "failed to read response for RPOP command")?
805        {
806            Response::Simple(data) => Ok(Some(data)),
807            Response::Null => Ok(None),
808            Response::Error(err) => Err(err),
809            _ => Err(RedisError::UnexpectedResponseType),
810        }
811    }
812
813    pub async fn rpop_n(&mut self, key: &str, count: u64) -> Result<Option<Vec<Vec<u8>>>> {
814        let frame: Frame = RPop::new(key, Some(count)).into_stream();
815
816        self.conn
817            .write_frame(&frame)
818            .await
819            .with_context(|| "failed to write frame for RPOP command")?;
820
821        match self
822            .read_response()
823            .await
824            .with_context(|| "failed to read response for RPOP command")?
825        {
826            Response::Array(data) => Ok(Some(data)),
827            Response::Null => Ok(None),
828            Response::Error(err) => Err(err),
829            _ => Err(RedisError::UnexpectedResponseType),
830        }
831    }
832
833    /// Sends an LRANGE command to the Redis server.
834    ///
835    /// # Description
836    ///
837    /// The LRANGE command returns the specified elements of the list stored at key.
838    ///
839    /// # Arguments
840    ///
841    /// * `key` - A required key to get values
842    /// * `start` - A required start index
843    /// * `end` - A required end index
844    ///
845    /// # Returns
846    ///
847    /// * `Ok(Some(String))` if the key exists and the elements are returned
848    /// * `Ok(None)` if the key does not exist
849    /// * `Err(RedisError)` if an error occurs
850    ///
851    /// # Examples
852    ///
853    /// ```ignore
854    /// #[tokio::main]
855    /// async fn main() {
856    ///     let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
857    ///     let resp = client.lrange("mykey", 0, -1).await?;
858    /// }
859    pub async fn lrange(&mut self, key: &str, start: i64, end: i64) -> Result<Vec<Vec<u8>>> {
860        let frame: Frame = LRange::new(key, start, end).into_stream();
861
862        self.conn
863            .write_frame(&frame)
864            .await
865            .with_context(|| "failed to write frame for LRANGE command")?;
866
867        match self
868            .read_response()
869            .await
870            .with_context(|| "failed to read response for LRANGE command")?
871        {
872            Response::Array(data) => Ok(data),
873            Response::Error(err) => Err(err),
874            _ => Err(RedisError::UnexpectedResponseType),
875        }
876    }
877
878    /// Sends an HGET command to the Redis server.
879    #[allow(unused_variables)]
880    pub async fn hget(&mut self, key: &str, field: &str) -> Result<Option<Vec<u8>>> {
881        todo!("HGET command is not implemented yet");
882        // let frame: Frame = HGet::new(key, field).into_stream();
883
884        // self.conn.write_frame(&frame).await?;
885
886        // match self.read_response().await? {
887        //     Response::Simple(data) => Ok(Some(data)),
888        //     Response::Null => Ok(None),
889        //     Response::Error(err) => Err(err),
890        //     _ => Err(RedisError::UnexpectedResponseType),
891        // }
892    }
893
894    /// Sends an HMGET command to the Redis server.
895    #[allow(unused_variables)]
896    pub async fn hmget(&mut self, key: &str, fields: Vec<&str>) -> Result<Option<Vec<Vec<u8>>>> {
897        todo!("HMGET command is not implemented yet");
898        // let frame: Frame = HMGet::new(key, fields).into_stream();
899
900        // self.conn.write_frame(&frame).await?;
901
902        // match self.read_response().await? {
903        //     Response::Array(data) => Ok(Some(data)),
904        //     Response::Null => Ok(None),
905        //     Response::Error(err) => Err(err),
906        //     _ => Err(RedisError::UnexpectedResponseType),
907        // }
908    }
909
910    /// Sends an HGETALL command to the Redis server.
911    #[allow(unused_variables)]
912    pub async fn hget_all(&mut self, key: &str) -> Result<Option<HashMap<String, Vec<u8>>>> {
913        todo!("HGETALL command is not implemented yet");
914        // let frame: Frame = HGetAll::new(key).into_stream();
915
916        // self.conn.write_frame(&frame).await?;
917
918        // match self.read_response().await? {
919        //     Response::Map(data) => Ok(Some(data)),
920        //     Response::Null => Ok(None),
921        //     Response::Error(err) => Err(err),
922        //     _ => Err(RedisError::UnexpectedResponseType),
923        // }
924    }
925
926    /// Sends an HKEYS command to the Redis server.
927    #[allow(unused_variables)]
928    pub async fn hkeys(&mut self, key: &str) -> Result<Option<Vec<Vec<u8>>>> {
929        todo!("HKEYS command is not implemented yet");
930        // let frame: Frame = HKeys::new(key).into_stream();
931
932        // self.conn.write_frame(&frame).await?;
933
934        // match self.read_response().await? {
935        //     Response::Array(data) => Ok(Some(data)),
936        //     Response::Null => Ok(None),
937        //     Response::Error(err) => Err(err),
938        //     _ => Err(RedisError::UnexpectedResponseType),
939        // }
940    }
941
942    /// Sends an HVALS command to the Redis server.
943    #[allow(unused_variables)]
944    pub async fn hvals(&mut self, key: &str) -> Result<Option<Vec<Vec<u8>>>> {
945        todo!("HVALS command is not implemented yet");
946        // let frame: Frame = HVals::new(key).into_stream();
947
948        // self.conn.write_frame(&frame).await?;
949
950        // match self.read_response().await? {
951        //     Response::Array(data) => Ok(Some(data)),
952        //     Response::Null => Ok(None),
953        //     Response::Error(err) => Err(err),
954        //     _ => Err(RedisError::UnexpectedResponseType),
955        // }
956    }
957
958    /// Sends an HLEN command to the Redis server.
959    #[allow(unused_variables)]
960    pub async fn hlen(&mut self, key: &str) -> Result<Option<u64>> {
961        todo!("HLEN command is not implemented yet");
962        // let frame: Frame = HLen::new(key).into_stream();
963
964        // self.conn.write_frame(&frame).await?;
965
966        // match self.read_response().await? {
967        //     Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
968        //     Response::Null => Ok(None),
969        //     Response::Error(err) => Err(err),
970        //     _ => Err(RedisError::UnexpectedResponseType),
971        // }
972    }
973
974    /// Sends an HSET command to the Redis server.
975    #[allow(unused_variables)]
976    pub async fn hset(&mut self, key: &str, field: &str, value: &[u8]) -> Result<Option<Vec<u8>>> {
977        todo!("HSET command is not implemented yet");
978        // let frame: Frame = HSet::new(key, field, value).into_stream();
979
980        // self.conn.write_frame(&frame).await?;
981
982        // match self.read_response().await? {
983        //     Response::Simple(data) => Ok(Some(data)),
984        //     Response::Null => Ok(None),
985        //     Response::Error(err) => Err(err),
986        //     _ => Err(RedisError::UnexpectedResponseType),
987        // }
988    }
989
990    /// Sends an HSETNX command to the Redis server.
991    #[allow(unused_variables)]
992    pub async fn hset_nx(
993        &mut self,
994        key: &str,
995        field: &str,
996        value: &[u8],
997    ) -> Result<Option<Vec<u8>>> {
998        todo!("HSETNX command is not implemented yet");
999        // let frame: Frame = HSetNx::new(key, field, value).into_stream();
1000
1001        // self.conn.write_frame(&frame).await?;
1002
1003        // match self.read_response().await? {
1004        //     Response::Simple(data) => Ok(Some(data)),
1005        //     Response::Null => Ok(None),
1006        //     Response::Error(err) => Err(err),
1007        //     _ => Err(RedisError::UnexpectedResponseType),
1008        // }
1009    }
1010
1011    /// Sends an HMSET command to the Redis server.
1012    #[allow(unused_variables)]
1013    pub async fn hmset(
1014        &mut self,
1015        key: &str,
1016        fields: HashMap<String, Vec<u8>>,
1017    ) -> Result<Option<Vec<u8>>> {
1018        todo!("HMSET command is not implemented yet");
1019        // let frame: Frame = HMSet::new(key, fields).into_stream();
1020
1021        // self.conn.write_frame(&frame).await?;
1022
1023        // match self.read_response().await? {
1024        //     Response::Simple(data) => Ok(Some(data)),
1025        //     Response::Null => Ok(None),
1026        //     Response::Error(err) => Err(err),
1027        //     _ => Err(RedisError::UnexpectedResponseType),
1028        // }
1029    }
1030
1031    /// Sends an HDEL command to the Redis server.
1032    #[allow(unused_variables)]
1033    pub async fn hdel(&mut self, key: &str, field: &str) -> Result<Option<Vec<u8>>> {
1034        todo!("HDEL command is not implemented yet");
1035        // let frame: Frame = HDel::new(key, field).into_stream();
1036
1037        // self.conn.write_frame(&frame).await?;
1038
1039        // match self.read_response().await? {
1040        //     Response::Simple(data) => Ok(Some(data)),
1041        //     Response::Null => Ok(None),
1042        //     Response::Error(err) => Err(err),
1043        //     _ => Err(RedisError::UnexpectedResponseType),
1044        // }
1045    }
1046
1047    /// Sends an SADD command to the Redis server.
1048    #[allow(unused_variables)]
1049    pub async fn sadd(&mut self, key: &str, members: Vec<&[u8]>) -> Result<Option<Vec<u8>>> {
1050        todo!("SADD command is not implemented yet");
1051        // let frame: Frame = SAdd::new(key, members).into_stream();
1052
1053        // self.conn.write_frame(&frame).await?;
1054
1055        // match self.read_response().await? {
1056        //     Response::Simple(data) => Ok(Some(data)),
1057        //     Response::Null => Ok(None),
1058        //     Response::Error(err) => Err(err),
1059        //     _ => Err(RedisError::UnexpectedResponseType),
1060        // }
1061    }
1062
1063    /// Sends an SREM command to the Redis server.
1064    #[allow(unused_variables)]
1065    pub async fn srem(&mut self, key: &str, members: Vec<&[u8]>) -> Result<Option<Vec<u8>>> {
1066        todo!("SREM command is not implemented yet");
1067        // let frame: Frame = SRem::new(key, members).into_stream();
1068
1069        // self.conn.write_frame(&frame).await?;
1070
1071        // match self.read_response().await? {
1072        //     Response::Simple(data) => Ok(Some(data)),
1073        //     Response::Null => Ok(None),
1074        //     Response::Error(err) => Err(err),
1075        //     _ => Err(RedisError::UnexpectedResponseType),
1076        // }
1077    }
1078
1079    /// Sends an SISMEMBER command to the Redis server.
1080    #[allow(unused_variables)]
1081    pub async fn sismember(&mut self, key: &str, member: &[u8]) -> Result<Option<Vec<u8>>> {
1082        todo!("SISMEMBER command is not implemented yet");
1083        // let frame: Frame = SIsMember::new(key, member).into_stream();
1084
1085        // self.conn.write_frame(&frame).await?;
1086
1087        // match self.read_response().await? {
1088        //     Response::Simple(data) => Ok(Some(data)),
1089        //     Response::Null => Ok(None),
1090        //     Response::Error(err) => Err(err),
1091        //     _ => Err(RedisError::UnexpectedResponseType),
1092        // }
1093    }
1094
1095    /// Sends an SMEMBERS command to the Redis server.
1096    #[allow(unused_variables)]
1097    pub async fn smembers(&mut self, key: &str) -> Result<Option<Vec<Vec<u8>>>> {
1098        todo!("SMEMBERS command is not implemented yet");
1099        // let frame: Frame = SMembers::new(key).into_stream();
1100
1101        // self.conn.write_frame(&frame).await?;
1102
1103        // match self.read_response().await? {
1104        //     Response::Array(data) => Ok(Some(data)),
1105        //     Response::Null => Ok(None),
1106        //     Response::Error(err) => Err(err),
1107        //     _ => Err(RedisError::UnexpectedResponseType),
1108        // }
1109    }
1110
1111    /// Sends an SPOP command to the Redis server.
1112    #[allow(unused_variables)]
1113    pub async fn spop(&mut self, key: &str) -> Result<Option<Vec<u8>>> {
1114        todo!("SPOP command is not implemented yet");
1115        // let frame: Frame = SPop::new(key).into_stream();
1116
1117        // self.conn.write_frame(&frame).await?;
1118
1119        // match self.read_response().await? {
1120        //     Response::Simple(data) => Ok(Some(data)),
1121        //     Response::Null => Ok(None),
1122        //     Response::Error(err) => Err(err),
1123        //     _ => Err(RedisError::UnexpectedResponseType),
1124        // }
1125    }
1126
1127    /// Sends a ZADD command to the Redis server.
1128    #[allow(unused_variables)]
1129    pub async fn zadd(
1130        &mut self,
1131        key: &str,
1132        members: HashMap<String, f64>,
1133    ) -> Result<Option<Vec<u8>>> {
1134        todo!("ZADD command is not implemented yet");
1135        // let frame: Frame = ZAdd::new(key, members).into_stream();
1136
1137        // self.conn.write_frame(&frame).await?;
1138
1139        // match self.read_response().await? {
1140        //     Response::Simple(data) => Ok(Some(data)),
1141        //     Response::Null => Ok(None),
1142        //     Response::Error(err) => Err(err),
1143        //     _ => Err(RedisError::UnexpectedResponseType),
1144        // }
1145    }
1146
1147    /// Sends a ZREM command to the Redis server.
1148    #[allow(unused_variables)]
1149    pub async fn zrem(&mut self, key: &str, members: Vec<&[u8]>) -> Result<Option<Vec<u8>>> {
1150        todo!("ZREM command is not implemented yet");
1151        // let frame: Frame = ZRem::new(key, members).into_stream();
1152
1153        // self.conn.write_frame(&frame).await?;
1154
1155        // match self.read_response().await? {
1156        //     Response::Simple(data) => Ok(Some(data)),
1157        //     Response::Null => Ok(None),
1158        //     Response::Error(err) => Err(err),
1159        //     _ => Err(RedisError::UnexpectedResponseType),
1160        // }
1161    }
1162
1163    /// Sends a ZRANGE command to the Redis server.
1164    #[allow(unused_variables)]
1165    pub async fn zrange(
1166        &mut self,
1167        key: &str,
1168        start: i64,
1169        end: i64,
1170    ) -> Result<Option<Vec<Vec<u8>>>> {
1171        todo!("ZRANGE command is not implemented yet");
1172        // let frame: Frame = ZRange::new(key, start, end).into_stream();
1173
1174        // self.conn.write_frame(&frame).await?;
1175
1176        // match self.read_response().await? {
1177        //     Response::Array(data) => Ok(Some(data)),
1178        //     Response::Null => Ok(None),
1179        //     Response::Error(err) => Err(err),
1180        //     _ => Err(RedisError::UnexpectedResponseType),
1181        // }
1182    }
1183
1184    /// Sends a ZREVRANGE command to the Redis server.
1185    #[allow(unused_variables)]
1186    pub async fn zrevrange(
1187        &mut self,
1188        key: &str,
1189        start: i64,
1190        end: i64,
1191    ) -> Result<Option<Vec<Vec<u8>>>> {
1192        todo!("ZREVRANGE command is not implemented yet");
1193        // let frame: Frame = ZRevRange::new(key, start, end).into_stream();
1194
1195        // self.conn.write_frame(&frame).await?;
1196
1197        // match self.read_response().await? {
1198        //     Response::Array(data) => Ok(Some(data)),
1199        //     Response::Null => Ok(None),
1200        //     Response::Error(err) => Err(err),
1201        //     _ => Err(RedisError::UnexpectedResponseType),
1202        // }
1203    }
1204
1205    /// Sends a ZRANK command to the Redis server.
1206    #[allow(unused_variables)]
1207    pub async fn zrank(&mut self, key: &str, member: &[u8]) -> Result<Option<u64>> {
1208        todo!("ZRANK command is not implemented yet");
1209        // let frame: Frame = ZRank::new(key, member).into_stream();
1210
1211        // self.conn.write_frame(&frame).await?;
1212
1213        // match self.read_response().await? {
1214        //     Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
1215        //     Response::Null => Ok(None),
1216        //     Response::Error(err) => Err(err),
1217        //     _ => Err(RedisError::UnexpectedResponseType),
1218        // }
1219    }
1220
1221    /// Sends a ZREVRANK command to the Redis server.
1222    #[allow(unused_variables)]
1223    pub async fn zrevrank(&mut self, key: &str, member: &[u8]) -> Result<Option<u64>> {
1224        todo!("ZREVRANK command is not implemented yet");
1225        // let frame: Frame = ZRevRank::new(key, member).into_stream();
1226
1227        // self.conn.write_frame(&frame).await?;
1228
1229        // match self.read_response().await? {
1230        //     Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
1231        //     Response::Null => Ok(None),
1232        //     Response::Error(err) => Err(err),
1233        //     _ => Err(RedisError::UnexpectedResponseType),
1234        // }
1235    }
1236
1237    /// Sends a ZSCORE command to the Redis server.
1238    #[allow(unused_variables)]
1239    pub async fn zscore(&mut self, key: &str, member: &[u8]) -> Result<Option<f64>> {
1240        todo!("ZSCORE command is not implemented yet");
1241        // let frame: Frame = ZScore::new(key, member).into_stream();
1242
1243        // self.conn.write_frame(&frame).await?;
1244
1245        // match self.read_response().await? {
1246        //     Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<f64>()?)),
1247        //     Response::Null => Ok(None),
1248        //     Response::Error(err) => Err(err),
1249        //     _ => Err(RedisError::UnexpectedResponseType),
1250        // }
1251    }
1252
1253    /// Sends a ZCARD command to the Redis server.
1254    #[allow(unused_variables)]
1255    pub async fn zcard(&mut self, key: &str) -> Result<Option<u64>> {
1256        todo!("ZCARD command is not implemented yet");
1257        // let frame: Frame = ZCard::new(key).into_stream();
1258
1259        // self.conn.write_frame(&frame).await?;
1260
1261        // match self.read_response().await? {
1262        //     Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
1263        //     Response::Null => Ok(None),
1264        //     Response::Error(err) => Err(err),
1265        //     _ => Err(RedisError::UnexpectedResponseType),
1266        // }
1267    }
1268
1269    /// Sends a ZCOUNT command to the Redis server.
1270    #[allow(unused_variables)]
1271    pub async fn zcount(&mut self, key: &str, min: f64, max: f64) -> Result<Option<u64>> {
1272        todo!("ZCOUNT command is not implemented yet");
1273        // let frame: Frame = ZCount::new(key, min, max).into_stream();
1274
1275        // self.conn.write_frame(&frame).await?;
1276
1277        // match self.read_response().await? {
1278        //     Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
1279        //     Response::Null => Ok(None),
1280        //     Response::Error(err) => Err(err),
1281        //     _ => Err(RedisError::UnexpectedResponseType),
1282        // }
1283    }
1284
1285    /// Sends a ZINCRBY command to the Redis server.
1286    #[allow(unused_variables)]
1287    pub async fn zincr_by(
1288        &mut self,
1289        key: &str,
1290        increment: f64,
1291        member: &[u8],
1292    ) -> Result<Option<f64>> {
1293        todo!("ZINCRBY command is not implemented yet");
1294        // let frame: Frame = ZIncrBy::new(key, increment, member).into_stream();
1295
1296        // self.conn.write_frame(&frame).await?;
1297
1298        // match self.read_response().await? {
1299        //     Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<f64>()?)),
1300        //     Response::Null => Ok(None),
1301        //     Response::Error(err) => Err(err),
1302        //     _ => Err(RedisError::UnexpectedResponseType),
1303        // }
1304    }
1305
1306    /// Reads the response from the server. The response is a searilzied frame.
1307    /// It decodes the frame and returns the human readable message to the client.
1308    ///
1309    /// # Returns
1310    ///
1311    /// * `Ok(Some(Bytes))` if the response is successfully read
1312    /// * `Ok(None)` if the response is empty
1313    /// * `Err(RedisError)` if an error occurs
1314    async fn read_response(&mut self) -> Result<Response> {
1315        match self.conn.read_frame().await? {
1316            Some(Frame::SimpleString(data)) => Ok(Response::Simple(data.into_bytes())),
1317            Some(Frame::SimpleError(data)) => Ok(Response::Error(RedisError::Other(anyhow!(data)))),
1318            Some(Frame::Integer(data)) => Ok(Response::Simple(data.to_string().into_bytes())),
1319            Some(Frame::BulkString(data)) => Ok(Response::Simple(data.to_vec())),
1320            Some(Frame::Array(data)) => {
1321                let result: Vec<Vec<u8>> = data
1322                    .into_iter()
1323                    .map(|frame| match frame {
1324                        Frame::BulkString(data) => data.to_vec(),
1325                        Frame::SimpleString(data) => data.into_bytes(),
1326                        Frame::Integer(data) => data.to_string().into_bytes(),
1327                        Frame::Array(data) => {
1328                            let result = data
1329                                .into_iter()
1330                                .map(|frame| match frame {
1331                                    Frame::BulkString(data) => data.to_vec(),
1332                                    Frame::SimpleString(data) => data.into_bytes(),
1333                                    Frame::Integer(data) => data.to_string().into_bytes(),
1334                                    Frame::Null => vec![],
1335                                    _ => {
1336                                        vec![]
1337                                    }
1338                                })
1339                                .collect::<Vec<_>>();
1340                            result.concat()
1341                        }
1342                        Frame::Null => vec![],
1343                        _ => vec![],
1344                    })
1345                    .collect();
1346
1347                Ok(Response::Array(result))
1348            }
1349            Some(Frame::Null) => Ok(Response::Null), // nil reply usually means no error
1350            Some(Frame::Boolean(data)) => {
1351                if data {
1352                    Ok(Response::Simple("true".into()))
1353                } else {
1354                    Ok(Response::Simple("false".into()))
1355                }
1356            }
1357            Some(Frame::Double(data)) => Ok(Response::Simple(data.to_string().into_bytes())),
1358            Some(Frame::BulkError(data)) => Ok(Response::Error(RedisError::Other(anyhow!(
1359                String::from_utf8_lossy(&data).to_string()
1360            )))),
1361            Some(Frame::Map(data)) => {
1362                let result: HashMap<String, Vec<u8>> = data
1363                    .into_iter()
1364                    .filter_map(|(key, value)| {
1365                        let key = match key {
1366                            Frame::BulkString(data) => String::from_utf8(data.to_vec()).ok(),
1367                            Frame::SimpleString(data) => Some(data),
1368                            Frame::Integer(data) => Some(data.to_string()),
1369                            _ => None,
1370                        };
1371
1372                        let value = match value {
1373                            Frame::BulkString(data) => Some(data.to_vec()),
1374                            Frame::SimpleString(data) => Some(data.into_bytes()),
1375                            Frame::Integer(data) => Some(data.to_string().into_bytes()),
1376                            _ => None,
1377                        };
1378
1379                        match (key, value) {
1380                            (Some(k), Some(v)) => Some((k, v)),
1381                            _ => None,
1382                        }
1383                    })
1384                    .collect();
1385
1386                Ok(Response::Map(result))
1387            }
1388            // todo: array response needed here
1389            Some(_) => unimplemented!(""),
1390            None => Err(RedisError::Unknown),
1391        }
1392    }
1393}