Skip to main content

kevy_client/
collections.rs

1//! Connection methods for the four collection-typed Redis data types:
2//! hash, list, set, sorted set. Plus the small `LPUSH`/`SADD`-style
3//! request builders shared between them.
4//!
5//! Lives in its own module so `lib.rs` stays focused on the `Connection`
6//! enum + open + the generic + string ops. Behaviour and API are
7//! unchanged from the single-file layout in v1.2.0 / v1.3.0.
8
9use std::io;
10
11use kevy_resp::Reply;
12use kevy_resp_client::RespClient;
13
14use crate::{Connection, array_to_bulks, store_err, string, unexpected, vec2, vec3};
15
16impl Connection {
17    // ===== Hash =====
18
19    /// `HSET key field value [field value ...]`. Returns the number of
20    /// fields that were newly added (not overwrites).
21    pub fn hset(&mut self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
22        match self {
23            Self::Embedded(s) => s.hset(key, pairs),
24            Self::Remote(c) => {
25                let mut args = Vec::with_capacity(2 + pairs.len() * 2);
26                args.push(b"HSET".to_vec());
27                args.push(key.to_vec());
28                for (f, v) in pairs {
29                    args.push(f.to_vec());
30                    args.push(v.to_vec());
31                }
32                match c.request(&args)? {
33                    Reply::Int(n) if n >= 0 => Ok(n as usize),
34                    Reply::Error(e) => Err(io::Error::other(string(e))),
35                    other => Err(unexpected(other)),
36                }
37            }
38        }
39    }
40
41    /// `HGET key field`. `None` when the key or field is absent.
42    pub fn hget(&mut self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
43        match self {
44            Self::Embedded(s) => s.hget(key, field),
45            Self::Remote(c) => match c.request(&vec3(b"HGET", key, field))? {
46                Reply::Bulk(v) => Ok(Some(v)),
47                Reply::Nil => Ok(None),
48                Reply::Error(e) => Err(io::Error::other(string(e))),
49                other => Err(unexpected(other)),
50            },
51        }
52    }
53
54    /// `HDEL key field [field ...]`. Returns the number of fields actually
55    /// removed.
56    pub fn hdel(&mut self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
57        match self {
58            Self::Embedded(s) => s.hdel(key, fields),
59            Self::Remote(c) => {
60                let mut args = Vec::with_capacity(fields.len() + 2);
61                args.push(b"HDEL".to_vec());
62                args.push(key.to_vec());
63                args.extend(fields.iter().map(|f| f.to_vec()));
64                match c.request(&args)? {
65                    Reply::Int(n) if n >= 0 => Ok(n as usize),
66                    Reply::Error(e) => Err(io::Error::other(string(e))),
67                    other => Err(unexpected(other)),
68                }
69            }
70        }
71    }
72
73    /// `HLEN key`. Number of fields in the hash (0 if absent).
74    pub fn hlen(&mut self, key: &[u8]) -> io::Result<usize> {
75        match self {
76            Self::Embedded(s) => s.with(|inner| inner.hlen(key)).map_err(store_err),
77            Self::Remote(c) => match c.request(&vec2(b"HLEN", key))? {
78                Reply::Int(n) if n >= 0 => Ok(n as usize),
79                Reply::Error(e) => Err(io::Error::other(string(e))),
80                other => Err(unexpected(other)),
81            },
82        }
83    }
84
85    /// `HGETALL key`. Returns a flat `[f0, v0, f1, v1, ...]` matching the
86    /// Redis wire shape; empty when the key is absent.
87    pub fn hgetall(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
88        match self {
89            Self::Embedded(s) => s.with(|inner| inner.hgetall(key)).map_err(store_err),
90            Self::Remote(c) => match c.request(&vec2(b"HGETALL", key))? {
91                Reply::Array(items) => array_to_bulks(items),
92                Reply::Error(e) => Err(io::Error::other(string(e))),
93                other => Err(unexpected(other)),
94            },
95        }
96    }
97
98    /// `HKEYS key`. Returns the hash's field names (empty if absent).
99    pub fn hkeys(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
100        match self {
101            Self::Embedded(s) => s.with(|inner| inner.hkeys(key)).map_err(store_err),
102            Self::Remote(c) => match c.request(&vec2(b"HKEYS", key))? {
103                Reply::Array(items) => array_to_bulks(items),
104                Reply::Error(e) => Err(io::Error::other(string(e))),
105                other => Err(unexpected(other)),
106            },
107        }
108    }
109
110    /// `HVALS key`. Returns the hash's values (empty if absent).
111    pub fn hvals(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
112        match self {
113            Self::Embedded(s) => s.with(|inner| inner.hvals(key)).map_err(store_err),
114            Self::Remote(c) => match c.request(&vec2(b"HVALS", key))? {
115                Reply::Array(items) => array_to_bulks(items),
116                Reply::Error(e) => Err(io::Error::other(string(e))),
117                other => Err(unexpected(other)),
118            },
119        }
120    }
121
122    // ===== List =====
123
124    /// `LPUSH key value [value ...]`. Returns the new list length.
125    pub fn lpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
126        match self {
127            Self::Embedded(s) => s.lpush(key, values),
128            Self::Remote(c) => list_push(c, b"LPUSH", key, values),
129        }
130    }
131
132    /// `RPUSH key value [value ...]`. Returns the new list length.
133    pub fn rpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
134        match self {
135            Self::Embedded(s) => s.rpush(key, values),
136            Self::Remote(c) => list_push(c, b"RPUSH", key, values),
137        }
138    }
139
140    /// `LPOP key count`. Returns up to `count` values from the head; empty
141    /// when the key is absent or already drained.
142    pub fn lpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
143        match self {
144            Self::Embedded(s) => s.lpop(key, count),
145            Self::Remote(c) => list_pop(c, b"LPOP", key, count),
146        }
147    }
148
149    /// `RPOP key count`. Symmetric to [`Self::lpop`] from the tail.
150    pub fn rpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
151        match self {
152            Self::Embedded(s) => s.rpop(key, count),
153            Self::Remote(c) => list_pop(c, b"RPOP", key, count),
154        }
155    }
156
157    /// `LLEN key`. 0 when the key is absent.
158    pub fn llen(&mut self, key: &[u8]) -> io::Result<usize> {
159        match self {
160            Self::Embedded(s) => s.llen(key),
161            Self::Remote(c) => match c.request(&vec2(b"LLEN", key))? {
162                Reply::Int(n) if n >= 0 => Ok(n as usize),
163                Reply::Error(e) => Err(io::Error::other(string(e))),
164                other => Err(unexpected(other)),
165            },
166        }
167    }
168
169    /// `LRANGE key start stop`. Redis-style indexing — negative offsets
170    /// count from the tail (`-1` = last element).
171    pub fn lrange(&mut self, key: &[u8], start: i64, stop: i64) -> io::Result<Vec<Vec<u8>>> {
172        match self {
173            Self::Embedded(s) => s
174                .with(|inner| inner.lrange(key, start, stop))
175                .map_err(store_err),
176            Self::Remote(c) => {
177                let args = vec![
178                    b"LRANGE".to_vec(),
179                    key.to_vec(),
180                    start.to_string().into_bytes(),
181                    stop.to_string().into_bytes(),
182                ];
183                match c.request(&args)? {
184                    Reply::Array(items) => array_to_bulks(items),
185                    Reply::Error(e) => Err(io::Error::other(string(e))),
186                    other => Err(unexpected(other)),
187                }
188            }
189        }
190    }
191
192    // ===== Set =====
193
194    /// `SADD key member [member ...]`. Returns count of newly added members.
195    pub fn sadd(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
196        match self {
197            Self::Embedded(s) => s.sadd(key, members),
198            Self::Remote(c) => set_multi(c, b"SADD", key, members),
199        }
200    }
201
202    /// `SREM key member [member ...]`. Returns count of removed members.
203    pub fn srem(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
204        match self {
205            Self::Embedded(s) => s.srem(key, members),
206            Self::Remote(c) => set_multi(c, b"SREM", key, members),
207        }
208    }
209
210    /// `SMEMBERS key`. Order is implementation-defined; empty if absent.
211    pub fn smembers(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
212        match self {
213            Self::Embedded(s) => s.smembers(key),
214            Self::Remote(c) => match c.request(&vec2(b"SMEMBERS", key))? {
215                Reply::Array(items) => array_to_bulks(items),
216                Reply::Error(e) => Err(io::Error::other(string(e))),
217                other => Err(unexpected(other)),
218            },
219        }
220    }
221
222    /// `SCARD key`. 0 when the key is absent.
223    pub fn scard(&mut self, key: &[u8]) -> io::Result<usize> {
224        match self {
225            Self::Embedded(s) => s.scard(key),
226            Self::Remote(c) => match c.request(&vec2(b"SCARD", key))? {
227                Reply::Int(n) if n >= 0 => Ok(n as usize),
228                Reply::Error(e) => Err(io::Error::other(string(e))),
229                other => Err(unexpected(other)),
230            },
231        }
232    }
233
234    /// `SISMEMBER key member`. `false` when key or member absent.
235    pub fn sismember(&mut self, key: &[u8], member: &[u8]) -> io::Result<bool> {
236        match self {
237            Self::Embedded(s) => s
238                .with(|inner| inner.sismember(key, member))
239                .map_err(store_err),
240            Self::Remote(c) => match c.request(&vec3(b"SISMEMBER", key, member))? {
241                Reply::Int(1) => Ok(true),
242                Reply::Int(0) => Ok(false),
243                Reply::Error(e) => Err(io::Error::other(string(e))),
244                other => Err(unexpected(other)),
245            },
246        }
247    }
248
249    /// `SINTER key [key ...]` — intersection of all sets.
250    pub fn sinter(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
251        match self {
252            Self::Embedded(s) => embed_set_combine(s, keys, SetOp::Inter),
253            Self::Remote(c) => remote_set_combine(c, b"SINTER", keys),
254        }
255    }
256
257    /// `SUNION key [key ...]` — union of all sets.
258    pub fn sunion(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
259        match self {
260            Self::Embedded(s) => embed_set_combine(s, keys, SetOp::Union),
261            Self::Remote(c) => remote_set_combine(c, b"SUNION", keys),
262        }
263    }
264
265    /// `SDIFF key [key ...]` — members of the first set absent from the rest.
266    pub fn sdiff(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
267        match self {
268            Self::Embedded(s) => embed_set_combine(s, keys, SetOp::Diff),
269            Self::Remote(c) => remote_set_combine(c, b"SDIFF", keys),
270        }
271    }
272
273    // ===== Sorted set =====
274
275    /// `ZADD key score member [score member ...]`. Returns count of newly
276    /// added members (overwrites don't count).
277    pub fn zadd(&mut self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
278        match self {
279            Self::Embedded(s) => s.zadd(key, pairs),
280            Self::Remote(c) => {
281                let mut args = Vec::with_capacity(2 + pairs.len() * 2);
282                args.push(b"ZADD".to_vec());
283                args.push(key.to_vec());
284                for (score, m) in pairs {
285                    args.push(score.to_string().into_bytes());
286                    args.push(m.to_vec());
287                }
288                match c.request(&args)? {
289                    Reply::Int(n) if n >= 0 => Ok(n as usize),
290                    Reply::Error(e) => Err(io::Error::other(string(e))),
291                    other => Err(unexpected(other)),
292                }
293            }
294        }
295    }
296
297    /// `ZREM key member [member ...]`. Returns count of removed members.
298    pub fn zrem(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
299        match self {
300            Self::Embedded(s) => s.zrem(key, members),
301            Self::Remote(c) => set_multi(c, b"ZREM", key, members),
302        }
303    }
304
305    /// `ZSCORE key member`. `None` if absent.
306    pub fn zscore(&mut self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
307        match self {
308            Self::Embedded(s) => s.zscore(key, member),
309            Self::Remote(c) => match c.request(&vec3(b"ZSCORE", key, member))? {
310                Reply::Bulk(v) => {
311                    let s = std::str::from_utf8(&v)
312                        .map_err(|_| io::Error::other("non-utf8 score reply"))?;
313                    let n: f64 = s
314                        .parse()
315                        .map_err(|_| io::Error::other(format!("bad score float: {s}")))?;
316                    Ok(Some(n))
317                }
318                Reply::Nil => Ok(None),
319                Reply::Error(e) => Err(io::Error::other(string(e))),
320                other => Err(unexpected(other)),
321            },
322        }
323    }
324
325    /// `ZCARD key`. Number of members; 0 if absent.
326    pub fn zcard(&mut self, key: &[u8]) -> io::Result<usize> {
327        match self {
328            Self::Embedded(s) => s.zcard(key),
329            Self::Remote(c) => match c.request(&vec2(b"ZCARD", key))? {
330                Reply::Int(n) if n >= 0 => Ok(n as usize),
331                Reply::Error(e) => Err(io::Error::other(string(e))),
332                other => Err(unexpected(other)),
333            },
334        }
335    }
336
337    /// `ZRANGE key start stop`. Ascending-score order; negative indices
338    /// count from the tail.
339    pub fn zrange(&mut self, key: &[u8], start: i64, stop: i64) -> io::Result<Vec<Vec<u8>>> {
340        match self {
341            Self::Embedded(s) => s
342                .with(|inner| inner.zrange(key, start, stop))
343                .map(|pairs| pairs.into_iter().map(|(m, _score)| m).collect())
344                .map_err(store_err),
345            Self::Remote(c) => {
346                let args = vec![
347                    b"ZRANGE".to_vec(),
348                    key.to_vec(),
349                    start.to_string().into_bytes(),
350                    stop.to_string().into_bytes(),
351                ];
352                match c.request(&args)? {
353                    Reply::Array(items) => array_to_bulks(items),
354                    Reply::Error(e) => Err(io::Error::other(string(e))),
355                    other => Err(unexpected(other)),
356                }
357            }
358        }
359    }
360}
361
362// ─────────────────────────────────────────────────────────────────────────
363// Shared request builders. Both backends accept a slice of byte-slices,
364// but the RESP path needs to splat them into a single argv vector.
365// ─────────────────────────────────────────────────────────────────────────
366
367fn list_push(
368    c: &mut RespClient,
369    verb: &[u8],
370    key: &[u8],
371    values: &[&[u8]],
372) -> io::Result<usize> {
373    let mut args = Vec::with_capacity(values.len() + 2);
374    args.push(verb.to_vec());
375    args.push(key.to_vec());
376    args.extend(values.iter().map(|v| v.to_vec()));
377    match c.request(&args)? {
378        Reply::Int(n) if n >= 0 => Ok(n as usize),
379        Reply::Error(e) => Err(io::Error::other(string(e))),
380        other => Err(unexpected(other)),
381    }
382}
383
384fn list_pop(
385    c: &mut RespClient,
386    verb: &[u8],
387    key: &[u8],
388    count: usize,
389) -> io::Result<Vec<Vec<u8>>> {
390    let args = vec![verb.to_vec(), key.to_vec(), count.to_string().into_bytes()];
391    match c.request(&args)? {
392        Reply::Array(items) => array_to_bulks(items),
393        Reply::Bulk(v) => Ok(vec![v]),
394        Reply::Nil => Ok(Vec::new()),
395        Reply::Error(e) => Err(io::Error::other(string(e))),
396        other => Err(unexpected(other)),
397    }
398}
399
400fn set_multi(
401    c: &mut RespClient,
402    verb: &[u8],
403    key: &[u8],
404    members: &[&[u8]],
405) -> io::Result<usize> {
406    let mut args = Vec::with_capacity(members.len() + 2);
407    args.push(verb.to_vec());
408    args.push(key.to_vec());
409    args.extend(members.iter().map(|m| m.to_vec()));
410    match c.request(&args)? {
411        Reply::Int(n) if n >= 0 => Ok(n as usize),
412        Reply::Error(e) => Err(io::Error::other(string(e))),
413        other => Err(unexpected(other)),
414    }
415}
416
417// Set-combine plumbing: each backend's path computes the intersection /
418// union / difference of N sets identified by `keys`.
419
/// Selects which combination `embed_set_combine` folds over the sets it
/// reads.
#[derive(Clone, Copy)]
enum SetOp {
    /// Keep only members present in every set (used by `sinter`).
    Inter,
    /// Keep members present in any set (used by `sunion`).
    Union,
    /// Keep members of the first set that appear in none of the later sets
    /// (used by `sdiff`).
    Diff,
}
426
427fn embed_set_combine(
428    s: &kevy_embedded::Store,
429    keys: &[&[u8]],
430    op: SetOp,
431) -> io::Result<Vec<Vec<u8>>> {
432    use std::collections::HashSet;
433    if keys.is_empty() {
434        return Ok(Vec::new());
435    }
436    let snapshots: Vec<Vec<Vec<u8>>> = keys
437        .iter()
438        .map(|k| s.smembers(k))
439        .collect::<io::Result<_>>()?;
440    let mut iter = snapshots.into_iter();
441    let mut acc: HashSet<Vec<u8>> = iter.next().unwrap_or_default().into_iter().collect();
442    for rest in iter {
443        let other: HashSet<Vec<u8>> = rest.into_iter().collect();
444        acc = match op {
445            SetOp::Inter => acc.intersection(&other).cloned().collect(),
446            SetOp::Union => acc.union(&other).cloned().collect(),
447            SetOp::Diff => acc.difference(&other).cloned().collect(),
448        };
449    }
450    Ok(acc.into_iter().collect())
451}
452
453fn remote_set_combine(
454    c: &mut RespClient,
455    verb: &[u8],
456    keys: &[&[u8]],
457) -> io::Result<Vec<Vec<u8>>> {
458    let mut args = Vec::with_capacity(keys.len() + 1);
459    args.push(verb.to_vec());
460    args.extend(keys.iter().map(|k| k.to_vec()));
461    match c.request(&args)? {
462        Reply::Array(items) => array_to_bulks(items),
463        Reply::Error(e) => Err(io::Error::other(string(e))),
464        other => Err(unexpected(other)),
465    }
466}
467
468#[cfg(test)]
469#[path = "collections_tests.rs"]
470mod tests;