Skip to main content

ember_client/
commands.rs

1//! Typed command API for the ember client.
2//!
3//! Provides ergonomic methods on [`Client`] for all common commands. Each
4//! method takes strongly-typed inputs and returns decoded Rust values — no
5//! manual frame inspection needed.
6//!
7//! Value inputs use `impl AsRef<[u8]>`, so `&str`, `String`, `Vec<u8>`, and
8//! `bytes::Bytes` all work without explicit conversion.
9//!
10//! Value outputs use `Bytes` (binary-safe, reference-counted). Convert to a
11//! `String` with `String::from_utf8(bytes.to_vec())` when you know the value
12//! is UTF-8.
13
14use bytes::Bytes;
15use ember_protocol::types::Frame;
16
17use crate::connection::{Client, ClientError};
18use crate::pipeline::Pipeline;
19use crate::subscriber::Subscriber;
20
21// --- public types ---
22
23/// A page of keys returned by [`Client::scan`].
24///
25/// Iterate until `cursor` is `0` to walk the full keyspace.
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct ScanPage {
28    /// Cursor for the next call. `0` means iteration is complete.
29    pub cursor: u64,
30    /// Keys returned in this page.
31    pub keys: Vec<Bytes>,
32}
33
34/// A single entry from [`Client::slowlog_get`].
35#[derive(Debug, Clone)]
36pub struct SlowlogEntry {
37    /// Monotonically increasing log entry ID.
38    pub id: i64,
39    /// Unix timestamp (seconds) when the command was logged.
40    pub timestamp: i64,
41    /// Execution time in microseconds.
42    pub duration_us: i64,
43    /// The command and its arguments as raw bytes.
44    pub command: Vec<Bytes>,
45}
46
47// --- frame construction helpers ---
48
49fn cmd2(name: &'static [u8], a: &[u8]) -> Frame {
50    Frame::Array(vec![
51        Frame::Bulk(Bytes::from_static(name)),
52        Frame::Bulk(Bytes::copy_from_slice(a)),
53    ])
54}
55
56fn cmd3(name: &'static [u8], a: &[u8], b: &[u8]) -> Frame {
57    Frame::Array(vec![
58        Frame::Bulk(Bytes::from_static(name)),
59        Frame::Bulk(Bytes::copy_from_slice(a)),
60        Frame::Bulk(Bytes::copy_from_slice(b)),
61    ])
62}
63
64fn cmd4(name: &'static [u8], a: &[u8], b: &[u8], c: &[u8]) -> Frame {
65    Frame::Array(vec![
66        Frame::Bulk(Bytes::from_static(name)),
67        Frame::Bulk(Bytes::copy_from_slice(a)),
68        Frame::Bulk(Bytes::copy_from_slice(b)),
69        Frame::Bulk(Bytes::copy_from_slice(c)),
70    ])
71}
72
73// --- frame decoding helpers ---
74//
75// Each decoder maps a raw Frame to a typed result.
76// `Frame::Error` → `ClientError::Server`
77// Unexpected variants → `ClientError::Protocol`
78
79/// Decodes a bulk string response, returning `None` for null.
80///
81/// Used by: GET, LPOP, RPOP, HGET, GETDEL.
82fn optional_bytes(frame: Frame) -> Result<Option<Bytes>, ClientError> {
83    match frame {
84        Frame::Bulk(b) => Ok(Some(b)),
85        Frame::Null => Ok(None),
86        Frame::Error(e) => Err(ClientError::Server(e)),
87        other => Err(ClientError::Protocol(format!(
88            "expected bulk or null, got {other:?}"
89        ))),
90    }
91}
92
93/// Decodes an integer response.
94///
95/// Used by: DEL, EXISTS, INCR, DECR, INCRBY, DECRBY, APPEND, LPUSH, RPUSH,
96/// LLEN, HSET, HDEL, HLEN, SADD, SREM, SCARD, ZREM, ZCARD, ZADD, DBSIZE.
97fn integer(frame: Frame) -> Result<i64, ClientError> {
98    match frame {
99        Frame::Integer(n) => Ok(n),
100        Frame::Error(e) => Err(ClientError::Server(e)),
101        other => Err(ClientError::Protocol(format!(
102            "expected integer, got {other:?}"
103        ))),
104    }
105}
106
107/// Decodes an integer response, returning `None` for null.
108///
109/// Used by: ZRANK (returns null when member is absent).
110fn optional_integer(frame: Frame) -> Result<Option<i64>, ClientError> {
111    match frame {
112        Frame::Integer(n) => Ok(Some(n)),
113        Frame::Null => Ok(None),
114        Frame::Error(e) => Err(ClientError::Server(e)),
115        other => Err(ClientError::Protocol(format!(
116            "expected integer or null, got {other:?}"
117        ))),
118    }
119}
120
121/// Decodes an integer 0/1 as a boolean.
122///
123/// Used by: EXPIRE, PERSIST, HEXISTS, SISMEMBER.
124fn bool_flag(frame: Frame) -> Result<bool, ClientError> {
125    match frame {
126        Frame::Integer(1) => Ok(true),
127        Frame::Integer(0) => Ok(false),
128        Frame::Error(e) => Err(ClientError::Server(e)),
129        other => Err(ClientError::Protocol(format!(
130            "expected 0 or 1, got {other:?}"
131        ))),
132    }
133}
134
135/// Decodes an OK simple string response.
136///
137/// Used by: SET, MSET, FLUSHDB.
138fn ok(frame: Frame) -> Result<(), ClientError> {
139    match frame {
140        Frame::Simple(s) if s == "OK" => Ok(()),
141        Frame::Error(e) => Err(ClientError::Server(e)),
142        other => Err(ClientError::Protocol(format!(
143            "expected +OK, got {other:?}"
144        ))),
145    }
146}
147
148/// Decodes an array of bulk strings.
149///
150/// Used by: LRANGE, SMEMBERS, HKEYS, HVALS, ZRANGE.
151fn bytes_vec(frame: Frame) -> Result<Vec<Bytes>, ClientError> {
152    match frame {
153        Frame::Array(elems) => elems
154            .into_iter()
155            .map(|e| match e {
156                Frame::Bulk(b) => Ok(b),
157                Frame::Error(e) => Err(ClientError::Server(e)),
158                other => Err(ClientError::Protocol(format!(
159                    "expected bulk in array, got {other:?}"
160                ))),
161            })
162            .collect(),
163        Frame::Null => Ok(Vec::new()),
164        Frame::Error(e) => Err(ClientError::Server(e)),
165        other => Err(ClientError::Protocol(format!(
166            "expected array, got {other:?}"
167        ))),
168    }
169}
170
171/// Decodes an array where individual elements may be null.
172///
173/// Used by: MGET (missing keys come back as null within the array).
174fn optional_bytes_vec(frame: Frame) -> Result<Vec<Option<Bytes>>, ClientError> {
175    match frame {
176        Frame::Array(elems) => elems
177            .into_iter()
178            .map(|e| match e {
179                Frame::Bulk(b) => Ok(Some(b)),
180                Frame::Null => Ok(None),
181                Frame::Error(e) => Err(ClientError::Server(e)),
182                other => Err(ClientError::Protocol(format!(
183                    "unexpected element in array: {other:?}"
184                ))),
185            })
186            .collect(),
187        Frame::Error(e) => Err(ClientError::Server(e)),
188        other => Err(ClientError::Protocol(format!(
189            "expected array, got {other:?}"
190        ))),
191    }
192}
193
194/// Decodes a flat array of alternating keys and values into pairs.
195///
196/// Used by: HGETALL (field, value, field, value, ...).
197fn pairs(frame: Frame) -> Result<Vec<(Bytes, Bytes)>, ClientError> {
198    let elems = match frame {
199        Frame::Array(e) => e,
200        Frame::Null => return Ok(Vec::new()),
201        Frame::Error(e) => return Err(ClientError::Server(e)),
202        other => {
203            return Err(ClientError::Protocol(format!(
204                "expected array for pairs, got {other:?}"
205            )))
206        }
207    };
208
209    if elems.len() % 2 != 0 {
210        return Err(ClientError::Protocol(format!(
211            "pairs array has odd length ({})",
212            elems.len()
213        )));
214    }
215
216    let mut result = Vec::with_capacity(elems.len() / 2);
217    let mut iter = elems.into_iter();
218    while let (Some(k), Some(v)) = (iter.next(), iter.next()) {
219        let key = match k {
220            Frame::Bulk(b) => b,
221            other => {
222                return Err(ClientError::Protocol(format!(
223                    "expected bulk key in pairs, got {other:?}"
224                )))
225            }
226        };
227        let val = match v {
228            Frame::Bulk(b) => b,
229            other => {
230                return Err(ClientError::Protocol(format!(
231                    "expected bulk value in pairs, got {other:?}"
232                )))
233            }
234        };
235        result.push((key, val));
236    }
237    Ok(result)
238}
239
240/// Decodes a score returned as a bulk string float, returning `None` for null.
241///
242/// Used by: ZSCORE.
243fn optional_score(frame: Frame) -> Result<Option<f64>, ClientError> {
244    match frame {
245        Frame::Bulk(b) => {
246            let s = std::str::from_utf8(&b)
247                .map_err(|_| ClientError::Protocol("score is not valid UTF-8".into()))?;
248            s.parse::<f64>()
249                .map(Some)
250                .map_err(|_| ClientError::Protocol(format!("score is not a valid float: {s:?}")))
251        }
252        Frame::Null => Ok(None),
253        Frame::Error(e) => Err(ClientError::Server(e)),
254        other => Err(ClientError::Protocol(format!(
255            "expected bulk or null for score, got {other:?}"
256        ))),
257    }
258}
259
260/// Decodes `ZRANGE WITHSCORES` — alternating member / score pairs.
261///
262/// Used by: zrange_withscores.
263fn scored_members(frame: Frame) -> Result<Vec<(Bytes, f64)>, ClientError> {
264    let elems = match frame {
265        Frame::Array(e) => e,
266        Frame::Null => return Ok(Vec::new()),
267        Frame::Error(e) => return Err(ClientError::Server(e)),
268        other => {
269            return Err(ClientError::Protocol(format!(
270                "expected array for scored members, got {other:?}"
271            )))
272        }
273    };
274
275    if elems.len() % 2 != 0 {
276        return Err(ClientError::Protocol(format!(
277            "WITHSCORES array has odd length ({})",
278            elems.len()
279        )));
280    }
281
282    let mut result = Vec::with_capacity(elems.len() / 2);
283    let mut iter = elems.into_iter();
284    while let (Some(member_frame), Some(score_frame)) = (iter.next(), iter.next()) {
285        let member = match member_frame {
286            Frame::Bulk(b) => b,
287            other => {
288                return Err(ClientError::Protocol(format!(
289                    "expected bulk member, got {other:?}"
290                )))
291            }
292        };
293        let score = match score_frame {
294            Frame::Bulk(b) => {
295                let s = std::str::from_utf8(&b)
296                    .map_err(|_| ClientError::Protocol("score is not valid UTF-8".into()))?;
297                s.parse::<f64>().map_err(|_| {
298                    ClientError::Protocol(format!("score is not a valid float: {s:?}"))
299                })?
300            }
301            other => {
302                return Err(ClientError::Protocol(format!(
303                    "expected bulk score, got {other:?}"
304                )))
305            }
306        };
307        result.push((member, score));
308    }
309    Ok(result)
310}
311
312/// Decodes a simple string or bulk string response into a `String`.
313///
314/// Used by: TYPE, INFO, ECHO, BGSAVE, BGREWRITEAOF.
315fn string_value(frame: Frame) -> Result<String, ClientError> {
316    match frame {
317        Frame::Simple(s) => Ok(s),
318        Frame::Bulk(b) => String::from_utf8(b.to_vec())
319            .map_err(|_| ClientError::Protocol("response is not valid UTF-8".into())),
320        Frame::Error(e) => Err(ClientError::Server(e)),
321        other => Err(ClientError::Protocol(format!(
322            "expected simple or bulk string, got {other:?}"
323        ))),
324    }
325}
326
327/// Decodes a float returned as a bulk string.
328///
329/// Used by: INCRBYFLOAT.
330fn float_value(frame: Frame) -> Result<f64, ClientError> {
331    match frame {
332        Frame::Bulk(b) => {
333            let s = std::str::from_utf8(&b)
334                .map_err(|_| ClientError::Protocol("float response is not valid UTF-8".into()))?;
335            s.parse::<f64>()
336                .map_err(|_| ClientError::Protocol(format!("not a valid float: {s:?}")))
337        }
338        Frame::Error(e) => Err(ClientError::Server(e)),
339        other => Err(ClientError::Protocol(format!(
340            "expected bulk float, got {other:?}"
341        ))),
342    }
343}
344
345/// Decodes a SCAN response into a `ScanPage`.
346///
347/// RESP3 layout: `Array([Bulk(cursor), Array([Bulk(key), ...])])`.
348fn scan_page(frame: Frame) -> Result<ScanPage, ClientError> {
349    let elems = match frame {
350        Frame::Array(e) => e,
351        Frame::Error(e) => return Err(ClientError::Server(e)),
352        other => {
353            return Err(ClientError::Protocol(format!(
354                "expected array for SCAN, got {other:?}"
355            )))
356        }
357    };
358
359    if elems.len() != 2 {
360        return Err(ClientError::Protocol(format!(
361            "SCAN response must have 2 elements, got {}",
362            elems.len()
363        )));
364    }
365
366    let mut iter = elems.into_iter();
367    // Safety: length was validated to be exactly 2 above.
368    let cursor_frame = iter
369        .next()
370        .ok_or_else(|| ClientError::Protocol("SCAN response missing cursor element".into()))?;
371    let keys_frame = iter
372        .next()
373        .ok_or_else(|| ClientError::Protocol("SCAN response missing keys element".into()))?;
374
375    let cursor = match cursor_frame {
376        Frame::Bulk(b) => {
377            let s = std::str::from_utf8(&b)
378                .map_err(|_| ClientError::Protocol("SCAN cursor is not valid UTF-8".into()))?;
379            s.parse::<u64>()
380                .map_err(|_| ClientError::Protocol(format!("SCAN cursor is not a u64: {s:?}")))?
381        }
382        other => {
383            return Err(ClientError::Protocol(format!(
384                "expected bulk cursor in SCAN, got {other:?}"
385            )))
386        }
387    };
388
389    let keys = bytes_vec(keys_frame)?;
390    Ok(ScanPage { cursor, keys })
391}
392
393/// Decodes a SLOWLOG GET response into a list of entries.
394///
395/// Each entry: `Array([Integer(id), Integer(ts), Integer(us), Array([Bulk(cmd), ...])])`.
396fn slowlog_entries(frame: Frame) -> Result<Vec<SlowlogEntry>, ClientError> {
397    let outer = match frame {
398        Frame::Array(e) => e,
399        Frame::Null => return Ok(Vec::new()),
400        Frame::Error(e) => return Err(ClientError::Server(e)),
401        other => {
402            return Err(ClientError::Protocol(format!(
403                "expected array for SLOWLOG, got {other:?}"
404            )))
405        }
406    };
407
408    outer
409        .into_iter()
410        .map(|entry_frame| {
411            let entry = match entry_frame {
412                Frame::Array(e) => e,
413                other => {
414                    return Err(ClientError::Protocol(format!(
415                        "expected array for slowlog entry, got {other:?}"
416                    )))
417                }
418            };
419
420            if entry.len() < 4 {
421                return Err(ClientError::Protocol(format!(
422                    "slowlog entry too short: {} elements",
423                    entry.len()
424                )));
425            }
426
427            let id = match &entry[0] {
428                Frame::Integer(n) => *n,
429                other => {
430                    return Err(ClientError::Protocol(format!(
431                        "expected integer id in slowlog, got {other:?}"
432                    )))
433                }
434            };
435            let timestamp = match &entry[1] {
436                Frame::Integer(n) => *n,
437                other => {
438                    return Err(ClientError::Protocol(format!(
439                        "expected integer timestamp in slowlog, got {other:?}"
440                    )))
441                }
442            };
443            let duration_us = match &entry[2] {
444                Frame::Integer(n) => *n,
445                other => {
446                    return Err(ClientError::Protocol(format!(
447                        "expected integer duration in slowlog, got {other:?}"
448                    )))
449                }
450            };
451            let command = match entry.into_iter().nth(3).ok_or_else(|| {
452                ClientError::Protocol("slowlog entry missing command field".into())
453            })? {
454                Frame::Array(parts) => parts
455                    .into_iter()
456                    .map(|p| match p {
457                        Frame::Bulk(b) => Ok(b),
458                        other => Err(ClientError::Protocol(format!(
459                            "expected bulk in slowlog command, got {other:?}"
460                        ))),
461                    })
462                    .collect::<Result<Vec<_>, _>>()?,
463                other => {
464                    return Err(ClientError::Protocol(format!(
465                        "expected array for slowlog command, got {other:?}"
466                    )))
467                }
468            };
469
470            Ok(SlowlogEntry {
471                id,
472                timestamp,
473                duration_us,
474                command,
475            })
476        })
477        .collect()
478}
479
480/// Decodes a PUBSUB NUMSUB response into channel/count pairs.
481///
482/// Layout: `Array([Bulk(channel), Integer(count), ...])` — alternating.
483fn numsub_pairs(frame: Frame) -> Result<Vec<(Bytes, i64)>, ClientError> {
484    let elems = match frame {
485        Frame::Array(e) => e,
486        Frame::Null => return Ok(Vec::new()),
487        Frame::Error(e) => return Err(ClientError::Server(e)),
488        other => {
489            return Err(ClientError::Protocol(format!(
490                "expected array for PUBSUB NUMSUB, got {other:?}"
491            )))
492        }
493    };
494
495    if elems.len() % 2 != 0 {
496        return Err(ClientError::Protocol(format!(
497            "PUBSUB NUMSUB array has odd length ({})",
498            elems.len()
499        )));
500    }
501
502    let mut result = Vec::with_capacity(elems.len() / 2);
503    let mut iter = elems.into_iter();
504    while let (Some(ch_frame), Some(cnt_frame)) = (iter.next(), iter.next()) {
505        let channel = match ch_frame {
506            Frame::Bulk(b) => b,
507            other => {
508                return Err(ClientError::Protocol(format!(
509                    "expected bulk channel in PUBSUB NUMSUB, got {other:?}"
510                )))
511            }
512        };
513        let count = match cnt_frame {
514            Frame::Integer(n) => n,
515            other => {
516                return Err(ClientError::Protocol(format!(
517                    "expected integer count in PUBSUB NUMSUB, got {other:?}"
518                )))
519            }
520        };
521        result.push((channel, count));
522    }
523    Ok(result)
524}
525
526// --- typed command API ---
527
528impl Client {
529    // --- string commands ---
530
531    /// Returns the value for `key`, or `None` if it does not exist.
532    pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>, ClientError> {
533        let frame = self.send_frame(cmd2(b"GET", key.as_bytes())).await?;
534        optional_bytes(frame)
535    }
536
537    /// Sets `key` to `value` with no expiry.
538    pub async fn set(&mut self, key: &str, value: impl AsRef<[u8]>) -> Result<(), ClientError> {
539        let frame = self
540            .send_frame(cmd3(b"SET", key.as_bytes(), value.as_ref()))
541            .await?;
542        ok(frame)
543    }
544
545    /// Sets `key` to `value` with an expiry of `seconds`.
546    pub async fn set_ex(
547        &mut self,
548        key: &str,
549        value: impl AsRef<[u8]>,
550        seconds: u64,
551    ) -> Result<(), ClientError> {
552        let secs = seconds.to_string();
553        let frame = self
554            .send_frame(cmd4(
555                b"SETEX",
556                key.as_bytes(),
557                secs.as_bytes(),
558                value.as_ref(),
559            ))
560            .await?;
561        ok(frame)
562    }
563
564    /// Deletes one or more keys. Returns the number of keys that were removed.
565    pub async fn del(&mut self, keys: &[&str]) -> Result<i64, ClientError> {
566        let mut parts = Vec::with_capacity(1 + keys.len());
567        parts.push(Frame::Bulk(Bytes::from_static(b"DEL")));
568        for k in keys {
569            parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
570        }
571        let frame = self.send_frame(Frame::Array(parts)).await?;
572        integer(frame)
573    }
574
575    /// Returns the number of supplied keys that exist.
576    pub async fn exists(&mut self, keys: &[&str]) -> Result<i64, ClientError> {
577        let mut parts = Vec::with_capacity(1 + keys.len());
578        parts.push(Frame::Bulk(Bytes::from_static(b"EXISTS")));
579        for k in keys {
580            parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
581        }
582        let frame = self.send_frame(Frame::Array(parts)).await?;
583        integer(frame)
584    }
585
586    /// Sets a timeout of `seconds` on `key`. Returns `true` if the timeout
587    /// was set, `false` if the key does not exist.
588    pub async fn expire(&mut self, key: &str, seconds: u64) -> Result<bool, ClientError> {
589        let secs = seconds.to_string();
590        let frame = self
591            .send_frame(cmd3(b"EXPIRE", key.as_bytes(), secs.as_bytes()))
592            .await?;
593        bool_flag(frame)
594    }
595
596    /// Removes any existing timeout on `key`. Returns `true` if the timeout
597    /// was removed, `false` if the key has no timeout or does not exist.
598    pub async fn persist(&mut self, key: &str) -> Result<bool, ClientError> {
599        let frame = self.send_frame(cmd2(b"PERSIST", key.as_bytes())).await?;
600        bool_flag(frame)
601    }
602
603    /// Returns the remaining TTL in seconds. Returns `-2` if the key does not
604    /// exist, `-1` if it has no expiry.
605    pub async fn ttl(&mut self, key: &str) -> Result<i64, ClientError> {
606        let frame = self.send_frame(cmd2(b"TTL", key.as_bytes())).await?;
607        integer(frame)
608    }
609
610    /// Returns the remaining TTL in milliseconds.
611    pub async fn pttl(&mut self, key: &str) -> Result<i64, ClientError> {
612        let frame = self.send_frame(cmd2(b"PTTL", key.as_bytes())).await?;
613        integer(frame)
614    }
615
616    /// Increments the integer stored at `key` by 1. Returns the new value.
617    pub async fn incr(&mut self, key: &str) -> Result<i64, ClientError> {
618        let frame = self.send_frame(cmd2(b"INCR", key.as_bytes())).await?;
619        integer(frame)
620    }
621
622    /// Decrements the integer stored at `key` by 1. Returns the new value.
623    pub async fn decr(&mut self, key: &str) -> Result<i64, ClientError> {
624        let frame = self.send_frame(cmd2(b"DECR", key.as_bytes())).await?;
625        integer(frame)
626    }
627
628    /// Increments the integer stored at `key` by `delta`. Returns the new value.
629    pub async fn incrby(&mut self, key: &str, delta: i64) -> Result<i64, ClientError> {
630        let d = delta.to_string();
631        let frame = self
632            .send_frame(cmd3(b"INCRBY", key.as_bytes(), d.as_bytes()))
633            .await?;
634        integer(frame)
635    }
636
637    /// Decrements the integer stored at `key` by `delta`. Returns the new value.
638    pub async fn decrby(&mut self, key: &str, delta: i64) -> Result<i64, ClientError> {
639        let d = delta.to_string();
640        let frame = self
641            .send_frame(cmd3(b"DECRBY", key.as_bytes(), d.as_bytes()))
642            .await?;
643        integer(frame)
644    }
645
646    /// Appends `value` to the string at `key`. Returns the new length.
647    pub async fn append(&mut self, key: &str, value: impl AsRef<[u8]>) -> Result<i64, ClientError> {
648        let frame = self
649            .send_frame(cmd3(b"APPEND", key.as_bytes(), value.as_ref()))
650            .await?;
651        integer(frame)
652    }
653
654    /// Returns the values for multiple keys. Missing keys are `None`.
655    pub async fn mget(&mut self, keys: &[&str]) -> Result<Vec<Option<Bytes>>, ClientError> {
656        let mut parts = Vec::with_capacity(1 + keys.len());
657        parts.push(Frame::Bulk(Bytes::from_static(b"MGET")));
658        for k in keys {
659            parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
660        }
661        let frame = self.send_frame(Frame::Array(parts)).await?;
662        optional_bytes_vec(frame)
663    }
664
665    /// Sets multiple key-value pairs atomically.
666    pub async fn mset<V: AsRef<[u8]>>(&mut self, pairs: &[(&str, V)]) -> Result<(), ClientError> {
667        let mut parts = Vec::with_capacity(1 + pairs.len() * 2);
668        parts.push(Frame::Bulk(Bytes::from_static(b"MSET")));
669        for (k, v) in pairs {
670            parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
671            parts.push(Frame::Bulk(Bytes::copy_from_slice(v.as_ref())));
672        }
673        let frame = self.send_frame(Frame::Array(parts)).await?;
674        ok(frame)
675    }
676
677    /// Returns the value of `key` and deletes it atomically. Returns `None`
678    /// if the key does not exist.
679    pub async fn getdel(&mut self, key: &str) -> Result<Option<Bytes>, ClientError> {
680        let frame = self.send_frame(cmd2(b"GETDEL", key.as_bytes())).await?;
681        optional_bytes(frame)
682    }
683
684    // --- list commands ---
685
686    /// Prepends `values` to the list at `key`. Returns the new list length.
687    pub async fn lpush<V: AsRef<[u8]>>(
688        &mut self,
689        key: &str,
690        values: &[V],
691    ) -> Result<i64, ClientError> {
692        let frame = self
693            .send_frame(cmd_key_values(b"LPUSH", key, values))
694            .await?;
695        integer(frame)
696    }
697
698    /// Appends `values` to the list at `key`. Returns the new list length.
699    pub async fn rpush<V: AsRef<[u8]>>(
700        &mut self,
701        key: &str,
702        values: &[V],
703    ) -> Result<i64, ClientError> {
704        let frame = self
705            .send_frame(cmd_key_values(b"RPUSH", key, values))
706            .await?;
707        integer(frame)
708    }
709
710    /// Removes and returns the first element of the list at `key`.
711    pub async fn lpop(&mut self, key: &str) -> Result<Option<Bytes>, ClientError> {
712        let frame = self.send_frame(cmd2(b"LPOP", key.as_bytes())).await?;
713        optional_bytes(frame)
714    }
715
716    /// Removes and returns the last element of the list at `key`.
717    pub async fn rpop(&mut self, key: &str) -> Result<Option<Bytes>, ClientError> {
718        let frame = self.send_frame(cmd2(b"RPOP", key.as_bytes())).await?;
719        optional_bytes(frame)
720    }
721
722    /// Returns the elements of the list between `start` and `stop` (inclusive).
723    pub async fn lrange(
724        &mut self,
725        key: &str,
726        start: i64,
727        stop: i64,
728    ) -> Result<Vec<Bytes>, ClientError> {
729        let s = start.to_string();
730        let e = stop.to_string();
731        let frame = self
732            .send_frame(cmd4(b"LRANGE", key.as_bytes(), s.as_bytes(), e.as_bytes()))
733            .await?;
734        bytes_vec(frame)
735    }
736
737    /// Returns the length of the list at `key`. Returns 0 for missing keys.
738    pub async fn llen(&mut self, key: &str) -> Result<i64, ClientError> {
739        let frame = self.send_frame(cmd2(b"LLEN", key.as_bytes())).await?;
740        integer(frame)
741    }
742
743    // --- hash commands ---
744
745    /// Sets `field`/`value` pairs on the hash at `key`. Returns the number of
746    /// new fields added.
747    pub async fn hset<V: AsRef<[u8]>>(
748        &mut self,
749        key: &str,
750        pairs: &[(&str, V)],
751    ) -> Result<i64, ClientError> {
752        let mut parts = Vec::with_capacity(2 + pairs.len() * 2);
753        parts.push(Frame::Bulk(Bytes::from_static(b"HSET")));
754        parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
755        for (field, val) in pairs {
756            parts.push(Frame::Bulk(Bytes::copy_from_slice(field.as_bytes())));
757            parts.push(Frame::Bulk(Bytes::copy_from_slice(val.as_ref())));
758        }
759        let frame = self.send_frame(Frame::Array(parts)).await?;
760        integer(frame)
761    }
762
763    /// Returns the value at `field` in the hash at `key`, or `None` if either
764    /// does not exist.
765    pub async fn hget(&mut self, key: &str, field: &str) -> Result<Option<Bytes>, ClientError> {
766        let frame = self
767            .send_frame(cmd3(b"HGET", key.as_bytes(), field.as_bytes()))
768            .await?;
769        optional_bytes(frame)
770    }
771
772    /// Returns all field-value pairs in the hash at `key`.
773    pub async fn hgetall(&mut self, key: &str) -> Result<Vec<(Bytes, Bytes)>, ClientError> {
774        let frame = self.send_frame(cmd2(b"HGETALL", key.as_bytes())).await?;
775        pairs(frame)
776    }
777
778    /// Deletes `fields` from the hash at `key`. Returns the number removed.
779    pub async fn hdel(&mut self, key: &str, fields: &[&str]) -> Result<i64, ClientError> {
780        let frame = self
781            .send_frame(cmd_key_and_keys(b"HDEL", key, fields))
782            .await?;
783        integer(frame)
784    }
785
786    /// Returns `true` if `field` exists in the hash at `key`.
787    pub async fn hexists(&mut self, key: &str, field: &str) -> Result<bool, ClientError> {
788        let frame = self
789            .send_frame(cmd3(b"HEXISTS", key.as_bytes(), field.as_bytes()))
790            .await?;
791        bool_flag(frame)
792    }
793
794    /// Returns the number of fields in the hash at `key`.
795    pub async fn hlen(&mut self, key: &str) -> Result<i64, ClientError> {
796        let frame = self.send_frame(cmd2(b"HLEN", key.as_bytes())).await?;
797        integer(frame)
798    }
799
800    /// Increments the integer stored at `field` in the hash at `key` by
801    /// `delta`. Returns the new value.
802    pub async fn hincrby(
803        &mut self,
804        key: &str,
805        field: &str,
806        delta: i64,
807    ) -> Result<i64, ClientError> {
808        let d = delta.to_string();
809        let frame = self
810            .send_frame(cmd4(
811                b"HINCRBY",
812                key.as_bytes(),
813                field.as_bytes(),
814                d.as_bytes(),
815            ))
816            .await?;
817        integer(frame)
818    }
819
820    /// Returns all field names in the hash at `key`.
821    pub async fn hkeys(&mut self, key: &str) -> Result<Vec<Bytes>, ClientError> {
822        let frame = self.send_frame(cmd2(b"HKEYS", key.as_bytes())).await?;
823        bytes_vec(frame)
824    }
825
826    /// Returns all values in the hash at `key`.
827    pub async fn hvals(&mut self, key: &str) -> Result<Vec<Bytes>, ClientError> {
828        let frame = self.send_frame(cmd2(b"HVALS", key.as_bytes())).await?;
829        bytes_vec(frame)
830    }
831
832    // --- set commands ---
833
834    /// Adds `members` to the set at `key`. Returns the number added.
835    pub async fn sadd(&mut self, key: &str, members: &[&str]) -> Result<i64, ClientError> {
836        let frame = self
837            .send_frame(cmd_key_and_keys(b"SADD", key, members))
838            .await?;
839        integer(frame)
840    }
841
842    /// Removes `members` from the set at `key`. Returns the number removed.
843    pub async fn srem(&mut self, key: &str, members: &[&str]) -> Result<i64, ClientError> {
844        let frame = self
845            .send_frame(cmd_key_and_keys(b"SREM", key, members))
846            .await?;
847        integer(frame)
848    }
849
850    /// Returns all members of the set at `key`.
851    pub async fn smembers(&mut self, key: &str) -> Result<Vec<Bytes>, ClientError> {
852        let frame = self.send_frame(cmd2(b"SMEMBERS", key.as_bytes())).await?;
853        bytes_vec(frame)
854    }
855
856    /// Returns `true` if `member` belongs to the set at `key`.
857    pub async fn sismember(&mut self, key: &str, member: &str) -> Result<bool, ClientError> {
858        let frame = self
859            .send_frame(cmd3(b"SISMEMBER", key.as_bytes(), member.as_bytes()))
860            .await?;
861        bool_flag(frame)
862    }
863
864    /// Returns the cardinality (number of members) of the set at `key`.
865    pub async fn scard(&mut self, key: &str) -> Result<i64, ClientError> {
866        let frame = self.send_frame(cmd2(b"SCARD", key.as_bytes())).await?;
867        integer(frame)
868    }
869
870    // --- sorted set commands ---
871
872    /// Adds `members` to the sorted set at `key`. Each member is a
873    /// `(score, name)` pair. Returns the number of new members added.
874    pub async fn zadd(&mut self, key: &str, members: &[(f64, &str)]) -> Result<i64, ClientError> {
875        let mut parts = Vec::with_capacity(2 + members.len() * 2);
876        parts.push(Frame::Bulk(Bytes::from_static(b"ZADD")));
877        parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
878        for (score, member) in members {
879            let s = score.to_string();
880            parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
881            parts.push(Frame::Bulk(Bytes::copy_from_slice(member.as_bytes())));
882        }
883        let frame = self.send_frame(Frame::Array(parts)).await?;
884        integer(frame)
885    }
886
887    /// Returns members of the sorted set between rank `start` and `stop`
888    /// (0-based, inclusive).
889    pub async fn zrange(
890        &mut self,
891        key: &str,
892        start: i64,
893        stop: i64,
894    ) -> Result<Vec<Bytes>, ClientError> {
895        let s = start.to_string();
896        let e = stop.to_string();
897        let frame = self
898            .send_frame(cmd4(b"ZRANGE", key.as_bytes(), s.as_bytes(), e.as_bytes()))
899            .await?;
900        bytes_vec(frame)
901    }
902
903    /// Like `zrange`, but also returns scores as `(member, score)` pairs.
904    pub async fn zrange_withscores(
905        &mut self,
906        key: &str,
907        start: i64,
908        stop: i64,
909    ) -> Result<Vec<(Bytes, f64)>, ClientError> {
910        let s = start.to_string();
911        let e = stop.to_string();
912        let frame = self
913            .send_frame(Frame::Array(vec![
914                Frame::Bulk(Bytes::from_static(b"ZRANGE")),
915                Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())),
916                Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())),
917                Frame::Bulk(Bytes::copy_from_slice(e.as_bytes())),
918                Frame::Bulk(Bytes::from_static(b"WITHSCORES")),
919            ]))
920            .await?;
921        scored_members(frame)
922    }
923
924    /// Returns the score of `member` in the sorted set, or `None` if absent.
925    pub async fn zscore(&mut self, key: &str, member: &str) -> Result<Option<f64>, ClientError> {
926        let frame = self
927            .send_frame(cmd3(b"ZSCORE", key.as_bytes(), member.as_bytes()))
928            .await?;
929        optional_score(frame)
930    }
931
932    /// Returns the rank of `member` in the sorted set (0-based, ascending
933    /// score order), or `None` if absent.
934    pub async fn zrank(&mut self, key: &str, member: &str) -> Result<Option<i64>, ClientError> {
935        let frame = self
936            .send_frame(cmd3(b"ZRANK", key.as_bytes(), member.as_bytes()))
937            .await?;
938        optional_integer(frame)
939    }
940
941    /// Removes `members` from the sorted set at `key`. Returns the number
942    /// removed.
943    pub async fn zrem(&mut self, key: &str, members: &[&str]) -> Result<i64, ClientError> {
944        let frame = self
945            .send_frame(cmd_key_and_keys(b"ZREM", key, members))
946            .await?;
947        integer(frame)
948    }
949
950    /// Returns the number of members in the sorted set at `key`.
951    pub async fn zcard(&mut self, key: &str) -> Result<i64, ClientError> {
952        let frame = self.send_frame(cmd2(b"ZCARD", key.as_bytes())).await?;
953        integer(frame)
954    }
955
956    // --- server commands ---
957
958    /// Sends a `PING` and expects a `PONG` response.
959    pub async fn ping(&mut self) -> Result<(), ClientError> {
960        let frame = self
961            .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(b"PING"))]))
962            .await?;
963        match frame {
964            Frame::Simple(s) if s == "PONG" => Ok(()),
965            Frame::Error(e) => Err(ClientError::Server(e)),
966            other => Err(ClientError::Protocol(format!(
967                "expected PONG, got {other:?}"
968            ))),
969        }
970    }
971
972    /// Returns the number of keys in the current database.
973    pub async fn dbsize(&mut self) -> Result<i64, ClientError> {
974        let frame = self
975            .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(
976                b"DBSIZE",
977            ))]))
978            .await?;
979        integer(frame)
980    }
981
982    /// Removes all keys from the current database.
983    pub async fn flushdb(&mut self) -> Result<(), ClientError> {
984        let frame = self
985            .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(
986                b"FLUSHDB",
987            ))]))
988            .await?;
989        ok(frame)
990    }
991
992    // --- more string commands ---
993
994    /// Returns the length of the string at `key`. Returns 0 for missing keys.
995    pub async fn strlen(&mut self, key: &str) -> Result<i64, ClientError> {
996        let frame = self.send_frame(cmd2(b"STRLEN", key.as_bytes())).await?;
997        integer(frame)
998    }
999
1000    /// Increments the float stored at `key` by `delta`. Returns the new value.
1001    pub async fn incr_by_float(&mut self, key: &str, delta: f64) -> Result<f64, ClientError> {
1002        let d = delta.to_string();
1003        let frame = self
1004            .send_frame(cmd3(b"INCRBYFLOAT", key.as_bytes(), d.as_bytes()))
1005            .await?;
1006        float_value(frame)
1007    }
1008
1009    // --- key commands ---
1010
1011    /// Returns the type of the value stored at `key` as a string:
1012    /// `"string"`, `"list"`, `"set"`, `"zset"`, `"hash"`, or `"none"`.
1013    pub async fn key_type(&mut self, key: &str) -> Result<String, ClientError> {
1014        let frame = self.send_frame(cmd2(b"TYPE", key.as_bytes())).await?;
1015        string_value(frame)
1016    }
1017
1018    /// Returns all keys matching `pattern`.
1019    ///
1020    /// Use `"*"` to return every key. This is a blocking O(N) scan — prefer
1021    /// [`Client::scan`] in production.
1022    pub async fn keys(&mut self, pattern: &str) -> Result<Vec<Bytes>, ClientError> {
1023        let frame = self.send_frame(cmd2(b"KEYS", pattern.as_bytes())).await?;
1024        bytes_vec(frame)
1025    }
1026
1027    /// Renames `key` to `newkey`. Returns an error if `key` does not exist.
1028    pub async fn rename(&mut self, key: &str, newkey: &str) -> Result<(), ClientError> {
1029        let frame = self
1030            .send_frame(cmd3(b"RENAME", key.as_bytes(), newkey.as_bytes()))
1031            .await?;
1032        ok(frame)
1033    }
1034
1035    /// Incrementally iterates keys in the keyspace.
1036    ///
1037    /// Pass `cursor: 0` to start a new iteration. Continue calling with the
1038    /// returned cursor until the cursor is `0` again. An optional `pattern`
1039    /// filters by glob and `count` hints at the page size (server may return
1040    /// more or fewer).
1041    pub async fn scan(
1042        &mut self,
1043        cursor: u64,
1044        count: Option<u32>,
1045        pattern: Option<&str>,
1046    ) -> Result<ScanPage, ClientError> {
1047        let cur = cursor.to_string();
1048        let mut parts = Vec::with_capacity(6);
1049        parts.push(Frame::Bulk(Bytes::from_static(b"SCAN")));
1050        parts.push(Frame::Bulk(Bytes::copy_from_slice(cur.as_bytes())));
1051        if let Some(pat) = pattern {
1052            parts.push(Frame::Bulk(Bytes::from_static(b"MATCH")));
1053            parts.push(Frame::Bulk(Bytes::copy_from_slice(pat.as_bytes())));
1054        }
1055        if let Some(n) = count {
1056            let ns = n.to_string();
1057            parts.push(Frame::Bulk(Bytes::from_static(b"COUNT")));
1058            parts.push(Frame::Bulk(Bytes::copy_from_slice(ns.as_bytes())));
1059        }
1060        let frame = self.send_frame(Frame::Array(parts)).await?;
1061        scan_page(frame)
1062    }
1063
1064    /// Sets a timeout of `millis` milliseconds on `key`. Returns `true` if
1065    /// the timeout was set, `false` if the key does not exist.
1066    pub async fn pexpire(&mut self, key: &str, millis: u64) -> Result<bool, ClientError> {
1067        let ms = millis.to_string();
1068        let frame = self
1069            .send_frame(cmd3(b"PEXPIRE", key.as_bytes(), ms.as_bytes()))
1070            .await?;
1071        bool_flag(frame)
1072    }
1073
1074    // --- more hash commands ---
1075
1076    /// Returns values for multiple `fields` in the hash at `key`. Missing
1077    /// fields are `None`.
1078    pub async fn hmget(
1079        &mut self,
1080        key: &str,
1081        fields: &[&str],
1082    ) -> Result<Vec<Option<Bytes>>, ClientError> {
1083        let frame = self
1084            .send_frame(cmd_key_and_keys(b"HMGET", key, fields))
1085            .await?;
1086        optional_bytes_vec(frame)
1087    }
1088
1089    // --- more server commands ---
1090
1091    /// Echoes `message` back from the server. Useful for round-trip testing.
1092    pub async fn echo(&mut self, message: &str) -> Result<Bytes, ClientError> {
1093        let frame = self.send_frame(cmd2(b"ECHO", message.as_bytes())).await?;
1094        match frame {
1095            Frame::Bulk(b) => Ok(b),
1096            Frame::Error(e) => Err(ClientError::Server(e)),
1097            other => Err(ClientError::Protocol(format!(
1098                "expected bulk for ECHO, got {other:?}"
1099            ))),
1100        }
1101    }
1102
1103    /// Deletes keys asynchronously. Behaves like `del` but frees memory in
1104    /// the background for large values. Returns the number of keys removed.
1105    pub async fn unlink(&mut self, keys: &[&str]) -> Result<i64, ClientError> {
1106        let frame = self.send_frame(cmd_keys_only(b"UNLINK", keys)).await?;
1107        integer(frame)
1108    }
1109
1110    /// Returns server information. Pass `Some("keyspace")` for a specific
1111    /// section, or `None` for all sections.
1112    pub async fn info(&mut self, section: Option<&str>) -> Result<String, ClientError> {
1113        let frame = match section {
1114            Some(s) => self.send_frame(cmd2(b"INFO", s.as_bytes())).await?,
1115            None => {
1116                self.send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(b"INFO"))]))
1117                    .await?
1118            }
1119        };
1120        match frame {
1121            Frame::Bulk(b) => String::from_utf8(b.to_vec())
1122                .map_err(|_| ClientError::Protocol("INFO response is not valid UTF-8".into())),
1123            Frame::Error(e) => Err(ClientError::Server(e)),
1124            other => Err(ClientError::Protocol(format!(
1125                "expected bulk for INFO, got {other:?}"
1126            ))),
1127        }
1128    }
1129
1130    /// Triggers a background snapshot (`BGSAVE`). Returns the server status
1131    /// message.
1132    pub async fn bgsave(&mut self) -> Result<String, ClientError> {
1133        let frame = self
1134            .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(
1135                b"BGSAVE",
1136            ))]))
1137            .await?;
1138        string_value(frame)
1139    }
1140
1141    /// Triggers an AOF rewrite in the background (`BGREWRITEAOF`). Returns
1142    /// the server status message.
1143    pub async fn bgrewriteaof(&mut self) -> Result<String, ClientError> {
1144        let frame = self
1145            .send_frame(Frame::Array(vec![Frame::Bulk(Bytes::from_static(
1146                b"BGREWRITEAOF",
1147            ))]))
1148            .await?;
1149        string_value(frame)
1150    }
1151
1152    // --- slowlog commands ---
1153
1154    /// Returns up to `count` recent slow-log entries, or all entries if
1155    /// `count` is `None`.
1156    pub async fn slowlog_get(
1157        &mut self,
1158        count: Option<u32>,
1159    ) -> Result<Vec<SlowlogEntry>, ClientError> {
1160        let frame = match count {
1161            Some(n) => {
1162                let ns = n.to_string();
1163                self.send_frame(Frame::Array(vec![
1164                    Frame::Bulk(Bytes::from_static(b"SLOWLOG")),
1165                    Frame::Bulk(Bytes::from_static(b"GET")),
1166                    Frame::Bulk(Bytes::copy_from_slice(ns.as_bytes())),
1167                ]))
1168                .await?
1169            }
1170            None => {
1171                self.send_frame(Frame::Array(vec![
1172                    Frame::Bulk(Bytes::from_static(b"SLOWLOG")),
1173                    Frame::Bulk(Bytes::from_static(b"GET")),
1174                ]))
1175                .await?
1176            }
1177        };
1178        slowlog_entries(frame)
1179    }
1180
1181    /// Returns the number of entries currently in the slow log.
1182    pub async fn slowlog_len(&mut self) -> Result<i64, ClientError> {
1183        let frame = self
1184            .send_frame(Frame::Array(vec![
1185                Frame::Bulk(Bytes::from_static(b"SLOWLOG")),
1186                Frame::Bulk(Bytes::from_static(b"LEN")),
1187            ]))
1188            .await?;
1189        integer(frame)
1190    }
1191
1192    /// Clears all entries from the slow log.
1193    pub async fn slowlog_reset(&mut self) -> Result<(), ClientError> {
1194        let frame = self
1195            .send_frame(Frame::Array(vec![
1196                Frame::Bulk(Bytes::from_static(b"SLOWLOG")),
1197                Frame::Bulk(Bytes::from_static(b"RESET")),
1198            ]))
1199            .await?;
1200        ok(frame)
1201    }
1202
1203    // --- pub/sub commands (request-response only) ---
1204
1205    /// Publishes `message` to `channel`. Returns the number of subscribers
1206    /// that received the message.
1207    pub async fn publish(
1208        &mut self,
1209        channel: &str,
1210        message: impl AsRef<[u8]>,
1211    ) -> Result<i64, ClientError> {
1212        let frame = self
1213            .send_frame(cmd3(b"PUBLISH", channel.as_bytes(), message.as_ref()))
1214            .await?;
1215        integer(frame)
1216    }
1217
1218    /// Returns the names of active pub/sub channels. Pass `Some(pattern)` to
1219    /// filter by glob, or `None` for all channels.
1220    pub async fn pubsub_channels(
1221        &mut self,
1222        pattern: Option<&str>,
1223    ) -> Result<Vec<Bytes>, ClientError> {
1224        let frame = match pattern {
1225            Some(p) => {
1226                self.send_frame(Frame::Array(vec![
1227                    Frame::Bulk(Bytes::from_static(b"PUBSUB")),
1228                    Frame::Bulk(Bytes::from_static(b"CHANNELS")),
1229                    Frame::Bulk(Bytes::copy_from_slice(p.as_bytes())),
1230                ]))
1231                .await?
1232            }
1233            None => {
1234                self.send_frame(Frame::Array(vec![
1235                    Frame::Bulk(Bytes::from_static(b"PUBSUB")),
1236                    Frame::Bulk(Bytes::from_static(b"CHANNELS")),
1237                ]))
1238                .await?
1239            }
1240        };
1241        bytes_vec(frame)
1242    }
1243
1244    /// Returns the subscriber counts for the given channels as
1245    /// `(channel, count)` pairs.
1246    pub async fn pubsub_numsub(
1247        &mut self,
1248        channels: &[&str],
1249    ) -> Result<Vec<(Bytes, i64)>, ClientError> {
1250        let mut parts = Vec::with_capacity(2 + channels.len());
1251        parts.push(Frame::Bulk(Bytes::from_static(b"PUBSUB")));
1252        parts.push(Frame::Bulk(Bytes::from_static(b"NUMSUB")));
1253        for ch in channels {
1254            parts.push(Frame::Bulk(Bytes::copy_from_slice(ch.as_bytes())));
1255        }
1256        let frame = self.send_frame(Frame::Array(parts)).await?;
1257        numsub_pairs(frame)
1258    }
1259
1260    /// Returns the number of active pattern subscriptions across all clients.
1261    pub async fn pubsub_numpat(&mut self) -> Result<i64, ClientError> {
1262        let frame = self
1263            .send_frame(Frame::Array(vec![
1264                Frame::Bulk(Bytes::from_static(b"PUBSUB")),
1265                Frame::Bulk(Bytes::from_static(b"NUMPAT")),
1266            ]))
1267            .await?;
1268        integer(frame)
1269    }
1270
1271    // --- pub/sub subscriber mode ---
1272
1273    /// Puts the connection into subscriber mode on `channels`.
1274    ///
1275    /// The connection is consumed and a [`Subscriber`] is returned. Use a
1276    /// separate [`Client`] for regular commands while subscribed.
1277    pub async fn subscribe(mut self, channels: &[&str]) -> Result<Subscriber, ClientError> {
1278        let mut parts = Vec::with_capacity(1 + channels.len());
1279        parts.push(Frame::Bulk(Bytes::from_static(b"SUBSCRIBE")));
1280        for ch in channels {
1281            parts.push(Frame::Bulk(Bytes::copy_from_slice(ch.as_bytes())));
1282        }
1283        self.write_frame(Frame::Array(parts)).await?;
1284        // drain confirmation frames (one per channel)
1285        for _ in 0..channels.len() {
1286            self.read_response().await?;
1287        }
1288        Ok(Subscriber::new(self))
1289    }
1290
1291    /// Same as [`subscribe`](Client::subscribe) but subscribes to glob
1292    /// patterns with `PSUBSCRIBE`.
1293    pub async fn psubscribe(mut self, patterns: &[&str]) -> Result<Subscriber, ClientError> {
1294        let mut parts = Vec::with_capacity(1 + patterns.len());
1295        parts.push(Frame::Bulk(Bytes::from_static(b"PSUBSCRIBE")));
1296        for p in patterns {
1297            parts.push(Frame::Bulk(Bytes::copy_from_slice(p.as_bytes())));
1298        }
1299        self.write_frame(Frame::Array(parts)).await?;
1300        // drain confirmation frames (one per pattern)
1301        for _ in 0..patterns.len() {
1302            self.read_response().await?;
1303        }
1304        Ok(Subscriber::new(self))
1305    }
1306
1307    // --- expiry commands ---
1308
1309    /// Returns the absolute unix expiry timestamp in seconds, or -1 if no expiry, -2 if key missing.
1310    pub async fn expiretime(&mut self, key: &str) -> Result<i64, ClientError> {
1311        let frame = self.send_frame(cmd2(b"EXPIRETIME", key.as_bytes())).await?;
1312        integer(frame)
1313    }
1314
1315    /// Returns the absolute unix expiry timestamp in milliseconds, or -1 if no expiry, -2 if key missing.
1316    pub async fn pexpiretime(&mut self, key: &str) -> Result<i64, ClientError> {
1317        let frame = self
1318            .send_frame(cmd2(b"PEXPIRETIME", key.as_bytes()))
1319            .await?;
1320        integer(frame)
1321    }
1322
1323    /// Sets expiry at an absolute unix timestamp (seconds). Returns `true` if the timeout was set.
1324    pub async fn expireat(&mut self, key: &str, timestamp: u64) -> Result<bool, ClientError> {
1325        let ts = timestamp.to_string();
1326        let frame = self
1327            .send_frame(cmd3(b"EXPIREAT", key.as_bytes(), ts.as_bytes()))
1328            .await?;
1329        bool_flag(frame)
1330    }
1331
1332    /// Sets expiry at an absolute unix timestamp (milliseconds). Returns `true` if the timeout was set.
1333    pub async fn pexpireat(&mut self, key: &str, timestamp_ms: u64) -> Result<bool, ClientError> {
1334        let ts = timestamp_ms.to_string();
1335        let frame = self
1336            .send_frame(cmd3(b"PEXPIREAT", key.as_bytes(), ts.as_bytes()))
1337            .await?;
1338        bool_flag(frame)
1339    }
1340
1341    // --- string commands (extended) ---
1342
1343    /// Sets `key` to `value`, returning the old value. Returns `None` if the key didn't exist.
1344    pub async fn getset(
1345        &mut self,
1346        key: &str,
1347        value: impl AsRef<[u8]>,
1348    ) -> Result<Option<Bytes>, ClientError> {
1349        let frame = self
1350            .send_frame(cmd3(b"GETSET", key.as_bytes(), value.as_ref()))
1351            .await?;
1352        optional_bytes(frame)
1353    }
1354
1355    /// Sets multiple key-value pairs only if none of the keys already exist.
1356    /// Returns `true` if all keys were set, `false` if any key existed.
1357    pub async fn msetnx<V: AsRef<[u8]>>(
1358        &mut self,
1359        pairs: &[(&str, V)],
1360    ) -> Result<bool, ClientError> {
1361        let mut parts = Vec::with_capacity(1 + pairs.len() * 2);
1362        parts.push(Frame::Bulk(Bytes::from_static(b"MSETNX")));
1363        for (k, v) in pairs {
1364            parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1365            parts.push(Frame::Bulk(Bytes::copy_from_slice(v.as_ref())));
1366        }
1367        let frame = self.send_frame(Frame::Array(parts)).await?;
1368        bool_flag(frame)
1369    }
1370
1371    // --- bitmap commands ---
1372
1373    /// Returns the bit value at `offset` in the string stored at `key`.
1374    pub async fn getbit(&mut self, key: &str, offset: u64) -> Result<i64, ClientError> {
1375        let off = offset.to_string();
1376        let frame = self
1377            .send_frame(cmd3(b"GETBIT", key.as_bytes(), off.as_bytes()))
1378            .await?;
1379        integer(frame)
1380    }
1381
1382    /// Sets or clears the bit at `offset`. Returns the original bit value.
1383    pub async fn setbit(&mut self, key: &str, offset: u64, value: u8) -> Result<i64, ClientError> {
1384        let off = offset.to_string();
1385        let val = value.to_string();
1386        let frame = self
1387            .send_frame(cmd4(
1388                b"SETBIT",
1389                key.as_bytes(),
1390                off.as_bytes(),
1391                val.as_bytes(),
1392            ))
1393            .await?;
1394        integer(frame)
1395    }
1396
1397    /// Counts set bits in the string at `key`.
1398    ///
1399    /// `range`: optional `(start, end, unit)` where unit is `"BYTE"` or `"BIT"`.
1400    pub async fn bitcount(
1401        &mut self,
1402        key: &str,
1403        range: Option<(i64, i64, &str)>,
1404    ) -> Result<i64, ClientError> {
1405        let frame = if let Some((start, end, unit)) = range {
1406            let s = start.to_string();
1407            let e = end.to_string();
1408            let mut parts = Vec::with_capacity(5);
1409            parts.push(Frame::Bulk(Bytes::from_static(b"BITCOUNT")));
1410            parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
1411            parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
1412            parts.push(Frame::Bulk(Bytes::copy_from_slice(e.as_bytes())));
1413            parts.push(Frame::Bulk(Bytes::copy_from_slice(unit.as_bytes())));
1414            self.send_frame(Frame::Array(parts)).await?
1415        } else {
1416            self.send_frame(cmd2(b"BITCOUNT", key.as_bytes())).await?
1417        };
1418        integer(frame)
1419    }
1420
1421    /// Finds the first set (`bit=1`) or clear (`bit=0`) bit in the string at `key`.
1422    ///
1423    /// `range`: optional `(start, end, unit)` where unit is `"BYTE"` or `"BIT"`.
1424    pub async fn bitpos(
1425        &mut self,
1426        key: &str,
1427        bit: u8,
1428        range: Option<(i64, i64, &str)>,
1429    ) -> Result<i64, ClientError> {
1430        let b = bit.to_string();
1431        let frame = if let Some((start, end, unit)) = range {
1432            let s = start.to_string();
1433            let e = end.to_string();
1434            let mut parts = Vec::with_capacity(6);
1435            parts.push(Frame::Bulk(Bytes::from_static(b"BITPOS")));
1436            parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
1437            parts.push(Frame::Bulk(Bytes::copy_from_slice(b.as_bytes())));
1438            parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
1439            parts.push(Frame::Bulk(Bytes::copy_from_slice(e.as_bytes())));
1440            parts.push(Frame::Bulk(Bytes::copy_from_slice(unit.as_bytes())));
1441            self.send_frame(Frame::Array(parts)).await?
1442        } else {
1443            self.send_frame(cmd3(b"BITPOS", key.as_bytes(), b.as_bytes()))
1444                .await?
1445        };
1446        integer(frame)
1447    }
1448
1449    /// Performs a bitwise operation between strings.
1450    ///
1451    /// `op` is `"AND"`, `"OR"`, `"XOR"`, or `"NOT"`. Result is stored at `dest`.
1452    /// Returns the length of the resulting string.
1453    pub async fn bitop(&mut self, op: &str, dest: &str, keys: &[&str]) -> Result<i64, ClientError> {
1454        let mut parts = Vec::with_capacity(3 + keys.len());
1455        parts.push(Frame::Bulk(Bytes::from_static(b"BITOP")));
1456        parts.push(Frame::Bulk(Bytes::copy_from_slice(op.as_bytes())));
1457        parts.push(Frame::Bulk(Bytes::copy_from_slice(dest.as_bytes())));
1458        for k in keys {
1459            parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1460        }
1461        let frame = self.send_frame(Frame::Array(parts)).await?;
1462        integer(frame)
1463    }
1464
1465    // --- set commands (extended) ---
1466
1467    /// Atomically moves `member` from `src` to `dst`. Returns `true` if the move succeeded.
1468    pub async fn smove(&mut self, src: &str, dst: &str, member: &str) -> Result<bool, ClientError> {
1469        let frame = self
1470            .send_frame(cmd4(
1471                b"SMOVE",
1472                src.as_bytes(),
1473                dst.as_bytes(),
1474                member.as_bytes(),
1475            ))
1476            .await?;
1477        bool_flag(frame)
1478    }
1479
1480    /// Returns the cardinality of the intersection of multiple sets. `limit=0` means no limit.
1481    pub async fn sintercard(&mut self, keys: &[&str], limit: usize) -> Result<i64, ClientError> {
1482        let n = keys.len().to_string();
1483        let lim = limit.to_string();
1484        let mut parts = Vec::with_capacity(3 + keys.len() + 2);
1485        parts.push(Frame::Bulk(Bytes::from_static(b"SINTERCARD")));
1486        parts.push(Frame::Bulk(Bytes::copy_from_slice(n.as_bytes())));
1487        for k in keys {
1488            parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1489        }
1490        if limit > 0 {
1491            parts.push(Frame::Bulk(Bytes::from_static(b"LIMIT")));
1492            parts.push(Frame::Bulk(Bytes::copy_from_slice(lim.as_bytes())));
1493        }
1494        let frame = self.send_frame(Frame::Array(parts)).await?;
1495        integer(frame)
1496    }
1497
1498    // --- list commands (extended) ---
1499
1500    /// Pops up to `count` elements from the first non-empty list in `keys`.
1501    ///
1502    /// `left=true` pops from the head, `left=false` from the tail.
1503    /// Returns `None` if all lists are empty, or `Some((key, elements))`.
1504    pub async fn lmpop(
1505        &mut self,
1506        keys: &[&str],
1507        left: bool,
1508        count: usize,
1509    ) -> Result<Option<(String, Vec<Bytes>)>, ClientError> {
1510        let n = keys.len().to_string();
1511        let dir = if left { b"LEFT" as &[u8] } else { b"RIGHT" };
1512        let cnt = count.to_string();
1513        let mut parts = Vec::with_capacity(4 + keys.len() + 2);
1514        parts.push(Frame::Bulk(Bytes::from_static(b"LMPOP")));
1515        parts.push(Frame::Bulk(Bytes::copy_from_slice(n.as_bytes())));
1516        for k in keys {
1517            parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1518        }
1519        parts.push(Frame::Bulk(Bytes::copy_from_slice(dir)));
1520        if count > 0 {
1521            parts.push(Frame::Bulk(Bytes::from_static(b"COUNT")));
1522            parts.push(Frame::Bulk(Bytes::copy_from_slice(cnt.as_bytes())));
1523        }
1524        let frame = self.send_frame(Frame::Array(parts)).await?;
1525        match frame {
1526            Frame::Null => Ok(None),
1527            Frame::Array(mut elems) if elems.len() == 2 => {
1528                let key = match elems.remove(0) {
1529                    Frame::Bulk(b) => String::from_utf8(b.to_vec())
1530                        .map_err(|_| ClientError::Protocol("key is not valid UTF-8".into()))?,
1531                    other => {
1532                        return Err(ClientError::Protocol(format!(
1533                            "expected bulk key in LMPOP response, got {other:?}"
1534                        )))
1535                    }
1536                };
1537                let elements = bytes_vec(elems.remove(0))?;
1538                Ok(Some((key, elements)))
1539            }
1540            Frame::Error(e) => Err(ClientError::Server(e)),
1541            other => Err(ClientError::Protocol(format!(
1542                "unexpected LMPOP response: {other:?}"
1543            ))),
1544        }
1545    }
1546
1547    // --- hash commands (extended) ---
1548
1549    /// Returns random field(s) from the hash at `key`.
1550    ///
1551    /// `count=None` returns a single field name as a one-element Vec.
1552    /// Positive count returns that many distinct fields; negative allows repeats.
1553    pub async fn hrandfield(
1554        &mut self,
1555        key: &str,
1556        count: Option<i64>,
1557    ) -> Result<Vec<Bytes>, ClientError> {
1558        let frame = if let Some(n) = count {
1559            let s = n.to_string();
1560            self.send_frame(cmd3(b"HRANDFIELD", key.as_bytes(), s.as_bytes()))
1561                .await?
1562        } else {
1563            self.send_frame(cmd2(b"HRANDFIELD", key.as_bytes())).await?
1564        };
1565        match frame {
1566            Frame::Bulk(b) => Ok(vec![b]),
1567            Frame::Null => Ok(vec![]),
1568            Frame::Array(_) => bytes_vec(frame),
1569            Frame::Error(e) => Err(ClientError::Server(e)),
1570            other => Err(ClientError::Protocol(format!(
1571                "unexpected HRANDFIELD response: {other:?}"
1572            ))),
1573        }
1574    }
1575
1576    /// Returns random field-value pairs from the hash at `key`.
1577    ///
1578    /// `count` controls how many pairs to return (positive = distinct, negative = allow repeats).
1579    pub async fn hrandfield_withvalues(
1580        &mut self,
1581        key: &str,
1582        count: i64,
1583    ) -> Result<Vec<(Bytes, Bytes)>, ClientError> {
1584        let s = count.to_string();
1585        let frame = self
1586            .send_frame(Frame::Array(vec![
1587                Frame::Bulk(Bytes::from_static(b"HRANDFIELD")),
1588                Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())),
1589                Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())),
1590                Frame::Bulk(Bytes::from_static(b"WITHVALUES")),
1591            ]))
1592            .await?;
1593        pairs(frame)
1594    }
1595
1596    // --- sorted set commands (extended) ---
1597
1598    /// Pops up to `count` elements from the first non-empty sorted set in `keys`.
1599    ///
1600    /// `min=true` pops minimum-score members, `min=false` pops maximum-score members.
1601    /// Returns `None` if all sorted sets are empty, or `Some((key, members))`.
1602    pub async fn zmpop(
1603        &mut self,
1604        keys: &[&str],
1605        min: bool,
1606        count: usize,
1607    ) -> Result<Option<(String, Vec<(Bytes, f64)>)>, ClientError> {
1608        let n = keys.len().to_string();
1609        let dir = if min { b"MIN" as &[u8] } else { b"MAX" };
1610        let cnt = count.to_string();
1611        let mut parts = Vec::with_capacity(4 + keys.len() + 2);
1612        parts.push(Frame::Bulk(Bytes::from_static(b"ZMPOP")));
1613        parts.push(Frame::Bulk(Bytes::copy_from_slice(n.as_bytes())));
1614        for k in keys {
1615            parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1616        }
1617        parts.push(Frame::Bulk(Bytes::copy_from_slice(dir)));
1618        if count > 0 {
1619            parts.push(Frame::Bulk(Bytes::from_static(b"COUNT")));
1620            parts.push(Frame::Bulk(Bytes::copy_from_slice(cnt.as_bytes())));
1621        }
1622        let frame = self.send_frame(Frame::Array(parts)).await?;
1623        match frame {
1624            Frame::Null => Ok(None),
1625            Frame::Array(mut elems) if elems.len() == 2 => {
1626                let key = match elems.remove(0) {
1627                    Frame::Bulk(b) => String::from_utf8(b.to_vec())
1628                        .map_err(|_| ClientError::Protocol("key is not valid UTF-8".into()))?,
1629                    other => {
1630                        return Err(ClientError::Protocol(format!(
1631                            "expected bulk key in ZMPOP response, got {other:?}"
1632                        )))
1633                    }
1634                };
1635                let members = scored_members(elems.remove(0))?;
1636                Ok(Some((key, members)))
1637            }
1638            Frame::Error(e) => Err(ClientError::Server(e)),
1639            other => Err(ClientError::Protocol(format!(
1640                "unexpected ZMPOP response: {other:?}"
1641            ))),
1642        }
1643    }
1644
1645    /// Returns random member(s) from the sorted set at `key`.
1646    ///
1647    /// `count=None` returns a single member name. Positive count returns distinct members;
1648    /// negative allows repeats.
1649    pub async fn zrandmember(
1650        &mut self,
1651        key: &str,
1652        count: Option<i64>,
1653    ) -> Result<Vec<Bytes>, ClientError> {
1654        let frame = if let Some(n) = count {
1655            let s = n.to_string();
1656            self.send_frame(cmd3(b"ZRANDMEMBER", key.as_bytes(), s.as_bytes()))
1657                .await?
1658        } else {
1659            self.send_frame(cmd2(b"ZRANDMEMBER", key.as_bytes()))
1660                .await?
1661        };
1662        match frame {
1663            Frame::Bulk(b) => Ok(vec![b]),
1664            Frame::Null => Ok(vec![]),
1665            Frame::Array(_) => bytes_vec(frame),
1666            Frame::Error(e) => Err(ClientError::Server(e)),
1667            other => Err(ClientError::Protocol(format!(
1668                "unexpected ZRANDMEMBER response: {other:?}"
1669            ))),
1670        }
1671    }
1672
1673    /// Returns random member-score pairs from the sorted set at `key`.
1674    ///
1675    /// `count` controls how many pairs to return (positive = distinct, negative = allow repeats).
1676    pub async fn zrandmember_withscores(
1677        &mut self,
1678        key: &str,
1679        count: i64,
1680    ) -> Result<Vec<(Bytes, f64)>, ClientError> {
1681        let s = count.to_string();
1682        let frame = self
1683            .send_frame(Frame::Array(vec![
1684                Frame::Bulk(Bytes::from_static(b"ZRANDMEMBER")),
1685                Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())),
1686                Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())),
1687                Frame::Bulk(Bytes::from_static(b"WITHSCORES")),
1688            ]))
1689            .await?;
1690        scored_members(frame)
1691    }
1692
1693    // --- pipeline ---
1694
1695    /// Executes all commands queued in `pipeline` as a single batch.
1696    ///
1697    /// Returns one raw [`Frame`] per command in the same order they were
1698    /// queued. An empty pipeline returns an empty `Vec` without touching the
1699    /// network.
1700    ///
1701    /// Use this when you need full control over decoding the responses (e.g.
1702    /// mixed command types in one batch). For homogeneous batches the typed
1703    /// builder methods on [`Pipeline`] pair well with a manual decode loop.
1704    pub async fn execute_pipeline(
1705        &mut self,
1706        pipeline: Pipeline,
1707    ) -> Result<Vec<Frame>, ClientError> {
1708        if pipeline.is_empty() {
1709            return Ok(Vec::new());
1710        }
1711        self.send_batch(&pipeline.cmds).await
1712    }
1713}
1714
1715// --- shared frame construction helpers used by multiple command groups ---
1716
1717fn cmd_key_and_keys(cmd: &'static [u8], key: &str, rest: &[&str]) -> Frame {
1718    let mut parts = Vec::with_capacity(2 + rest.len());
1719    parts.push(Frame::Bulk(Bytes::from_static(cmd)));
1720    parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
1721    for s in rest {
1722        parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
1723    }
1724    Frame::Array(parts)
1725}
1726
1727fn cmd_key_values<V: AsRef<[u8]>>(cmd: &'static [u8], key: &str, values: &[V]) -> Frame {
1728    let mut parts = Vec::with_capacity(2 + values.len());
1729    parts.push(Frame::Bulk(Bytes::from_static(cmd)));
1730    parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
1731    for v in values {
1732        parts.push(Frame::Bulk(Bytes::copy_from_slice(v.as_ref())));
1733    }
1734    Frame::Array(parts)
1735}
1736
1737/// `CMD key1 key2 ...` (no separate leading key argument)
1738fn cmd_keys_only(cmd: &'static [u8], keys: &[&str]) -> Frame {
1739    let mut parts = Vec::with_capacity(1 + keys.len());
1740    parts.push(Frame::Bulk(Bytes::from_static(cmd)));
1741    for k in keys {
1742        parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
1743    }
1744    Frame::Array(parts)
1745}
1746
1747#[cfg(test)]
1748mod tests {
1749    use super::*;
1750
1751    fn bulk(b: &[u8]) -> Frame {
1752        Frame::Bulk(Bytes::copy_from_slice(b))
1753    }
1754
1755    // --- optional_bytes ---
1756
1757    #[test]
1758    fn optional_bytes_bulk() {
1759        let f = Frame::Bulk(Bytes::from_static(b"hello"));
1760        assert_eq!(
1761            optional_bytes(f).unwrap(),
1762            Some(Bytes::from_static(b"hello"))
1763        );
1764    }
1765
1766    #[test]
1767    fn optional_bytes_null() {
1768        assert_eq!(optional_bytes(Frame::Null).unwrap(), None);
1769    }
1770
1771    #[test]
1772    fn optional_bytes_server_error() {
1773        let f = Frame::Error("WRONGTYPE".into());
1774        assert!(matches!(optional_bytes(f), Err(ClientError::Server(_))));
1775    }
1776
1777    #[test]
1778    fn optional_bytes_unexpected() {
1779        let f = Frame::Integer(1);
1780        assert!(matches!(optional_bytes(f), Err(ClientError::Protocol(_))));
1781    }
1782
1783    // --- bool_flag ---
1784
1785    #[test]
1786    fn bool_flag_one() {
1787        assert!(bool_flag(Frame::Integer(1)).unwrap());
1788    }
1789
1790    #[test]
1791    fn bool_flag_zero() {
1792        assert!(!bool_flag(Frame::Integer(0)).unwrap());
1793    }
1794
1795    #[test]
1796    fn bool_flag_unexpected_value() {
1797        let f = Frame::Integer(42);
1798        assert!(matches!(bool_flag(f), Err(ClientError::Protocol(_))));
1799    }
1800
1801    // --- pairs ---
1802
1803    #[test]
1804    fn pairs_even_array() {
1805        let f = Frame::Array(vec![
1806            bulk(b"field1"),
1807            bulk(b"val1"),
1808            bulk(b"field2"),
1809            bulk(b"val2"),
1810        ]);
1811        let result = pairs(f).unwrap();
1812        assert_eq!(result.len(), 2);
1813        assert_eq!(result[0].0, Bytes::from_static(b"field1"));
1814        assert_eq!(result[0].1, Bytes::from_static(b"val1"));
1815    }
1816
1817    #[test]
1818    fn pairs_odd_array_is_error() {
1819        let f = Frame::Array(vec![bulk(b"orphan")]);
1820        assert!(matches!(pairs(f), Err(ClientError::Protocol(_))));
1821    }
1822
1823    #[test]
1824    fn pairs_empty_array() {
1825        assert_eq!(pairs(Frame::Array(vec![])).unwrap(), vec![]);
1826    }
1827
1828    #[test]
1829    fn pairs_null() {
1830        assert_eq!(pairs(Frame::Null).unwrap(), vec![]);
1831    }
1832
1833    // --- optional_score ---
1834
1835    #[test]
1836    fn optional_score_valid_float() {
1837        let f = Frame::Bulk(Bytes::from_static(b"3.14159265358979"));
1838        let s = optional_score(f).unwrap();
1839        assert!((s.unwrap() - std::f64::consts::PI).abs() < 1e-10);
1840    }
1841
1842    #[test]
1843    fn optional_score_null() {
1844        assert_eq!(optional_score(Frame::Null).unwrap(), None);
1845    }
1846
1847    #[test]
1848    fn optional_score_malformed() {
1849        let f = Frame::Bulk(Bytes::from_static(b"notanumber"));
1850        assert!(matches!(optional_score(f), Err(ClientError::Protocol(_))));
1851    }
1852
1853    // --- optional_bytes_vec (MGET) ---
1854
1855    #[test]
1856    fn optional_bytes_vec_null_elements() {
1857        let f = Frame::Array(vec![
1858            Frame::Bulk(Bytes::from_static(b"value")),
1859            Frame::Null,
1860            Frame::Bulk(Bytes::from_static(b"another")),
1861        ]);
1862        let result = optional_bytes_vec(f).unwrap();
1863        assert_eq!(result.len(), 3);
1864        assert!(result[0].is_some());
1865        assert!(result[1].is_none());
1866        assert!(result[2].is_some());
1867    }
1868}