redis_asyncx/client.rs
1//! Redis client implementation.
2//!
//! The client defaults to RESP2 unless `HELLO 3` is explicitly sent to switch to RESP3.
4//! The client is a simple wrapper around the Connection struct.
5//! It provides simple APIs to send commands to the Redis server and get the response.
6//! The client is designed to be used in an async context, using the tokio runtime.
7
8use crate::Connection;
9use crate::Frame;
10use crate::RedisError;
11use crate::Result;
12use crate::cmd::*;
13use anyhow::{Context, anyhow};
14use std::collections::HashMap;
15use std::str::from_utf8;
16use tokio::net::{TcpStream, ToSocketAddrs};
17
/// Decoded server reply, as consumed by the [`Client`] command methods.
///
/// NOTE(review): values of this type are produced by `Client::read_response`, which is
/// not visible in this part of the file. The per-variant notes below only describe how
/// the command methods in this file interpret each variant.
#[derive(Debug)]
pub enum Response {
    /// A single payload as raw bytes. Returned unchanged by e.g. `ping` and `get`, and
    /// parsed as UTF-8 decimal text by the integer-returning commands (`del`, `ttl`,
    /// `incr`, ...).
    Simple(Vec<u8>),
    /// A sequence of byte payloads, e.g. the result of `lrange`.
    Array(Vec<Vec<u8>>),
    /// String keys mapped to byte payloads; `hello` returns this map directly.
    Map(HashMap<String, Vec<u8>>),
    /// No value; `get`, `set`, `lpop` and `rpop` translate it to `None`.
    Null,
    /// An error carried by the reply; the command methods return it as `Err`.
    Error(RedisError),
}
26
/// Redis client implementation.
///
/// A `Client` owns a single [`Connection`]. The command methods take `&mut self`,
/// write one command frame to that connection and then read the reply.
pub struct Client {
    // TODO: use a connection pool shared across multiple clients; opening a new
    // connection for each client is inefficient when the number of clients is large.
    /// Connection that command frames are written to.
    conn: Connection,
}
33
34impl Client {
35 /// Establish a connection to the Redis server.
36 ///
37 /// # Examples
38 ///
39 /// ```ignore
40 /// use async_redis::Client;
41 ///
42 /// #[tokio::main]
43 /// async fn main() {
44 /// let mut c = Client::connect("127.0.0.1:6379").await.unwrap();
45 /// }
46 /// ```
47 pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
48 let stream = TcpStream::connect(addr)
49 .await
50 .with_context(|| "failed to connect to Redis server")?;
51
52 let conn = Connection::new(stream);
53
54 Ok(Client { conn })
55 }
56
57 /// Sends a HELLO command to the Redis server.
58 ///
59 /// # Arguments
60 ///
61 /// * `proto` - An optional protocol version to use
62 ///
63 /// # Returns
64 ///
65 /// * `Ok(HashMap<String, Vec<u8>>)` if the HELLO command is successful
66 /// * `Err(RedisError)` if an error occurs
67 pub async fn hello(&mut self, proto: Option<u8>) -> Result<HashMap<String, Vec<u8>>> {
68 let frame: Frame = Hello::new(proto).into_stream();
69
70 self.conn
71 .write_frame(&frame)
72 .await
73 .with_context(|| "failed to write frame for HELLO command")?;
74
75 match self
76 .read_response()
77 .await
78 .with_context(|| "failed to read response for HELLO command")?
79 {
80 Response::Array(data) => {
81 let map = data
82 .chunks(2)
83 .filter_map(|chunk| {
84 if chunk.len() == 2 {
85 let key = from_utf8(&chunk[0]).ok()?.to_string();
86 let value = chunk[1].to_vec();
87 Some((key, value))
88 } else {
89 None
90 }
91 })
92 .collect();
93
94 Ok(map)
95 }
96 Response::Map(data) => Ok(data),
97 Response::Error(err) => Err(err),
98 _ => Err(RedisError::UnexpectedResponseType),
99 }
100 }
101
102 /// Sends a PING command to the Redis server, optionally with a message.
103 ///
104 /// # Arguments
105 ///
106 /// * `msg` - An optional message to send to the server
107 ///
108 /// # Returns
109 ///
110 /// * `Ok(String)` if the PING command is successful
111 /// * `Err(RedisError)` if an error occurs
112 ///
113 /// # Examples
114 ///
115 /// ```ignore
116 /// use async_redis::Client;
117 ///
118 /// #[tokio::main]
119 /// async fn main() {
120 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
121 /// let resp = client.ping(Some("Hello Redis".to_string())).await.unwrap();
122 /// }
123 /// ```
124 pub async fn ping(&mut self, msg: Option<&[u8]>) -> Result<Vec<u8>> {
125 let frame: Frame = Ping::new(msg).into_stream();
126
127 self.conn
128 .write_frame(&frame)
129 .await
130 .with_context(|| "failed to write frame for PING command")?;
131
132 match self
133 .read_response()
134 .await
135 .with_context(|| "failed to read response for PING command")?
136 {
137 Response::Simple(data) => Ok(data),
138 Response::Error(err) => Err(err),
139 _ => Err(RedisError::UnexpectedResponseType),
140 }
141 }
142
143 /// Sends a GET command to the Redis server.
144 ///
145 /// # Description
146 ///
147 /// The GET command retrieves the value of a key stored on the Redis server.
148 ///
149 /// # Arguments
150 ///
151 /// * `key` - A required key to send to the server
152 ///
153 /// # Returns
154 ///
155 /// * `Ok(Some(String))` if the key to GET exists
156 /// * `Ok(None)` if the key to GET does not exist
157 /// * `Err(RedisError)` if an error occurs
158 ///
159 /// # Examples
160 ///
161 /// ```ignore
162 /// use async_redis::Client;
163 ///
164 /// #[tokio::main]
165 /// async fn main() {
166 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
167 /// let resp = client.get("mykey").await?;
168 /// }
169 /// ```
170 pub async fn get(&mut self, key: &str) -> Result<Option<Vec<u8>>> {
171 let frame: Frame = Get::new(key).into_stream();
172
173 self.conn
174 .write_frame(&frame)
175 .await
176 .with_context(|| "failed to write frame for GET command")?;
177
178 match self
179 .read_response()
180 .await
181 .with_context(|| "failed to read response for GET command")?
182 {
183 Response::Simple(data) => Ok(Some(data)),
184 Response::Null => Ok(None),
185 Response::Error(err) => Err(err),
186 _ => Err(RedisError::UnexpectedResponseType),
187 }
188 }
189
/// Sends a GETEX command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn get_ex(&mut self, key: &str, seconds: i64) -> Result<Option<Vec<u8>>> {
    todo!("GETEX command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = GetEx::new(key, seconds).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
205
/// Sends a MGET command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn mget(&mut self, keys: Vec<&str>) -> Result<Option<Vec<Vec<u8>>>> {
    todo!("MGET command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = MGet::new(keys).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Array(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
221
222 // todo: the real SET command has some other options like EX, PX, NX, XX
223 // we need to add these options to the SET command. Possibly with option pattern
224 /// Sends a SET command to the Redis server.
225 ///
226 /// # Description
227 ///
228 /// The SET command sets the value of a key in the Redis server.
229 ///
230 /// # Arguments
231 ///
232 /// * `key` - A required key to set
233 /// * `val` - A required value to set
234 ///
235 /// # Returns
236 ///
237 /// * `Ok(Some(String))` if the key is set successfully
238 /// * `Ok(None)` if the key is not set
239 ///
240 /// # Examples
241 ///
242 /// ```ignore
243 /// use async_redis::Client;
244 ///
245 /// #[tokio::main]
246 /// async fn main() {
247 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
248 /// let resp = client.set("mykey", "myvalue").await?;
249 /// }
250 pub async fn set(&mut self, key: &str, val: &[u8]) -> Result<Option<Vec<u8>>> {
251 let frame: Frame = Set::new(key, val).into_stream();
252
253 self.conn
254 .write_frame(&frame)
255 .await
256 .with_context(|| "failed to write frame for SET command")?;
257
258 match self
259 .read_response()
260 .await
261 .with_context(|| "failed to read response for SET command")?
262 {
263 Response::Simple(data) => Ok(Some(data)),
264 Response::Null => Ok(None),
265 Response::Error(err) => Err(err),
266 _ => Err(RedisError::UnexpectedResponseType),
267 }
268 }
269
/// Sends a SETEX command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn set_ex(&mut self, key: &str, val: &[u8], seconds: i64) -> Result<Option<Vec<u8>>> {
    todo!("SETEX command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = SetEx::new(key, val, seconds).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
285
/// Sends a SETNX command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn set_nx(&mut self, key: &str, val: &[u8]) -> Result<Option<Vec<u8>>> {
    todo!("SETNX command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = SetNx::new(key, val).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
301
302 /// Sends a DEL command to the Redis server.
303 ///
304 /// # Description
305 ///
306 /// The DEL command deletes a key from the Redis server.
307 ///
308 /// # Arguments
309 ///
310 /// * `keys` - A required vector of keys to delete
311 ///
312 /// # Returns
313 ///
314 /// * `Ok(u64)` the number of keys deleted
315 ///
316 /// # Examples
317 ///
318 /// ```ignore
319 ///
320 /// use async_redis::Client;
321 ///
322 /// #[tokio::main]
323 /// async fn main() {
324 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
325 /// let resp = client.del(vec!["foo", "bar", "baz"]).await?;
326 /// }
327 pub async fn del(&mut self, keys: Vec<&str>) -> Result<u64> {
328 let frame: Frame = Del::new(keys).into_stream();
329
330 self.conn
331 .write_frame(&frame)
332 .await
333 .with_context(|| "failed to write frame for DEL command")?;
334
335 match self
336 .read_response()
337 .await
338 .with_context(|| "failed to read response for DEL command")?
339 {
340 Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
341 Response::Error(err) => Err(err),
342 _ => Err(RedisError::UnexpectedResponseType),
343 }
344 }
345
346 /// Sends an EXISTS command to the Redis server.
347 ///
348 /// # Description
349 ///
350 /// The EXISTS command checks if a key exists in the Redis server.
351 ///
352 /// # Arguments
353 ///
354 /// * `keys` - A required vector of keys to check
355 ///
356 /// # Returns
357 ///
358 /// * `Ok(u64)` the number of keys that exist
359 ///
360 /// # Examples
361 ///
362 /// ```ignore
363 /// #[tokio::main]
364 /// async fn main() {
365 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
366 /// let resp = client.exists(vec!["foo", "bar", "baz"]).await?;
367 /// }
368 pub async fn exists(&mut self, keys: Vec<&str>) -> Result<u64> {
369 let frame: Frame = Exists::new(keys).into_stream();
370
371 self.conn
372 .write_frame(&frame)
373 .await
374 .with_context(|| "failed to write frame for EXISTS command")?;
375
376 match self
377 .read_response()
378 .await
379 .with_context(|| "failed to read response for EXISTS command")?
380 {
381 Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
382 Response::Error(err) => Err(err),
383 _ => Err(RedisError::UnexpectedResponseType),
384 }
385 }
386
387 // todo: add EXAT, PXAT, NX, XX options
388 /// Sends an EXPIRE command to the Redis server.
389 ///
390 /// # Description
391 ///
392 /// The EXPIRE command sets a timeout on a key. After the timeout has expired, the key will be deleted.
393 ///
394 /// # Arguments
395 ///
396 /// * `key` - A required key to set the timeout
397 /// * `seconds` - A required number of seconds to set the timeout
398 ///
399 /// # Returns
400 ///
401 /// * `Ok(1)` if the key is set successfully
402 /// * `Ok(0)` if the key is not set
403 ///
404 /// # Examples
405 ///
406 /// ```ignore
407 /// #[tokio::main]
408 /// async fn main() {
409 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
410 /// let resp = client.expire("mykey", 1).await?;
411 /// }
412 pub async fn expire(&mut self, key: &str, seconds: i64) -> Result<u64> {
413 let frame: Frame = Expire::new(key, seconds).into_stream();
414
415 self.conn
416 .write_frame(&frame)
417 .await
418 .with_context(|| "failed to write frame for EXPIRE command")?;
419
420 match self
421 .read_response()
422 .await
423 .with_context(|| "failed to read response for EXPIRE command")?
424 {
425 Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
426 Response::Error(err) => Err(err),
427 _ => Err(RedisError::UnexpectedResponseType),
428 }
429 }
430
431 /// Sends a TTL command to the Redis server.
432 ///
433 /// # Description
434 ///
435 /// The TTL command returns the remaining time to live of a key that has an expire set.
436 ///
437 /// # Arguments
438 ///
439 /// * `key` - A required key to check ttl
440 ///
441 /// # Returns
442 ///
443 /// * `Ok(-2)` if the key does not exist
444 /// * `Ok(-1)` if the key exists but has no expire set
445 /// * `Ok(other)` if the key exists and has an expire set
446 ///
447 /// # Examples
448 ///
449 /// ```ignore
450 /// #[tokio::main]
451 /// async fn main() {
452 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
453 /// let resp = client.ttl("mykey").await?;
454 /// }
455 pub async fn ttl(&mut self, key: &str) -> Result<i64> {
456 let frame: Frame = Ttl::new(key).into_stream();
457
458 self.conn
459 .write_frame(&frame)
460 .await
461 .with_context(|| "failed to write frame for TTL command")?;
462
463 match self
464 .read_response()
465 .await
466 .with_context(|| "failed to read response for TTL command")?
467 {
468 Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
469 Response::Error(err) => Err(err),
470 _ => Err(RedisError::UnexpectedResponseType),
471 }
472 }
473
474 /// Sends an INCR command to the Redis server.
475 ///
476 /// # Description
477 ///
478 /// The INCR command increments the integer value of a key by one.
479 ///
480 /// # Arguments
481 ///
482 /// * `key` - A required key to increment
483 ///
484 /// # Returns
485 ///
486 /// * `Ok(i64)` the new value of the key after increment
487 /// * `Err(RedisError)` if an error occurs
488 ///
489 /// # Examples
490 ///
491 /// ```ignore
492 /// #[tokio::main]
493 /// async fn main() {
494 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
495 /// let resp = client.incr("mykey").await?;
496 /// }
497 pub async fn incr(&mut self, key: &str) -> Result<i64> {
498 let frame: Frame = Incr::new(key).into_stream();
499
500 self.conn
501 .write_frame(&frame)
502 .await
503 .with_context(|| "failed to write frame for INCR command")?;
504
505 match self
506 .read_response()
507 .await
508 .with_context(|| "failed to read response for INCR command")?
509 {
510 Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
511 Response::Error(err) => Err(err),
512 _ => Err(RedisError::UnexpectedResponseType),
513 }
514 }
515
/// Sends an INCRBY command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn incr_by(&mut self, key: &str, increment: i64) -> Result<i64> {
    todo!("INCRBY command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = IncrBy::new(key, increment).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
530
/// Sends an INCRBYFLOAT command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn incr_by_float(&mut self, key: &str, increment: f64) -> Result<f64> {
    todo!("INCRBYFLOAT command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = IncrByFloat::new(key, increment).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(from_utf8(&data)?.parse::<f64>()?),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
545
546 /// Sends a DECR command to the Redis server.
547 ///
548 /// # Description
549 ///
550 /// The DECR command decrements the integer value of a key by one.
551 ///
552 /// # Arguments
553 ///
554 /// * `key` - A required key to decrement
555 ///
556 /// # Returns
557 ///
558 /// * `Ok(i64)` the new value of the key after decrement
559 /// * `Err(RedisError)` if an error occurs
560 ///
561 /// # Examples
562 ///
563 /// ```ignore
564 /// #[tokio::main]
565 /// async fn main() {
566 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
567 /// let resp = client.decr("mykey").await?;
568 /// }
569 pub async fn decr(&mut self, key: &str) -> Result<i64> {
570 let frame: Frame = Decr::new(key).into_stream();
571
572 self.conn
573 .write_frame(&frame)
574 .await
575 .with_context(|| "failed to write frame for DECR command")?;
576
577 match self
578 .read_response()
579 .await
580 .with_context(|| "failed to read response for DECR command")?
581 {
582 Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
583 Response::Error(err) => Err(err),
584 _ => Err(RedisError::UnexpectedResponseType),
585 }
586 }
587
/// Sends a DECRBY command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn decr_by(&mut self, key: &str, decrement: i64) -> Result<i64> {
    todo!("DECRBY command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = DecrBy::new(key, decrement).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
602
/// Sends a DECRBYFLOAT command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// NOTE(review): the Redis command reference appears to list only INCRBYFLOAT (used
/// with a negative increment) and no DECRBYFLOAT — confirm which wire command this
/// method should send before implementing it.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn decr_by_float(&mut self, key: &str, decrement: f64) -> Result<f64> {
    todo!("DECRBYFLOAT command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = DecrByFloat::new(key, decrement).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(from_utf8(&data)?.parse::<f64>()?),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
617
618 /// Sends an LPUSH command to the Redis server.
619 ///
620 /// # Description
621 ///
622 /// The LPUSH command inserts all the specified values at the head of the list stored at key.
623 ///
624 /// # Arguments
625 ///
626 /// * `key` - A required key to insert values
627 /// * `values` - A required vector of values to insert
628 ///
629 /// # Returns
630 ///
631 /// * `Ok(u64)` the length of the list after the push operation
632 /// * `Err(RedisError)` if an error occurs
633 ///
634 /// # Examples
635 ///
636 /// ```ignore
637 /// #[tokio::main]
638 /// async fn main() {
639 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
640 /// let resp = client.lpush("mykey", vec!["foo", "bar", "baz"]).await?;
641 /// }
642 pub async fn lpush(&mut self, key: &str, values: Vec<&[u8]>) -> Result<u64> {
643 let frame: Frame = LPush::new(key, values).into_stream();
644
645 self.conn
646 .write_frame(&frame)
647 .await
648 .with_context(|| "failed to write frame for LPUSH command")?;
649
650 match self
651 .read_response()
652 .await
653 .with_context(|| "failed to read response for LPUSH command")?
654 {
655 Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
656 Response::Error(err) => Err(err),
657 _ => Err(RedisError::UnexpectedResponseType),
658 }
659 }
660
661 /// Sends an RPUSH command to the Redis server.
662 ///
663 /// # Description
664 ///
665 /// The RPUSH command inserts all the specified values at the tail of the list stored at key.
666 ///
667 /// # Arguments
668 ///
669 /// * `key` - A required key to insert values
670 /// * `values` - A required vector of values to insert
671 ///
672 /// # Returns
673 ///
674 /// * `Ok(u64)` the length of the list after the push operation
675 ///
676 /// # Examples
677 ///
678 /// ```ignore
679 /// #[tokio::main]
680 /// async fn main() {
681 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
682 /// let resp = client.rpush("mykey", vec!["foo", "bar", "baz"]).await?;
683 /// }
684 pub async fn rpush(&mut self, key: &str, values: Vec<&[u8]>) -> Result<u64> {
685 let frame: Frame = RPush::new(key, values).into_stream();
686
687 self.conn
688 .write_frame(&frame)
689 .await
690 .with_context(|| "failed to write frame for RPUSH command")?;
691
692 match self
693 .read_response()
694 .await
695 .with_context(|| "failed to read response for RPUSH command")?
696 {
697 Response::Simple(data) => Ok(from_utf8(&data)?.parse::<u64>()?),
698 Response::Error(err) => Err(err),
699 _ => Err(RedisError::UnexpectedResponseType),
700 }
701 }
702
703 /// Sends an LPOP command to the Redis server.
704 ///
705 /// # Description
706 ///
707 /// The LPOP command removes and returns the removed elements from the head of the list stored at key.
708 ///
709 /// # Arguments
710 ///
711 /// * `key` - A required key to remove values
712 /// * `count` - An optional number of elements to remove
713 ///
714 /// # Returns
715 ///
716 /// * `Ok(Some(String))` if the key exists and the elements are removed
717 /// * `Ok(None)` if the key does not exist
718 /// * `Err(RedisError)` if an error occurs
719 ///
720 /// # Examples
721 ///
722 /// ```ignore
723 /// #[tokio::main]
724 /// async fn main() {
725 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
726 /// let resp = client.lpop("mykey", 1).await?;
727 /// }
728 pub async fn lpop(&mut self, key: &str) -> Result<Option<Vec<u8>>> {
729 let frame: Frame = LPop::new(key, None).into_stream();
730
731 self.conn
732 .write_frame(&frame)
733 .await
734 .with_context(|| "failed to write frame for LPOP command")?;
735
736 match self
737 .read_response()
738 .await
739 .with_context(|| "failed to read response for LPOP command")?
740 {
741 Response::Simple(data) => Ok(Some(data)),
742 Response::Null => Ok(None),
743 Response::Error(err) => Err(err),
744 _ => Err(RedisError::UnexpectedResponseType),
745 }
746 }
747
748 pub async fn lpop_n(&mut self, key: &str, count: u64) -> Result<Option<Vec<Vec<u8>>>> {
749 let frame: Frame = LPop::new(key, Some(count)).into_stream();
750
751 self.conn
752 .write_frame(&frame)
753 .await
754 .with_context(|| "failed to write frame for LPOP command")?;
755
756 match self
757 .read_response()
758 .await
759 .with_context(|| "failed to read response for LPOP command")?
760 {
761 Response::Array(data) => Ok(Some(data)),
762 Response::Null => Ok(None),
763 Response::Error(err) => Err(err),
764 _ => Err(RedisError::UnexpectedResponseType),
765 }
766 }
767
768 /// Sends an RPOP command to the Redis server.
769 ///
770 /// # Description
771 ///
772 /// The RPOP command removes and returns the removed elements from the tail of the list stored at key.
773 ///
774 /// # Arguments
775 ///
776 /// * `key` - A required key to remove values
777 /// * `count` - An optional number of elements to remove
778 ///
779 /// # Returns
780 ///
781 /// * `Ok(Some(String))` if the key exists and the elements are removed
782 /// * `Ok(None)` if the key does not exist
783 /// * `Err(RedisError)` if an error occurs
784 ///
785 /// # Examples
786 ///
787 /// ```ignore
788 /// #[tokio::main]
789 /// async fn main() {
790 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
791 /// let resp = client.rpop("mykey", 1).await?;
792 /// }
793 pub async fn rpop(&mut self, key: &str) -> Result<Option<Vec<u8>>> {
794 let frame: Frame = RPop::new(key, None).into_stream();
795
796 self.conn
797 .write_frame(&frame)
798 .await
799 .with_context(|| "failed to write frame for RPOP command")?;
800
801 match self
802 .read_response()
803 .await
804 .with_context(|| "failed to read response for RPOP command")?
805 {
806 Response::Simple(data) => Ok(Some(data)),
807 Response::Null => Ok(None),
808 Response::Error(err) => Err(err),
809 _ => Err(RedisError::UnexpectedResponseType),
810 }
811 }
812
813 pub async fn rpop_n(&mut self, key: &str, count: u64) -> Result<Option<Vec<Vec<u8>>>> {
814 let frame: Frame = RPop::new(key, Some(count)).into_stream();
815
816 self.conn
817 .write_frame(&frame)
818 .await
819 .with_context(|| "failed to write frame for RPOP command")?;
820
821 match self
822 .read_response()
823 .await
824 .with_context(|| "failed to read response for RPOP command")?
825 {
826 Response::Array(data) => Ok(Some(data)),
827 Response::Null => Ok(None),
828 Response::Error(err) => Err(err),
829 _ => Err(RedisError::UnexpectedResponseType),
830 }
831 }
832
833 /// Sends an LRANGE command to the Redis server.
834 ///
835 /// # Description
836 ///
837 /// The LRANGE command returns the specified elements of the list stored at key.
838 ///
839 /// # Arguments
840 ///
841 /// * `key` - A required key to get values
842 /// * `start` - A required start index
843 /// * `end` - A required end index
844 ///
845 /// # Returns
846 ///
847 /// * `Ok(Some(String))` if the key exists and the elements are returned
848 /// * `Ok(None)` if the key does not exist
849 /// * `Err(RedisError)` if an error occurs
850 ///
851 /// # Examples
852 ///
853 /// ```ignore
854 /// #[tokio::main]
855 /// async fn main() {
856 /// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
857 /// let resp = client.lrange("mykey", 0, -1).await?;
858 /// }
859 pub async fn lrange(&mut self, key: &str, start: i64, end: i64) -> Result<Vec<Vec<u8>>> {
860 let frame: Frame = LRange::new(key, start, end).into_stream();
861
862 self.conn
863 .write_frame(&frame)
864 .await
865 .with_context(|| "failed to write frame for LRANGE command")?;
866
867 match self
868 .read_response()
869 .await
870 .with_context(|| "failed to read response for LRANGE command")?
871 {
872 Response::Array(data) => Ok(data),
873 Response::Error(err) => Err(err),
874 _ => Err(RedisError::UnexpectedResponseType),
875 }
876 }
877
/// Sends an HGET command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hget(&mut self, key: &str, field: &str) -> Result<Option<Vec<u8>>> {
    todo!("HGET command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HGet::new(key, field).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
893
/// Sends an HMGET command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hmget(&mut self, key: &str, fields: Vec<&str>) -> Result<Option<Vec<Vec<u8>>>> {
    todo!("HMGET command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HMGet::new(key, fields).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Array(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
909
/// Sends an HGETALL command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// argument is currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameter stays unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hget_all(&mut self, key: &str) -> Result<Option<HashMap<String, Vec<u8>>>> {
    todo!("HGETALL command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HGetAll::new(key).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Map(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
925
/// Sends an HKEYS command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// argument is currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameter stays unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hkeys(&mut self, key: &str) -> Result<Option<Vec<Vec<u8>>>> {
    todo!("HKEYS command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HKeys::new(key).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Array(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
941
/// Sends an HVALS command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// argument is currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameter stays unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hvals(&mut self, key: &str) -> Result<Option<Vec<Vec<u8>>>> {
    todo!("HVALS command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HVals::new(key).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Array(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
957
/// Sends an HLEN command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// argument is currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameter stays unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hlen(&mut self, key: &str) -> Result<Option<u64>> {
    todo!("HLEN command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HLen::new(key).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
973
/// Sends an HSET command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hset(&mut self, key: &str, field: &str, value: &[u8]) -> Result<Option<Vec<u8>>> {
    todo!("HSET command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HSet::new(key, field, value).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
989
/// Sends an HSETNX command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hset_nx(
    &mut self,
    key: &str,
    field: &str,
    value: &[u8],
) -> Result<Option<Vec<u8>>> {
    todo!("HSETNX command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HSetNx::new(key, field, value).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
1010
/// Sends an HMSET command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hmset(
    &mut self,
    key: &str,
    fields: HashMap<String, Vec<u8>>,
) -> Result<Option<Vec<u8>>> {
    todo!("HMSET command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HMSet::new(key, fields).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
1030
/// Sends an HDEL command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn hdel(&mut self, key: &str, field: &str) -> Result<Option<Vec<u8>>> {
    todo!("HDEL command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = HDel::new(key, field).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
1046
/// Sends an SADD command to the Redis server.
///
/// Placeholder: the body is a single `todo!` call, so nothing is sent and the
/// arguments are currently ignored.
///
/// # Panics
///
/// Awaiting the returned future always panics, because the command is not implemented.
// The parameters stay unused until the command is implemented.
#[allow(unused_variables)]
pub async fn sadd(&mut self, key: &str, members: Vec<&[u8]>) -> Result<Option<Vec<u8>>> {
    todo!("SADD command is not implemented yet");
    // Sketch of the intended implementation, kept for reference:
    //
    // let frame: Frame = SAdd::new(key, members).into_stream();
    //
    // self.conn.write_frame(&frame).await?;
    //
    // match self.read_response().await? {
    //     Response::Simple(data) => Ok(Some(data)),
    //     Response::Null => Ok(None),
    //     Response::Error(err) => Err(err),
    //     _ => Err(RedisError::UnexpectedResponseType),
    // }
}
1062
1063 /// Sends an SREM command to the Redis server.
1064 #[allow(unused_variables)]
1065 pub async fn srem(&mut self, key: &str, members: Vec<&[u8]>) -> Result<Option<Vec<u8>>> {
1066 todo!("SREM command is not implemented yet");
1067 // let frame: Frame = SRem::new(key, members).into_stream();
1068
1069 // self.conn.write_frame(&frame).await?;
1070
1071 // match self.read_response().await? {
1072 // Response::Simple(data) => Ok(Some(data)),
1073 // Response::Null => Ok(None),
1074 // Response::Error(err) => Err(err),
1075 // _ => Err(RedisError::UnexpectedResponseType),
1076 // }
1077 }
1078
1079 /// Sends an SISMEMBER command to the Redis server.
1080 #[allow(unused_variables)]
1081 pub async fn sismember(&mut self, key: &str, member: &[u8]) -> Result<Option<Vec<u8>>> {
1082 todo!("SISMEMBER command is not implemented yet");
1083 // let frame: Frame = SIsMember::new(key, member).into_stream();
1084
1085 // self.conn.write_frame(&frame).await?;
1086
1087 // match self.read_response().await? {
1088 // Response::Simple(data) => Ok(Some(data)),
1089 // Response::Null => Ok(None),
1090 // Response::Error(err) => Err(err),
1091 // _ => Err(RedisError::UnexpectedResponseType),
1092 // }
1093 }
1094
1095 /// Sends an SMEMBERS command to the Redis server.
1096 #[allow(unused_variables)]
1097 pub async fn smembers(&mut self, key: &str) -> Result<Option<Vec<Vec<u8>>>> {
1098 todo!("SMEMBERS command is not implemented yet");
1099 // let frame: Frame = SMembers::new(key).into_stream();
1100
1101 // self.conn.write_frame(&frame).await?;
1102
1103 // match self.read_response().await? {
1104 // Response::Array(data) => Ok(Some(data)),
1105 // Response::Null => Ok(None),
1106 // Response::Error(err) => Err(err),
1107 // _ => Err(RedisError::UnexpectedResponseType),
1108 // }
1109 }
1110
1111 /// Sends an SPOP command to the Redis server.
1112 #[allow(unused_variables)]
1113 pub async fn spop(&mut self, key: &str) -> Result<Option<Vec<u8>>> {
1114 todo!("SPOP command is not implemented yet");
1115 // let frame: Frame = SPop::new(key).into_stream();
1116
1117 // self.conn.write_frame(&frame).await?;
1118
1119 // match self.read_response().await? {
1120 // Response::Simple(data) => Ok(Some(data)),
1121 // Response::Null => Ok(None),
1122 // Response::Error(err) => Err(err),
1123 // _ => Err(RedisError::UnexpectedResponseType),
1124 // }
1125 }
1126
1127 /// Sends a ZADD command to the Redis server.
1128 #[allow(unused_variables)]
1129 pub async fn zadd(
1130 &mut self,
1131 key: &str,
1132 members: HashMap<String, f64>,
1133 ) -> Result<Option<Vec<u8>>> {
1134 todo!("ZADD command is not implemented yet");
1135 // let frame: Frame = ZAdd::new(key, members).into_stream();
1136
1137 // self.conn.write_frame(&frame).await?;
1138
1139 // match self.read_response().await? {
1140 // Response::Simple(data) => Ok(Some(data)),
1141 // Response::Null => Ok(None),
1142 // Response::Error(err) => Err(err),
1143 // _ => Err(RedisError::UnexpectedResponseType),
1144 // }
1145 }
1146
1147 /// Sends a ZREM command to the Redis server.
1148 #[allow(unused_variables)]
1149 pub async fn zrem(&mut self, key: &str, members: Vec<&[u8]>) -> Result<Option<Vec<u8>>> {
1150 todo!("ZREM command is not implemented yet");
1151 // let frame: Frame = ZRem::new(key, members).into_stream();
1152
1153 // self.conn.write_frame(&frame).await?;
1154
1155 // match self.read_response().await? {
1156 // Response::Simple(data) => Ok(Some(data)),
1157 // Response::Null => Ok(None),
1158 // Response::Error(err) => Err(err),
1159 // _ => Err(RedisError::UnexpectedResponseType),
1160 // }
1161 }
1162
1163 /// Sends a ZRANGE command to the Redis server.
1164 #[allow(unused_variables)]
1165 pub async fn zrange(
1166 &mut self,
1167 key: &str,
1168 start: i64,
1169 end: i64,
1170 ) -> Result<Option<Vec<Vec<u8>>>> {
1171 todo!("ZRANGE command is not implemented yet");
1172 // let frame: Frame = ZRange::new(key, start, end).into_stream();
1173
1174 // self.conn.write_frame(&frame).await?;
1175
1176 // match self.read_response().await? {
1177 // Response::Array(data) => Ok(Some(data)),
1178 // Response::Null => Ok(None),
1179 // Response::Error(err) => Err(err),
1180 // _ => Err(RedisError::UnexpectedResponseType),
1181 // }
1182 }
1183
1184 /// Sends a ZREVRANGE command to the Redis server.
1185 #[allow(unused_variables)]
1186 pub async fn zrevrange(
1187 &mut self,
1188 key: &str,
1189 start: i64,
1190 end: i64,
1191 ) -> Result<Option<Vec<Vec<u8>>>> {
1192 todo!("ZREVRANGE command is not implemented yet");
1193 // let frame: Frame = ZRevRange::new(key, start, end).into_stream();
1194
1195 // self.conn.write_frame(&frame).await?;
1196
1197 // match self.read_response().await? {
1198 // Response::Array(data) => Ok(Some(data)),
1199 // Response::Null => Ok(None),
1200 // Response::Error(err) => Err(err),
1201 // _ => Err(RedisError::UnexpectedResponseType),
1202 // }
1203 }
1204
1205 /// Sends a ZRANK command to the Redis server.
1206 #[allow(unused_variables)]
1207 pub async fn zrank(&mut self, key: &str, member: &[u8]) -> Result<Option<u64>> {
1208 todo!("ZRANK command is not implemented yet");
1209 // let frame: Frame = ZRank::new(key, member).into_stream();
1210
1211 // self.conn.write_frame(&frame).await?;
1212
1213 // match self.read_response().await? {
1214 // Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
1215 // Response::Null => Ok(None),
1216 // Response::Error(err) => Err(err),
1217 // _ => Err(RedisError::UnexpectedResponseType),
1218 // }
1219 }
1220
1221 /// Sends a ZREVRANK command to the Redis server.
1222 #[allow(unused_variables)]
1223 pub async fn zrevrank(&mut self, key: &str, member: &[u8]) -> Result<Option<u64>> {
1224 todo!("ZREVRANK command is not implemented yet");
1225 // let frame: Frame = ZRevRank::new(key, member).into_stream();
1226
1227 // self.conn.write_frame(&frame).await?;
1228
1229 // match self.read_response().await? {
1230 // Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
1231 // Response::Null => Ok(None),
1232 // Response::Error(err) => Err(err),
1233 // _ => Err(RedisError::UnexpectedResponseType),
1234 // }
1235 }
1236
1237 /// Sends a ZSCORE command to the Redis server.
1238 #[allow(unused_variables)]
1239 pub async fn zscore(&mut self, key: &str, member: &[u8]) -> Result<Option<f64>> {
1240 todo!("ZSCORE command is not implemented yet");
1241 // let frame: Frame = ZScore::new(key, member).into_stream();
1242
1243 // self.conn.write_frame(&frame).await?;
1244
1245 // match self.read_response().await? {
1246 // Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<f64>()?)),
1247 // Response::Null => Ok(None),
1248 // Response::Error(err) => Err(err),
1249 // _ => Err(RedisError::UnexpectedResponseType),
1250 // }
1251 }
1252
1253 /// Sends a ZCARD command to the Redis server.
1254 #[allow(unused_variables)]
1255 pub async fn zcard(&mut self, key: &str) -> Result<Option<u64>> {
1256 todo!("ZCARD command is not implemented yet");
1257 // let frame: Frame = ZCard::new(key).into_stream();
1258
1259 // self.conn.write_frame(&frame).await?;
1260
1261 // match self.read_response().await? {
1262 // Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
1263 // Response::Null => Ok(None),
1264 // Response::Error(err) => Err(err),
1265 // _ => Err(RedisError::UnexpectedResponseType),
1266 // }
1267 }
1268
1269 /// Sends a ZCOUNT command to the Redis server.
1270 #[allow(unused_variables)]
1271 pub async fn zcount(&mut self, key: &str, min: f64, max: f64) -> Result<Option<u64>> {
1272 todo!("ZCOUNT command is not implemented yet");
1273 // let frame: Frame = ZCount::new(key, min, max).into_stream();
1274
1275 // self.conn.write_frame(&frame).await?;
1276
1277 // match self.read_response().await? {
1278 // Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<u64>()?)),
1279 // Response::Null => Ok(None),
1280 // Response::Error(err) => Err(err),
1281 // _ => Err(RedisError::UnexpectedResponseType),
1282 // }
1283 }
1284
1285 /// Sends a ZINCRBY command to the Redis server.
1286 #[allow(unused_variables)]
1287 pub async fn zincr_by(
1288 &mut self,
1289 key: &str,
1290 increment: f64,
1291 member: &[u8],
1292 ) -> Result<Option<f64>> {
1293 todo!("ZINCRBY command is not implemented yet");
1294 // let frame: Frame = ZIncrBy::new(key, increment, member).into_stream();
1295
1296 // self.conn.write_frame(&frame).await?;
1297
1298 // match self.read_response().await? {
1299 // Response::Simple(data) => Ok(Some(from_utf8(&data)?.parse::<f64>()?)),
1300 // Response::Null => Ok(None),
1301 // Response::Error(err) => Err(err),
1302 // _ => Err(RedisError::UnexpectedResponseType),
1303 // }
1304 }
1305
1306 /// Reads the response from the server. The response is a searilzied frame.
1307 /// It decodes the frame and returns the human readable message to the client.
1308 ///
1309 /// # Returns
1310 ///
1311 /// * `Ok(Some(Bytes))` if the response is successfully read
1312 /// * `Ok(None)` if the response is empty
1313 /// * `Err(RedisError)` if an error occurs
1314 async fn read_response(&mut self) -> Result<Response> {
1315 match self.conn.read_frame().await? {
1316 Some(Frame::SimpleString(data)) => Ok(Response::Simple(data.into_bytes())),
1317 Some(Frame::SimpleError(data)) => Ok(Response::Error(RedisError::Other(anyhow!(data)))),
1318 Some(Frame::Integer(data)) => Ok(Response::Simple(data.to_string().into_bytes())),
1319 Some(Frame::BulkString(data)) => Ok(Response::Simple(data.to_vec())),
1320 Some(Frame::Array(data)) => {
1321 let result: Vec<Vec<u8>> = data
1322 .into_iter()
1323 .map(|frame| match frame {
1324 Frame::BulkString(data) => data.to_vec(),
1325 Frame::SimpleString(data) => data.into_bytes(),
1326 Frame::Integer(data) => data.to_string().into_bytes(),
1327 Frame::Array(data) => {
1328 let result = data
1329 .into_iter()
1330 .map(|frame| match frame {
1331 Frame::BulkString(data) => data.to_vec(),
1332 Frame::SimpleString(data) => data.into_bytes(),
1333 Frame::Integer(data) => data.to_string().into_bytes(),
1334 Frame::Null => vec![],
1335 _ => {
1336 vec![]
1337 }
1338 })
1339 .collect::<Vec<_>>();
1340 result.concat()
1341 }
1342 Frame::Null => vec![],
1343 _ => vec![],
1344 })
1345 .collect();
1346
1347 Ok(Response::Array(result))
1348 }
1349 Some(Frame::Null) => Ok(Response::Null), // nil reply usually means no error
1350 Some(Frame::Boolean(data)) => {
1351 if data {
1352 Ok(Response::Simple("true".into()))
1353 } else {
1354 Ok(Response::Simple("false".into()))
1355 }
1356 }
1357 Some(Frame::Double(data)) => Ok(Response::Simple(data.to_string().into_bytes())),
1358 Some(Frame::BulkError(data)) => Ok(Response::Error(RedisError::Other(anyhow!(
1359 String::from_utf8_lossy(&data).to_string()
1360 )))),
1361 Some(Frame::Map(data)) => {
1362 let result: HashMap<String, Vec<u8>> = data
1363 .into_iter()
1364 .filter_map(|(key, value)| {
1365 let key = match key {
1366 Frame::BulkString(data) => String::from_utf8(data.to_vec()).ok(),
1367 Frame::SimpleString(data) => Some(data),
1368 Frame::Integer(data) => Some(data.to_string()),
1369 _ => None,
1370 };
1371
1372 let value = match value {
1373 Frame::BulkString(data) => Some(data.to_vec()),
1374 Frame::SimpleString(data) => Some(data.into_bytes()),
1375 Frame::Integer(data) => Some(data.to_string().into_bytes()),
1376 _ => None,
1377 };
1378
1379 match (key, value) {
1380 (Some(k), Some(v)) => Some((k, v)),
1381 _ => None,
1382 }
1383 })
1384 .collect();
1385
1386 Ok(Response::Map(result))
1387 }
1388 // todo: array response needed here
1389 Some(_) => unimplemented!(""),
1390 None => Err(RedisError::Unknown),
1391 }
1392 }
1393}