1use crate::{Command, CommandList, DataType, Error, Result, Value};
2use futures::{future::BoxFuture, FutureExt};
3
4#[cfg(feature = "runtime_async_std")]
5use async_std::{
6 io,
7 net::{TcpStream, ToSocketAddrs},
8 sync::Mutex,
9};
10#[cfg(feature = "runtime_async_std")]
11use futures::{AsyncReadExt, AsyncWriteExt};
12
13#[cfg(feature = "runtime_tokio")]
14use tokio::{
15 io::{self, AsyncReadExt, AsyncWriteExt},
16 net::{TcpStream, ToSocketAddrs},
17 sync::Mutex,
18};
19
20use std::sync::Arc;
21
22pub mod builder;
23pub mod scan;
24pub mod stream;
25pub use scan::{HScanBuilder, HScanStream, ScanBuilder, ScanStream};
26pub use stream::{Message, MessageStream, PMessage, PMessageStream, ResponseStream};
27
28use builder::MSetBuilder;
29
30#[cfg(test)]
31mod test;
32
33macro_rules! check_slice_not_empty {
34 ($slice:ident) => {
35 if $slice.is_empty() {
36 return Err(Error::EmptySlice);
37 }
38 };
39}
40
41async fn read_until(r: &mut TcpStream, byte: u8) -> io::Result<Vec<u8>> {
42 let mut buffer = Vec::new();
43 let mut single = [0; 1];
44 loop {
45 r.read(&mut single).await?;
46 buffer.push(single[0]);
47 if single[0] == byte {
48 return Ok(buffer);
49 }
50 }
51}
52
53#[derive(Clone, Debug)]
59pub struct Connection {
60 pub(crate) stream: Arc<Mutex<TcpStream>>,
61}
62
63impl Connection {
64 pub async fn connect<A>(address: A) -> Result<Self>
66 where
67 A: ToSocketAddrs,
68 {
69 let stream = Arc::new(Mutex::new(
70 TcpStream::connect(address)
71 .await
72 .map_err(Error::ConnectionFailed)?,
73 ));
74
75 Ok(Self { stream })
76 }
77
78 pub async fn connect_and_auth<A, P>(address: A, password: P) -> Result<Self>
80 where
81 A: ToSocketAddrs,
82 P: AsRef<[u8]>,
83 {
84 let mut out = Self::connect(address).await?;
85 out.run_command(Command::new("AUTH").arg(&password)).await?;
86
87 Ok(out)
88 }
89
90 async fn parse_simple_value(buf: &[u8]) -> Result<Value> {
91 match buf[0] {
92 b'+' => {
93 if buf == b"+OK\r\n" {
94 Ok(Value::Ok)
95 } else {
96 Ok(Value::String(buf[1..].into()))
97 }
98 }
99 b'-' => Err(Error::RedisError(
100 String::from_utf8_lossy(&buf[1..]).to_string(),
101 )),
102 b':' => {
103 let string = String::from_utf8_lossy(&buf[1..]);
104 let num = string.trim().parse::<isize>().unwrap();
105 Ok(Value::Integer(num))
106 }
107 _ => Err(Error::UnexpectedResponse(
108 String::from_utf8_lossy(buf).to_string(),
109 )),
110 }
111 }
112
113 async fn parse_string(start: &[u8], stream: &mut TcpStream) -> Result<Value> {
114 if start == b"$-1\r\n" {
115 Ok(Value::Nil)
116 } else {
117 let num = String::from_utf8_lossy(&start[1..])
118 .trim()
119 .parse::<usize>()
120 .unwrap();
121 let mut buf = vec![0u8; num + 2]; stream.read_exact(&mut buf).await?;
123
124 buf.pop(); buf.pop();
126 Ok(Value::String(buf))
127 }
128 }
129
130 fn parse_array<'a>(start: &'a [u8], stream: &'a mut TcpStream) -> BoxFuture<'a, Result<Value>> {
131 async move {
132 let num_parsed = String::from_utf8_lossy(&start[1..])
133 .trim()
134 .parse::<i32>()
135 .unwrap();
136
137 if num_parsed < 0 {
139 return Ok(Value::Nil);
140 }
141
142 let num = num_parsed as usize;
143 let mut values = Vec::with_capacity(num);
144
145 for _ in 0..num {
146 let buf = read_until(stream, b'\n').await?;
147 match buf[0] {
148 b'+' | b'-' | b':' => values.push(Self::parse_simple_value(&buf).await?),
149 b'$' => values.push(Self::parse_string(&buf, stream).await?),
150 b'*' => values.push(Self::parse_array(&buf, stream).await?),
151 _ => {
152 return Err(Error::UnexpectedResponse(
153 String::from_utf8_lossy(&buf).to_string(),
154 ))
155 }
156 }
157 }
158
159 Ok(Value::Array(values))
160 }
161 .boxed()
162 }
163
164 pub(crate) async fn read_value(mut stream: &mut TcpStream) -> Result<Value> {
166 let buf = read_until(&mut stream, b'\n').await?;
167 match buf[0] {
168 b'+' | b'-' | b':' => Self::parse_simple_value(&buf).await,
169 b'$' => Self::parse_string(&buf, &mut stream).await,
170 b'*' => Self::parse_array(&buf, &mut stream).await,
171 _ => Err(Error::UnexpectedResponse(
172 String::from_utf8_lossy(&buf).to_string(),
173 )),
174 }
175 }
176
177 #[inline]
179 pub async fn run_command(&mut self, command: Command<'_>) -> Result<Value> {
180 let mut buffer = Vec::new();
181
182 self.run_command_with_buffer(command, &mut buffer).await
183 }
184
185 pub async fn run_command_with_buffer(
188 &mut self,
189 command: Command<'_>,
190 buffer: &mut Vec<u8>,
191 ) -> Result<Value> {
192 let mut stream = self.stream.lock().await;
193 command.serialize(buffer);
194 stream.write_all(&buffer).await?;
195
196 Ok(Self::read_value(&mut stream).await?)
197 }
198
199 #[inline]
201 pub async fn run_commands(&mut self, command: CommandList<'_>) -> Result<ResponseStream> {
202 let mut buffer = Vec::new();
203 self.run_commands_with_buffer(command, &mut buffer).await
204 }
205
206 pub async fn run_commands_with_buffer(
210 &mut self,
211 command: CommandList<'_>,
212 buf: &mut Vec<u8>,
213 ) -> Result<ResponseStream> {
214 buf.clear();
215 let mut lock = self.stream.lock().await;
216 let command_count = command.command_count();
217 command.serialize(buf);
218 lock.write_all(&buf).await?;
219 buf.clear();
220
221 Ok(ResponseStream::new(command_count, self.stream.clone()))
222 }
223
224 pub async fn hdel<K, F>(&mut self, key: K, field: F) -> Result<bool>
228 where
229 K: AsRef<[u8]>,
230 F: AsRef<[u8]>,
231 {
232 self.run_command(Command::new("HDEL").arg(&key).arg(&field))
233 .await
234 .map(|v| v.unwrap_bool())
235 }
236
237 pub async fn hdel_slice<K, F>(&mut self, key: K, fields: &[F]) -> Result<isize>
241 where
242 K: AsRef<[u8]>,
243 F: AsRef<[u8]>,
244 {
245 check_slice_not_empty!(fields);
246
247 self.run_command(Command::new("HDEL").arg(&key).args(&fields))
248 .await
249 .map(|v| v.unwrap_integer())
250 }
251
252 pub async fn hexists<K, F>(&mut self, key: K, field: F) -> Result<bool>
254 where
255 K: AsRef<[u8]>,
256 F: AsRef<[u8]>,
257 {
258 self.run_command(Command::new("HEXISTS").arg(&key).arg(&field))
259 .await
260 .map(|v| v.unwrap_bool())
261 }
262
263 pub async fn hget<K, F>(&mut self, key: K, field: F) -> Result<Option<Vec<u8>>>
265 where
266 K: AsRef<[u8]>,
267 F: AsRef<[u8]>,
268 {
269 self.run_command(Command::new("HGET").arg(&key).arg(&field))
270 .await
271 .map(|v| v.optional_string())
272 }
273
274 pub async fn hset<K, F, V>(&mut self, key: K, field: F, value: V) -> Result<isize>
278 where
279 K: AsRef<[u8]>,
280 F: AsRef<[u8]>,
281 V: AsRef<[u8]>,
282 {
283 self.run_command(Command::new("HSET").arg(&key).arg(&field).arg(&value))
284 .await
285 .map(|v| v.unwrap_integer())
286 }
287
288 pub async fn hsetnx<K, F, V>(&mut self, key: K, field: F, value: V) -> Result<bool>
293 where
294 K: AsRef<[u8]>,
295 F: AsRef<[u8]>,
296 V: AsRef<[u8]>,
297 {
298 self.run_command(Command::new("HSETNX").arg(&key).arg(&field).arg(&value))
299 .await
300 .map(|v| v.unwrap_bool())
301 }
302
303 pub async fn hset_many<K>(&mut self, key: K, builder: MSetBuilder<'_>) -> Result<isize>
308 where
309 K: AsRef<[u8]>,
310 {
311 let mut command = Command::new("HSET").arg(&key);
312 command.append_msetbuilder(&builder);
313
314 self.run_command(command).await.map(|v| v.unwrap_integer())
315 }
316
317 pub async fn hincrby<K, F>(&mut self, key: K, field: F, val: isize) -> Result<isize>
321 where
322 K: AsRef<[u8]>,
323 F: AsRef<[u8]>,
324 {
325 let val = val.to_string();
326 self.run_command(Command::new("HINCRBY").arg(&key).arg(&field).arg(&val))
327 .await
328 .map(|v| v.unwrap_integer())
329 }
330
331 pub async fn hincrbyfloat<K, F>(&mut self, key: K, field: F, val: f64) -> Result<f64>
335 where
336 K: AsRef<[u8]>,
337 F: AsRef<[u8]>,
338 {
339 let val = val.to_string();
340 let command = Command::new("HINCRBYFLOAT").arg(&key).arg(&field).arg(&val);
341 let result = self.run_command(command).await?.unwrap_string();
342 Ok(String::from_utf8_lossy(&result).parse::<f64>().unwrap())
343 }
344
345 pub async fn hkeys<K>(&mut self, key: K) -> Result<Vec<Vec<u8>>>
347 where
348 K: AsRef<[u8]>,
349 {
350 self.run_command(Command::new("HKEYS").arg(&key))
351 .await
352 .map(|v| v.unwrap_string_array())
353 }
354
355 pub async fn hlen<K>(&mut self, key: K) -> Result<isize>
357 where
358 K: AsRef<[u8]>,
359 {
360 self.run_command(Command::new("HLEN").arg(&key))
361 .await
362 .map(|v| v.unwrap_integer())
363 }
364
365 pub async fn hstrlen<K, F>(&mut self, key: K, field: F) -> Result<isize>
367 where
368 K: AsRef<[u8]>,
369 F: AsRef<[u8]>,
370 {
371 self.run_command(Command::new("HSTRLEN").arg(&key).arg(&field))
372 .await
373 .map(|v| v.unwrap_integer())
374 }
375
376 pub async fn hvals<K>(&mut self, key: K) -> Result<Vec<Value>>
378 where
379 K: AsRef<[u8]>,
380 {
381 self.run_command(Command::new("HVALS").arg(&key))
382 .await
383 .map(|v| v.unwrap_array())
384 }
385
386 pub async fn ping(&mut self) -> Result<()> {
388 self.run_command(Command::new("PING")).await.map(|_| ())
389 }
390
391 pub async fn subscribe<K>(mut self, channels: &[K]) -> Result<stream::MessageStream>
393 where
394 K: AsRef<[u8]>,
395 {
396 let command = Command::new("SUBSCRIBE").args(channels);
397
398 let _ = self.run_command(command).await?;
400 {
401 let mut stream = self.stream.lock().await;
402 for _ in 0..channels.len() - 1 {
403 let response = Self::read_value(&mut stream).await?;
404 assert_eq!(
405 response.unwrap_array()[0],
406 Value::String("subscribe".into())
407 );
408 }
409 }
410
411 Ok(stream::MessageStream::new(self))
412 }
413
414 pub async fn psubscribe<K>(mut self, patterns: &[K]) -> Result<stream::PMessageStream>
417 where
418 K: AsRef<[u8]>,
419 {
420 let command = Command::new("PSUBSCRIBE").args(patterns);
421
422 let _ = self.run_command(command).await?;
424 {
425 let mut stream = self.stream.lock().await;
426 for _ in 0..patterns.len() - 1 {
427 let response = Self::read_value(&mut stream).await?;
428 assert_eq!(
429 response.unwrap_array()[0],
430 Value::String("psubscribe".into())
431 );
432 }
433 }
434
435 Ok(stream::PMessageStream::new(self))
436 }
437
438 pub async fn publish<C, M>(&mut self, channel: C, message: M) -> Result<isize>
442 where
443 C: AsRef<[u8]>,
444 M: AsRef<[u8]>,
445 {
446 let command = Command::new("PUBLISH").arg(&channel).arg(&message);
447 self.run_command(command).await.map(|i| i.unwrap_integer())
448 }
449
450 pub async fn set<K, D>(&mut self, key: K, value: D) -> Result<()>
452 where
453 K: AsRef<[u8]>,
454 D: AsRef<[u8]>,
455 {
456 let command = Command::new("SET").arg(&key).arg(&value);
457
458 self.run_command(command).await.map(|_| ())
459 }
460
461 pub async fn set_and_expire_seconds<K, D>(
463 &mut self,
464 key: K,
465 data: D,
466 seconds: u32,
467 ) -> Result<()>
468 where
469 K: AsRef<[u8]>,
470 D: AsRef<[u8]>,
471 {
472 let seconds = seconds.to_string();
473 let command = Command::new("SET")
474 .arg(&key)
475 .arg(&data)
476 .arg(b"EX")
477 .arg(&seconds);
478
479 self.run_command(command).await.map(|_| ())
480 }
481
482 pub async fn set_and_expire_ms<K, D>(
484 &mut self,
485 key: K,
486 data: D,
487 milliseconds: u32,
488 ) -> Result<()>
489 where
490 K: AsRef<[u8]>,
491 D: AsRef<[u8]>,
492 {
493 let milliseconds = milliseconds.to_string();
494 let command = Command::new("SET")
495 .arg(&key)
496 .arg(&data)
497 .arg(b"PX")
498 .arg(&milliseconds);
499
500 self.run_command(command).await.map(|_| ())
501 }
502
503 pub async fn expire_seconds<K>(&mut self, key: K, seconds: u32) -> Result<isize>
505 where
506 K: AsRef<[u8]>,
507 {
508 let seconds = seconds.to_string();
509 let command = Command::new("EXPIRE").arg(&key).arg(&seconds);
510
511 self.run_command(command).await.map(|i| i.unwrap_integer())
512 }
513
514 pub async fn expire_ms<K>(&mut self, key: K, seconds: u32) -> Result<isize>
516 where
517 K: AsRef<[u8]>,
518 {
519 let seconds = seconds.to_string();
520 let command = Command::new("PEXPIRE").arg(&key).arg(&seconds);
521
522 self.run_command(command).await.map(|i| i.unwrap_integer())
523 }
524
525 pub async fn expire_at_seconds<K>(&mut self, key: K, timestamp: u64) -> Result<isize>
527 where
528 K: AsRef<[u8]>,
529 {
530 let timestamp = timestamp.to_string();
531 let command = Command::new("EXPIREAT").arg(&key).arg(×tamp);
532
533 self.run_command(command).await.map(|i| i.unwrap_integer())
534 }
535
536 pub async fn expire_at_ms<K>(&mut self, key: K, timestamp: u64) -> Result<isize>
538 where
539 K: AsRef<[u8]>,
540 {
541 let timestamp = timestamp.to_string();
542 let command = Command::new("PEXPIREAT").arg(&key).arg(×tamp);
543
544 self.run_command(command).await.map(|i| i.unwrap_integer())
545 }
546
547 pub async fn del<K>(&mut self, key: K) -> Result<bool>
551 where
552 K: AsRef<[u8]>,
553 {
554 let command = Command::new("DEL").arg(&key);
555 self.run_command(command).await.map(|i| i.unwrap_bool())
556 }
557
558 pub async fn del_slice<K>(&mut self, keys: &[K]) -> Result<isize>
562 where
563 K: AsRef<[u8]>,
564 {
565 check_slice_not_empty!(keys);
566 let command = Command::new("DEL").args(&keys);
567 self.run_command(command).await.map(|i| i.unwrap_integer())
568 }
569
570 pub async fn get<K>(&mut self, key: K) -> Result<Option<Vec<u8>>>
572 where
573 K: AsRef<[u8]>,
574 {
575 let command = Command::new("GET").arg(&key);
576
577 Ok(self.run_command(command).await?.optional_string())
578 }
579
580 pub async fn lpush<K, V>(&mut self, list: K, value: V) -> Result<isize>
584 where
585 K: AsRef<[u8]>,
586 V: AsRef<[u8]>,
587 {
588 let command = Command::new("LPUSH").arg(&list).arg(&value);
589
590 Ok(self.run_command(command).await?.unwrap_integer())
591 }
592
593 pub async fn lpush_slice<K, V>(&mut self, key: K, values: &[V]) -> Result<isize>
595 where
596 K: AsRef<[u8]>,
597 V: AsRef<[u8]>,
598 {
599 check_slice_not_empty!(values);
600 let command = Command::new("LPUSH").arg(&key).args(values);
601
602 Ok(self.run_command(command).await?.unwrap_integer())
603 }
604
605 pub async fn rpush<K, V>(&mut self, list: K, value: V) -> Result<isize>
609 where
610 K: AsRef<[u8]>,
611 V: AsRef<[u8]>,
612 {
613 let command = Command::new("RPUSH").arg(&list).arg(&value);
614
615 Ok(self.run_command(command).await?.unwrap_integer())
616 }
617
618 pub async fn rpush_slice<K, V>(&mut self, key: K, values: &[V]) -> Result<isize>
620 where
621 K: AsRef<[u8]>,
622 V: AsRef<[u8]>,
623 {
624 check_slice_not_empty!(values);
625 let command = Command::new("RPUSH").arg(&key).args(values);
626
627 Ok(self.run_command(command).await?.unwrap_integer())
628 }
629
630 pub async fn lpop<K>(&mut self, list: K) -> Result<Option<Vec<u8>>>
634 where
635 K: AsRef<[u8]>,
636 {
637 let command = Command::new("LPOP").arg(&list);
638
639 Ok(self.run_command(command).await?.optional_string())
640 }
641
642 pub async fn rpop<K>(&mut self, list: K) -> Result<Option<Vec<u8>>>
646 where
647 K: AsRef<[u8]>,
648 {
649 let command = Command::new("RPOP").arg(&list);
650
651 Ok(self.run_command(command).await?.optional_string())
652 }
653
654 pub async fn blpop<K>(
661 &mut self,
662 lists: &[K],
663 timeout: u32,
664 ) -> Result<Option<(Vec<u8>, Vec<u8>)>>
665 where
666 K: AsRef<[u8]>,
667 {
668 self.blpop_brpop(lists, timeout, "BLPOP").await
669 }
670
671 pub async fn brpop<K>(
678 &mut self,
679 lists: &[K],
680 timeout: u32,
681 ) -> Result<Option<(Vec<u8>, Vec<u8>)>>
682 where
683 K: AsRef<[u8]>,
684 {
685 self.blpop_brpop(lists, timeout, "BRPOP").await
686 }
687
688 async fn blpop_brpop<K>(
690 &mut self,
691 lists: &[K],
692 timeout: u32,
693 redis_cmd: &str,
694 ) -> Result<Option<(Vec<u8>, Vec<u8>)>>
695 where
696 K: AsRef<[u8]>,
697 {
698 let timeout = timeout.to_string();
699 let command = Command::new(redis_cmd).args(&lists).arg(&timeout);
700 match self.run_command(command).await? {
701 Value::Array(values) => {
702 let vlen = values.len();
703 if vlen == 2 {
704 let mut v = values.into_iter().map(|s| s.unwrap_string());
705 return Ok(Some(
706 (v.next().unwrap(), v.next().unwrap()), ));
708 }
709 Err(Error::UnexpectedResponse(format!(
710 "{}: wrong number of elements received: {}",
711 redis_cmd, vlen
712 )))
713 }
714 Value::Nil => Ok(None),
715 other => Err(Error::UnexpectedResponse(format!(
716 "{}: {:?}",
717 redis_cmd, other
718 ))),
719 }
720 }
721
722 pub async fn lrange<K>(&mut self, list: K, from: isize, to: isize) -> Result<Vec<Vec<u8>>>
725 where
726 K: AsRef<[u8]>,
727 {
728 let from = from.to_string();
729 let to = to.to_string();
730 let command = Command::new("LRANGE").arg(&list).arg(&from).arg(&to);
731
732 Ok(self
733 .run_command(command)
734 .await?
735 .unwrap_array()
736 .into_iter()
737 .map(|s| s.unwrap_string())
738 .collect())
739 }
740
741 pub async fn llen<K>(&mut self, list: K) -> Result<Option<isize>>
743 where
744 K: AsRef<[u8]>,
745 {
746 let command = Command::new("LLEN").arg(&list);
747 Ok(self.run_command(command).await?.optional_integer())
748 }
749
750 pub async fn lset<K, V>(&mut self, list: K, index: usize, value: V) -> Result<()>
752 where
753 K: AsRef<[u8]>,
754 V: AsRef<[u8]>,
755 {
756 let index = index.to_string();
757 let command = Command::new("LSET").arg(&list).arg(&index).arg(&value);
758
759 self.run_command(command).await?;
760 Ok(())
761 }
762
763 pub async fn ltrim<K>(&mut self, list: K, start: usize, stop: usize) -> Result<()>
765 where
766 K: AsRef<[u8]>,
767 {
768 let start = start.to_string();
769 let stop = stop.to_string();
770 let command = Command::new("LTRIM").arg(&list).arg(&start).arg(&stop);
771 self.run_command(command).await?;
772
773 Ok(())
774 }
775
776 pub async fn incr<K>(&mut self, key: K) -> Result<isize>
780 where
781 K: AsRef<[u8]>,
782 {
783 let command = Command::new("INCR").arg(&key);
784 Ok(self.run_command(command).await?.unwrap_integer())
785 }
786
787 pub async fn incrby<K>(&mut self, key: K, val: isize) -> Result<isize>
791 where
792 K: AsRef<[u8]>,
793 {
794 let val = val.to_string();
795 let command = Command::new("INCRBY").arg(&key).arg(&val);
796 Ok(self.run_command(command).await?.unwrap_integer())
797 }
798
799 pub async fn incrbyfloat<K>(&mut self, key: K, val: f64) -> Result<f64>
803 where
804 K: AsRef<[u8]>,
805 {
806 let val = val.to_string();
807 let command = Command::new("INCRBYFLOAT").arg(&key).arg(&val);
808 let result = self.run_command(command).await?.unwrap_string();
809 Ok(String::from_utf8_lossy(&result).parse::<f64>().unwrap())
810 }
811
812 pub async fn decr<K>(&mut self, key: K) -> Result<isize>
816 where
817 K: AsRef<[u8]>,
818 {
819 let command = Command::new("DECR").arg(&key);
820 Ok(self.run_command(command).await?.unwrap_integer())
821 }
822
823 pub async fn decrby<K>(&mut self, key: K, val: isize) -> Result<isize>
827 where
828 K: AsRef<[u8]>,
829 {
830 let val = val.to_string();
831 let command = Command::new("DECRBY").arg(&key).arg(&val);
832 Ok(self.run_command(command).await?.unwrap_integer())
833 }
834
835 pub async fn append<K, V>(&mut self, key: K, val: V) -> Result<isize>
839 where
840 K: AsRef<[u8]>,
841 V: AsRef<[u8]>,
842 {
843 let command = Command::new("APPEND").arg(&key).arg(&val);
844 Ok(self.run_command(command).await?.unwrap_integer())
845 }
846
847 pub async fn mget<K>(&mut self, keys: &[K]) -> Result<Vec<Option<Vec<u8>>>>
849 where
850 K: AsRef<[u8]>,
851 {
852 let command = Command::new("MGET").args(&keys);
853 let result = self.run_command(command).await?.unwrap_array();
854 let output: Vec<Option<Vec<u8>>> =
855 result.into_iter().map(|r| r.optional_string()).collect();
856 Ok(output)
857 }
858
859 pub async fn mset(&mut self, builder: MSetBuilder<'_>) -> Result<()>
887where {
888 let mut command = Command::new("MSET");
889 command.append_msetbuilder(&builder);
890 self.run_command(command).await?;
891 Ok(())
892 }
893
894 pub async fn exists<K>(&mut self, key: K) -> Result<bool>
896 where
897 K: AsRef<[u8]>,
898 {
899 let command = Command::new("EXISTS").arg(&key);
900 Ok(self.run_command(command).await? == Value::Integer(1))
901 }
902
903 pub async fn sadd<K, V>(&mut self, key: K, value: V) -> Result<bool>
905 where
906 K: AsRef<[u8]>,
907 V: AsRef<[u8]>,
908 {
909 let command = Command::new("SADD").arg(&key).arg(&value);
910
911 Ok(self.run_command(command).await?.unwrap_bool())
912 }
913
914 pub async fn sadd_slice<K, V>(&mut self, key: K, values: &[V]) -> Result<isize>
916 where
917 K: AsRef<[u8]>,
918 V: AsRef<[u8]>,
919 {
920 let command = Command::new("SADD").arg(&key).args(&values);
921
922 Ok(self.run_command(command).await?.unwrap_integer())
923 }
924
925 pub async fn smembers<K>(&mut self, key: K) -> Result<Vec<Vec<u8>>>
927 where
928 K: AsRef<[u8]>,
929 {
930 let command = Command::new("SMEMBERS").arg(&key);
931
932 Ok(self
933 .run_command(command)
934 .await?
935 .unwrap_array()
936 .into_iter()
937 .map(|s| s.unwrap_string())
938 .collect())
939 }
940
941 pub async fn sismember<K, V>(&mut self, key: K, value: V) -> Result<bool>
943 where
944 K: AsRef<[u8]>,
945 V: AsRef<[u8]>,
946 {
947 let command = Command::new("SISMEMBER").arg(&key).arg(&value);
948
949 Ok(self.run_command(command).await?.unwrap_bool())
950 }
951
952 pub fn sscan<'a, K>(&'a mut self, key: &'a K) -> ScanBuilder
978 where
979 K: AsRef<[u8]>,
980 {
981 ScanBuilder::new("SSCAN", Some(key.as_ref()), self)
982 }
983
984 pub fn scan(&mut self) -> ScanBuilder {
1006 ScanBuilder::new("SCAN", None, self)
1007 }
1008
1009 pub fn hscan<'a, K>(&'a mut self, key: &'a K) -> HScanBuilder<'a>
1033 where
1034 K: AsRef<[u8]>,
1035 {
1036 HScanBuilder::new(key.as_ref(), self)
1037 }
1038
1039 pub async fn key_type<K>(&mut self, key: K) -> Result<Option<DataType>>
1041 where
1042 K: AsRef<[u8]>,
1043 {
1044 let command = Command::new("TYPE").arg(&key);
1045 let result = self.run_command(command).await?.unwrap_string();
1046 match result.as_slice() {
1047 b"string\r\n" => Ok(Some(DataType::String)),
1048 b"list\r\n" => Ok(Some(DataType::List)),
1049 b"set\r\n" => Ok(Some(DataType::Set)),
1050 b"zset\r\n" => Ok(Some(DataType::ZSet)),
1051 b"hash\r\n" => Ok(Some(DataType::Hash)),
1052 b"stream\r\n" => Ok(Some(DataType::Stream)),
1053 b"none\r\n" => Ok(None),
1054 _ => Err(Error::UnexpectedResponse(
1055 String::from_utf8_lossy(&result).to_string(),
1056 )),
1057 }
1058 }
1059
1060 pub async fn scard<K>(&mut self, key: &K) -> Result<isize>
1062 where
1063 K: AsRef<[u8]>,
1064 {
1065 let command = Command::new("SCARD").arg(&key);
1066 Ok(self.run_command(command).await?.unwrap_integer())
1067 }
1068
1069 pub async fn smove<S, D, M>(&mut self, source: S, destination: D, member: M) -> Result<bool>
1073 where
1074 S: AsRef<[u8]>,
1075 M: AsRef<[u8]>,
1076 D: AsRef<[u8]>,
1077 {
1078 let command = Command::new("SMOVE")
1079 .arg(&source)
1080 .arg(&destination)
1081 .arg(&member);
1082 Ok(self.run_command(command).await?.unwrap_bool())
1083 }
1084
1085 pub async fn srem<K, M>(&mut self, key: K, member: M) -> Result<bool>
1089 where
1090 K: AsRef<[u8]>,
1091 M: AsRef<[u8]>,
1092 {
1093 let command = Command::new("SREM").arg(&key).arg(&member);
1094 Ok(self.run_command(command).await?.unwrap_bool())
1095 }
1096
1097 pub async fn srem_slice<K, M>(&mut self, key: K, members: &[M]) -> Result<isize>
1101 where
1102 K: AsRef<[u8]>,
1103 M: AsRef<[u8]>,
1104 {
1105 check_slice_not_empty!(members);
1106 let command = Command::new("SREM").arg(&key).args(members);
1107 Ok(self.run_command(command).await?.unwrap_integer())
1108 }
1109
1110 pub async fn sdiff<S>(&mut self, sets: &[S]) -> Result<Vec<Vec<u8>>>
1112 where
1113 S: AsRef<[u8]>,
1114 {
1115 check_slice_not_empty!(sets);
1116 let command = Command::new("SDIFF").args(sets);
1117 Ok(self.run_command(command).await?.unwrap_string_array())
1118 }
1119
1120 pub async fn sdiffstore<D, S>(&mut self, destination: D, sets: &[S]) -> Result<isize>
1124 where
1125 D: AsRef<[u8]>,
1126 S: AsRef<[u8]>,
1127 {
1128 check_slice_not_empty!(sets);
1129 let command = Command::new("SDIFFSTORE").arg(&destination).args(sets);
1130 Ok(self.run_command(command).await?.unwrap_integer())
1131 }
1132
1133 pub async fn sinter<S>(&mut self, sets: &[S]) -> Result<Vec<Vec<u8>>>
1135 where
1136 S: AsRef<[u8]>,
1137 {
1138 check_slice_not_empty!(sets);
1139 let command = Command::new("SINTER").args(sets);
1140 Ok(self.run_command(command).await?.unwrap_string_array())
1141 }
1142
1143 pub async fn sinterstore<D, S>(&mut self, destination: D, sets: &[S]) -> Result<isize>
1147 where
1148 D: AsRef<[u8]>,
1149 S: AsRef<[u8]>,
1150 {
1151 check_slice_not_empty!(sets);
1152 let command = Command::new("SINTERSTORE").arg(&destination).args(sets);
1153 Ok(self.run_command(command).await?.unwrap_integer())
1154 }
1155
1156 pub async fn srandmember<S>(&mut self, set: S, count: isize) -> Result<Vec<Vec<u8>>>
1159 where
1160 S: AsRef<[u8]>,
1161 {
1162 let count = count.to_string();
1163 let command = Command::new("SRANDMEMBER").arg(&set).arg(&count);
1164 Ok(self.run_command(command).await?.unwrap_string_array())
1165 }
1166
1167 pub async fn spop<S>(&mut self, set: S, count: isize) -> Result<Vec<Vec<u8>>>
1169 where
1170 S: AsRef<[u8]>,
1171 {
1172 let count = count.to_string();
1173 let command = Command::new("SPOP").arg(&set).arg(&count);
1174 Ok(self.run_command(command).await?.unwrap_string_array())
1175 }
1176
1177 pub async fn sunion<S>(&mut self, sets: &[S]) -> Result<Vec<Vec<u8>>>
1179 where
1180 S: AsRef<[u8]>,
1181 {
1182 check_slice_not_empty!(sets);
1183 let command = Command::new("SUNION").args(sets);
1184 Ok(self.run_command(command).await?.unwrap_string_array())
1185 }
1186
1187 pub async fn sunionstore<D, S>(&mut self, destination: D, sets: &[S]) -> Result<isize>
1191 where
1192 D: AsRef<[u8]>,
1193 S: AsRef<[u8]>,
1194 {
1195 check_slice_not_empty!(sets);
1196 let command = Command::new("SUNIONSTORE").arg(&destination).args(sets);
1197 Ok(self.run_command(command).await?.unwrap_integer())
1198 }
1199}