1use std::io;
7use std::time::Duration;
8
9use kevy_resp::Reply;
10
11use crate::conn::AsyncConnection;
12use crate::reply::{string, unexpected, vec2, vec3};
13
14impl AsyncConnection {
15 pub async fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
17 match self.codec_mut().request(&vec3(b"SET", key, value)).await? {
18 Reply::Simple(s) if s == b"OK" => Ok(()),
19 Reply::Error(e) => Err(io::Error::other(string(e))),
20 other => Err(unexpected(other)),
21 }
22 }
23
24 pub async fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
26 match self.codec_mut().request(&vec2(b"GET", key)).await? {
27 Reply::Bulk(v) => Ok(Some(v)),
28 Reply::Nil => Ok(None),
29 Reply::Error(e) => Err(io::Error::other(string(e))),
30 other => Err(unexpected(other)),
31 }
32 }
33
34 pub async fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
36 let mut args = Vec::with_capacity(keys.len() + 1);
37 args.push(b"DEL".to_vec());
38 args.extend(keys.iter().map(|k| k.to_vec()));
39 match self.codec_mut().request(&args).await? {
40 Reply::Int(n) if n >= 0 => Ok(n as usize),
41 Reply::Error(e) => Err(io::Error::other(string(e))),
42 other => Err(unexpected(other)),
43 }
44 }
45
46 pub async fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
49 let mut args = Vec::with_capacity(keys.len() + 1);
50 args.push(b"EXISTS".to_vec());
51 args.extend(keys.iter().map(|k| k.to_vec()));
52 match self.codec_mut().request(&args).await? {
53 Reply::Int(n) if n >= 0 => Ok(n as usize),
54 Reply::Error(e) => Err(io::Error::other(string(e))),
55 other => Err(unexpected(other)),
56 }
57 }
58
59 pub async fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
61 match self.codec_mut().request(&vec2(b"INCR", key)).await? {
62 Reply::Int(n) => Ok(n),
63 Reply::Error(e) => Err(io::Error::other(string(e))),
64 other => Err(unexpected(other)),
65 }
66 }
67
68 pub async fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
70 let args = vec![
71 b"INCRBY".to_vec(),
72 key.to_vec(),
73 delta.to_string().into_bytes(),
74 ];
75 match self.codec_mut().request(&args).await? {
76 Reply::Int(n) => Ok(n),
77 Reply::Error(e) => Err(io::Error::other(string(e))),
78 other => Err(unexpected(other)),
79 }
80 }
81
82 pub async fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
85 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
86 let args = vec![b"PEXPIRE".to_vec(), key.to_vec(), ms.to_string().into_bytes()];
87 match self.codec_mut().request(&args).await? {
88 Reply::Int(1) => Ok(true),
89 Reply::Int(0) => Ok(false),
90 Reply::Error(e) => Err(io::Error::other(string(e))),
91 other => Err(unexpected(other)),
92 }
93 }
94
95 pub async fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
97 match self.codec_mut().request(&vec2(b"PERSIST", key)).await? {
98 Reply::Int(1) => Ok(true),
99 Reply::Int(0) => Ok(false),
100 Reply::Error(e) => Err(io::Error::other(string(e))),
101 other => Err(unexpected(other)),
102 }
103 }
104
105 pub async fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
107 match self.codec_mut().request(&vec2(b"PTTL", key)).await? {
108 Reply::Int(n) => Ok(n),
109 Reply::Error(e) => Err(io::Error::other(string(e))),
110 other => Err(unexpected(other)),
111 }
112 }
113
114 pub async fn type_of(&mut self, key: &[u8]) -> io::Result<String> {
117 match self.codec_mut().request(&vec2(b"TYPE", key)).await? {
118 Reply::Simple(s) => Ok(string(s)),
119 Reply::Error(e) => Err(io::Error::other(string(e))),
120 other => Err(unexpected(other)),
121 }
122 }
123
124 pub async fn dbsize(&mut self) -> io::Result<usize> {
126 match self.codec_mut().request(&[b"DBSIZE".to_vec()]).await? {
127 Reply::Int(n) if n >= 0 => Ok(n as usize),
128 Reply::Error(e) => Err(io::Error::other(string(e))),
129 other => Err(unexpected(other)),
130 }
131 }
132
133 pub async fn flushall(&mut self) -> io::Result<()> {
136 match self.codec_mut().request(&[b"FLUSHALL".to_vec()]).await? {
137 Reply::Simple(s) if s == b"OK" => Ok(()),
138 Reply::Error(e) => Err(io::Error::other(string(e))),
139 other => Err(unexpected(other)),
140 }
141 }
142
143 pub async fn set_with_ttl(
145 &mut self,
146 key: &[u8],
147 value: &[u8],
148 ttl: Duration,
149 ) -> io::Result<()> {
150 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
151 let args = vec![
152 b"SET".to_vec(),
153 key.to_vec(),
154 value.to_vec(),
155 b"PX".to_vec(),
156 ms.to_string().into_bytes(),
157 ];
158 match self.codec_mut().request(&args).await? {
159 Reply::Simple(s) if s == b"OK" => Ok(()),
160 Reply::Error(e) => Err(io::Error::other(string(e))),
161 other => Err(unexpected(other)),
162 }
163 }
164
165 pub async fn mget(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Option<Vec<u8>>>> {
167 let mut args = Vec::with_capacity(keys.len() + 1);
168 args.push(b"MGET".to_vec());
169 args.extend(keys.iter().map(|k| k.to_vec()));
170 match self.codec_mut().request(&args).await? {
171 Reply::Array(items) => items
172 .into_iter()
173 .map(|r| match r {
174 Reply::Bulk(v) => Ok(Some(v)),
175 Reply::Nil => Ok(None),
176 other => Err(unexpected(other)),
177 })
178 .collect(),
179 Reply::Error(e) => Err(io::Error::other(string(e))),
180 other => Err(unexpected(other)),
181 }
182 }
183
184 pub async fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<()> {
186 let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
187 args.push(b"MSET".to_vec());
188 for (k, v) in pairs {
189 args.push(k.to_vec());
190 args.push(v.to_vec());
191 }
192 match self.codec_mut().request(&args).await? {
193 Reply::Simple(s) if s == b"OK" => Ok(()),
194 Reply::Error(e) => Err(io::Error::other(string(e))),
195 other => Err(unexpected(other)),
196 }
197 }
198
199 pub async fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
201 match self
202 .codec_mut()
203 .request(&vec3(b"PUBLISH", channel, message))
204 .await?
205 {
206 Reply::Int(n) if n >= 0 => Ok(n as usize),
207 Reply::Error(e) => Err(io::Error::other(string(e))),
208 other => Err(unexpected(other)),
209 }
210 }
211}