1use bytes::Bytes;
15use ember_protocol::types::Frame;
16
17use crate::connection::{Client, ClientError};
18use crate::pipeline::Pipeline;
19use crate::subscriber::Subscriber;
20
21#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct ScanPage {
28 pub cursor: u64,
30 pub keys: Vec<Bytes>,
32}
33
34#[derive(Debug, Clone)]
36pub struct SlowlogEntry {
37 pub id: i64,
39 pub timestamp: i64,
41 pub duration_us: i64,
43 pub command: Vec<Bytes>,
45}
46
47fn cmd2(name: &'static [u8], a: &[u8]) -> Frame {
50 Frame::Array(vec![
51 Frame::Bulk(Bytes::from_static(name)),
52 Frame::Bulk(Bytes::copy_from_slice(a)),
53 ])
54}
55
56fn cmd3(name: &'static [u8], a: &[u8], b: &[u8]) -> Frame {
57 Frame::Array(vec![
58 Frame::Bulk(Bytes::from_static(name)),
59 Frame::Bulk(Bytes::copy_from_slice(a)),
60 Frame::Bulk(Bytes::copy_from_slice(b)),
61 ])
62}
63
64fn cmd4(name: &'static [u8], a: &[u8], b: &[u8], c: &[u8]) -> Frame {
65 Frame::Array(vec![
66 Frame::Bulk(Bytes::from_static(name)),
67 Frame::Bulk(Bytes::copy_from_slice(a)),
68 Frame::Bulk(Bytes::copy_from_slice(b)),
69 Frame::Bulk(Bytes::copy_from_slice(c)),
70 ])
71}
72
73fn optional_bytes(frame: Frame) -> Result<Option<Bytes>, ClientError> {
83 match frame {
84 Frame::Bulk(b) => Ok(Some(b)),
85 Frame::Null => Ok(None),
86 Frame::Error(e) => Err(ClientError::Server(e)),
87 other => Err(ClientError::Protocol(format!(
88 "expected bulk or null, got {other:?}"
89 ))),
90 }
91}
92
93fn integer(frame: Frame) -> Result<i64, ClientError> {
98 match frame {
99 Frame::Integer(n) => Ok(n),
100 Frame::Error(e) => Err(ClientError::Server(e)),
101 other => Err(ClientError::Protocol(format!(
102 "expected integer, got {other:?}"
103 ))),
104 }
105}
106
107fn optional_integer(frame: Frame) -> Result<Option<i64>, ClientError> {
111 match frame {
112 Frame::Integer(n) => Ok(Some(n)),
113 Frame::Null => Ok(None),
114 Frame::Error(e) => Err(ClientError::Server(e)),
115 other => Err(ClientError::Protocol(format!(
116 "expected integer or null, got {other:?}"
117 ))),
118 }
119}
120
121fn bool_flag(frame: Frame) -> Result<bool, ClientError> {
125 match frame {
126 Frame::Integer(1) => Ok(true),
127 Frame::Integer(0) => Ok(false),
128 Frame::Error(e) => Err(ClientError::Server(e)),
129 other => Err(ClientError::Protocol(format!(
130 "expected 0 or 1, got {other:?}"
131 ))),
132 }
133}
134
135fn ok(frame: Frame) -> Result<(), ClientError> {
139 match frame {
140 Frame::Simple(s) if s == "OK" => Ok(()),
141 Frame::Error(e) => Err(ClientError::Server(e)),
142 other => Err(ClientError::Protocol(format!(
143 "expected +OK, got {other:?}"
144 ))),
145 }
146}
147
148fn bytes_vec(frame: Frame) -> Result<Vec<Bytes>, ClientError> {
152 match frame {
153 Frame::Array(elems) => elems
154 .into_iter()
155 .map(|e| match e {
156 Frame::Bulk(b) => Ok(b),
157 Frame::Error(e) => Err(ClientError::Server(e)),
158 other => Err(ClientError::Protocol(format!(
159 "expected bulk in array, got {other:?}"
160 ))),
161 })
162 .collect(),
163 Frame::Null => Ok(Vec::new()),
164 Frame::Error(e) => Err(ClientError::Server(e)),
165 other => Err(ClientError::Protocol(format!(
166 "expected array, got {other:?}"
167 ))),
168 }
169}
170
171fn optional_bytes_vec(frame: Frame) -> Result<Vec<Option<Bytes>>, ClientError> {
175 match frame {
176 Frame::Array(elems) => elems
177 .into_iter()
178 .map(|e| match e {
179 Frame::Bulk(b) => Ok(Some(b)),
180 Frame::Null => Ok(None),
181 Frame::Error(e) => Err(ClientError::Server(e)),
182 other => Err(ClientError::Protocol(format!(
183 "unexpected element in array: {other:?}"
184 ))),
185 })
186 .collect(),
187 Frame::Error(e) => Err(ClientError::Server(e)),
188 other => Err(ClientError::Protocol(format!(
189 "expected array, got {other:?}"
190 ))),
191 }
192}
193
194fn pairs(frame: Frame) -> Result<Vec<(Bytes, Bytes)>, ClientError> {
198 let elems = match frame {
199 Frame::Array(e) => e,
200 Frame::Null => return Ok(Vec::new()),
201 Frame::Error(e) => return Err(ClientError::Server(e)),
202 other => {
203 return Err(ClientError::Protocol(format!(
204 "expected array for pairs, got {other:?}"
205 )))
206 }
207 };
208
209 if elems.len() % 2 != 0 {
210 return Err(ClientError::Protocol(format!(
211 "pairs array has odd length ({})",
212 elems.len()
213 )));
214 }
215
216 let mut result = Vec::with_capacity(elems.len() / 2);
217 let mut iter = elems.into_iter();
218 while let (Some(k), Some(v)) = (iter.next(), iter.next()) {
219 let key = match k {
220 Frame::Bulk(b) => b,
221 other => {
222 return Err(ClientError::Protocol(format!(
223 "expected bulk key in pairs, got {other:?}"
224 )))
225 }
226 };
227 let val = match v {
228 Frame::Bulk(b) => b,
229 other => {
230 return Err(ClientError::Protocol(format!(
231 "expected bulk value in pairs, got {other:?}"
232 )))
233 }
234 };
235 result.push((key, val));
236 }
237 Ok(result)
238}
239
240fn optional_score(frame: Frame) -> Result<Option<f64>, ClientError> {
244 match frame {
245 Frame::Bulk(b) => {
246 let s = std::str::from_utf8(&b)
247 .map_err(|_| ClientError::Protocol("score is not valid UTF-8".into()))?;
248 s.parse::<f64>()
249 .map(Some)
250 .map_err(|_| ClientError::Protocol(format!("score is not a valid float: {s:?}")))
251 }
252 Frame::Null => Ok(None),
253 Frame::Error(e) => Err(ClientError::Server(e)),
254 other => Err(ClientError::Protocol(format!(
255 "expected bulk or null for score, got {other:?}"
256 ))),
257 }
258}
259
260fn scored_members(frame: Frame) -> Result<Vec<(Bytes, f64)>, ClientError> {
264 let elems = match frame {
265 Frame::Array(e) => e,
266 Frame::Null => return Ok(Vec::new()),
267 Frame::Error(e) => return Err(ClientError::Server(e)),
268 other => {
269 return Err(ClientError::Protocol(format!(
270 "expected array for scored members, got {other:?}"
271 )))
272 }
273 };
274
275 if elems.len() % 2 != 0 {
276 return Err(ClientError::Protocol(format!(
277 "WITHSCORES array has odd length ({})",
278 elems.len()
279 )));
280 }
281
282 let mut result = Vec::with_capacity(elems.len() / 2);
283 let mut iter = elems.into_iter();
284 while let (Some(member_frame), Some(score_frame)) = (iter.next(), iter.next()) {
285 let member = match member_frame {
286 Frame::Bulk(b) => b,
287 other => {
288 return Err(ClientError::Protocol(format!(
289 "expected bulk member, got {other:?}"
290 )))
291 }
292 };
293 let score = match score_frame {
294 Frame::Bulk(b) => {
295 let s = std::str::from_utf8(&b)
296 .map_err(|_| ClientError::Protocol("score is not valid UTF-8".into()))?;
297 s.parse::<f64>().map_err(|_| {
298 ClientError::Protocol(format!("score is not a valid float: {s:?}"))
299 })?
300 }
301 other => {
302 return Err(ClientError::Protocol(format!(
303 "expected bulk score, got {other:?}"
304 )))
305 }
306 };
307 result.push((member, score));
308 }
309 Ok(result)
310}
311
312fn string_value(frame: Frame) -> Result<String, ClientError> {
316 match frame {
317 Frame::Simple(s) => Ok(s),
318 Frame::Bulk(b) => String::from_utf8(b.to_vec())
319 .map_err(|_| ClientError::Protocol("response is not valid UTF-8".into())),
320 Frame::Error(e) => Err(ClientError::Server(e)),
321 other => Err(ClientError::Protocol(format!(
322 "expected simple or bulk string, got {other:?}"
323 ))),
324 }
325}
326
327fn float_value(frame: Frame) -> Result<f64, ClientError> {
331 match frame {
332 Frame::Bulk(b) => {
333 let s = std::str::from_utf8(&b)
334 .map_err(|_| ClientError::Protocol("float response is not valid UTF-8".into()))?;
335 s.parse::<f64>()
336 .map_err(|_| ClientError::Protocol(format!("not a valid float: {s:?}")))
337 }
338 Frame::Error(e) => Err(ClientError::Server(e)),
339 other => Err(ClientError::Protocol(format!(
340 "expected bulk float, got {other:?}"
341 ))),
342 }
343}
344
345fn scan_page(frame: Frame) -> Result<ScanPage, ClientError> {
349 let elems = match frame {
350 Frame::Array(e) => e,
351 Frame::Error(e) => return Err(ClientError::Server(e)),
352 other => {
353 return Err(ClientError::Protocol(format!(
354 "expected array for SCAN, got {other:?}"
355 )))
356 }
357 };
358
359 if elems.len() != 2 {
360 return Err(ClientError::Protocol(format!(
361 "SCAN response must have 2 elements, got {}",
362 elems.len()
363 )));
364 }
365
366 let mut iter = elems.into_iter();
367 let cursor_frame = iter
369 .next()
370 .ok_or_else(|| ClientError::Protocol("SCAN response missing cursor element".into()))?;
371 let keys_frame = iter
372 .next()
373 .ok_or_else(|| ClientError::Protocol("SCAN response missing keys element".into()))?;
374
375 let cursor = match cursor_frame {
376 Frame::Bulk(b) => {
377 let s = std::str::from_utf8(&b)
378 .map_err(|_| ClientError::Protocol("SCAN cursor is not valid UTF-8".into()))?;
379 s.parse::<u64>()
380 .map_err(|_| ClientError::Protocol(format!("SCAN cursor is not a u64: {s:?}")))?
381 }
382 other => {
383 return Err(ClientError::Protocol(format!(
384 "expected bulk cursor in SCAN, got {other:?}"
385 )))
386 }
387 };
388
389 let keys = bytes_vec(keys_frame)?;
390 Ok(ScanPage { cursor, keys })
391}
392
393fn slowlog_entries(frame: Frame) -> Result<Vec<SlowlogEntry>, ClientError> {
397 let outer = match frame {
398 Frame::Array(e) => e,
399 Frame::Null => return Ok(Vec::new()),
400 Frame::Error(e) => return Err(ClientError::Server(e)),
401 other => {
402 return Err(ClientError::Protocol(format!(
403 "expected array for SLOWLOG, got {other:?}"
404 )))
405 }
406 };
407
408 outer
409 .into_iter()
410 .map(|entry_frame| {
411 let entry = match entry_frame {
412 Frame::Array(e) => e,
413 other => {
414 return Err(ClientError::Protocol(format!(
415 "expected array for slowlog entry, got {other:?}"
416 )))
417 }
418 };
419
420 if entry.len() < 4 {
421 return Err(ClientError::Protocol(format!(
422 "slowlog entry too short: {} elements",
423 entry.len()
424 )));
425 }
426
427 let id = match &entry[0] {
428 Frame::Integer(n) => *n,
429 other => {
430 return Err(ClientError::Protocol(format!(
431 "expected integer id in slowlog, got {other:?}"
432 )))
433 }
434 };
435 let timestamp = match &entry[1] {
436 Frame::Integer(n) => *n,
437 other => {
438 return Err(ClientError::Protocol(format!(
439 "expected integer timestamp in slowlog, got {other:?}"
440 )))
441 }
442 };
443 let duration_us = match &entry[2] {
444 Frame::Integer(n) => *n,
445 other => {
446 return Err(ClientError::Protocol(format!(
447 "expected integer duration in slowlog, got {other:?}"
448 )))
449 }
450 };
451 let command = match entry.into_iter().nth(3).ok_or_else(|| {
452 ClientError::Protocol("slowlog entry missing command field".into())
453 })? {
454 Frame::Array(parts) => parts
455 .into_iter()
456 .map(|p| match p {
457 Frame::Bulk(b) => Ok(b),
458 other => Err(ClientError::Protocol(format!(
459 "expected bulk in slowlog command, got {other:?}"
460 ))),
461 })
462 .collect::<Result<Vec<_>, _>>()?,
463 other => {
464 return Err(ClientError::Protocol(format!(
465 "expected array for slowlog command, got {other:?}"
466 )))
467 }
468 };
469
470 Ok(SlowlogEntry {
471 id,
472 timestamp,
473 duration_us,
474 command,
475 })
476 })
477 .collect()
478}
479
480fn numsub_pairs(frame: Frame) -> Result<Vec<(Bytes, i64)>, ClientError> {
484 let elems = match frame {
485 Frame::Array(e) => e,
486 Frame::Null => return Ok(Vec::new()),
487 Frame::Error(e) => return Err(ClientError::Server(e)),
488 other => {
489 return Err(ClientError::Protocol(format!(
490 "expected array for PUBSUB NUMSUB, got {other:?}"
491 )))
492 }
493 };
494
495 if elems.len() % 2 != 0 {
496 return Err(ClientError::Protocol(format!(
497 "PUBSUB NUMSUB array has odd length ({})",
498 elems.len()
499 )));
500 }
501
502 let mut result = Vec::with_capacity(elems.len() / 2);
503 let mut iter = elems.into_iter();
504 while let (Some(ch_frame), Some(cnt_frame)) = (iter.next(), iter.next()) {
505 let channel = match ch_frame {
506 Frame::Bulk(b) => b,
507 other => {
508 return Err(ClientError::Protocol(format!(
509 "expected bulk channel in PUBSUB NUMSUB, got {other:?}"
510 )))
511 }
512 };
513 let count = match cnt_frame {
514 Frame::Integer(n) => n,
515 other => {
516 return Err(ClientError::Protocol(format!(
517 "expected integer count in PUBSUB NUMSUB, got {other:?}"
518 )))
519 }
520 };
521 result.push((channel, count));
522 }
523 Ok(result)
524}
525
526impl Client {
529 pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>, ClientError> {
533 let frame = self.send_frame(cmd2(b"GET", key.as_bytes())).await?;
534 optional_bytes(frame)
535 }
536
537 pub async fn set(&mut self, key: &str, value: impl AsRef<[u8]>) -> Result<(), ClientError> {
539 let frame = self
540 .send_frame(cmd3(b"SET", key.as_bytes(), value.as_ref()))
541 .await?;
542 ok(frame)
543 }
544
545 pub async fn set_ex(
547 &mut self,
548 key: &str,
549 value: impl AsRef<[u8]>,
550 seconds: u64,
551 ) -> Result<(), ClientError> {
552 let secs = seconds.to_string();
553 let frame = self
554 .send_frame(cmd4(
555 b"SETEX",
556 key.as_bytes(),
557 secs.as_bytes(),
558 value.as_ref(),
559 ))
560 .await?;
561 ok(frame)
562 }
563
564 pub async fn del(&mut self, keys: &[&str]) -> Result<i64, ClientError> {
566 let mut parts = Vec::with_capacity(1 + keys.len());
567 parts.push(Frame::Bulk(Bytes::from_static(b"DEL")));
568 for k in keys {
569 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
570 }
571 let frame = self.send_frame(Frame::Array(parts)).await?;
572 integer(frame)
573 }
574
575 pub async fn exists(&mut self, keys: &[&str]) -> Result<i64, ClientError> {
577 let mut parts = Vec::with_capacity(1 + keys.len());
578 parts.push(Frame::Bulk(Bytes::from_static(b"EXISTS")));
579 for k in keys {
580 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
581 }
582 let frame = self.send_frame(Frame::Array(parts)).await?;
583 integer(frame)
584 }
585
586 pub async fn expire(&mut self, key: &str, seconds: u64) -> Result<bool, ClientError> {
589 let secs = seconds.to_string();
590 let frame = self
591 .send_frame(cmd3(b"EXPIRE", key.as_bytes(), secs.as_bytes()))
592 .await?;
593 bool_flag(frame)
594 }
595
596 pub async fn persist(&mut self, key: &str) -> Result<bool, ClientError> {
599 let frame = self.send_frame(cmd2(b"PERSIST", key.as_bytes())).await?;
600 bool_flag(frame)
601 }
602
603 pub async fn ttl(&mut self, key: &str) -> Result<i64, ClientError> {
606 let frame = self.send_frame(cmd2(b"TTL", key.as_bytes())).await?;
607 integer(frame)
608 }
609
610 pub async fn pttl(&mut self, key: &str) -> Result<i64, ClientError> {
612 let frame = self.send_frame(cmd2(b"PTTL", key.as_bytes())).await?;
613 integer(frame)
614 }
615
616 pub async fn incr(&mut self, key: &str) -> Result<i64, ClientError> {
618 let frame = self.send_frame(cmd2(b"INCR", key.as_bytes())).await?;
619 integer(frame)
620 }
621
622 pub async fn decr(&mut self, key: &str) -> Result<i64, ClientError> {
624 let frame = self.send_frame(cmd2(b"DECR", key.as_bytes())).await?;
625 integer(frame)
626 }
627
628 pub async fn incrby(&mut self, key: &str, delta: i64) -> Result<i64, ClientError> {
630 let d = delta.to_string();
631 let frame = self
632 .send_frame(cmd3(b"INCRBY", key.as_bytes(), d.as_bytes()))
633 .await?;
634 integer(frame)
635 }
636
637 pub async fn decrby(&mut self, key: &str, delta: i64) -> Result<i64, ClientError> {
639 let d = delta.to_string();
640 let frame = self
641 .send_frame(cmd3(b"DECRBY", key.as_bytes(), d.as_bytes()))
642 .await?;
643 integer(frame)
644 }
645
646 pub async fn append(&mut self, key: &str, value: impl AsRef<[u8]>) -> Result<i64, ClientError> {
648 let frame = self
649 .send_frame(cmd3(b"APPEND", key.as_bytes(), value.as_ref()))
650 .await?;
651 integer(frame)
652 }
653
654 pub async fn mget(&mut self, keys: &[&str]) -> Result<Vec<Option<Bytes>>, ClientError> {
656 let mut parts = Vec::with_capacity(1 + keys.len());
657 parts.push(Frame::Bulk(Bytes::from_static(b"MGET")));
658 for k in keys {
659 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
660 }
661 let frame = self.send_frame(Frame::Array(parts)).await?;
662 optional_bytes_vec(frame)
663 }
664
665 pub async fn mset<V: AsRef<[u8]>>(&mut self, pairs: &[(&str, V)]) -> Result<(), ClientError> {
667 let mut parts = Vec::with_capacity(1 + pairs.len() * 2);
668 parts.push(Frame::Bulk(Bytes::from_static(b"MSET")));
669 for (k, v) in pairs {
670 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
671 parts.push(Frame::Bulk(Bytes::copy_from_slice(v.as_ref())));
672 }
673 let frame = self.send_frame(Frame::Array(parts)).await?;
674 ok(frame)
675 }
676
677 pub async fn getdel(&mut self, key: &str) -> Result<Option<Bytes>, ClientError> {
680 let frame = self.send_frame(cmd2(b"GETDEL", key.as_bytes())).await?;
681 optional_bytes(frame)
682 }
683
684 pub async fn lpush<V: AsRef<[u8]>>(
688 &mut self,
689 key: &str,
690 values: &[V],
691 ) -> Result<i64, ClientError> {
692 let frame = self
693 .send_frame(cmd_key_values(b"LPUSH", key, values))
694 .await?;
695 integer(frame)
696 }
697
698 pub async fn rpush<V: AsRef<[u8]>>(
700 &mut self,
701 key: &str,
702 values: &[V],
703 ) -> Result<i64, ClientError> {
704 let frame = self
705 .send_frame(cmd_key_values(b"RPUSH", key, values))
706 .await?;
707 integer(frame)
708 }
709
710 pub async fn lpop(&mut self, key: &str) -> Result<Option<Bytes>, ClientError> {
712 let frame = self.send_frame(cmd2(b"LPOP", key.as_bytes())).await?;
713 optional_bytes(frame)
714 }
715
716 pub async fn rpop(&mut self, key: &str) -> Result<Option<Bytes>, ClientError> {
718 let frame = self.send_frame(cmd2(b"RPOP", key.as_bytes())).await?;
719 optional_bytes(frame)
720 }
721
722 pub async fn lrange(
724 &mut self,
725 key: &str,
726 start: i64,
727 stop: i64,
728 ) -> Result<Vec<Bytes>, ClientError> {
729 let s = start.to_string();
730 let e = stop.to_string();
731 let frame = self
732 .send_frame(cmd4(b"LRANGE", key.as_bytes(), s.as_bytes(), e.as_bytes()))
733 .await?;
734 bytes_vec(frame)
735 }
736
737 pub async fn llen(&mut self, key: &str) -> Result<i64, ClientError> {
739 let frame = self.send_frame(cmd2(b"LLEN", key.as_bytes())).await?;
740 integer(frame)
741 }
742
743 pub async fn hset<V: AsRef<[u8]>>(
748 &mut self,
749 key: &str,
750 pairs: &[(&str, V)],
751 ) -> Result<i64, ClientError> {
752 let mut parts = Vec::with_capacity(2 + pairs.len() * 2);
753 parts.push(Frame::Bulk(Bytes::from_static(b"HSET")));
754 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
755 for (field, val) in pairs {
756 parts.push(Frame::Bulk(Bytes::copy_from_slice(field.as_bytes())));
757 parts.push(Frame::Bulk(Bytes::copy_from_slice(val.as_ref())));
758 }
759 let frame = self.send_frame(Frame::Array(parts)).await?;
760 integer(frame)
761 }
762
763 pub async fn hget(&mut self, key: &str, field: &str) -> Result<Option<Bytes>, ClientError> {
766 let frame = self
767 .send_frame(cmd3(b"HGET", key.as_bytes(), field.as_bytes()))
768 .await?;
769 optional_bytes(frame)
770 }
771
772 pub async fn hgetall(&mut self, key: &str) -> Result<Vec<(Bytes, Bytes)>, ClientError> {
774 let frame = self.send_frame(cmd2(b"HGETALL", key.as_bytes())).await?;
775 pairs(frame)
776 }
777
778 pub async fn hdel(&mut self, key: &str, fields: &[&str]) -> Result<i64, ClientError> {
780 let frame = self
781 .send_frame(cmd_key_and_keys(b"HDEL", key, fields))
782 .await?;
783 integer(frame)
784 }
785
786 pub async fn hexists(&mut self, key: &str, field: &str) -> Result<bool, ClientError> {
788 let frame = self
789 .send_frame(cmd3(b"HEXISTS", key.as_bytes(), field.as_bytes()))
790 .await?;
791 bool_flag(frame)
792 }
793
794 pub async fn hlen(&mut self, key: &str) -> Result<i64, ClientError> {
796 let frame = self.send_frame(cmd2(b"HLEN", key.as_bytes())).await?;
797 integer(frame)
798 }
799
800 pub async fn hincrby(
803 &mut self,
804 key: &str,
805 field: &str,
806 delta: i64,
807 ) -> Result<i64, ClientError> {
808 let d = delta.to_string();
809 let frame = self
810 .send_frame(cmd4(
811 b"HINCRBY",
812 key.as_bytes(),
813 field.as_bytes(),
814 d.as_bytes(),
815 ))
816 .await?;
817 integer(frame)
818 }
819
820 pub async fn hkeys(&mut self, key: &str) -> Result<Vec<Bytes>, ClientError> {
822 let frame = self.send_frame(cmd2(b"HKEYS", key.as_bytes())).await?;
823 bytes_vec(frame)
824 }
825
826 pub async fn hvals(&mut self, key: &str) -> Result<Vec<Bytes>, ClientError> {
828 let frame = self.send_frame(cmd2(b"HVALS", key.as_bytes())).await?;
829 bytes_vec(frame)
830 }
831
832 pub async fn sadd(&mut self, key: &str, members: &[&str]) -> Result<i64, ClientError> {
836 let frame = self
837 .send_frame(cmd_key_and_keys(b"SADD", key, members))
838 .await?;
839 integer(frame)
840 }
841
842 pub async fn srem(&mut self, key: &str, members: &[&str]) -> Result<i64, ClientError> {
844 let frame = self
845 .send_frame(cmd_key_and_keys(b"SREM", key, members))
846 .await?;
847 integer(frame)
848 }
849
850 pub async fn smembers(&mut self, key: &str) -> Result<Vec<Bytes>, ClientError> {
852 let frame = self.send_frame(cmd2(b"SMEMBERS", key.as_bytes())).await?;
853 bytes_vec(frame)
854 }
855
856 pub async fn sismember(&mut self, key: &str, member: &str) -> Result<bool, ClientError> {
858 let frame = self
859 .send_frame(cmd3(b"SISMEMBER", key.as_bytes(), member.as_bytes()))
860 .await?;
861 bool_flag(frame)
862 }
863
864 pub async fn scard(&mut self, key: &str) -> Result<i64, ClientError> {
866 let frame = self.send_frame(cmd2(b"SCARD", key.as_bytes())).await?;
867 integer(frame)
868 }
869
870 pub async fn zadd(&mut self, key: &str, members: &[(f64, &str)]) -> Result<i64, ClientError> {
875 let mut parts = Vec::with_capacity(2 + members.len() * 2);
876 parts.push(Frame::Bulk(Bytes::from_static(b"ZADD")));
877 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
878 for (score, member) in members {
879 let s = score.to_string();
880 parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
881 parts.push(Frame::Bulk(Bytes::copy_from_slice(member.as_bytes())));
882 }
883 let frame = self.send_frame(Frame::Array(parts)).await?;
884 integer(frame)
885 }
886
887 pub async fn zrange(
890 &mut self,
891 key: &str,
892 start: i64,
893 stop: i64,
894 ) -> Result<Vec<Bytes>, ClientError> {
895 let s = start.to_string();
896 let e = stop.to_string();
897 let frame = self
898 .send_frame(cmd4(b"ZRANGE", key.as_bytes(), s.as_bytes(), e.as_bytes()))
899 .await?;
900 bytes_vec(frame)
901 }
902
903 pub async fn zrange_withscores(
905 &mut self,
906 key: &str,
907 start: i64,
908 stop: i64,
909 ) -> Result<Vec<(Bytes, f64)>, ClientError> {
910 let s = start.to_string();
911 let e = stop.to_string();
912 let frame = self
913 .send_frame(Frame::Array(vec![
914 Frame::Bulk(Bytes::from_static(b"ZRANGE")),
915 Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())),
916 Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())),
917 Frame::Bulk(Bytes::copy_from_slice(e.as_bytes())),
918 Frame::Bulk(Bytes::from_static(b"WITHSCORES")),
919 ]))
920 .await?;
921 scored_members(frame)
922 }
923
924 pub async fn zscore(&mut self, key: &str, member: &str) -> Result<Option<f64>, ClientError> {
926 let frame = self
927 .send_frame(cmd3(b"ZSCORE", key.as_bytes(), member.as_bytes()))
928 .await?;
929 optional_score(frame)
930 }
931
932 pub async fn zrank(&mut self, key: &str, member: &str) -> Result<Option<i64>, ClientError> {
935 let frame = self
936 .send_frame(cmd3(b"ZRANK", key.as_bytes(), member.as_bytes()))
937 .await?;
938 optional_integer(frame)
939 }
940
941 pub async fn zrem(&mut self, key: &str, members: &[&str]) -> Result<i64, ClientError> {
944 let frame = self
945 .send_frame(cmd_key_and_keys(b"ZREM", key, members))
946 .await?;
947 integer(frame)
948 }
949
950 pub async fn zcard(&mut self, key: &str) -> Result<i64, ClientError> {
952 let frame = self.send_frame(cmd2(b"ZCARD", key.as_bytes())).await?;
953 integer(frame)
954 }
955
956 pub async fn ping(&mut self) -> Result<(), ClientError> {
960 let frame = self
961 .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(b"PING"))]))
962 .await?;
963 match frame {
964 Frame::Simple(s) if s == "PONG" => Ok(()),
965 Frame::Error(e) => Err(ClientError::Server(e)),
966 other => Err(ClientError::Protocol(format!(
967 "expected PONG, got {other:?}"
968 ))),
969 }
970 }
971
972 pub async fn dbsize(&mut self) -> Result<i64, ClientError> {
974 let frame = self
975 .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(
976 b"DBSIZE",
977 ))]))
978 .await?;
979 integer(frame)
980 }
981
982 pub async fn flushdb(&mut self) -> Result<(), ClientError> {
984 let frame = self
985 .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(
986 b"FLUSHDB",
987 ))]))
988 .await?;
989 ok(frame)
990 }
991
992 pub async fn strlen(&mut self, key: &str) -> Result<i64, ClientError> {
996 let frame = self.send_frame(cmd2(b"STRLEN", key.as_bytes())).await?;
997 integer(frame)
998 }
999
1000 pub async fn incr_by_float(&mut self, key: &str, delta: f64) -> Result<f64, ClientError> {
1002 let d = delta.to_string();
1003 let frame = self
1004 .send_frame(cmd3(b"INCRBYFLOAT", key.as_bytes(), d.as_bytes()))
1005 .await?;
1006 float_value(frame)
1007 }
1008
1009 pub async fn key_type(&mut self, key: &str) -> Result<String, ClientError> {
1014 let frame = self.send_frame(cmd2(b"TYPE", key.as_bytes())).await?;
1015 string_value(frame)
1016 }
1017
1018 pub async fn keys(&mut self, pattern: &str) -> Result<Vec<Bytes>, ClientError> {
1023 let frame = self.send_frame(cmd2(b"KEYS", pattern.as_bytes())).await?;
1024 bytes_vec(frame)
1025 }
1026
1027 pub async fn rename(&mut self, key: &str, newkey: &str) -> Result<(), ClientError> {
1029 let frame = self
1030 .send_frame(cmd3(b"RENAME", key.as_bytes(), newkey.as_bytes()))
1031 .await?;
1032 ok(frame)
1033 }
1034
1035 pub async fn scan(
1042 &mut self,
1043 cursor: u64,
1044 count: Option<u32>,
1045 pattern: Option<&str>,
1046 ) -> Result<ScanPage, ClientError> {
1047 let cur = cursor.to_string();
1048 let mut parts = Vec::with_capacity(6);
1049 parts.push(Frame::Bulk(Bytes::from_static(b"SCAN")));
1050 parts.push(Frame::Bulk(Bytes::copy_from_slice(cur.as_bytes())));
1051 if let Some(pat) = pattern {
1052 parts.push(Frame::Bulk(Bytes::from_static(b"MATCH")));
1053 parts.push(Frame::Bulk(Bytes::copy_from_slice(pat.as_bytes())));
1054 }
1055 if let Some(n) = count {
1056 let ns = n.to_string();
1057 parts.push(Frame::Bulk(Bytes::from_static(b"COUNT")));
1058 parts.push(Frame::Bulk(Bytes::copy_from_slice(ns.as_bytes())));
1059 }
1060 let frame = self.send_frame(Frame::Array(parts)).await?;
1061 scan_page(frame)
1062 }
1063
1064 pub async fn pexpire(&mut self, key: &str, millis: u64) -> Result<bool, ClientError> {
1067 let ms = millis.to_string();
1068 let frame = self
1069 .send_frame(cmd3(b"PEXPIRE", key.as_bytes(), ms.as_bytes()))
1070 .await?;
1071 bool_flag(frame)
1072 }
1073
1074 pub async fn hmget(
1079 &mut self,
1080 key: &str,
1081 fields: &[&str],
1082 ) -> Result<Vec<Option<Bytes>>, ClientError> {
1083 let frame = self
1084 .send_frame(cmd_key_and_keys(b"HMGET", key, fields))
1085 .await?;
1086 optional_bytes_vec(frame)
1087 }
1088
1089 pub async fn echo(&mut self, message: &str) -> Result<Bytes, ClientError> {
1093 let frame = self.send_frame(cmd2(b"ECHO", message.as_bytes())).await?;
1094 match frame {
1095 Frame::Bulk(b) => Ok(b),
1096 Frame::Error(e) => Err(ClientError::Server(e)),
1097 other => Err(ClientError::Protocol(format!(
1098 "expected bulk for ECHO, got {other:?}"
1099 ))),
1100 }
1101 }
1102
1103 pub async fn unlink(&mut self, keys: &[&str]) -> Result<i64, ClientError> {
1106 let frame = self.send_frame(cmd_keys_only(b"UNLINK", keys)).await?;
1107 integer(frame)
1108 }
1109
1110 pub async fn info(&mut self, section: Option<&str>) -> Result<String, ClientError> {
1113 let frame = match section {
1114 Some(s) => self.send_frame(cmd2(b"INFO", s.as_bytes())).await?,
1115 None => {
1116 self.send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(b"INFO"))]))
1117 .await?
1118 }
1119 };
1120 match frame {
1121 Frame::Bulk(b) => String::from_utf8(b.to_vec())
1122 .map_err(|_| ClientError::Protocol("INFO response is not valid UTF-8".into())),
1123 Frame::Error(e) => Err(ClientError::Server(e)),
1124 other => Err(ClientError::Protocol(format!(
1125 "expected bulk for INFO, got {other:?}"
1126 ))),
1127 }
1128 }
1129
1130 pub async fn bgsave(&mut self) -> Result<String, ClientError> {
1133 let frame = self
1134 .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(
1135 b"BGSAVE",
1136 ))]))
1137 .await?;
1138 string_value(frame)
1139 }
1140
1141 pub async fn bgrewriteaof(&mut self) -> Result<String, ClientError> {
1144 let frame = self
1145 .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(
1146 b"BGREWRITEAOF",
1147 ))]))
1148 .await?;
1149 string_value(frame)
1150 }
1151
1152 pub async fn slowlog_get(
1157 &mut self,
1158 count: Option<u32>,
1159 ) -> Result<Vec<SlowlogEntry>, ClientError> {
1160 let frame = match count {
1161 Some(n) => {
1162 let ns = n.to_string();
1163 self.send_frame(Frame::Array(vec![
1164 Frame::Bulk(Bytes::from_static(b"SLOWLOG")),
1165 Frame::Bulk(Bytes::from_static(b"GET")),
1166 Frame::Bulk(Bytes::copy_from_slice(ns.as_bytes())),
1167 ]))
1168 .await?
1169 }
1170 None => {
1171 self.send_frame(Frame::Array(vec![
1172 Frame::Bulk(Bytes::from_static(b"SLOWLOG")),
1173 Frame::Bulk(Bytes::from_static(b"GET")),
1174 ]))
1175 .await?
1176 }
1177 };
1178 slowlog_entries(frame)
1179 }
1180
1181 pub async fn slowlog_len(&mut self) -> Result<i64, ClientError> {
1183 let frame = self
1184 .send_frame(Frame::Array(vec![
1185 Frame::Bulk(Bytes::from_static(b"SLOWLOG")),
1186 Frame::Bulk(Bytes::from_static(b"LEN")),
1187 ]))
1188 .await?;
1189 integer(frame)
1190 }
1191
1192 pub async fn slowlog_reset(&mut self) -> Result<(), ClientError> {
1194 let frame = self
1195 .send_frame(Frame::Array(vec![
1196 Frame::Bulk(Bytes::from_static(b"SLOWLOG")),
1197 Frame::Bulk(Bytes::from_static(b"RESET")),
1198 ]))
1199 .await?;
1200 ok(frame)
1201 }
1202
1203 pub async fn publish(
1208 &mut self,
1209 channel: &str,
1210 message: impl AsRef<[u8]>,
1211 ) -> Result<i64, ClientError> {
1212 let frame = self
1213 .send_frame(cmd3(b"PUBLISH", channel.as_bytes(), message.as_ref()))
1214 .await?;
1215 integer(frame)
1216 }
1217
1218 pub async fn pubsub_channels(
1221 &mut self,
1222 pattern: Option<&str>,
1223 ) -> Result<Vec<Bytes>, ClientError> {
1224 let frame = match pattern {
1225 Some(p) => {
1226 self.send_frame(Frame::Array(vec![
1227 Frame::Bulk(Bytes::from_static(b"PUBSUB")),
1228 Frame::Bulk(Bytes::from_static(b"CHANNELS")),
1229 Frame::Bulk(Bytes::copy_from_slice(p.as_bytes())),
1230 ]))
1231 .await?
1232 }
1233 None => {
1234 self.send_frame(Frame::Array(vec![
1235 Frame::Bulk(Bytes::from_static(b"PUBSUB")),
1236 Frame::Bulk(Bytes::from_static(b"CHANNELS")),
1237 ]))
1238 .await?
1239 }
1240 };
1241 bytes_vec(frame)
1242 }
1243
1244 pub async fn pubsub_numsub(
1247 &mut self,
1248 channels: &[&str],
1249 ) -> Result<Vec<(Bytes, i64)>, ClientError> {
1250 let mut parts = Vec::with_capacity(2 + channels.len());
1251 parts.push(Frame::Bulk(Bytes::from_static(b"PUBSUB")));
1252 parts.push(Frame::Bulk(Bytes::from_static(b"NUMSUB")));
1253 for ch in channels {
1254 parts.push(Frame::Bulk(Bytes::copy_from_slice(ch.as_bytes())));
1255 }
1256 let frame = self.send_frame(Frame::Array(parts)).await?;
1257 numsub_pairs(frame)
1258 }
1259
1260 pub async fn pubsub_numpat(&mut self) -> Result<i64, ClientError> {
1262 let frame = self
1263 .send_frame(Frame::Array(vec![
1264 Frame::Bulk(Bytes::from_static(b"PUBSUB")),
1265 Frame::Bulk(Bytes::from_static(b"NUMPAT")),
1266 ]))
1267 .await?;
1268 integer(frame)
1269 }
1270
1271 pub async fn subscribe(mut self, channels: &[&str]) -> Result<Subscriber, ClientError> {
1278 let mut parts = Vec::with_capacity(1 + channels.len());
1279 parts.push(Frame::Bulk(Bytes::from_static(b"SUBSCRIBE")));
1280 for ch in channels {
1281 parts.push(Frame::Bulk(Bytes::copy_from_slice(ch.as_bytes())));
1282 }
1283 self.write_frame(Frame::Array(parts)).await?;
1284 for _ in 0..channels.len() {
1286 self.read_response().await?;
1287 }
1288 Ok(Subscriber::new(self))
1289 }
1290
1291 pub async fn psubscribe(mut self, patterns: &[&str]) -> Result<Subscriber, ClientError> {
1294 let mut parts = Vec::with_capacity(1 + patterns.len());
1295 parts.push(Frame::Bulk(Bytes::from_static(b"PSUBSCRIBE")));
1296 for p in patterns {
1297 parts.push(Frame::Bulk(Bytes::copy_from_slice(p.as_bytes())));
1298 }
1299 self.write_frame(Frame::Array(parts)).await?;
1300 for _ in 0..patterns.len() {
1302 self.read_response().await?;
1303 }
1304 Ok(Subscriber::new(self))
1305 }
1306
1307 pub async fn expiretime(&mut self, key: &str) -> Result<i64, ClientError> {
1311 let frame = self.send_frame(cmd2(b"EXPIRETIME", key.as_bytes())).await?;
1312 integer(frame)
1313 }
1314
1315 pub async fn pexpiretime(&mut self, key: &str) -> Result<i64, ClientError> {
1317 let frame = self
1318 .send_frame(cmd2(b"PEXPIRETIME", key.as_bytes()))
1319 .await?;
1320 integer(frame)
1321 }
1322
1323 pub async fn expireat(&mut self, key: &str, timestamp: u64) -> Result<bool, ClientError> {
1325 let ts = timestamp.to_string();
1326 let frame = self
1327 .send_frame(cmd3(b"EXPIREAT", key.as_bytes(), ts.as_bytes()))
1328 .await?;
1329 bool_flag(frame)
1330 }
1331
1332 pub async fn pexpireat(&mut self, key: &str, timestamp_ms: u64) -> Result<bool, ClientError> {
1334 let ts = timestamp_ms.to_string();
1335 let frame = self
1336 .send_frame(cmd3(b"PEXPIREAT", key.as_bytes(), ts.as_bytes()))
1337 .await?;
1338 bool_flag(frame)
1339 }
1340
1341 pub async fn getset(
1345 &mut self,
1346 key: &str,
1347 value: impl AsRef<[u8]>,
1348 ) -> Result<Option<Bytes>, ClientError> {
1349 let frame = self
1350 .send_frame(cmd3(b"GETSET", key.as_bytes(), value.as_ref()))
1351 .await?;
1352 optional_bytes(frame)
1353 }
1354
1355 pub async fn msetnx<V: AsRef<[u8]>>(
1358 &mut self,
1359 pairs: &[(&str, V)],
1360 ) -> Result<bool, ClientError> {
1361 let mut parts = Vec::with_capacity(1 + pairs.len() * 2);
1362 parts.push(Frame::Bulk(Bytes::from_static(b"MSETNX")));
1363 for (k, v) in pairs {
1364 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1365 parts.push(Frame::Bulk(Bytes::copy_from_slice(v.as_ref())));
1366 }
1367 let frame = self.send_frame(Frame::Array(parts)).await?;
1368 bool_flag(frame)
1369 }
1370
1371 pub async fn getbit(&mut self, key: &str, offset: u64) -> Result<i64, ClientError> {
1375 let off = offset.to_string();
1376 let frame = self
1377 .send_frame(cmd3(b"GETBIT", key.as_bytes(), off.as_bytes()))
1378 .await?;
1379 integer(frame)
1380 }
1381
1382 pub async fn setbit(&mut self, key: &str, offset: u64, value: u8) -> Result<i64, ClientError> {
1384 let off = offset.to_string();
1385 let val = value.to_string();
1386 let frame = self
1387 .send_frame(cmd4(
1388 b"SETBIT",
1389 key.as_bytes(),
1390 off.as_bytes(),
1391 val.as_bytes(),
1392 ))
1393 .await?;
1394 integer(frame)
1395 }
1396
1397 pub async fn bitcount(
1401 &mut self,
1402 key: &str,
1403 range: Option<(i64, i64, &str)>,
1404 ) -> Result<i64, ClientError> {
1405 let frame = if let Some((start, end, unit)) = range {
1406 let s = start.to_string();
1407 let e = end.to_string();
1408 let mut parts = Vec::with_capacity(5);
1409 parts.push(Frame::Bulk(Bytes::from_static(b"BITCOUNT")));
1410 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
1411 parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
1412 parts.push(Frame::Bulk(Bytes::copy_from_slice(e.as_bytes())));
1413 parts.push(Frame::Bulk(Bytes::copy_from_slice(unit.as_bytes())));
1414 self.send_frame(Frame::Array(parts)).await?
1415 } else {
1416 self.send_frame(cmd2(b"BITCOUNT", key.as_bytes())).await?
1417 };
1418 integer(frame)
1419 }
1420
1421 pub async fn bitpos(
1425 &mut self,
1426 key: &str,
1427 bit: u8,
1428 range: Option<(i64, i64, &str)>,
1429 ) -> Result<i64, ClientError> {
1430 let b = bit.to_string();
1431 let frame = if let Some((start, end, unit)) = range {
1432 let s = start.to_string();
1433 let e = end.to_string();
1434 let mut parts = Vec::with_capacity(6);
1435 parts.push(Frame::Bulk(Bytes::from_static(b"BITPOS")));
1436 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
1437 parts.push(Frame::Bulk(Bytes::copy_from_slice(b.as_bytes())));
1438 parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
1439 parts.push(Frame::Bulk(Bytes::copy_from_slice(e.as_bytes())));
1440 parts.push(Frame::Bulk(Bytes::copy_from_slice(unit.as_bytes())));
1441 self.send_frame(Frame::Array(parts)).await?
1442 } else {
1443 self.send_frame(cmd3(b"BITPOS", key.as_bytes(), b.as_bytes()))
1444 .await?
1445 };
1446 integer(frame)
1447 }
1448
1449 pub async fn bitop(&mut self, op: &str, dest: &str, keys: &[&str]) -> Result<i64, ClientError> {
1454 let mut parts = Vec::with_capacity(3 + keys.len());
1455 parts.push(Frame::Bulk(Bytes::from_static(b"BITOP")));
1456 parts.push(Frame::Bulk(Bytes::copy_from_slice(op.as_bytes())));
1457 parts.push(Frame::Bulk(Bytes::copy_from_slice(dest.as_bytes())));
1458 for k in keys {
1459 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1460 }
1461 let frame = self.send_frame(Frame::Array(parts)).await?;
1462 integer(frame)
1463 }
1464
1465 pub async fn smove(&mut self, src: &str, dst: &str, member: &str) -> Result<bool, ClientError> {
1469 let frame = self
1470 .send_frame(cmd4(
1471 b"SMOVE",
1472 src.as_bytes(),
1473 dst.as_bytes(),
1474 member.as_bytes(),
1475 ))
1476 .await?;
1477 bool_flag(frame)
1478 }
1479
1480 pub async fn sintercard(&mut self, keys: &[&str], limit: usize) -> Result<i64, ClientError> {
1482 let n = keys.len().to_string();
1483 let lim = limit.to_string();
1484 let mut parts = Vec::with_capacity(3 + keys.len() + 2);
1485 parts.push(Frame::Bulk(Bytes::from_static(b"SINTERCARD")));
1486 parts.push(Frame::Bulk(Bytes::copy_from_slice(n.as_bytes())));
1487 for k in keys {
1488 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1489 }
1490 if limit > 0 {
1491 parts.push(Frame::Bulk(Bytes::from_static(b"LIMIT")));
1492 parts.push(Frame::Bulk(Bytes::copy_from_slice(lim.as_bytes())));
1493 }
1494 let frame = self.send_frame(Frame::Array(parts)).await?;
1495 integer(frame)
1496 }
1497
1498 pub async fn lmpop(
1505 &mut self,
1506 keys: &[&str],
1507 left: bool,
1508 count: usize,
1509 ) -> Result<Option<(String, Vec<Bytes>)>, ClientError> {
1510 let n = keys.len().to_string();
1511 let dir = if left { b"LEFT" as &[u8] } else { b"RIGHT" };
1512 let cnt = count.to_string();
1513 let mut parts = Vec::with_capacity(4 + keys.len() + 2);
1514 parts.push(Frame::Bulk(Bytes::from_static(b"LMPOP")));
1515 parts.push(Frame::Bulk(Bytes::copy_from_slice(n.as_bytes())));
1516 for k in keys {
1517 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1518 }
1519 parts.push(Frame::Bulk(Bytes::copy_from_slice(dir)));
1520 if count > 0 {
1521 parts.push(Frame::Bulk(Bytes::from_static(b"COUNT")));
1522 parts.push(Frame::Bulk(Bytes::copy_from_slice(cnt.as_bytes())));
1523 }
1524 let frame = self.send_frame(Frame::Array(parts)).await?;
1525 match frame {
1526 Frame::Null => Ok(None),
1527 Frame::Array(mut elems) if elems.len() == 2 => {
1528 let key = match elems.remove(0) {
1529 Frame::Bulk(b) => String::from_utf8(b.to_vec())
1530 .map_err(|_| ClientError::Protocol("key is not valid UTF-8".into()))?,
1531 other => {
1532 return Err(ClientError::Protocol(format!(
1533 "expected bulk key in LMPOP response, got {other:?}"
1534 )))
1535 }
1536 };
1537 let elements = bytes_vec(elems.remove(0))?;
1538 Ok(Some((key, elements)))
1539 }
1540 Frame::Error(e) => Err(ClientError::Server(e)),
1541 other => Err(ClientError::Protocol(format!(
1542 "unexpected LMPOP response: {other:?}"
1543 ))),
1544 }
1545 }
1546
1547 pub async fn hrandfield(
1554 &mut self,
1555 key: &str,
1556 count: Option<i64>,
1557 ) -> Result<Vec<Bytes>, ClientError> {
1558 let frame = if let Some(n) = count {
1559 let s = n.to_string();
1560 self.send_frame(cmd3(b"HRANDFIELD", key.as_bytes(), s.as_bytes()))
1561 .await?
1562 } else {
1563 self.send_frame(cmd2(b"HRANDFIELD", key.as_bytes())).await?
1564 };
1565 match frame {
1566 Frame::Bulk(b) => Ok(vec![b]),
1567 Frame::Null => Ok(vec![]),
1568 Frame::Array(_) => bytes_vec(frame),
1569 Frame::Error(e) => Err(ClientError::Server(e)),
1570 other => Err(ClientError::Protocol(format!(
1571 "unexpected HRANDFIELD response: {other:?}"
1572 ))),
1573 }
1574 }
1575
1576 pub async fn hrandfield_withvalues(
1580 &mut self,
1581 key: &str,
1582 count: i64,
1583 ) -> Result<Vec<(Bytes, Bytes)>, ClientError> {
1584 let s = count.to_string();
1585 let frame = self
1586 .send_frame(Frame::Array(vec![
1587 Frame::Bulk(Bytes::from_static(b"HRANDFIELD")),
1588 Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())),
1589 Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())),
1590 Frame::Bulk(Bytes::from_static(b"WITHVALUES")),
1591 ]))
1592 .await?;
1593 pairs(frame)
1594 }
1595
1596 pub async fn zmpop(
1603 &mut self,
1604 keys: &[&str],
1605 min: bool,
1606 count: usize,
1607 ) -> Result<Option<(String, Vec<(Bytes, f64)>)>, ClientError> {
1608 let n = keys.len().to_string();
1609 let dir = if min { b"MIN" as &[u8] } else { b"MAX" };
1610 let cnt = count.to_string();
1611 let mut parts = Vec::with_capacity(4 + keys.len() + 2);
1612 parts.push(Frame::Bulk(Bytes::from_static(b"ZMPOP")));
1613 parts.push(Frame::Bulk(Bytes::copy_from_slice(n.as_bytes())));
1614 for k in keys {
1615 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1616 }
1617 parts.push(Frame::Bulk(Bytes::copy_from_slice(dir)));
1618 if count > 0 {
1619 parts.push(Frame::Bulk(Bytes::from_static(b"COUNT")));
1620 parts.push(Frame::Bulk(Bytes::copy_from_slice(cnt.as_bytes())));
1621 }
1622 let frame = self.send_frame(Frame::Array(parts)).await?;
1623 match frame {
1624 Frame::Null => Ok(None),
1625 Frame::Array(mut elems) if elems.len() == 2 => {
1626 let key = match elems.remove(0) {
1627 Frame::Bulk(b) => String::from_utf8(b.to_vec())
1628 .map_err(|_| ClientError::Protocol("key is not valid UTF-8".into()))?,
1629 other => {
1630 return Err(ClientError::Protocol(format!(
1631 "expected bulk key in ZMPOP response, got {other:?}"
1632 )))
1633 }
1634 };
1635 let members = scored_members(elems.remove(0))?;
1636 Ok(Some((key, members)))
1637 }
1638 Frame::Error(e) => Err(ClientError::Server(e)),
1639 other => Err(ClientError::Protocol(format!(
1640 "unexpected ZMPOP response: {other:?}"
1641 ))),
1642 }
1643 }
1644
1645 pub async fn zrandmember(
1650 &mut self,
1651 key: &str,
1652 count: Option<i64>,
1653 ) -> Result<Vec<Bytes>, ClientError> {
1654 let frame = if let Some(n) = count {
1655 let s = n.to_string();
1656 self.send_frame(cmd3(b"ZRANDMEMBER", key.as_bytes(), s.as_bytes()))
1657 .await?
1658 } else {
1659 self.send_frame(cmd2(b"ZRANDMEMBER", key.as_bytes()))
1660 .await?
1661 };
1662 match frame {
1663 Frame::Bulk(b) => Ok(vec![b]),
1664 Frame::Null => Ok(vec![]),
1665 Frame::Array(_) => bytes_vec(frame),
1666 Frame::Error(e) => Err(ClientError::Server(e)),
1667 other => Err(ClientError::Protocol(format!(
1668 "unexpected ZRANDMEMBER response: {other:?}"
1669 ))),
1670 }
1671 }
1672
1673 pub async fn zrandmember_withscores(
1677 &mut self,
1678 key: &str,
1679 count: i64,
1680 ) -> Result<Vec<(Bytes, f64)>, ClientError> {
1681 let s = count.to_string();
1682 let frame = self
1683 .send_frame(Frame::Array(vec![
1684 Frame::Bulk(Bytes::from_static(b"ZRANDMEMBER")),
1685 Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())),
1686 Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())),
1687 Frame::Bulk(Bytes::from_static(b"WITHSCORES")),
1688 ]))
1689 .await?;
1690 scored_members(frame)
1691 }
1692
1693 pub async fn execute_pipeline(
1705 &mut self,
1706 pipeline: Pipeline,
1707 ) -> Result<Vec<Frame>, ClientError> {
1708 if pipeline.is_empty() {
1709 return Ok(Vec::new());
1710 }
1711 self.send_batch(&pipeline.cmds).await
1712 }
1713}
1714
1715fn cmd_key_and_keys(cmd: &'static [u8], key: &str, rest: &[&str]) -> Frame {
1718 let mut parts = Vec::with_capacity(2 + rest.len());
1719 parts.push(Frame::Bulk(Bytes::from_static(cmd)));
1720 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
1721 for s in rest {
1722 parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
1723 }
1724 Frame::Array(parts)
1725}
1726
1727fn cmd_key_values<V: AsRef<[u8]>>(cmd: &'static [u8], key: &str, values: &[V]) -> Frame {
1728 let mut parts = Vec::with_capacity(2 + values.len());
1729 parts.push(Frame::Bulk(Bytes::from_static(cmd)));
1730 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
1731 for v in values {
1732 parts.push(Frame::Bulk(Bytes::copy_from_slice(v.as_ref())));
1733 }
1734 Frame::Array(parts)
1735}
1736
1737fn cmd_keys_only(cmd: &'static [u8], keys: &[&str]) -> Frame {
1739 let mut parts = Vec::with_capacity(1 + keys.len());
1740 parts.push(Frame::Bulk(Bytes::from_static(cmd)));
1741 for k in keys {
1742 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1743 }
1744 Frame::Array(parts)
1745}
1746
#[cfg(test)]
mod tests {
    //! Unit tests for the reply-decoding helpers. They operate on hand-built
    //! frames only; no connection is involved.

    use super::*;

    /// Builds a bulk frame holding a copy of `b`.
    fn bulk(b: &[u8]) -> Frame {
        Frame::Bulk(Bytes::copy_from_slice(b))
    }

    // --- optional_bytes ---

    #[test]
    fn optional_bytes_bulk() {
        let decoded = optional_bytes(bulk(b"hello")).unwrap();
        assert_eq!(decoded, Some(Bytes::from_static(b"hello")));
    }

    #[test]
    fn optional_bytes_null() {
        let decoded = optional_bytes(Frame::Null).unwrap();
        assert!(decoded.is_none());
    }

    #[test]
    fn optional_bytes_server_error() {
        let outcome = optional_bytes(Frame::Error("WRONGTYPE".into()));
        assert!(matches!(outcome, Err(ClientError::Server(_))));
    }

    #[test]
    fn optional_bytes_unexpected() {
        let outcome = optional_bytes(Frame::Integer(1));
        assert!(matches!(outcome, Err(ClientError::Protocol(_))));
    }

    // --- bool_flag ---

    #[test]
    fn bool_flag_one() {
        let flag = bool_flag(Frame::Integer(1)).unwrap();
        assert!(flag);
    }

    #[test]
    fn bool_flag_zero() {
        let flag = bool_flag(Frame::Integer(0)).unwrap();
        assert!(!flag);
    }

    #[test]
    fn bool_flag_unexpected_value() {
        // Anything other than 0 or 1 must be rejected as a protocol error.
        let outcome = bool_flag(Frame::Integer(42));
        assert!(matches!(outcome, Err(ClientError::Protocol(_))));
    }

    // --- pairs ---

    #[test]
    fn pairs_even_array() {
        let reply = Frame::Array(vec![
            bulk(b"field1"),
            bulk(b"val1"),
            bulk(b"field2"),
            bulk(b"val2"),
        ]);
        let decoded = pairs(reply).unwrap();
        assert_eq!(decoded.len(), 2);
        let (field, value) = &decoded[0];
        assert_eq!(*field, Bytes::from_static(b"field1"));
        assert_eq!(*value, Bytes::from_static(b"val1"));
    }

    #[test]
    fn pairs_odd_array_is_error() {
        let outcome = pairs(Frame::Array(vec![bulk(b"orphan")]));
        assert!(matches!(outcome, Err(ClientError::Protocol(_))));
    }

    #[test]
    fn pairs_empty_array() {
        let decoded = pairs(Frame::Array(Vec::new())).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn pairs_null() {
        let decoded = pairs(Frame::Null).unwrap();
        assert!(decoded.is_empty());
    }

    // --- optional_score ---

    #[test]
    fn optional_score_valid_float() {
        let score = optional_score(bulk(b"3.14159265358979"))
            .unwrap()
            .unwrap();
        assert!((score - std::f64::consts::PI).abs() < 1e-10);
    }

    #[test]
    fn optional_score_null() {
        let decoded = optional_score(Frame::Null).unwrap();
        assert!(decoded.is_none());
    }

    #[test]
    fn optional_score_malformed() {
        let outcome = optional_score(bulk(b"notanumber"));
        assert!(matches!(outcome, Err(ClientError::Protocol(_))));
    }

    // --- optional_bytes_vec ---

    #[test]
    fn optional_bytes_vec_null_elements() {
        let reply = Frame::Array(vec![bulk(b"value"), Frame::Null, bulk(b"another")]);
        let decoded = optional_bytes_vec(reply).unwrap();
        // Null elements must be kept in place as `None`, not dropped.
        let present: Vec<bool> = decoded.iter().map(Option::is_some).collect();
        assert_eq!(present, [true, false, true]);
    }
}