Skip to main content

kevy_client_async/
cmd_string.rs

//! Async mirror of the string + generic key commands on
//! [`kevy_client::Connection`]. Each method here is a 1:1 translation
//! of the corresponding blocking method: same name, same arguments,
//! same return type modulo `.await`.
5
6use std::io;
7use std::time::Duration;
8
9use kevy_resp::Reply;
10
11use crate::conn::AsyncConnection;
12use crate::reply::{string, unexpected, vec2, vec3};
13
14impl AsyncConnection {
15    /// `SET key value`. Unconditional set; returns on `+OK`.
16    pub async fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
17        match self.codec_mut().request(&vec3(b"SET", key, value)).await? {
18            Reply::Simple(s) if s == b"OK" => Ok(()),
19            Reply::Error(e) => Err(io::Error::other(string(e))),
20            other => Err(unexpected(other)),
21        }
22    }
23
24    /// `GET key`. `None` if absent or expired.
25    pub async fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
26        match self.codec_mut().request(&vec2(b"GET", key)).await? {
27            Reply::Bulk(v) => Ok(Some(v)),
28            Reply::Nil => Ok(None),
29            Reply::Error(e) => Err(io::Error::other(string(e))),
30            other => Err(unexpected(other)),
31        }
32    }
33
34    /// `DEL key [key ...]`. Returns the count actually removed.
35    pub async fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
36        let mut args = Vec::with_capacity(keys.len() + 1);
37        args.push(b"DEL".to_vec());
38        args.extend(keys.iter().map(|k| k.to_vec()));
39        match self.codec_mut().request(&args).await? {
40            Reply::Int(n) if n >= 0 => Ok(n as usize),
41            Reply::Error(e) => Err(io::Error::other(string(e))),
42            other => Err(unexpected(other)),
43        }
44    }
45
46    /// `EXISTS key [key ...]`. Count of keys present (a key passed N
47    /// times counts N if it exists).
48    pub async fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
49        let mut args = Vec::with_capacity(keys.len() + 1);
50        args.push(b"EXISTS".to_vec());
51        args.extend(keys.iter().map(|k| k.to_vec()));
52        match self.codec_mut().request(&args).await? {
53            Reply::Int(n) if n >= 0 => Ok(n as usize),
54            Reply::Error(e) => Err(io::Error::other(string(e))),
55            other => Err(unexpected(other)),
56        }
57    }
58
59    /// `INCR key`. Returns post-increment value.
60    pub async fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
61        match self.codec_mut().request(&vec2(b"INCR", key)).await? {
62            Reply::Int(n) => Ok(n),
63            Reply::Error(e) => Err(io::Error::other(string(e))),
64            other => Err(unexpected(other)),
65        }
66    }
67
68    /// `INCRBY key delta`. Negative delta = `DECRBY`.
69    pub async fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
70        let args = vec![
71            b"INCRBY".to_vec(),
72            key.to_vec(),
73            delta.to_string().into_bytes(),
74        ];
75        match self.codec_mut().request(&args).await? {
76            Reply::Int(n) => Ok(n),
77            Reply::Error(e) => Err(io::Error::other(string(e))),
78            other => Err(unexpected(other)),
79        }
80    }
81
82    /// `PEXPIRE key ttl_ms`. Returns whether the key existed and got
83    /// a TTL set.
84    pub async fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
85        let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
86        let args = vec![b"PEXPIRE".to_vec(), key.to_vec(), ms.to_string().into_bytes()];
87        match self.codec_mut().request(&args).await? {
88            Reply::Int(1) => Ok(true),
89            Reply::Int(0) => Ok(false),
90            Reply::Error(e) => Err(io::Error::other(string(e))),
91            other => Err(unexpected(other)),
92        }
93    }
94
95    /// `PERSIST key`. Returns whether a TTL was removed.
96    pub async fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
97        match self.codec_mut().request(&vec2(b"PERSIST", key)).await? {
98            Reply::Int(1) => Ok(true),
99            Reply::Int(0) => Ok(false),
100            Reply::Error(e) => Err(io::Error::other(string(e))),
101            other => Err(unexpected(other)),
102        }
103    }
104
105    /// `PTTL key`. Ms remaining, -2 if no key, -1 if no TTL.
106    pub async fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
107        match self.codec_mut().request(&vec2(b"PTTL", key)).await? {
108            Reply::Int(n) => Ok(n),
109            Reply::Error(e) => Err(io::Error::other(string(e))),
110            other => Err(unexpected(other)),
111        }
112    }
113
114    /// `TYPE key`. Returns Redis-style type name (`"string"`, `"hash"`,
115    /// `"list"`, `"set"`, `"zset"`, or `"none"`).
116    pub async fn type_of(&mut self, key: &[u8]) -> io::Result<String> {
117        match self.codec_mut().request(&vec2(b"TYPE", key)).await? {
118            Reply::Simple(s) => Ok(string(s)),
119            Reply::Error(e) => Err(io::Error::other(string(e))),
120            other => Err(unexpected(other)),
121        }
122    }
123
124    /// `DBSIZE`. Total live keys at call time.
125    pub async fn dbsize(&mut self) -> io::Result<usize> {
126        match self.codec_mut().request(&[b"DBSIZE".to_vec()]).await? {
127            Reply::Int(n) if n >= 0 => Ok(n as usize),
128            Reply::Error(e) => Err(io::Error::other(string(e))),
129            other => Err(unexpected(other)),
130        }
131    }
132
133    /// `FLUSHALL`. WIPES the store. Named `flushall` not `flush` to
134    /// avoid colliding with `Write::flush`'s sync-to-disk meaning.
135    pub async fn flushall(&mut self) -> io::Result<()> {
136        match self.codec_mut().request(&[b"FLUSHALL".to_vec()]).await? {
137            Reply::Simple(s) if s == b"OK" => Ok(()),
138            Reply::Error(e) => Err(io::Error::other(string(e))),
139            other => Err(unexpected(other)),
140        }
141    }
142
143    /// `SET key value PX ttl_ms`. Atomic cache-with-expiry.
144    pub async fn set_with_ttl(
145        &mut self,
146        key: &[u8],
147        value: &[u8],
148        ttl: Duration,
149    ) -> io::Result<()> {
150        let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
151        let args = vec![
152            b"SET".to_vec(),
153            key.to_vec(),
154            value.to_vec(),
155            b"PX".to_vec(),
156            ms.to_string().into_bytes(),
157        ];
158        match self.codec_mut().request(&args).await? {
159            Reply::Simple(s) if s == b"OK" => Ok(()),
160            Reply::Error(e) => Err(io::Error::other(string(e))),
161            other => Err(unexpected(other)),
162        }
163    }
164
165    /// `MGET key [key ...]` — one reply per key, in order.
166    pub async fn mget(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Option<Vec<u8>>>> {
167        let mut args = Vec::with_capacity(keys.len() + 1);
168        args.push(b"MGET".to_vec());
169        args.extend(keys.iter().map(|k| k.to_vec()));
170        match self.codec_mut().request(&args).await? {
171            Reply::Array(items) => items
172                .into_iter()
173                .map(|r| match r {
174                    Reply::Bulk(v) => Ok(Some(v)),
175                    Reply::Nil => Ok(None),
176                    other => Err(unexpected(other)),
177                })
178                .collect(),
179            Reply::Error(e) => Err(io::Error::other(string(e))),
180            other => Err(unexpected(other)),
181        }
182    }
183
184    /// `MSET key value [key value ...]` — atomic multi-set.
185    pub async fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<()> {
186        let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
187        args.push(b"MSET".to_vec());
188        for (k, v) in pairs {
189            args.push(k.to_vec());
190            args.push(v.to_vec());
191        }
192        match self.codec_mut().request(&args).await? {
193            Reply::Simple(s) if s == b"OK" => Ok(()),
194            Reply::Error(e) => Err(io::Error::other(string(e))),
195            other => Err(unexpected(other)),
196        }
197    }
198
199    /// `PUBLISH channel message`. Returns subscriber-receive count.
200    pub async fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
201        match self
202            .codec_mut()
203            .request(&vec3(b"PUBLISH", channel, message))
204            .await?
205        {
206            Reply::Int(n) if n >= 0 => Ok(n as usize),
207            Reply::Error(e) => Err(io::Error::other(string(e))),
208            other => Err(unexpected(other)),
209        }
210    }
211}