1use std::io;
10
11use kevy_resp::Reply;
12use kevy_resp_client::RespClient;
13
14use crate::{Connection, array_to_bulks, store_err, string, unexpected, vec2, vec3};
15
16impl Connection {
17 pub fn hset(&mut self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
22 match self {
23 Self::Embedded(s) => s.hset(key, pairs),
24 Self::Remote(c) => {
25 let mut args = Vec::with_capacity(2 + pairs.len() * 2);
26 args.push(b"HSET".to_vec());
27 args.push(key.to_vec());
28 for (f, v) in pairs {
29 args.push(f.to_vec());
30 args.push(v.to_vec());
31 }
32 match c.request(&args)? {
33 Reply::Int(n) if n >= 0 => Ok(n as usize),
34 Reply::Error(e) => Err(io::Error::other(string(e))),
35 other => Err(unexpected(other)),
36 }
37 }
38 }
39 }
40
41 pub fn hget(&mut self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
43 match self {
44 Self::Embedded(s) => s.hget(key, field),
45 Self::Remote(c) => match c.request(&vec3(b"HGET", key, field))? {
46 Reply::Bulk(v) => Ok(Some(v)),
47 Reply::Nil => Ok(None),
48 Reply::Error(e) => Err(io::Error::other(string(e))),
49 other => Err(unexpected(other)),
50 },
51 }
52 }
53
54 pub fn hdel(&mut self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
57 match self {
58 Self::Embedded(s) => s.hdel(key, fields),
59 Self::Remote(c) => {
60 let mut args = Vec::with_capacity(fields.len() + 2);
61 args.push(b"HDEL".to_vec());
62 args.push(key.to_vec());
63 args.extend(fields.iter().map(|f| f.to_vec()));
64 match c.request(&args)? {
65 Reply::Int(n) if n >= 0 => Ok(n as usize),
66 Reply::Error(e) => Err(io::Error::other(string(e))),
67 other => Err(unexpected(other)),
68 }
69 }
70 }
71 }
72
73 pub fn hlen(&mut self, key: &[u8]) -> io::Result<usize> {
75 match self {
76 Self::Embedded(s) => s.with(|inner| inner.hlen(key)).map_err(store_err),
77 Self::Remote(c) => match c.request(&vec2(b"HLEN", key))? {
78 Reply::Int(n) if n >= 0 => Ok(n as usize),
79 Reply::Error(e) => Err(io::Error::other(string(e))),
80 other => Err(unexpected(other)),
81 },
82 }
83 }
84
85 pub fn hgetall(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
88 match self {
89 Self::Embedded(s) => s.with(|inner| inner.hgetall(key)).map_err(store_err),
90 Self::Remote(c) => match c.request(&vec2(b"HGETALL", key))? {
91 Reply::Array(items) => array_to_bulks(items),
92 Reply::Error(e) => Err(io::Error::other(string(e))),
93 other => Err(unexpected(other)),
94 },
95 }
96 }
97
98 pub fn hkeys(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
100 match self {
101 Self::Embedded(s) => s.with(|inner| inner.hkeys(key)).map_err(store_err),
102 Self::Remote(c) => match c.request(&vec2(b"HKEYS", key))? {
103 Reply::Array(items) => array_to_bulks(items),
104 Reply::Error(e) => Err(io::Error::other(string(e))),
105 other => Err(unexpected(other)),
106 },
107 }
108 }
109
110 pub fn hvals(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
112 match self {
113 Self::Embedded(s) => s.with(|inner| inner.hvals(key)).map_err(store_err),
114 Self::Remote(c) => match c.request(&vec2(b"HVALS", key))? {
115 Reply::Array(items) => array_to_bulks(items),
116 Reply::Error(e) => Err(io::Error::other(string(e))),
117 other => Err(unexpected(other)),
118 },
119 }
120 }
121
122 pub fn lpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
126 match self {
127 Self::Embedded(s) => s.lpush(key, values),
128 Self::Remote(c) => list_push(c, b"LPUSH", key, values),
129 }
130 }
131
132 pub fn rpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
134 match self {
135 Self::Embedded(s) => s.rpush(key, values),
136 Self::Remote(c) => list_push(c, b"RPUSH", key, values),
137 }
138 }
139
140 pub fn lpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
143 match self {
144 Self::Embedded(s) => s.lpop(key, count),
145 Self::Remote(c) => list_pop(c, b"LPOP", key, count),
146 }
147 }
148
149 pub fn rpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
151 match self {
152 Self::Embedded(s) => s.rpop(key, count),
153 Self::Remote(c) => list_pop(c, b"RPOP", key, count),
154 }
155 }
156
157 pub fn llen(&mut self, key: &[u8]) -> io::Result<usize> {
159 match self {
160 Self::Embedded(s) => s.llen(key),
161 Self::Remote(c) => match c.request(&vec2(b"LLEN", key))? {
162 Reply::Int(n) if n >= 0 => Ok(n as usize),
163 Reply::Error(e) => Err(io::Error::other(string(e))),
164 other => Err(unexpected(other)),
165 },
166 }
167 }
168
169 pub fn lrange(&mut self, key: &[u8], start: i64, stop: i64) -> io::Result<Vec<Vec<u8>>> {
172 match self {
173 Self::Embedded(s) => s
174 .with(|inner| inner.lrange(key, start, stop))
175 .map_err(store_err),
176 Self::Remote(c) => {
177 let args = vec![
178 b"LRANGE".to_vec(),
179 key.to_vec(),
180 start.to_string().into_bytes(),
181 stop.to_string().into_bytes(),
182 ];
183 match c.request(&args)? {
184 Reply::Array(items) => array_to_bulks(items),
185 Reply::Error(e) => Err(io::Error::other(string(e))),
186 other => Err(unexpected(other)),
187 }
188 }
189 }
190 }
191
192 pub fn sadd(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
196 match self {
197 Self::Embedded(s) => s.sadd(key, members),
198 Self::Remote(c) => set_multi(c, b"SADD", key, members),
199 }
200 }
201
202 pub fn srem(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
204 match self {
205 Self::Embedded(s) => s.srem(key, members),
206 Self::Remote(c) => set_multi(c, b"SREM", key, members),
207 }
208 }
209
210 pub fn smembers(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
212 match self {
213 Self::Embedded(s) => s.smembers(key),
214 Self::Remote(c) => match c.request(&vec2(b"SMEMBERS", key))? {
215 Reply::Array(items) => array_to_bulks(items),
216 Reply::Error(e) => Err(io::Error::other(string(e))),
217 other => Err(unexpected(other)),
218 },
219 }
220 }
221
222 pub fn scard(&mut self, key: &[u8]) -> io::Result<usize> {
224 match self {
225 Self::Embedded(s) => s.scard(key),
226 Self::Remote(c) => match c.request(&vec2(b"SCARD", key))? {
227 Reply::Int(n) if n >= 0 => Ok(n as usize),
228 Reply::Error(e) => Err(io::Error::other(string(e))),
229 other => Err(unexpected(other)),
230 },
231 }
232 }
233
234 pub fn sismember(&mut self, key: &[u8], member: &[u8]) -> io::Result<bool> {
236 match self {
237 Self::Embedded(s) => s
238 .with(|inner| inner.sismember(key, member))
239 .map_err(store_err),
240 Self::Remote(c) => match c.request(&vec3(b"SISMEMBER", key, member))? {
241 Reply::Int(1) => Ok(true),
242 Reply::Int(0) => Ok(false),
243 Reply::Error(e) => Err(io::Error::other(string(e))),
244 other => Err(unexpected(other)),
245 },
246 }
247 }
248
249 pub fn sinter(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
251 match self {
252 Self::Embedded(s) => embed_set_combine(s, keys, SetOp::Inter),
253 Self::Remote(c) => remote_set_combine(c, b"SINTER", keys),
254 }
255 }
256
257 pub fn sunion(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
259 match self {
260 Self::Embedded(s) => embed_set_combine(s, keys, SetOp::Union),
261 Self::Remote(c) => remote_set_combine(c, b"SUNION", keys),
262 }
263 }
264
265 pub fn sdiff(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
267 match self {
268 Self::Embedded(s) => embed_set_combine(s, keys, SetOp::Diff),
269 Self::Remote(c) => remote_set_combine(c, b"SDIFF", keys),
270 }
271 }
272
273 pub fn zadd(&mut self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
278 match self {
279 Self::Embedded(s) => s.zadd(key, pairs),
280 Self::Remote(c) => {
281 let mut args = Vec::with_capacity(2 + pairs.len() * 2);
282 args.push(b"ZADD".to_vec());
283 args.push(key.to_vec());
284 for (score, m) in pairs {
285 args.push(score.to_string().into_bytes());
286 args.push(m.to_vec());
287 }
288 match c.request(&args)? {
289 Reply::Int(n) if n >= 0 => Ok(n as usize),
290 Reply::Error(e) => Err(io::Error::other(string(e))),
291 other => Err(unexpected(other)),
292 }
293 }
294 }
295 }
296
297 pub fn zrem(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
299 match self {
300 Self::Embedded(s) => s.zrem(key, members),
301 Self::Remote(c) => set_multi(c, b"ZREM", key, members),
302 }
303 }
304
305 pub fn zscore(&mut self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
307 match self {
308 Self::Embedded(s) => s.zscore(key, member),
309 Self::Remote(c) => match c.request(&vec3(b"ZSCORE", key, member))? {
310 Reply::Bulk(v) => {
311 let s = std::str::from_utf8(&v)
312 .map_err(|_| io::Error::other("non-utf8 score reply"))?;
313 let n: f64 = s
314 .parse()
315 .map_err(|_| io::Error::other(format!("bad score float: {s}")))?;
316 Ok(Some(n))
317 }
318 Reply::Nil => Ok(None),
319 Reply::Error(e) => Err(io::Error::other(string(e))),
320 other => Err(unexpected(other)),
321 },
322 }
323 }
324
325 pub fn zcard(&mut self, key: &[u8]) -> io::Result<usize> {
327 match self {
328 Self::Embedded(s) => s.zcard(key),
329 Self::Remote(c) => match c.request(&vec2(b"ZCARD", key))? {
330 Reply::Int(n) if n >= 0 => Ok(n as usize),
331 Reply::Error(e) => Err(io::Error::other(string(e))),
332 other => Err(unexpected(other)),
333 },
334 }
335 }
336
337 pub fn zrange(&mut self, key: &[u8], start: i64, stop: i64) -> io::Result<Vec<Vec<u8>>> {
340 match self {
341 Self::Embedded(s) => s
342 .with(|inner| inner.zrange(key, start, stop))
343 .map(|pairs| pairs.into_iter().map(|(m, _score)| m).collect())
344 .map_err(store_err),
345 Self::Remote(c) => {
346 let args = vec![
347 b"ZRANGE".to_vec(),
348 key.to_vec(),
349 start.to_string().into_bytes(),
350 stop.to_string().into_bytes(),
351 ];
352 match c.request(&args)? {
353 Reply::Array(items) => array_to_bulks(items),
354 Reply::Error(e) => Err(io::Error::other(string(e))),
355 other => Err(unexpected(other)),
356 }
357 }
358 }
359 }
360}
361
362fn list_push(
368 c: &mut RespClient,
369 verb: &[u8],
370 key: &[u8],
371 values: &[&[u8]],
372) -> io::Result<usize> {
373 let mut args = Vec::with_capacity(values.len() + 2);
374 args.push(verb.to_vec());
375 args.push(key.to_vec());
376 args.extend(values.iter().map(|v| v.to_vec()));
377 match c.request(&args)? {
378 Reply::Int(n) if n >= 0 => Ok(n as usize),
379 Reply::Error(e) => Err(io::Error::other(string(e))),
380 other => Err(unexpected(other)),
381 }
382}
383
384fn list_pop(
385 c: &mut RespClient,
386 verb: &[u8],
387 key: &[u8],
388 count: usize,
389) -> io::Result<Vec<Vec<u8>>> {
390 let args = vec![verb.to_vec(), key.to_vec(), count.to_string().into_bytes()];
391 match c.request(&args)? {
392 Reply::Array(items) => array_to_bulks(items),
393 Reply::Bulk(v) => Ok(vec![v]),
394 Reply::Nil => Ok(Vec::new()),
395 Reply::Error(e) => Err(io::Error::other(string(e))),
396 other => Err(unexpected(other)),
397 }
398}
399
400fn set_multi(
401 c: &mut RespClient,
402 verb: &[u8],
403 key: &[u8],
404 members: &[&[u8]],
405) -> io::Result<usize> {
406 let mut args = Vec::with_capacity(members.len() + 2);
407 args.push(verb.to_vec());
408 args.push(key.to_vec());
409 args.extend(members.iter().map(|m| m.to_vec()));
410 match c.request(&args)? {
411 Reply::Int(n) if n >= 0 => Ok(n as usize),
412 Reply::Error(e) => Err(io::Error::other(string(e))),
413 other => Err(unexpected(other)),
414 }
415}
416
417#[derive(Clone, Copy)]
421enum SetOp {
422 Inter,
423 Union,
424 Diff,
425}
426
427fn embed_set_combine(
428 s: &kevy_embedded::Store,
429 keys: &[&[u8]],
430 op: SetOp,
431) -> io::Result<Vec<Vec<u8>>> {
432 use std::collections::HashSet;
433 if keys.is_empty() {
434 return Ok(Vec::new());
435 }
436 let snapshots: Vec<Vec<Vec<u8>>> = keys
437 .iter()
438 .map(|k| s.smembers(k))
439 .collect::<io::Result<_>>()?;
440 let mut iter = snapshots.into_iter();
441 let mut acc: HashSet<Vec<u8>> = iter.next().unwrap_or_default().into_iter().collect();
442 for rest in iter {
443 let other: HashSet<Vec<u8>> = rest.into_iter().collect();
444 acc = match op {
445 SetOp::Inter => acc.intersection(&other).cloned().collect(),
446 SetOp::Union => acc.union(&other).cloned().collect(),
447 SetOp::Diff => acc.difference(&other).cloned().collect(),
448 };
449 }
450 Ok(acc.into_iter().collect())
451}
452
453fn remote_set_combine(
454 c: &mut RespClient,
455 verb: &[u8],
456 keys: &[&[u8]],
457) -> io::Result<Vec<Vec<u8>>> {
458 let mut args = Vec::with_capacity(keys.len() + 1);
459 args.push(verb.to_vec());
460 args.extend(keys.iter().map(|k| k.to_vec()));
461 match c.request(&args)? {
462 Reply::Array(items) => array_to_bulks(items),
463 Reply::Error(e) => Err(io::Error::other(string(e))),
464 other => Err(unexpected(other)),
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use super::*;
471
472 #[test]
473 fn embedded_hash_methods() {
474 let mut c = Connection::open("mem://").unwrap();
475 let pairs: &[(&[u8], &[u8])] = &[
476 (b"name".as_ref(), b"alice".as_ref()),
477 (b"age".as_ref(), b"30".as_ref()),
478 ];
479 assert_eq!(c.hset(b"u:1", pairs).unwrap(), 2);
480 assert_eq!(c.hget(b"u:1", b"name").unwrap(), Some(b"alice".to_vec()));
481 assert_eq!(c.hget(b"u:1", b"missing").unwrap(), None);
482 assert_eq!(c.hlen(b"u:1").unwrap(), 2);
483
484 let mut all = c.hgetall(b"u:1").unwrap();
485 all.sort();
486 assert!(all.contains(&b"alice".to_vec()));
487 assert!(all.contains(&b"name".to_vec()));
488
489 let mut keys = c.hkeys(b"u:1").unwrap();
490 keys.sort();
491 assert_eq!(keys, vec![b"age".to_vec(), b"name".to_vec()]);
492
493 let mut vals = c.hvals(b"u:1").unwrap();
494 vals.sort();
495 assert_eq!(vals, vec![b"30".to_vec(), b"alice".to_vec()]);
496
497 assert_eq!(c.hdel(b"u:1", &[&b"age"[..], &b"missing"[..]]).unwrap(), 1);
498 assert_eq!(c.hlen(b"u:1").unwrap(), 1);
499 }
500
501 #[test]
502 fn embedded_list_methods() {
503 let mut c = Connection::open("mem://").unwrap();
504 assert_eq!(c.rpush(b"q", &[&b"a"[..], &b"b"[..], &b"c"[..]]).unwrap(), 3);
505 assert_eq!(c.lpush(b"q", &[&b"z"[..]]).unwrap(), 4);
506 assert_eq!(c.llen(b"q").unwrap(), 4);
507
508 assert_eq!(
509 c.lrange(b"q", 0, -1).unwrap(),
510 vec![b"z".to_vec(), b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]
511 );
512
513 assert_eq!(c.lpop(b"q", 2).unwrap(), vec![b"z".to_vec(), b"a".to_vec()]);
514 assert_eq!(c.rpop(b"q", 1).unwrap(), vec![b"c".to_vec()]);
515 assert_eq!(c.llen(b"q").unwrap(), 1);
516 }
517
518 #[test]
519 fn embedded_set_methods() {
520 let mut c = Connection::open("mem://").unwrap();
521 assert_eq!(
522 c.sadd(b"s", &[&b"a"[..], &b"b"[..], &b"a"[..]]).unwrap(),
523 2
524 );
525 assert_eq!(c.scard(b"s").unwrap(), 2);
526 assert!(c.sismember(b"s", b"a").unwrap());
527 assert!(!c.sismember(b"s", b"missing").unwrap());
528
529 let mut m = c.smembers(b"s").unwrap();
530 m.sort();
531 assert_eq!(m, vec![b"a".to_vec(), b"b".to_vec()]);
532
533 assert_eq!(c.srem(b"s", &[&b"a"[..]]).unwrap(), 1);
534 assert_eq!(c.scard(b"s").unwrap(), 1);
535 }
536
537 #[test]
538 fn embedded_zset_methods() {
539 let mut c = Connection::open("mem://").unwrap();
540 let pairs: &[(f64, &[u8])] = &[
541 (100.0, b"alice".as_ref()),
542 (200.0, b"bob".as_ref()),
543 (50.0, b"carol".as_ref()),
544 ];
545 assert_eq!(c.zadd(b"lb", pairs).unwrap(), 3);
546 assert_eq!(c.zscore(b"lb", b"bob").unwrap(), Some(200.0));
547 assert_eq!(c.zscore(b"lb", b"none").unwrap(), None);
548 assert_eq!(c.zcard(b"lb").unwrap(), 3);
549
550 let r = c.zrange(b"lb", 0, -1).unwrap();
551 assert_eq!(
552 r,
553 vec![b"carol".to_vec(), b"alice".to_vec(), b"bob".to_vec()]
554 );
555
556 assert_eq!(c.zrem(b"lb", &[&b"carol"[..]]).unwrap(), 1);
557 assert_eq!(c.zcard(b"lb").unwrap(), 2);
558 }
559
560 #[test]
561 fn embedded_set_combine_ops() {
562 let mut c = Connection::open("mem://").unwrap();
563 c.sadd(b"a", &[&b"x"[..], &b"y"[..], &b"z"[..]]).unwrap();
564 c.sadd(b"b", &[&b"y"[..], &b"z"[..], &b"w"[..]]).unwrap();
565
566 let mut inter = c.sinter(&[&b"a"[..], &b"b"[..]]).unwrap();
567 inter.sort();
568 assert_eq!(inter, vec![b"y".to_vec(), b"z".to_vec()]);
569
570 let mut union = c.sunion(&[&b"a"[..], &b"b"[..]]).unwrap();
571 union.sort();
572 assert_eq!(
573 union,
574 vec![b"w".to_vec(), b"x".to_vec(), b"y".to_vec(), b"z".to_vec()]
575 );
576
577 let mut diff = c.sdiff(&[&b"a"[..], &b"b"[..]]).unwrap();
578 diff.sort();
579 assert_eq!(diff, vec![b"x".to_vec()]);
580
581 assert!(c.sinter(&[]).unwrap().is_empty());
583 }
584}