use crate::{Command, CommandList, DataType, Error, Result, Value};
use futures::{future::BoxFuture, FutureExt};
#[cfg(feature = "runtime_async_std")]
use async_std::{
io,
net::{TcpStream, ToSocketAddrs},
sync::Mutex,
};
#[cfg(feature = "runtime_async_std")]
use futures::{AsyncReadExt, AsyncWriteExt};
#[cfg(feature = "runtime_tokio")]
use tokio::{
io::{self, AsyncReadExt, AsyncWriteExt},
net::{TcpStream, ToSocketAddrs},
sync::Mutex,
};
use std::sync::Arc;
pub mod builder;
pub mod scan;
pub mod stream;
pub use scan::{HScanBuilder, HScanStream, ScanBuilder, ScanStream};
pub use stream::{Message, MessageStream, PMessage, PMessageStream, ResponseStream};
use builder::MSetBuilder;
#[cfg(test)]
mod test;
// Early-return guard for slice arguments.
//
// Expands to a check that makes the *enclosing function* return
// `Err(Error::EmptySlice)` when the named slice has no elements, so it can only
// be used inside functions whose return type accepts that error.
macro_rules! check_slice_not_empty {
($slice:ident) => {
if $slice.is_empty() {
// `return` leaves the function that invoked the macro, not just the macro body.
return Err(Error::EmptySlice);
}
};
}
/// Reads from `r` one byte at a time until `byte` has been received.
///
/// Returns every byte read so far, *including* the terminating `byte`, so a
/// successful result always holds at least one element.
///
/// # Errors
/// Propagates any I/O error reported by the stream. If the peer closes the
/// connection before the delimiter arrives, an error of kind
/// `UnexpectedEof` is returned.
async fn read_until(r: &mut TcpStream, byte: u8) -> io::Result<Vec<u8>> {
    let mut buffer = Vec::new();
    let mut single = [0; 1];
    loop {
        // A read of zero bytes signals end-of-stream. Without this check the loop
        // would spin forever, pushing the stale contents of `single` each time.
        if r.read(&mut single).await? == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "connection closed before the delimiter was received",
            ));
        }
        buffer.push(single[0]);
        if single[0] == byte {
            return Ok(buffer);
        }
    }
}
/// Handle to a Redis server reached over a TCP stream.
///
/// The stream is stored behind `Arc<Mutex<_>>`, so values produced by the
/// derived `Clone` refer to the same underlying stream rather than opening a
/// new one.
#[derive(Clone, Debug)]
pub struct Connection {
/// Shared, mutex-guarded stream that commands are written to and replies are read from.
pub(crate) stream: Arc<Mutex<TcpStream>>,
}
impl Connection {
pub async fn connect<A>(address: A) -> Result<Self>
where
A: ToSocketAddrs,
{
let stream = Arc::new(Mutex::new(
TcpStream::connect(address)
.await
.map_err(Error::ConnectionFailed)?,
));
Ok(Self { stream })
}
/// Connects to `address` and immediately sends `AUTH password`.
///
/// # Errors
/// Returns whatever error `connect` or the `AUTH` command produces; the
/// connection is only handed back when both succeed.
pub async fn connect_and_auth<A, P>(address: A, password: P) -> Result<Self>
where
    A: ToSocketAddrs,
    P: AsRef<[u8]>,
{
    let mut connection = Self::connect(address).await?;
    let auth = Command::new("AUTH").arg(&password);
    connection.run_command(auth).await?;
    Ok(connection)
}
async fn parse_simple_value(buf: &[u8]) -> Result<Value> {
match buf[0] {
b'+' => {
if buf == b"+OK\r\n" {
Ok(Value::Ok)
} else {
Ok(Value::String(buf[1..].into()))
}
}
b'-' => Err(Error::RedisError(
String::from_utf8_lossy(&buf[1..]).to_string(),
)),
b':' => {
let string = String::from_utf8_lossy(&buf[1..]);
let num = string.trim().parse::<isize>().unwrap();
Ok(Value::Integer(num))
}
_ => Err(Error::UnexpectedResponse(
String::from_utf8_lossy(buf).to_string(),
)),
}
}
async fn parse_string(start: &[u8], stream: &mut TcpStream) -> Result<Value> {
if start == b"$-1\r\n" {
Ok(Value::Nil)
} else {
let num = String::from_utf8_lossy(&start[1..])
.trim()
.parse::<usize>()
.unwrap();
let mut buf = vec![0u8; num + 2];
stream.read_exact(&mut buf).await?;
buf.pop();
buf.pop();
Ok(Value::String(buf))
}
}
fn parse_array<'a>(start: &'a [u8], stream: &'a mut TcpStream) -> BoxFuture<'a, Result<Value>> {
async move {
let num_parsed = String::from_utf8_lossy(&start[1..])
.trim()
.parse::<i32>()
.unwrap();
if num_parsed < 0 {
return Ok(Value::Nil);
}
let num = num_parsed as usize;
let mut values = Vec::with_capacity(num);
for _ in 0..num {
let buf = read_until(stream, b'\n').await?;
match buf[0] {
b'+' | b'-' | b':' => values.push(Self::parse_simple_value(&buf).await?),
b'$' => values.push(Self::parse_string(&buf, stream).await?),
b'*' => values.push(Self::parse_array(&buf, stream).await?),
_ => {
return Err(Error::UnexpectedResponse(
String::from_utf8_lossy(&buf).to_string(),
))
}
}
}
Ok(Value::Array(values))
}
.boxed()
}
/// Reads one complete reply from `stream`.
///
/// The first line is read up to `\n` and its first byte selects the parser:
/// `+`, `-`, `:` for single-line values, `$` for bulk strings, `*` for arrays.
/// Any other leading byte produces `Error::UnexpectedResponse`.
pub(crate) async fn read_value(mut stream: &mut TcpStream) -> Result<Value> {
    let line = read_until(&mut stream, b'\n').await?;
    let marker = line[0];
    if marker == b'$' {
        return Self::parse_string(&line, &mut stream).await;
    }
    if marker == b'*' {
        return Self::parse_array(&line, &mut stream).await;
    }
    match marker {
        b'+' | b'-' | b':' => Self::parse_simple_value(&line).await,
        _ => Err(Error::UnexpectedResponse(
            String::from_utf8_lossy(&line).to_string(),
        )),
    }
}
/// Sends `command` and returns the reply, using a freshly allocated
/// serialization buffer. See `run_command_with_buffer` to supply your own.
#[inline]
pub async fn run_command(&mut self, command: Command<'_>) -> Result<Value> {
    let scratch = &mut Vec::new();
    self.run_command_with_buffer(command, scratch).await
}
/// Sends `command` and returns the reply, serializing into `buffer`.
///
/// `buffer` is treated as scratch space: it is cleared before the command is
/// serialized into it, matching `run_commands_with_buffer`, so leftover bytes
/// from a previous use are never written to the server.
///
/// The stream lock is held from before the write until the reply has been
/// read.
///
/// # Errors
/// Propagates write errors and any error produced while reading the reply.
pub async fn run_command_with_buffer(
    &mut self,
    command: Command<'_>,
    buffer: &mut Vec<u8>,
) -> Result<Value> {
    let mut stream = self.stream.lock().await;
    buffer.clear();
    command.serialize(buffer);
    stream.write_all(buffer.as_slice()).await?;
    Self::read_value(&mut stream).await
}
/// Sends every command in `command` and returns a stream of their replies,
/// using a freshly allocated serialization buffer.
#[inline]
pub async fn run_commands(&mut self, command: CommandList<'_>) -> Result<ResponseStream> {
    let scratch = &mut Vec::new();
    self.run_commands_with_buffer(command, scratch).await
}
/// Sends every command in `command` in one write, serializing into `buf`.
///
/// `buf` is cleared both before serialization and after the write. The replies
/// are not read here; a `ResponseStream` built from the command count and a
/// clone of the shared stream handle is returned instead.
pub async fn run_commands_with_buffer(
    &mut self,
    command: CommandList<'_>,
    buf: &mut Vec<u8>,
) -> Result<ResponseStream> {
    buf.clear();
    let mut guard = self.stream.lock().await;
    // Must be captured before `serialize`, which takes the list by value.
    let expected_replies = command.command_count();
    command.serialize(buf);
    guard.write_all(&buf).await?;
    buf.clear();
    let shared = self.stream.clone();
    Ok(ResponseStream::new(expected_replies, shared))
}
/// Runs `HDEL key field` and returns the reply converted with `unwrap_bool`.
pub async fn hdel<K, F>(&mut self, key: K, field: F) -> Result<bool>
where
    K: AsRef<[u8]>,
    F: AsRef<[u8]>,
{
    let command = Command::new("HDEL").arg(&key).arg(&field);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_bool())
}
/// Runs `HDEL key field...` for every entry of `fields` and returns the
/// integer reply.
///
/// # Errors
/// `Error::EmptySlice` if `fields` is empty.
pub async fn hdel_slice<K, F>(&mut self, key: K, fields: &[F]) -> Result<isize>
where
    K: AsRef<[u8]>,
    F: AsRef<[u8]>,
{
    check_slice_not_empty!(fields);
    let command = Command::new("HDEL").arg(&key).args(&fields);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_integer())
}
/// Runs `HEXISTS key field` and returns the reply converted with `unwrap_bool`.
pub async fn hexists<K, F>(&mut self, key: K, field: F) -> Result<bool>
where
    K: AsRef<[u8]>,
    F: AsRef<[u8]>,
{
    let command = Command::new("HEXISTS").arg(&key).arg(&field);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_bool())
}
/// Runs `HGET key field` and returns the reply converted with
/// `optional_string`.
pub async fn hget<K, F>(&mut self, key: K, field: F) -> Result<Option<Vec<u8>>>
where
    K: AsRef<[u8]>,
    F: AsRef<[u8]>,
{
    let command = Command::new("HGET").arg(&key).arg(&field);
    let reply = self.run_command(command).await?;
    Ok(reply.optional_string())
}
/// Runs `HSET key field value` and returns the integer reply.
pub async fn hset<K, F, V>(&mut self, key: K, field: F, value: V) -> Result<isize>
where
    K: AsRef<[u8]>,
    F: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let command = Command::new("HSET").arg(&key).arg(&field).arg(&value);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_integer())
}
/// Runs `HSETNX key field value` and returns the reply converted with
/// `unwrap_bool`.
pub async fn hsetnx<K, F, V>(&mut self, key: K, field: F, value: V) -> Result<bool>
where
    K: AsRef<[u8]>,
    F: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let command = Command::new("HSETNX").arg(&key).arg(&field).arg(&value);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_bool())
}
/// Runs `HSET key ...` with the arguments collected in `builder` appended, and
/// returns the integer reply.
pub async fn hset_many<K>(&mut self, key: K, builder: MSetBuilder<'_>) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    let mut command = Command::new("HSET").arg(&key);
    command.append_msetbuilder(&builder);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_integer())
}
/// Runs `HINCRBY key field val` and returns the integer reply.
pub async fn hincrby<K, F>(&mut self, key: K, field: F, val: isize) -> Result<isize>
where
    K: AsRef<[u8]>,
    F: AsRef<[u8]>,
{
    let amount = val.to_string();
    let command = Command::new("HINCRBY").arg(&key).arg(&field).arg(&amount);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_integer())
}
pub async fn hincrbyfloat<K, F>(&mut self, key: K, field: F, val: f64) -> Result<f64>
where
K: AsRef<[u8]>,
F: AsRef<[u8]>,
{
let val = val.to_string();
let command = Command::new("HINCRBYFLOAT").arg(&key).arg(&field).arg(&val);
let result = self.run_command(command).await?.unwrap_string();
Ok(String::from_utf8_lossy(&result).parse::<f64>().unwrap())
}
/// Runs `HKEYS key` and returns the reply converted with
/// `unwrap_string_array`.
pub async fn hkeys<K>(&mut self, key: K) -> Result<Vec<Vec<u8>>>
where
    K: AsRef<[u8]>,
{
    let command = Command::new("HKEYS").arg(&key);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_string_array())
}
/// Runs `HLEN key` and returns the integer reply.
pub async fn hlen<K>(&mut self, key: K) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    let command = Command::new("HLEN").arg(&key);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_integer())
}
/// Runs `HSTRLEN key field` and returns the integer reply.
pub async fn hstrlen<K, F>(&mut self, key: K, field: F) -> Result<isize>
where
    K: AsRef<[u8]>,
    F: AsRef<[u8]>,
{
    let command = Command::new("HSTRLEN").arg(&key).arg(&field);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_integer())
}
/// Runs `HVALS key` and returns the reply converted with `unwrap_array`.
pub async fn hvals<K>(&mut self, key: K) -> Result<Vec<Value>>
where
    K: AsRef<[u8]>,
{
    let command = Command::new("HVALS").arg(&key);
    let reply = self.run_command(command).await?;
    Ok(reply.unwrap_array())
}
/// Runs `PING`, discarding the reply and reporting only success or failure.
pub async fn ping(&mut self) -> Result<()> {
    self.run_command(Command::new("PING")).await?;
    Ok(())
}
pub async fn subscribe<K>(mut self, channels: &[K]) -> Result<stream::MessageStream>
where
K: AsRef<[u8]>,
{
let command = Command::new("SUBSCRIBE").args(channels);
let _ = self.run_command(command).await?;
{
let mut stream = self.stream.lock().await;
for _ in 0..channels.len() - 1 {
let response = Self::read_value(&mut stream).await?;
assert_eq!(
response.unwrap_array()[0],
Value::String("subscribe".into())
);
}
}
Ok(stream::MessageStream::new(self))
}
pub async fn psubscribe<K>(mut self, patterns: &[K]) -> Result<stream::PMessageStream>
where
K: AsRef<[u8]>,
{
let command = Command::new("PSUBSCRIBE").args(patterns);
let _ = self.run_command(command).await?;
{
let mut stream = self.stream.lock().await;
for _ in 0..patterns.len() - 1 {
let response = Self::read_value(&mut stream).await?;
assert_eq!(
response.unwrap_array()[0],
Value::String("psubscribe".into())
);
}
}
Ok(stream::PMessageStream::new(self))
}
/// Runs `PUBLISH channel message` and returns the integer reply.
pub async fn publish<C, M>(&mut self, channel: C, message: M) -> Result<isize>
where
    C: AsRef<[u8]>,
    M: AsRef<[u8]>,
{
    let reply = self
        .run_command(Command::new("PUBLISH").arg(&channel).arg(&message))
        .await?;
    Ok(reply.unwrap_integer())
}
/// Runs `SET key value`, discarding the reply.
pub async fn set<K, D>(&mut self, key: K, value: D) -> Result<()>
where
    K: AsRef<[u8]>,
    D: AsRef<[u8]>,
{
    self.run_command(Command::new("SET").arg(&key).arg(&value))
        .await?;
    Ok(())
}
/// Runs `SET key data EX seconds`, discarding the reply.
pub async fn set_and_expire_seconds<K, D>(
    &mut self,
    key: K,
    data: D,
    seconds: u32,
) -> Result<()>
where
    K: AsRef<[u8]>,
    D: AsRef<[u8]>,
{
    let ttl = seconds.to_string();
    let command = Command::new("SET")
        .arg(&key)
        .arg(&data)
        .arg(b"EX")
        .arg(&ttl);
    self.run_command(command).await?;
    Ok(())
}
/// Runs `SET key data PX milliseconds`, discarding the reply.
pub async fn set_and_expire_ms<K, D>(
    &mut self,
    key: K,
    data: D,
    milliseconds: u32,
) -> Result<()>
where
    K: AsRef<[u8]>,
    D: AsRef<[u8]>,
{
    let ttl = milliseconds.to_string();
    let command = Command::new("SET")
        .arg(&key)
        .arg(&data)
        .arg(b"PX")
        .arg(&ttl);
    self.run_command(command).await?;
    Ok(())
}
/// Runs `EXPIRE key seconds` and returns the integer reply.
pub async fn expire_seconds<K>(&mut self, key: K, seconds: u32) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    let ttl = seconds.to_string();
    let reply = self
        .run_command(Command::new("EXPIRE").arg(&key).arg(&ttl))
        .await?;
    Ok(reply.unwrap_integer())
}
/// Runs `PEXPIRE key <value>` and returns the integer reply.
///
/// NOTE(review): the parameter is named `seconds`, but it is passed verbatim to
/// `PEXPIRE`, whose argument Redis documents as milliseconds.
pub async fn expire_ms<K>(&mut self, key: K, seconds: u32) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    let ttl = seconds.to_string();
    let reply = self
        .run_command(Command::new("PEXPIRE").arg(&key).arg(&ttl))
        .await?;
    Ok(reply.unwrap_integer())
}
/// Runs `EXPIREAT key timestamp` and returns the integer reply.
pub async fn expire_at_seconds<K>(&mut self, key: K, timestamp: u64) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    let timestamp = timestamp.to_string();
    // The argument must be a borrow of the stringified timestamp; the source
    // had a mis-encoded token here that did not compile.
    let command = Command::new("EXPIREAT").arg(&key).arg(&timestamp);
    self.run_command(command).await.map(|i| i.unwrap_integer())
}
/// Runs `PEXPIREAT key timestamp` and returns the integer reply.
pub async fn expire_at_ms<K>(&mut self, key: K, timestamp: u64) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    let timestamp = timestamp.to_string();
    // The argument must be a borrow of the stringified timestamp; the source
    // had a mis-encoded token here that did not compile.
    let command = Command::new("PEXPIREAT").arg(&key).arg(&timestamp);
    self.run_command(command).await.map(|i| i.unwrap_integer())
}
/// Runs `DEL key` and returns the reply converted with `unwrap_bool`.
pub async fn del<K>(&mut self, key: K) -> Result<bool>
where
    K: AsRef<[u8]>,
{
    let reply = self.run_command(Command::new("DEL").arg(&key)).await?;
    Ok(reply.unwrap_bool())
}
/// Runs `DEL key...` for every entry of `keys` and returns the integer reply.
///
/// # Errors
/// `Error::EmptySlice` if `keys` is empty.
pub async fn del_slice<K>(&mut self, keys: &[K]) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    check_slice_not_empty!(keys);
    let reply = self.run_command(Command::new("DEL").args(&keys)).await?;
    Ok(reply.unwrap_integer())
}
/// Runs `GET key` and returns the reply converted with `optional_string`.
pub async fn get<K>(&mut self, key: K) -> Result<Option<Vec<u8>>>
where
    K: AsRef<[u8]>,
{
    self.run_command(Command::new("GET").arg(&key))
        .await
        .map(|reply| reply.optional_string())
}
/// Runs `LPUSH list value` and returns the integer reply.
pub async fn lpush<K, V>(&mut self, list: K, value: V) -> Result<isize>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    self.run_command(Command::new("LPUSH").arg(&list).arg(&value))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `LPUSH key value...` for every entry of `values` and returns the
/// integer reply.
///
/// # Errors
/// `Error::EmptySlice` if `values` is empty.
pub async fn lpush_slice<K, V>(&mut self, key: K, values: &[V]) -> Result<isize>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    check_slice_not_empty!(values);
    self.run_command(Command::new("LPUSH").arg(&key).args(values))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `RPUSH list value` and returns the integer reply.
pub async fn rpush<K, V>(&mut self, list: K, value: V) -> Result<isize>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    self.run_command(Command::new("RPUSH").arg(&list).arg(&value))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `RPUSH key value...` for every entry of `values` and returns the
/// integer reply.
///
/// # Errors
/// `Error::EmptySlice` if `values` is empty.
pub async fn rpush_slice<K, V>(&mut self, key: K, values: &[V]) -> Result<isize>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    check_slice_not_empty!(values);
    self.run_command(Command::new("RPUSH").arg(&key).args(values))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `LPOP list` and returns the reply converted with `optional_string`.
pub async fn lpop<K>(&mut self, list: K) -> Result<Option<Vec<u8>>>
where
    K: AsRef<[u8]>,
{
    self.run_command(Command::new("LPOP").arg(&list))
        .await
        .map(|reply| reply.optional_string())
}
/// Runs `RPOP list` and returns the reply converted with `optional_string`.
pub async fn rpop<K>(&mut self, list: K) -> Result<Option<Vec<u8>>>
where
    K: AsRef<[u8]>,
{
    self.run_command(Command::new("RPOP").arg(&list))
        .await
        .map(|reply| reply.optional_string())
}
/// Runs `BLPOP list... timeout`; see `blpop_brpop` for how the reply is
/// interpreted.
pub async fn blpop<K>(
    &mut self,
    lists: &[K],
    timeout: u32,
) -> Result<Option<(Vec<u8>, Vec<u8>)>>
where
    K: AsRef<[u8]>,
{
    let popped = self.blpop_brpop(lists, timeout, "BLPOP").await?;
    Ok(popped)
}
/// Runs `BRPOP list... timeout`; see `blpop_brpop` for how the reply is
/// interpreted.
pub async fn brpop<K>(
    &mut self,
    lists: &[K],
    timeout: u32,
) -> Result<Option<(Vec<u8>, Vec<u8>)>>
where
    K: AsRef<[u8]>,
{
    let popped = self.blpop_brpop(lists, timeout, "BRPOP").await?;
    Ok(popped)
}
async fn blpop_brpop<K>(
&mut self,
lists: &[K],
timeout: u32,
redis_cmd: &str,
) -> Result<Option<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let timeout = timeout.to_string();
let command = Command::new(redis_cmd).args(&lists).arg(&timeout);
match self.run_command(command).await? {
Value::Array(values) => {
let vlen = values.len();
if vlen == 2 {
let mut v = values.into_iter().map(|s| s.unwrap_string());
return Ok(Some(
(v.next().unwrap(), v.next().unwrap()),
));
}
Err(Error::UnexpectedResponse(format!(
"{}: wrong number of elements received: {}",
redis_cmd, vlen
)))
}
Value::Nil => Ok(None),
other => Err(Error::UnexpectedResponse(format!(
"{}: {:?}",
redis_cmd, other
))),
}
}
/// Runs `LRANGE list from to` and returns each element of the array reply
/// converted with `unwrap_string`.
pub async fn lrange<K>(&mut self, list: K, from: isize, to: isize) -> Result<Vec<Vec<u8>>>
where
    K: AsRef<[u8]>,
{
    let start = from.to_string();
    let stop = to.to_string();
    let command = Command::new("LRANGE").arg(&list).arg(&start).arg(&stop);
    let items = self.run_command(command).await?.unwrap_array();
    let mut elements = Vec::with_capacity(items.len());
    for item in items {
        elements.push(item.unwrap_string());
    }
    Ok(elements)
}
/// Runs `LLEN list` and returns the reply converted with `optional_integer`.
pub async fn llen<K>(&mut self, list: K) -> Result<Option<isize>>
where
    K: AsRef<[u8]>,
{
    self.run_command(Command::new("LLEN").arg(&list))
        .await
        .map(|reply| reply.optional_integer())
}
/// Runs `LSET list index value`, discarding the reply.
pub async fn lset<K, V>(&mut self, list: K, index: usize, value: V) -> Result<()>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let position = index.to_string();
    self.run_command(Command::new("LSET").arg(&list).arg(&position).arg(&value))
        .await
        .map(|_| ())
}
/// Runs `LTRIM list start stop`, discarding the reply.
pub async fn ltrim<K>(&mut self, list: K, start: usize, stop: usize) -> Result<()>
where
    K: AsRef<[u8]>,
{
    let first = start.to_string();
    let last = stop.to_string();
    self.run_command(Command::new("LTRIM").arg(&list).arg(&first).arg(&last))
        .await
        .map(|_| ())
}
/// Runs `INCR key` and returns the integer reply.
pub async fn incr<K>(&mut self, key: K) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    self.run_command(Command::new("INCR").arg(&key))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `INCRBY key val` and returns the integer reply.
pub async fn incrby<K>(&mut self, key: K, val: isize) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    let amount = val.to_string();
    self.run_command(Command::new("INCRBY").arg(&key).arg(&amount))
        .await
        .map(|reply| reply.unwrap_integer())
}
pub async fn incrbyfloat<K>(&mut self, key: K, val: f64) -> Result<f64>
where
K: AsRef<[u8]>,
{
let val = val.to_string();
let command = Command::new("INCRBYFLOAT").arg(&key).arg(&val);
let result = self.run_command(command).await?.unwrap_string();
Ok(String::from_utf8_lossy(&result).parse::<f64>().unwrap())
}
/// Runs `DECR key` and returns the integer reply.
pub async fn decr<K>(&mut self, key: K) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    self.run_command(Command::new("DECR").arg(&key))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `DECRBY key val` and returns the integer reply.
pub async fn decrby<K>(&mut self, key: K, val: isize) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    let amount = val.to_string();
    self.run_command(Command::new("DECRBY").arg(&key).arg(&amount))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `APPEND key val` and returns the integer reply.
pub async fn append<K, V>(&mut self, key: K, val: V) -> Result<isize>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    self.run_command(Command::new("APPEND").arg(&key).arg(&val))
        .await
        .map(|reply| reply.unwrap_integer())
}
pub async fn mget<K>(&mut self, keys: &[K]) -> Result<Vec<Option<Vec<u8>>>>
where
K: AsRef<[u8]>,
{
let command = Command::new("MGET").args(&keys);
let result = self.run_command(command).await?.unwrap_array();
let output: Vec<Option<Vec<u8>>> =
result.into_iter().map(|r| r.optional_string()).collect();
Ok(output)
}
/// Runs `MSET` with the arguments collected in `builder`, discarding the
/// reply.
pub async fn mset(&mut self, builder: MSetBuilder<'_>) -> Result<()> {
    let mut command = Command::new("MSET");
    command.append_msetbuilder(&builder);
    self.run_command(command).await.map(|_| ())
}
/// Runs `EXISTS key` and returns `true` exactly when the reply equals
/// `Value::Integer(1)`.
pub async fn exists<K>(&mut self, key: K) -> Result<bool>
where
    K: AsRef<[u8]>,
{
    let reply = self.run_command(Command::new("EXISTS").arg(&key)).await?;
    let found = reply == Value::Integer(1);
    Ok(found)
}
/// Runs `SADD key value` and returns the reply converted with `unwrap_bool`.
pub async fn sadd<K, V>(&mut self, key: K, value: V) -> Result<bool>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    self.run_command(Command::new("SADD").arg(&key).arg(&value))
        .await
        .map(|reply| reply.unwrap_bool())
}
pub async fn sadd_slice<K, V>(&mut self, key: K, values: &[V]) -> Result<isize>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let command = Command::new("SADD").arg(&key).args(&values);
Ok(self.run_command(command).await?.unwrap_integer())
}
/// Runs `SMEMBERS key` and returns each element of the array reply converted
/// with `unwrap_string`.
pub async fn smembers<K>(&mut self, key: K) -> Result<Vec<Vec<u8>>>
where
    K: AsRef<[u8]>,
{
    let command = Command::new("SMEMBERS").arg(&key);
    let items = self.run_command(command).await?.unwrap_array();
    let mut members = Vec::with_capacity(items.len());
    for item in items {
        members.push(item.unwrap_string());
    }
    Ok(members)
}
/// Runs `SISMEMBER key value` and returns the reply converted with
/// `unwrap_bool`.
pub async fn sismember<K, V>(&mut self, key: K, value: V) -> Result<bool>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    self.run_command(Command::new("SISMEMBER").arg(&key).arg(&value))
        .await
        .map(|reply| reply.unwrap_bool())
}
/// Creates a `ScanBuilder` for the `SSCAN` command over `key`, borrowing this
/// connection for the builder's lifetime.
pub fn sscan<'a, K>(&'a mut self, key: &'a K) -> ScanBuilder
where
    K: AsRef<[u8]>,
{
    let key_bytes: &[u8] = key.as_ref();
    ScanBuilder::new("SSCAN", Some(key_bytes), self)
}
/// Creates a `ScanBuilder` for the `SCAN` command (no key), borrowing this
/// connection.
pub fn scan(&mut self) -> ScanBuilder {
    let no_key = None;
    ScanBuilder::new("SCAN", no_key, self)
}
/// Creates an `HScanBuilder` over `key`, borrowing this connection for the
/// builder's lifetime.
pub fn hscan<'a, K>(&'a mut self, key: &'a K) -> HScanBuilder<'a>
where
    K: AsRef<[u8]>,
{
    let key_bytes: &[u8] = key.as_ref();
    HScanBuilder::new(key_bytes, self)
}
pub async fn key_type<K>(&mut self, key: K) -> Result<Option<DataType>>
where
K: AsRef<[u8]>,
{
let command = Command::new("TYPE").arg(&key);
let result = self.run_command(command).await?.unwrap_string();
match result.as_slice() {
b"string\r\n" => Ok(Some(DataType::String)),
b"list\r\n" => Ok(Some(DataType::List)),
b"set\r\n" => Ok(Some(DataType::Set)),
b"zset\r\n" => Ok(Some(DataType::ZSet)),
b"hash\r\n" => Ok(Some(DataType::Hash)),
b"stream\r\n" => Ok(Some(DataType::Stream)),
b"none\r\n" => Ok(None),
_ => Err(Error::UnexpectedResponse(
String::from_utf8_lossy(&result).to_string(),
)),
}
}
/// Runs `SCARD key` and returns the integer reply.
pub async fn scard<K>(&mut self, key: &K) -> Result<isize>
where
    K: AsRef<[u8]>,
{
    self.run_command(Command::new("SCARD").arg(&key))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `SMOVE source destination member` and returns the reply converted
/// with `unwrap_bool`.
pub async fn smove<S, D, M>(&mut self, source: S, destination: D, member: M) -> Result<bool>
where
    S: AsRef<[u8]>,
    M: AsRef<[u8]>,
    D: AsRef<[u8]>,
{
    let command = Command::new("SMOVE")
        .arg(&source)
        .arg(&destination)
        .arg(&member);
    self.run_command(command)
        .await
        .map(|reply| reply.unwrap_bool())
}
/// Runs `SREM key member` and returns the reply converted with `unwrap_bool`.
pub async fn srem<K, M>(&mut self, key: K, member: M) -> Result<bool>
where
    K: AsRef<[u8]>,
    M: AsRef<[u8]>,
{
    self.run_command(Command::new("SREM").arg(&key).arg(&member))
        .await
        .map(|reply| reply.unwrap_bool())
}
/// Runs `SREM key member...` for every entry of `members` and returns the
/// integer reply.
///
/// # Errors
/// `Error::EmptySlice` if `members` is empty.
pub async fn srem_slice<K, M>(&mut self, key: K, members: &[M]) -> Result<isize>
where
    K: AsRef<[u8]>,
    M: AsRef<[u8]>,
{
    check_slice_not_empty!(members);
    self.run_command(Command::new("SREM").arg(&key).args(members))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `SDIFF set...` and returns the reply converted with
/// `unwrap_string_array`.
///
/// # Errors
/// `Error::EmptySlice` if `sets` is empty.
pub async fn sdiff<S>(&mut self, sets: &[S]) -> Result<Vec<Vec<u8>>>
where
    S: AsRef<[u8]>,
{
    check_slice_not_empty!(sets);
    self.run_command(Command::new("SDIFF").args(sets))
        .await
        .map(|reply| reply.unwrap_string_array())
}
/// Runs `SDIFFSTORE destination set...` and returns the integer reply.
///
/// # Errors
/// `Error::EmptySlice` if `sets` is empty.
pub async fn sdiffstore<D, S>(&mut self, destination: D, sets: &[S]) -> Result<isize>
where
    D: AsRef<[u8]>,
    S: AsRef<[u8]>,
{
    check_slice_not_empty!(sets);
    self.run_command(Command::new("SDIFFSTORE").arg(&destination).args(sets))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `SINTER set...` and returns the reply converted with
/// `unwrap_string_array`.
///
/// # Errors
/// `Error::EmptySlice` if `sets` is empty.
pub async fn sinter<S>(&mut self, sets: &[S]) -> Result<Vec<Vec<u8>>>
where
    S: AsRef<[u8]>,
{
    check_slice_not_empty!(sets);
    self.run_command(Command::new("SINTER").args(sets))
        .await
        .map(|reply| reply.unwrap_string_array())
}
/// Runs `SINTERSTORE destination set...` and returns the integer reply.
///
/// # Errors
/// `Error::EmptySlice` if `sets` is empty.
pub async fn sinterstore<D, S>(&mut self, destination: D, sets: &[S]) -> Result<isize>
where
    D: AsRef<[u8]>,
    S: AsRef<[u8]>,
{
    check_slice_not_empty!(sets);
    self.run_command(Command::new("SINTERSTORE").arg(&destination).args(sets))
        .await
        .map(|reply| reply.unwrap_integer())
}
/// Runs `SRANDMEMBER set count` and returns the reply converted with
/// `unwrap_string_array`.
pub async fn srandmember<S>(&mut self, set: S, count: isize) -> Result<Vec<Vec<u8>>>
where
    S: AsRef<[u8]>,
{
    let how_many = count.to_string();
    self.run_command(Command::new("SRANDMEMBER").arg(&set).arg(&how_many))
        .await
        .map(|reply| reply.unwrap_string_array())
}
/// Runs `SPOP set count` and returns the reply converted with
/// `unwrap_string_array`.
pub async fn spop<S>(&mut self, set: S, count: isize) -> Result<Vec<Vec<u8>>>
where
    S: AsRef<[u8]>,
{
    let how_many = count.to_string();
    self.run_command(Command::new("SPOP").arg(&set).arg(&how_many))
        .await
        .map(|reply| reply.unwrap_string_array())
}
/// Runs `SUNION set...` and returns the reply converted with
/// `unwrap_string_array`.
///
/// # Errors
/// `Error::EmptySlice` if `sets` is empty.
pub async fn sunion<S>(&mut self, sets: &[S]) -> Result<Vec<Vec<u8>>>
where
    S: AsRef<[u8]>,
{
    check_slice_not_empty!(sets);
    self.run_command(Command::new("SUNION").args(sets))
        .await
        .map(|reply| reply.unwrap_string_array())
}
/// Runs `SUNIONSTORE destination set...` and returns the integer reply.
///
/// # Errors
/// `Error::EmptySlice` if `sets` is empty.
pub async fn sunionstore<D, S>(&mut self, destination: D, sets: &[S]) -> Result<isize>
where
    D: AsRef<[u8]>,
    S: AsRef<[u8]>,
{
    check_slice_not_empty!(sets);
    self.run_command(Command::new("SUNIONSTORE").arg(&destination).args(sets))
        .await
        .map(|reply| reply.unwrap_integer())
}
}