1use std::io;
10
11use kevy_resp::Reply;
12use kevy_resp_client::RespClient;
13
14use crate::{Connection, array_to_bulks, store_err, string, unexpected, vec2, vec3};
15
16impl Connection {
17 pub fn hset(&mut self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
22 match self {
23 Self::Embedded(s) => s.hset(key, pairs),
24 Self::Remote(c) => {
25 let mut args = Vec::with_capacity(2 + pairs.len() * 2);
26 args.push(b"HSET".to_vec());
27 args.push(key.to_vec());
28 for (f, v) in pairs {
29 args.push(f.to_vec());
30 args.push(v.to_vec());
31 }
32 match c.request(&args)? {
33 Reply::Int(n) if n >= 0 => Ok(n as usize),
34 Reply::Error(e) => Err(io::Error::other(string(e))),
35 other => Err(unexpected(other)),
36 }
37 }
38 }
39 }
40
41 pub fn hget(&mut self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
43 match self {
44 Self::Embedded(s) => s.hget(key, field),
45 Self::Remote(c) => match c.request(&vec3(b"HGET", key, field))? {
46 Reply::Bulk(v) => Ok(Some(v)),
47 Reply::Nil => Ok(None),
48 Reply::Error(e) => Err(io::Error::other(string(e))),
49 other => Err(unexpected(other)),
50 },
51 }
52 }
53
54 pub fn hdel(&mut self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
57 match self {
58 Self::Embedded(s) => s.hdel(key, fields),
59 Self::Remote(c) => {
60 let mut args = Vec::with_capacity(fields.len() + 2);
61 args.push(b"HDEL".to_vec());
62 args.push(key.to_vec());
63 args.extend(fields.iter().map(|f| f.to_vec()));
64 match c.request(&args)? {
65 Reply::Int(n) if n >= 0 => Ok(n as usize),
66 Reply::Error(e) => Err(io::Error::other(string(e))),
67 other => Err(unexpected(other)),
68 }
69 }
70 }
71 }
72
73 pub fn hlen(&mut self, key: &[u8]) -> io::Result<usize> {
75 match self {
76 Self::Embedded(s) => s.with(|inner| inner.hlen(key)).map_err(store_err),
77 Self::Remote(c) => match c.request(&vec2(b"HLEN", key))? {
78 Reply::Int(n) if n >= 0 => Ok(n as usize),
79 Reply::Error(e) => Err(io::Error::other(string(e))),
80 other => Err(unexpected(other)),
81 },
82 }
83 }
84
85 pub fn hgetall(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
88 match self {
89 Self::Embedded(s) => s.with(|inner| inner.hgetall(key)).map_err(store_err),
90 Self::Remote(c) => match c.request(&vec2(b"HGETALL", key))? {
91 Reply::Array(items) => array_to_bulks(items),
92 Reply::Error(e) => Err(io::Error::other(string(e))),
93 other => Err(unexpected(other)),
94 },
95 }
96 }
97
98 pub fn hkeys(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
100 match self {
101 Self::Embedded(s) => s.with(|inner| inner.hkeys(key)).map_err(store_err),
102 Self::Remote(c) => match c.request(&vec2(b"HKEYS", key))? {
103 Reply::Array(items) => array_to_bulks(items),
104 Reply::Error(e) => Err(io::Error::other(string(e))),
105 other => Err(unexpected(other)),
106 },
107 }
108 }
109
110 pub fn hvals(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
112 match self {
113 Self::Embedded(s) => s.with(|inner| inner.hvals(key)).map_err(store_err),
114 Self::Remote(c) => match c.request(&vec2(b"HVALS", key))? {
115 Reply::Array(items) => array_to_bulks(items),
116 Reply::Error(e) => Err(io::Error::other(string(e))),
117 other => Err(unexpected(other)),
118 },
119 }
120 }
121
122 pub fn lpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
126 match self {
127 Self::Embedded(s) => s.lpush(key, values),
128 Self::Remote(c) => list_push(c, b"LPUSH", key, values),
129 }
130 }
131
132 pub fn rpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
134 match self {
135 Self::Embedded(s) => s.rpush(key, values),
136 Self::Remote(c) => list_push(c, b"RPUSH", key, values),
137 }
138 }
139
140 pub fn lpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
143 match self {
144 Self::Embedded(s) => s.lpop(key, count),
145 Self::Remote(c) => list_pop(c, b"LPOP", key, count),
146 }
147 }
148
149 pub fn rpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
151 match self {
152 Self::Embedded(s) => s.rpop(key, count),
153 Self::Remote(c) => list_pop(c, b"RPOP", key, count),
154 }
155 }
156
157 pub fn llen(&mut self, key: &[u8]) -> io::Result<usize> {
159 match self {
160 Self::Embedded(s) => s.llen(key),
161 Self::Remote(c) => match c.request(&vec2(b"LLEN", key))? {
162 Reply::Int(n) if n >= 0 => Ok(n as usize),
163 Reply::Error(e) => Err(io::Error::other(string(e))),
164 other => Err(unexpected(other)),
165 },
166 }
167 }
168
169 pub fn lrange(&mut self, key: &[u8], start: i64, stop: i64) -> io::Result<Vec<Vec<u8>>> {
172 match self {
173 Self::Embedded(s) => s
174 .with(|inner| inner.lrange(key, start, stop))
175 .map_err(store_err),
176 Self::Remote(c) => {
177 let args = vec![
178 b"LRANGE".to_vec(),
179 key.to_vec(),
180 start.to_string().into_bytes(),
181 stop.to_string().into_bytes(),
182 ];
183 match c.request(&args)? {
184 Reply::Array(items) => array_to_bulks(items),
185 Reply::Error(e) => Err(io::Error::other(string(e))),
186 other => Err(unexpected(other)),
187 }
188 }
189 }
190 }
191
192 pub fn sadd(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
196 match self {
197 Self::Embedded(s) => s.sadd(key, members),
198 Self::Remote(c) => set_multi(c, b"SADD", key, members),
199 }
200 }
201
202 pub fn srem(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
204 match self {
205 Self::Embedded(s) => s.srem(key, members),
206 Self::Remote(c) => set_multi(c, b"SREM", key, members),
207 }
208 }
209
210 pub fn smembers(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
212 match self {
213 Self::Embedded(s) => s.smembers(key),
214 Self::Remote(c) => match c.request(&vec2(b"SMEMBERS", key))? {
215 Reply::Array(items) => array_to_bulks(items),
216 Reply::Error(e) => Err(io::Error::other(string(e))),
217 other => Err(unexpected(other)),
218 },
219 }
220 }
221
222 pub fn scard(&mut self, key: &[u8]) -> io::Result<usize> {
224 match self {
225 Self::Embedded(s) => s.scard(key),
226 Self::Remote(c) => match c.request(&vec2(b"SCARD", key))? {
227 Reply::Int(n) if n >= 0 => Ok(n as usize),
228 Reply::Error(e) => Err(io::Error::other(string(e))),
229 other => Err(unexpected(other)),
230 },
231 }
232 }
233
234 pub fn sismember(&mut self, key: &[u8], member: &[u8]) -> io::Result<bool> {
236 match self {
237 Self::Embedded(s) => s
238 .with(|inner| inner.sismember(key, member))
239 .map_err(store_err),
240 Self::Remote(c) => match c.request(&vec3(b"SISMEMBER", key, member))? {
241 Reply::Int(1) => Ok(true),
242 Reply::Int(0) => Ok(false),
243 Reply::Error(e) => Err(io::Error::other(string(e))),
244 other => Err(unexpected(other)),
245 },
246 }
247 }
248
249 pub fn sinter(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
251 match self {
252 Self::Embedded(s) => embed_set_combine(s, keys, SetOp::Inter),
253 Self::Remote(c) => remote_set_combine(c, b"SINTER", keys),
254 }
255 }
256
257 pub fn sunion(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
259 match self {
260 Self::Embedded(s) => embed_set_combine(s, keys, SetOp::Union),
261 Self::Remote(c) => remote_set_combine(c, b"SUNION", keys),
262 }
263 }
264
265 pub fn sdiff(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
267 match self {
268 Self::Embedded(s) => embed_set_combine(s, keys, SetOp::Diff),
269 Self::Remote(c) => remote_set_combine(c, b"SDIFF", keys),
270 }
271 }
272
273 pub fn zadd(&mut self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
278 match self {
279 Self::Embedded(s) => s.zadd(key, pairs),
280 Self::Remote(c) => {
281 let mut args = Vec::with_capacity(2 + pairs.len() * 2);
282 args.push(b"ZADD".to_vec());
283 args.push(key.to_vec());
284 for (score, m) in pairs {
285 args.push(score.to_string().into_bytes());
286 args.push(m.to_vec());
287 }
288 match c.request(&args)? {
289 Reply::Int(n) if n >= 0 => Ok(n as usize),
290 Reply::Error(e) => Err(io::Error::other(string(e))),
291 other => Err(unexpected(other)),
292 }
293 }
294 }
295 }
296
297 pub fn zrem(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
299 match self {
300 Self::Embedded(s) => s.zrem(key, members),
301 Self::Remote(c) => set_multi(c, b"ZREM", key, members),
302 }
303 }
304
305 pub fn zscore(&mut self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
307 match self {
308 Self::Embedded(s) => s.zscore(key, member),
309 Self::Remote(c) => match c.request(&vec3(b"ZSCORE", key, member))? {
310 Reply::Bulk(v) => {
311 let s = std::str::from_utf8(&v)
312 .map_err(|_| io::Error::other("non-utf8 score reply"))?;
313 let n: f64 = s
314 .parse()
315 .map_err(|_| io::Error::other(format!("bad score float: {s}")))?;
316 Ok(Some(n))
317 }
318 Reply::Nil => Ok(None),
319 Reply::Error(e) => Err(io::Error::other(string(e))),
320 other => Err(unexpected(other)),
321 },
322 }
323 }
324
325 pub fn zcard(&mut self, key: &[u8]) -> io::Result<usize> {
327 match self {
328 Self::Embedded(s) => s.zcard(key),
329 Self::Remote(c) => match c.request(&vec2(b"ZCARD", key))? {
330 Reply::Int(n) if n >= 0 => Ok(n as usize),
331 Reply::Error(e) => Err(io::Error::other(string(e))),
332 other => Err(unexpected(other)),
333 },
334 }
335 }
336
337 pub fn zrange(&mut self, key: &[u8], start: i64, stop: i64) -> io::Result<Vec<Vec<u8>>> {
340 match self {
341 Self::Embedded(s) => s
342 .with(|inner| inner.zrange(key, start, stop))
343 .map(|pairs| pairs.into_iter().map(|(m, _score)| m).collect())
344 .map_err(store_err),
345 Self::Remote(c) => {
346 let args = vec![
347 b"ZRANGE".to_vec(),
348 key.to_vec(),
349 start.to_string().into_bytes(),
350 stop.to_string().into_bytes(),
351 ];
352 match c.request(&args)? {
353 Reply::Array(items) => array_to_bulks(items),
354 Reply::Error(e) => Err(io::Error::other(string(e))),
355 other => Err(unexpected(other)),
356 }
357 }
358 }
359 }
360}
361
362fn list_push(
368 c: &mut RespClient,
369 verb: &[u8],
370 key: &[u8],
371 values: &[&[u8]],
372) -> io::Result<usize> {
373 let mut args = Vec::with_capacity(values.len() + 2);
374 args.push(verb.to_vec());
375 args.push(key.to_vec());
376 args.extend(values.iter().map(|v| v.to_vec()));
377 match c.request(&args)? {
378 Reply::Int(n) if n >= 0 => Ok(n as usize),
379 Reply::Error(e) => Err(io::Error::other(string(e))),
380 other => Err(unexpected(other)),
381 }
382}
383
384fn list_pop(
385 c: &mut RespClient,
386 verb: &[u8],
387 key: &[u8],
388 count: usize,
389) -> io::Result<Vec<Vec<u8>>> {
390 let args = vec![verb.to_vec(), key.to_vec(), count.to_string().into_bytes()];
391 match c.request(&args)? {
392 Reply::Array(items) => array_to_bulks(items),
393 Reply::Bulk(v) => Ok(vec![v]),
394 Reply::Nil => Ok(Vec::new()),
395 Reply::Error(e) => Err(io::Error::other(string(e))),
396 other => Err(unexpected(other)),
397 }
398}
399
400fn set_multi(
401 c: &mut RespClient,
402 verb: &[u8],
403 key: &[u8],
404 members: &[&[u8]],
405) -> io::Result<usize> {
406 let mut args = Vec::with_capacity(members.len() + 2);
407 args.push(verb.to_vec());
408 args.push(key.to_vec());
409 args.extend(members.iter().map(|m| m.to_vec()));
410 match c.request(&args)? {
411 Reply::Int(n) if n >= 0 => Ok(n as usize),
412 Reply::Error(e) => Err(io::Error::other(string(e))),
413 other => Err(unexpected(other)),
414 }
415}
416
/// Selects which set-algebra operation a multi-key set command performs.
#[derive(Copy, Clone)]
enum SetOp {
    /// Keep only members present in every set.
    Inter,
    /// Keep members present in any set.
    Union,
    /// Keep members of the first set that are absent from all later sets.
    Diff,
}
426
427fn embed_set_combine(
428 s: &kevy_embedded::Store,
429 keys: &[&[u8]],
430 op: SetOp,
431) -> io::Result<Vec<Vec<u8>>> {
432 use std::collections::HashSet;
433 if keys.is_empty() {
434 return Ok(Vec::new());
435 }
436 let snapshots: Vec<Vec<Vec<u8>>> = keys
437 .iter()
438 .map(|k| s.smembers(k))
439 .collect::<io::Result<_>>()?;
440 let mut iter = snapshots.into_iter();
441 let mut acc: HashSet<Vec<u8>> = iter.next().unwrap_or_default().into_iter().collect();
442 for rest in iter {
443 let other: HashSet<Vec<u8>> = rest.into_iter().collect();
444 acc = match op {
445 SetOp::Inter => acc.intersection(&other).cloned().collect(),
446 SetOp::Union => acc.union(&other).cloned().collect(),
447 SetOp::Diff => acc.difference(&other).cloned().collect(),
448 };
449 }
450 Ok(acc.into_iter().collect())
451}
452
453fn remote_set_combine(
454 c: &mut RespClient,
455 verb: &[u8],
456 keys: &[&[u8]],
457) -> io::Result<Vec<Vec<u8>>> {
458 let mut args = Vec::with_capacity(keys.len() + 1);
459 args.push(verb.to_vec());
460 args.extend(keys.iter().map(|k| k.to_vec()));
461 match c.request(&args)? {
462 Reply::Array(items) => array_to_bulks(items),
463 Reply::Error(e) => Err(io::Error::other(string(e))),
464 other => Err(unexpected(other)),
465 }
466}
467
468#[cfg(test)]
469#[path = "collections_tests.rs"]
470mod tests;