Skip to main content

kevy_client_async/
cmd_set.rs

1//! Async mirror of set commands on `kevy_client::Connection`.
2
3use std::io;
4
5use kevy_resp::Reply;
6
7use crate::codec::AsyncRespCodec;
8use crate::conn::AsyncConnection;
9use crate::reply::{array_to_bulks, string, unexpected, vec2, vec3};
10use crate::transport::AsyncTransport;
11
12impl AsyncConnection {
13    /// `SADD key member [member ...]`. Returns count of newly added.
14    pub async fn sadd(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
15        set_multi(self.codec_mut(), b"SADD", key, members).await
16    }
17
18    /// `SREM key member [member ...]`. Returns count actually removed.
19    pub async fn srem(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
20        set_multi(self.codec_mut(), b"SREM", key, members).await
21    }
22
23    /// `SMEMBERS key`. Implementation-defined order; empty if absent.
24    pub async fn smembers(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
25        match self.codec_mut().request(&vec2(b"SMEMBERS", key)).await? {
26            Reply::Array(items) => array_to_bulks(items),
27            Reply::Error(e) => Err(io::Error::other(string(e))),
28            other => Err(unexpected(other)),
29        }
30    }
31
32    /// `SCARD key`. 0 if absent.
33    pub async fn scard(&mut self, key: &[u8]) -> io::Result<usize> {
34        match self.codec_mut().request(&vec2(b"SCARD", key)).await? {
35            Reply::Int(n) if n >= 0 => Ok(n as usize),
36            Reply::Error(e) => Err(io::Error::other(string(e))),
37            other => Err(unexpected(other)),
38        }
39    }
40
41    /// `SISMEMBER key member`. `false` if absent.
42    pub async fn sismember(&mut self, key: &[u8], member: &[u8]) -> io::Result<bool> {
43        match self
44            .codec_mut()
45            .request(&vec3(b"SISMEMBER", key, member))
46            .await?
47        {
48            Reply::Int(1) => Ok(true),
49            Reply::Int(0) => Ok(false),
50            Reply::Error(e) => Err(io::Error::other(string(e))),
51            other => Err(unexpected(other)),
52        }
53    }
54
55    /// `SINTER key [key ...]` — intersection of all sets.
56    pub async fn sinter(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
57        set_combine(self.codec_mut(), b"SINTER", keys).await
58    }
59
60    /// `SUNION key [key ...]` — union of all sets.
61    pub async fn sunion(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
62        set_combine(self.codec_mut(), b"SUNION", keys).await
63    }
64
65    /// `SDIFF key [key ...]` — first set minus the rest.
66    pub async fn sdiff(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
67        set_combine(self.codec_mut(), b"SDIFF", keys).await
68    }
69}
70
71pub(crate) async fn set_multi<T: AsyncTransport>(
72    c: &mut AsyncRespCodec<T>,
73    verb: &[u8],
74    key: &[u8],
75    members: &[&[u8]],
76) -> io::Result<usize> {
77    let mut args = Vec::with_capacity(members.len() + 2);
78    args.push(verb.to_vec());
79    args.push(key.to_vec());
80    args.extend(members.iter().map(|m| m.to_vec()));
81    match c.request(&args).await? {
82        Reply::Int(n) if n >= 0 => Ok(n as usize),
83        Reply::Error(e) => Err(io::Error::other(string(e))),
84        other => Err(unexpected(other)),
85    }
86}
87
88async fn set_combine<T: AsyncTransport>(
89    c: &mut AsyncRespCodec<T>,
90    verb: &[u8],
91    keys: &[&[u8]],
92) -> io::Result<Vec<Vec<u8>>> {
93    let mut args = Vec::with_capacity(keys.len() + 1);
94    args.push(verb.to_vec());
95    args.extend(keys.iter().map(|k| k.to_vec()));
96    match c.request(&args).await? {
97        Reply::Array(items) => array_to_bulks(items),
98        Reply::Error(e) => Err(io::Error::other(string(e))),
99        other => Err(unexpected(other)),
100    }
101}